Skip to content

Commit 1348505

Browse files
Merge branch 'throttleLast' into throttle
Conflicts: rxjava-core/src/main/java/rx/Observable.java rxjava-core/src/main/java/rx/operators/OperationThrottle.java
2 parents 3696ef6 + 6d9ec5c commit 1348505

File tree

3 files changed

+270
-0
lines changed

3 files changed

+270
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import rx.operators.OperationTakeWhile;
6767
import rx.operators.OperationThrottle;
6868
import rx.operators.OperationThrottleWithTimeout;
69+
import rx.operators.OperationThrottleLast;
6970
import rx.operators.OperationTimestamp;
7071
import rx.operators.OperationToObservableFuture;
7172
import rx.operators.OperationToObservableIterable;
@@ -1843,6 +1844,38 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
18431844
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
18441845
return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit, scheduler));
18451846
}
1847+
1848+
1849+
/**
1850+
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
1851+
*
1852+
* @param timeout
1853+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1854+
*
1855+
* @param unit
1856+
* The {@link TimeUnit} for the timeout.
1857+
*
1858+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1859+
*/
1860+
public Observable<T> throttleLast(long timeout, TimeUnit unit) {
1861+
return create(OperationThrottleLast.throttleLast(this, timeout, unit));
1862+
}
1863+
1864+
/**
1865+
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
1866+
*
1867+
* @param timeout
1868+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1869+
* @param unit
1870+
* The {@link TimeUnit} for the timeout.
1871+
* @param scheduler
1872+
* The {@link Scheduler} to use when timing incoming values.
1873+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1874+
*/
1875+
public Observable<T> throttleLast(long timeout, TimeUnit unit, Scheduler scheduler) {
1876+
return create(OperationThrottleLast.throttleLast(this, timeout, unit, scheduler));
1877+
}
1878+
18461879

18471880
/**
18481881
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
@@ -2301,6 +2334,7 @@ public static <T1, T2, T3, T4, T5, T6, T7, R> Observable<R> combineLatest(Observ
23012334
}
23022335

23032336
/**
2337+
<<<<<<< HEAD
23042338
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
23052339
*
23062340
* @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
@@ -2324,6 +2358,8 @@ public Observable<T> throttle(long timeout, TimeUnit unit, Scheduler scheduler)
23242358
}
23252359

23262360
/**
2361+
=======
2362+
>>>>>>> throttleLast
23272363
* @see #combineLatest(Observable, Observable, Func2)
23282364
*/
23292365
public static <T1, T2, T3, T4, T5, T6, T7, T8, R> Observable<R> combineLatest(Observable<? extends T1> o1, Observable<? extends T2> o2, Observable<? extends T3> o3, Observable<? extends T4> o4, Observable<? extends T5> o5, Observable<? extends T6> o6, Observable<? extends T7> o7, Observable<? extends T8> o8,
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.junit.Before;
24+
import org.junit.Test;
25+
import org.mockito.InOrder;
26+
27+
import rx.Observable;
28+
import rx.Observable.OnSubscribeFunc;
29+
import rx.Observer;
30+
import rx.Scheduler;
31+
import rx.Subscription;
32+
import rx.concurrency.Schedulers;
33+
import rx.concurrency.TestScheduler;
34+
import rx.subscriptions.Subscriptions;
35+
import rx.util.functions.Action0;
36+
import rx.util.functions.Func1;
37+
38+
/**
39+
* This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too
40+
* quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published
41+
* as soon as the timeout expires.
42+
*/
43+
public final class OperationThrottleLast {
44+
45+
/**
46+
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
47+
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
48+
* the last received event is published.
49+
*
50+
* @param items
51+
* The {@link Observable} which is publishing events.
52+
* @param timeout
53+
* How long each event has to be the 'last event' before it gets published.
54+
* @param unit
55+
* The unit of time for the specified timeout.
56+
* @return A {@link Func1} which performs the throttle operation.
57+
*/
58+
public static <T> OnSubscribeFunc<T> throttleLast(Observable<T> items, long timeout, TimeUnit unit) {
59+
return throttleLast(items, timeout, unit, Schedulers.threadPoolForComputation());
60+
}
61+
62+
/**
63+
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
64+
* followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet)
65+
* the last received event is published.
66+
*
67+
* @param items
68+
* The {@link Observable} which is publishing events.
69+
* @param timeout
70+
* How long each event has to be the 'last event' before it gets published.
71+
* @param unit
72+
* The unit of time for the specified timeout.
73+
* @param scheduler
74+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
75+
* @return A {@link Func1} which performs the throttle operation.
76+
*/
77+
public static <T> OnSubscribeFunc<T> throttleLast(final Observable<T> items, final long timeout, final TimeUnit unit, final Scheduler scheduler) {
78+
return new OnSubscribeFunc<T>() {
79+
@Override
80+
public Subscription onSubscribe(Observer<? super T> observer) {
81+
return items.window(timeout, unit, scheduler).flatMap(new Func1<Observable<T>, Observable<T>>() {
82+
83+
@Override
84+
public Observable<T> call(Observable<T> o) {
85+
return o.takeLast(1);
86+
}
87+
}).subscribe(observer);
88+
}
89+
};
90+
}
91+
92+
public static class UnitTest {
93+
94+
private TestScheduler scheduler;
95+
private Observer<String> observer;
96+
97+
@Before
98+
@SuppressWarnings("unchecked")
99+
public void before() {
100+
scheduler = new TestScheduler();
101+
observer = mock(Observer.class);
102+
}
103+
104+
@Test
105+
public void testThrottlingWithCompleted() {
106+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
107+
@Override
108+
public Subscription onSubscribe(Observer<? super String> observer) {
109+
publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires.
110+
publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires.
111+
publishNext(observer, 900, "three"); // Should be skipped since onCompleted will arrive before the timeout expires.
112+
publishCompleted(observer, 1000); // Should be published as soon as the timeout expires.
113+
114+
return Subscriptions.empty();
115+
}
116+
});
117+
118+
Observable<String> sampled = Observable.create(OperationThrottleLast.throttleLast(source, 400, TimeUnit.MILLISECONDS, scheduler));
119+
sampled.subscribe(observer);
120+
121+
InOrder inOrder = inOrder(observer);
122+
123+
scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS);
124+
inOrder.verify(observer, times(1)).onNext("two");
125+
inOrder.verify(observer, times(1)).onCompleted();
126+
inOrder.verifyNoMoreInteractions();
127+
}
128+
129+
@Test
130+
public void testThrottlingWithError() {
131+
Observable<String> source = Observable.create(new OnSubscribeFunc<String>() {
132+
@Override
133+
public Subscription onSubscribe(Observer<? super String> observer) {
134+
Exception error = new TestException();
135+
publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires.
136+
publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires.
137+
publishError(observer, 700, error); // Should be published as soon as the timeout expires.
138+
139+
return Subscriptions.empty();
140+
}
141+
});
142+
143+
Observable<String> sampled = Observable.create(OperationThrottleLast.throttleLast(source, 400, TimeUnit.MILLISECONDS, scheduler));
144+
sampled.subscribe(observer);
145+
146+
InOrder inOrder = inOrder(observer);
147+
148+
scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS);
149+
inOrder.verify(observer).onNext("one");
150+
scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS);
151+
inOrder.verify(observer).onError(any(TestException.class));
152+
inOrder.verifyNoMoreInteractions();
153+
}
154+
155+
private <T> void publishCompleted(final Observer<T> observer, long delay) {
156+
scheduler.schedule(new Action0() {
157+
@Override
158+
public void call() {
159+
observer.onCompleted();
160+
}
161+
}, delay, TimeUnit.MILLISECONDS);
162+
}
163+
164+
private <T> void publishError(final Observer<T> observer, long delay, final Exception error) {
165+
scheduler.schedule(new Action0() {
166+
@Override
167+
public void call() {
168+
observer.onError(error);
169+
}
170+
}, delay, TimeUnit.MILLISECONDS);
171+
}
172+
173+
private <T> void publishNext(final Observer<T> observer, long delay, final T value) {
174+
scheduler.schedule(new Action0() {
175+
@Override
176+
public void call() {
177+
observer.onNext(value);
178+
}
179+
}, delay, TimeUnit.MILLISECONDS);
180+
}
181+
182+
@SuppressWarnings("serial")
183+
private class TestException extends Exception {
184+
}
185+
186+
}
187+
188+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package rx;
2+
3+
import static org.mockito.Mockito.*;
4+
5+
import java.util.concurrent.TimeUnit;
6+
7+
import org.junit.Test;
8+
import org.mockito.InOrder;
9+
10+
import rx.concurrency.TestScheduler;
11+
import rx.subjects.PublishSubject;
12+
13+
public class ThrottleLastTests {
14+
15+
@Test
16+
public void testThrottle() {
17+
@SuppressWarnings("unchecked")
18+
Observer<Integer> observer = mock(Observer.class);
19+
TestScheduler s = new TestScheduler();
20+
PublishSubject<Integer> o = PublishSubject.create();
21+
o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(observer);
22+
23+
// send events with simulated time increments
24+
s.advanceTimeTo(0, TimeUnit.MILLISECONDS);
25+
o.onNext(1); // skip
26+
o.onNext(2); // deliver
27+
s.advanceTimeTo(501, TimeUnit.MILLISECONDS);
28+
o.onNext(3); // skip
29+
s.advanceTimeTo(600, TimeUnit.MILLISECONDS);
30+
o.onNext(4); // skip
31+
s.advanceTimeTo(700, TimeUnit.MILLISECONDS);
32+
o.onNext(5); // skip
33+
o.onNext(6); // deliver
34+
s.advanceTimeTo(1001, TimeUnit.MILLISECONDS);
35+
o.onNext(7); // deliver
36+
s.advanceTimeTo(1501, TimeUnit.MILLISECONDS);
37+
o.onCompleted();
38+
39+
InOrder inOrder = inOrder(observer);
40+
inOrder.verify(observer).onNext(2);
41+
inOrder.verify(observer).onNext(6);
42+
inOrder.verify(observer).onNext(7);
43+
inOrder.verify(observer).onCompleted();
44+
inOrder.verifyNoMoreInteractions();
45+
}
46+
}

0 commit comments

Comments
 (0)