Skip to content

Fixed the bug in the transfer manager where files were downloaded seq… #3098

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "S3 Transfer Manager (Preview)",
"contributor": "",
"type": "bugfix",
"description": "Fixed the bug in the transfer manager where files were downloaded sequentially in downloadDirectory. See [#3092](https://github.com/aws/aws-sdk-java-v2/issues/3092)"
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void onSubscribe(Subscription subscription) {
return;
}
this.subscription = subscription;
subscription.request(1);
subscription.request(maxConcurrentExecutions);
}

@Override
Expand Down Expand Up @@ -162,6 +162,13 @@ public void onComplete() {
flushBufferIfNeeded();
}

/**
* @return the number of requests that are currently in flight
*/
public int numRequestsInFlight() {
return numRequestsInFlight.get();
}

private static boolean isCompleteEvent(Object event) {
return COMPLETE_EVENT.equals(event);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,29 @@

import static org.assertj.core.api.Assertions.assertThat;

import io.reactivex.Flowable;
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class AsyncBufferingSubscriberTest {
private static final int MAX_CONCURRENT_EXECUTIONS = 5;
private AsyncBufferingSubscriber<String> subscriber;
private Function<String, CompletableFuture<?>> consumer;
private CompletableFuture<Void> returnFuture;
Expand All @@ -45,7 +48,7 @@ class AsyncBufferingSubscriberTest {

@BeforeAll
public static void setUp() {
scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService = Executors.newScheduledThreadPool(5);
}

@BeforeEach
Expand All @@ -55,15 +58,16 @@ public void setUpPerTest() {
futures.add(new CompletableFuture<>());
}
Iterator<CompletableFuture<Void>> iterator = futures.iterator();
consumer = s -> iterator.next();

futures.forEach(f -> {
consumer = s -> {
CompletableFuture<Void> future = iterator.next();
scheduledExecutorService.schedule(() -> {
f.complete(null);
}, 1, TimeUnit.SECONDS);
});
future.complete(null);
}, 200, TimeUnit.MILLISECONDS);
return future;
};

subscriber = new AsyncBufferingSubscriber<>(consumer, returnFuture,
5);
MAX_CONCURRENT_EXECUTIONS);
}

@AfterAll
Expand All @@ -74,9 +78,21 @@ public static void cleanUp() {
@ParameterizedTest
@ValueSource(ints = {1, 4, 11, 20, 100})
void differentNumberOfStrings_shouldCompleteSuccessfully(int numberOfStrings) throws Exception {
new TestPublisher(numberOfStrings).subscribe(subscriber);
Flowable.fromArray(IntStream.range(0, numberOfStrings).mapToObj(String::valueOf).toArray(String[]::new)).subscribe(subscriber);


List<Integer> numRequestsInFlightSampling = new ArrayList<>();

Disposable disposable = Observable.interval(100, TimeUnit.MILLISECONDS, Schedulers.newThread())
.map(time -> subscriber.numRequestsInFlight())
.subscribe(numRequestsInFlightSampling::add, t -> {});

returnFuture.get(1000, TimeUnit.SECONDS);
assertThat(returnFuture).isCompleted().isNotCompletedExceptionally();
if (numberOfStrings >= MAX_CONCURRENT_EXECUTIONS) {
assertThat(numRequestsInFlightSampling).contains(MAX_CONCURRENT_EXECUTIONS);
}
disposable.dispose();
}

@Test
Expand All @@ -96,40 +112,4 @@ public void cancel() {
subscriber.onError(exception);
assertThat(returnFuture).isCompletedExceptionally();
}



private static final class TestPublisher implements Publisher<String> {
private final int numberOfStrings;
private volatile boolean isDone = false;
private final AtomicInteger requestNumber = new AtomicInteger(0);

private TestPublisher(int numberOfStrings) {
this.numberOfStrings = numberOfStrings;
}

@Override
public void subscribe(Subscriber<? super String> subscriber) {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
if (isDone) {
return;
}

if (requestNumber.incrementAndGet() > numberOfStrings) {
isDone = true;
subscriber.onComplete();
return;
}

subscriber.onNext("key" + requestNumber.get());
}

@Override
public void cancel() {
}
});
}
}
}