Skip to content

Commit a989cac

Browse files
committed
Operator GroupJoin v2
1 parent 7cc5baa commit a989cac

File tree

4 files changed

+800
-0
lines changed

4 files changed

+800
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import rx.operators.OperationFirstOrDefault;
5656
import rx.operators.OperationGroupBy;
5757
import rx.operators.OperationGroupByUntil;
58+
import rx.operators.OperationGroupJoin;
5859
import rx.operators.OperationInterval;
5960
import rx.operators.OperationJoin;
6061
import rx.operators.OperationJoinPatterns;
@@ -5099,6 +5100,27 @@ public <K> Observable<GroupedObservable<K, T>> groupBy(final Func1<? super T, ?
50995100
return create(OperationGroupBy.groupBy(this, keySelector));
51005101
}
51015102

5103+
/**
5104+
* Return an Observable which correlates two sequences when they overlap and groups the results.
5105+
*
5106+
* @param right the other Observable to correlate values of this observable to
5107+
* @param leftDuration function that returns an Observable which indicates the duration of
5108+
* the values of this Observable
5109+
* @param rightDuration function that returns an Observable which indicates the duration of
5110+
* the values of the right Observable
5111+
* @param resultSelector function that takes a left value, the right observable and returns the
5112+
* value to be emitted
5113+
* @return an Observable that emits grouped values based on overlapping durations from this and
5114+
* another Observable
5115+
*
5116+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244235.aspx">MSDN: Observable.GroupJoin</a>
5117+
*/
5118+
public <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1<? super T, ? extends Observable<D1>> leftDuration,
5119+
Func1<? super T2, ? extends Observable<D2>> rightDuration,
5120+
Func2<? super T, ? super Observable<T2>, ? extends R> resultSelector) {
5121+
return create(new OperationGroupJoin<T, T2, D1, D2, R>(this, right, leftDuration, rightDuration, resultSelector));
5122+
}
5123+
51025124
/**
51035125
* Returns an {@link Observable} that emits <code>true</code> if the source
51045126
* {@link Observable} is empty, otherwise <code>false</code>.
Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import java.util.ArrayList;
19+
import java.util.HashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
import rx.Observable;
23+
import rx.Observable.OnSubscribeFunc;
24+
import rx.Observer;
25+
import rx.Subscription;
26+
import rx.subjects.PublishSubject;
27+
import rx.subjects.Subject;
28+
import rx.subscriptions.CompositeSubscription;
29+
import rx.subscriptions.RefCountSubscription;
30+
import rx.subscriptions.SerialSubscription;
31+
import rx.util.functions.Func1;
32+
import rx.util.functions.Func2;
33+
34+
/**
35+
* Corrrelates two sequences when they overlap and groups the results.
36+
*
37+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244235.aspx">MSDN: Observable.GroupJoin</a>
38+
*/
39+
public class OperationGroupJoin<T1, T2, D1, D2, R> implements OnSubscribeFunc<R> {
40+
protected final Observable<T1> left;
41+
protected final Observable<T2> right;
42+
protected final Func1<? super T1, ? extends Observable<D1>> leftDuration;
43+
protected final Func1<? super T2, ? extends Observable<D2>> rightDuration;
44+
protected final Func2<? super T1, ? super Observable<T2>, ? extends R> resultSelector;
45+
public OperationGroupJoin(
46+
Observable<T1> left,
47+
Observable<T2> right,
48+
Func1<? super T1, ? extends Observable<D1>> leftDuration,
49+
Func1<? super T2, ? extends Observable<D2>> rightDuration,
50+
Func2<? super T1, ? super Observable<T2>, ? extends R> resultSelector
51+
) {
52+
this.left = left;
53+
this.right = right;
54+
this.leftDuration = leftDuration;
55+
this.rightDuration = rightDuration;
56+
this.resultSelector = resultSelector;
57+
}
58+
@Override
59+
public Subscription onSubscribe(Observer<? super R> t1) {
60+
ResultManager ro = new ResultManager(t1);
61+
ro.init();
62+
return ro;
63+
}
64+
/** Manages sub-observers and subscriptions. */
65+
class ResultManager implements Subscription {
66+
final RefCountSubscription cancel;
67+
final Observer<? super R> observer;
68+
final CompositeSubscription group;
69+
final Object guard = new Object();
70+
int leftIds;
71+
int rightIds;
72+
final Map<Integer, Observer<T2>> leftMap = new HashMap<Integer, Observer<T2>>();
73+
final Map<Integer, T2> rightMap = new HashMap<Integer, T2>();
74+
boolean leftDone;
75+
boolean rightDone;
76+
public ResultManager(Observer<? super R> observer) {
77+
this.observer = observer;
78+
this.group = new CompositeSubscription();
79+
this.cancel = new RefCountSubscription(group);
80+
}
81+
public void init() {
82+
SerialSubscription s1 = new SerialSubscription();
83+
SerialSubscription s2 = new SerialSubscription();
84+
85+
group.add(s1);
86+
group.add(s2);
87+
88+
s1.setSubscription(left.subscribe(new LeftObserver(s1)));
89+
s2.setSubscription(right.subscribe(new RightObserver(s2)));
90+
91+
}
92+
93+
@Override
94+
public void unsubscribe() {
95+
cancel.unsubscribe();
96+
}
97+
void groupsOnCompleted() {
98+
List<Observer<T2>> list = new ArrayList<Observer<T2>>(leftMap.values());
99+
leftMap.clear();
100+
rightMap.clear();
101+
for (Observer<T2> o : list) {
102+
o.onCompleted();
103+
}
104+
}
105+
/** Observe the left source. */
106+
class LeftObserver implements Observer<T1> {
107+
final Subscription tosource;
108+
public LeftObserver(Subscription tosource) {
109+
this.tosource = tosource;
110+
}
111+
@Override
112+
public void onNext(T1 args) {
113+
try {
114+
int id;
115+
Subject<T2, T2> subj = PublishSubject.create();
116+
synchronized (guard) {
117+
id = leftIds++;
118+
leftMap.put(id, subj);
119+
}
120+
121+
Observable<T2> window = Observable.create(new WindowObservableFunc<T2>(subj, cancel));
122+
123+
Observable<D1> duration = leftDuration.call(args);
124+
125+
SerialSubscription sduration = new SerialSubscription();
126+
group.add(sduration);
127+
sduration.setSubscription(duration.subscribe(new LeftDurationObserver(id, sduration, subj)));
128+
129+
R result = resultSelector.call(args, window);
130+
131+
synchronized (guard) {
132+
observer.onNext(result);
133+
for (T2 t2 : rightMap.values()) {
134+
subj.onNext(t2);
135+
136+
}
137+
}
138+
} catch (Throwable t) {
139+
onError(t);
140+
}
141+
}
142+
143+
@Override
144+
public void onCompleted() {
145+
synchronized (guard) {
146+
leftDone = true;
147+
if (rightDone) {
148+
groupsOnCompleted();
149+
observer.onCompleted();
150+
cancel.unsubscribe();
151+
}
152+
}
153+
}
154+
155+
@Override
156+
public void onError(Throwable e) {
157+
synchronized (guard) {
158+
for (Observer<T2> o : leftMap.values()) {
159+
o.onError(e);
160+
}
161+
observer.onError(e);
162+
cancel.unsubscribe();
163+
}
164+
}
165+
166+
167+
}
168+
/** Observe the right source. */
169+
class RightObserver implements Observer<T2> {
170+
final Subscription tosource;
171+
public RightObserver(Subscription tosource) {
172+
this.tosource = tosource;
173+
}
174+
@Override
175+
public void onNext(T2 args) {
176+
try {
177+
int id;
178+
synchronized (guard) {
179+
id = rightIds++;
180+
rightMap.put(id, args);
181+
}
182+
Observable<D2> duration = rightDuration.call(args);
183+
184+
SerialSubscription sduration = new SerialSubscription();
185+
group.add(sduration);
186+
sduration.setSubscription(duration.subscribe(new RightDurationObserver(id, sduration)));
187+
188+
synchronized (guard) {
189+
for (Observer<T2> o : leftMap.values()) {
190+
o.onNext(args);
191+
}
192+
}
193+
} catch (Throwable t) {
194+
onError(t);
195+
}
196+
}
197+
198+
@Override
199+
public void onCompleted() {
200+
// tosource.unsubscribe();
201+
synchronized (guard) {
202+
rightDone = true;
203+
if (leftDone) {
204+
groupsOnCompleted();
205+
observer.onCompleted();
206+
cancel.unsubscribe();
207+
}
208+
}
209+
}
210+
211+
@Override
212+
public void onError(Throwable e) {
213+
synchronized (guard) {
214+
for (Observer<T2> o : leftMap.values()) {
215+
o.onError(e);
216+
}
217+
218+
observer.onError(e);
219+
cancel.unsubscribe();
220+
}
221+
}
222+
}
223+
/** Observe left duration and apply termination. */
224+
class LeftDurationObserver implements Observer<D1> {
225+
final int id;
226+
final Subscription sduration;
227+
final Observer<T2> gr;
228+
public LeftDurationObserver(int id, Subscription sduration, Observer<T2> gr) {
229+
this.id = id;
230+
this.sduration = sduration;
231+
this.gr = gr;
232+
}
233+
234+
@Override
235+
public void onCompleted() {
236+
synchronized (guard) {
237+
if (leftMap.remove(id) != null) {
238+
gr.onCompleted();
239+
}
240+
}
241+
group.remove(sduration);
242+
}
243+
244+
@Override
245+
public void onError(Throwable e) {
246+
synchronized (guard) {
247+
observer.onError(e);
248+
}
249+
cancel.unsubscribe();
250+
}
251+
252+
@Override
253+
public void onNext(D1 args) {
254+
onCompleted();
255+
}
256+
}
257+
/** Observe right duration and apply termination. */
258+
class RightDurationObserver implements Observer<D2> {
259+
final int id;
260+
final Subscription sduration;
261+
public RightDurationObserver(int id, Subscription sduration) {
262+
this.id = id;
263+
this.sduration = sduration;
264+
}
265+
266+
@Override
267+
public void onCompleted() {
268+
synchronized (guard) {
269+
rightMap.remove(id);
270+
}
271+
group.remove(sduration);
272+
}
273+
274+
@Override
275+
public void onError(Throwable e) {
276+
synchronized (guard) {
277+
observer.onError(e);
278+
}
279+
cancel.unsubscribe();
280+
}
281+
282+
@Override
283+
public void onNext(D2 args) {
284+
onCompleted();
285+
}
286+
}
287+
}
288+
/**
289+
* The reference-counted window observable.
290+
* Subscribes to the underlying Observable by using a reference-counted
291+
* subscription.
292+
*/
293+
static class WindowObservableFunc<T> implements OnSubscribeFunc<T> {
294+
final RefCountSubscription refCount;
295+
final Observable<T> underlying;
296+
public WindowObservableFunc(Observable<T> underlying, RefCountSubscription refCount) {
297+
this.refCount = refCount;
298+
this.underlying = underlying;
299+
}
300+
301+
@Override
302+
public Subscription onSubscribe(Observer<? super T> t1) {
303+
CompositeSubscription cs = new CompositeSubscription();
304+
cs.add(refCount.getSubscription());
305+
WindowObserver wo = new WindowObserver(t1, cs);
306+
cs.add(underlying.subscribe(wo));
307+
return cs;
308+
}
309+
/** Observe activities on the window. */
310+
class WindowObserver implements Observer<T> {
311+
final Observer<? super T> observer;
312+
final Subscription self;
313+
public WindowObserver(Observer<? super T> observer, Subscription self) {
314+
this.observer = observer;
315+
this.self = self;
316+
}
317+
@Override
318+
public void onNext(T args) {
319+
observer.onNext(args);
320+
}
321+
@Override
322+
public void onError(Throwable e) {
323+
observer.onError(e);
324+
self.unsubscribe();
325+
}
326+
@Override
327+
public void onCompleted() {
328+
observer.onCompleted();
329+
self.unsubscribe();
330+
}
331+
}
332+
}
333+
}

0 commit comments

Comments
 (0)