Skip to content

Commit be2f4a5

Browse files
Merge branch 'throttleFirst' into throttle
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 1348505 + 7d13094 commit be2f4a5

File tree

3 files changed

+260
-26
lines changed

3 files changed

+260
-26
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import rx.operators.OperationTakeUntil;
6666
import rx.operators.OperationTakeWhile;
6767
import rx.operators.OperationThrottle;
68+
import rx.operators.OperationThrottleFirst;
6869
import rx.operators.OperationThrottleWithTimeout;
6970
import rx.operators.OperationThrottleLast;
7071
import rx.operators.OperationTimestamp;
@@ -1845,6 +1846,34 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler
18451846
return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit, scheduler));
18461847
}
18471848

1849+
/**
1850+
* Throttles to first value in each window.
1851+
*
1852+
* @param windowDuration
1853+
* Duration of windows within with the first value will be chosen.
1854+
* @param unit
1855+
* The unit of time for the specified timeout.
1856+
* @return Observable which performs the throttle operation.
1857+
*/
1858+
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
1859+
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
1860+
}
1861+
1862+
/**
1863+
* Throttles to first value in each window.
1864+
*
1865+
* @param windowDuration
1866+
* Duration of windows within with the first value will be chosen.
1867+
* @param unit
1868+
* The unit of time for the specified timeout.
1869+
* @param scheduler
1870+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1871+
* @return Observable which performs the throttle operation.
1872+
*/
1873+
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) {
1874+
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit, scheduler));
1875+
}
1876+
18481877

18491878
/**
18501879
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
@@ -2334,32 +2363,6 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(Observ
23342363
}
23352364

23362365
/**
2337-
<<<<<<< HEAD
2338-
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
2339-
*
2340-
* @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
2341-
* @param unit The {@link TimeUnit} for the timeout.
2342-
* @return An {@link Observable} which filters out values which are too quickly followed up with never values.
2343-
*/
2344-
public Observable<T> throttle(long timeout, TimeUnit unit) {
2345-
return create(OperationThrottle.throttle(this, timeout, unit));
2346-
}
2347-
2348-
/**
2349-
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
2350-
*
2351-
* @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
2352-
* @param unit The {@link TimeUnit} for the timeout.
2353-
* @param scheduler The {@link Scheduler} to use when timing incoming values.
2354-
* @return An {@link Observable} which filters out values which are too quickly followed up with never values.
2355-
*/
2356-
public Observable<T> throttle(long timeout, TimeUnit unit, Scheduler scheduler) {
2357-
return create(OperationThrottle.throttle(this, timeout, unit, scheduler));
2358-
}
2359-
2360-
/**
2361-
=======
2362-
>>>>>>> throttleLast
23632366
* @see #combineLatest(Observable, Observable, Func2)
23642367
*/
23652368
public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
import org.mockito.InOrder;
26+
27+
import rx.Observable;
28+
import rx.Observable.OnSubscribeFunc;
29+
import rx.Observer;
30+
import rx.Scheduler;
31+
import rx.Subscription;
32+
import rx.concurrency.Schedulers;
33+
import rx.concurrency.TestScheduler;
34+
import rx.subscriptions.Subscriptions;
35+
import rx.util.functions.Action0;
36+
import rx.util.functions.Func1;
37+
38+
/**
39+
* Throttle by windowing a stream and returning the first value in each window.
40+
*/
41+
public final class OperationThrottleFirst {
42+
43+
/**
44+
* Throttles to first value in each window.
45+
*
46+
* @param items
47+
* The {@link Observable} which is publishing events.
48+
* @param windowDuration
49+
* Duration of windows within with the first value will be chosen.
50+
* @param unit
51+
* The unit of time for the specified timeout.
52+
* @return A {@link Func1} which performs the throttle operation.
53+
*/
54+
public static <T> OnSubscribeFunc<T> throttleFirst(Observable<T> items, long windowDuration, TimeUnit unit) {
55+
return throttleFirst(items, windowDuration, unit, Schedulers.threadPoolForComputation());
56+
}
57+
58+
/**
59+
* Throttles to first value in each window.
60+
*
61+
* @param items
62+
* The {@link Observable} which is publishing events.
63+
* @param windowDuration
64+
* Duration of windows within with the first value will be chosen.
65+
* @param unit
66+
* The unit of time for the specified timeout.
67+
* @param scheduler
68+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
69+
* @return A {@link Func1} which performs the throttle operation.
70+
*/
71+
public static <T> OnSubscribeFunc<T> throttleFirst(final Observable<T> items, final long windowDuration, final TimeUnit unit, final Scheduler scheduler) {
72+
return new OnSubscribeFunc<T>() {
73+
@Override
74+
public Subscription onSubscribe(Observer<? super T> observer) {
75+
return items.window(windowDuration, unit, scheduler).flatMap(new Func1<Observable<T>, Observable<T>>() {
76+
77+
@Override
78+
public Observable<T> call(Observable<T> o) {
79+
return o.takeFirst();
80+
}
81+
}).subscribe(observer);
82+
}
83+
};
84+
}
85+
86+
public static class UnitTest {
87+
88+
private TestScheduler scheduler;
89+
private Observer<String> observer;
90+
91+
@Before
92+
@SuppressWarnings("unchecked")
93+
public void before() {
94+
scheduler = new TestScheduler();
95+
observer = mock(Observer.class);
96+
}
97+
98+
@Test
99+
public void testThrottlingWithCompleted() {
100+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
101+
@Override
102+
public Subscription onSubscribe(Observer<? super String> observer) {
103+
publishNext(observer, 100, "one"); // publish as it's first
104+
publishNext(observer, 300, "two"); // skip as it's last within the first 400
105+
publishNext(observer, 900, "three"); // publish
106+
publishNext(observer, 905, "four"); // skip
107+
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.
108+
109+
return Subscriptions.empty();
110+
}
111+
});
112+
113+
Observable<String> sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler));
114+
sampled.subscribe(observer);
115+
116+
InOrder inOrder = inOrder(observer);
117+
118+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
119+
inOrder.verify(observer, times(1)).onNext("one");
120+
inOrder.verify(observer, times(0)).onNext("two");
121+
inOrder.verify(observer, times(1)).onNext("three");
122+
inOrder.verify(observer, times(0)).onNext("four");
123+
inOrder.verify(observer, times(1)).onCompleted();
124+
inOrder.verifyNoMoreInteractions();
125+
}
126+
127+
@Test
128+
public void testThrottlingWithError() {
129+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
130+
@Override
131+
public Subscription onSubscribe(Observer<? super String> observer) {
132+
Exception error = new TestException();
133+
publishNext(observer, 100, "one"); // Should be published since it is first
134+
publishNext(observer, 200, "two"); // Should be skipped since onError will arrive before the timeout expires
135+
publishError(observer, 300, error); // Should be published as soon as the timeout expires.
136+
137+
return Subscriptions.empty();
138+
}
139+
});
140+
141+
Observable<String> sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler));
142+
sampled.subscribe(observer);
143+
144+
InOrder inOrder = inOrder(observer);
145+
146+
scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
147+
inOrder.verify(observer).onNext("one");
148+
inOrder.verify(observer).onError(any(TestException.class));
149+
inOrder.verifyNoMoreInteractions();
150+
}
151+
152+
private <T> void publishCompleted(final Observer<T> observer, long delay) {
153+
scheduler.schedule(new Action0() {
154+
@Override
155+
public void call() {
156+
observer.onCompleted();
157+
}
158+
}, delay, TimeUnit.MILLISECONDS);
159+
}
160+
161+
private <T> void publishError(final Observer<T> observer, long delay, final Exception error) {
162+
scheduler.schedule(new Action0() {
163+
@Override
164+
public void call() {
165+
observer.onError(error);
166+
}
167+
}, delay, TimeUnit.MILLISECONDS);
168+
}
169+
170+
private <T> void publishNext(final Observer<T> observer, long delay, final T value) {
171+
scheduler.schedule(new Action0() {
172+
@Override
173+
public void call() {
174+
observer.onNext(value);
175+
}
176+
}, delay, TimeUnit.MILLISECONDS);
177+
}
178+
179+
@SuppressWarnings("serial")
180+
private class TestException extends Exception {
181+
}
182+
183+
}
184+
185+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package rx;
2+
3+
import static org.mockito.Mockito.*;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
import org.mockito.InOrder;
9+
10+
import rx.concurrency.TestScheduler;
11+
import rx.subjects.PublishSubject;
12+
13+
public class ThrottleFirstTests {
14+
15+
@Test
16+
public void testThrottle() {
17+
@SuppressWarnings("unchecked")
18+
Observer<Integer> observer = mock(Observer.class);
19+
TestScheduler s = new TestScheduler();
20+
PublishSubject<Integer> o = PublishSubject.create();
21+
o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
22+
23+
// send events with simulated time increments
24+
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
25+
o.onNext(1); // deliver
26+
o.onNext(2); // skip
27+
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
28+
o.onNext(3); // deliver
29+
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
30+
o.onNext(4); // skip
31+
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
32+
o.onNext(5); // skip
33+
o.onNext(6); // skip
34+
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
35+
o.onNext(7); // deliver
36+
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
37+
o.onCompleted();
38+
39+
InOrder inOrder = inOrder(observer);
40+
inOrder.verify(observer).onNext(1);
41+
inOrder.verify(observer).onNext(3);
42+
inOrder.verify(observer).onNext(7);
43+
inOrder.verify(observer).onCompleted();
44+
inOrder.verifyNoMoreInteractions();
45+
}
46+
}

0 commit comments

Comments
 (0)