Skip to content

Add more tests to verify connection closure and small code clean up #1034

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private SdkException handleErrorResponse(SdkHttpFullResponse httpResponse,
*/
private void closeInputStreamIfNeeded(SdkHttpFullResponse httpResponse,
boolean didRequestFail) {
// Always close on failed requests. Close on successful unless streaming operation.
// Always close on failed requests. Close on successful requests unless it needs connection left open
if (didRequestFail || !successResponseHandler.needsConnectionLeftOpen()) {
Optional.ofNullable(httpResponse)
.flatMap(SdkHttpFullResponse::content) // If no content, no need to close
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,12 @@ public CompletableFuture<Response<OutputT>> prepare() {
return headersFuture.thenCompose(headers -> {
if (headers.isSuccessful()) {
return transformFuture.thenApply(r -> Response.fromSuccess(r, response));
} else {
if (errorTransformFuture != null) {
return errorTransformFuture.thenApply(e -> Response.fromFailure(e, response));
} else {
return CompletableFuture.completedFuture(Response.fromFailure(null, response));
}
}

if (errorTransformFuture != null) {
return errorTransformFuture.thenApply(e -> Response.fromFailure(e, response));
}
return CompletableFuture.completedFuture(Response.fromFailure(null, response));
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.io.IOException;
import java.net.URI;
import java.util.concurrent.ExecutorService;
import org.apache.log4j.BasicConfigurator;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -31,15 +32,16 @@
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption;
import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient;
import software.amazon.awssdk.core.internal.http.timers.ClientExecutionAndRequestTimerTestUtils;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
import software.amazon.awssdk.http.HttpExecuteResponse;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpResponse;
import utils.HttpTestUtils;
Expand All @@ -54,6 +56,9 @@ public class AmazonHttpClientTest {
@Mock
private ExecutableHttpRequest abortableCallable;

@Mock
private ExecutorService executor;

private AmazonSyncHttpClient client;

@Before
Expand Down Expand Up @@ -146,6 +151,20 @@ public void testUserAgentPrefixAndSuffixAreAdded() {
Assert.assertTrue(userAgent.endsWith(suffix));
}

@Test
public void closeClient_shouldCloseDependencies() {
SdkClientConfiguration config = HttpTestUtils.testClientConfiguration()
.toBuilder()
.option(SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR, executor)
.option(SdkClientOption.SYNC_HTTP_CLIENT, sdkHttpClient)
.build();

AmazonSyncHttpClient client = new AmazonSyncHttpClient(config);
client.close();
verify(sdkHttpClient).close();
verify(executor).shutdown();
}

private void stubSuccessfulResponse() throws Exception {
when(abortableCallable.call()).thenReturn(HttpExecuteResponse.builder().response(SdkHttpResponse.builder()
.statusCode(200)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.protocol.HttpRequestExecutor;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.ExecutableHttpRequest;
import software.amazon.awssdk.http.HttpExecuteRequest;
Expand All @@ -76,6 +77,7 @@
import software.amazon.awssdk.http.apache.internal.conn.SdkConnectionKeepAliveStrategy;
import software.amazon.awssdk.http.apache.internal.conn.SdkTlsSocketFactory;
import software.amazon.awssdk.http.apache.internal.impl.ApacheHttpRequestFactory;
import software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient;
import software.amazon.awssdk.http.apache.internal.impl.ConnectionManagerAwareHttpClient;
import software.amazon.awssdk.http.apache.internal.utils.ApacheUtils;
import software.amazon.awssdk.utils.AttributeMap;
Expand All @@ -100,6 +102,15 @@ public final class ApacheHttpClient implements SdkHttpClient {
private final ApacheHttpRequestConfig requestConfig;
private final AttributeMap resolvedOptions;

@SdkTestInternalApi
ApacheHttpClient(ConnectionManagerAwareHttpClient httpClient,
ApacheHttpRequestConfig requestConfig,
AttributeMap resolvedOptions) {
this.httpClient = httpClient;
this.requestConfig = requestConfig;
this.resolvedOptions = resolvedOptions;
}

private ApacheHttpClient(DefaultBuilder builder, AttributeMap resolvedOptions) {
this.httpClient = createClient(builder, resolvedOptions);
this.requestConfig = createRequestConfig(builder, resolvedOptions);
Expand Down Expand Up @@ -136,7 +147,7 @@ private ConnectionManagerAwareHttpClient createClient(ApacheHttpClient.DefaultBu
cm, connectionMaxIdleTime(configuration).toMillis());
}

return new software.amazon.awssdk.http.apache.internal.impl.ApacheSdkHttpClient(builder.build(), cm);
return new ApacheSdkHttpClient(builder.build(), cm);
}

private void addProxyConfig(HttpClientBuilder builder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public static final class Builder {
private Duration connectionTimeout;
private Duration connectionAcquireTimeout;
private InetAddress localAddress;
private Boolean expectContinueEnabled;
private boolean expectContinueEnabled;
private ProxyConfiguration proxyConfiguration;

private Builder() {
Expand All @@ -109,7 +109,7 @@ public Builder localAddress(InetAddress localAddress) {
return this;
}

public Builder expectContinueEnabled(Boolean expectContinueEnabled) {
public Builder expectContinueEnabled(boolean expectContinueEnabled) {
this.expectContinueEnabled = expectContinueEnabled;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,31 @@

package software.amazon.awssdk.http.apache;

import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;

import java.net.HttpURLConnection;
import org.apache.http.conn.HttpClientConnectionManager;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.SdkHttpClientTestSuite;
import software.amazon.awssdk.http.apache.internal.ApacheHttpRequestConfig;
import software.amazon.awssdk.http.apache.internal.impl.ConnectionManagerAwareHttpClient;
import software.amazon.awssdk.utils.AttributeMap;

@RunWith(MockitoJUnitRunner.class)
public class ApacheHttpClientWireMockTest extends SdkHttpClientTestSuite {

@Mock
private ConnectionManagerAwareHttpClient httpClient;

@Mock
private HttpClientConnectionManager connectionManager;

@Override
protected SdkHttpClient createSdkHttpClient(SdkHttpClientOptions options) {
return ApacheHttpClient.builder().build();
Expand All @@ -41,4 +54,14 @@ public void noSslException_WhenCertCheckingDisabled() throws Exception {

testForResponseCodeUsingHttps(client, HttpURLConnection.HTTP_OK);
}

@Test
public void closeClient_shouldCloseUnderlyingResources() {
ApacheHttpClient client = new ApacheHttpClient(httpClient, ApacheHttpRequestConfig.builder().build(), AttributeMap.empty());
when(httpClient.getHttpClientConnectionManager()).thenReturn(connectionManager);

client.close();
verify(connectionManager).shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpRequest;
Expand Down Expand Up @@ -88,6 +89,19 @@ public final class NettyNioAsyncHttpClient implements SdkAsyncHttpClient {
this.sdkChannelOptions = channelOptions(builder);
}

@SdkTestInternalApi
NettyNioAsyncHttpClient(SdkEventLoopGroup sdkEventLoopGroup,
SdkChannelPoolMap<URI, ChannelPool> pools,
SdkChannelOptions sdkChannelOptions,
NettyConfiguration configuration,
long maxStreams) {
this.sdkEventLoopGroup = sdkEventLoopGroup;
this.pools = pools;
this.sdkChannelOptions = sdkChannelOptions;
this.configuration = configuration;
this.maxStreams = maxStreams;
}

private SdkChannelOptions channelOptions(DefaultBuilder builder) {
return builder.sdkChannelOptions;
}
Expand Down Expand Up @@ -171,7 +185,7 @@ private SdkEventLoopGroup nonManagedEventLoopGroup(SdkEventLoopGroup eventLoopGr

@Override
public void close() {
pools.forEach(e -> runAndLogError(log, "Unable to close channel pool for " + e.getKey(), e.getValue()::close));
runAndLogError(log, "Unable to close channel pools", pools::close);
runAndLogError(log, "Unable to shutdown event loop", sdkEventLoopGroup.eventLoopGroup()::shutdownGracefully);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,17 @@ protected void channelRead0(ChannelHandlerContext channelContext, HttpObject msg
ByteBuffer bb = copyToByteBuffer(fullContent);
fullContent.release();
requestContext.handler().onStream(new FullResponseContentPublisher(channelContext, bb, ef));
finalizeRequest(requestContext, channelContext);
finalizeResponse(requestContext, channelContext);
}
}

private static void finalizeRequest(RequestContext requestContext, ChannelHandlerContext channelContext) {
/**
* Finalize the response by completing the execute future and release the channel pool being used.
*
* @param requestContext the request context
* @param channelContext the channel context
*/
private static void finalizeResponse(RequestContext requestContext, ChannelHandlerContext channelContext) {
channelContext.channel().attr(RESPONSE_COMPLETE_KEY).set(true);
executeFuture(channelContext).complete(null);
if (!channelContext.channel().attr(KEEP_ALIVE).get()) {
Expand Down Expand Up @@ -268,7 +274,7 @@ public void onComplete() {
runAndLogError(String.format("Subscriber %s threw an exception in onComplete.", subscriber.toString()),
subscriber::onComplete);
} finally {
finalizeRequest(requestContext, channelContext);
finalizeResponse(requestContext, channelContext);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPool;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
import java.net.Socket;
Expand Down Expand Up @@ -84,6 +85,9 @@
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.http.nio.netty.internal.NettyConfiguration;
import software.amazon.awssdk.http.nio.netty.internal.SdkChannelOptions;
import software.amazon.awssdk.http.nio.netty.internal.SdkChannelPoolMap;
import software.amazon.awssdk.utils.AttributeMap;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -193,6 +197,29 @@ public void customChannelFactoryIsUsed() throws Exception {
Mockito.verify(channelFactory, atLeastOnce()).newChannel();
}

@Test
public void closeClient_shouldCloseUnderlyingResources() {
SdkEventLoopGroup eventLoopGroup = SdkEventLoopGroup.builder().build();
ChannelPool channelPool = mock(ChannelPool.class);
SdkChannelPoolMap<URI, ChannelPool> sdkChannelPoolMap = new SdkChannelPoolMap<URI, ChannelPool>() {
@Override
protected ChannelPool newPool(URI key) {
return channelPool;
}
};

sdkChannelPoolMap.get(URI.create("http://blah"));
SdkChannelOptions channelOptions = new SdkChannelOptions();
NettyConfiguration nettyConfiguration = new NettyConfiguration(AttributeMap.empty());

SdkAsyncHttpClient client = new NettyNioAsyncHttpClient(eventLoopGroup, sdkChannelPoolMap, channelOptions, nettyConfiguration, 1);

client.close();
assertThat(eventLoopGroup.eventLoopGroup().isShuttingDown()).isTrue();
assertThat(sdkChannelPoolMap).isEmpty();
Mockito.verify(channelPool).close();
}

/**
* Make a simple async request and wait for it to fiish.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {

@Override
public void close() {

// Nothing to close. The connections will be closed by closing the InputStreams.
}

private HttpURLConnection createAndConfigureConnection(HttpExecuteRequest request) {
Expand Down
Loading