Skip to content

Commit 0063b90

Browse files
committed
Add new unit test to check for non-blocking.
1 parent 9b3204b commit 0063b90

File tree

1 file changed

+66
-1
lines changed

1 file changed

+66
-1
lines changed

rxjava-core/src/main/java/rx/operators/OperationConcat.java

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@
2323
import java.util.List;
2424
import java.util.Queue;
2525
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
2627
import java.util.concurrent.atomic.AtomicBoolean;
2728
import java.util.concurrent.atomic.AtomicReference;
2829
import java.util.concurrent.ConcurrentLinkedQueue;
2930

31+
import org.junit.Ignore;
3032
import org.junit.Test;
3133

3234
import org.mockito.InOrder;
@@ -460,6 +462,69 @@ public void testConcatConcurrentWithInfinity() {
460462

461463
}
462464

465+
466+
467+
@Test
468+
public void testConcatUnSubscribeNotBlockingObservables() {
469+
470+
final CountDownLatch okToContinueW1 = new CountDownLatch(1);
471+
final CountDownLatch okToContinueW2 = new CountDownLatch(1);
472+
473+
final TestObservable<String> w1 = new TestObservable<String>(null, okToContinueW1, "one", "two", "three");
474+
final TestObservable<String> w2 = new TestObservable<String>(null, okToContinueW2, "four", "five", "six");
475+
476+
@SuppressWarnings("unchecked")
477+
Observer<String> aObserver = mock(Observer.class);
478+
Observable<Observable<String>> observableOfObservables = Observable.create(new Func1<Observer<Observable<String>>, Subscription>() {
479+
480+
@Override
481+
public Subscription call(Observer<Observable<String>> observer) {
482+
// simulate what would happen in an observable
483+
observer.onNext(w1);
484+
observer.onNext(w2);
485+
observer.onCompleted();
486+
487+
return new Subscription() {
488+
489+
@Override
490+
public void unsubscribe() {
491+
}
492+
493+
};
494+
}
495+
496+
});
497+
Observable<String> concat = Observable.create(concat(observableOfObservables));
498+
499+
concat.subscribe(aObserver);
500+
501+
verify(aObserver, times(0)).onCompleted();
502+
503+
504+
//Wait for the thread to start up.
505+
try {
506+
Thread.sleep(25);
507+
w1.t.join();
508+
w2.t.join();
509+
okToContinueW1.countDown();
510+
okToContinueW2.countDown();
511+
} catch (InterruptedException e) {
512+
// TODO Auto-generated catch block
513+
e.printStackTrace();
514+
}
515+
516+
InOrder inOrder = inOrder(aObserver);
517+
inOrder.verify(aObserver, times(1)).onNext("one");
518+
inOrder.verify(aObserver, times(1)).onNext("two");
519+
inOrder.verify(aObserver, times(1)).onNext("three");
520+
inOrder.verify(aObserver, times(1)).onNext("four");
521+
inOrder.verify(aObserver, times(1)).onNext("five");
522+
inOrder.verify(aObserver, times(1)).onNext("six");
523+
verify(aObserver, times(1)).onCompleted();
524+
525+
526+
}
527+
463528

464529
/**
465530
* Test unsubscribing the concatenated Observable in a single thread.
@@ -608,7 +673,7 @@ public void run() {
608673
once.countDown();
609674
//Block until the main thread has called unsubscribe.
610675
if (null != okToContinue)
611-
okToContinue.await();
676+
okToContinue.await(1, TimeUnit.SECONDS);
612677
}
613678
if (subscribed)
614679
observer.onCompleted();

0 commit comments

Comments
 (0)