Skip to content

Merge CRT HTTP client changes (request and response refactor) #3507

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@
<Bug pattern="URF_UNREAD_FIELD, RV_RETURN_VALUE_IGNORED"/>
</Match>

<Match>
<Class name="software.amazon.awssdk.utils.async.StoringSubscriber"/>
<Method name="drop"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED"/>
</Match>


<Match>
<Class name="software.amazon.awssdk.http.crt.internal.response.CrtResponseBodyPublisher" />
<Method name="subscribe"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ public interface Builder extends SdkAsyncHttpClient.Builder<AwsCrtAsyncHttpClien
*
* @param readBufferSize The number of bytes that can be buffered
* @return The builder of the method chaining.
*
* TODO: This is also used for the write buffer size. Should we rename it?
*/
Builder readBufferSize(int readBufferSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public CompletableFuture<Void> execute(CrtRequestContext executionContext) {

HttpRequest crtRequest = CrtRequestAdapter.toCrtRequest(executionContext);
HttpStreamResponseHandler crtResponseHandler =
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, executionContext);
CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, asyncRequest.responseHandler());

// Submit the request on the connection
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,27 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;

/**
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
*/
@SdkInternalApi
final class CrtRequestBodyAdapter implements HttpRequestBodyStream {
private final int windowSize;
private final CrtRequestBodySubscriber requestBodySubscriber;
private final SdkHttpContentPublisher requestPublisher;
private final ByteBufferStoringSubscriber requestBodySubscriber;

CrtRequestBodyAdapter(SdkHttpContentPublisher requestPublisher, int windowSize) {
this.windowSize = Validate.isPositive(windowSize, "windowSize is <= 0");
this.requestBodySubscriber = new CrtRequestBodySubscriber(windowSize);
CrtRequestBodyAdapter(SdkHttpContentPublisher requestPublisher, int readLimit) {
this.requestPublisher = requestPublisher;
this.requestBodySubscriber = new ByteBufferStoringSubscriber(readLimit);
requestPublisher.subscribe(requestBodySubscriber);
}

@Override
public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
return requestBodySubscriber.transferRequestBody(bodyBytesOut);
return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM;
}

@Override
public long getLength() {
return requestPublisher.contentLength().orElse(0L);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.http.crt.internal.response;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.CRT;
Expand All @@ -26,10 +27,10 @@
import software.amazon.awssdk.crt.http.HttpStreamResponseHandler;
import software.amazon.awssdk.http.HttpStatusFamily;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.crt.internal.CrtRequestContext;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.async.SimplePublisher;

/**
* Implements the CrtHttpStreamHandler API and converts CRT callbacks into calls to SDK AsyncExecuteRequest methods
Expand All @@ -39,97 +40,110 @@ public final class CrtResponseAdapter implements HttpStreamResponseHandler {
private static final Logger log = Logger.loggerFor(CrtResponseAdapter.class);

private final HttpClientConnection connection;
private final CompletableFuture<Void> responseComplete;
private final AsyncExecuteRequest sdkRequest;
private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder();
private final int windowSize;
private CrtResponseBodyPublisher respBodyPublisher;
private final CompletableFuture<Void> completionFuture;
private final SdkAsyncHttpResponseHandler responseHandler;
private final SimplePublisher<ByteBuffer> responsePublisher = new SimplePublisher<>();

private CrtResponseAdapter(HttpClientConnection connection,
CompletableFuture<Void> responseComplete,
AsyncExecuteRequest sdkRequest,
int windowSize) {
this.connection = Validate.notNull(connection, "HttpConnection is null");
this.responseComplete = Validate.notNull(responseComplete, "reqComplete Future is null");
this.sdkRequest = Validate.notNull(sdkRequest, "AsyncExecuteRequest Future is null");
this.windowSize = Validate.isPositive(windowSize, "windowSize is <= 0");
}
private final SdkHttpResponse.Builder responseBuilder = SdkHttpResponse.builder();

public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection connection,
CompletableFuture<Void> responseComplete,
CrtRequestContext request) {
return new CrtResponseAdapter(connection, responseComplete, request.sdkRequest(), request.readBufferSize());
private CrtResponseAdapter(HttpClientConnection connection,
CompletableFuture<Void> completionFuture,
SdkAsyncHttpResponseHandler responseHandler) {
this.connection = Validate.paramNotNull(connection, "connection");
this.completionFuture = Validate.paramNotNull(completionFuture, "completionFuture");
this.responseHandler = Validate.paramNotNull(responseHandler, "responseHandler");
}

private void initRespBodyPublisherIfNeeded(HttpStream stream) {
if (respBodyPublisher == null) {
respBodyPublisher = new CrtResponseBodyPublisher(connection, stream, responseComplete, windowSize);
}
public static HttpStreamResponseHandler toCrtResponseHandler(HttpClientConnection crtConn,
CompletableFuture<Void> requestFuture,
SdkAsyncHttpResponseHandler responseHandler) {
return new CrtResponseAdapter(crtConn, requestFuture, responseHandler);
}

@Override
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blockType, HttpHeader[] nextHeaders) {
initRespBodyPublisherIfNeeded(stream);

for (HttpHeader h : nextHeaders) {
respBuilder.appendHeader(h.getName(), h.getValue());
public void onResponseHeaders(HttpStream stream, int responseStatusCode, int headerType, HttpHeader[] nextHeaders) {
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
for (HttpHeader h : nextHeaders) {
responseBuilder.appendHeader(h.getName(), h.getValue());
}
}
}

@Override
public void onResponseHeadersDone(HttpStream stream, int headerType) {
if (headerType == HttpHeaderBlock.MAIN.getValue()) {
initRespBodyPublisherIfNeeded(stream);

respBuilder.statusCode(stream.getResponseStatusCode());
sdkRequest.responseHandler().onHeaders(respBuilder.build());
sdkRequest.responseHandler().onStream(respBodyPublisher);
responseBuilder.statusCode(stream.getResponseStatusCode());
responseHandler.onHeaders(responseBuilder.build());
responseHandler.onStream(responsePublisher);
}
}

@Override
public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) {
initRespBodyPublisherIfNeeded(stream);
CompletableFuture<Void> writeFuture = responsePublisher.send(ByteBuffer.wrap(bodyBytesIn));

if (writeFuture.isDone() && !writeFuture.isCompletedExceptionally()) {
// Optimization: If write succeeded immediately, return non-zero to avoid the extra call back into the CRT.
return bodyBytesIn.length;
}

respBodyPublisher.queueBuffer(bodyBytesIn);
respBodyPublisher.publishToSubscribers();
writeFuture.whenComplete((result, failure) -> {
if (failure != null) {
failResponseHandlerAndFuture(stream, failure);
return;
}

stream.incrementWindow(bodyBytesIn.length);
});

/*
* Intentionally zero. We manually manage the crt stream's window within the body publisher by updating with
* the exact amount we were able to push to the subcriber.
*
* See the call to stream.incrementWindow() in AwsCrtResponseBodyPublisher.
*/
return 0;
}

@Override
public void onResponseComplete(HttpStream stream, int errorCode) {
initRespBodyPublisherIfNeeded(stream);

if (HttpStatusFamily.of(respBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
connection.shutdown();
}

if (errorCode == CRT.AWS_CRT_SUCCESS) {
log.debug(() -> "Response Completed Successfully");
respBodyPublisher.setQueueComplete();
respBodyPublisher.publishToSubscribers();
onSuccessfulResponseComplete(stream);
} else {
HttpException error = new HttpException(errorCode);
log.error(() -> "Response Encountered an Error.", error);

// Invoke Error Callback on SdkAsyncHttpResponseHandler
try {
sdkRequest.responseHandler().onError(error);
} catch (Exception e) {
log.error(() -> String.format("SdkAsyncHttpResponseHandler %s threw an exception in onError: %s",
sdkRequest.responseHandler(), e));
onFailedResponseComplete(stream, new HttpException(errorCode));
}
}

private void onSuccessfulResponseComplete(HttpStream stream) {
responsePublisher.complete().whenComplete((result, failure) -> {
if (failure != null) {
failResponseHandlerAndFuture(stream, failure);
return;
}

if (HttpStatusFamily.of(responseBuilder.statusCode()) == HttpStatusFamily.SERVER_ERROR) {
connection.shutdown();
}

// Invoke Error Callback on any Subscriber's of the Response Body
respBodyPublisher.setError(error);
respBodyPublisher.publishToSubscribers();
connection.close();
stream.close();
completionFuture.complete(null);
});
}

private void onFailedResponseComplete(HttpStream stream, HttpException error) {
log.error(() -> "HTTP response encountered an error.", error);
responsePublisher.error(error);
failResponseHandlerAndFuture(stream, error);
}

private void failResponseHandlerAndFuture(HttpStream stream, Throwable error) {
callResponseHandlerOnError(error);
completionFuture.completeExceptionally(error);
connection.shutdown();
connection.close();
stream.close();
}

private void callResponseHandlerOnError(Throwable error) {
try {
responseHandler.onError(error);
} catch (RuntimeException e) {
log.warn(() -> "Exception raised from SdkAsyncHttpResponseHandler#onError.", e);
}
}
}
Loading