Skip to content

Commit 985ef39

Browse files
authored
Fixed an issue where event streams might fail with ClassCastException or NoSuchElementException. (#2695)
* Revert "Revert "Fixed an issue where event streams might fail with ClassCastException or NoSuchElementException. (#2684)"" This reverts commit fcb4ef3. * Fix the event listener callback delegate.
1 parent 1ef4b2b commit 985ef39

File tree

13 files changed

+764
-403
lines changed

13 files changed

+764
-403
lines changed

core/aws-core/pom.xml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,6 @@
7373
<artifactId>utils</artifactId>
7474
<version>${awsjavasdk.version}</version>
7575
</dependency>
76-
<dependency>
77-
<groupId>org.slf4j</groupId>
78-
<artifactId>slf4j-api</artifactId>
79-
</dependency>
8076
<dependency>
8177
<groupId>software.amazon.eventstream</groupId>
8278
<artifactId>eventstream</artifactId>

core/aws-core/src/main/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformer.java

Lines changed: 129 additions & 344 deletions
Large diffs are not rendered by default.

core/aws-core/src/test/java/software/amazon/awssdk/awscore/eventstream/EventStreamAsyncResponseTransformerTest.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ public void onComplete() {
8787
.executor(Executors.newSingleThreadExecutor())
8888
.future(new CompletableFuture<>())
8989
.build();
90+
transformer.prepare();
9091
transformer.onStream(SdkPublisher.adapt(bytePublisher));
9192
latch.await();
9293
assertThat(numEvents)
@@ -327,9 +328,10 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {
327328

328329
Flowable<ByteBuffer> bytePublisher = Flowable.just(exceptionMessage.toByteBuffer());
329330

331+
SubscribingResponseHandler handler = new SubscribingResponseHandler();
330332
AsyncResponseTransformer<SdkResponse, Void> transformer =
331333
EventStreamAsyncResponseTransformer.builder()
332-
.eventStreamResponseHandler(new SubscribingResponseHandler())
334+
.eventStreamResponseHandler(handler)
333335
.exceptionResponseHandler((response, executionAttributes) -> exception)
334336
.executor(Executors.newSingleThreadExecutor())
335337
.future(new CompletableFuture<>())
@@ -343,13 +345,16 @@ private void verifyExceptionThrown(Map<String, HeaderValue> headers) {
343345
cf.join();
344346
} catch (CompletionException e) {
345347
if (e.getCause() instanceof SdkServiceException) {
346-
throw ((SdkServiceException) e.getCause());
348+
throw e.getCause();
347349
}
348350
}
349351
}).isSameAs(exception);
352+
353+
assertThat(handler.exceptionOccurredCalled).isTrue();
350354
}
351355

352356
private static class SubscribingResponseHandler implements EventStreamResponseHandler<Object, Object> {
357+
private volatile boolean exceptionOccurredCalled = false;
353358

354359
@Override
355360
public void responseReceived(Object response) {
@@ -363,6 +368,7 @@ public void onEventStream(SdkPublisher<Object> publisher) {
363368

364369
@Override
365370
public void exceptionOccurred(Throwable throwable) {
371+
exceptionOccurredCalled = true;
366372
}
367373

368374
@Override

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SdkPublisher.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@
2222
import java.util.function.Predicate;
2323
import org.reactivestreams.Publisher;
2424
import org.reactivestreams.Subscriber;
25+
import org.reactivestreams.Subscription;
2526
import software.amazon.awssdk.annotations.SdkPublicApi;
2627
import software.amazon.awssdk.utils.async.BufferingSubscriber;
28+
import software.amazon.awssdk.utils.async.EventListeningSubscriber;
2729
import software.amazon.awssdk.utils.async.FilteringSubscriber;
2830
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
2931
import software.amazon.awssdk.utils.async.LimitingSubscriber;
@@ -116,6 +118,36 @@ default SdkPublisher<T> limit(int limit) {
116118
return subscriber -> subscribe(new LimitingSubscriber<>(subscriber, limit));
117119
}
118120

121+
/**
122+
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onComplete()}.
123+
*
124+
* @param afterOnComplete The logic that should be run immediately after onComplete.
125+
* @return New publisher that invokes the requested callback.
126+
*/
127+
default SdkPublisher<T> doAfterOnComplete(Runnable afterOnComplete) {
128+
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, afterOnComplete, null, null));
129+
}
130+
131+
/**
132+
* Add a callback that will be invoked after this publisher invokes {@link Subscriber#onError(Throwable)}.
133+
*
134+
* @param afterOnError The logic that should be run immediately after onError.
135+
* @return New publisher that invokes the requested callback.
136+
*/
137+
default SdkPublisher<T> doAfterOnError(Consumer<Throwable> afterOnError) {
138+
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, afterOnError, null));
139+
}
140+
141+
/**
142+
* Add a callback that will be invoked after this publisher invokes {@link Subscription#cancel()}.
143+
*
144+
* @param afterOnCancel The logic that should be run immediately after cancellation of the subscription.
145+
* @return New publisher that invokes the requested callback.
146+
*/
147+
default SdkPublisher<T> doAfterOnCancel(Runnable afterOnCancel) {
148+
return subscriber -> subscribe(new EventListeningSubscriber<>(subscriber, null, null, afterOnCancel));
149+
}
150+
119151
/**
120152
* Subscribes to the publisher with the given {@link Consumer}. This consumer will be called for each event
121153
* published. There is no backpressure using this method if the Consumer dispatches processing asynchronously. If more

pom.xml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@
146146
<exec-maven-plugin.version>1.6.0</exec-maven-plugin.version>
147147
<maven-deploy-plugin.version>2.8.2</maven-deploy-plugin.version>
148148
<build-helper-maven-plugin.version>3.0.0</build-helper-maven-plugin.version>
149-
<japicmp-maven-plugin.version>0.15.3</japicmp-maven-plugin.version>
149+
<japicmp-maven-plugin.version>0.14.4</japicmp-maven-plugin.version>
150150

151151
<!-- These properties are used by Step functions for its dependencies -->
152152
<json-path.version>2.4.0</json-path.version>
@@ -519,6 +519,7 @@
519519
<onlyModified>true</onlyModified>
520520
<excludes>
521521
<exclude>*.internal.*</exclude>
522+
<exclude>software.amazon.awssdk.thirdparty.*</exclude>
522523
<exclude>software.amazon.awssdk.awscore.client.handler.AwsAsyncClientHandler</exclude>
523524
<exclude>software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler</exclude>
524525
</excludes>

services/kinesis/src/test/java/software/amazon/awssdk/services/kinesis/SubscribeToShardUnmarshallingTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import java.util.List;
3030
import java.util.concurrent.CompletableFuture;
3131
import java.util.concurrent.CompletionException;
32+
import java.util.concurrent.ExecutionException;
33+
import java.util.concurrent.TimeUnit;
3234
import java.util.concurrent.atomic.AtomicInteger;
3335
import org.junit.Before;
3436
import org.junit.Test;
@@ -201,9 +203,9 @@ private List<SubscribeToShardEventStream> subscribeToShard() throws Throwable {
201203
SubscribeToShardResponseHandler.builder()
202204
.subscriber(events::add)
203205
.build())
204-
.join();
206+
.get(10, TimeUnit.SECONDS);
205207
return events;
206-
} catch (CompletionException e) {
208+
} catch (ExecutionException e) {
207209
throw e.getCause();
208210
}
209211
}
@@ -234,9 +236,6 @@ public void request(long l) {
234236

235237
@Override
236238
public void cancel() {
237-
RuntimeException e = new RuntimeException();
238-
subscriber.onError(e);
239-
value.onError(e);
240239
}
241240
}));
242241
return cf;

utils/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,11 @@
8484
<artifactId>commons-io</artifactId>
8585
<scope>test</scope>
8686
</dependency>
87+
<dependency>
88+
<groupId>org.reactivestreams</groupId>
89+
<artifactId>reactive-streams-tck</artifactId>
90+
<scope>test</scope>
91+
</dependency>
8792
</dependencies>
8893

8994
<build>

utils/src/main/java/software/amazon/awssdk/utils/async/DelegatingSubscriber.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,15 @@
1515

1616
package software.amazon.awssdk.utils.async;
1717

18+
import java.util.concurrent.atomic.AtomicBoolean;
1819
import org.reactivestreams.Subscriber;
1920
import org.reactivestreams.Subscription;
2021
import software.amazon.awssdk.annotations.SdkProtectedApi;
2122

2223
@SdkProtectedApi
2324
public abstract class DelegatingSubscriber<T, U> implements Subscriber<T> {
24-
2525
protected final Subscriber<? super U> subscriber;
26+
private final AtomicBoolean complete = new AtomicBoolean(false);
2627

2728
protected DelegatingSubscriber(Subscriber<? super U> subscriber) {
2829
this.subscriber = subscriber;
@@ -35,12 +36,15 @@ public void onSubscribe(Subscription subscription) {
3536

3637
@Override
3738
public void onError(Throwable throwable) {
38-
subscriber.onError(throwable);
39+
if (complete.compareAndSet(false, true)) {
40+
subscriber.onError(throwable);
41+
}
3942
}
4043

4144
@Override
4245
public void onComplete() {
43-
subscriber.onComplete();
46+
if (complete.compareAndSet(false, true)) {
47+
subscriber.onComplete();
48+
}
4449
}
45-
4650
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.async;
17+
18+
import java.util.function.Consumer;
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
import software.amazon.awssdk.annotations.SdkProtectedApi;
22+
import software.amazon.awssdk.utils.Logger;
23+
24+
/**
25+
* A {@link Subscriber} that can invoke callbacks during various parts of the subscriber and subscription lifecycle.
26+
*/
27+
@SdkProtectedApi
28+
public final class EventListeningSubscriber<T> extends DelegatingSubscriber<T, T> {
29+
private static final Logger log = Logger.loggerFor(EventListeningSubscriber.class);
30+
31+
private final Runnable afterCompleteListener;
32+
private final Consumer<Throwable> afterErrorListener;
33+
private final Runnable afterCancelListener;
34+
35+
public EventListeningSubscriber(Subscriber<T> subscriber,
36+
Runnable afterCompleteListener,
37+
Consumer<Throwable> afterErrorListener,
38+
Runnable afterCancelListener) {
39+
super(subscriber);
40+
this.afterCompleteListener = afterCompleteListener;
41+
this.afterErrorListener = afterErrorListener;
42+
this.afterCancelListener = afterCancelListener;
43+
}
44+
45+
@Override
46+
public void onNext(T t) {
47+
super.subscriber.onNext(t);
48+
}
49+
50+
@Override
51+
public void onSubscribe(Subscription subscription) {
52+
super.onSubscribe(new CancelListeningSubscriber(subscription));
53+
}
54+
55+
@Override
56+
public void onError(Throwable throwable) {
57+
super.onError(throwable);
58+
if (afterErrorListener != null) {
59+
callListener(() -> afterErrorListener.accept(throwable),
60+
"Post-onError callback failed. This exception will be dropped.");
61+
}
62+
}
63+
64+
@Override
65+
public void onComplete() {
66+
super.onComplete();
67+
callListener(afterCompleteListener, "Post-onComplete callback failed. This exception will be dropped.");
68+
}
69+
70+
private class CancelListeningSubscriber extends DelegatingSubscription {
71+
protected CancelListeningSubscriber(Subscription s) {
72+
super(s);
73+
}
74+
75+
@Override
76+
public void cancel() {
77+
super.cancel();
78+
callListener(afterCancelListener, "Post-cancel callback failed. This exception will be dropped.");
79+
}
80+
}
81+
82+
private void callListener(Runnable listener, String listenerFailureMessage) {
83+
if (listener != null) {
84+
try {
85+
listener.run();
86+
} catch (RuntimeException e) {
87+
log.error(() -> listenerFailureMessage, e);
88+
}
89+
}
90+
}
91+
}

0 commit comments

Comments
 (0)