Skip to content

Expand http2 connection window upon new stream acquisition #1661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "Netty NIO Http Client",
"type": "bugfix",
"description": "Expand Http2 connection-level flow control window when a new stream is acquired on that connection so that the connection-level window size is proportional to the number of streams."
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.channel.Channel;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.util.AttributeKey;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -48,6 +49,12 @@ public final class ChannelAttributeKey {
public static final AttributeKey<PingTracker> PING_TRACKER =
AttributeKey.newInstance("aws.http.nio.netty.async.h2.pingTracker");

public static final AttributeKey<Http2Connection> HTTP2_CONNECTION =
AttributeKey.newInstance("aws.http.nio.netty.async.http2Connection");

public static final AttributeKey<Integer> HTTP2_INITIAL_WINDOW_SIZE =
AttributeKey.newInstance("aws.http.nio.netty.async.http2InitialWindowSize");

/**
* Value of the MAX_CONCURRENT_STREAMS from the server's SETTING frame.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
import static software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration.HTTP2_CONNECTION_PING_TIMEOUT_SECONDS;
import static software.amazon.awssdk.utils.NumericUtils.saturatedCast;
Expand Down Expand Up @@ -151,6 +153,9 @@ private void configureHttp2(Channel ch, ChannelPipeline pipeline) {
codec.connection().addListener(new Http2GoAwayEventListener(ch));

pipeline.addLast(codec);
ch.attr(HTTP2_CONNECTION).set(codec.connection());

ch.attr(HTTP2_INITIAL_WINDOW_SIZE).set(clientInitialWindowSize);
pipeline.addLast(new Http2MultiplexHandler(new NoOpChannelInitializer()));
pipeline.addLast(new Http2SettingsFrameHandler(ch, clientMaxStreams, channelPoolRef));
if (healthCheckPingPeriod == null) {
Expand All @@ -170,7 +175,6 @@ private static class NoOpChannelInitializer extends ChannelInitializer<Channel>
protected void initChannel(Channel ch) {
}
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

package software.amazon.awssdk.http.nio.netty.internal.http2;

import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.MAX_CONCURRENT_STREAMS;
import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.doInEventLoop;

import io.netty.channel.Channel;
Expand All @@ -24,6 +27,10 @@
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -147,7 +154,7 @@ private void acquireStreamOnNewConnection(Promise<Channel> promise) {

private void acquireStreamOnFreshConnection(Promise<Channel> promise, Channel parentChannel, Protocol protocol) {
try {
Long maxStreams = parentChannel.attr(ChannelAttributeKey.MAX_CONCURRENT_STREAMS).get();
Long maxStreams = parentChannel.attr(MAX_CONCURRENT_STREAMS).get();

Validate.isTrue(protocol == Protocol.HTTP2,
"Protocol negotiated on connection (%s) was expected to be HTTP/2, but it "
Expand Down Expand Up @@ -202,7 +209,37 @@ private void cacheConnectionForFutureStreams(Channel stream,
promise.setSuccess(stream);
}

/**
* By default, connection window size is a constant value:
* connectionWindowSize = 65535 + (configureInitialWindowSize - 65535) * 2.
* See https://github.com/netty/netty/blob/5c458c9a98d4d3d0345e58495e017175156d624f/codec-http2/src/main/java/io/netty
* /handler/codec/http2/Http2FrameCodec.java#L255
* We should expand connection window so that the window size proportional to the number of concurrent streams within the
* connection.
* Note that when {@code WINDOW_UPDATE} will be sent depends on the processedWindow in DefaultHttp2LocalFlowController.
*/
private void tryExpandConnectionWindow(Channel parentChannel) {
doInEventLoop(parentChannel.eventLoop(), () -> {
Http2Connection http2Connection = parentChannel.attr(HTTP2_CONNECTION).get();
Integer initialWindowSize = parentChannel.attr(HTTP2_INITIAL_WINDOW_SIZE).get();

Validate.notNull(http2Connection, "http2Connection should not be null on channel " + parentChannel);
Validate.notNull(http2Connection, "initialWindowSize should not be null on channel " + parentChannel);

Http2Stream connectionStream = http2Connection.connectionStream();
log.debug(() -> "Expanding connection window size for " + parentChannel + " by " + initialWindowSize);
try {
Http2LocalFlowController localFlowController = http2Connection.local().flowController();
localFlowController.incrementWindowSize(connectionStream, initialWindowSize);

} catch (Http2Exception e) {
log.warn(() -> "Failed to increment windowSize of connection " + parentChannel, e);
}
});
}

private Void failAndCloseParent(Promise<Channel> promise, Channel parentChannel, Throwable exception) {
log.debug(() -> "Channel acquiring failed, closing connection " + parentChannel, exception);
promise.setFailure(exception);
closeAndReleaseParent(parentChannel);
return null;
Expand Down Expand Up @@ -233,6 +270,8 @@ private boolean acquireStreamOnInitializedConnection(MultiplexedChannelRecord ch
channel.attr(ChannelAttributeKey.HTTP2_MULTIPLEXED_CHANNEL_POOL).set(this);
channel.attr(MULTIPLEXED_CHANNEL).set(channelRecord);
promise.setSuccess(channel);

tryExpandConnectionWindow(channel.parent());
} catch (Exception e) {
promise.setFailure(e);
}
Expand Down Expand Up @@ -324,7 +363,7 @@ void handleGoAway(Channel parentChannel, int lastStreamId, GoAwayException excep
multiplexedChannel.handleGoAway(lastStreamId, exception);
} else {
// If we don't have a multiplexed channel, the parent channel hasn't been fully initialized. Close it now.
closeAndReleaseParent(parentChannel);
closeAndReleaseParent(parentChannel, exception);
}
} catch (Exception e) {
log.error(() -> "Failed to handle GOAWAY frame on channel " + parentChannel, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.HTTP2_CONNECTION;
import static software.amazon.awssdk.http.nio.netty.internal.http2.utils.Http2TestUtils.newHttp2Channel;

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
Expand All @@ -28,19 +30,25 @@
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.FailedFuture;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.Promise;
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.Mockito;
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;

/**
* Tests for {@link Http2MultiplexedChannelPool}.
Expand Down Expand Up @@ -201,4 +209,53 @@ public void interruptDuringClosePreservesFlag() throws InterruptedException {
channel.close().awaitUninterruptibly();
}
}

@Test
public void acquire_shouldExpandConnectionWindowSizeProportionally() {
int maxConcurrentStream = 3;
EmbeddedChannel channel = newHttp2Channel();
channel.attr(ChannelAttributeKey.MAX_CONCURRENT_STREAMS).set((long) maxConcurrentStream);

try {
ChannelPool connectionPool = Mockito.mock(ChannelPool.class);

loopGroup.register(channel).awaitUninterruptibly();
Promise<Channel> channelPromise = new DefaultPromise<>(loopGroup.next());
channelPromise.setSuccess(channel);

Mockito.when(connectionPool.acquire()).thenReturn(channelPromise);

Http2MultiplexedChannelPool h2Pool = new Http2MultiplexedChannelPool(connectionPool, loopGroup,
Collections.emptySet(), null);

Future<Channel> acquire = h2Pool.acquire();
acquire.awaitUninterruptibly();
channel.runPendingTasks();

Http2Connection http2Connection = channel.attr(HTTP2_CONNECTION).get();
Http2LocalFlowController flowController =
http2Connection.local().flowController();

System.out.println(flowController.initialWindowSize());
Http2Stream connectionStream = http2Connection.stream(0);

// 1_048_576 (initial configured window size), 65535 (configured initial window size)
// (1048576 - 65535) *2 + 65535 = 2031617
assertThat(flowController.windowSize(connectionStream)).isEqualTo(2031617);

// 2031617 + 1048576 (configured initial window size) = 3080193
assertThat(flowController.initialWindowSize(connectionStream)).isEqualTo(3080193);

// acquire again
h2Pool.acquire().awaitUninterruptibly();
channel.runPendingTasks();

// 3080193 + 1048576 (configured initial window size) = 4128769
assertThat(flowController.initialWindowSize(connectionStream)).isEqualTo(4128769);

Mockito.verify(connectionPool, Mockito.times(1)).acquire();
} finally {
channel.close();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal.http2.utils;


import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.logging.LogLevel;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey;

public final class Http2TestUtils {
public static final int INITIAL_WINDOW_SIZE = 1_048_576;

public static EmbeddedChannel newHttp2Channel() {
return newHttp2Channel(new NoOpHandler());
}

public static EmbeddedChannel newHttp2Channel(ChannelHandler channelHandler) {
Http2FrameCodec http2FrameCodec = Http2FrameCodecBuilder.forClient().initialSettings(
Http2Settings.defaultSettings().initialWindowSize(INITIAL_WINDOW_SIZE))
.frameLogger(new Http2FrameLogger(LogLevel.DEBUG)).build();
EmbeddedChannel channel = new EmbeddedChannel(http2FrameCodec,
new Http2MultiplexHandler(channelHandler));

channel.attr(ChannelAttributeKey.HTTP2_CONNECTION).set(http2FrameCodec.connection());
channel.attr(ChannelAttributeKey.HTTP2_INITIAL_WINDOW_SIZE).set(INITIAL_WINDOW_SIZE);
channel.attr(ChannelAttributeKey.PROTOCOL_FUTURE).set(CompletableFuture.completedFuture(Protocol.HTTP2));
return channel;
}

private static class NoOpHandler extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) {
}
}
}