Skip to content

Commit 870eb50

Browse files
John Viegasjoviegas
John Viegas
authored andcommitted
BugFix for PR #2044 S3AsyncClient FilteringSubscriber and LimitingSubscriber Timeouts on get
1 parent c66f0a5 commit 870eb50

File tree

6 files changed

+298
-7
lines changed

6 files changed

+298
-7
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "AWS SDK for Java v2",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Fixing FilteringSubscriber and LimitingSubscriber to complete when subscribing criteria is completed."
6+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package utils;
17+
18+
import org.junit.Test;
19+
import org.junit.runner.RunWith;
20+
import org.mockito.Mock;
21+
import org.mockito.runners.MockitoJUnitRunner;
22+
import org.reactivestreams.Subscriber;
23+
import software.amazon.awssdk.utils.internal.async.EmptySubscription;
24+
25+
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
26+
import static org.mockito.Mockito.never;
27+
import static org.mockito.Mockito.verify;
28+
29+
@RunWith(MockitoJUnitRunner.class)
30+
public class EmptySubscriptionTest {
31+
32+
@Mock
33+
private Subscriber<String> mockSubscriber;
34+
35+
@Test
36+
public void emptySubscription_with_invalid_request() {
37+
EmptySubscription emptySubscription = new EmptySubscription(mockSubscriber);
38+
assertThatIllegalArgumentException().isThrownBy(() -> emptySubscription.request(-1));
39+
}
40+
41+
@Test
42+
public void emptySubscription_with_normal_execution() {
43+
EmptySubscription emptySubscription = new EmptySubscription(mockSubscriber);
44+
emptySubscription.request(1);
45+
verify(mockSubscriber).onComplete();
46+
}
47+
48+
@Test
49+
public void emptySubscription_when_terminated_externally() {
50+
EmptySubscription emptySubscription = new EmptySubscription(mockSubscriber);
51+
emptySubscription.cancel();
52+
emptySubscription.request(1);
53+
verify(mockSubscriber, never()).onComplete();
54+
}
55+
}
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package utils;
17+
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
import org.junit.runner.RunWith;
21+
import org.mockito.Mock;
22+
import org.mockito.Mockito;
23+
import org.mockito.runners.MockitoJUnitRunner;
24+
import org.reactivestreams.Subscriber;
25+
import software.amazon.awssdk.core.pagination.async.AsyncPageFetcher;
26+
import software.amazon.awssdk.core.pagination.async.PaginatedItemsPublisher;
27+
import software.amazon.awssdk.utils.async.LimitingSubscriber;
28+
import software.amazon.awssdk.utils.internal.async.EmptySubscription;
29+
30+
import java.util.ArrayList;
31+
import java.util.Arrays;
32+
import java.util.Iterator;
33+
import java.util.List;
34+
import java.util.concurrent.CompletableFuture;
35+
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.ExecutorService;
37+
import java.util.concurrent.Executors;
38+
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.TimeoutException;
40+
import java.util.function.Function;
41+
42+
import static org.assertj.core.api.Assertions.assertThat;
43+
import static org.mockito.Matchers.anyObject;
44+
import static org.mockito.Mockito.*;
45+
46+
@RunWith(MockitoJUnitRunner.class)
47+
public class SdkSubscriberTest {
48+
49+
public static final Function<Integer, Iterator<Integer>> SAMPLE_ITERATOR = response -> Arrays.asList(1, 2, 3, 4, 5, 6).listIterator();
50+
public static final Function<Integer, Iterator<Integer>> EMPTY_ITERATOR = response -> new ArrayList<Integer>().listIterator();
51+
@Mock
52+
AsyncPageFetcher asyncPageFetcher;
53+
PaginatedItemsPublisher<Integer, Integer> itemsPublisher;
54+
55+
@Mock
56+
Subscriber<Integer> mockSubscriber;
57+
58+
@Before
59+
public void setUp() {
60+
doReturn(CompletableFuture.completedFuture(1))
61+
.when(asyncPageFetcher).nextPage(null);
62+
doReturn(false)
63+
.when(asyncPageFetcher).hasNextPage(anyObject());
64+
}
65+
66+
@Test
67+
public void limitingSubscriber_with_different_limits() throws InterruptedException, ExecutionException, TimeoutException {
68+
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
69+
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
70+
71+
final List<Integer> belowLimit = new ArrayList<>();
72+
itemsPublisher.limit(3).subscribe(e -> belowLimit.add(e)).get(5, TimeUnit.SECONDS);
73+
assertThat(belowLimit).isEqualTo(Arrays.asList(1, 2, 3));
74+
75+
final List<Integer> beyondLimit = new ArrayList<>();
76+
itemsPublisher.limit(33).subscribe(e -> beyondLimit.add(e)).get(5, TimeUnit.SECONDS);
77+
assertThat(beyondLimit).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6));
78+
79+
final List<Integer> zeroLimit = new ArrayList<>();
80+
itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS);
81+
assertThat(zeroLimit).isEqualTo(Arrays.asList());
82+
}
83+
84+
@Test
85+
public void filteringSubscriber_with_different_filters() throws InterruptedException, ExecutionException, TimeoutException {
86+
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
87+
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
88+
89+
final List<Integer> filteredSomeList = new ArrayList<>();
90+
itemsPublisher.filter(i -> i % 2 == 0).subscribe(e -> filteredSomeList.add(e)).get(5, TimeUnit.SECONDS);
91+
assertThat(filteredSomeList).isEqualTo(Arrays.asList(2, 4, 6));
92+
93+
final List<Integer> filteredAllList = new ArrayList<>();
94+
itemsPublisher.filter(i -> i % 10 == 0).subscribe(e -> filteredAllList.add(e)).get(5, TimeUnit.SECONDS);
95+
assertThat(filteredAllList).isEqualTo(Arrays.asList());
96+
97+
final List<Integer> filteredNone = new ArrayList<>();
98+
itemsPublisher.filter(i -> i % 1 == 0).subscribe(e -> filteredNone.add(e)).get(5, TimeUnit.SECONDS);
99+
assertThat(filteredNone).isEqualTo(Arrays.asList(1, 2, 3, 4, 5, 6));
100+
101+
}
102+
103+
@Test
104+
public void limit_and_filter_subscriber_chained_with_different_conditions() throws InterruptedException, ExecutionException, TimeoutException {
105+
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
106+
.iteratorFunction(SAMPLE_ITERATOR).isLastPage(false).build();
107+
108+
final List<Integer> belowLimitWithFiltering = new ArrayList<>();
109+
itemsPublisher.limit(4).filter(i -> i % 2 == 0).subscribe(e -> belowLimitWithFiltering.add(e)).get(5, TimeUnit.SECONDS);
110+
assertThat(belowLimitWithFiltering).isEqualTo(Arrays.asList(2, 4));
111+
112+
final List<Integer> beyondLimitWithAllFiltering = new ArrayList<>();
113+
itemsPublisher.limit(33).filter(i -> i % 10 == 0).subscribe(e -> beyondLimitWithAllFiltering.add(e)).get(5, TimeUnit.SECONDS);
114+
assertThat(beyondLimitWithAllFiltering).isEqualTo(Arrays.asList());
115+
116+
final List<Integer> zeroLimitAndNoFilter = new ArrayList<>();
117+
itemsPublisher.limit(0).filter(i -> i % 1 == 0).subscribe(e -> zeroLimitAndNoFilter.add(e)).get(5, TimeUnit.SECONDS);
118+
assertThat(zeroLimitAndNoFilter).isEqualTo(Arrays.asList());
119+
120+
final List<Integer> filteringbelowLimitWith = new ArrayList<>();
121+
itemsPublisher.filter(i -> i % 2 == 0).limit(2).subscribe(e -> filteringbelowLimitWith.add(e)).get(5, TimeUnit.SECONDS);
122+
assertThat(filteringbelowLimitWith).isEqualTo(Arrays.asList(2, 4));
123+
124+
final List<Integer> filteringAndOutsideLimit = new ArrayList<>();
125+
itemsPublisher.filter(i -> i % 10 == 0).limit(33).subscribe(e -> filteringAndOutsideLimit.add(e)).get(5, TimeUnit.SECONDS);
126+
assertThat(filteringAndOutsideLimit).isEqualTo(Arrays.asList());
127+
}
128+
129+
@Test
130+
public void limit__subscriber_with_empty_input_and_zero_limit() throws InterruptedException, ExecutionException, TimeoutException {
131+
itemsPublisher = PaginatedItemsPublisher.builder().nextPageFetcher(asyncPageFetcher)
132+
.iteratorFunction(EMPTY_ITERATOR).isLastPage(false).build();
133+
134+
final List<Integer> zeroLimit = new ArrayList<>();
135+
itemsPublisher.limit(0).subscribe(e -> zeroLimit.add(e)).get(5, TimeUnit.SECONDS);
136+
assertThat(zeroLimit).isEqualTo(Arrays.asList());
137+
138+
final List<Integer> nonZeroLimit = new ArrayList<>();
139+
itemsPublisher.limit(10).subscribe(e -> nonZeroLimit.add(e)).get(5, TimeUnit.SECONDS);
140+
assertThat(zeroLimit).isEqualTo(Arrays.asList());
141+
}
142+
143+
144+
@Test
145+
public void limiting_subscriber_with_multiple_thread_publishers() throws InterruptedException {
146+
final int limitFactor = 5;
147+
LimitingSubscriber<Integer> limitingSubscriber = new LimitingSubscriber<>(mockSubscriber, limitFactor);
148+
limitingSubscriber.onSubscribe(new EmptySubscription(mockSubscriber));
149+
final ExecutorService executorService = Executors.newFixedThreadPool(10);
150+
for (int i = 0; i < 10; i++) {
151+
final Integer integer = Integer.valueOf(i);
152+
executorService.submit(() -> limitingSubscriber.onNext(new Integer(integer)));
153+
}
154+
executorService.awaitTermination(300, TimeUnit.MILLISECONDS);
155+
Mockito.verify(mockSubscriber, times(limitFactor)).onNext(anyInt());
156+
Mockito.verify(mockSubscriber).onComplete();
157+
Mockito.verify(mockSubscriber).onSubscribe(anyObject());
158+
Mockito.verify(mockSubscriber, never()).onError(anyObject());
159+
}
160+
}

utils/src/main/java/software/amazon/awssdk/utils/async/FilteringSubscriber.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,8 @@ public FilteringSubscriber(Subscriber<? super T> sourceSubscriber, Predicate<T>
3434

3535
@Override
3636
public void onSubscribe(Subscription subscription) {
37-
super.onSubscribe(subscription);
3837
this.subscription = subscription;
38+
super.onSubscribe(subscription);
3939
}
4040

4141
@Override

utils/src/main/java/software/amazon/awssdk/utils/async/LimitingSubscriber.java

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.reactivestreams.Subscriber;
2020
import org.reactivestreams.Subscription;
2121
import software.amazon.awssdk.annotations.SdkProtectedApi;
22+
import software.amazon.awssdk.utils.internal.async.EmptySubscription;
2223

2324
@SdkProtectedApi
2425
public class LimitingSubscriber<T> extends DelegatingSubscriber<T, T> {
@@ -35,19 +36,25 @@ public LimitingSubscriber(Subscriber<? super T> subscriber, int limit) {
3536

3637
@Override
3738
public void onSubscribe(Subscription subscription) {
38-
super.onSubscribe(subscription);
3939
this.subscription = subscription;
40+
if (limit == 0) {
41+
subscription.cancel();
42+
super.onSubscribe(new EmptySubscription(super.subscriber));
43+
} else {
44+
super.onSubscribe(subscription);
45+
}
4046
}
4147

4248
@Override
4349
public void onNext(T t) {
50+
int deliveredItems = delivered.incrementAndGet();
4451
// We may get more events even after cancelling so we ignore them.
45-
if (delivered.get() < limit) {
52+
if (deliveredItems <= limit) {
4653
subscriber.onNext(t);
47-
}
48-
// If we've met the limit then we can cancel the subscription
49-
if (delivered.incrementAndGet() >= limit) {
50-
subscription.cancel();
54+
if (deliveredItems == limit) {
55+
subscription.cancel();
56+
subscriber.onComplete();
57+
}
5158
}
5259
}
5360
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.utils.internal.async;
17+
18+
import java.util.concurrent.atomic.AtomicBoolean;
19+
import org.reactivestreams.Subscriber;
20+
import org.reactivestreams.Subscription;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
23+
/**
24+
* A NoOp implementation of {@link Subscription} interface.
25+
*
26+
* This subscription calls {@link Subscriber#onComplete()} on first request for data and then terminates the subscription.
27+
*/
28+
@SdkInternalApi
29+
public final class EmptySubscription implements Subscription {
30+
31+
private final AtomicBoolean isTerminated = new AtomicBoolean(false);
32+
private final Subscriber<?> subscriber;
33+
34+
public EmptySubscription(Subscriber<?> subscriber) {
35+
this.subscriber = subscriber;
36+
}
37+
38+
@Override
39+
public void request(long n) {
40+
if (isTerminated()) {
41+
return;
42+
}
43+
if (n <= 0) {
44+
throw new IllegalArgumentException("Non-positive request signals are illegal");
45+
}
46+
if (terminate()) {
47+
subscriber.onComplete();
48+
}
49+
}
50+
51+
@Override
52+
public void cancel() {
53+
terminate();
54+
}
55+
56+
private boolean terminate() {
57+
return isTerminated.compareAndSet(false, true);
58+
}
59+
60+
private boolean isTerminated() {
61+
return isTerminated.get();
62+
}
63+
}

0 commit comments

Comments
 (0)