Skip to content

PR https://github.com/spring-projects/spring-data-elasticsearch/pull/2497 might be missing valuable unit test case #2576

Closed
@patpatpat123

Description

@patpatpat123

Hello team,

In PR #2497, a great improvement was made for saveAll on flux.

797dbb5#diff-07601eb84212c1000dfec4ee54028e05ccbdae2f81a9756601a3ed0175e66bc4R229

However, the unit test cases from this PR are happy test cases (maybe a bit to happy). The cases are only a portion of what could really happen with a Flux.

I pulled the repo and added a generic test case, nothing fancy. Instead of a steady and simplistic flux, just add another unit test with this flux:

saveAll(Flux.interval(Duration.ofMillis(100)).map([...]))

Adding this unit test with Flux.interval (short interval), which is a common use case, can help improve the overall stability of this repo.

Question 1: Would it be possible to enhance the current test cases with a test where the flux is "more aggressive"?

Furthermore, adding this test will reveal a possible drawback with the current implementation.

The current implementation is vulnerable to Could not emit buffer due to lack of requests issue.

This is very easy to reproduce with a sample piece of code such as

@Service
public final class SomeService implements CommandLineRunner {

    @Autowired
    ReactiveElasticSeachRepository reactiveElasticSeachRepository;

    @Override
    public void run(final String[] args) {
        Flux<Foo> createCommandFlux = Flux.interval(Duration.ofMillis(100)).map(i -> new Foo(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC), String.valueOf(i)));
        Flux<Foo> savedFlux = reactiveElasticSeachRepository.saveAll(createCommandFlux);
        savedFlux.subscribe();
    }

}

issue.zip

While the root cause of the problem is from Reactor Core, the reactor team also mentioned the instability of this .bufferTimeout() method, and instead, even the reactor team suggest the usage of .window() + concatMap() instead.

Question 2: Instead of using a reactive operator which is known to be unstable, can this repo use a more appropriate operator such as .window() + concatMap()?

Thank you

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions