Skip to content

Commit 79a24cc

Browse files
authored
Fixed the bug in the transfer manager where files were downloaded seq… (#3098)
* Fixed the bug in the transfer manager where files were downloaded sequentially in downloadDirectory * Add test
1 parent 704f71f commit 79a24cc

File tree

3 files changed

+42
-49
lines changed

3 files changed

+42
-49
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"category": "S3 Transfer Manager (Preview)",
3+
"contributor": "",
4+
"type": "bugfix",
5+
"description": "Fixed the bug in the transfer manager where files were downloaded sequentially in downloadDirectory. See [#3092](https://github.com/aws/aws-sdk-java-v2/issues/3092)"
6+
}

services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public void onSubscribe(Subscription subscription) {
6565
return;
6666
}
6767
this.subscription = subscription;
68-
subscription.request(1);
68+
subscription.request(maxConcurrentExecutions);
6969
}
7070

7171
@Override
@@ -162,6 +162,13 @@ public void onComplete() {
162162
flushBufferIfNeeded();
163163
}
164164

165+
/**
166+
* @return the number of requests that are currently in flight
167+
*/
168+
public int numRequestsInFlight() {
169+
return numRequestsInFlight.get();
170+
}
171+
165172
private static boolean isCompleteEvent(Object event) {
166173
return COMPLETE_EVENT.equals(event);
167174
}

services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java

Lines changed: 28 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,26 +17,29 @@
1717

1818
import static org.assertj.core.api.Assertions.assertThat;
1919

20+
import io.reactivex.Flowable;
21+
import io.reactivex.Observable;
22+
import io.reactivex.disposables.Disposable;
23+
import io.reactivex.schedulers.Schedulers;
2024
import java.util.ArrayList;
2125
import java.util.Iterator;
2226
import java.util.List;
2327
import java.util.concurrent.CompletableFuture;
2428
import java.util.concurrent.Executors;
2529
import java.util.concurrent.ScheduledExecutorService;
2630
import java.util.concurrent.TimeUnit;
27-
import java.util.concurrent.atomic.AtomicInteger;
2831
import java.util.function.Function;
32+
import java.util.stream.IntStream;
2933
import org.junit.jupiter.api.AfterAll;
3034
import org.junit.jupiter.api.BeforeAll;
3135
import org.junit.jupiter.api.BeforeEach;
3236
import org.junit.jupiter.api.Test;
3337
import org.junit.jupiter.params.ParameterizedTest;
3438
import org.junit.jupiter.params.provider.ValueSource;
35-
import org.reactivestreams.Publisher;
36-
import org.reactivestreams.Subscriber;
3739
import org.reactivestreams.Subscription;
3840

3941
class AsyncBufferingSubscriberTest {
42+
private static final int MAX_CONCURRENT_EXECUTIONS = 5;
4043
private AsyncBufferingSubscriber<String> subscriber;
4144
private Function<String, CompletableFuture<?>> consumer;
4245
private CompletableFuture<Void> returnFuture;
@@ -45,7 +48,7 @@ class AsyncBufferingSubscriberTest {
4548

4649
@BeforeAll
4750
public static void setUp() {
48-
scheduledExecutorService = Executors.newScheduledThreadPool(2);
51+
scheduledExecutorService = Executors.newScheduledThreadPool(5);
4952
}
5053

5154
@BeforeEach
@@ -55,15 +58,16 @@ public void setUpPerTest() {
5558
futures.add(new CompletableFuture<>());
5659
}
5760
Iterator<CompletableFuture<Void>> iterator = futures.iterator();
58-
consumer = s -> iterator.next();
59-
60-
futures.forEach(f -> {
61+
consumer = s -> {
62+
CompletableFuture<Void> future = iterator.next();
6163
scheduledExecutorService.schedule(() -> {
62-
f.complete(null);
63-
}, 1, TimeUnit.SECONDS);
64-
});
64+
future.complete(null);
65+
}, 200, TimeUnit.MILLISECONDS);
66+
return future;
67+
};
68+
6569
subscriber = new AsyncBufferingSubscriber<>(consumer, returnFuture,
66-
5);
70+
MAX_CONCURRENT_EXECUTIONS);
6771
}
6872

6973
@AfterAll
@@ -74,9 +78,21 @@ public static void cleanUp() {
7478
@ParameterizedTest
7579
@ValueSource(ints = {1, 4, 11, 20, 100})
7680
void differentNumberOfStrings_shouldCompleteSuccessfully(int numberOfStrings) throws Exception {
77-
new TestPublisher(numberOfStrings).subscribe(subscriber);
81+
Flowable.fromArray(IntStream.range(0, numberOfStrings).mapToObj(String::valueOf).toArray(String[]::new)).subscribe(subscriber);
82+
83+
84+
List<Integer> numRequestsInFlightSampling = new ArrayList<>();
85+
86+
Disposable disposable = Observable.interval(100, TimeUnit.MILLISECONDS, Schedulers.newThread())
87+
.map(time -> subscriber.numRequestsInFlight())
88+
.subscribe(numRequestsInFlightSampling::add, t -> {});
89+
7890
returnFuture.get(1000, TimeUnit.SECONDS);
7991
assertThat(returnFuture).isCompleted().isNotCompletedExceptionally();
92+
if (numberOfStrings >= MAX_CONCURRENT_EXECUTIONS) {
93+
assertThat(numRequestsInFlightSampling).contains(MAX_CONCURRENT_EXECUTIONS);
94+
}
95+
disposable.dispose();
8096
}
8197

8298
@Test
@@ -96,40 +112,4 @@ public void cancel() {
96112
subscriber.onError(exception);
97113
assertThat(returnFuture).isCompletedExceptionally();
98114
}
99-
100-
101-
102-
private static final class TestPublisher implements Publisher<String> {
103-
private final int numberOfStrings;
104-
private volatile boolean isDone = false;
105-
private final AtomicInteger requestNumber = new AtomicInteger(0);
106-
107-
private TestPublisher(int numberOfStrings) {
108-
this.numberOfStrings = numberOfStrings;
109-
}
110-
111-
@Override
112-
public void subscribe(Subscriber<? super String> subscriber) {
113-
subscriber.onSubscribe(new Subscription() {
114-
@Override
115-
public void request(long n) {
116-
if (isDone) {
117-
return;
118-
}
119-
120-
if (requestNumber.incrementAndGet() > numberOfStrings) {
121-
isDone = true;
122-
subscriber.onComplete();
123-
return;
124-
}
125-
126-
subscriber.onNext("key" + requestNumber.get());
127-
}
128-
129-
@Override
130-
public void cancel() {
131-
}
132-
});
133-
}
134-
}
135115
}

0 commit comments

Comments
 (0)