Skip to content

Commit cfa7155

Browse files
committed
Rewrite concat operation to not block on subscribe
The concat operator previously blocked on calling subscribe until all the sequences had finished. In quite some cases this results in unwanted (and unexpected) behaviour, such as when prefixing an infinite Observable with a fixed one, for example when using startWith (which calls concat): someInputStream.startWith(123).subscribe(x -> print(x)); This statement will block indefinitely if the input stream is infinite. Also on finite sequences it seems silly to have to wait for them to finish. In this new approach the incoming observables are put into a queue, instead of waiting for the whole sequence to finish. When the first observable completes, the next one is taken from the queue and subscribed to, and so on. The queue can be extended while processing the observables, and onCompleted is only called when both the source of observables has completed and all observables in the queue have been read.
1 parent 62ec36e commit cfa7155

File tree

1 file changed

+88
-79
lines changed

1 file changed

+88
-79
lines changed

rxjava-core/src/main/java/rx/operators/OperationConcat.java

Lines changed: 88 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import java.util.Arrays;
2222
import java.util.ArrayList;
2323
import java.util.List;
24+
import java.util.Queue;
2425
import java.util.concurrent.CountDownLatch;
2526
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.atomic.AtomicReference;
28+
import java.util.concurrent.ConcurrentLinkedQueue;
2729

2830
import org.junit.Test;
2931

@@ -41,23 +43,10 @@ public final class OperationConcat {
4143

4244
/**
4345
* Combine the observable sequences from the list of Observables into one
44-
* observable sequence without any transformation. If either the outer
46+
* observable sequence without any transformation. If either the outer
4547
* observable or an inner observable calls onError, we will call onError.
46-
*
47-
* <p/>
48-
*
49-
* The outer observable might run on a separate thread from (one of) the
50-
* inner observables; in this case care must be taken to avoid a deadlock.
51-
* The Concat operation may block the outer thread while servicing an inner
52-
* thread in order to ensure a well-defined ordering of elements; therefore
53-
* none of the inner threads must be implemented in a way that might wait on
54-
* the outer thread.
55-
*
5648
* <p/>
5749
*
58-
* Beware that concat(o1,o2).subscribe() is a blocking call from
59-
* which it is impossible to unsubscribe if observables are running on same thread.
60-
*
6150
* @param sequences An observable sequence of elements to project.
6251
* @return An observable sequence whose elements are the result of combining the output from the list of Observables.
6352
*/
@@ -70,73 +59,101 @@ public static <T> Func1<Observer<T>, Subscription> concat(final List<Observable<
7059
}
7160

7261
public static <T> Func1<Observer<T>, Subscription> concat(final Observable<Observable<T>> sequences) {
73-
return new Func1<Observer<T>, Subscription>() {
74-
75-
@Override
76-
public Subscription call(Observer<T> observer) {
77-
return new ConcatSubscription<T>(sequences, observer);
78-
}
79-
};
62+
return new Concat<T>(sequences);
8063
}
8164

82-
private static class ConcatSubscription<T> extends BooleanSubscription {
83-
// Might be updated by an inner thread's onError during the outer
84-
// thread's onNext, then read in the outer thread's onComplete.
85-
final AtomicBoolean innerError = new AtomicBoolean(false);
65+
private static class Concat<T> implements Func1<Observer<T>, Subscription> {
66+
private Observable<Observable<T>> sequences;
67+
private AtomicObservableSubscription innerSubscription = null;
68+
69+
public Concat(Observable<Observable<T>> sequences) {
70+
this.sequences = sequences;
71+
}
8672

87-
public ConcatSubscription(Observable<Observable<T>> sequences, final Observer<T> observer) {
73+
public Subscription call(final Observer<T> observer) {
74+
final AtomicBoolean completedOrErred = new AtomicBoolean(false);
75+
final AtomicBoolean allSequencesReceived = new AtomicBoolean(false);
76+
final Queue<Observable<T>> nextSequences = new ConcurrentLinkedQueue<Observable<T>>();
8877
final AtomicObservableSubscription outerSubscription = new AtomicObservableSubscription();
89-
outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {
78+
79+
final Observer<T> reusableObserver = new Observer<T>() {
9080
@Override
91-
public void onNext(Observable<T> nextSequence) {
92-
// We will not return from onNext until the inner observer completes.
93-
// NB: while we are in onNext, the well-behaved outer observable will not call onError or onCompleted.
94-
final CountDownLatch latch = new CountDownLatch(1);
95-
final AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription();
96-
innerSubscription.wrap(nextSequence.subscribe(new Observer<T>() {
97-
@Override
98-
public void onNext(T item) {
99-
// Make our best-effort to release resources in the face of unsubscribe.
100-
if (isUnsubscribed()) {
101-
innerSubscription.unsubscribe();
102-
outerSubscription.unsubscribe();
103-
} else {
104-
observer.onNext(item);
81+
public void onNext(T item) {
82+
observer.onNext(item);
83+
}
84+
@Override
85+
public void onError(Exception e) {
86+
if (completedOrErred.compareAndSet(false, true)) {
87+
outerSubscription.unsubscribe();
88+
observer.onError(e);
89+
}
90+
}
91+
@Override
92+
public void onCompleted() {
93+
synchronized (nextSequences) {
94+
if (nextSequences.isEmpty()) {
95+
// No new sequences available at the moment
96+
innerSubscription = null;
97+
if (allSequencesReceived.get()) {
98+
// No new sequences are coming, we are finished
99+
if (completedOrErred.compareAndSet(false, true)) {
100+
observer.onCompleted();
101+
}
105102
}
106-
}
107-
@Override
108-
public void onError(Exception e) {
109-
outerSubscription.unsubscribe();
110-
innerError.set(true);
111-
observer.onError(e);
112-
latch.countDown();
113-
}
114-
@Override
115-
public void onCompleted() {
103+
} else {
116104
// Continue on to the next sequence
117-
latch.countDown();
105+
innerSubscription = new AtomicObservableSubscription();
106+
innerSubscription.wrap(nextSequences.poll().subscribe(this));
107+
}
108+
}
109+
}
110+
};
111+
112+
outerSubscription.wrap(sequences.subscribe(new Observer<Observable<T>>() {
113+
@Override
114+
public void onNext(Observable<T> nextSequence) {
115+
synchronized (nextSequences) {
116+
if (innerSubscription == null) {
117+
// We are currently not subscribed to any sequence
118+
innerSubscription = new AtomicObservableSubscription();
119+
innerSubscription.wrap(nextSequence.subscribe(reusableObserver));
120+
} else {
121+
// Put this sequence at the end of the queue
122+
nextSequences.add(nextSequence);
118123
}
119-
}));
120-
try {
121-
latch.await();
122-
} catch (InterruptedException e) {
123-
Thread.currentThread().interrupt();
124-
throw Exceptions.propagate(e);
125124
}
126125
}
127126
@Override
128127
public void onError(Exception e) {
129-
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
130-
observer.onError(e);
128+
if (completedOrErred.compareAndSet(false, true)) {
129+
if (innerSubscription != null) {
130+
innerSubscription.unsubscribe();
131+
}
132+
observer.onError(e);
133+
}
131134
}
132135
@Override
133136
public void onCompleted() {
134-
// NB: a well-behaved observable will not interleave on{Next,Error,Completed} calls.
135-
if (!innerError.get()) {
136-
observer.onCompleted();
137+
allSequencesReceived.set(true);
138+
if (innerSubscription == null) {
139+
// We are not subscribed to any sequence, and none are coming anymore
140+
if (completedOrErred.compareAndSet(false, true)) {
141+
observer.onCompleted();
142+
}
137143
}
138144
}
139145
}));
146+
147+
return new Subscription() {
148+
@Override
149+
public void unsubscribe() {
150+
synchronized (nextSequences) {
151+
if (innerSubscription != null)
152+
innerSubscription.unsubscribe();
153+
outerSubscription.unsubscribe();
154+
}
155+
}
156+
};
140157
}
141158
}
142159

@@ -445,7 +462,7 @@ public void testConcatConcurrentWithInfinity() {
445462

446463

447464
/**
448-
* The outer observable is running on the same thread and subscribe() in this case is a blocking call. Calling unsubscribe() is no-op because the sequence is complete.
465+
* Test unsubscribing the concatenated Observable in a single thread.
449466
*/
450467
@Test
451468
public void testConcatUnsubscribe() {
@@ -459,20 +476,13 @@ public void testConcatUnsubscribe() {
459476
@SuppressWarnings("unchecked")
460477
final Observable<String> concat = Observable.create(concat(w1, w2));
461478
final AtomicObservableSubscription s1 = new AtomicObservableSubscription();
462-
Thread t = new Thread() {
463-
@Override
464-
public void run() {
465-
// NB: this statement does not complete until after "six" has been delivered.
466-
s1.wrap(concat.subscribe(aObserver));
467-
}
468-
};
469-
t.start();
479+
470480
try {
481+
// Subscribe
482+
s1.wrap(concat.subscribe(aObserver));
471483
//Block main thread to allow observable "w1" to complete and observable "w2" to call onNext once.
472484
callOnce.await();
473-
// NB: This statement has no effect, since s1 cannot possibly
474-
// wrap anything until "six" has been delivered, which cannot
475-
// happen until we okToContinue.countDown()
485+
// Unsubcribe
476486
s1.unsubscribe();
477487
//Unblock the observable to continue.
478488
okToContinue.countDown();
@@ -488,10 +498,9 @@ public void run() {
488498
inOrder.verify(aObserver, times(1)).onNext("two");
489499
inOrder.verify(aObserver, times(1)).onNext("three");
490500
inOrder.verify(aObserver, times(1)).onNext("four");
491-
// NB: you might hope that five and six are not delivered, but see above.
492-
inOrder.verify(aObserver, times(1)).onNext("five");
493-
inOrder.verify(aObserver, times(1)).onNext("six");
494-
inOrder.verify(aObserver, times(1)).onCompleted();
501+
inOrder.verify(aObserver, never()).onNext("five");
502+
inOrder.verify(aObserver, never()).onNext("six");
503+
inOrder.verify(aObserver, never()).onCompleted();
495504

496505
}
497506

0 commit comments

Comments
 (0)