S3AsyncClient page publisher not completing #2044

Closed
@langrp

Description

ListObjectsV2Publisher with a chained downstream filter does not complete when the data is already available at subscription time.

Describe the bug

We are unit testing our S3AsyncClient logic, using completed futures as mocked responses. The mock of S3AsyncClient.listObjectsV2Paginator returns a real instance of ListObjectsV2Publisher. A chain that uses operations such as filter or limit never completes (see the unit test below), and no exception is raised.

Expected Behavior

A chained reactive stream should complete when the data is already available at subscription time.

Steps to Reproduce

Example unit test using Mockito:

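import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.junit.Test; // assumption: JUnit 4; use org.junit.jupiter.api.Test for JUnit 5
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;

public class AwsReadyPublisherTest {

  @Test
  public void shouldProcessReadyData() throws InterruptedException, ExecutionException, TimeoutException {
    // given: a mocked client whose listObjectsV2 returns an already-completed future
    S3AsyncClient client = mock(S3AsyncClient.class);

    // a single, non-truncated page with keys "1" to "5"
    ListObjectsV2Response res = ListObjectsV2Response.builder().contents(
      Stream.of(1, 2, 3, 4, 5)
        .map(i -> S3Object.builder().key(Integer.toString(i)).build())
        .toArray(S3Object[]::new)
    ).isTruncated(false).build();

    doReturn(CompletableFuture.completedFuture(res))
      .when(client).listObjectsV2(any(ListObjectsV2Request.class));
    // the paginator itself is a real ListObjectsV2Publisher backed by the mock
    doAnswer(i -> new ListObjectsV2Publisher(client, i.getArgument(0, ListObjectsV2Request.class)))
      .when(client).listObjectsV2Paginator(any(ListObjectsV2Request.class));

    // when/then: the chain should complete without an exception;
    // with the bug, the returned future never completes and get() times out
    client.listObjectsV2Paginator(ListObjectsV2Request.builder().bucket("test-bucket").build())
      .contents()
      .filter(o -> Integer.parseInt(o.key()) < 3)
      .subscribe(o -> System.out.println(o.key()))
      .exceptionally(e -> {
        e.printStackTrace();
        return null;
      })
      .get(5, TimeUnit.SECONDS);
  }

}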

Possible Solution

Looking at the implementation of FilteringSubscriber or LimitingSubscriber, specifically the onSubscribe method, you can see that the subscription field is assigned only after the parent's onSubscribe has executed. Because the data is already available, the parent subscriber requests from upstream inside its onSubscribe and items are delivered immediately, before the field is set. Therefore, when FilteringSubscriber filters out an item and tries to request the next one, its subscription is still null. No exception is propagated to the caller.

The method should assign the subscription first, then delegate to the parent:

    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        super.onSubscribe(subscription);
    }
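
To make the ordering problem easier to see in isolation, here is a minimal sketch that does not use the SDK at all. Everything in it is hypothetical: ReadyPublisher, Collector and Filter are stand-ins written for this illustration, and java.util.concurrent.Flow (Java 9+) is used in place of the org.reactivestreams interfaces. It assumes a publisher that emits synchronously inside request(), which is roughly what happens when the underlying futures are already completed. One difference from the report: in this sketch the NullPointerException escapes to the caller, whereas in the SDK case described above nothing is propagated and the future simply never completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Predicate;

public class SubscriptionOrderingSketch {

    /** Emits its items synchronously inside request(), like a publisher backed by completed futures. */
    static final class ReadyPublisher implements Flow.Publisher<Integer> {
        private final List<Integer> items;
        ReadyPublisher(List<Integer> items) { this.items = items; }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next;
                private long demand;
                private boolean draining;
                private boolean done;

                @Override
                public void request(long n) {
                    demand += n;
                    if (draining) {
                        return; // re-entrant call: the outer loop below serves the new demand
                    }
                    draining = true;
                    try {
                        while (demand > 0 && next < items.size() && !done) {
                            demand--;
                            subscriber.onNext(items.get(next++));
                        }
                        if (!done && next == items.size()) {
                            done = true;
                            subscriber.onComplete();
                        }
                    } finally {
                        draining = false;
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    /** Downstream subscriber: requests one item up front and one more after each item. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
        @Override public void onNext(Integer item) { received.add(item); subscription.request(1); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { completed = true; }
    }

    /** Hypothetical filtering wrapper; assignFirst selects the order used in onSubscribe. */
    static final class Filter implements Flow.Subscriber<Integer> {
        private final Flow.Subscriber<Integer> delegate;
        private final Predicate<Integer> predicate;
        private final boolean assignFirst;
        private Flow.Subscription subscription;

        Filter(Flow.Subscriber<Integer> delegate, Predicate<Integer> predicate, boolean assignFirst) {
            this.delegate = delegate;
            this.predicate = predicate;
            this.assignFirst = assignFirst;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            if (assignFirst) {
                subscription = s;        // ordering proposed in the issue
                delegate.onSubscribe(s);
            } else {
                delegate.onSubscribe(s); // delegate requests; items arrive before the next line runs
                subscription = s;
            }
        }

        @Override
        public void onNext(Integer item) {
            if (predicate.test(item)) {
                delegate.onNext(item);
            } else {
                subscription.request(1); // throws NullPointerException if not assigned yet
            }
        }

        @Override public void onError(Throwable t) { delegate.onError(t); }
        @Override public void onComplete() { delegate.onComplete(); }
    }

    public static String run(boolean assignFirst) {
        Collector collector = new Collector();
        try {
            new ReadyPublisher(List.of(1, 2, 3, 4, 5))
                .subscribe(new Filter(collector, i -> i < 3, assignFirst));
        } catch (NullPointerException e) {
            return "received=" + collector.received + " failed with NullPointerException";
        }
        return "received=" + collector.received + " completed=" + collector.completed;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // received=[1, 2] failed with NullPointerException
        System.out.println(run(true));  // received=[1, 2] completed=true
    }
}
```

With the delegate-first ordering, the first rejected item (3) reaches onNext while the filter's subscription field is still null. With the assign-first ordering, the same stream runs to completion.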

Context

We have to prepare a workaround for our unit tests.

Your Environment

  • AWS Java SDK version used: 2.14.8
  • JDK version used: 1.8
  • Operating System and version:

Labels: bug
