Skip to content

Commit accdd96

Browse files
Operators: throttleWithTimeout, throttleFirst, throttleLast
- javadocs explaining differences - link between throttleLast and sample (aliase) - refactored throttleFirst to be a more efficient implementations - concurrency changes to throttleWithTimeout
1 parent be2f4a5 commit accdd96

File tree

6 files changed

+79
-541
lines changed

6 files changed

+79
-541
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
*/
1616
package rx;
1717

18-
import static rx.util.functions.Functions.not;
18+
import static rx.util.functions.Functions.*;
1919

2020
import java.util.ArrayList;
2121
import java.util.Arrays;
2222
import java.util.Collection;
23-
import java.util.Collections;
2423
import java.util.List;
2524
import java.util.concurrent.Future;
2625
import java.util.concurrent.TimeUnit;
@@ -64,10 +63,8 @@
6463
import rx.operators.OperationTakeLast;
6564
import rx.operators.OperationTakeUntil;
6665
import rx.operators.OperationTakeWhile;
67-
import rx.operators.OperationThrottle;
6866
import rx.operators.OperationThrottleFirst;
6967
import rx.operators.OperationThrottleWithTimeout;
70-
import rx.operators.OperationThrottleLast;
7168
import rx.operators.OperationTimestamp;
7269
import rx.operators.OperationToObservableFuture;
7370
import rx.operators.OperationToObservableIterable;
@@ -1814,9 +1811,9 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
18141811
}
18151812

18161813
/**
1817-
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call.
1814+
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
18181815
* <p>
1819-
* NOTE: If the timeout is set higher than the rate of traffic then this will drop all data.
1816+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
18201817
*
18211818
* @param timeout
18221819
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
@@ -1830,9 +1827,9 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
18301827
}
18311828

18321829
/**
1833-
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call.
1830+
* Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
18341831
* <p>
1835-
* NOTE: If the timeout is set higher than the rate of traffic then this will drop all data.
1832+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
18361833
*
18371834
* @param timeout
18381835
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
@@ -1847,64 +1844,70 @@ public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler
18471844
}
18481845

18491846
/**
1850-
* Throttles to first value in each window.
1847+
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
1848+
* <p>
1849+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
18511850
*
1852-
* @param windowDuration
1853-
* Duration of windows within with the first value will be chosen.
1851+
* @param skipDuration
1852+
* Time to wait before sending another value after emitting last value.
18541853
* @param unit
18551854
* The unit of time for the specified timeout.
1855+
* @param scheduler
1856+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
18561857
* @return Observable which performs the throttle operation.
18571858
*/
18581859
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
18591860
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
18601861
}
18611862

18621863
/**
1863-
* Throttles to first value in each window.
1864+
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
1865+
* <p>
1866+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
18641867
*
1865-
* @param windowDuration
1866-
* Duration of windows within with the first value will be chosen.
1868+
* @param skipDuration
1869+
* Time to wait before sending another value after emitting last value.
18671870
* @param unit
18681871
* The unit of time for the specified timeout.
18691872
* @param scheduler
18701873
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
18711874
* @return Observable which performs the throttle operation.
18721875
*/
1873-
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) {
1874-
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit, scheduler));
1876+
public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) {
1877+
return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler));
18751878
}
1876-
1877-
1879+
18781880
/**
1879-
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
1880-
*
1881-
* @param timeout
1882-
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1881+
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
1882+
* <p>
1883+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
18831884
*
1885+
* @param intervalDuration
1886+
* Duration of windows within with the last value will be chosen.
18841887
* @param unit
1885-
* The {@link TimeUnit} for the timeout.
1886-
*
1887-
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1888+
* The unit of time for the specified interval.
1889+
* @return Observable which performs the throttle operation.
1890+
* @see {@link #sample(long, TimeUnit)}
18881891
*/
1889-
public Observable<T> throttleLast(long timeout, TimeUnit unit) {
1890-
return create(OperationThrottleLast.throttleLast(this, timeout, unit));
1892+
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
1893+
return sample(intervalDuration, unit);
18911894
}
18921895

18931896
/**
1894-
* Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired.
1897+
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
1898+
* <p>
1899+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
18951900
*
1896-
* @param timeout
1897-
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1901+
* @param intervalDuration
1902+
* Duration of windows within with the last value will be chosen.
18981903
* @param unit
1899-
* The {@link TimeUnit} for the timeout.
1900-
* @param scheduler
1901-
* The {@link Scheduler} to use when timing incoming values.
1902-
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1904+
* The unit of time for the specified interval.
1905+
* @return Observable which performs the throttle operation.
1906+
* @see {@link #sample(long, TimeUnit, Scheduler)}
19031907
*/
1904-
public Observable<T> throttleLast(long timeout, TimeUnit unit, Scheduler scheduler) {
1905-
return create(OperationThrottleLast.throttleLast(this, timeout, unit, scheduler));
1908+
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
1909+
return sample(intervalDuration, unit, scheduler);
19061910
}
1907-
19081911

19091912
/**
19101913
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.

0 commit comments

Comments
 (0)