Skip to content

Commit f35284e

Browse files
committed
Released ignored HTTP content in Netty client
When cancelling a subscrption, it's possible onNext() is still invoked by the publisher if it processes signals asynchronously. While the adapter ignores the content rather forwarding it to the downstream subscriber, it leaves a dangling reference on the input HttpCotent. Fix this issue by decrementing the reference before returning.
1 parent b25ad4e commit f35284e

File tree

3 files changed

+80
-0
lines changed

3 files changed

+80
-0
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "Netty NIO HTTP Client",
4+
"contributor": "",
5+
"description": "Fix a bug where the Netty HTTP client can leak memory when a response stream is cancelled prematurely but the upstream publisher continues to invoke onNext for some time before stopping. Fixes [#2051](https://github.com/aws/aws-sdk-java-v2/issues/2051)."
6+
}

http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/ResponseHandler.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import io.netty.handler.codec.http.HttpUtil;
3939
import io.netty.handler.timeout.ReadTimeoutException;
4040
import io.netty.handler.timeout.WriteTimeoutException;
41+
import io.netty.util.ReferenceCountUtil;
4142
import java.io.IOException;
4243
import java.nio.ByteBuffer;
4344
import java.util.List;
@@ -251,6 +252,7 @@ private void onCancel() {
251252
public void onNext(HttpContent httpContent) {
252253
// isDone may be true if the subscriber cancelled
253254
if (isDone.get()) {
255+
ReferenceCountUtil.release(httpContent);
254256
return;
255257
}
256258

http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/PublisherAdapterTest.java

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,15 @@
2020
import static org.mockito.Mockito.mock;
2121
import static org.mockito.Mockito.times;
2222
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.verifyNoMoreInteractions;
2324
import static org.mockito.Mockito.when;
2425
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.EXECUTE_FUTURE_KEY;
2526
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.PROTOCOL_FUTURE;
2627
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.REQUEST_CONTEXT_KEY;
2728

2829
import io.netty.buffer.ByteBufAllocator;
2930
import io.netty.buffer.EmptyByteBuf;
31+
import io.netty.buffer.Unpooled;
3032
import io.netty.channel.ChannelHandlerContext;
3133
import io.netty.channel.EventLoopGroup;
3234
import io.netty.handler.codec.http.DefaultHttpContent;
@@ -43,6 +45,7 @@
4345
import org.junit.runner.RunWith;
4446
import org.mockito.Mock;
4547
import org.mockito.runners.MockitoJUnitRunner;
48+
import org.reactivestreams.Publisher;
4649
import org.reactivestreams.Subscriber;
4750
import org.reactivestreams.Subscription;
4851
import software.amazon.awssdk.http.Protocol;
@@ -155,6 +158,75 @@ public void errorOccurred_shouldInvokeResponseHandler() {
155158
verify(responseHandler).onError(exception);
156159
}
157160

161+
@Test
162+
public void subscriptionCancelled_upstreamPublisherCallsOnNext_httpContentReleased() {
163+
HttpContent firstContent = mock(HttpContent.class);
164+
when(firstContent.content()).thenReturn(Unpooled.EMPTY_BUFFER);
165+
166+
HttpContent[] contentToIgnore = new HttpContent[8];
167+
for (int i = 0; i < contentToIgnore.length; ++i) {
168+
contentToIgnore[i] = mock(HttpContent.class);
169+
when(contentToIgnore[i].content()).thenReturn(Unpooled.EMPTY_BUFFER);
170+
}
171+
172+
Publisher<HttpContent> publisher = subscriber -> subscriber.onSubscribe(new Subscription() {
173+
@Override
174+
public void request(long l) {
175+
// We ignore any cancel signal and just publish all the content
176+
subscriber.onNext(firstContent);
177+
178+
for (int i = 0; i < l && i < contentToIgnore.length; ++i) {
179+
subscriber.onNext(contentToIgnore[i]);
180+
}
181+
}
182+
183+
@Override
184+
public void cancel() {
185+
// no-op
186+
}
187+
});
188+
189+
DefaultStreamedHttpResponse streamedResponse = new DefaultStreamedHttpResponse(HttpVersion.HTTP_1_1,
190+
HttpResponseStatus.OK, publisher);
191+
192+
Subscriber<ByteBuffer> subscriber = new Subscriber<ByteBuffer>() {
193+
private Subscription subscription;
194+
195+
@Override
196+
public void onSubscribe(Subscription subscription) {
197+
this.subscription = subscription;
198+
subscription.request(Long.MAX_VALUE);
199+
}
200+
201+
@Override
202+
public void onNext(ByteBuffer byteBuffer) {
203+
subscription.cancel();
204+
}
205+
206+
@Override
207+
public void onError(Throwable throwable) {
208+
}
209+
210+
@Override
211+
public void onComplete() {
212+
}
213+
};
214+
215+
ResponseHandler.PublisherAdapter publisherAdapter = new ResponseHandler.PublisherAdapter(streamedResponse, ctx,
216+
requestContext, executeFuture);
217+
218+
publisherAdapter.subscribe(subscriber);
219+
220+
// First one should be accessed as normal
221+
verify(firstContent).content();
222+
verify(firstContent).release();
223+
224+
for (int i = 0; i < contentToIgnore.length; ++i) {
225+
verify(contentToIgnore[i]).release();
226+
verifyNoMoreInteractions(contentToIgnore[i]);
227+
}
228+
}
229+
158230
static final class TestSubscriber implements Subscriber<ByteBuffer> {
159231

160232
private Subscription subscription;

0 commit comments

Comments
 (0)