Skip to content

Commit 2df77ee

Browse files
authored
Fix concurrent pipeline modification bug (#2739)
* Fix concurrent pipeline modification bug When executing a request, it's possible for multiple threads to interact and modify the channel pipeline without synchronizing which can cause issues. This commit fixes this issue by ensuring that code that modifies the pipeline all run within the context of the channel's event loop. * Review comments Fix usages of doInEventLoop() in cases where the caller returns a Future. When calling doInEventLoop(), the caller needs to properly deal with cases where the returned future fails exceptionally, and take the appropriate action with the future that the caller itself returns. * Add additional tests
1 parent f8cf965 commit 2df77ee

File tree

11 files changed

+234
-51
lines changed

11 files changed

+234
-51
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "Netty NIO HTTP Client",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "When executing a request, it's possible for multiple threads to interact and modify the channel pipeline without synchronizing which can cause issues. This commit fixes this issue by ensuring that code that modifies the pipeline all run within the context of the channel's event loop."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/FutureCancelHandler.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,42 +23,53 @@
2323
import io.netty.channel.ChannelInboundHandlerAdapter;
2424
import java.io.IOException;
2525
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
import software.amazon.awssdk.utils.Logger;
2627

2728
/**
2829
* Closes the channel if the execution future has been cancelled.
2930
*/
3031
@SdkInternalApi
3132
@ChannelHandler.Sharable
3233
public final class FutureCancelHandler extends ChannelInboundHandlerAdapter {
34+
private static final Logger LOG = Logger.loggerFor(FutureCancelHandler.class);
3335
private static final FutureCancelHandler INSTANCE = new FutureCancelHandler();
3436

3537
private FutureCancelHandler() {
3638
}
3739

3840
@Override
3941
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) {
40-
if (cancelled(ctx, e)) {
42+
if (!(e instanceof FutureCancelledException)) {
43+
ctx.fireExceptionCaught(e);
44+
return;
45+
}
46+
47+
FutureCancelledException cancelledException = (FutureCancelledException) e;
48+
49+
if (currentRequestCancelled(ctx, cancelledException)) {
4150
RequestContext requestContext = ctx.channel().attr(REQUEST_CONTEXT_KEY).get();
4251
requestContext.handler().onError(e);
4352
ctx.fireExceptionCaught(new IOException("Request cancelled"));
4453
ctx.close();
4554
requestContext.channelPool().release(ctx.channel());
4655
} else {
47-
ctx.fireExceptionCaught(e);
56+
LOG.debug(() -> String.format("Received a cancellation exception but it did not match the current execution ID. "
57+
+ "Exception's execution ID is %d, but the current ID on the channel is %d. Exception"
58+
+ " is being ignored.",
59+
cancelledException.getExecutionId(),
60+
executionId(ctx)));
4861
}
4962
}
5063

5164
public static FutureCancelHandler getInstance() {
5265
return INSTANCE;
5366
}
5467

55-
private boolean cancelled(ChannelHandlerContext ctx, Throwable t) {
56-
if (!(t instanceof FutureCancelledException)) {
57-
return false;
58-
}
59-
60-
FutureCancelledException e = (FutureCancelledException) t;
68+
private boolean currentRequestCancelled(ChannelHandlerContext ctx, FutureCancelledException e) {
69+
return e.getExecutionId() == executionId(ctx);
70+
}
6171

62-
return e.getExecutionId() == ctx.channel().attr(EXECUTION_ID_KEY).get();
72+
private Long executionId(ChannelHandlerContext ctx) {
73+
return ctx.channel().attr(EXECUTION_ID_KEY).get();
6374
}
6475
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPool.java

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222
import io.netty.channel.ChannelHandler;
2323
import io.netty.handler.timeout.ReadTimeoutHandler;
2424
import io.netty.handler.timeout.WriteTimeoutHandler;
25+
import io.netty.util.concurrent.DefaultPromise;
2526
import io.netty.util.concurrent.Future;
2627
import io.netty.util.concurrent.Promise;
2728
import java.util.concurrent.CompletableFuture;
2829
import software.amazon.awssdk.annotations.SdkInternalApi;
2930
import software.amazon.awssdk.http.nio.netty.internal.http2.FlushOnReadHandler;
3031
import software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler;
32+
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
3133
import software.amazon.awssdk.metrics.MetricCollector;
3234

3335
/**
@@ -55,37 +57,37 @@ public Future<Channel> acquire(Promise<Channel> promise) {
5557

5658
@Override
5759
public Future<Void> release(Channel channel) {
58-
removePerRequestHandlers(channel);
59-
return delegate.release(channel);
60+
return release(channel, new DefaultPromise<>(channel.eventLoop()));
6061
}
6162

6263
@Override
6364
public Future<Void> release(Channel channel, Promise<Void> promise) {
64-
removePerRequestHandlers(channel);
65-
return delegate.release(channel, promise);
65+
removePerRequestHandlers(channel).addListener(future -> delegate.release(channel, promise));
66+
return promise;
6667
}
6768

6869
@Override
6970
public void close() {
7071
delegate.close();
7172
}
7273

73-
private void removePerRequestHandlers(Channel channel) {
74-
channel.attr(IN_USE).set(false);
74+
private Future<?> removePerRequestHandlers(Channel channel) {
75+
return NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
76+
channel.attr(IN_USE).set(false);
7577

76-
// Only remove per request handler if the channel is registered
77-
// or open since DefaultChannelPipeline would remove handlers if
78-
// channel is closed and unregistered
79-
// See DefaultChannelPipeline.java#L1403
80-
if (channel.isOpen() || channel.isRegistered()) {
81-
removeIfExists(channel.pipeline(),
82-
HttpStreamsClientHandler.class,
83-
LastHttpContentHandler.class,
84-
FlushOnReadHandler.class,
85-
ResponseHandler.class,
86-
ReadTimeoutHandler.class,
87-
WriteTimeoutHandler.class);
88-
}
78+
// Only remove per request handler if the channel is registered
79+
// or open since DefaultChannelPipeline would remove handlers if
80+
// channel is closed and unregistered
81+
if (channel.isOpen() || channel.isRegistered()) {
82+
removeIfExists(channel.pipeline(),
83+
HttpStreamsClientHandler.class,
84+
LastHttpContentHandler.class,
85+
FlushOnReadHandler.class,
86+
ResponseHandler.class,
87+
ReadTimeoutHandler.class,
88+
WriteTimeoutHandler.class);
89+
}
90+
});
8991
}
9092

9193
@Override

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HonorCloseOnReleaseChannelPool.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,12 @@ public Future<Void> release(Channel channel, Promise<Void> promise) {
6666
}
6767

6868
delegatePool.release(channel, promise);
69+
}).addListener(f -> {
70+
if (!f.isSuccess()) {
71+
promise.tryFailure(f.cause());
72+
}
6973
});
74+
7075
return promise;
7176
}
7277

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/IdleConnectionCountingChannelPool.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.netty.channel.Channel;
2121
import io.netty.channel.pool.ChannelPool;
2222
import io.netty.util.AttributeKey;
23+
import io.netty.util.concurrent.DefaultPromise;
2324
import io.netty.util.concurrent.EventExecutor;
2425
import io.netty.util.concurrent.Future;
2526
import io.netty.util.concurrent.Promise;
@@ -91,14 +92,13 @@ public Future<Channel> acquire(Promise<Channel> promise) {
9192

9293
@Override
9394
public Future<Void> release(Channel channel) {
94-
channelReleased(channel);
95-
return delegatePool.release(channel);
95+
return release(channel, new DefaultPromise<>(executor));
9696
}
9797

9898
@Override
9999
public Future<Void> release(Channel channel, Promise<Void> promise) {
100-
channelReleased(channel);
101-
return delegatePool.release(channel, promise);
100+
channelReleased(channel).addListener(f -> delegatePool.release(channel, promise));
101+
return promise;
102102
}
103103

104104
@Override
@@ -112,6 +112,10 @@ public CompletableFuture<Void> collectChannelPoolMetrics(MetricCollector metrics
112112
doInEventLoop(executor, () -> {
113113
metrics.reportMetric(HttpMetric.AVAILABLE_CONCURRENCY, idleConnections);
114114
result.complete(null);
115+
}).addListener(f -> {
116+
if (!f.isSuccess()) {
117+
result.completeExceptionally(f.cause());
118+
}
115119
});
116120
return result;
117121
}
@@ -153,8 +157,8 @@ private void channelAcquired(Channel channel) {
153157
/**
154158
* Invoked when a channel is released, marking it idle until it's acquired.
155159
*/
156-
private void channelReleased(Channel channel) {
157-
doInEventLoop(executor, () -> {
160+
private Future<?> channelReleased(Channel channel) {
161+
return doInEventLoop(executor, () -> {
158162
ChannelIdleState channelIdleState = getChannelIdleState(channel);
159163

160164
if (channelIdleState == null) {

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
import software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler;
6969
import software.amazon.awssdk.http.nio.netty.internal.nrs.StreamedHttpRequest;
7070
import software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils;
71+
import software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils;
7172
import software.amazon.awssdk.metrics.MetricCollector;
7273

7374
@SdkInternalApi
@@ -164,10 +165,12 @@ private void verifyMetricsWereCollected(CompletableFuture<Void> metricsFuture) {
164165
private void makeRequestListener(Future<Channel> channelFuture) {
165166
if (channelFuture.isSuccess()) {
166167
channel = channelFuture.getNow();
167-
configureChannel();
168-
if (tryConfigurePipeline()) {
169-
makeRequest();
170-
}
168+
NettyUtils.doInEventLoop(channel.eventLoop(), () -> {
169+
configureChannel();
170+
if (tryConfigurePipeline()) {
171+
makeRequest();
172+
}
173+
});
171174
} else {
172175
handleFailure(() -> "Failed to create connection to " + endpoint(), channelFuture.cause());
173176
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/NettyUtils.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -130,13 +130,19 @@ public static <T> GenericFutureListener<Future<T>> promiseNotifyingListener(Prom
130130
*
131131
* @param eventExecutor Executor to run task in.
132132
* @param runnable Task to run.
133+
*
134+
* @return The {@code Future} from from the executor.
133135
*/
134-
public static void doInEventLoop(EventExecutor eventExecutor, Runnable runnable) {
136+
public static Future<?> doInEventLoop(EventExecutor eventExecutor, Runnable runnable) {
135137
if (eventExecutor.inEventLoop()) {
136-
runnable.run();
137-
} else {
138-
eventExecutor.submit(runnable);
138+
try {
139+
runnable.run();
140+
return eventExecutor.newSucceededFuture(null);
141+
} catch (Throwable t) {
142+
return eventExecutor.newFailedFuture(t);
143+
}
139144
}
145+
return eventExecutor.submit(runnable);
140146
}
141147

142148
/**

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HonorCloseOnReleaseChannelPoolTest.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,14 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919
import static org.mockito.Matchers.any;
20+
import static org.mockito.Mockito.when;
2021

22+
import io.netty.channel.Channel;
23+
import io.netty.channel.nio.NioEventLoopGroup;
2124
import io.netty.channel.pool.ChannelPool;
25+
import io.netty.util.concurrent.Future;
26+
import io.netty.util.concurrent.Promise;
27+
import java.util.concurrent.TimeUnit;
2228
import org.junit.Test;
2329
import org.mockito.Mockito;
2430
import org.mockito.internal.verification.Times;
@@ -53,4 +59,36 @@ public void releaseClosesIfFlagged() throws Exception {
5359
Mockito.verify(channelPool, new Times(0)).release(any());
5460
Mockito.verify(channelPool, new Times(1)).release(any(), any());
5561
}
62+
63+
@Test
64+
public void release_delegateReleaseFails_futureFailed() throws Exception {
65+
NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup();
66+
try {
67+
ChannelPool mockDelegatePool = Mockito.mock(ChannelPool.class);
68+
MockChannel channel = new MockChannel();
69+
70+
nioEventLoopGroup.register(channel);
71+
72+
HonorCloseOnReleaseChannelPool channelPool = new HonorCloseOnReleaseChannelPool(mockDelegatePool);
73+
74+
RuntimeException errorToThrow = new RuntimeException("failed!");
75+
when(mockDelegatePool.release(any(Channel.class), any(Promise.class))).thenThrow(errorToThrow);
76+
77+
Promise<Void> promise = channel.eventLoop().newPromise();
78+
79+
Future<Void> releasePromise = channelPool.release(channel, promise);
80+
try {
81+
releasePromise.get(1, TimeUnit.SECONDS);
82+
} catch (Exception e) {
83+
if (e instanceof InterruptedException) {
84+
throw e;
85+
}
86+
// ignore any other exception
87+
}
88+
assertThat(releasePromise.isSuccess()).isFalse();
89+
assertThat(releasePromise.cause()).isSameAs(errorToThrow);
90+
} finally {
91+
nioEventLoopGroup.shutdownGracefully();
92+
}
93+
}
5694
}

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/IdleConnectionCountingChannelPoolTest.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616
package software.amazon.awssdk.http.nio.netty.internal;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1920
import static org.mockito.Matchers.any;
21+
import static org.mockito.Mockito.doThrow;
2022
import static org.mockito.Mockito.mock;
2123

2224
import io.netty.channel.Channel;
@@ -26,6 +28,8 @@
2628
import io.netty.channel.pool.ChannelPool;
2729
import io.netty.util.concurrent.Future;
2830
import io.netty.util.concurrent.Promise;
31+
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.TimeUnit;
2933
import org.junit.After;
3034
import org.junit.Before;
3135
import org.junit.Test;
@@ -170,6 +174,20 @@ public void stochastic_rapidAcquireCloseReleaseIsCalculatedCorrectly() throws In
170174
}
171175
}
172176

177+
@Test
178+
public void collectChannelPoolMetrics_failes_futureFailed() throws Exception {
179+
MockChannel channel = new MockChannel();
180+
eventLoopGroup.register(channel);
181+
182+
RuntimeException errorToThrow = new RuntimeException("failed!");
183+
MetricCollector mockMetricCollector = mock(MetricCollector.class);
184+
doThrow(errorToThrow).when(mockMetricCollector).reportMetric(any(), any());
185+
186+
CompletableFuture<Void> collectFuture = idleCountingPool.collectChannelPoolMetrics(mockMetricCollector);
187+
188+
assertThatThrownBy(() -> collectFuture.get(1, TimeUnit.SECONDS)).hasCause(errorToThrow);
189+
}
190+
173191
private int getIdleConnectionCount() {
174192
MetricCollector metricCollector = MetricCollector.create("test");
175193
idleCountingPool.collectChannelPoolMetrics(metricCollector).join();
@@ -186,10 +204,10 @@ private void stubDelegatePoolAcquiresForSuccess() {
186204
}
187205

188206
private void stubDelegatePoolReleasesForSuccess() {
189-
Mockito.when(delegatePool.release(any())).thenAnswer((Answer<Future<Channel>>) invocation -> {
190-
Channel channel = invocation.getArgumentAt(0, Channel.class);
191-
Promise<Channel> result = channel.eventLoop().newPromise();
192-
return result.setSuccess(channel);
207+
Mockito.when(delegatePool.release(any(Channel.class), any(Promise.class))).thenAnswer((Answer<Future<Void>>) invocation -> {
208+
Promise<Void> promise = invocation.getArgumentAt(1, Promise.class);
209+
promise.setSuccess(null);
210+
return promise;
193211
});
194212
}
195213

0 commit comments

Comments
 (0)