Skip to content

Commit 219f755

Browse files
authored
Handle exceptions thrown by the consumer passed to SdkPublisher.subscribe or filter. (#2877)
Previously, the future returned by `subscribe` or `filter` would never be completed in this scenario. Now, the result future is completed exceptionally with the exception raised by the consumer.
1 parent faa99c0 commit 219f755

File tree

6 files changed

+134
-8
lines changed

6 files changed

+134
-8
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Complete the future returned by SdkPublisher.subscribe or filter exceptionally if the subscriber or predicate throws an exception."
6+
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SdkPublishersTest.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,17 @@
1616
package software.amazon.awssdk.core.async;
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
1920

2021
import java.nio.ByteBuffer;
2122
import java.nio.charset.StandardCharsets;
2223
import java.util.ArrayList;
2324
import java.util.List;
2425

26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
2530
import org.junit.Test;
2631
import org.reactivestreams.Publisher;
2732
import org.reactivestreams.Subscriber;
@@ -84,6 +89,60 @@ public void mapHandlesError() {
8489
assertThat(fakeSubscriber.recordedErrors()).containsExactly(exception);
8590
}
8691

92+
@Test
93+
public void subscribeHandlesError() {
94+
FakeSdkPublisher<String> fakePublisher = new FakeSdkPublisher<>();
95+
RuntimeException exception = new IllegalArgumentException("Failure!");
96+
97+
CompletableFuture<Void> subscribeFuture = fakePublisher.subscribe(s -> {
98+
throw exception;
99+
});
100+
101+
fakePublisher.publish("one");
102+
fakePublisher.complete();
103+
104+
assertThat(subscribeFuture.isCompletedExceptionally()).isTrue();
105+
assertThatThrownBy(() -> subscribeFuture.get(5, TimeUnit.SECONDS))
106+
.isInstanceOf(ExecutionException.class)
107+
.hasCause(exception);
108+
}
109+
110+
@Test
111+
public void filterHandlesError() {
112+
FakeSdkPublisher<String> fakePublisher = new FakeSdkPublisher<>();
113+
RuntimeException exception = new IllegalArgumentException("Failure!");
114+
115+
CompletableFuture<Void> subscribeFuture = fakePublisher.filter(s -> {
116+
throw exception;
117+
}).subscribe(r -> {});
118+
119+
fakePublisher.publish("one");
120+
fakePublisher.complete();
121+
122+
assertThat(subscribeFuture.isCompletedExceptionally()).isTrue();
123+
assertThatThrownBy(() -> subscribeFuture.get(5, TimeUnit.SECONDS))
124+
.isInstanceOf(ExecutionException.class)
125+
.hasCause(exception);
126+
}
127+
128+
@Test
129+
public void flatMapIterableHandlesError() {
130+
FakeSdkPublisher<String> fakePublisher = new FakeSdkPublisher<>();
131+
RuntimeException exception = new IllegalArgumentException("Failure!");
132+
133+
CompletableFuture<Void> subscribeFuture = fakePublisher.flatMapIterable(s -> {
134+
throw exception;
135+
}).subscribe(r -> {});
136+
137+
fakePublisher.publish("one");
138+
fakePublisher.complete();
139+
140+
assertThat(subscribeFuture.isCompletedExceptionally()).isTrue();
141+
assertThatThrownBy(() -> subscribeFuture.get(5, TimeUnit.SECONDS))
142+
.isInstanceOf(ExecutionException.class)
143+
.hasCause(exception);
144+
}
145+
87146
private final static class FakeByteBufferSubscriber implements Subscriber<ByteBuffer> {
88147
private final List<String> recordedEvents = new ArrayList<>();
89148

services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/PaginatorIntegrationTest.java

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,15 @@
2424
import java.util.List;
2525
import java.util.Map;
2626
import java.util.Random;
27+
import java.util.concurrent.ExecutionException;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
30+
import java.util.function.BiFunction;
2731
import java.util.stream.Stream;
2832
import org.junit.AfterClass;
2933
import org.junit.BeforeClass;
3034
import org.junit.Test;
35+
import software.amazon.awssdk.core.async.SdkPublisher;
3136
import software.amazon.awssdk.core.pagination.sync.SdkIterable;
3237
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
3338
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -43,6 +48,7 @@
4348
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
4449
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
4550
import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable;
51+
import software.amazon.awssdk.services.dynamodb.paginators.ScanPublisher;
4652
import utils.resources.tables.BasicTempTable;
4753
import utils.test.util.DynamoDBTestBase;
4854
import utils.test.util.TableUtils;
@@ -211,6 +217,46 @@ public void mix_Iterator_And_Stream_Calls() {
211217
}
212218
assertEquals(ITEM_COUNT, count);
213219
}
220+
221+
@Test
222+
public void sdkPublisher_subscribe_handlesExceptions() throws Exception {
223+
RuntimeException innerException = new RuntimeException();
224+
ScanRequest request = scanRequest(2);
225+
try {
226+
dynamoAsync.scanPaginator(request).subscribe(r -> {
227+
throw innerException;
228+
}).get(5, TimeUnit.SECONDS);
229+
} catch (ExecutionException executionException) {
230+
assertThat(executionException.getCause()).isSameAs(innerException);
231+
}
232+
}
233+
234+
@Test
235+
public void sdkPublisher_filter_handlesExceptions() {
236+
sdkPublisherFunctionHandlesException((p, t) -> p.filter(f -> { throw t; }));
237+
}
238+
239+
@Test
240+
public void sdkPublisher_map_handlesExceptions() {
241+
sdkPublisherFunctionHandlesException((p, t) -> p.map(f -> { throw t; }));
242+
}
243+
244+
@Test
245+
public void sdkPublisher_flatMapIterable_handlesExceptions() {
246+
sdkPublisherFunctionHandlesException((p, t) -> p.flatMapIterable(f -> { throw t; }));
247+
}
248+
249+
public void sdkPublisherFunctionHandlesException(BiFunction<ScanPublisher, RuntimeException, SdkPublisher<?>> function) {
250+
RuntimeException innerException = new RuntimeException();
251+
ScanRequest request = scanRequest(2);
252+
try {
253+
function.apply(dynamoAsync.scanPaginator(request), innerException).subscribe(r -> {}).get(5, TimeUnit.SECONDS);
254+
} catch (ExecutionException executionException) {
255+
assertThat(executionException.getCause()).isSameAs(innerException);
256+
} catch (InterruptedException | TimeoutException e) {
257+
throw new AssertionError("SDK Publisher function did not handle exceptions correctly.", e);
258+
}
259+
}
214260

215261
private static void putTestData() {
216262
Map<String, AttributeValue> item = new HashMap();

services/dynamodb/src/test/java/utils/test/util/DynamoDBTestBase.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
import java.util.HashSet;
2525
import java.util.Map;
2626
import java.util.Set;
27-
2827
import software.amazon.awssdk.awscore.exception.AwsServiceException;
2928
import software.amazon.awssdk.core.exception.SdkClientException;
3029
import software.amazon.awssdk.regions.Region;
30+
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
3131
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
3232
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
3333
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
@@ -43,6 +43,8 @@ public class DynamoDBTestBase extends AwsTestBase {
4343

4444
protected static DynamoDbClient dynamo;
4545

46+
protected static DynamoDbAsyncClient dynamoAsync;
47+
4648
private static final Logger log = Logger.loggerFor(DynamoDBTestBase.class);
4749

4850
public static void setUpTestBase() {
@@ -53,6 +55,7 @@ public static void setUpTestBase() {
5355
}
5456

5557
dynamo = DynamoDbClient.builder().region(REGION).credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).build();
58+
dynamoAsync = DynamoDbAsyncClient.builder().region(REGION).credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).build();
5659
}
5760

5861
public static DynamoDbClient getClient() {

utils/src/main/java/software/amazon/awssdk/utils/async/FilteringSubscriber.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,17 @@ public void onSubscribe(Subscription subscription) {
4040

4141
@Override
4242
public void onNext(T t) {
43-
if (predicate.test(t)) {
44-
subscriber.onNext(t);
45-
} else {
46-
// Consumed a demand but didn't deliver. Request other to make up for it
47-
subscription.request(1);
43+
try {
44+
if (predicate.test(t)) {
45+
subscriber.onNext(t);
46+
} else {
47+
// Consumed a demand but didn't deliver. Request other to make up for it
48+
subscription.request(1);
49+
}
50+
} catch (RuntimeException e) {
51+
// Handle the predicate throwing an exception
52+
subscription.cancel();
53+
onError(e);
4854
}
4955
}
5056
}

utils/src/main/java/software/amazon/awssdk/utils/async/SequentialSubscriber.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,14 @@ public void onSubscribe(Subscription subscription) {
4646

4747
@Override
4848
public void onNext(T t) {
49-
consumer.accept(t);
50-
subscription.request(1);
49+
try {
50+
consumer.accept(t);
51+
subscription.request(1);
52+
} catch (RuntimeException e) {
53+
// Handle the consumer throwing an exception
54+
subscription.cancel();
55+
future.completeExceptionally(e);
56+
}
5157
}
5258

5359
@Override

0 commit comments

Comments
 (0)