Hi!
While debugging reactor/reactor-core#2352, we wanted to check whether RxJava has the same issue, given the history of both projects :)
Apparently, with 3.0.7, the same construction in RxJava hits a very similar issue (although the failure itself is different):
final int total = 100;
Long count = Flowable.range(0, total)
        .groupBy(i -> (i / 2) * 2)
        .flatMapMaybe(Flowable::firstElement, false, 1)
        .observeOn(Schedulers.io())
        .count()
        .blockingGet();
assertThat(total - count).as("count").isZero();
Gives (not 100% reliably, consider running in "rerun until failure" mode):
io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#97) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.
at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)
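For context on the snippet above: the key function `(i / 2) * 2` maps each consecutive pair of values to the same even key, so the 100 items share only 50 distinct keys. The assertion still expects a count of 100, and the error mentions group #97, which suggests that a group is re-created once `firstElement` cancels it. The sketch below only illustrates the key arithmetic in plain Java; the class and method names are made up for illustration and are not part of RxJava.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class GroupKeySketch {

    // Same key function as the groupBy call in the snippet: (i / 2) * 2
    static int key(int i) {
        return (i / 2) * 2;
    }

    // Counts how many items in [0, total) map to each key
    static Map<Integer, Integer> itemsPerKey(int total) {
        Map<Integer, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < total; i++) {
            counts.merge(key(i), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> counts = itemsPerKey(100);
        System.out.println("distinct keys: " + counts.size());   // 50
        System.out.println("items for key 0: " + counts.get(0)); // 2
    }
}
```

So every second item arrives with a key whose previous group has (ideally) just been cancelled, which is exactly the point where the failure shows up.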
A few interesting observations:
- Changing `observeOn`'s buffer size to `131` and higher makes it always pass
- `130` would sometimes fail with `Unable to emit a new group (#99) due to lack of requests`
- `129` would sometimes fail with `Unable to emit a new group (#98) due to lack of requests`
- `128` would sometimes fail with `Unable to emit a new group (#97) due to lack of requests`
- etc etc
So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.
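As a quick sanity check on the buffer sizes listed above, the reported group number trails the buffer size by the same amount in all three failing cases. The sketch below just tabulates those data points in plain Java. It restates the observations only, it does not explain where the offset comes from, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BufferOffsetSketch {

    // Reported data points: observeOn buffer size -> group number in the error message
    static Map<Integer, Integer> reported() {
        Map<Integer, Integer> points = new LinkedHashMap<>();
        points.put(128, 97);
        points.put(129, 98);
        points.put(130, 99);
        return points;
    }

    // Difference between the buffer size and the group number that failed
    static int offset(int bufferSize, int failingGroup) {
        return bufferSize - failingGroup;
    }

    public static void main(String[] args) {
        for (Map.Entry<Integer, Integer> e : reported().entrySet()) {
            System.out.println("buffer " + e.getKey() + " -> group #" + e.getValue()
                    + ", offset " + offset(e.getKey(), e.getValue()));
        }
    }
}
```

If the same offset held for `131`, the failing group would be #100, which may simply never be requested with `total = 100`. That is a guess from three data points, not something I verified in the operator.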