diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java
index 82e65c46f5..bb9953ca1e 100644
--- a/rxjava-core/src/main/java/rx/Observable.java
+++ b/rxjava-core/src/main/java/rx/Observable.java
@@ -108,6 +108,7 @@
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
+import rx.operators.OperatorUnsubscribeOn;
import rx.operators.OperatorZip;
import rx.operators.OperatorZipIterable;
import rx.plugins.RxJavaObservableExecutionHook;
@@ -7074,14 +7075,14 @@ public final Subscription subscribe(Subscriber super T> observer, Scheduler sc
}
/**
- * Asynchronously subscribes and unsubscribes Observers to this Observable on the specified
+ * Asynchronously subscribes Observers to this Observable on the specified
* {@link Scheduler}.
*
*
*
* @param scheduler
- * the {@link Scheduler} to perform subscription and unsubscription actions on
- * @return the source Observable modified so that its subscriptions and unsubscriptions happen on the
+ * the {@link Scheduler} to perform subscription actions on
+ * @return the source Observable modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see RxJava Wiki: subscribeOn()
* @see #subscribeOn(rx.Scheduler, int)
@@ -8204,6 +8205,17 @@ public final Observable> toSortedList(Func2 super T, ? super T, Intege
return lift(new OperatorToObservableSortedList(sortFunction));
}
+ /**
+ * Asynchronously unsubscribes on the specified {@link Scheduler}.
+ *
+ * @param scheduler
+ * the {@link Scheduler} to perform subscription and unsubscription actions on
+ * @return the source Observable modified so that its unsubscriptions happen on the specified {@link Scheduler}
+ */
+ public final Observable unsubscribeOn(Scheduler scheduler) {
+ return lift(new OperatorUnsubscribeOn(scheduler));
+ }
+
/**
* Returns an Observable that represents a filtered version of the source Observable.
*
diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
index b8e782ef37..0c49a38162 100644
--- a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
+++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
@@ -23,46 +23,16 @@
import rx.util.functions.Action1;
/**
- * Subscribes and unsubscribes Observers on the specified Scheduler.
+ * Subscribes Observers on the specified Scheduler.
*
- * Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
- * in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
- *
- * See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
- * subscribe is solving.
- *
*
*/
public class OperatorSubscribeOn implements Operator> {
private final Scheduler scheduler;
- /**
- * Indicate that events fired between the original subscription time and
- * the actual subscription time should not get lost.
- */
- private final boolean dontLoseEvents;
- /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
- private final int bufferSize;
public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
- this.dontLoseEvents = false;
- this.bufferSize = -1;
- }
-
- /**
- * Construct a SubscribeOn operator.
- *
- * @param scheduler
- * the target scheduler
- * @param bufferSize
- * if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
- * block the source. -1 indicates an unbounded buffer
- */
- public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) {
- this.scheduler = scheduler;
- this.dontLoseEvents = true;
- this.bufferSize = bufferSize;
}
@Override
@@ -71,7 +41,7 @@ public Subscriber super Observable> call(final Subscriber super T> subscr
@Override
public void onCompleted() {
- // ignore
+ // ignore because this is a nested Observable and we expect only 1 Observable emitted to onNext
}
@Override
@@ -79,33 +49,15 @@ public void onError(Throwable e) {
subscriber.onError(e);
}
- boolean checkNeedBuffer(Observable> o) {
- return dontLoseEvents;
- }
-
@Override
public void onNext(final Observable o) {
- if (checkNeedBuffer(o)) {
- // use buffering (possibly blocking) for a possibly synchronous subscribe
- final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber);
- o.subscribe(bus);
- subscriber.add(scheduler.schedule(new Action1() {
- @Override
- public void call(final Inner inner) {
- bus.enterPassthroughMode();
- }
- }));
- return;
- } else {
- // no buffering (async subscribe)
- subscriber.add(scheduler.schedule(new Action1() {
+ subscriber.add(scheduler.schedule(new Action1() {
- @Override
- public void call(final Inner inner) {
- o.subscribe(subscriber);
- }
- }));
- }
+ @Override
+ public void call(final Inner inner) {
+ o.subscribe(subscriber);
+ }
+ }));
}
};
diff --git a/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java
new file mode 100644
index 0000000000..284cfcc5bd
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java
@@ -0,0 +1,113 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import rx.Observable;
+import rx.Observable.Operator;
+import rx.Scheduler;
+import rx.Scheduler.Inner;
+import rx.Subscriber;
+import rx.util.functions.Action1;
+
+/**
+ * Subscribes and unsubscribes Observers on the specified Scheduler.
+ *
+ * Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
+ * in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
+ *
+ * See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
+ * subscribe is solving.
+ *
+ *
+ */
+public class OperatorSubscribeOnBounded implements Operator> {
+
+ private final Scheduler scheduler;
+ /**
+ * Indicate that events fired between the original subscription time and
+ * the actual subscription time should not get lost.
+ */
+ private final boolean dontLoseEvents;
+ /** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
+ private final int bufferSize;
+
+ public OperatorSubscribeOnBounded(Scheduler scheduler) {
+ this.scheduler = scheduler;
+ this.dontLoseEvents = false;
+ this.bufferSize = -1;
+ }
+
+ /**
+ * Construct a SubscribeOn operator.
+ *
+ * @param scheduler
+ * the target scheduler
+ * @param bufferSize
+ * if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
+ * block the source. -1 indicates an unbounded buffer
+ */
+ public OperatorSubscribeOnBounded(Scheduler scheduler, int bufferSize) {
+ this.scheduler = scheduler;
+ this.dontLoseEvents = true;
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ public Subscriber super Observable> call(final Subscriber super T> subscriber) {
+ return new Subscriber>(subscriber) {
+
+ @Override
+ public void onCompleted() {
+ // ignore
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ subscriber.onError(e);
+ }
+
+ boolean checkNeedBuffer(Observable> o) {
+ return dontLoseEvents;
+ }
+
+ @Override
+ public void onNext(final Observable o) {
+ if (checkNeedBuffer(o)) {
+ // use buffering (possibly blocking) for a possibly synchronous subscribe
+ final BufferUntilSubscriber bus = new BufferUntilSubscriber(bufferSize, subscriber);
+ o.subscribe(bus);
+ subscriber.add(scheduler.schedule(new Action1() {
+ @Override
+ public void call(final Inner inner) {
+ bus.enterPassthroughMode();
+ }
+ }));
+ return;
+ } else {
+ // no buffering (async subscribe)
+ subscriber.add(scheduler.schedule(new Action1() {
+
+ @Override
+ public void call(final Inner inner) {
+ o.subscribe(subscriber);
+ }
+ }));
+ }
+ }
+
+ };
+ }
+}
diff --git a/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java
new file mode 100644
index 0000000000..0d734c5f73
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import rx.Observable.Operator;
+import rx.Scheduler;
+import rx.Scheduler.Inner;
+import rx.Subscriber;
+import rx.subscriptions.CompositeSubscription;
+import rx.subscriptions.MultipleAssignmentSubscription;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+
+/**
+ * Unsubscribes on the specified Scheduler.
+ *
+ */
+public class OperatorUnsubscribeOn implements Operator {
+
+ private final Scheduler scheduler;
+
+ public OperatorUnsubscribeOn(Scheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ @Override
+ public Subscriber super T> call(final Subscriber super T> subscriber) {
+ final CompositeSubscription parentSubscription = new CompositeSubscription();
+ subscriber.add(Subscriptions.create(new Action0() {
+
+ @Override
+ public void call() {
+ final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
+ mas.set(scheduler.schedule(new Action1() {
+
+ @Override
+ public void call(final Inner inner) {
+ parentSubscription.unsubscribe();
+ mas.unsubscribe();
+ }
+ }));
+ }
+
+ }));
+
+ return new Subscriber(parentSubscription) {
+
+ @Override
+ public void onCompleted() {
+ subscriber.onCompleted();
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ subscriber.onError(e);
+ }
+
+ @Override
+ public void onNext(T t) {
+ subscriber.onNext(t);
+ }
+
+ };
+ }
+}
diff --git a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java
index 2f201dbb1b..7d4d98aace 100644
--- a/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java
+++ b/rxjava-core/src/test/java/rx/operators/OperatorGroupByTest.java
@@ -700,7 +700,7 @@ public void call() {
});
} else {
- return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1() {
+ return group.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 1)).delay(400, TimeUnit.MILLISECONDS).map(new Func1() {
@Override
public String call(Integer t1) {
@@ -826,7 +826,7 @@ public Integer call(Integer t) {
@Override
public Observable call(final GroupedObservable group) {
- return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)).map(new Func1() {
+ return group.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 0)).map(new Func1() {
@Override
public String call(Integer t1) {
diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java
new file mode 100644
index 0000000000..cbd94e59d7
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnBoundedTest.java
@@ -0,0 +1,403 @@
+/**
+ * Copyright 2014 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Observable.OnSubscribe;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.observables.GroupedObservable;
+import rx.observers.TestObserver;
+import rx.observers.TestSubscriber;
+import rx.schedulers.Schedulers;
+import rx.subjects.PublishSubject;
+import rx.subscriptions.Subscriptions;
+import rx.util.Timestamped;
+import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+import rx.util.functions.Func1;
+
+public class OperatorSubscribeOnBoundedTest {
+
+ private static class ThreadSubscription implements Subscription {
+ private volatile Thread thread;
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private final Subscription s = Subscriptions.create(new Action0() {
+
+ @Override
+ public void call() {
+ thread = Thread.currentThread();
+ latch.countDown();
+ }
+
+ });
+
+ @Override
+ public void unsubscribe() {
+ s.unsubscribe();
+ }
+
+ @Override
+ public boolean isUnsubscribed() {
+ return s.isUnsubscribed();
+ }
+
+ public Thread getThread() throws InterruptedException {
+ latch.await();
+ return thread;
+ }
+ }
+
+ @Test
+ public void testSubscribeOnAndVerifySubscribeAndUnsubscribeThreads()
+ throws InterruptedException {
+ final ThreadSubscription subscription = new ThreadSubscription();
+ final AtomicReference subscribeThread = new AtomicReference();
+ Observable w = Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> t1) {
+ subscribeThread.set(Thread.currentThread());
+ t1.add(subscription);
+ t1.onNext(1);
+ t1.onNext(2);
+ t1.onCompleted();
+ }
+ });
+
+ TestObserver observer = new TestObserver();
+ w.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread())).subscribe(observer);
+
+ Thread unsubscribeThread = subscription.getThread();
+
+ assertNotNull(unsubscribeThread);
+ assertNotSame(Thread.currentThread(), unsubscribeThread);
+
+ assertNotNull(subscribeThread.get());
+ assertNotSame(Thread.currentThread(), subscribeThread.get());
+ // True for Schedulers.newThread()
+ assertTrue(unsubscribeThread == subscribeThread.get());
+
+ observer.assertReceivedOnNext(Arrays.asList(1, 2));
+ observer.assertTerminalEvent();
+ }
+
+ @Test(timeout = 2000)
+ public void testIssue813() throws InterruptedException {
+ // https://github.com/Netflix/RxJava/issues/813
+ final CountDownLatch scheduled = new CountDownLatch(1);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+
+ TestObserver observer = new TestObserver();
+ final ThreadSubscription s = new ThreadSubscription();
+
+ final Subscription subscription = Observable
+ .create(new Observable.OnSubscribe() {
+ @Override
+ public void call(
+ final Subscriber super Integer> subscriber) {
+ subscriber.add(s);
+ scheduled.countDown();
+ try {
+ latch.await();
+
+ // this should not run because the await above will be interrupted by the unsubscribe
+ subscriber.onCompleted();
+ } catch (InterruptedException e) {
+ System.out.println("Interrupted because it is unsubscribed");
+ Thread.currentThread().interrupt();
+ } catch (Throwable e) {
+ subscriber.onError(e);
+ } finally {
+ doneLatch.countDown();
+ }
+ }
+ }).nest().lift(new OperatorSubscribeOnBounded(Schedulers.computation())).subscribe(observer);
+
+ // wait for scheduling
+ scheduled.await();
+ // trigger unsubscribe
+ subscription.unsubscribe();
+ // As unsubscribe is called in other thread, we need to wait for it.
+ s.getThread();
+ latch.countDown();
+ doneLatch.await();
+ assertEquals(0, observer.getOnErrorEvents().size());
+ // 0 because the unsubscribe interrupts and prevents onCompleted from being executed
+ assertEquals(0, observer.getOnCompletedEvents().size());
+ }
+
+ public static class SlowScheduler extends Scheduler {
+ final Scheduler actual;
+ final long delay;
+ final TimeUnit unit;
+
+ public SlowScheduler() {
+ this(Schedulers.computation(), 2, TimeUnit.SECONDS);
+ }
+
+ public SlowScheduler(Scheduler actual, long delay, TimeUnit unit) {
+ this.actual = actual;
+ this.delay = delay;
+ this.unit = unit;
+ }
+
+ @Override
+ public Subscription schedule(final Action1 action) {
+ return actual.schedule(action, delay, unit);
+ }
+
+ @Override
+ public Subscription schedule(final Action1 action, final long delayTime, final TimeUnit delayUnit) {
+ TimeUnit common = delayUnit.compareTo(unit) < 0 ? delayUnit : unit;
+ long t = common.convert(delayTime, delayUnit) + common.convert(delay, unit);
+ return actual.schedule(action, t, common);
+ }
+ }
+
+ @Test
+ public void testSubscribeOnPublishSubjectWithSlowScheduler() {
+ PublishSubject ps = PublishSubject.create();
+ TestSubscriber ts = new TestSubscriber();
+ ps.nest().lift(new OperatorSubscribeOnBounded(new SlowScheduler(), 0)).subscribe(ts);
+ ps.onNext(1);
+ ps.onNext(2);
+ ps.onCompleted();
+
+ ts.awaitTerminalEvent();
+ ts.assertReceivedOnNext(Arrays.asList(1, 2));
+ }
+
+ @Test
+ public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
+ final ArrayList results = new ArrayList();
+ Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> sub) {
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onCompleted();
+ }
+
+ }).groupBy(new Func1() {
+
+ @Override
+ public Integer call(Integer t) {
+ return t;
+ }
+
+ }).flatMap(new Func1, Observable>() {
+
+ @Override
+ public Observable call(final GroupedObservable group) {
+ return group.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 0)).map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ System.out.println("Received: " + t1 + " on group : " + group.getKey());
+ return "first groups: " + t1;
+ }
+
+ });
+ }
+
+ }).toBlockingObservable().forEach(new Action1() {
+
+ @Override
+ public void call(String s) {
+ results.add(s);
+ }
+
+ });
+
+ System.out.println("Results: " + results);
+ assertEquals(4, results.size());
+ }
+
+ @Test
+ public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
+ final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
+ final ArrayList results = new ArrayList();
+ Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> sub) {
+ sub.onNext(1);
+ sub.onNext(2);
+ sub.onNext(1);
+ sub.onNext(2);
+ try {
+ first.await();
+ } catch (InterruptedException e) {
+ sub.onError(e);
+ return;
+ }
+ sub.onNext(3);
+ sub.onNext(3);
+ sub.onCompleted();
+ }
+
+ }).groupBy(new Func1() {
+
+ @Override
+ public Integer call(Integer t) {
+ return t;
+ }
+
+ }).flatMap(new Func1, Observable>() {
+
+ @Override
+ public Observable call(final GroupedObservable group) {
+ if (group.getKey() < 3) {
+ return group.map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ return "first groups: " + t1;
+ }
+
+ })
+ // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups
+ .take(2).doOnCompleted(new Action0() {
+
+ @Override
+ public void call() {
+ first.countDown();
+ }
+
+ });
+ } else {
+ return group.nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread(), 0))
+ .delay(400, TimeUnit.MILLISECONDS).map(new Func1() {
+
+ @Override
+ public String call(Integer t1) {
+ return "last group: " + t1;
+ }
+
+ });
+ }
+ }
+
+ }).toBlockingObservable().forEach(new Action1() {
+
+ @Override
+ public void call(String s) {
+ results.add(s);
+ }
+
+ });
+
+ System.out.println("Results: " + results);
+ assertEquals(6, results.size());
+ }
+
+ void testBoundedBufferingWithSize(int size) throws Exception {
+ Observable timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS);
+
+ final List deltas = Collections.synchronizedList(new ArrayList());
+
+ Subscription s = timer.timestamp().nest().lift(new OperatorSubscribeOnBounded>(
+ new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size)).map(new Func1, Long>() {
+ @Override
+ public Long call(Timestamped t1) {
+ long v = System.currentTimeMillis() - t1.getTimestampMillis();
+ return v;
+ }
+ }).doOnNext(new Action1() {
+ @Override
+ public void call(Long t1) {
+ deltas.add(t1);
+ }
+ }).subscribe();
+
+ Thread.sleep(2050);
+
+ s.unsubscribe();
+
+ if (deltas.size() < size + 1) {
+ fail("To few items in deltas: " + deltas);
+ }
+ for (int i = 0; i < size + 1; i++) {
+ if (deltas.get(i) < 500) {
+ fail(i + "th item arrived too early: " + deltas);
+ }
+ }
+ for (int i = size + 1; i < deltas.size(); i++) {
+ if (deltas.get(i) >= 500) {
+ fail(i + "th item arrived too late: " + deltas);
+ }
+ }
+ }
+
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfZero() throws Exception {
+ testBoundedBufferingWithSize(0);
+ }
+
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfOne() throws Exception {
+ testBoundedBufferingWithSize(1);
+ }
+
+ @Test(timeout = 5000)
+ public void testBoundedBufferingOfTwo() throws Exception {
+ testBoundedBufferingWithSize(2);
+ }
+
+ @Test(timeout = 5000)
+ public void testUnsubscribeInfiniteStream() throws InterruptedException {
+ TestSubscriber ts = new TestSubscriber();
+ final AtomicInteger count = new AtomicInteger();
+ Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> sub) {
+ for (int i = 1; !sub.isUnsubscribed(); i++) {
+ count.incrementAndGet();
+ sub.onNext(i);
+ }
+ }
+
+ }).nest().lift(new OperatorSubscribeOnBounded(Schedulers.newThread())).take(10).subscribe(ts);
+
+ ts.awaitTerminalEventAndUnsubscribeOnTimeout(1000, TimeUnit.MILLISECONDS);
+ Thread.sleep(200); // give time for the loop to continue
+ ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+ assertEquals(10, count.get());
+ }
+
+}
diff --git a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
index 54d348dc3c..8e2e75b66b 100644
--- a/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
+++ b/rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java
@@ -17,14 +17,10 @@
import static org.junit.Assert.*;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import org.junit.Test;
@@ -33,84 +29,13 @@
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
-import rx.observables.GroupedObservable;
import rx.observers.TestObserver;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
-import rx.subjects.PublishSubject;
-import rx.subscriptions.Subscriptions;
-import rx.util.Timestamped;
-import rx.util.functions.Action0;
import rx.util.functions.Action1;
-import rx.util.functions.Func1;
public class OperatorSubscribeOnTest {
- private static class ThreadSubscription implements Subscription {
- private volatile Thread thread;
-
- private final CountDownLatch latch = new CountDownLatch(1);
-
- private final Subscription s = Subscriptions.create(new Action0() {
-
- @Override
- public void call() {
- thread = Thread.currentThread();
- latch.countDown();
- }
-
- });
-
- @Override
- public void unsubscribe() {
- s.unsubscribe();
- }
-
- @Override
- public boolean isUnsubscribed() {
- return s.isUnsubscribed();
- }
-
- public Thread getThread() throws InterruptedException {
- latch.await();
- return thread;
- }
- }
-
- @Test
- public void testSubscribeOnAndVerifySubscribeAndUnsubscribeThreads()
- throws InterruptedException {
- final ThreadSubscription subscription = new ThreadSubscription();
- final AtomicReference subscribeThread = new AtomicReference();
- Observable w = Observable.create(new OnSubscribe() {
-
- @Override
- public void call(Subscriber super Integer> t1) {
- subscribeThread.set(Thread.currentThread());
- t1.add(subscription);
- t1.onNext(1);
- t1.onNext(2);
- t1.onCompleted();
- }
- });
-
- TestObserver observer = new TestObserver();
- w.subscribeOn(Schedulers.newThread()).subscribe(observer);
-
- Thread unsubscribeThread = subscription.getThread();
-
- assertNotNull(unsubscribeThread);
- assertNotSame(Thread.currentThread(), unsubscribeThread);
-
- assertNotNull(subscribeThread.get());
- assertNotSame(Thread.currentThread(), subscribeThread.get());
- // True for Schedulers.newThread()
- assertTrue(unsubscribeThread == subscribeThread.get());
-
- observer.assertReceivedOnNext(Arrays.asList(1, 2));
- observer.assertTerminalEvent();
- }
-
@Test(timeout = 2000)
public void testIssue813() throws InterruptedException {
// https://github.com/Netflix/RxJava/issues/813
@@ -119,23 +44,22 @@ public void testIssue813() throws InterruptedException {
final CountDownLatch doneLatch = new CountDownLatch(1);
TestObserver observer = new TestObserver();
- final ThreadSubscription s = new ThreadSubscription();
final Subscription subscription = Observable
.create(new Observable.OnSubscribe() {
@Override
public void call(
final Subscriber super Integer> subscriber) {
- subscriber.add(s);
scheduled.countDown();
try {
latch.await();
+ System.out.println("emit onCompleted");
// this should not run because the await above will be interrupted by the unsubscribe
subscriber.onCompleted();
} catch (InterruptedException e) {
- System.out.println("Interrupted because it is unsubscribed");
- Thread.currentThread().interrupt();
+ e.printStackTrace();
+ throw new RuntimeException("should not occur since we are not interuppting");
} catch (Throwable e) {
subscriber.onError(e);
} finally {
@@ -148,12 +72,10 @@ public void call(
scheduled.await();
// trigger unsubscribe
subscription.unsubscribe();
- // As unsubscribe is called in other thread, we need to wait for it.
- s.getThread();
latch.countDown();
doneLatch.await();
assertEquals(0, observer.getOnErrorEvents().size());
- // 0 because the unsubscribe interrupts and prevents onCompleted from being executed
+ // the unsubscribe shuts down the scheduler which causes the latch to be interrupted
assertEquals(0, observer.getOnCompletedEvents().size());
}
@@ -185,199 +107,6 @@ public Subscription schedule(final Action1 action, final long d
}
}
- @Test
- public void testSubscribeOnPublishSubjectWithSlowScheduler() {
- PublishSubject ps = PublishSubject.create();
- TestSubscriber ts = new TestSubscriber();
- ps.nest().lift(new OperatorSubscribeOn(new SlowScheduler(), 0)).subscribe(ts);
- ps.onNext(1);
- ps.onNext(2);
- ps.onCompleted();
-
- ts.awaitTerminalEvent();
- ts.assertReceivedOnNext(Arrays.asList(1, 2));
- }
-
- @Test
- public void testGroupsWithNestedSubscribeOn() throws InterruptedException {
- final ArrayList results = new ArrayList();
- Observable.create(new OnSubscribe() {
-
- @Override
- public void call(Subscriber super Integer> sub) {
- sub.onNext(1);
- sub.onNext(2);
- sub.onNext(1);
- sub.onNext(2);
- sub.onCompleted();
- }
-
- }).groupBy(new Func1() {
-
- @Override
- public Integer call(Integer t) {
- return t;
- }
-
- }).flatMap(new Func1, Observable>() {
-
- @Override
- public Observable call(final GroupedObservable group) {
- return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0)).map(new Func1() {
-
- @Override
- public String call(Integer t1) {
- System.out.println("Received: " + t1 + " on group : " + group.getKey());
- return "first groups: " + t1;
- }
-
- });
- }
-
- }).toBlockingObservable().forEach(new Action1() {
-
- @Override
- public void call(String s) {
- results.add(s);
- }
-
- });
-
- System.out.println("Results: " + results);
- assertEquals(4, results.size());
- }
-
- @Test
- public void testFirstGroupsCompleteAndParentSlowToThenEmitFinalGroupsWhichThenSubscribesOnAndDelaysAndThenCompletes() throws InterruptedException {
- final CountDownLatch first = new CountDownLatch(2); // there are two groups to first complete
- final ArrayList results = new ArrayList();
- Observable.create(new OnSubscribe() {
-
- @Override
- public void call(Subscriber super Integer> sub) {
- sub.onNext(1);
- sub.onNext(2);
- sub.onNext(1);
- sub.onNext(2);
- try {
- first.await();
- } catch (InterruptedException e) {
- sub.onError(e);
- return;
- }
- sub.onNext(3);
- sub.onNext(3);
- sub.onCompleted();
- }
-
- }).groupBy(new Func1() {
-
- @Override
- public Integer call(Integer t) {
- return t;
- }
-
- }).flatMap(new Func1, Observable>() {
-
- @Override
- public Observable call(final GroupedObservable group) {
- if (group.getKey() < 3) {
- return group.map(new Func1() {
-
- @Override
- public String call(Integer t1) {
- return "first groups: " + t1;
- }
-
- })
- // must take(2) so an onCompleted + unsubscribe happens on these first 2 groups
- .take(2).doOnCompleted(new Action0() {
-
- @Override
- public void call() {
- first.countDown();
- }
-
- });
- } else {
- return group.nest().lift(new OperatorSubscribeOn(Schedulers.newThread(), 0))
- .delay(400, TimeUnit.MILLISECONDS).map(new Func1() {
-
- @Override
- public String call(Integer t1) {
- return "last group: " + t1;
- }
-
- });
- }
- }
-
- }).toBlockingObservable().forEach(new Action1() {
-
- @Override
- public void call(String s) {
- results.add(s);
- }
-
- });
-
- System.out.println("Results: " + results);
- assertEquals(6, results.size());
- }
-
- void testBoundedBufferingWithSize(int size) throws Exception {
- Observable timer = Observable.timer(100, 100, TimeUnit.MILLISECONDS);
-
- final List deltas = Collections.synchronizedList(new ArrayList());
-
- Subscription s = timer.timestamp().nest().lift(new OperatorSubscribeOn>(
- new SlowScheduler(Schedulers.computation(), 1, TimeUnit.SECONDS), size)).map(new Func1, Long>() {
- @Override
- public Long call(Timestamped t1) {
- long v = System.currentTimeMillis() - t1.getTimestampMillis();
- return v;
- }
- }).doOnNext(new Action1() {
- @Override
- public void call(Long t1) {
- deltas.add(t1);
- }
- }).subscribe();
-
- Thread.sleep(2050);
-
- s.unsubscribe();
-
- if (deltas.size() < size + 1) {
- fail("To few items in deltas: " + deltas);
- }
- for (int i = 0; i < size + 1; i++) {
- if (deltas.get(i) < 500) {
- fail(i + "th item arrived too early: " + deltas);
- }
- }
- for (int i = size + 1; i < deltas.size(); i++) {
- if (deltas.get(i) >= 500) {
- fail(i + "th item arrived too late: " + deltas);
- }
- }
- }
-
- @Test(timeout = 5000)
- public void testBoundedBufferingOfZero() throws Exception {
- testBoundedBufferingWithSize(0);
- }
-
- @Test(timeout = 5000)
- public void testBoundedBufferingOfOne() throws Exception {
- testBoundedBufferingWithSize(1);
- }
-
- @Test(timeout = 5000)
- public void testBoundedBufferingOfTwo() throws Exception {
- testBoundedBufferingWithSize(2);
- }
-
@Test(timeout = 5000)
public void testUnsubscribeInfiniteStream() throws InterruptedException {
TestSubscriber ts = new TestSubscriber();
diff --git a/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java b/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java
new file mode 100644
index 0000000000..9aa75c39c0
--- /dev/null
+++ b/rxjava-core/src/test/java/rx/operators/OperatorUnsubscribeOnTest.java
@@ -0,0 +1,190 @@
+package rx.operators;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.Test;
+
+import rx.Observable;
+import rx.Observable.OnSubscribe;
+import rx.Scheduler;
+import rx.Subscriber;
+import rx.Subscription;
+import rx.observers.TestObserver;
+import rx.schedulers.Schedulers;
+import rx.subscriptions.Subscriptions;
+import rx.util.functions.Action0;
+import rx.util.functions.Action1;
+
+public class OperatorUnsubscribeOnTest {
+
+ @Test
+ public void testUnsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnSameThread() throws InterruptedException {
+ UIEventLoopScheduler UI_EVENT_LOOP = new UIEventLoopScheduler();
+ try {
+ final ThreadSubscription subscription = new ThreadSubscription();
+ final AtomicReference subscribeThread = new AtomicReference();
+ Observable w = Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> t1) {
+ subscribeThread.set(Thread.currentThread());
+ t1.add(subscription);
+ t1.onNext(1);
+ t1.onNext(2);
+ t1.onCompleted();
+ }
+ });
+
+ TestObserver observer = new TestObserver();
+ w.subscribeOn(UI_EVENT_LOOP).observeOn(Schedulers.computation()).unsubscribeOn(UI_EVENT_LOOP).subscribe(observer);
+
+ Thread unsubscribeThread = subscription.getThread();
+
+ assertNotNull(unsubscribeThread);
+ assertNotSame(Thread.currentThread(), unsubscribeThread);
+
+ assertNotNull(subscribeThread.get());
+ assertNotSame(Thread.currentThread(), subscribeThread.get());
+ // True for Schedulers.newThread()
+
+ System.out.println("unsubscribeThread: " + unsubscribeThread);
+ System.out.println("subscribeThread.get(): " + subscribeThread.get());
+ assertTrue(unsubscribeThread == UI_EVENT_LOOP.getThread());
+
+ observer.assertReceivedOnNext(Arrays.asList(1, 2));
+ observer.assertTerminalEvent();
+ } finally {
+ UI_EVENT_LOOP.shutdown();
+ }
+ }
+
+ @Test
+ public void testUnsubscribeWhenSubscribeOnAndUnsubscribeOnAreOnDifferentThreads() throws InterruptedException {
+ UIEventLoopScheduler UI_EVENT_LOOP = new UIEventLoopScheduler();
+ try {
+ final ThreadSubscription subscription = new ThreadSubscription();
+ final AtomicReference subscribeThread = new AtomicReference();
+ Observable w = Observable.create(new OnSubscribe() {
+
+ @Override
+ public void call(Subscriber super Integer> t1) {
+ subscribeThread.set(Thread.currentThread());
+ t1.add(subscription);
+ t1.onNext(1);
+ t1.onNext(2);
+ t1.onCompleted();
+ }
+ });
+
+ TestObserver observer = new TestObserver();
+ w.subscribeOn(Schedulers.newThread()).observeOn(Schedulers.computation()).unsubscribeOn(UI_EVENT_LOOP).subscribe(observer);
+
+ Thread unsubscribeThread = subscription.getThread();
+
+ assertNotNull(unsubscribeThread);
+ assertNotSame(Thread.currentThread(), unsubscribeThread);
+
+ assertNotNull(subscribeThread.get());
+ assertNotSame(Thread.currentThread(), subscribeThread.get());
+ // True for Schedulers.newThread()
+
+ System.out.println("unsubscribeThread: " + unsubscribeThread);
+ System.out.println("subscribeThread.get(): " + subscribeThread.get());
+ assertTrue(unsubscribeThread == UI_EVENT_LOOP.getThread());
+
+ observer.assertReceivedOnNext(Arrays.asList(1, 2));
+ observer.assertTerminalEvent();
+ } finally {
+ UI_EVENT_LOOP.shutdown();
+ }
+ }
+
+ private static class ThreadSubscription implements Subscription {
+ private volatile Thread thread;
+
+ private final CountDownLatch latch = new CountDownLatch(1);
+
+ private final Subscription s = Subscriptions.create(new Action0() {
+
+ @Override
+ public void call() {
+ System.out.println("unsubscribe invoked: " + Thread.currentThread());
+ thread = Thread.currentThread();
+ latch.countDown();
+ }
+
+ });
+
+ @Override
+ public void unsubscribe() {
+ s.unsubscribe();
+ }
+
+ @Override
+ public boolean isUnsubscribed() {
+ return s.isUnsubscribed();
+ }
+
+ public Thread getThread() throws InterruptedException {
+ latch.await();
+ return thread;
+ }
+ }
+
+ public static class UIEventLoopScheduler extends Scheduler {
+
+ private final Scheduler.Inner eventLoop;
+ private final Subscription s;
+ private volatile Thread t;
+
+ public UIEventLoopScheduler() {
+ /*
+ * DON'T DO THIS IN PRODUCTION CODE
+ */
+ final AtomicReference innerScheduler = new AtomicReference();
+ final CountDownLatch latch = new CountDownLatch(1);
+ s = Schedulers.newThread().schedule(new Action1() {
+
+ @Override
+ public void call(Inner inner) {
+ t = Thread.currentThread();
+ innerScheduler.set(inner);
+ latch.countDown();
+ }
+
+ });
+ try {
+ latch.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("failed to initialize and get inner scheduler");
+ }
+ eventLoop = innerScheduler.get();
+ }
+
+ @Override
+ public Subscription schedule(Action1 action) {
+ eventLoop.schedule(action);
+ return Subscriptions.empty();
+ }
+
+ @Override
+ public Subscription schedule(Action1 action, long delayTime, TimeUnit unit) {
+ eventLoop.schedule(action);
+ return Subscriptions.empty();
+ }
+
+ public void shutdown() {
+ s.unsubscribe();
+ }
+
+ public Thread getThread() {
+ return t;
+ }
+
+ }
+}