Skip to content

Commit 8be7848

Browse files
authored
Merge pull request #363 from ibolotin/headers-frame-too-large
Throw an exception instead of trying to transmit a header frame that exceeds max frame size
2 parents e0d79c7 + 4b8ab88 commit 8be7848

File tree

2 files changed

+57
-3
lines changed

2 files changed

+57
-3
lines changed

src/main/java/com/rabbitmq/client/impl/AMQCommand.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -101,17 +101,22 @@ public void transmit(AMQChannel channel) throws IOException {
101101

102102
synchronized (assembler) {
103103
Method m = this.assembler.getMethod();
104-
connection.writeFrame(m.toFrame(channelNumber));
105104
if (m.hasContent()) {
106105
byte[] body = this.assembler.getContentBody();
107106

108-
connection.writeFrame(this.assembler.getContentHeader()
109-
.toFrame(channelNumber, body.length));
107+
Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
110108

111109
int frameMax = connection.getFrameMax();
112110
int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
113111
- EMPTY_FRAME_SIZE;
114112

113+
if (headerFrame.size() > frameMax) {
114+
throw new IllegalArgumentException("Content headers exceeded max frame size: " +
115+
headerFrame.size() + " > " + frameMax);
116+
}
117+
connection.writeFrame(m.toFrame(channelNumber));
118+
connection.writeFrame(headerFrame);
119+
115120
for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
116121
int remaining = body.length - offset;
117122

@@ -121,6 +126,8 @@ public void transmit(AMQChannel channel) throws IOException {
121126
offset, fragmentLength);
122127
connection.writeFrame(frame);
123128
}
129+
} else {
130+
connection.writeFrame(m.toFrame(channelNumber));
124131
}
125132
}
126133

src/test/java/com/rabbitmq/client/test/functional/FrameMax.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717
package com.rabbitmq.client.test.functional;
1818

1919
import static org.junit.Assert.assertEquals;
20+
import static org.junit.Assert.assertTrue;
2021
import static org.junit.Assert.fail;
2122

23+
import java.io.ByteArrayOutputStream;
24+
import java.io.DataOutputStream;
2225
import java.io.IOException;
2326
import java.net.Socket;
27+
import java.util.HashMap;
2428
import java.util.List;
29+
import java.util.Map;
2530
import java.util.concurrent.ExecutorService;
2631
import java.util.concurrent.TimeoutException;
2732

@@ -35,6 +40,7 @@
3540
import com.rabbitmq.client.GetResponse;
3641
import com.rabbitmq.client.impl.AMQCommand;
3742
import com.rabbitmq.client.impl.AMQConnection;
43+
import com.rabbitmq.client.impl.ContentHeaderPropertyWriter;
3844
import com.rabbitmq.client.impl.Frame;
3945
import com.rabbitmq.client.impl.FrameHandler;
4046
import com.rabbitmq.client.impl.LongStringHelper;
@@ -104,6 +110,47 @@ public FrameMax() {
104110
expectError(AMQP.FRAME_ERROR);
105111
}
106112

113+
/* client should throw exception if headers exceed negotiated
114+
* frame size */
115+
@Test public void rejectHeadersExceedingFrameMax()
116+
throws IOException, TimeoutException {
117+
declareTransientTopicExchange("x");
118+
String queueName = channel.queueDeclare().getQueue();
119+
channel.queueBind(queueName, "x", "foobar");
120+
121+
Map<String, Object> headers = new HashMap<String, Object>();
122+
String headerName = "x-huge-header";
123+
124+
// create headers with zero-length value to calculate maximum header value size before exceeding frame_max
125+
headers.put(headerName, LongStringHelper.asLongString(new byte[0]));
126+
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
127+
Frame minimalHeaderFrame = properties.toFrame(0, 0);
128+
int maxHeaderValueSize = FRAME_MAX - minimalHeaderFrame.size();
129+
130+
// create headers with maximum header value size (frame size equals frame_max)
131+
headers.put(headerName, LongStringHelper.asLongString(new byte[maxHeaderValueSize]));
132+
properties = new AMQP.BasicProperties.Builder().headers(headers).build();
133+
134+
basicPublishVolatile(new byte[100], "x", "foobar", properties);
135+
assertDelivered(queueName, 1);
136+
137+
// create headers with frame size exceeding frame_max by 1
138+
headers.put(headerName, LongStringHelper.asLongString(new byte[maxHeaderValueSize + 1]));
139+
properties = new AMQP.BasicProperties.Builder().headers(headers).build();
140+
try {
141+
basicPublishVolatile(new byte[100], "x", "foobar", properties);
142+
fail("expected rejectHeadersExceedingFrameMax to throw");
143+
} catch (IllegalArgumentException iae) {
144+
assertTrue(iae.getMessage().startsWith("Content headers exceeded max frame size"));
145+
// check that the channel is still operational
146+
assertDelivered(queueName, 0);
147+
}
148+
149+
// cleanup
150+
deleteExchange("x");
151+
}
152+
153+
107154
/* ConnectionFactory that uses MyFrameHandler rather than
108155
* SocketFrameHandler. */
109156
private static class MyConnectionFactory extends ConnectionFactory {

0 commit comments

Comments
 (0)