diff --git a/.changes/next-release/bugfix-S3TransferManagerPreview-000ecbf.json b/.changes/next-release/bugfix-S3TransferManagerPreview-000ecbf.json new file mode 100644 index 000000000000..6957b1a6b9d8 --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManagerPreview-000ecbf.json @@ -0,0 +1,6 @@ +{ + "category": "S3 Transfer Manager (Preview)", + "contributor": "", + "type": "bugfix", + "description": "Fixed the bug in the transfer manager where files were downloaded sequentially in downloadDirectory. See [#3092](https://github.com/aws/aws-sdk-java-v2/issues/3092)" +} diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java index 8ac981c92cbe..8b30e7fec5a7 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriber.java @@ -65,7 +65,7 @@ public void onSubscribe(Subscription subscription) { return; } this.subscription = subscription; - subscription.request(1); + subscription.request(maxConcurrentExecutions); } @Override @@ -162,6 +162,13 @@ public void onComplete() { flushBufferIfNeeded(); } + /** + * @return the number of requests that are currently in flight + */ + public int numRequestsInFlight() { + return numRequestsInFlight.get(); + } + private static boolean isCompleteEvent(Object event) { return COMPLETE_EVENT.equals(event); } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java index e4beff325fbd..5671d53a6a55 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/AsyncBufferingSubscriberTest.java @@ -17,6 +17,10 @@ import static org.assertj.core.api.Assertions.assertThat; +import io.reactivex.Flowable; +import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; +import io.reactivex.schedulers.Schedulers; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @@ -24,19 +28,18 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; class AsyncBufferingSubscriberTest { + private static final int MAX_CONCURRENT_EXECUTIONS = 5; private AsyncBufferingSubscriber subscriber; private Function> consumer; private CompletableFuture returnFuture; @@ -45,7 +48,7 @@ class AsyncBufferingSubscriberTest { @BeforeAll public static void setUp() { - scheduledExecutorService = Executors.newScheduledThreadPool(2); + scheduledExecutorService = Executors.newScheduledThreadPool(5); } @BeforeEach @@ -55,15 +58,16 @@ public void setUpPerTest() { futures.add(new CompletableFuture<>()); } Iterator> iterator = futures.iterator(); - consumer = s -> iterator.next(); - - futures.forEach(f -> { + consumer = s -> { + CompletableFuture future = iterator.next(); scheduledExecutorService.schedule(() -> { - f.complete(null); - }, 1, TimeUnit.SECONDS); - }); + future.complete(null); + }, 200, TimeUnit.MILLISECONDS); + return future; + }; + subscriber = new AsyncBufferingSubscriber<>(consumer, returnFuture, - 5); + MAX_CONCURRENT_EXECUTIONS); } @AfterAll @@ -74,9 +78,21 @@ public static void cleanUp() { @ParameterizedTest @ValueSource(ints = {1, 4, 11, 20, 100}) void differentNumberOfStrings_shouldCompleteSuccessfully(int numberOfStrings) throws Exception { - new TestPublisher(numberOfStrings).subscribe(subscriber); + Flowable.fromArray(IntStream.range(0, numberOfStrings).mapToObj(String::valueOf).toArray(String[]::new)).subscribe(subscriber); + + + List numRequestsInFlightSampling = new ArrayList<>(); + + Disposable disposable = Observable.interval(100, TimeUnit.MILLISECONDS, Schedulers.newThread()) + .map(time -> subscriber.numRequestsInFlight()) + .subscribe(numRequestsInFlightSampling::add, t -> {}); + returnFuture.get(1000, TimeUnit.SECONDS); assertThat(returnFuture).isCompleted().isNotCompletedExceptionally(); + if (numberOfStrings >= MAX_CONCURRENT_EXECUTIONS) { + assertThat(numRequestsInFlightSampling).contains(MAX_CONCURRENT_EXECUTIONS); + } + disposable.dispose(); } @Test @@ -96,40 +112,4 @@ public void cancel() { subscriber.onError(exception); assertThat(returnFuture).isCompletedExceptionally(); } - - - - private static final class TestPublisher implements Publisher { - private final int numberOfStrings; - private volatile boolean isDone = false; - private final AtomicInteger requestNumber = new AtomicInteger(0); - - private TestPublisher(int numberOfStrings) { - this.numberOfStrings = numberOfStrings; - } - - @Override - public void subscribe(Subscriber subscriber) { - subscriber.onSubscribe(new Subscription() { - @Override - public void request(long n) { - if (isDone) { - return; - } - - if (requestNumber.incrementAndGet() > numberOfStrings) { - isDone = true; - subscriber.onComplete(); - return; - } - - subscriber.onNext("key" + requestNumber.get()); - } - - @Override - public void cancel() { - } - }); - } - } }