|
36 | 36 | import rx.Subscription;
|
37 | 37 | import rx.subscriptions.BooleanSubscription;
|
38 | 38 | import rx.util.AtomicObservableSubscription;
|
39 |
| -import rx.util.Exceptions; |
| 39 | + |
40 | 40 | import rx.util.functions.Func1;
|
41 | 41 |
|
42 | 42 | public final class OperationConcat {
|
@@ -460,6 +460,38 @@ public void testConcatConcurrentWithInfinity() {
|
460 | 460 |
|
461 | 461 | }
|
462 | 462 |
|
| 463 | + @Test |
| 464 | + public void testConcatConcurrentWithInfinityFirstSequence() { |
| 465 | + final TestObservable<String> w1 = new TestObservable<String>("one", "two", "three"); |
| 466 | + //This observable will send "hello" MAX_VALUE time. |
| 467 | + final TestObservable<String> w2 = new TestObservable<String>("hello", Integer.MAX_VALUE); |
| 468 | + |
| 469 | + @SuppressWarnings("unchecked") |
| 470 | + Observer<String> aObserver = mock(Observer.class); |
| 471 | + @SuppressWarnings("unchecked") |
| 472 | + TestObservable<Observable<String>> observableOfObservables = new TestObservable<Observable<String>>(w2, w1); |
| 473 | + Func1<Observer<String>, Subscription> concatF = concat(observableOfObservables); |
| 474 | + |
| 475 | + Observable<String> concat = Observable.create(concatF); |
| 476 | + |
| 477 | + concat.take(50).subscribe(aObserver); |
| 478 | + |
| 479 | + //Wait for the thread to start up. |
| 480 | + try { |
| 481 | + Thread.sleep(25); |
| 482 | + w2.t.join(); |
| 483 | + } catch (InterruptedException e) { |
| 484 | + // TODO Auto-generated catch block |
| 485 | + e.printStackTrace(); |
| 486 | + } |
| 487 | + |
| 488 | + InOrder inOrder = inOrder(aObserver); |
| 489 | + inOrder.verify(aObserver, times(50)).onNext("hello"); |
| 490 | + verify(aObserver, times(1)).onCompleted(); |
| 491 | + verify(aObserver, never()).onError(any(Exception.class)); |
| 492 | + |
| 493 | + } |
| 494 | + |
463 | 495 |
|
464 | 496 | /**
|
465 | 497 | * Test unsubscribing the concatenated Observable in a single thread.
|
|
0 commit comments