diff --git a/.changes/next-release/bugfix-NettyNIOHttpClient-cc6bbb2.json b/.changes/next-release/bugfix-NettyNIOHttpClient-cc6bbb2.json new file mode 100644 index 000000000000..3969c5c4290c --- /dev/null +++ b/.changes/next-release/bugfix-NettyNIOHttpClient-cc6bbb2.json @@ -0,0 +1,5 @@ +{ + "category": "Netty NIO Http Client", + "type": "bugfix", + "description": "Expand Http2 connection-level flow control window when a new stream is acquired on that connection so that the connection-level window size is proportional to the number of streams." +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java index 182ec6ce3921..f2feca135ffa 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java @@ -17,6 +17,7 @@ import io.netty.channel.Channel; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http2.Http2Connection; import io.netty.util.AttributeKey; import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; @@ -48,6 +49,12 @@ public final class ChannelAttributeKey { public static final AttributeKey PING_TRACKER = AttributeKey.newInstance("aws.http.nio.netty.async.h2.pingTracker"); + public static final AttributeKey HTTP2_CONNECTION = + AttributeKey.newInstance("aws.http.nio.netty.async.http2Connection"); + + public static final AttributeKey HTTP2_INITIAL_WINDOW_SIZE = + AttributeKey.newInstance("aws.http.nio.netty.async.http2InitialWindowSize"); + /** * Value of the MAX_CONCURRENT_STREAMS from the server's SETTING frame. */ diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java index edb7c44f70ec..4b19986bffc6 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.http.nio.netty.internal; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE; import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE; import static software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration.HTTP2_CONNECTION_PING_TIMEOUT_SECONDS; import static software.amazon.awssdk.utils.NumericUtils.saturatedCast; @@ -151,6 +153,9 @@ private void configureHttp2(Channel ch, ChannelPipeline pipeline) { codec.connection().addListener(new Http2GoAwayEventListener(ch)); pipeline.addLast(codec); + ch.attr(HTTP2_CONNECTION).set(codec.connection()); + + ch.attr(HTTP2_INITIAL_WINDOW_SIZE).set(clientInitialWindowSize); pipeline.addLast(new Http2MultiplexHandler(new NoOpChannelInitializer())); pipeline.addLast(new Http2SettingsFrameHandler(ch, clientMaxStreams, channelPoolRef)); if (healthCheckPingPeriod == null) { @@ -170,7 +175,6 @@ private static class NoOpChannelInitializer extends ChannelInitializer protected void initChannel(Channel ch) { } } - } diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool.java index 3e251c755626..0b720f52170b 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool.java @@ -15,6 +15,9 @@ package software.amazon.awssdk.http.nio.netty.internal.http2; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.MAX_CONCURRENT_STREAMS; import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop; import io.netty.channel.Channel; @@ -24,6 +27,10 @@ import io.netty.channel.EventLoop; import io.netty.channel.EventLoopGroup; import io.netty.channel.pool.ChannelPool; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2Exception; +import io.netty.handler.codec.http2.Http2LocalFlowController; +import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; import io.netty.util.AttributeKey; import io.netty.util.concurrent.Future; @@ -147,7 +154,7 @@ private void acquireStreamOnNewConnection(Promise promise) { private void acquireStreamOnFreshConnection(Promise promise, Channel parentChannel, Protocol protocol) { try { - Long maxStreams = parentChannel.attr(ChannelAttributeKey.MAX_CONCURRENT_STREAMS).get(); + Long maxStreams = parentChannel.attr(MAX_CONCURRENT_STREAMS).get(); Validate.isTrue(protocol == Protocol.HTTP2, "Protocol negotiated on connection (%s) was expected to be HTTP/2, but it " @@ -202,7 +209,37 @@ private void cacheConnectionForFutureStreams(Channel stream, promise.setSuccess(stream); } + /** + * By default, connection window size is a constant value: + * connectionWindowSize = 65535 + (configureInitialWindowSize - 65535) * 2. + * See https://github.com/netty/netty/blob/5c458c9a98d4d3d0345e58495e017175156d624f/codec-http2/src/main/java/io/netty + * /handler/codec/http2/Http2FrameCodec.java#L255 + * We should expand connection window so that the window size proportional to the number of concurrent streams within the + * connection. + * Note that when {@code WINDOW_UPDATE} will be sent depends on the processedWindow in DefaultHttp2LocalFlowController. + */ + private void tryExpandConnectionWindow(Channel parentChannel) { + doInEventLoop(parentChannel.eventLoop(), () -> { + Http2Connection http2Connection = parentChannel.attr(HTTP2_CONNECTION).get(); + Integer initialWindowSize = parentChannel.attr(HTTP2_INITIAL_WINDOW_SIZE).get(); + + Validate.notNull(http2Connection, "http2Connection should not be null on channel " + parentChannel); + Validate.notNull(http2Connection, "initialWindowSize should not be null on channel " + parentChannel); + + Http2Stream connectionStream = http2Connection.connectionStream(); + log.debug(() -> "Expanding connection window size for " + parentChannel + " by " + initialWindowSize); + try { + Http2LocalFlowController localFlowController = http2Connection.local().flowController(); + localFlowController.incrementWindowSize(connectionStream, initialWindowSize); + + } catch (Http2Exception e) { + log.warn(() -> "Failed to increment windowSize of connection " + parentChannel, e); + } + }); + } + private Void failAndCloseParent(Promise promise, Channel parentChannel, Throwable exception) { + log.debug(() -> "Channel acquiring failed, closing connection " + parentChannel, exception); promise.setFailure(exception); closeAndReleaseParent(parentChannel); return null; @@ -233,6 +270,8 @@ private boolean acquireStreamOnInitializedConnection(MultiplexedChannelRecord ch channel.attr(ChannelAttributeKey.HTTP2_MULTIPLEXED_CHANNEL_POOL).set(this); channel.attr(MULTIPLEXED_CHANNEL).set(channelRecord); promise.setSuccess(channel); + + tryExpandConnectionWindow(channel.parent()); } catch (Exception e) { promise.setFailure(e); } @@ -324,7 +363,7 @@ void handleGoAway(Channel parentChannel, int lastStreamId, GoAwayException excep multiplexedChannel.handleGoAway(lastStreamId, exception); } else { // If we don't have a multiplexed channel, the parent channel hasn't been fully initialized. Close it now. - closeAndReleaseParent(parentChannel); + closeAndReleaseParent(parentChannel, exception); } } catch (Exception e) { log.error(() -> "Failed to handle GOAWAY frame on channel " + parentChannel, e); diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPoolTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPoolTest.java index bfa6e8f9deb1..ca40d14cb6e0 100644 --- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPoolTest.java +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPoolTest.java @@ -20,6 +20,8 @@ import static org.mockito.Matchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION; +import static software.amazon.awssdk.http.nio.netty.internal.http2.utils.Http2TestUtils.newHttp2Channel; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; @@ -28,6 +30,10 @@ import io.netty.channel.pool.ChannelPool; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http2.Http2Connection; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2LocalFlowController; +import io.netty.handler.codec.http2.Http2Stream; import io.netty.util.concurrent.DefaultPromise; import io.netty.util.concurrent.FailedFuture; import io.netty.util.concurrent.Future; @@ -35,12 +41,14 @@ import java.io.IOException; import java.util.Collections; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mockito; +import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey; /** * Tests for {@link Http2MultiplexedChannelPool}. @@ -201,4 +209,53 @@ public void interruptDuringClosePreservesFlag() throws InterruptedException { channel.close().awaitUninterruptibly(); } } + + @Test + public void acquire_shouldExpandConnectionWindowSizeProportionally() { + int maxConcurrentStream = 3; + EmbeddedChannel channel = newHttp2Channel(); + channel.attr(ChannelAttributeKey.MAX_CONCURRENT_STREAMS).set((long) maxConcurrentStream); + + try { + ChannelPool connectionPool = Mockito.mock(ChannelPool.class); + + loopGroup.register(channel).awaitUninterruptibly(); + Promise channelPromise = new DefaultPromise<>(loopGroup.next()); + channelPromise.setSuccess(channel); + + Mockito.when(connectionPool.acquire()).thenReturn(channelPromise); + + Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup, + Collections.emptySet(), null); + + Future acquire = h2Pool.acquire(); + acquire.awaitUninterruptibly(); + channel.runPendingTasks(); + + Http2Connection http2Connection = channel.attr(HTTP2_CONNECTION).get(); + Http2LocalFlowController flowController = + http2Connection.local().flowController(); + + System.out.println(flowController.initialWindowSize()); + Http2Stream connectionStream = http2Connection.stream(0); + + // 1_048_576 (initial configured window size), 65535 (configured initial window size) + // (1048576 - 65535) *2 + 65535 = 2031617 + assertThat(flowController.windowSize(connectionStream)).isEqualTo(2031617); + + // 2031617 + 1048576 (configured initial window size) = 3080193 + assertThat(flowController.initialWindowSize(connectionStream)).isEqualTo(3080193); + + // acquire again + h2Pool.acquire().awaitUninterruptibly(); + channel.runPendingTasks(); + + // 3080193 + 1048576 (configured initial window size) = 4128769 + assertThat(flowController.initialWindowSize(connectionStream)).isEqualTo(4128769); + + Mockito.verify(connectionPool, Mockito.times(1)).acquire(); + } finally { + channel.close(); + } + } } diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/utils/Http2TestUtils.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/utils/Http2TestUtils.java new file mode 100644 index 000000000000..b91650047453 --- /dev/null +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/utils/Http2TestUtils.java @@ -0,0 +1,58 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal.http2.utils; + + +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.codec.http2.Http2FrameCodec; +import io.netty.handler.codec.http2.Http2FrameCodecBuilder; +import io.netty.handler.codec.http2.Http2FrameLogger; +import io.netty.handler.codec.http2.Http2MultiplexHandler; +import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.logging.LogLevel; +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.http.Protocol; +import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey; + +public final class Http2TestUtils { + public static final int INITIAL_WINDOW_SIZE = 1_048_576; + + public static EmbeddedChannel newHttp2Channel() { + return newHttp2Channel(new NoOpHandler()); + } + + public static EmbeddedChannel newHttp2Channel(ChannelHandler channelHandler) { + Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient().initialSettings( + Http2Settings.defaultSettings().initialWindowSize(INITIAL_WINDOW_SIZE)) + .frameLogger(new Http2FrameLogger(LogLevel.DEBUG)).build(); + EmbeddedChannel channel = new EmbeddedChannel(http2FrameCodec, + new Http2MultiplexHandler(channelHandler)); + + channel.attr(ChannelAttributeKey.HTTP2_CONNECTION).set(http2FrameCodec.connection()); + channel.attr(ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE).set(INITIAL_WINDOW_SIZE); + channel.attr(ChannelAttributeKey.PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP2)); + return channel; + } + + private static class NoOpHandler extends ChannelInitializer { + @Override + protected void initChannel(Channel ch) { + } + } +}