Description
Publisher ListObjectsV2Publisher
with chained downstream filter does not complete, when data being ready at subscription time.
Describe the bug
Running unit test to our S3AsyncClient
logic with completed futures as mocked response. Where mocking of S3AsyncClient.listObjectsV2Paginator
returns real instance of ListObjectsV2Publisher
. Using chained operations such as filter
or limit
never completes (see unit test below) and no close exception is raised.
Expected Behavior
Reactive streams chaining should complete, when data being ready at subscription time.
Steps to Reproduce
Prepared example unit test with Mockito.
public class AwsReadyPublisherTest {
@Test
public void shouldProcessReadyData() throws InterruptedException, ExecutionException, TimeoutException {
// when
S3AsyncClient client = mock(S3AsyncClient.class);
ListObjectsV2Response res = ListObjectsV2Response.builder().contents(
Stream.of(1, 2, 3, 4, 5)
.map(i -> S3Object.builder().key(Integer.toString(i)).build())
.toArray(S3Object[]::new)
).isTruncated(false).build();
doReturn(CompletableFuture.completedFuture(res))
.when(client).listObjectsV2(any(ListObjectsV2Request.class));
doAnswer(i -> new ListObjectsV2Publisher(client, i.getArgument(0, ListObjectsV2Request.class)))
.when(client).listObjectsV2Paginator(any(ListObjectsV2Request.class));
// then - expect no exception
client.listObjectsV2Paginator(ListObjectsV2Request.builder().bucket("test-bucket").build())
.contents()
.filter(o -> Integer.parseInt(o.key()) < 3)
.subscribe(o -> System.out.println(o.key()))
.exceptionally(e -> {
e.printStackTrace();
return null;
})
.get(5, TimeUnit.SECONDS);
}
}
Possible Solution
Looking at implementation of FilteringSubscriber
or LimitingSubscriber
specifically method onSubscribe
you can see that subscription is assigned after parent's onSubscribe
is executed. As data is being ready, the parent subscriber will request to upstream. Therefore when FilteringSubscriber
filters out an item and tries to request next the subscription is null
. No exception is being propagated outside.
The method should first assign the subscription.
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
super.onSubscribe(subscription);
}
Context
We have to prepare workaround to our unit tests.
Your Environment
- AWS Java SDK version used: 2.14.8
- JDK version used: 1.8
- Operating System and version: