From 873fa75bf80fb6dd05c53d791eac8441147c528b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 17 Feb 2014 12:30:48 -0800 Subject: [PATCH] Split SubscribeOn into SubscribeOn/UnsubscribeOn Working with @headinthebox based on discussions at https://github.com/Netflix/RxJava/pull/869 and https://github.com/Netflix/RxJava/pull/880#issuecomment-35163539 we determined that there are times when `unsubscribeOn` behavior is needed. The `subscribeOn` operator can not mix `subscribe` and `unsubscribe` scheduling behavior without breaking the `lift`/`Subscriber` behavior that allows unsubscribing synchronous sources. The newly added `unsubscribeOn` operator will not work with synchronous unsubscribes, but it will work for the targeted use cases such as UI event handlers. --- rxjava-core/src/main/java/rx/Observable.java | 18 +- .../rx/operators/OperatorSubscribeOn.java | 64 +-- .../operators/OperatorSubscribeOnBounded.java | 113 +++++ .../rx/operators/OperatorUnsubscribeOn.java | 79 ++++ .../rx/operators/OperatorGroupByTest.java | 4 +- .../OperatorSubscribeOnBoundedTest.java | 403 ++++++++++++++++++ .../rx/operators/OperatorSubscribeOnTest.java | 279 +----------- .../operators/OperatorUnsubscribeOnTest.java | 190 +++++++++ 8 files changed, 814 insertions(+), 336 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 82e65c46f5..bb9953ca1e 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -108,6 +108,7 @@ import rx.operators.OperatorTimestamp; import rx.operators.OperatorToObservableList; import rx.operators.OperatorToObservableSortedList; +import rx.operators.OperatorUnsubscribeOn; import rx.operators.OperatorZip; import rx.operators.OperatorZipIterable; import rx.plugins.RxJavaObservableExecutionHook; @@ -7074,14 +7075,14 @@ public final Subscription subscribe(Subscriber observer, Scheduler sc } /** - * Asynchronously subscribes and unsubscribes Observers to this Observable on the specified + * Asynchronously subscribes Observers to this Observable on the specified * {@link Scheduler}. *

* * * @param scheduler - * the {@link Scheduler} to perform subscription and unsubscription actions on - * @return the source Observable modified so that its subscriptions and unsubscriptions happen on the + * the {@link Scheduler} to perform subscription actions on + * @return the source Observable modified so that its subscriptions happen on the * specified {@link Scheduler} * @see RxJava Wiki: subscribeOn() * @see #subscribeOn(rx.Scheduler, int) @@ -8204,6 +8205,17 @@ public final Observable> toSortedList(Func2(sortFunction)); } + /** + * Asynchronously unsubscribes on the specified {@link Scheduler}. + * + * @param scheduler + * the {@link Scheduler} to perform subscription and unsubscription actions on + * @return the source Observable modified so that its unsubscriptions happen on the specified {@link Scheduler} + */ + public final Observable unsubscribeOn(Scheduler scheduler) { + return lift(new OperatorUnsubscribeOn(scheduler)); + } + /** * Returns an Observable that represents a filtered version of the source Observable. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java index b8e782ef37..0c49a38162 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java @@ -23,46 +23,16 @@ import rx.util.functions.Action1; /** - * Subscribes and unsubscribes Observers on the specified Scheduler. + * Subscribes Observers on the specified Scheduler. *

- * Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables - * in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred. - *

- * See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous - * subscribe is solving. - * * */ public class OperatorSubscribeOn implements Operator> { private final Scheduler scheduler; - /** - * Indicate that events fired between the original subscription time and - * the actual subscription time should not get lost. - */ - private final boolean dontLoseEvents; - /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */ - private final int bufferSize; public OperatorSubscribeOn(Scheduler scheduler) { this.scheduler = scheduler; - this.dontLoseEvents = false; - this.bufferSize = -1; - } - - /** - * Construct a SubscribeOn operator. - * - * @param scheduler - * the target scheduler - * @param bufferSize - * if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will - * block the source. -1 indicates an unbounded buffer - */ - public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) { - this.scheduler = scheduler; - this.dontLoseEvents = true; - this.bufferSize = bufferSize; } @Override @@ -71,7 +41,7 @@ public Subscriber> call(final Subscriber subscr @Override public void onCompleted() { - // ignore + // ignore because this is a nested Observable and we expect only 1 Observable emitted to onNext } @Override @@ -79,33 +49,15 @@ public void onError(Throwable e) { subscriber.onError(e); } - boolean checkNeedBuffer(Observable o) { - return dontLoseEvents; - } - @Override public void onNext(final Observable o) { - if (checkNeedBuffer(o)) { - // use buffering (possibly blocking) for a possibly synchronous subscribe - final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber); - o.subscribe(bus); - subscriber.add(scheduler.schedule(new Action1() { - @Override - public void call(final Inner inner) { - bus.enterPassthroughMode(); - } - })); - return; - } else { - // no buffering (async subscribe) - subscriber.add(scheduler.schedule(new Action1() { + subscriber.add(scheduler.schedule(new Action1() { - @Override - public void call(final Inner inner) { - o.subscribe(subscriber); - } - })); - } + @Override + public void call(final Inner inner) { + o.subscribe(subscriber); + } + })); } }; diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java new file mode 100644 index 0000000000..284cfcc5bd --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java @@ -0,0 +1,113 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Scheduler; +import rx.Scheduler.Inner; +import rx.Subscriber; +import rx.util.functions.Action1; + +/** + * Subscribes and unsubscribes Observers on the specified Scheduler. + *

+ * Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables + * in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred. + *

+ * See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous + * subscribe is solving. + * + * + */ +public class OperatorSubscribeOnBounded implements Operator> { + + private final Scheduler scheduler; + /** + * Indicate that events fired between the original subscription time and + * the actual subscription time should not get lost. + */ + private final boolean dontLoseEvents; + /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */ + private final int bufferSize; + + public OperatorSubscribeOnBounded(Scheduler scheduler) { + this.scheduler = scheduler; + this.dontLoseEvents = false; + this.bufferSize = -1; + } + + /** + * Construct a SubscribeOn operator. + * + * @param scheduler + * the target scheduler + * @param bufferSize + * if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will + * block the source. -1 indicates an unbounded buffer + */ + public OperatorSubscribeOnBounded(Scheduler scheduler, int bufferSize) { + this.scheduler = scheduler; + this.dontLoseEvents = true; + this.bufferSize = bufferSize; + } + + @Override + public Subscriber> call(final Subscriber subscriber) { + return new Subscriber>(subscriber) { + + @Override + public void onCompleted() { + // ignore + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + boolean checkNeedBuffer(Observable o) { + return dontLoseEvents; + } + + @Override + public void onNext(final Observable o) { + if (checkNeedBuffer(o)) { + // use buffering (possibly blocking) for a possibly synchronous subscribe + final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber); + o.subscribe(bus); + subscriber.add(scheduler.schedule(new Action1() { + @Override + public void call(final Inner inner) { + bus.enterPassthroughMode(); + } + })); + return; + } else { + // no buffering (async subscribe) + subscriber.add(scheduler.schedule(new Action1() { + + @Override + public void call(final Inner inner) { + o.subscribe(subscriber); + } + })); + } + } + + }; + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java new file mode 100644 index 0000000000..0d734c5f73 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java @@ -0,0 +1,79 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable.Operator; +import rx.Scheduler; +import rx.Scheduler.Inner; +import rx.Subscriber; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.MultipleAssignmentSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +/** + * Unsubscribes on the specified Scheduler. + *

+ */ +public class OperatorUnsubscribeOn implements Operator { + + private final Scheduler scheduler; + + public OperatorUnsubscribeOn(Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public Subscriber call(final Subscriber subscriber) { + final CompositeSubscription parentSubscription = new CompositeSubscription(); + subscriber.add(Subscriptions.create(new Action0() { + + @Override + public void call() { + final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription(); + mas.set(scheduler.schedule(new Action1() { + + @Override + public void call(final Inner inner) { + parentSubscription.unsubscribe(); + mas.unsubscribe(); + } + })); + } + + })); + + return new Subscriber(parentSubscription) { + + @Override + public void onCompleted() { + subscriber.onCompleted(); + } + + @Override + public void onError(Throwable e) { + subscriber.onError(e); + } + + @Override + public void onNext(T t) { + subscriber.onNext(t); + } + + }; + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java index 2f201dbb1b..7d4d98aace 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java @@ -700,7 +700,7 @@ public void call() { }); } else { - return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1() { + return group.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1() { @Override public String call(Integer t1) { @@ -826,7 +826,7 @@ public Integer call(Integer t) { @Override public Observable call(final GroupedObservable group) { - return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)).map(new Func1() { + return group.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 0)).map(new Func1() { @Override public String call(Integer t1) { diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java new file mode 100644 index 0000000000..cbd94e59d7 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java @@ -0,0 +1,403 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Scheduler; +import rx.Subscriber; +import rx.Subscription; +import rx.observables.GroupedObservable; +import rx.observers.TestObserver; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; +import rx.subscriptions.Subscriptions; +import rx.util.Timestamped; +import rx.util.functions.Action0; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class OperatorSubscribeOnBoundedTest { + + private static class ThreadSubscription implements Subscription { + private volatile Thread thread; + + private final CountDownLatch latch = new CountDownLatch(1); + + private final Subscription s = Subscriptions.create(new Action0() { + + @Override + public void call() { + thread = Thread.currentThread(); + latch.countDown(); + } + + }); + + @Override + public void unsubscribe() { + s.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return s.isUnsubscribed(); + } + + public Thread getThread() throws InterruptedException { + latch.await(); + return thread; + } + } + + @Test + public void testSubscribeOnAndVerifySubscribeAndUnsubscribeThreads() + throws InterruptedException { + final ThreadSubscription subscription = new ThreadSubscription(); + final AtomicReference subscribeThread = new AtomicReference(); + Observable w = Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber t1) { + subscribeThread.set(Thread.currentThread()); + t1.add(subscription); + t1.onNext(1); + t1.onNext(2); + t1.onCompleted(); + } + }); + + TestObserver observer = new TestObserver(); + w.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread())).subscribe(observer); + + Thread unsubscribeThread = subscription.getThread(); + + assertNotNull(unsubscribeThread); + assertNotSame(Thread.currentThread(), unsubscribeThread); + + assertNotNull(subscribeThread.get()); + assertNotSame(Thread.currentThread(), subscribeThread.get()); + // True for Schedulers.newThread() + assertTrue(unsubscribeThread == subscribeThread.get()); + + observer.assertReceivedOnNext(Arrays.asList(1, 2)); + observer.assertTerminalEvent(); + } + + @Test(timeout = 2000) + public void testIssue813() throws InterruptedException { + // https://github.com/Netflix/RxJava/issues/813 + final CountDownLatch scheduled = new CountDownLatch(1); + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch doneLatch = new CountDownLatch(1); + + TestObserver observer = new TestObserver(); + final ThreadSubscription s = new ThreadSubscription(); + + final Subscription subscription = Observable + .create(new Observable.OnSubscribe() { + @Override + public void call( + final Subscriber subscriber) { + subscriber.add(s); + scheduled.countDown(); + try { + latch.await(); + + // this should not run because the await above will be interrupted by the unsubscribe + subscriber.onCompleted(); + } catch (InterruptedException e) { + System.out.println("Interrupted because it is unsubscribed"); + Thread.currentThread().interrupt(); + } catch (Throwable e) { + subscriber.onError(e); + } finally { + doneLatch.countDown(); + } + } + }).nest().lift(new OperatorSubscribeOnBounded(Schedulers.computation())).subscribe(observer); + + // wait for scheduling + scheduled.await(); + // trigger unsubscribe + subscription.unsubscribe(); + // As unsubscribe is called in other thread, we need to wait for it. + s.getThread(); + latch.countDown(); + doneLatch.await(); + assertEquals(0, observer.getOnErrorEvents().size()); + // 0 because the unsubscribe interrupts and prevents onCompleted from being executed + assertEquals(0, observer.getOnCompletedEvents().size()); + } + + public static class SlowScheduler extends Scheduler { + final Scheduler actual; + final long delay; + final TimeUnit unit; + + public SlowScheduler() { + this(Schedulers.computation(), 2, TimeUnit.SECONDS); + } + + public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) { + this.actual = actual; + this.delay = delay; + this.unit = unit; + } + + @Override + public Subscription schedule(final Action1 action) { + return actual.schedule(action, delay, unit); + } + + @Override + public Subscription schedule(final Action1 action, final long delayTime, final TimeUnit delayUnit) { + TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit; + long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit); + return actual.schedule(action, t, common); + } + } + + @Test + public void testSubscribeOnPublishSubjectWithSlowScheduler() { + PublishSubject ps = PublishSubject.create(); + TestSubscriber ts = new TestSubscriber(); + ps.nest().lift(new OperatorSubscribeOnBounded(new SlowScheduler(), 0)).subscribe(ts); + ps.onNext(1); + ps.onNext(2); + ps.onCompleted(); + + ts.awaitTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1, 2)); + } + + @Test + public void testGroupsWithNestedSubscribeOn() throws InterruptedException { + final ArrayList results = new ArrayList(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber sub) { + sub.onNext(1); + sub.onNext(2); + sub.onNext(1); + sub.onNext(2); + sub.onCompleted(); + } + + }).groupBy(new Func1() { + + @Override + public Integer call(Integer t) { + return t; + } + + }).flatMap(new Func1, Observable>() { + + @Override + public Observable call(final GroupedObservable group) { + return group.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 0)).map(new Func1() { + + @Override + public String call(Integer t1) { + System.out.println("Received: " + t1 + " on group : " + group.getKey()); + return "first groups: " + t1; + } + + }); + } + + }).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String s) { + results.add(s); + } + + }); + + System.out.println("Results: " + results); + assertEquals(4, results.size()); + } + + @Test + public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException { + final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete + final ArrayList results = new ArrayList(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber sub) { + sub.onNext(1); + sub.onNext(2); + sub.onNext(1); + sub.onNext(2); + try { + first.await(); + } catch (InterruptedException e) { + sub.onError(e); + return; + } + sub.onNext(3); + sub.onNext(3); + sub.onCompleted(); + } + + }).groupBy(new Func1() { + + @Override + public Integer call(Integer t) { + return t; + } + + }).flatMap(new Func1, Observable>() { + + @Override + public Observable call(final GroupedObservable group) { + if (group.getKey() < 3) { + return group.map(new Func1() { + + @Override + public String call(Integer t1) { + return "first groups: " + t1; + } + + }) + // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups + .take(2).doOnCompleted(new Action0() { + + @Override + public void call() { + first.countDown(); + } + + }); + } else { + return group.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 0)) + .delay(400, TimeUnit.MILLISECONDS).map(new Func1() { + + @Override + public String call(Integer t1) { + return "last group: " + t1; + } + + }); + } + } + + }).toBlockingObservable().forEach(new Action1() { + + @Override + public void call(String s) { + results.add(s); + } + + }); + + System.out.println("Results: " + results); + assertEquals(6, results.size()); + } + + void testBoundedBufferingWithSize(int size) throws Exception { + Observable timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS); + + final List deltas = Collections.synchronizedList(new ArrayList()); + + Subscription s = timer.timestamp().nest().lift(new OperatorSubscribeOnBounded>( + new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size)).map(new Func1, Long>() { + @Override + public Long call(Timestamped t1) { + long v = System.currentTimeMillis() - t1.getTimestampMillis(); + return v; + } + }).doOnNext(new Action1() { + @Override + public void call(Long t1) { + deltas.add(t1); + } + }).subscribe(); + + Thread.sleep(2050); + + s.unsubscribe(); + + if (deltas.size() < size + 1) { + fail("To few items in deltas: " + deltas); + } + for (int i = 0; i < size + 1; i++) { + if (deltas.get(i) < 500) { + fail(i + "th item arrived too early: " + deltas); + } + } + for (int i = size + 1; i < deltas.size(); i++) { + if (deltas.get(i) >= 500) { + fail(i + "th item arrived too late: " + deltas); + } + } + } + + @Test(timeout = 5000) + public void testBoundedBufferingOfZero() throws Exception { + testBoundedBufferingWithSize(0); + } + + @Test(timeout = 5000) + public void testBoundedBufferingOfOne() throws Exception { + testBoundedBufferingWithSize(1); + } + + @Test(timeout = 5000) + public void testBoundedBufferingOfTwo() throws Exception { + testBoundedBufferingWithSize(2); + } + + @Test(timeout = 5000) + public void testUnsubscribeInfiniteStream() throws InterruptedException { + TestSubscriber ts = new TestSubscriber(); + final AtomicInteger count = new AtomicInteger(); + Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber sub) { + for (int i = 1; !sub.isUnsubscribed(); i++) { + count.incrementAndGet(); + sub.onNext(i); + } + } + + }).nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread())).take(10).subscribe(ts); + + ts.awaitTerminalEventAndUnsubscribeOnTimeout(1000, TimeUnit.MILLISECONDS); + Thread.sleep(200); // give time for the loop to continue + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + assertEquals(10, count.get()); + } + +} diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java index 54d348dc3c..8e2e75b66b 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java @@ -17,14 +17,10 @@ import static org.junit.Assert.*; -import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; -import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; @@ -33,84 +29,13 @@ import rx.Scheduler; import rx.Subscriber; import rx.Subscription; -import rx.observables.GroupedObservable; import rx.observers.TestObserver; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; -import rx.subjects.PublishSubject; -import rx.subscriptions.Subscriptions; -import rx.util.Timestamped; -import rx.util.functions.Action0; import rx.util.functions.Action1; -import rx.util.functions.Func1; public class OperatorSubscribeOnTest { - private static class ThreadSubscription implements Subscription { - private volatile Thread thread; - - private final CountDownLatch latch = new CountDownLatch(1); - - private final Subscription s = Subscriptions.create(new Action0() { - - @Override - public void call() { - thread = Thread.currentThread(); - latch.countDown(); - } - - }); - - @Override - public void unsubscribe() { - s.unsubscribe(); - } - - @Override - public boolean isUnsubscribed() { - return s.isUnsubscribed(); - } - - public Thread getThread() throws InterruptedException { - latch.await(); - return thread; - } - } - - @Test - public void testSubscribeOnAndVerifySubscribeAndUnsubscribeThreads() - throws InterruptedException { - final ThreadSubscription subscription = new ThreadSubscription(); - final AtomicReference subscribeThread = new AtomicReference(); - Observable w = Observable.create(new OnSubscribe() { - - @Override - public void call(Subscriber t1) { - subscribeThread.set(Thread.currentThread()); - t1.add(subscription); - t1.onNext(1); - t1.onNext(2); - t1.onCompleted(); - } - }); - - TestObserver observer = new TestObserver(); - w.subscribeOn(Schedulers.newThread()).subscribe(observer); - - Thread unsubscribeThread = subscription.getThread(); - - assertNotNull(unsubscribeThread); - assertNotSame(Thread.currentThread(), unsubscribeThread); - - assertNotNull(subscribeThread.get()); - assertNotSame(Thread.currentThread(), subscribeThread.get()); - // True for Schedulers.newThread() - assertTrue(unsubscribeThread == subscribeThread.get()); - - observer.assertReceivedOnNext(Arrays.asList(1, 2)); - observer.assertTerminalEvent(); - } - @Test(timeout = 2000) public void testIssue813() throws InterruptedException { // https://github.com/Netflix/RxJava/issues/813 @@ -119,23 +44,22 @@ public void testIssue813() throws InterruptedException { final CountDownLatch doneLatch = new CountDownLatch(1); TestObserver observer = new TestObserver(); - final ThreadSubscription s = new ThreadSubscription(); final Subscription subscription = Observable .create(new Observable.OnSubscribe() { @Override public void call( final Subscriber subscriber) { - subscriber.add(s); scheduled.countDown(); try { latch.await(); + System.out.println("emit onCompleted"); // this should not run because the await above will be interrupted by the unsubscribe subscriber.onCompleted(); } catch (InterruptedException e) { - System.out.println("Interrupted because it is unsubscribed"); - Thread.currentThread().interrupt(); + e.printStackTrace(); + throw new RuntimeException("should not occur since we are not interuppting"); } catch (Throwable e) { subscriber.onError(e); } finally { @@ -148,12 +72,10 @@ public void call( scheduled.await(); // trigger unsubscribe subscription.unsubscribe(); - // As unsubscribe is called in other thread, we need to wait for it. - s.getThread(); latch.countDown(); doneLatch.await(); assertEquals(0, observer.getOnErrorEvents().size()); - // 0 because the unsubscribe interrupts and prevents onCompleted from being executed + // the unsubscribe shuts down the scheduler which causes the latch to be interrupted assertEquals(0, observer.getOnCompletedEvents().size()); } @@ -185,199 +107,6 @@ public Subscription schedule(final Action1 action, final long d } } - @Test - public void testSubscribeOnPublishSubjectWithSlowScheduler() { - PublishSubject ps = PublishSubject.create(); - TestSubscriber ts = new TestSubscriber(); - ps.nest().lift(new OperatorSubscribeOn(new SlowScheduler(), 0)).subscribe(ts); - ps.onNext(1); - ps.onNext(2); - ps.onCompleted(); - - ts.awaitTerminalEvent(); - ts.assertReceivedOnNext(Arrays.asList(1, 2)); - } - - @Test - public void testGroupsWithNestedSubscribeOn() throws InterruptedException { - final ArrayList results = new ArrayList(); - Observable.create(new OnSubscribe() { - - @Override - public void call(Subscriber sub) { - sub.onNext(1); - sub.onNext(2); - sub.onNext(1); - sub.onNext(2); - sub.onCompleted(); - } - - }).groupBy(new Func1() { - - @Override - public Integer call(Integer t) { - return t; - } - - }).flatMap(new Func1, Observable>() { - - @Override - public Observable call(final GroupedObservable group) { - return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)).map(new Func1() { - - @Override - public String call(Integer t1) { - System.out.println("Received: " + t1 + " on group : " + group.getKey()); - return "first groups: " + t1; - } - - }); - } - - }).toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String s) { - results.add(s); - } - - }); - - System.out.println("Results: " + results); - assertEquals(4, results.size()); - } - - @Test - public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException { - final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete - final ArrayList results = new ArrayList(); - Observable.create(new OnSubscribe() { - - @Override - public void call(Subscriber sub) { - sub.onNext(1); - sub.onNext(2); - sub.onNext(1); - sub.onNext(2); - try { - first.await(); - } catch (InterruptedException e) { - sub.onError(e); - return; - } - sub.onNext(3); - sub.onNext(3); - sub.onCompleted(); - } - - }).groupBy(new Func1() { - - @Override - public Integer call(Integer t) { - return t; - } - - }).flatMap(new Func1, Observable>() { - - @Override - public Observable call(final GroupedObservable group) { - if (group.getKey() < 3) { - return group.map(new Func1() { - - @Override - public String call(Integer t1) { - return "first groups: " + t1; - } - - }) - // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups - .take(2).doOnCompleted(new Action0() { - - @Override - public void call() { - first.countDown(); - } - - }); - } else { - return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)) - .delay(400, TimeUnit.MILLISECONDS).map(new Func1() { - - @Override - public String call(Integer t1) { - return "last group: " + t1; - } - - }); - } - } - - }).toBlockingObservable().forEach(new Action1() { - - @Override - public void call(String s) { - results.add(s); - } - - }); - - System.out.println("Results: " + results); - assertEquals(6, results.size()); - } - - void testBoundedBufferingWithSize(int size) throws Exception { - Observable timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS); - - final List deltas = Collections.synchronizedList(new ArrayList()); - - Subscription s = timer.timestamp().nest().lift(new OperatorSubscribeOn>( - new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size)).map(new Func1, Long>() { - @Override - public Long call(Timestamped t1) { - long v = System.currentTimeMillis() - t1.getTimestampMillis(); - return v; - } - }).doOnNext(new Action1() { - @Override - public void call(Long t1) { - deltas.add(t1); - } - }).subscribe(); - - Thread.sleep(2050); - - s.unsubscribe(); - - if (deltas.size() < size + 1) { - fail("To few items in deltas: " + deltas); - } - for (int i = 0; i < size + 1; i++) { - if (deltas.get(i) < 500) { - fail(i + "th item arrived too early: " + deltas); - } - } - for (int i = size + 1; i < deltas.size(); i++) { - if (deltas.get(i) >= 500) { - fail(i + "th item arrived too late: " + deltas); - } - } - } - - @Test(timeout = 5000) - public void testBoundedBufferingOfZero() throws Exception { - testBoundedBufferingWithSize(0); - } - - @Test(timeout = 5000) - public void testBoundedBufferingOfOne() throws Exception { - testBoundedBufferingWithSize(1); - } - - @Test(timeout = 5000) - public void testBoundedBufferingOfTwo() throws Exception { - testBoundedBufferingWithSize(2); - } - @Test(timeout = 5000) public void testUnsubscribeInfiniteStream() throws InterruptedException { TestSubscriber ts = new TestSubscriber(); diff --git a/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java new file mode 100644 index 0000000000..9aa75c39c0 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java @@ -0,0 +1,190 @@ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Scheduler; +import rx.Subscriber; +import rx.Subscription; +import rx.observers.TestObserver; +import rx.schedulers.Schedulers; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Action1; + +public class OperatorUnsubscribeOnTest { + + @Test + public void testUnsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnSameThread() throws InterruptedException { + UIEventLoopScheduler UI_EVENT_LOOP = new UIEventLoopScheduler(); + try { + final ThreadSubscription subscription = new ThreadSubscription(); + final AtomicReference subscribeThread = new AtomicReference(); + Observable w = Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber t1) { + subscribeThread.set(Thread.currentThread()); + t1.add(subscription); + t1.onNext(1); + t1.onNext(2); + t1.onCompleted(); + } + }); + + TestObserver observer = new TestObserver(); + w.subscribeOn(UI_EVENT_LOOP).observeOn(Schedulers.computation()).unsubscribeOn(UI_EVENT_LOOP).subscribe(observer); + + Thread unsubscribeThread = subscription.getThread(); + + assertNotNull(unsubscribeThread); + assertNotSame(Thread.currentThread(), unsubscribeThread); + + assertNotNull(subscribeThread.get()); + assertNotSame(Thread.currentThread(), subscribeThread.get()); + // True for Schedulers.newThread() + + System.out.println("unsubscribeThread: " + unsubscribeThread); + System.out.println("subscribeThread.get(): " + subscribeThread.get()); + assertTrue(unsubscribeThread == UI_EVENT_LOOP.getThread()); + + observer.assertReceivedOnNext(Arrays.asList(1, 2)); + observer.assertTerminalEvent(); + } finally { + UI_EVENT_LOOP.shutdown(); + } + } + + @Test + public void testUnsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnDifferentThreads() throws InterruptedException { + UIEventLoopScheduler UI_EVENT_LOOP = new UIEventLoopScheduler(); + try { + final ThreadSubscription subscription = new ThreadSubscription(); + final AtomicReference subscribeThread = new AtomicReference(); + Observable w = Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber t1) { + subscribeThread.set(Thread.currentThread()); + t1.add(subscription); + t1.onNext(1); + t1.onNext(2); + t1.onCompleted(); + } + }); + + TestObserver observer = new TestObserver(); + w.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.computation()).unsubscribeOn(UI_EVENT_LOOP).subscribe(observer); + + Thread unsubscribeThread = subscription.getThread(); + + assertNotNull(unsubscribeThread); + assertNotSame(Thread.currentThread(), unsubscribeThread); + + assertNotNull(subscribeThread.get()); + assertNotSame(Thread.currentThread(), subscribeThread.get()); + // True for Schedulers.newThread() + + System.out.println("unsubscribeThread: " + unsubscribeThread); + System.out.println("subscribeThread.get(): " + subscribeThread.get()); + assertTrue(unsubscribeThread == UI_EVENT_LOOP.getThread()); + + observer.assertReceivedOnNext(Arrays.asList(1, 2)); + observer.assertTerminalEvent(); + } finally { + UI_EVENT_LOOP.shutdown(); + } + } + + private static class ThreadSubscription implements Subscription { + private volatile Thread thread; + + private final CountDownLatch latch = new CountDownLatch(1); + + private final Subscription s = Subscriptions.create(new Action0() { + + @Override + public void call() { + System.out.println("unsubscribe invoked: " + Thread.currentThread()); + thread = Thread.currentThread(); + latch.countDown(); + } + + }); + + @Override + public void unsubscribe() { + s.unsubscribe(); + } + + @Override + public boolean isUnsubscribed() { + return s.isUnsubscribed(); + } + + public Thread getThread() throws InterruptedException { + latch.await(); + return thread; + } + } + + public static class UIEventLoopScheduler extends Scheduler { + + private final Scheduler.Inner eventLoop; + private final Subscription s; + private volatile Thread t; + + public UIEventLoopScheduler() { + /* + * DON'T DO THIS IN PRODUCTION CODE + */ + final AtomicReference innerScheduler = new AtomicReference(); + final CountDownLatch latch = new CountDownLatch(1); + s = Schedulers.newThread().schedule(new Action1() { + + @Override + public void call(Inner inner) { + t = Thread.currentThread(); + innerScheduler.set(inner); + latch.countDown(); + } + + }); + try { + latch.await(); + } catch (InterruptedException e) { + throw new RuntimeException("failed to initialize and get inner scheduler"); + } + eventLoop = innerScheduler.get(); + } + + @Override + public Subscription schedule(Action1 action) { + eventLoop.schedule(action); + return Subscriptions.empty(); + } + + @Override + public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) { + eventLoop.schedule(action); + return Subscriptions.empty(); + } + + public void shutdown() { + s.unsubscribe(); + } + + public Thread getThread() { + return t; + } + + } +}