1
+ /**
2
+ * Copyright 2013 Netflix, Inc.
3
+ *
4
+ * Licensed under the Apache License, Version 2.0 (the "License");
5
+ * you may not use this file except in compliance with the License.
6
+ * You may obtain a copy of the License at
7
+ *
8
+ * http://www.apache.org/licenses/LICENSE-2.0
9
+ *
10
+ * Unless required by applicable law or agreed to in writing, software
11
+ * distributed under the License is distributed on an "AS IS" BASIS,
12
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13
+ * See the License for the specific language governing permissions and
14
+ * limitations under the License.
15
+ */
16
+ package rx .operators ;
17
+
18
+ import java .util .ArrayList ;
19
+ import java .util .HashMap ;
20
+ import java .util .List ;
21
+ import java .util .Map ;
22
+ import rx .Observable ;
23
+ import rx .Observable .OnSubscribeFunc ;
24
+ import rx .Observer ;
25
+ import rx .Subscription ;
26
+ import rx .subjects .PublishSubject ;
27
+ import rx .subjects .Subject ;
28
+ import rx .subscriptions .CompositeSubscription ;
29
+ import rx .subscriptions .RefCountSubscription ;
30
+ import rx .subscriptions .SerialSubscription ;
31
+ import rx .util .functions .Func1 ;
32
+ import rx .util .functions .Func2 ;
33
+
34
+ /**
35
+ * Corrrelates two sequences when they overlap and groups the results.
36
+ *
37
+ * @see <a href="http://msdn.microsoft.com/en-us/library/hh244235.aspx">MSDN: Observable.GroupJoin</a>
38
+ */
39
+ public class OperationGroupJoin <T1 , T2 , D1 , D2 , R > implements OnSubscribeFunc <R > {
40
+ protected final Observable <T1 > left ;
41
+ protected final Observable <T2 > right ;
42
+ protected final Func1 <? super T1 , ? extends Observable <D1 >> leftDuration ;
43
+ protected final Func1 <? super T2 , ? extends Observable <D2 >> rightDuration ;
44
+ protected final Func2 <? super T1 , ? super Observable <T2 >, ? extends R > resultSelector ;
45
+ public OperationGroupJoin (
46
+ Observable <T1 > left ,
47
+ Observable <T2 > right ,
48
+ Func1 <? super T1 , ? extends Observable <D1 >> leftDuration ,
49
+ Func1 <? super T2 , ? extends Observable <D2 >> rightDuration ,
50
+ Func2 <? super T1 , ? super Observable <T2 >, ? extends R > resultSelector
51
+ ) {
52
+ this .left = left ;
53
+ this .right = right ;
54
+ this .leftDuration = leftDuration ;
55
+ this .rightDuration = rightDuration ;
56
+ this .resultSelector = resultSelector ;
57
+ }
58
+ @ Override
59
+ public Subscription onSubscribe (Observer <? super R > t1 ) {
60
+ ResultManager ro = new ResultManager (t1 );
61
+ ro .init ();
62
+ return ro ;
63
+ }
64
+ /** Manages sub-observers and subscriptions. */
65
+ class ResultManager implements Subscription {
66
+ final RefCountSubscription cancel ;
67
+ final Observer <? super R > observer ;
68
+ final CompositeSubscription group ;
69
+ final Object guard = new Object ();
70
+ int leftIds ;
71
+ int rightIds ;
72
+ final Map <Integer , Observer <T2 >> leftMap = new HashMap <Integer , Observer <T2 >>();
73
+ final Map <Integer , T2 > rightMap = new HashMap <Integer , T2 >();
74
+ boolean leftDone ;
75
+ boolean rightDone ;
76
+ public ResultManager (Observer <? super R > observer ) {
77
+ this .observer = observer ;
78
+ this .group = new CompositeSubscription ();
79
+ this .cancel = new RefCountSubscription (group );
80
+ }
81
+ public void init () {
82
+ SerialSubscription s1 = new SerialSubscription ();
83
+ SerialSubscription s2 = new SerialSubscription ();
84
+
85
+ group .add (s1 );
86
+ group .add (s2 );
87
+
88
+ s1 .setSubscription (left .subscribe (new LeftObserver (s1 )));
89
+ s2 .setSubscription (right .subscribe (new RightObserver (s2 )));
90
+
91
+ }
92
+
93
+ @ Override
94
+ public void unsubscribe () {
95
+ cancel .unsubscribe ();
96
+ }
97
+ void groupsOnCompleted () {
98
+ List <Observer <T2 >> list = new ArrayList <Observer <T2 >>(leftMap .values ());
99
+ leftMap .clear ();
100
+ rightMap .clear ();
101
+ for (Observer <T2 > o : list ) {
102
+ o .onCompleted ();
103
+ }
104
+ }
105
+ /** Observe the left source. */
106
+ class LeftObserver implements Observer <T1 > {
107
+ final Subscription tosource ;
108
+ public LeftObserver (Subscription tosource ) {
109
+ this .tosource = tosource ;
110
+ }
111
+ @ Override
112
+ public void onNext (T1 args ) {
113
+ try {
114
+ int id ;
115
+ Subject <T2 , T2 > subj = PublishSubject .create ();
116
+ synchronized (guard ) {
117
+ id = leftIds ++;
118
+ leftMap .put (id , subj );
119
+ }
120
+
121
+ Observable <T2 > window = Observable .create (new WindowObservableFunc <T2 >(subj , cancel ));
122
+
123
+ Observable <D1 > duration = leftDuration .call (args );
124
+
125
+ SerialSubscription sduration = new SerialSubscription ();
126
+ group .add (sduration );
127
+ sduration .setSubscription (duration .subscribe (new LeftDurationObserver (id , sduration , subj )));
128
+
129
+ R result = resultSelector .call (args , window );
130
+
131
+ synchronized (guard ) {
132
+ observer .onNext (result );
133
+ for (T2 t2 : rightMap .values ()) {
134
+ subj .onNext (t2 );
135
+
136
+ }
137
+ }
138
+ } catch (Throwable t ) {
139
+ onError (t );
140
+ }
141
+ }
142
+
143
+ @ Override
144
+ public void onCompleted () {
145
+ synchronized (guard ) {
146
+ leftDone = true ;
147
+ if (rightDone ) {
148
+ groupsOnCompleted ();
149
+ observer .onCompleted ();
150
+ cancel .unsubscribe ();
151
+ }
152
+ }
153
+ }
154
+
155
+ @ Override
156
+ public void onError (Throwable e ) {
157
+ synchronized (guard ) {
158
+ for (Observer <T2 > o : leftMap .values ()) {
159
+ o .onError (e );
160
+ }
161
+ observer .onError (e );
162
+ cancel .unsubscribe ();
163
+ }
164
+ }
165
+
166
+
167
+ }
168
+ /** Observe the right source. */
169
+ class RightObserver implements Observer <T2 > {
170
+ final Subscription tosource ;
171
+ public RightObserver (Subscription tosource ) {
172
+ this .tosource = tosource ;
173
+ }
174
+ @ Override
175
+ public void onNext (T2 args ) {
176
+ try {
177
+ int id ;
178
+ synchronized (guard ) {
179
+ id = rightIds ++;
180
+ rightMap .put (id , args );
181
+ }
182
+ Observable <D2 > duration = rightDuration .call (args );
183
+
184
+ SerialSubscription sduration = new SerialSubscription ();
185
+ group .add (sduration );
186
+ sduration .setSubscription (duration .subscribe (new RightDurationObserver (id , sduration )));
187
+
188
+ synchronized (guard ) {
189
+ for (Observer <T2 > o : leftMap .values ()) {
190
+ o .onNext (args );
191
+ }
192
+ }
193
+ } catch (Throwable t ) {
194
+ onError (t );
195
+ }
196
+ }
197
+
198
+ @ Override
199
+ public void onCompleted () {
200
+ // tosource.unsubscribe();
201
+ synchronized (guard ) {
202
+ rightDone = true ;
203
+ if (leftDone ) {
204
+ groupsOnCompleted ();
205
+ observer .onCompleted ();
206
+ cancel .unsubscribe ();
207
+ }
208
+ }
209
+ }
210
+
211
+ @ Override
212
+ public void onError (Throwable e ) {
213
+ synchronized (guard ) {
214
+ for (Observer <T2 > o : leftMap .values ()) {
215
+ o .onError (e );
216
+ }
217
+
218
+ observer .onError (e );
219
+ cancel .unsubscribe ();
220
+ }
221
+ }
222
+ }
223
+ /** Observe left duration and apply termination. */
224
+ class LeftDurationObserver implements Observer <D1 > {
225
+ final int id ;
226
+ final Subscription sduration ;
227
+ final Observer <T2 > gr ;
228
+ public LeftDurationObserver (int id , Subscription sduration , Observer <T2 > gr ) {
229
+ this .id = id ;
230
+ this .sduration = sduration ;
231
+ this .gr = gr ;
232
+ }
233
+
234
+ @ Override
235
+ public void onCompleted () {
236
+ synchronized (guard ) {
237
+ if (leftMap .remove (id ) != null ) {
238
+ gr .onCompleted ();
239
+ }
240
+ }
241
+ group .remove (sduration );
242
+ }
243
+
244
+ @ Override
245
+ public void onError (Throwable e ) {
246
+ synchronized (guard ) {
247
+ observer .onError (e );
248
+ }
249
+ cancel .unsubscribe ();
250
+ }
251
+
252
+ @ Override
253
+ public void onNext (D1 args ) {
254
+ onCompleted ();
255
+ }
256
+ }
257
+ /** Observe right duration and apply termination. */
258
+ class RightDurationObserver implements Observer <D2 > {
259
+ final int id ;
260
+ final Subscription sduration ;
261
+ public RightDurationObserver (int id , Subscription sduration ) {
262
+ this .id = id ;
263
+ this .sduration = sduration ;
264
+ }
265
+
266
+ @ Override
267
+ public void onCompleted () {
268
+ synchronized (guard ) {
269
+ rightMap .remove (id );
270
+ }
271
+ group .remove (sduration );
272
+ }
273
+
274
+ @ Override
275
+ public void onError (Throwable e ) {
276
+ synchronized (guard ) {
277
+ observer .onError (e );
278
+ }
279
+ cancel .unsubscribe ();
280
+ }
281
+
282
+ @ Override
283
+ public void onNext (D2 args ) {
284
+ onCompleted ();
285
+ }
286
+ }
287
+ }
288
+ /**
289
+ * The reference-counted window observable.
290
+ * Subscribes to the underlying Observable by using a reference-counted
291
+ * subscription.
292
+ */
293
+ static class WindowObservableFunc <T > implements OnSubscribeFunc <T > {
294
+ final RefCountSubscription refCount ;
295
+ final Observable <T > underlying ;
296
+ public WindowObservableFunc (Observable <T > underlying , RefCountSubscription refCount ) {
297
+ this .refCount = refCount ;
298
+ this .underlying = underlying ;
299
+ }
300
+
301
+ @ Override
302
+ public Subscription onSubscribe (Observer <? super T > t1 ) {
303
+ CompositeSubscription cs = new CompositeSubscription ();
304
+ cs .add (refCount .getSubscription ());
305
+ WindowObserver wo = new WindowObserver (t1 , cs );
306
+ cs .add (underlying .subscribe (wo ));
307
+ return cs ;
308
+ }
309
+ /** Observe activities on the window. */
310
+ class WindowObserver implements Observer <T > {
311
+ final Observer <? super T > observer ;
312
+ final Subscription self ;
313
+ public WindowObserver (Observer <? super T > observer , Subscription self ) {
314
+ this .observer = observer ;
315
+ this .self = self ;
316
+ }
317
+ @ Override
318
+ public void onNext (T args ) {
319
+ observer .onNext (args );
320
+ }
321
+ @ Override
322
+ public void onError (Throwable e ) {
323
+ observer .onError (e );
324
+ self .unsubscribe ();
325
+ }
326
+ @ Override
327
+ public void onCompleted () {
328
+ observer .onCompleted ();
329
+ self .unsubscribe ();
330
+ }
331
+ }
332
+ }
333
+ }
0 commit comments