24
24
import java .util .concurrent .ConcurrentLinkedQueue ;
25
25
import java .util .concurrent .CountDownLatch ;
26
26
import java .util .concurrent .TimeUnit ;
27
+ import java .util .concurrent .atomic .AtomicBoolean ;
27
28
import java .util .concurrent .atomic .AtomicInteger ;
28
29
import java .util .concurrent .atomic .AtomicReference ;
29
30
33
34
import rx .Observer ;
34
35
import rx .Subscription ;
35
36
import rx .observables .GroupedObservable ;
37
+ import rx .subscriptions .BooleanSubscription ;
36
38
import rx .subscriptions .Subscriptions ;
37
39
import rx .util .functions .Action1 ;
38
40
import rx .util .functions .Func1 ;
@@ -63,14 +65,18 @@ private static class GroupBy<K, V> implements Func1<Observer<GroupedObservable<K
63
65
64
66
private final Observable <KeyValue <K , V >> source ;
65
67
private final ConcurrentHashMap <K , GroupedSubject <K , V >> groupedObservables = new ConcurrentHashMap <K , GroupedSubject <K , V >>();
68
+ private final AtomicObservableSubscription actualParentSubscription = new AtomicObservableSubscription ();
69
+ private final AtomicInteger numGroupSubscriptions = new AtomicInteger ();
70
+ private final AtomicBoolean unsubscribeRequested = new AtomicBoolean (false );
66
71
67
72
private GroupBy (Observable <KeyValue <K , V >> source ) {
68
73
this .source = source ;
69
74
}
70
75
71
76
@ Override
72
77
public Subscription call (final Observer <GroupedObservable <K , V >> observer ) {
73
- return source .subscribe (new Observer <KeyValue <K , V >>() {
78
+ final GroupBy <K , V > _this = this ;
79
+ actualParentSubscription .wrap (source .subscribe (new Observer <KeyValue <K , V >>() {
74
80
75
81
@ Override
76
82
public void onCompleted () {
@@ -96,12 +102,17 @@ public void onError(Exception e) {
96
102
public void onNext (KeyValue <K , V > value ) {
97
103
GroupedSubject <K , V > gs = groupedObservables .get (value .key );
98
104
if (gs == null ) {
105
+ if (unsubscribeRequested .get ()) {
106
+ // unsubscribe has been requested so don't create new groups
107
+ // only send data to groups already created
108
+ return ;
109
+ }
99
110
/*
100
111
* Technically the source should be single-threaded so we shouldn't need to do this but I am
101
112
* programming defensively as most operators are so this can work with a concurrent sequence
102
113
* if it ends up receiving one.
103
114
*/
104
- GroupedSubject <K , V > newGs = GroupedSubject .<K , V > create (value .key );
115
+ GroupedSubject <K , V > newGs = GroupedSubject .<K , V > create (value .key , _this );
105
116
GroupedSubject <K , V > existing = groupedObservables .putIfAbsent (value .key , newGs );
106
117
if (existing == null ) {
107
118
// we won so use the one we created
@@ -115,34 +126,72 @@ public void onNext(KeyValue<K, V> value) {
115
126
}
116
127
gs .onNext (value .value );
117
128
}
118
- });
129
+ }));
130
+
131
+ return new Subscription () {
132
+
133
+ @ Override
134
+ public void unsubscribe () {
135
+ if (numGroupSubscriptions .get () == 0 ) {
136
+ // if we have no group subscriptions we will unsubscribe
137
+ actualParentSubscription .unsubscribe ();
138
+ // otherwise we mark to not send any more groups (waiting on existing groups to finish)
139
+ unsubscribeRequested .set (true );
140
+ }
141
+ }
142
+ };
143
+ }
144
+
145
+ /**
146
+ * Children notify of being subscribed to.
147
+ *
148
+ * @param key
149
+ */
150
+ private void subscribeKey (K key ) {
151
+ numGroupSubscriptions .incrementAndGet ();
152
+ }
153
+
154
+ /**
155
+ * Children notify of being unsubscribed from.
156
+ *
157
+ * @param key
158
+ */
159
+ private void unsubscribeKey (K key ) {
160
+ int c = numGroupSubscriptions .decrementAndGet ();
161
+ if (c == 0 ) {
162
+ actualParentSubscription .unsubscribe ();
163
+ }
119
164
}
120
165
}
121
166
122
167
private static class GroupedSubject <K , T > extends GroupedObservable <K , T > implements Observer <T > {
123
168
124
- static <K , T > GroupedSubject <K , T > create (K key ) {
169
+ static <K , T > GroupedSubject <K , T > create (final K key , final GroupBy < K , T > parent ) {
125
170
@ SuppressWarnings ("unchecked" )
126
171
final AtomicReference <Observer <T >> subscribedObserver = new AtomicReference <Observer <T >>(EMPTY_OBSERVER );
127
172
128
173
return new GroupedSubject <K , T >(key , new Func1 <Observer <T >, Subscription >() {
129
174
175
+ private final AtomicObservableSubscription subscription = new AtomicObservableSubscription ();
176
+
130
177
@ Override
131
178
public Subscription call (Observer <T > observer ) {
132
179
// register Observer
133
180
subscribedObserver .set (observer );
134
181
135
- return new Subscription () {
182
+ parent .subscribeKey (key );
183
+
184
+ return subscription .wrap (new Subscription () {
136
185
137
186
@ SuppressWarnings ("unchecked" )
138
187
@ Override
139
188
public void unsubscribe () {
140
189
// we remove the Observer so we stop emitting further events (they will be ignored if parent continues to send)
141
190
subscribedObserver .set (EMPTY_OBSERVER );
142
- // I don't believe we need to worry about the parent here as it's a separate sequence that would
143
- // be unsubscribed to directly if that needs to happen.
191
+ // now we need to notify the parent that we're unsubscribed
192
+ parent . unsubscribeKey ( key );
144
193
}
145
- };
194
+ }) ;
146
195
}
147
196
}, subscribedObserver );
148
197
}
@@ -232,11 +281,63 @@ public void testEmpty() {
232
281
assertTrue (map .isEmpty ());
233
282
}
234
283
284
+ @ Test
285
+ public void testError () {
286
+ Observable <String > sourceStrings = Observable .from ("one" , "two" , "three" , "four" , "five" , "six" );
287
+ Observable <String > errorSource = Observable .error (new RuntimeException ("forced failure" ));
288
+ @ SuppressWarnings ("unchecked" )
289
+ Observable <String > source = Observable .concat (sourceStrings , errorSource );
290
+
291
+ Observable <GroupedObservable <Integer , String >> grouped = Observable .create (groupBy (source , length ));
292
+
293
+ final AtomicInteger groupCounter = new AtomicInteger ();
294
+ final AtomicInteger eventCounter = new AtomicInteger ();
295
+ final AtomicReference <Exception > error = new AtomicReference <Exception >();
296
+
297
+ grouped .mapMany (new Func1 <GroupedObservable <Integer , String >, Observable <String >>() {
298
+
299
+ @ Override
300
+ public Observable <String > call (final GroupedObservable <Integer , String > o ) {
301
+ groupCounter .incrementAndGet ();
302
+ return o .map (new Func1 <String , String >() {
303
+
304
+ @ Override
305
+ public String call (String v ) {
306
+ return "Event => key: " + o .getKey () + " value: " + v ;
307
+ }
308
+ });
309
+ }
310
+ }).subscribe (new Observer <String >() {
311
+
312
+ @ Override
313
+ public void onCompleted () {
314
+
315
+ }
316
+
317
+ @ Override
318
+ public void onError (Exception e ) {
319
+ e .printStackTrace ();
320
+ error .set (e );
321
+ }
322
+
323
+ @ Override
324
+ public void onNext (String v ) {
325
+ eventCounter .incrementAndGet ();
326
+ System .out .println (v );
327
+
328
+ }
329
+ });
330
+
331
+ assertEquals (3 , groupCounter .get ());
332
+ assertEquals (6 , eventCounter .get ());
333
+ assertNotNull (error .get ());
334
+ }
335
+
235
336
private static <K , V > Map <K , Collection <V >> toMap (Observable <GroupedObservable <K , V >> observable ) {
236
337
237
338
final ConcurrentHashMap <K , Collection <V >> result = new ConcurrentHashMap <K , Collection <V >>();
238
339
239
- observable .forEach (new Action1 <GroupedObservable <K , V >>() {
340
+ observable .toBlockingObservable (). forEach (new Action1 <GroupedObservable <K , V >>() {
240
341
241
342
@ Override
242
343
public void call (final GroupedObservable <K , V > o ) {
@@ -344,6 +445,107 @@ public void onNext(String outputMessage) {
344
445
345
446
}
346
447
448
+ /*
449
+ * We will only take 1 group with 20 events from it and then unsubscribe.
450
+ */
451
+ @ Test
452
+ public void testUnsubscribe () throws InterruptedException {
453
+
454
+ final AtomicInteger eventCounter = new AtomicInteger ();
455
+ final AtomicInteger subscribeCounter = new AtomicInteger ();
456
+ final AtomicInteger groupCounter = new AtomicInteger ();
457
+ final AtomicInteger sentEventCounter = new AtomicInteger ();
458
+ final CountDownLatch latch = new CountDownLatch (1 );
459
+ final int count = 100 ;
460
+ final int groupCount = 2 ;
461
+
462
+ Observable <Event > es = Observable .create (new Func1 <Observer <Event >, Subscription >() {
463
+
464
+ @ Override
465
+ public Subscription call (final Observer <Event > observer ) {
466
+ final BooleanSubscription s = new BooleanSubscription ();
467
+ System .out .println ("*** Subscribing to EventStream ***" );
468
+ subscribeCounter .incrementAndGet ();
469
+ new Thread (new Runnable () {
470
+
471
+ @ Override
472
+ public void run () {
473
+ for (int i = 0 ; i < count ; i ++) {
474
+ if (s .isUnsubscribed ()) {
475
+ break ;
476
+ }
477
+ Event e = new Event ();
478
+ e .source = i % groupCount ;
479
+ e .message = "Event-" + i ;
480
+ observer .onNext (e );
481
+ sentEventCounter .incrementAndGet ();
482
+ }
483
+ observer .onCompleted ();
484
+ }
485
+
486
+ }).start ();
487
+ return s ;
488
+ }
489
+
490
+ });
491
+
492
+ es .groupBy (new Func1 <Event , Integer >() {
493
+
494
+ @ Override
495
+ public Integer call (Event e ) {
496
+ return e .source ;
497
+ }
498
+ })
499
+ .take (1 ) // we want only the first group
500
+ .mapMany (new Func1 <GroupedObservable <Integer , Event >, Observable <String >>() {
501
+
502
+ @ Override
503
+ public Observable <String > call (GroupedObservable <Integer , Event > eventGroupedObservable ) {
504
+ System .out .println ("GroupedObservable Key: " + eventGroupedObservable .getKey ());
505
+ groupCounter .incrementAndGet ();
506
+
507
+ return eventGroupedObservable
508
+ .take (20 ) // limit to only 20 events on this group
509
+ .map (new Func1 <Event , String >() {
510
+
511
+ @ Override
512
+ public String call (Event event ) {
513
+ return "Source: " + event .source + " Message: " + event .message ;
514
+ }
515
+ });
516
+
517
+ };
518
+ }).subscribe (new Observer <String >() {
519
+
520
+ @ Override
521
+ public void onCompleted () {
522
+ latch .countDown ();
523
+ }
524
+
525
+ @ Override
526
+ public void onError (Exception e ) {
527
+ e .printStackTrace ();
528
+ latch .countDown ();
529
+ }
530
+
531
+ @ Override
532
+ public void onNext (String outputMessage ) {
533
+ System .out .println (outputMessage );
534
+ eventCounter .incrementAndGet ();
535
+ }
536
+ });
537
+
538
+ latch .await (5000 , TimeUnit .MILLISECONDS );
539
+ assertEquals (1 , subscribeCounter .get ());
540
+ assertEquals (1 , groupCounter .get ());
541
+ assertEquals (20 , eventCounter .get ());
542
+ // sentEvents will go until 'eventCounter' hits 20 and then unsubscribes
543
+ // which means it will also send (but ignore) the 19 events for the other group
544
+ // It will not however send all 100 events.
545
+ assertEquals (39 , sentEventCounter .get ());
546
+
547
+ }
548
+
347
549
private static class Event {
348
550
int source ;
349
551
String message ;
0 commit comments