Skip to content

Commit 8e40275

Browse files
committed
Fix for handling transormation error in async DynamoDb Enhanced Client operations that return an SdkPublisher
1 parent 35c5f89 commit 8e40275

File tree

14 files changed

+684
-10
lines changed

14 files changed

+684
-10
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS DynamoDB Enhanced Client",
4+
"contributor": "",
5+
"description": "Publisher streams returned by async resources in the DynamoDB Enhanced Client now correctly handle mapping errors when they are encountered in the stream by calling onError on the subscriber and then implicitly cancelling the subscription. Previously the stream would just permanently hang and never complete."
6+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "The mapped publisher returned by SdkPublisher.map will now handle exceptions thrown by the mapping function by calling onError on its subscriber and then cancelling the subscription rather than throwing it back to the publishing process when it attempts to publish data."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/SdkPublisher.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,11 @@
2424
import org.reactivestreams.Subscriber;
2525
import software.amazon.awssdk.annotations.SdkPublicApi;
2626
import software.amazon.awssdk.utils.async.BufferingSubscriber;
27-
import software.amazon.awssdk.utils.async.DelegatingSubscriber;
2827
import software.amazon.awssdk.utils.async.FilteringSubscriber;
2928
import software.amazon.awssdk.utils.async.FlatteningSubscriber;
3029
import software.amazon.awssdk.utils.async.LimitingSubscriber;
3130
import software.amazon.awssdk.utils.async.SequentialSubscriber;
31+
import software.amazon.awssdk.utils.internal.MappingSubscriber;
3232

3333
/**
3434
* Interface that is implemented by the Async auto-paginated responses.
@@ -79,12 +79,7 @@ default SdkPublisher<T> filter(Predicate<T> predicate) {
7979
* @return New publisher with events mapped according to the given function.
8080
*/
8181
default <U> SdkPublisher<U> map(Function<T, U> mapper) {
82-
return subscriber -> subscribe(new DelegatingSubscriber<T, U>(subscriber) {
83-
@Override
84-
public void onNext(T t) {
85-
subscriber.onNext(mapper.apply(t));
86-
}
87-
});
82+
return subscriber -> subscribe(MappingSubscriber.create(subscriber, mapper));
8883
}
8984

9085
/**

core/sdk-core/src/test/java/software/amazon/awssdk/core/async/SdkPublishersTest.java

Lines changed: 84 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@
2626
import org.reactivestreams.Publisher;
2727
import org.reactivestreams.Subscriber;
2828
import org.reactivestreams.Subscription;
29-
3029
import software.amazon.awssdk.core.internal.async.SdkPublishers;
3130
import utils.FakePublisher;
31+
import utils.FakeSdkPublisher;
3232

3333
public class SdkPublishersTest {
3434
@Test
@@ -45,6 +45,45 @@ public void envelopeWrappedPublisher() {
4545
assertThat(fakeSubscriber.recordedEvents()).containsExactly("prefix:content", ":suffix");
4646
}
4747

48+
@Test
49+
public void mapTransformsCorrectly() {
50+
FakeSdkPublisher<String> fakePublisher = new FakeSdkPublisher<>();
51+
FakeStringSubscriber fakeSubscriber = new FakeStringSubscriber();
52+
fakePublisher.map(String::toUpperCase).subscribe(fakeSubscriber);
53+
54+
fakePublisher.publish("one");
55+
fakePublisher.publish("two");
56+
fakePublisher.complete();
57+
58+
assertThat(fakeSubscriber.recordedEvents()).containsExactly("ONE", "TWO");
59+
assertThat(fakeSubscriber.isComplete()).isTrue();
60+
assertThat(fakeSubscriber.isError()).isFalse();
61+
}
62+
63+
@Test
64+
public void mapHandlesError() {
65+
FakeSdkPublisher<String> fakePublisher = new FakeSdkPublisher<>();
66+
FakeStringSubscriber fakeSubscriber = new FakeStringSubscriber();
67+
RuntimeException exception = new IllegalArgumentException("Twos are not supported");
68+
69+
fakePublisher.map(s -> {
70+
if ("two".equals(s)) {
71+
throw exception;
72+
}
73+
74+
return s.toUpperCase();
75+
}).subscribe(fakeSubscriber);
76+
77+
fakePublisher.publish("one");
78+
fakePublisher.publish("two");
79+
fakePublisher.publish("three");
80+
81+
assertThat(fakeSubscriber.recordedEvents()).containsExactly("ONE");
82+
assertThat(fakeSubscriber.isComplete()).isFalse();
83+
assertThat(fakeSubscriber.isError()).isTrue();
84+
assertThat(fakeSubscriber.recordedErrors()).containsExactly(exception);
85+
}
86+
4887
private final static class FakeByteBufferSubscriber implements Subscriber<ByteBuffer> {
4988
private final List<String> recordedEvents = new ArrayList<>();
5089

@@ -73,4 +112,48 @@ public List<String> recordedEvents() {
73112
return this.recordedEvents;
74113
}
75114
}
115+
116+
private final static class FakeStringSubscriber implements Subscriber<String> {
117+
private final List<String> recordedEvents = new ArrayList<>();
118+
private final List<Throwable> recordedErrors = new ArrayList<>();
119+
private boolean isComplete = false;
120+
private boolean isError = false;
121+
122+
@Override
123+
public void onSubscribe(Subscription s) {
124+
s.request(1000);
125+
}
126+
127+
@Override
128+
public void onNext(String s) {
129+
recordedEvents.add(s);
130+
}
131+
132+
@Override
133+
public void onError(Throwable t) {
134+
recordedErrors.add(t);
135+
this.isError = true;
136+
}
137+
138+
@Override
139+
public void onComplete() {
140+
this.isComplete = true;
141+
}
142+
143+
public List<String> recordedEvents() {
144+
return this.recordedEvents;
145+
}
146+
147+
public List<Throwable> recordedErrors() {
148+
return this.recordedErrors;
149+
}
150+
151+
public boolean isComplete() {
152+
return isComplete;
153+
}
154+
155+
public boolean isError() {
156+
return isError;
157+
}
158+
}
76159
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package utils;
17+
18+
import org.reactivestreams.Subscriber;
19+
import org.reactivestreams.Subscription;
20+
import software.amazon.awssdk.core.async.SdkPublisher;
21+
22+
public class FakeSdkPublisher<T> implements SdkPublisher<T> {
23+
private Subscriber<? super T> delegateSubscriber;
24+
25+
@Override
26+
public void subscribe(Subscriber<? super T> subscriber) {
27+
this.delegateSubscriber = subscriber;
28+
this.delegateSubscriber.onSubscribe(new FakeSubscription());
29+
}
30+
31+
public void publish(T str) {
32+
this.delegateSubscriber.onNext(str);
33+
}
34+
35+
public void complete() {
36+
this.delegateSubscriber.onComplete();
37+
}
38+
39+
public void doThrow(Throwable t) {
40+
this.delegateSubscriber.onError(t);
41+
}
42+
43+
private static final class FakeSubscription implements Subscription {
44+
@Override
45+
public void request(long n) {
46+
47+
}
48+
49+
@Override
50+
public void cancel() {
51+
52+
}
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.enhanced.dynamodb.functionaltests;
17+
18+
import java.util.Iterator;
19+
import java.util.List;
20+
import java.util.concurrent.CompletionException;
21+
import org.junit.After;
22+
import org.junit.Before;
23+
import org.junit.Rule;
24+
import org.junit.Test;
25+
import org.junit.rules.ExpectedException;
26+
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbAsyncTable;
27+
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedAsyncClient;
28+
import software.amazon.awssdk.enhanced.dynamodb.Key;
29+
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
30+
import software.amazon.awssdk.enhanced.dynamodb.functionaltests.models.FakeEnum;
31+
import software.amazon.awssdk.enhanced.dynamodb.functionaltests.models.FakeEnumRecord;
32+
import software.amazon.awssdk.enhanced.dynamodb.functionaltests.models.FakeEnumShortened;
33+
import software.amazon.awssdk.enhanced.dynamodb.functionaltests.models.FakeEnumShortenedRecord;
34+
import software.amazon.awssdk.enhanced.dynamodb.model.Page;
35+
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
36+
37+
import static org.assertj.core.api.Assertions.assertThat;
38+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
39+
40+
public class FailedConversionAsyncTest extends LocalDynamoDbAsyncTestBase {
41+
private static final TableSchema<FakeEnumRecord> TABLE_SCHEMA = TableSchema.fromClass(FakeEnumRecord.class);
42+
private static final TableSchema<FakeEnumShortenedRecord> SHORT_TABLE_SCHEMA =
43+
TableSchema.fromClass(FakeEnumShortenedRecord.class);
44+
45+
private final DynamoDbEnhancedAsyncClient enhancedClient =
46+
DynamoDbEnhancedAsyncClient.builder()
47+
.dynamoDbClient(getDynamoDbAsyncClient())
48+
.build();
49+
50+
private final DynamoDbAsyncTable<FakeEnumRecord> mappedTable =
51+
enhancedClient.table(getConcreteTableName("table-name"), TABLE_SCHEMA);
52+
private final DynamoDbAsyncTable<FakeEnumShortenedRecord> mappedShortTable =
53+
enhancedClient.table(getConcreteTableName("table-name"), SHORT_TABLE_SCHEMA);
54+
55+
@Rule
56+
public ExpectedException exception = ExpectedException.none();
57+
58+
@Before
59+
public void createTable() {
60+
mappedTable.createTable(r -> r.provisionedThroughput(getDefaultProvisionedThroughput())).join();
61+
}
62+
63+
@After
64+
public void deleteTable() {
65+
getDynamoDbAsyncClient().deleteTable(DeleteTableRequest.builder()
66+
.tableName(getConcreteTableName("table-name"))
67+
.build()).join();
68+
}
69+
70+
@Test
71+
public void exceptionOnRead() {
72+
FakeEnumRecord record = new FakeEnumRecord();
73+
record.setId("123");
74+
record.setEnumAttribute(FakeEnum.TWO);
75+
mappedTable.putItem(record).join();
76+
77+
assertThatThrownBy(() -> mappedShortTable.getItem(Key.builder().partitionValue("123").build()).join())
78+
.isInstanceOf(CompletionException.class)
79+
.hasCauseInstanceOf(IllegalArgumentException.class)
80+
.hasMessageContaining("TWO")
81+
.hasMessageContaining("FakeEnumShortened");
82+
}
83+
84+
@Test
85+
public void iterableExceptionOnRead() {
86+
FakeEnumRecord record = new FakeEnumRecord();
87+
record.setId("1");
88+
record.setEnumAttribute(FakeEnum.ONE);
89+
mappedTable.putItem(record).join();
90+
record.setId("2");
91+
record.setEnumAttribute(FakeEnum.TWO);
92+
mappedTable.putItem(record).join();
93+
94+
List<Page<FakeEnumShortenedRecord>> results =
95+
drainPublisherToError(mappedShortTable.scan(r -> r.limit(1)), 1, IllegalArgumentException.class);
96+
97+
assertThat(results).hasOnlyOneElementSatisfying(
98+
page -> assertThat(page.items()).hasOnlyOneElementSatisfying(item -> {
99+
assertThat(item.getId()).isEqualTo("1");
100+
assertThat(item.getEnumAttribute()).isEqualTo(FakeEnumShortened.ONE);
101+
}));
102+
}
103+
}

0 commit comments

Comments
 (0)