41
41
* quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published
42
42
* as soon as the timeout expires.
43
43
*/
44
- public final class OperationThrottleWithTimeout {
44
+ public final class OperationDebounce {
45
45
46
46
/**
47
47
* This operation filters out events which are published too quickly in succession. This is done by dropping events which are
@@ -56,8 +56,8 @@ public final class OperationThrottleWithTimeout {
56
56
* The unit of time for the specified timeout.
57
57
* @return A {@link Func1} which performs the throttle operation.
58
58
*/
59
- public static <T > OnSubscribeFunc <T > throttleWithTimeout (Observable <T > items , long timeout , TimeUnit unit ) {
60
- return throttleWithTimeout (items , timeout , unit , Schedulers .threadPoolForComputation ());
59
+ public static <T > OnSubscribeFunc <T > debounce (Observable <T > items , long timeout , TimeUnit unit ) {
60
+ return debounce (items , timeout , unit , Schedulers .threadPoolForComputation ());
61
61
}
62
62
63
63
/**
@@ -75,23 +75,23 @@ public static <T> OnSubscribeFunc<T> throttleWithTimeout(Observable<T> items, lo
75
75
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
76
76
* @return A {@link Func1} which performs the throttle operation.
77
77
*/
78
- public static <T > OnSubscribeFunc <T > throttleWithTimeout (final Observable <T > items , final long timeout , final TimeUnit unit , final Scheduler scheduler ) {
78
+ public static <T > OnSubscribeFunc <T > debounce (final Observable <T > items , final long timeout , final TimeUnit unit , final Scheduler scheduler ) {
79
79
return new OnSubscribeFunc <T >() {
80
80
@ Override
81
81
public Subscription onSubscribe (Observer <? super T > observer ) {
82
- return new Throttle <T >(items , timeout , unit , scheduler ).onSubscribe (observer );
82
+ return new Debounce <T >(items , timeout , unit , scheduler ).onSubscribe (observer );
83
83
}
84
84
};
85
85
}
86
86
87
- private static class Throttle <T > implements OnSubscribeFunc <T > {
87
+ private static class Debounce <T > implements OnSubscribeFunc <T > {
88
88
89
89
private final Observable <T > items ;
90
90
private final long timeout ;
91
91
private final TimeUnit unit ;
92
92
private final Scheduler scheduler ;
93
93
94
- public Throttle (Observable <T > items , long timeout , TimeUnit unit , Scheduler scheduler ) {
94
+ public Debounce (Observable <T > items , long timeout , TimeUnit unit , Scheduler scheduler ) {
95
95
this .items = items ;
96
96
this .timeout = timeout ;
97
97
this .unit = unit ;
@@ -100,11 +100,11 @@ public Throttle(Observable<T> items, long timeout, TimeUnit unit, Scheduler sche
100
100
101
101
@ Override
102
102
public Subscription onSubscribe (Observer <? super T > observer ) {
103
- return items .subscribe (new ThrottledObserver <T >(observer , timeout , unit , scheduler ));
103
+ return items .subscribe (new DebounceObserver <T >(observer , timeout , unit , scheduler ));
104
104
}
105
105
}
106
106
107
- private static class ThrottledObserver <T > implements Observer <T > {
107
+ private static class DebounceObserver <T > implements Observer <T > {
108
108
109
109
private final Observer <? super T > observer ;
110
110
private final long timeout ;
@@ -113,7 +113,7 @@ private static class ThrottledObserver<T> implements Observer<T> {
113
113
114
114
private final AtomicReference <Subscription > lastScheduledNotification = new AtomicReference <Subscription >();
115
115
116
- public ThrottledObserver (Observer <? super T > observer , long timeout , TimeUnit unit , Scheduler scheduler ) {
116
+ public DebounceObserver (Observer <? super T > observer , long timeout , TimeUnit unit , Scheduler scheduler ) {
117
117
// we need to synchronize the observer since the on* events can be coming from different
118
118
// threads and are thus non-deterministic and could be interleaved
119
119
this .observer = new SynchronizedObserver <T >(observer );
@@ -174,7 +174,7 @@ public void before() {
174
174
}
175
175
176
176
@ Test
177
- public void testThrottlingWithCompleted () {
177
+ public void testDebounceWithCompleted () {
178
178
Observable <String > source = Observable .create (new OnSubscribeFunc <String >() {
179
179
@ Override
180
180
public Subscription onSubscribe (Observer <? super String > observer ) {
@@ -187,7 +187,7 @@ public Subscription onSubscribe(Observer<? super String> observer) {
187
187
}
188
188
});
189
189
190
- Observable <String > sampled = Observable .create (OperationThrottleWithTimeout . throttleWithTimeout (source , 400 , TimeUnit .MILLISECONDS , scheduler ));
190
+ Observable <String > sampled = Observable .create (OperationDebounce . debounce (source , 400 , TimeUnit .MILLISECONDS , scheduler ));
191
191
sampled .subscribe (observer );
192
192
193
193
scheduler .advanceTimeTo (0 , TimeUnit .MILLISECONDS );
@@ -201,7 +201,38 @@ public Subscription onSubscribe(Observer<? super String> observer) {
201
201
}
202
202
203
203
@ Test
204
- public void testThrottlingWithError () {
204
+ public void testDebounceNeverEmits () {
205
+ Observable <String > source = Observable .create (new OnSubscribeFunc <String >() {
206
+ @ Override
207
+ public Subscription onSubscribe (Observer <? super String > observer ) {
208
+ // all should be skipped since they are happening faster than the 200ms timeout
209
+ publishNext (observer , 100 , "a" ); // Should be skipped
210
+ publishNext (observer , 200 , "b" ); // Should be skipped
211
+ publishNext (observer , 300 , "c" ); // Should be skipped
212
+ publishNext (observer , 400 , "d" ); // Should be skipped
213
+ publishNext (observer , 500 , "e" ); // Should be skipped
214
+ publishNext (observer , 600 , "f" ); // Should be skipped
215
+ publishNext (observer , 700 , "g" ); // Should be skipped
216
+ publishNext (observer , 800 , "h" ); // Should be skipped
217
+ publishCompleted (observer , 900 ); // Should be published as soon as the timeout expires.
218
+
219
+ return Subscriptions .empty ();
220
+ }
221
+ });
222
+
223
+ Observable <String > sampled = Observable .create (OperationDebounce .debounce (source , 200 , TimeUnit .MILLISECONDS , scheduler ));
224
+ sampled .subscribe (observer );
225
+
226
+ scheduler .advanceTimeTo (0 , TimeUnit .MILLISECONDS );
227
+ InOrder inOrder = inOrder (observer );
228
+ inOrder .verify (observer , times (0 )).onNext (anyString ());
229
+ scheduler .advanceTimeTo (1000 , TimeUnit .MILLISECONDS );
230
+ inOrder .verify (observer , times (1 )).onCompleted ();
231
+ inOrder .verifyNoMoreInteractions ();
232
+ }
233
+
234
+ @ Test
235
+ public void testDebounceWithError () {
205
236
Observable <String > source = Observable .create (new OnSubscribeFunc <String >() {
206
237
@ Override
207
238
public Subscription onSubscribe (Observer <? super String > observer ) {
@@ -214,7 +245,7 @@ public Subscription onSubscribe(Observer<? super String> observer) {
214
245
}
215
246
});
216
247
217
- Observable <String > sampled = Observable .create (OperationThrottleWithTimeout . throttleWithTimeout (source , 400 , TimeUnit .MILLISECONDS , scheduler ));
248
+ Observable <String > sampled = Observable .create (OperationDebounce . debounce (source , 400 , TimeUnit .MILLISECONDS , scheduler ));
218
249
sampled .subscribe (observer );
219
250
220
251
scheduler .advanceTimeTo (0 , TimeUnit .MILLISECONDS );
0 commit comments