diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-ef61843.json b/.changes/next-release/bugfix-AWSSDKforJavav2-ef61843.json new file mode 100644 index 000000000000..23dfbbb163ab --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-ef61843.json @@ -0,0 +1,6 @@ +{ + "category": "AWS SDK for Java v2", + "contributor": "", + "type": "bugfix", + "description": "Complete the future returned by SdkPublisher.subscribe or filter exceptionally if the subscriber or predicate throws an exception." +} diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SdkPublishersTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SdkPublishersTest.java index b7c88409e4a7..838ff2af095d 100644 --- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SdkPublishersTest.java +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SdkPublishersTest.java @@ -16,12 +16,17 @@ package software.amazon.awssdk.core.async; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.Test; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -84,6 +89,60 @@ public void mapHandlesError() { assertThat(fakeSubscriber.recordedErrors()).containsExactly(exception); } + @Test + public void subscribeHandlesError() { + FakeSdkPublisher fakePublisher = new FakeSdkPublisher<>(); + RuntimeException exception = new IllegalArgumentException("Failure!"); + + CompletableFuture subscribeFuture = fakePublisher.subscribe(s -> { + throw exception; + }); + + fakePublisher.publish("one"); + fakePublisher.complete(); + + assertThat(subscribeFuture.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(() -> subscribeFuture.get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCause(exception); + } + + @Test + public void filterHandlesError() { + FakeSdkPublisher fakePublisher = new FakeSdkPublisher<>(); + RuntimeException exception = new IllegalArgumentException("Failure!"); + + CompletableFuture subscribeFuture = fakePublisher.filter(s -> { + throw exception; + }).subscribe(r -> {}); + + fakePublisher.publish("one"); + fakePublisher.complete(); + + assertThat(subscribeFuture.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(() -> subscribeFuture.get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCause(exception); + } + + @Test + public void flatMapIterableHandlesError() { + FakeSdkPublisher fakePublisher = new FakeSdkPublisher<>(); + RuntimeException exception = new IllegalArgumentException("Failure!"); + + CompletableFuture subscribeFuture = fakePublisher.flatMapIterable(s -> { + throw exception; + }).subscribe(r -> {}); + + fakePublisher.publish("one"); + fakePublisher.complete(); + + assertThat(subscribeFuture.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(() -> subscribeFuture.get(5, TimeUnit.SECONDS)) + .isInstanceOf(ExecutionException.class) + .hasCause(exception); + } + private final static class FakeByteBufferSubscriber implements Subscriber { private final List recordedEvents = new ArrayList<>(); diff --git a/services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/PaginatorIntegrationTest.java b/services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/PaginatorIntegrationTest.java index 837698604839..e0041619ef0f 100644 --- a/services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/PaginatorIntegrationTest.java +++ b/services/dynamodb/src/it/java/software/amazon/awssdk/services/dynamodb/PaginatorIntegrationTest.java @@ -24,10 +24,15 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.BiFunction; import java.util.stream.Stream; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.core.pagination.sync.SdkIterable; import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; @@ -43,6 +48,7 @@ import software.amazon.awssdk.services.dynamodb.model.ScanRequest; import software.amazon.awssdk.services.dynamodb.model.ScanResponse; import software.amazon.awssdk.services.dynamodb.paginators.ScanIterable; +import software.amazon.awssdk.services.dynamodb.paginators.ScanPublisher; import utils.resources.tables.BasicTempTable; import utils.test.util.DynamoDBTestBase; import utils.test.util.TableUtils; @@ -211,6 +217,46 @@ public void mix_Iterator_And_Stream_Calls() { } assertEquals(ITEM_COUNT, count); } + + @Test + public void sdkPublisher_subscribe_handlesExceptions() throws Exception { + RuntimeException innerException = new RuntimeException(); + ScanRequest request = scanRequest(2); + try { + dynamoAsync.scanPaginator(request).subscribe(r -> { + throw innerException; + }).get(5, TimeUnit.SECONDS); + } catch (ExecutionException executionException) { + assertThat(executionException.getCause()).isSameAs(innerException); + } + } + + @Test + public void sdkPublisher_filter_handlesExceptions() { + sdkPublisherFunctionHandlesException((p, t) -> p.filter(f -> { throw t; })); + } + + @Test + public void sdkPublisher_map_handlesExceptions() { + sdkPublisherFunctionHandlesException((p, t) -> p.map(f -> { throw t; })); + } + + @Test + public void sdkPublisher_flatMapIterable_handlesExceptions() { + sdkPublisherFunctionHandlesException((p, t) -> p.flatMapIterable(f -> { throw t; })); + } + + public void sdkPublisherFunctionHandlesException(BiFunction> function) { + RuntimeException innerException = new RuntimeException(); + ScanRequest request = scanRequest(2); + try { + function.apply(dynamoAsync.scanPaginator(request), innerException).subscribe(r -> {}).get(5, TimeUnit.SECONDS); + } catch (ExecutionException executionException) { + assertThat(executionException.getCause()).isSameAs(innerException); + } catch (InterruptedException | TimeoutException e) { + throw new AssertionError("SDK Publisher function did not handle exceptions correctly.", e); + } + } private static void putTestData() { Map item = new HashMap(); diff --git a/services/dynamodb/src/test/java/utils/test/util/DynamoDBTestBase.java b/services/dynamodb/src/test/java/utils/test/util/DynamoDBTestBase.java index 70058d71b4e3..0c1e07c7cf69 100644 --- a/services/dynamodb/src/test/java/utils/test/util/DynamoDBTestBase.java +++ b/services/dynamodb/src/test/java/utils/test/util/DynamoDBTestBase.java @@ -24,10 +24,10 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; - import software.amazon.awssdk.awscore.exception.AwsServiceException; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest; @@ -43,6 +43,8 @@ public class DynamoDBTestBase extends AwsTestBase { protected static DynamoDbClient dynamo; + protected static DynamoDbAsyncClient dynamoAsync; + private static final Logger log = Logger.loggerFor(DynamoDBTestBase.class); public static void setUpTestBase() { @@ -53,6 +55,7 @@ public static void setUpTestBase() { } dynamo = DynamoDbClient.builder().region(REGION).credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).build(); + dynamoAsync = DynamoDbAsyncClient.builder().region(REGION).credentialsProvider(CREDENTIALS_PROVIDER_CHAIN).build(); } public static DynamoDbClient getClient() { diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/FilteringSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/FilteringSubscriber.java index c93f16870919..973a40cc29d4 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/FilteringSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/FilteringSubscriber.java @@ -40,11 +40,17 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(T t) { - if (predicate.test(t)) { - subscriber.onNext(t); - } else { - // Consumed a demand but didn't deliver. Request other to make up for it - subscription.request(1); + try { + if (predicate.test(t)) { + subscriber.onNext(t); + } else { + // Consumed a demand but didn't deliver. Request other to make up for it + subscription.request(1); + } + } catch (RuntimeException e) { + // Handle the predicate throwing an exception + subscription.cancel(); + onError(e); } } } diff --git a/utils/src/main/java/software/amazon/awssdk/utils/async/SequentialSubscriber.java b/utils/src/main/java/software/amazon/awssdk/utils/async/SequentialSubscriber.java index 77db8e2d15b5..2b7dbc3225aa 100644 --- a/utils/src/main/java/software/amazon/awssdk/utils/async/SequentialSubscriber.java +++ b/utils/src/main/java/software/amazon/awssdk/utils/async/SequentialSubscriber.java @@ -46,8 +46,14 @@ public void onSubscribe(Subscription subscription) { @Override public void onNext(T t) { - consumer.accept(t); - subscription.request(1); + try { + consumer.accept(t); + subscription.request(1); + } catch (RuntimeException e) { + // Handle the consumer throwing an exception + subscription.cancel(); + future.completeExceptionally(e); + } } @Override