diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index c2e18393d5..de381e206e 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -12251,7 +12251,7 @@ public final > E subscribeWith(E subscriber) { @SchedulerSupport(SchedulerSupport.CUSTOM) public final Flowable subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); - return RxJavaPlugins.onAssembly(new FlowableSubscribeOn(this, scheduler)); + return RxJavaPlugins.onAssembly(new FlowableSubscribeOn(this, scheduler, this instanceof FlowableCreate)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java index 8e63ecf642..b74410f3f0 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOn.java @@ -21,18 +21,28 @@ import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; +/** + * Subscribes to the source Flowable on the specified Scheduler and makes + * sure downstream requests are scheduled there as well. + * + * @param the value type emitted + */ public final class FlowableSubscribeOn extends AbstractFlowableWithUpstream { + final Scheduler scheduler; - public FlowableSubscribeOn(Publisher source, Scheduler scheduler) { + final boolean nonScheduledRequests; + + public FlowableSubscribeOn(Publisher source, Scheduler scheduler, boolean nonScheduledRequests) { super(source); this.scheduler = scheduler; + this.nonScheduledRequests = nonScheduledRequests; } @Override public void subscribeActual(final Subscriber s) { Scheduler.Worker w = scheduler.createWorker(); - final SubscribeOnSubscriber sos = new SubscribeOnSubscriber(s, w, source); + final SubscribeOnSubscriber sos = new SubscribeOnSubscriber(s, w, source, nonScheduledRequests); s.onSubscribe(sos); w.schedule(sos); @@ -42,21 +52,26 @@ static final class SubscribeOnSubscriber extends AtomicReference implements Subscriber, Subscription, Runnable { private static final long serialVersionUID = 8094547886072529208L; + final Subscriber actual; + final Scheduler.Worker worker; final AtomicReference s; final AtomicLong requested; + final boolean nonScheduledRequests; + Publisher source; - SubscribeOnSubscriber(Subscriber actual, Scheduler.Worker worker, Publisher source) { + SubscribeOnSubscriber(Subscriber actual, Scheduler.Worker worker, Publisher source, boolean nonScheduledRequests) { this.actual = actual; this.worker = worker; this.source = source; this.s = new AtomicReference(); this.requested = new AtomicLong(); + this.nonScheduledRequests = nonScheduledRequests; } @Override @@ -114,7 +129,7 @@ public void request(final long n) { } void requestUpstream(final long n, final Subscription s) { - if (Thread.currentThread() == get()) { + if (nonScheduledRequests || Thread.currentThread() == get()) { s.request(n); } else { worker.schedule(new Runnable() { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java index 6df9948062..c774b1c143 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSubscribeOnTest.java @@ -24,6 +24,7 @@ import io.reactivex.*; import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.operators.flowable.FlowableSubscribeOn.SubscribeOnSubscriber; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.schedulers.*; @@ -295,7 +296,7 @@ public void deferredRequestRace() { Worker w = Schedulers.computation().createWorker(); - final SubscribeOnSubscriber so = new SubscribeOnSubscriber(ts, w, Flowable.never()); + final SubscribeOnSubscriber so = new SubscribeOnSubscriber(ts, w, Flowable.never(), true); ts.onSubscribe(so); final BooleanSubscription bs = new BooleanSubscription(); @@ -321,4 +322,50 @@ public void run() { } } } + + @Test + public void nonScheduledRequests() { + TestSubscriber ts = Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + for (int i = 1; i < 1001; i++) { + s.onNext(i); + Thread.sleep(1); + } + s.onComplete(); + } + }, BackpressureStrategy.DROP) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertNoErrors() + .assertComplete(); + + int c = ts.valueCount(); + + assertTrue("" + c, c > Flowable.bufferSize()); + } + + @Test + public void scheduledRequests() { + Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + for (int i = 1; i < 1001; i++) { + s.onNext(i); + Thread.sleep(1); + } + s.onComplete(); + } + }, BackpressureStrategy.DROP) + .map(Functions.identity()) + .subscribeOn(Schedulers.single()) + .observeOn(Schedulers.computation()) + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertValueCount(Flowable.bufferSize()) + .assertNoErrors() + .assertComplete(); + } } \ No newline at end of file