Skip to content

Commit aedea1f

Browse files
author
igor_bolotin
committed
Throw an exception instead of trying to transmit a header frame that
exceeds max frame size
1 parent e0d79c7 commit aedea1f

File tree

2 files changed

+56
-3
lines changed

2 files changed

+56
-3
lines changed

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,22 @@ public void transmit(AMQChannel channel) throws IOException {
101101

102102
synchronized (assembler) {
103103
Method m = this.assembler.getMethod();
104-
connection.writeFrame(m.toFrame(channelNumber));
105104
if (m.hasContent()) {
106105
byte[] body = this.assembler.getContentBody();
107106

108-
connection.writeFrame(this.assembler.getContentHeader()
109-
.toFrame(channelNumber, body.length));
107+
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
110108

111109
int frameMax = connection.getFrameMax();
112110
int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
113111
- EMPTY_FRAME_SIZE;
114112

113+
if (headerFrame.size() > frameMax) {
114+
throw new IllegalArgumentException("Content headers exceeded max frame size: " +
115+
headerFrame.size() + " > " + frameMax);
116+
}
117+
connection.writeFrame(m.toFrame(channelNumber));
118+
connection.writeFrame(headerFrame);
119+
115120
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
116121
int remaining = body.length - offset;
117122

@@ -121,6 +126,8 @@ public void transmit(AMQChannel channel) throws IOException {
121126
offset, fragmentLength);
122127
connection.writeFrame(frame);
123128
}
129+
} else {
130+
connection.writeFrame(m.toFrame(channelNumber));
124131
}
125132
}
126133

src/test/java/com/rabbitmq/client/test/functional/FrameMax.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
package com.rabbitmq.client.test.functional;
1818

1919
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
2021
import static org.junit.Assert.fail;
2122

23+
import java.io.ByteArrayOutputStream;
24+
import java.io.DataOutputStream;
2225
import java.io.IOException;
2326
import java.net.Socket;
27+
import java.util.HashMap;
2428
import java.util.List;
29+
import java.util.Map;
2530
import java.util.concurrent.ExecutorService;
2631
import java.util.concurrent.TimeoutException;
2732

@@ -35,6 +40,7 @@
3540
import com.rabbitmq.client.GetResponse;
3641
import com.rabbitmq.client.impl.AMQCommand;
3742
import com.rabbitmq.client.impl.AMQConnection;
43+
import com.rabbitmq.client.impl.ContentHeaderPropertyWriter;
3844
import com.rabbitmq.client.impl.Frame;
3945
import com.rabbitmq.client.impl.FrameHandler;
4046
import com.rabbitmq.client.impl.LongStringHelper;
@@ -104,6 +110,46 @@ public FrameMax() {
104110
expectError(AMQP.FRAME_ERROR);
105111
}
106112

113+
/* client should throw exception if headers exceed negotiated
114+
* frame size */
115+
@Test public void rejectHeadersExceedingFrameMax()
116+
throws IOException, TimeoutException {
117+
declareTransientTopicExchange("x");
118+
119+
String queueName = channel.queueDeclare().getQueue();
120+
channel.queueBind(queueName, "x", "foobar");
121+
122+
Map<String, Object> headers = new HashMap<String, Object>();
123+
String headerName = "x-huge-header";
124+
headers.put(headerName, "");
125+
126+
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
127+
128+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
129+
properties.writePropertiesTo(new ContentHeaderPropertyWriter(new DataOutputStream(baos)));
130+
int overhead = baos.size();
131+
132+
headers.put(headerName, LongStringHelper.asLongString(new byte[REAL_FRAME_MAX - overhead]));
133+
134+
basicPublishVolatile(new byte[100], "x", "foobar", properties);
135+
assertDelivered(queueName, 1);
136+
137+
headers.put(headerName, LongStringHelper.asLongString(new byte[REAL_FRAME_MAX - overhead - 1]));
138+
properties = new AMQP.BasicProperties.Builder().headers(headers).build();
139+
140+
try {
141+
basicPublishVolatile(new byte[100], "x", "foobar", properties);
142+
fail("expected rejectHeadersExceedingFrameMax to throw");
143+
} catch (IllegalArgumentException iae) {
144+
assertTrue(iae.getMessage().startsWith("Content headers exceeded max frame size"));
145+
// check that the channel is still operational
146+
assertDelivered(queueName, 0);
147+
}
148+
149+
deleteExchange("x");
150+
}
151+
152+
107153
/* ConnectionFactory that uses MyFrameHandler rather than
108154
* SocketFrameHandler. */
109155
private static class MyConnectionFactory extends ConnectionFactory {

0 commit comments

Comments
 (0)