Skip to content

Fixed an issue where event streams might fail with ClassCastException or NoSuchElementException. #2695

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions core/aws-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,6 @@
<artifactId>utils</artifactId>
<version>${awsjavasdk.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.eventstream</groupId>
<artifactId>eventstream</artifactId>
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void onComplete() {
.executor(Executors.newSingleThreadExecutor())
.future(new CompletableFuture<>())
.build();
transformer.prepare();
transformer.onStream(SdkPublisher.adapt(bytePublisher));
latch.await();
assertThat(numEvents)
Expand Down Expand Up @@ -327,9 +328,10 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {

Flowable<ByteBuffer> bytePublisher = Flowable.just(exceptionMessage.toByteBuffer());

SubscribingResponseHandler handler = new SubscribingResponseHandler();
AsyncResponseTransformer<SdkResponse, Void> transformer =
EventStreamAsyncResponseTransformer.builder()
.eventStreamResponseHandler(new SubscribingResponseHandler())
.eventStreamResponseHandler(handler)
.exceptionResponseHandler((response, executionAttributes) -> exception)
.executor(Executors.newSingleThreadExecutor())
.future(new CompletableFuture<>())
Expand All @@ -343,13 +345,16 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {
cf.join();
} catch (CompletionException e) {
if (e.getCause() instanceof SdkServiceException) {
throw ((SdkServiceException) e.getCause());
throw e.getCause();
}
}
}).isSameAs(exception);

assertThat(handler.exceptionOccurredCalled).isTrue();
}

private static class SubscribingResponseHandler implements EventStreamResponseHandler<Object, Object> {
private volatile boolean exceptionOccurredCalled = false;

@Override
public void responseReceived(Object response) {
Expand All @@ -363,6 +368,7 @@ public void onEventStream(SdkPublisher<Object> publisher) {

@Override
public void exceptionOccurred(Throwable throwable) {
exceptionOccurredCalled = true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.function.Predicate;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.async.BufferingSubscriber;
import software.amazon.awssdk.utils.async.EventListeningSubscriber;
import software.amazon.awssdk.utils.async.FilteringSubscriber;
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
import software.amazon.awssdk.utils.async.LimitingSubscriber;
Expand Down Expand Up @@ -116,6 +118,36 @@ default SdkPublisher<T> limit(int limit) {
return subscriber -> subscribe(new LimitingSubscriber<>(subscriber, limit));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onComplete()}.
*
* @param afterOnComplete The logic that should be run immediately after onComplete.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnComplete(Runnable afterOnComplete) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, afterOnComplete, null, null));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onError(Throwable)}.
*
* @param afterOnError The logic that should be run immediately after onError.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnError(Consumer<Throwable> afterOnError) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, afterOnError, null));
}

/**
* Add a callback that will be invoked after this publisher invokes {@link Subscription#cancel()}.
*
* @param afterOnCancel The logic that should be run immediately after cancellation of the subscription.
* @return New publisher that invokes the requested callback.
*/
default SdkPublisher<T> doAfterOnCancel(Runnable afterOnCancel) {
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, null, afterOnCancel));
}

/**
* Subscribes to the publisher with the given {@link Consumer}. This consumer will be called for each event
* published. There is no backpressure using this method if the Consumer dispatches processing asynchronously. If more
Expand Down
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
<maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
<build-helper-maven-plugin.version>3.0.0</build-helper-maven-plugin.version>
<japicmp-maven-plugin.version>0.15.3</japicmp-maven-plugin.version>
<japicmp-maven-plugin.version>0.14.4</japicmp-maven-plugin.version>

<!-- These properties are used by Step functions for its dependencies -->
<json-path.version>2.4.0</json-path.version>
Expand Down Expand Up @@ -519,6 +519,7 @@
<onlyModified>true</onlyModified>
<excludes>
<exclude>*.internal.*</exclude>
<exclude>software.amazon.awssdk.thirdparty.*</exclude>
<exclude>software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler</exclude>
<exclude>software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler</exclude>
</excludes>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -201,9 +203,9 @@ private List<SubscribeToShardEventStream> subscribeToShard() throws Throwable {
SubscribeToShardResponseHandler.builder()
.subscriber(events::add)
.build())
.join();
.get(10, TimeUnit.SECONDS);
return events;
} catch (CompletionException e) {
} catch (ExecutionException e) {
throw e.getCause();
}
}
Expand Down Expand Up @@ -234,9 +236,6 @@ public void request(long l) {

@Override
public void cancel() {
RuntimeException e = new RuntimeException();
subscriber.onError(e);
value.onError(e);
}
}));
return cf;
Expand Down
5 changes: 5 additions & 0 deletions utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>commons-io</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,15 @@

package software.amazon.awssdk.utils.async;

import java.util.concurrent.atomic.AtomicBoolean;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;

@SdkProtectedApi
public abstract class DelegatingSubscriber<T, U> implements Subscriber<T> {

protected final Subscriber<? super U> subscriber;
private final AtomicBoolean complete = new AtomicBoolean(false);

protected DelegatingSubscriber(Subscriber<? super U> subscriber) {
this.subscriber = subscriber;
Expand All @@ -35,12 +36,15 @@ public void onSubscribe(Subscription subscription) {

@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
if (complete.compareAndSet(false, true)) {
subscriber.onError(throwable);
}
}

@Override
public void onComplete() {
subscriber.onComplete();
if (complete.compareAndSet(false, true)) {
subscriber.onComplete();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.utils.async;

import java.util.function.Consumer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.utils.Logger;

/**
* A {@link Subscriber} that can invoke callbacks during various parts of the subscriber and subscription lifecycle.
*/
@SdkProtectedApi
public final class EventListeningSubscriber<T> extends DelegatingSubscriber<T, T> {
private static final Logger log = Logger.loggerFor(EventListeningSubscriber.class);

private final Runnable afterCompleteListener;
private final Consumer<Throwable> afterErrorListener;
private final Runnable afterCancelListener;

public EventListeningSubscriber(Subscriber<T> subscriber,
Runnable afterCompleteListener,
Consumer<Throwable> afterErrorListener,
Runnable afterCancelListener) {
super(subscriber);
this.afterCompleteListener = afterCompleteListener;
this.afterErrorListener = afterErrorListener;
this.afterCancelListener = afterCancelListener;
}

@Override
public void onNext(T t) {
super.subscriber.onNext(t);
}

@Override
public void onSubscribe(Subscription subscription) {
super.onSubscribe(new CancelListeningSubscriber(subscription));
}

@Override
public void onError(Throwable throwable) {
super.onError(throwable);
if (afterErrorListener != null) {
callListener(() -> afterErrorListener.accept(throwable),
"Post-onError callback failed. This exception will be dropped.");
}
}

@Override
public void onComplete() {
super.onComplete();
callListener(afterCompleteListener, "Post-onComplete callback failed. This exception will be dropped.");
}

private class CancelListeningSubscriber extends DelegatingSubscription {
protected CancelListeningSubscriber(Subscription s) {
super(s);
}

@Override
public void cancel() {
super.cancel();
callListener(afterCancelListener, "Post-cancel callback failed. This exception will be dropped.");
}
}

private void callListener(Runnable listener, String listenerFailureMessage) {
if (listener != null) {
try {
listener.run();
} catch (RuntimeException e) {
log.error(() -> listenerFailureMessage, e);
}
}
}
}
Loading