diff --git a/src/main/java/com/rabbitmq/client/impl/AMQCommand.java b/src/main/java/com/rabbitmq/client/impl/AMQCommand.java index e4457076c9..929eaa3804 100644 --- a/src/main/java/com/rabbitmq/client/impl/AMQCommand.java +++ b/src/main/java/com/rabbitmq/client/impl/AMQCommand.java @@ -107,12 +107,12 @@ public void transmit(AMQChannel channel) throws IOException { Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length); int frameMax = connection.getFrameMax(); - int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax - - EMPTY_FRAME_SIZE; + boolean cappedFrameMax = frameMax > 0; + int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length; - if (headerFrame.size() > frameMax) { - throw new IllegalArgumentException("Content headers exceeded max frame size: " + - headerFrame.size() + " > " + frameMax); + if (cappedFrameMax && headerFrame.size() > frameMax) { + String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax); + throw new IllegalArgumentException(msg); } connection.writeFrame(m.toFrame(channelNumber)); connection.writeFrame(headerFrame); diff --git a/src/test/java/com/rabbitmq/client/test/functional/FrameMax.java b/src/test/java/com/rabbitmq/client/test/functional/FrameMax.java index 07d7eb9d9f..303d01e00d 100644 --- a/src/test/java/com/rabbitmq/client/test/functional/FrameMax.java +++ b/src/test/java/com/rabbitmq/client/test/functional/FrameMax.java @@ -30,6 +30,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeoutException; +import com.rabbitmq.client.impl.AMQBasicProperties; import com.rabbitmq.client.test.TestUtils; import org.junit.Test; @@ -104,9 +105,10 @@ public FrameMax() { closeChannel(); closeConnection(); ConnectionFactory cf = new GenerousConnectionFactory(); + cf.setRequestedFrameMax(8192); connection = cf.newConnection(); openChannel(); - basicPublishVolatile(new byte[connection.getFrameMax()], "void"); + basicPublishVolatile(new byte[connection.getFrameMax() * 2], "void"); expectError(AMQP.FRAME_ERROR); } @@ -123,7 +125,9 @@ public FrameMax() { // create headers with zero-length value to calculate maximum header value size before exceeding frame_max headers.put(headerName, LongStringHelper.asLongString(new byte[0])); - AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build(); + AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() + .headers(headers) + .build(); Frame minimalHeaderFrame = properties.toFrame(0, 0); int maxHeaderValueSize = FRAME_MAX - minimalHeaderFrame.size(); @@ -151,6 +155,33 @@ public FrameMax() { } + // see rabbitmq/rabbitmq-java-client#407 + @Test public void unlimitedFrameMaxWithHeaders() + throws IOException, TimeoutException { + closeChannel(); + closeConnection(); + ConnectionFactory cf = newConnectionFactory(); + cf.setRequestedFrameMax(0); + connection = cf.newConnection(); + openChannel(); + + Map headers = new HashMap(); + headers.put("h1", LongStringHelper.asLongString(new byte[250])); + headers.put("h1", LongStringHelper.asLongString(new byte[500])); + headers.put("h1", LongStringHelper.asLongString(new byte[750])); + headers.put("h1", LongStringHelper.asLongString(new byte[5000])); + headers.put("h1", LongStringHelper.asLongString(new byte[50000])); + AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder() + .headers(headers) + .build(); + basicPublishVolatile(new byte[500000], "", "", properties); + } + + @Override + protected boolean isAutomaticRecoveryEnabled() { + return false; + } + /* ConnectionFactory that uses MyFrameHandler rather than * SocketFrameHandler. */ private static class MyConnectionFactory extends ConnectionFactory {