Description
Hello team,
In PR #2497, a great improvement was made for saveAll on flux.
797dbb5#diff-07601eb84212c1000dfec4ee54028e05ccbdae2f81a9756601a3ed0175e66bc4R229
However, the unit test cases from this PR are happy test cases (maybe a bit to happy). The cases are only a portion of what could really happen with a Flux.
I pulled the repo and added a generic test case, nothing fancy. Instead of a steady and simplistic flux, just add another unit test with this flux:
saveAll(Flux.interval(Duration.ofMillis(100)).map([...]))
Adding this unit test with Flux.interval (short interval), which is a common use case, can help improve the overall stability of this repo.
Question 1: Would it be possible to enhance the current test cases with a test where the flux is "more aggressive"?
Furthermore, adding this test will reveal a possible drawback with the current implementation.
The current implementation is vulnerable to Could not emit buffer due to lack of requests
issue.
This is very easy to reproduce with a sample piece of code such as
@Service
public final class SomeService implements CommandLineRunner {
@Autowired
ReactiveElasticSeachRepository reactiveElasticSeachRepository;
@Override
public void run(final String[] args) {
Flux<Foo> createCommandFlux = Flux.interval(Duration.ofMillis(100)).map(i -> new Foo(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC), String.valueOf(i)));
Flux<Foo> savedFlux = reactiveElasticSeachRepository.saveAll(createCommandFlux);
savedFlux.subscribe();
}
}
While the root cause of the problem is from Reactor Core, the reactor team also mentioned the instability of this .bufferTimeout()
method, and instead, even the reactor team suggest the usage of .window() + concatMap()
instead.
Question 2: Instead of using a reactive operator which is known to be unstable, can this repo use a more appropriate operator such as .window() + concatMap()
?
Thank you