Skip to content

Flowable#groupBy race leads to a back-pressure issue #7100

Open
@bsideup

Description

@bsideup

Hi!

While debugging reactor/reactor-core#2352 we wanted to check whether RxJava has the same issue since, given the history of both projects :)

Apparently, with 3.0.7, the same construction in RxJava fails with a very similar issue (although the failure is different):

final int total = 100;

Long count = Flowable.range(0, total)
                     .groupBy(i -> (i / 2) * 2)
                     .flatMapMaybe(Flowable::firstElement, false, 1)
                     .observeOn(Schedulers.io())
                     .count()
                     .blockingGet();
assertThat(total - count).as("count").isZero();

Gives (not 100% reliably, consider running in "rerun until failure" mode):

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#97) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.

	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)

A few interesting observations:

  1. Changing observeOn's buffer size to 131 and higher makes it always pass
  2. 130 would sometimes fail with Unable to emit a new group (#99) due to lack of requests
  3. 129 would sometimes fail with Unable to emit a new group (#98) due to lack of requests
  4. 128 would sometimes fail with Unable to emit a new group (#97) due to lack of requests
  5. etc etc

So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions