Skip to content

Commit e85c3d1

Browse files
Synchronize terminalState and subscription logic
- remove the race condition that existed between a subscription coming in while onError/onCompleted was being called
1 parent 644806b commit e85c3d1

File tree

1 file changed

+34
-22
lines changed

1 file changed

+34
-22
lines changed

rxjava-core/src/main/java/rx/subjects/PublishSubject.java

Lines changed: 34 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public static <T> PublishSubject<T> create() {
7272
Func1<Observer<T>, Subscription> onSubscribe = new Func1<Observer<T>, Subscription>() {
7373
@Override
7474
public Subscription call(Observer<T> observer) {
75-
// first check if terminal state exist
75+
// shortcut check if terminal state exists already
7676
Subscription s = checkTerminalState(observer);
7777
if(s != null) return s;
7878

@@ -86,29 +86,27 @@ public void unsubscribe() {
8686
}
8787
});
8888

89-
// on subscribe add it to the map of outbound observers to notify
90-
observers.put(subscription, observer);
91-
92-
// check terminal state again
93-
s = checkTerminalState(observer);
94-
if(s != null) return s;
95-
9689
/**
97-
* NOTE: There is a race condition here.
98-
*
99-
* 1) terminal state gets set in onError or onCompleted
100-
* 2) observers.put adds a new observer
101-
* 3) checkTerminalState emits onError/onCompleted
102-
* 4) onError or onCompleted also emits onError/onCompleted since it was adds to observers
90+
* NOTE: We are synchronizing to avoid a race condition between terminalState being set and
91+
* a new observer being added to observers.
10392
*
104-
* Thus the terminal state could end up being sent twice.
93+
* The synchronization only occurs on subscription and terminal states, it does not affect onNext calls
94+
* so a high-volume hot-observable will not pay this cost for emitting data.
10595
*
106-
* I'm going to leave this for now as AtomicObserver will protect against this
107-
* and I'd rather not add blocking synchronization in here unless the above race condition
108-
* truly is an issue.
96+
* Due to the restricted impact of blocking synchronization here I have not pursued more complicated
97+
* approaches to try and stay completely non-blocking.
10998
*/
110-
111-
return subscription;
99+
synchronized (terminalState) {
100+
// check terminal state again
101+
s = checkTerminalState(observer);
102+
if (s != null)
103+
return s;
104+
105+
// on subscribe add it to the map of outbound observers to notify
106+
observers.put(subscription, observer);
107+
108+
return subscription;
109+
}
112110
}
113111

114112
private Subscription checkTerminalState(Observer<T> observer) {
@@ -141,7 +139,14 @@ protected PublishSubject(Func1<Observer<T>, Subscription> onSubscribe, Concurren
141139

142140
@Override
143141
public void onCompleted() {
144-
terminalState.set(new Notification<T>());
142+
/**
143+
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
144+
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
145+
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
146+
*/
147+
synchronized (terminalState) {
148+
terminalState.set(new Notification<T>());
149+
}
145150
for (Observer<T> observer : snapshotOfValues()) {
146151
observer.onCompleted();
147152
}
@@ -150,7 +155,14 @@ public void onCompleted() {
150155

151156
@Override
152157
public void onError(Exception e) {
153-
terminalState.set(new Notification<T>(e));
158+
/**
159+
* Synchronizing despite terminalState being an AtomicReference because of multi-step logic in subscription.
160+
* Why use AtomicReference then? Convenient for passing around a mutable reference holder between the
161+
* onSubscribe function and PublishSubject instance... and it's a "better volatile" for the shortcut codepath.
162+
*/
163+
synchronized (terminalState) {
164+
terminalState.set(new Notification<T>(e));
165+
}
154166
for (Observer<T> observer : snapshotOfValues()) {
155167
observer.onError(e);
156168
}

0 commit comments

Comments
 (0)