Skip to content

Commit d5678e0

Browse files
Merge pull request ReactiveX#273 from billyy/concat
Concat
2 parents b7fb332 + e877810 commit d5678e0

File tree

1 file changed

+154
-80
lines changed

1 file changed

+154
-80
lines changed

rxjava-core/src/main/java/rx/operators/OperationConcat.java

Lines changed: 154 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@
2121
import java.util.Arrays;
2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.Queue;
2425
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
2527
import java.util.concurrent.atomic.AtomicBoolean;
2628
import java.util.concurrent.atomic.AtomicReference;
29+
import java.util.concurrent.ConcurrentLinkedQueue;
2730

31+
import org.junit.Ignore;
2832
import org.junit.Test;
2933

3034
import org.mockito.InOrder;
@@ -40,23 +44,10 @@ public final class OperationConcat {
4044

4145
/**
4246
* Combine the observable sequences from the list of Observables into one
43-
* observable sequence without any transformation. If either the outer
47+
* observable sequence without any transformation. If either the outer
4448
* observable or an inner observable calls onError, we will call onError.
45-
*
46-
* <p/>
47-
*
48-
* The outer observable might run on a separate thread from (one of) the
49-
* inner observables; in this case care must be taken to avoid a deadlock.
50-
* The Concat operation may block the outer thread while servicing an inner
51-
* thread in order to ensure a well-defined ordering of elements; therefore
52-
* none of the inner threads must be implemented in a way that might wait on
53-
* the outer thread.
54-
*
5549
* <p/>
5650
*
57-
* Beware that concat(o1,o2).subscribe() is a blocking call from
58-
* which it is impossible to unsubscribe if observables are running on same thread.
59-
*
6051
* @param sequences An observable sequence of elements to project.
6152
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
6253
*/
@@ -69,73 +60,101 @@ public static <T> Func1<Observer<T>, Subscription> concat(final List<Observable<
6960
}
7061

7162
public static <T> Func1<Observer<T>, Subscription> concat(final Observable<Observable<T>> sequences) {
72-
return new Func1<Observer<T>, Subscription>() {
73-
74-
@Override
75-
public Subscription call(Observer<T> observer) {
76-
return new ConcatSubscription<T>(sequences, observer);
77-
}
78-
};
63+
return new Concat<T>(sequences);
7964
}
8065

81-
private static class ConcatSubscription<T> extends BooleanSubscription {
82-
// Might be updated by an inner thread's onError during the outer
83-
// thread's onNext, then read in the outer thread's onComplete.
84-
final AtomicBoolean innerError = new AtomicBoolean(false);
66+
private static class Concat<T> implements Func1<Observer<T>, Subscription> {
67+
private Observable<Observable<T>> sequences;
68+
private AtomicObservableSubscription innerSubscription = null;
8569

86-
public ConcatSubscription(Observable<Observable<T>> sequences, final Observer<T> observer) {
70+
public Concat(Observable<Observable<T>> sequences) {
71+
this.sequences = sequences;
72+
}
73+
74+
public Subscription call(final Observer<T> observer) {
75+
final AtomicBoolean completedOrErred = new AtomicBoolean(false);
76+
final AtomicBoolean allSequencesReceived = new AtomicBoolean(false);
77+
final Queue<Observable<T>> nextSequences = new ConcurrentLinkedQueue<Observable<T>>();
8778
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
88-
outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {
79+
80+
final Observer<T> reusableObserver = new Observer<T>() {
8981
@Override
90-
public void onNext(Observable<T> nextSequence) {
91-
// We will not return from onNext until the inner observer completes.
92-
// NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted.
93-
final CountDownLatch latch = new CountDownLatch(1);
94-
final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription();
95-
innerSubscription.wrap(nextSequence.subscribe(new Observer<T>() {
96-
@Override
97-
public void onNext(T item) {
98-
// Make our best-effort to release resources in the face of unsubscribe.
99-
if (isUnsubscribed()) {
100-
innerSubscription.unsubscribe();
101-
outerSubscription.unsubscribe();
102-
} else {
103-
observer.onNext(item);
82+
public void onNext(T item) {
83+
observer.onNext(item);
84+
}
85+
@Override
86+
public void onError(Exception e) {
87+
if (completedOrErred.compareAndSet(false, true)) {
88+
outerSubscription.unsubscribe();
89+
observer.onError(e);
90+
}
91+
}
92+
@Override
93+
public void onCompleted() {
94+
synchronized (nextSequences) {
95+
if (nextSequences.isEmpty()) {
96+
// No new sequences available at the moment
97+
innerSubscription = null;
98+
if (allSequencesReceived.get()) {
99+
// No new sequences are coming, we are finished
100+
if (completedOrErred.compareAndSet(false, true)) {
101+
observer.onCompleted();
102+
}
104103
}
105-
}
106-
@Override
107-
public void onError(Exception e) {
108-
outerSubscription.unsubscribe();
109-
innerError.set(true);
110-
observer.onError(e);
111-
latch.countDown();
112-
}
113-
@Override
114-
public void onCompleted() {
104+
} else {
115105
// Continue on to the next sequence
116-
latch.countDown();
106+
innerSubscription = new AtomicObservableSubscription();
107+
innerSubscription.wrap(nextSequences.poll().subscribe(this));
108+
}
109+
}
110+
}
111+
};
112+
113+
outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {
114+
@Override
115+
public void onNext(Observable<T> nextSequence) {
116+
synchronized (nextSequences) {
117+
if (innerSubscription == null) {
118+
// We are currently not subscribed to any sequence
119+
innerSubscription = new AtomicObservableSubscription();
120+
innerSubscription.wrap(nextSequence.subscribe(reusableObserver));
121+
} else {
122+
// Put this sequence at the end of the queue
123+
nextSequences.add(nextSequence);
117124
}
118-
}));
119-
try {
120-
latch.await();
121-
} catch (InterruptedException e) {
122-
Thread.currentThread().interrupt();
123-
throw Exceptions.propagate(e);
124125
}
125126
}
126127
@Override
127128
public void onError(Exception e) {
128-
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
129-
observer.onError(e);
129+
if (completedOrErred.compareAndSet(false, true)) {
130+
if (innerSubscription != null) {
131+
innerSubscription.unsubscribe();
132+
}
133+
observer.onError(e);
134+
}
130135
}
131136
@Override
132137
public void onCompleted() {
133-
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
134-
if (!innerError.get()) {
135-
observer.onCompleted();
138+
allSequencesReceived.set(true);
139+
if (innerSubscription == null) {
140+
// We are not subscribed to any sequence, and none are coming anymore
141+
if (completedOrErred.compareAndSet(false, true)) {
142+
observer.onCompleted();
143+
}
136144
}
137145
}
138146
}));
147+
148+
return new Subscription() {
149+
@Override
150+
public void unsubscribe() {
151+
synchronized (nextSequences) {
152+
if (innerSubscription != null)
153+
innerSubscription.unsubscribe();
154+
outerSubscription.unsubscribe();
155+
}
156+
}
157+
};
139158
}
140159
}
141160

@@ -442,9 +461,72 @@ public void testConcatConcurrentWithInfinity() {
442461

443462
}
444463

464+
465+
466+
@Test
467+
public void testConcatUnSubscribeNotBlockingObservables() {
468+
469+
final CountDownLatch okToContinueW1 = new CountDownLatch(1);
470+
final CountDownLatch okToContinueW2 = new CountDownLatch(1);
471+
472+
final TestObservable<String> w1 = new TestObservable<String>(null, okToContinueW1, "one", "two", "three");
473+
final TestObservable<String> w2 = new TestObservable<String>(null, okToContinueW2, "four", "five", "six");
474+
475+
@SuppressWarnings("unchecked")
476+
Observer<String> aObserver = mock(Observer.class);
477+
Observable<Observable<String>> observableOfObservables = Observable.create(new Func1<Observer<Observable<String>>, Subscription>() {
478+
479+
@Override
480+
public Subscription call(Observer<Observable<String>> observer) {
481+
// simulate what would happen in an observable
482+
observer.onNext(w1);
483+
observer.onNext(w2);
484+
observer.onCompleted();
485+
486+
return new Subscription() {
487+
488+
@Override
489+
public void unsubscribe() {
490+
}
491+
492+
};
493+
}
494+
495+
});
496+
Observable<String> concat = Observable.create(concat(observableOfObservables));
497+
498+
concat.subscribe(aObserver);
499+
500+
verify(aObserver, times(0)).onCompleted();
501+
502+
503+
//Wait for the thread to start up.
504+
try {
505+
Thread.sleep(25);
506+
w1.t.join();
507+
w2.t.join();
508+
okToContinueW1.countDown();
509+
okToContinueW2.countDown();
510+
} catch (InterruptedException e) {
511+
// TODO Auto-generated catch block
512+
e.printStackTrace();
513+
}
514+
515+
InOrder inOrder = inOrder(aObserver);
516+
inOrder.verify(aObserver, times(1)).onNext("one");
517+
inOrder.verify(aObserver, times(1)).onNext("two");
518+
inOrder.verify(aObserver, times(1)).onNext("three");
519+
inOrder.verify(aObserver, times(1)).onNext("four");
520+
inOrder.verify(aObserver, times(1)).onNext("five");
521+
inOrder.verify(aObserver, times(1)).onNext("six");
522+
verify(aObserver, times(1)).onCompleted();
523+
524+
525+
}
526+
445527

446528
/**
447-
* The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete.
529+
* Test unsubscribing the concatenated Observable in a single thread.
448530
*/
449531
@Test
450532
public void testConcatUnsubscribe() {
@@ -458,20 +540,13 @@ public void testConcatUnsubscribe() {
458540
@SuppressWarnings("unchecked")
459541
final Observable<String> concat = Observable.create(concat(w1, w2));
460542
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
461-
Thread t = new Thread() {
462-
@Override
463-
public void run() {
464-
// NB: this statement does not complete until after "six" has been delivered.
465-
s1.wrap(concat.subscribe(aObserver));
466-
}
467-
};
468-
t.start();
543+
469544
try {
545+
// Subscribe
546+
s1.wrap(concat.subscribe(aObserver));
470547
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
471548
callOnce.await();
472-
// NB: This statement has no effect, since s1 cannot possibly
473-
// wrap anything until "six" has been delivered, which cannot
474-
// happen until we okToContinue.countDown()
549+
// Unsubcribe
475550
s1.unsubscribe();
476551
//Unblock the observable to continue.
477552
okToContinue.countDown();
@@ -487,10 +562,9 @@ public void run() {
487562
inOrder.verify(aObserver, times(1)).onNext("two");
488563
inOrder.verify(aObserver, times(1)).onNext("three");
489564
inOrder.verify(aObserver, times(1)).onNext("four");
490-
// NB: you might hope that five and six are not delivered, but see above.
491-
inOrder.verify(aObserver, times(1)).onNext("five");
492-
inOrder.verify(aObserver, times(1)).onNext("six");
493-
inOrder.verify(aObserver, times(1)).onCompleted();
565+
inOrder.verify(aObserver, never()).onNext("five");
566+
inOrder.verify(aObserver, never()).onNext("six");
567+
inOrder.verify(aObserver, never()).onCompleted();
494568

495569
}
496570

@@ -598,7 +672,7 @@ public void run() {
598672
once.countDown();
599673
//Block until the main thread has called unsubscribe.
600674
if (null != okToContinue)
601-
okToContinue.await();
675+
okToContinue.await(1, TimeUnit.SECONDS);
602676
}
603677
if (subscribed)
604678
observer.onCompleted();

0 commit comments

Comments
 (0)