diff --git a/src/main/java/com/rabbitmq/client/impl/AMQCommand.java b/src/main/java/com/rabbitmq/client/impl/AMQCommand.java index 5543ee7c1f..e4457076c9 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQCommand.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQCommand.java @@ -101,17 +101,22 @@ public void transmit(AMQChannel channel) throws IOException { synchronized (assembler) { Method m = this.assembler.getMethod(); - connection.writeFrame(m.toFrame(channelNumber)); if (m.hasContent()) { byte[] body = this.assembler.getContentBody(); - connection.writeFrame(this.assembler.getContentHeader() - .toFrame(channelNumber, body.length)); + Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length); int frameMax = connection.getFrameMax(); int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax - EMPTY_FRAME_SIZE; + if (headerFrame.size() > frameMax) { + throw new IllegalArgumentException("Content headers exceeded max frame size: " + + headerFrame.size() + " > " + frameMax); + } + connection.writeFrame(m.toFrame(channelNumber)); + connection.writeFrame(headerFrame); + for (int offset = 0; offset < body.length; offset += bodyPayloadMax) { int remaining = body.length - offset; @@ -121,6 +126,8 @@ public void transmit(AMQChannel channel) throws IOException { offset, fragmentLength); connection.writeFrame(frame); } + } else { + connection.writeFrame(m.toFrame(channelNumber)); } } diff --git a/src/test/java/com/rabbitmq/client/test/functional/FrameMax.java b/src/test/java/com/rabbitmq/client/test/functional/FrameMax.java index 7e1019c4cb..07d7eb9d9f 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/FrameMax.java +++ b/src/test/java/com/rabbitmq/client/test/functional/FrameMax.java @@ -17,11 +17,16 @@ package com.rabbitmq.client.test.functional; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; @@ -35,6 +40,7 @@ import com.rabbitmq.client.GetResponse; import com.rabbitmq.client.impl.AMQCommand; import com.rabbitmq.client.impl.AMQConnection; +import com.rabbitmq.client.impl.ContentHeaderPropertyWriter; import com.rabbitmq.client.impl.Frame; import com.rabbitmq.client.impl.FrameHandler; import com.rabbitmq.client.impl.LongStringHelper; @@ -104,6 +110,47 @@ public FrameMax() { expectError(AMQP.FRAME_ERROR); } + /* client should throw exception if headers exceed negotiated + * frame size */ + @Test public void rejectHeadersExceedingFrameMax() + throws IOException, TimeoutException { + declareTransientTopicExchange("x"); + String queueName = channel.queueDeclare().getQueue(); + channel.queueBind(queueName, "x", "foobar"); + + Map headers = new HashMap(); + String headerName = "x-huge-header"; + + // create headers with zero-length value to calculate maximum header value size before exceeding frame_max + headers.put(headerName, LongStringHelper.asLongString(new byte[0])); + AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build(); + Frame minimalHeaderFrame = properties.toFrame(0, 0); + int maxHeaderValueSize = FRAME_MAX - minimalHeaderFrame.size(); + + // create headers with maximum header value size (frame size equals frame_max) + headers.put(headerName, LongStringHelper.asLongString(new byte[maxHeaderValueSize])); + properties = new AMQP.BasicProperties.Builder().headers(headers).build(); + + basicPublishVolatile(new byte[100], "x", "foobar", properties); + assertDelivered(queueName, 1); + + // create headers with frame size exceeding frame_max by 1 + headers.put(headerName, LongStringHelper.asLongString(new byte[maxHeaderValueSize + 1])); + properties = new AMQP.BasicProperties.Builder().headers(headers).build(); + try { + basicPublishVolatile(new byte[100], "x", "foobar", properties); + fail("expected rejectHeadersExceedingFrameMax to throw"); + } catch (IllegalArgumentException iae) { + assertTrue(iae.getMessage().startsWith("Content headers exceeded max frame size")); + // check that the channel is still operational + assertDelivered(queueName, 0); + } + + // cleanup + deleteExchange("x"); + } + + /* ConnectionFactory that uses MyFrameHandler rather than * SocketFrameHandler. */ private static class MyConnectionFactory extends ConnectionFactory {