40
40
import static org .mockito .Matchers .any ;
41
41
import static org .mockito .Matchers .anyString ;
42
42
43
-
44
43
/**
45
44
* This operation transforms an {@link Observable} sequence of {@link Observable} sequences into a single
46
45
* {@link Observable} sequence which only produces values from the most recently published {@link Observable}
@@ -74,32 +73,32 @@ public Switch(Observable<Observable<T>> sequences) {
74
73
75
74
@ Override
76
75
public Subscription call (Observer <T > observer ) {
77
- AtomicObservableSubscription subscription = new AtomicObservableSubscription ();
78
- subscription .wrap (sequences .subscribe (new SwitchObserver <T >(observer , subscription )));
79
- return subscription ;
76
+ AtomicObservableSubscription subscription = new AtomicObservableSubscription ();
77
+ subscription .wrap (sequences .subscribe (new SwitchObserver <T >(observer , subscription )));
78
+ return subscription ;
80
79
}
81
80
}
82
81
83
82
private static class SwitchObserver <T > implements Observer <Observable <T >> {
84
83
85
84
private final Observer <T > observer ;
86
- private final AtomicObservableSubscription parent ;
87
- private final AtomicReference <Subscription > subsequence = new AtomicReference <Subscription >();
85
+ private final AtomicObservableSubscription parent ;
86
+ private final AtomicReference <Subscription > subsequence = new AtomicReference <Subscription >();
88
87
89
88
public SwitchObserver (Observer <T > observer , AtomicObservableSubscription parent ) {
90
89
this .observer = observer ;
91
- this .parent = parent ;
90
+ this .parent = parent ;
92
91
}
93
92
94
93
@ Override
95
94
public void onCompleted () {
96
- unsubscribeFromSubSequence ();
95
+ unsubscribeFromSubSequence ();
97
96
observer .onCompleted ();
98
97
}
99
98
100
99
@ Override
101
100
public void onError (Exception e ) {
102
- unsubscribeFromSubSequence ();
101
+ unsubscribeFromSubSequence ();
103
102
observer .onError (e );
104
103
}
105
104
@@ -110,28 +109,28 @@ public void onNext(Observable<T> args) {
110
109
subsequence .set (args .subscribe (new Observer <T >() {
111
110
@ Override
112
111
public void onCompleted () {
113
- // Do nothing.
112
+ // Do nothing.
114
113
}
115
114
116
115
@ Override
117
116
public void onError (Exception e ) {
118
- parent .unsubscribe ();
119
- observer .onError (e );
117
+ parent .unsubscribe ();
118
+ observer .onError (e );
120
119
}
121
120
122
- @ Override
121
+ @ Override
123
122
public void onNext (T args ) {
124
123
observer .onNext (args );
125
124
}
126
125
}));
127
126
}
128
127
129
- private void unsubscribeFromSubSequence () {
130
- Subscription previousSubscription = subsequence .get ();
128
+ private void unsubscribeFromSubSequence () {
129
+ Subscription previousSubscription = subsequence .get ();
131
130
if (previousSubscription != null ) {
132
131
previousSubscription .unsubscribe ();
133
132
}
134
- }
133
+ }
135
134
}
136
135
137
136
public static class UnitTest {
0 commit comments