Skip to content

Commit 0124df2

Browse files
committed
Expand http2 connection window when a new stream is acquired on that connection
1 parent 38f54b2 commit 0124df2

File tree

6 files changed

+173
-3
lines changed

6 files changed

+173
-3
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"category": "Netty NIO Http Client",
3+
"type": "bugfix",
4+
"description": "Expand Http2 connection-level flow control window when a new stream is acquired on that connection so that the connection-level window size is proportional to the number of streams."
5+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelAttributeKey.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import io.netty.channel.Channel;
1919
import io.netty.handler.codec.http.LastHttpContent;
20+
import io.netty.handler.codec.http2.Http2Connection;
2021
import io.netty.util.AttributeKey;
2122
import java.nio.ByteBuffer;
2223
import java.util.concurrent.CompletableFuture;
@@ -48,6 +49,12 @@ public final class ChannelAttributeKey {
4849
public static final AttributeKey<PingTracker> PING_TRACKER =
4950
AttributeKey.newInstance("aws.http.nio.netty.async.h2.pingTracker");
5051

52+
public static final AttributeKey<Http2Connection> HTTP2_CONNECTION =
53+
AttributeKey.newInstance("aws.http.nio.netty.async.http2Connection");
54+
55+
public static final AttributeKey<Integer> HTTP2_INITIAL_WINDOW_SIZE =
56+
AttributeKey.newInstance("aws.http.nio.netty.async.http2InitialWindowSize");
57+
5158
/**
5259
* Value of the MAX_CONCURRENT_STREAMS from the server's SETTING frame.
5360
*/

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ChannelPipelineInitializer.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

18+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION;
19+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE;
1820
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
1921
import static software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration.HTTP2_CONNECTION_PING_TIMEOUT_SECONDS;
2022
import static software.amazon.awssdk.utils.NumericUtils.saturatedCast;
@@ -151,6 +153,9 @@ private void configureHttp2(Channel ch, ChannelPipeline pipeline) {
151153
codec.connection().addListener(new Http2GoAwayEventListener(ch));
152154

153155
pipeline.addLast(codec);
156+
ch.attr(HTTP2_CONNECTION).set(codec.connection());
157+
158+
ch.attr(HTTP2_INITIAL_WINDOW_SIZE).set(clientInitialWindowSize);
154159
pipeline.addLast(new Http2MultiplexHandler(new NoOpChannelInitializer()));
155160
pipeline.addLast(new Http2SettingsFrameHandler(ch, clientMaxStreams, channelPoolRef));
156161
if (healthCheckPingPeriod == null) {
@@ -170,7 +175,6 @@ private static class NoOpChannelInitializer extends ChannelInitializer<Channel>
170175
protected void initChannel(Channel ch) {
171176
}
172177
}
173-
174178
}
175179

176180

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPool.java

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515

1616
package software.amazon.awssdk.http.nio.netty.internal.http2;
1717

18+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION;
19+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE;
20+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.MAX_CONCURRENT_STREAMS;
1821
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;
1922

2023
import io.netty.channel.Channel;
@@ -24,6 +27,10 @@
2427
import io.netty.channel.EventLoop;
2528
import io.netty.channel.EventLoopGroup;
2629
import io.netty.channel.pool.ChannelPool;
30+
import io.netty.handler.codec.http2.Http2Connection;
31+
import io.netty.handler.codec.http2.Http2Exception;
32+
import io.netty.handler.codec.http2.Http2LocalFlowController;
33+
import io.netty.handler.codec.http2.Http2Stream;
2734
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
2835
import io.netty.util.AttributeKey;
2936
import io.netty.util.concurrent.Future;
@@ -147,7 +154,7 @@ private void acquireStreamOnNewConnection(Promise<Channel> promise) {
147154

148155
private void acquireStreamOnFreshConnection(Promise<Channel> promise, Channel parentChannel, Protocol protocol) {
149156
try {
150-
Long maxStreams = parentChannel.attr(ChannelAttributeKey.MAX_CONCURRENT_STREAMS).get();
157+
Long maxStreams = parentChannel.attr(MAX_CONCURRENT_STREAMS).get();
151158

152159
Validate.isTrue(protocol == Protocol.HTTP2,
153160
"Protocol negotiated on connection (%s) was expected to be HTTP/2, but it "
@@ -202,7 +209,37 @@ private void cacheConnectionForFutureStreams(Channel stream,
202209
promise.setSuccess(stream);
203210
}
204211

212+
/**
213+
* By default, connection window size is a constant value:
214+
* connectionWindowSize = 65535 + (configureInitialWindowSize - 65535) * 2.
215+
* See https://github.com/netty/netty/blob/5c458c9a98d4d3d0345e58495e017175156d624f/codec-http2/src/main/java/io/netty
216+
* /handler/codec/http2/Http2FrameCodec.java#L255
217+
* We should expand connection window so that the window size proportional to the number of concurrent streams within the
218+
* connection.
219+
* Note that when {@code WINDOW_UPDATE} will be sent depends on the processedWindow in DefaultHttp2LocalFlowController.
220+
*/
221+
private void tryExpandConnectionWindow(Channel parentChannel) {
222+
doInEventLoop(parentChannel.eventLoop(), () -> {
223+
Http2Connection http2Connection = parentChannel.attr(HTTP2_CONNECTION).get();
224+
Integer initialWindowSize = parentChannel.attr(HTTP2_INITIAL_WINDOW_SIZE).get();
225+
226+
Validate.notNull(http2Connection, "http2Connection should not be null on channel " + parentChannel);
227+
Validate.notNull(http2Connection, "initialWindowSize should not be null on channel " + parentChannel);
228+
229+
Http2Stream connectionStream = http2Connection.connectionStream();
230+
log.debug(() -> "Expanding connection window size for " + parentChannel + " by " + initialWindowSize);
231+
try {
232+
Http2LocalFlowController localFlowController = http2Connection.local().flowController();
233+
localFlowController.incrementWindowSize(connectionStream, initialWindowSize);
234+
235+
} catch (Http2Exception e) {
236+
log.warn(() -> "Failed to increment windowSize of connection " + parentChannel, e);
237+
}
238+
});
239+
}
240+
205241
private Void failAndCloseParent(Promise<Channel> promise, Channel parentChannel, Throwable exception) {
242+
log.debug(() -> "Channel acquiring failed, closing connection " + parentChannel, exception);
206243
promise.setFailure(exception);
207244
closeAndReleaseParent(parentChannel);
208245
return null;
@@ -233,6 +270,8 @@ private boolean acquireStreamOnInitializedConnection(MultiplexedChannelRecord ch
233270
channel.attr(ChannelAttributeKey.HTTP2_MULTIPLEXED_CHANNEL_POOL).set(this);
234271
channel.attr(MULTIPLEXED_CHANNEL).set(channelRecord);
235272
promise.setSuccess(channel);
273+
274+
tryExpandConnectionWindow(channel.parent());
236275
} catch (Exception e) {
237276
promise.setFailure(e);
238277
}
@@ -324,7 +363,7 @@ void handleGoAway(Channel parentChannel, int lastStreamId, GoAwayException excep
324363
multiplexedChannel.handleGoAway(lastStreamId, exception);
325364
} else {
326365
// If we don't have a multiplexed channel, the parent channel hasn't been fully initialized. Close it now.
327-
closeAndReleaseParent(parentChannel);
366+
closeAndReleaseParent(parentChannel, exception);
328367
}
329368
} catch (Exception e) {
330369
log.error(() -> "Failed to handle GOAWAY frame on channel " + parentChannel, e);

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/http2/Http2MultiplexedChannelPoolTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import static org.mockito.Matchers.isA;
2121
import static org.mockito.Mockito.mock;
2222
import static org.mockito.Mockito.when;
23+
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION;
24+
import static software.amazon.awssdk.http.nio.netty.internal.http2.utils.Http2TestUtils.newHttp2Channel;
2325

2426
import io.netty.channel.Channel;
2527
import io.netty.channel.EventLoopGroup;
@@ -28,19 +30,25 @@
2830
import io.netty.channel.pool.ChannelPool;
2931
import io.netty.channel.socket.SocketChannel;
3032
import io.netty.channel.socket.nio.NioSocketChannel;
33+
import io.netty.handler.codec.http2.Http2Connection;
34+
import io.netty.handler.codec.http2.Http2FrameCodec;
35+
import io.netty.handler.codec.http2.Http2LocalFlowController;
36+
import io.netty.handler.codec.http2.Http2Stream;
3137
import io.netty.util.concurrent.DefaultPromise;
3238
import io.netty.util.concurrent.FailedFuture;
3339
import io.netty.util.concurrent.Future;
3440
import io.netty.util.concurrent.Promise;
3541
import java.io.IOException;
3642
import java.util.Collections;
3743
import java.util.concurrent.CompletableFuture;
44+
import java.util.concurrent.ExecutionException;
3845
import org.junit.AfterClass;
3946
import org.junit.BeforeClass;
4047
import org.junit.Test;
4148
import org.mockito.ArgumentCaptor;
4249
import org.mockito.InOrder;
4350
import org.mockito.Mockito;
51+
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;
4452

4553
/**
4654
* Tests for {@link Http2MultiplexedChannelPool}.
@@ -201,4 +209,53 @@ public void interruptDuringClosePreservesFlag() throws InterruptedException {
201209
channel.close().awaitUninterruptibly();
202210
}
203211
}
212+
213+
@Test
214+
public void acquire_shouldExpandConnectionWindowSizeProportionally() {
215+
int maxConcurrentStream = 3;
216+
EmbeddedChannel channel = newHttp2Channel();
217+
channel.attr(ChannelAttributeKey.MAX_CONCURRENT_STREAMS).set((long) maxConcurrentStream);
218+
219+
try {
220+
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);
221+
222+
loopGroup.register(channel).awaitUninterruptibly();
223+
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
224+
channelPromise.setSuccess(channel);
225+
226+
Mockito.when(connectionPool.acquire()).thenReturn(channelPromise);
227+
228+
Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup,
229+
Collections.emptySet(), null);
230+
231+
Future<Channel> acquire = h2Pool.acquire();
232+
acquire.awaitUninterruptibly();
233+
channel.runPendingTasks();
234+
235+
Http2Connection http2Connection = channel.attr(HTTP2_CONNECTION).get();
236+
Http2LocalFlowController flowController =
237+
http2Connection.local().flowController();
238+
239+
System.out.println(flowController.initialWindowSize());
240+
Http2Stream connectionStream = http2Connection.stream(0);
241+
242+
// 1_048_576 (initial configured window size), 65535 (configured initial window size)
243+
// (1048576 - 65535) *2 + 65535 = 2031617
244+
assertThat(flowController.windowSize(connectionStream)).isEqualTo(2031617);
245+
246+
// 2031617 + 1048576 (configured initial window size) = 3080193
247+
assertThat(flowController.initialWindowSize(connectionStream)).isEqualTo(3080193);
248+
249+
// acquire again
250+
h2Pool.acquire().awaitUninterruptibly();
251+
channel.runPendingTasks();
252+
253+
// 3080193 + 1048576 (configured initial window size) = 4128769
254+
assertThat(flowController.initialWindowSize(connectionStream)).isEqualTo(4128769);
255+
256+
Mockito.verify(connectionPool, Mockito.times(1)).acquire();
257+
} finally {
258+
channel.close();
259+
}
260+
}
204261
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.http.nio.netty.internal.http2.utils;
17+
18+
19+
import io.netty.channel.Channel;
20+
import io.netty.channel.ChannelHandler;
21+
import io.netty.channel.ChannelInitializer;
22+
import io.netty.channel.embedded.EmbeddedChannel;
23+
import io.netty.handler.codec.http2.Http2FrameCodec;
24+
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
25+
import io.netty.handler.codec.http2.Http2FrameLogger;
26+
import io.netty.handler.codec.http2.Http2MultiplexHandler;
27+
import io.netty.handler.codec.http2.Http2Settings;
28+
import io.netty.handler.logging.LogLevel;
29+
import java.util.concurrent.CompletableFuture;
30+
import software.amazon.awssdk.http.Protocol;
31+
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;
32+
33+
public final class Http2TestUtils {
34+
public static final int INITIAL_WINDOW_SIZE = 1_048_576;
35+
36+
public static EmbeddedChannel newHttp2Channel() {
37+
return newHttp2Channel(new NoOpHandler());
38+
}
39+
40+
public static EmbeddedChannel newHttp2Channel(ChannelHandler channelHandler) {
41+
Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient().initialSettings(
42+
Http2Settings.defaultSettings().initialWindowSize(INITIAL_WINDOW_SIZE))
43+
.frameLogger(new Http2FrameLogger(LogLevel.DEBUG)).build();
44+
EmbeddedChannel channel = new EmbeddedChannel(http2FrameCodec,
45+
new Http2MultiplexHandler(channelHandler));
46+
47+
channel.attr(ChannelAttributeKey.HTTP2_CONNECTION).set(http2FrameCodec.connection());
48+
channel.attr(ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE).set(INITIAL_WINDOW_SIZE);
49+
channel.attr(ChannelAttributeKey.PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP2));
50+
return channel;
51+
}
52+
53+
private static class NoOpHandler extends ChannelInitializer<Channel> {
54+
@Override
55+
protected void initChannel(Channel ch) {
56+
}
57+
}
58+
}

0 commit comments

Comments
 (0)