15
15
*/
16
16
package rx .concurrency ;
17
17
18
- import static org .junit .Assert .assertTrue ;
18
+ import static org .junit .Assert .* ;
19
19
import static org .mockito .Mockito .*;
20
20
21
21
import java .awt .EventQueue ;
22
22
import java .awt .event .ActionEvent ;
23
23
import java .awt .event .ActionListener ;
24
+ import java .util .concurrent .CountDownLatch ;
24
25
import java .util .concurrent .TimeUnit ;
25
26
import java .util .concurrent .atomic .AtomicReference ;
26
27
41
42
import rx .util .functions .Func2 ;
42
43
43
44
/**
44
- * Executes work on the Swing UI thread.
45
+ * Executes work on the Swing UI thread.
45
46
* This scheduler should only be used with actions that execute quickly.
46
47
*/
47
48
public final class SwingScheduler extends Scheduler {
@@ -77,33 +78,33 @@ public void call() {
77
78
@ Override
78
79
public <T > Subscription schedule (final T state , final Func2 <Scheduler , T , Subscription > action , long dueTime , TimeUnit unit ) {
79
80
final AtomicReference <Subscription > sub = new AtomicReference <Subscription >();
80
- long delay = unit .toMillis (dueTime );
81
+ long delay = unit .toMillis (dueTime );
81
82
assertThatTheDelayIsValidForTheSwingTimer (delay );
82
-
83
+
83
84
class ExecuteOnceAction implements ActionListener {
84
85
private Timer timer ;
85
-
86
+
86
87
private void setTimer (Timer timer ) {
87
88
this .timer = timer ;
88
89
}
89
-
90
+
90
91
@ Override
91
92
public void actionPerformed (ActionEvent e ) {
92
93
timer .stop ();
93
94
sub .set (action .call (SwingScheduler .this , state ));
94
95
}
95
96
}
96
-
97
+
97
98
ExecuteOnceAction executeOnce = new ExecuteOnceAction ();
98
99
final Timer timer = new Timer ((int ) delay , executeOnce );
99
100
executeOnce .setTimer (timer );
100
101
timer .start ();
101
-
102
+
102
103
return Subscriptions .create (new Action0 () {
103
104
@ Override
104
105
public void call () {
105
106
timer .stop ();
106
-
107
+
107
108
Subscription subscription = sub .get ();
108
109
if (subscription != null ) {
109
110
subscription .unsubscribe ();
@@ -115,28 +116,28 @@ public void call() {
115
116
@ Override
116
117
public <T > Subscription schedulePeriodically (T state , final Func2 <Scheduler , T , Subscription > action , long initialDelay , long period , TimeUnit unit ) {
117
118
final AtomicReference <Timer > timer = new AtomicReference <Timer >();
118
-
119
- final long delay = unit .toMillis (period );
119
+
120
+ final long delay = unit .toMillis (period );
120
121
assertThatTheDelayIsValidForTheSwingTimer (delay );
121
-
122
+
122
123
final CompositeSubscription subscriptions = new CompositeSubscription ();
123
124
final Func2 <Scheduler , T , Subscription > initialAction = new Func2 <Scheduler , T , Subscription >() {
124
- @ Override
125
- public Subscription call (final Scheduler scheduler , final T state0 ) {
126
- // start timer for periodic execution, collect subscriptions
127
- timer .set (new Timer ((int ) delay , new ActionListener () {
128
- @ Override
129
- public void actionPerformed (ActionEvent e ) {
130
- subscriptions .add (action .call (scheduler , state0 ));
131
- }
132
- }));
133
- timer .get ().start ();
134
-
135
- return action .call (scheduler , state0 );
136
- }
125
+ @ Override
126
+ public Subscription call (final Scheduler scheduler , final T state0 ) {
127
+ // start timer for periodic execution, collect subscriptions
128
+ timer .set (new Timer ((int ) delay , new ActionListener () {
129
+ @ Override
130
+ public void actionPerformed (ActionEvent e ) {
131
+ subscriptions .add (action .call (scheduler , state0 ));
132
+ }
133
+ }));
134
+ timer .get ().start ();
135
+
136
+ return action .call (scheduler , state0 );
137
+ }
137
138
};
138
139
subscriptions .add (schedule (state , initialAction , initialDelay , unit ));
139
-
140
+
140
141
subscriptions .add (Subscriptions .create (new Action0 () {
141
142
@ Override
142
143
public void call () {
@@ -147,7 +148,7 @@ public void call() {
147
148
}
148
149
}
149
150
}));
150
-
151
+
151
152
return subscriptions ;
152
153
}
153
154
@@ -156,11 +157,11 @@ private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) {
156
157
throw new IllegalArgumentException (String .format ("The swing timer only accepts non-negative delays up to %d milliseconds." , Integer .MAX_VALUE ));
157
158
}
158
159
}
159
-
160
+
160
161
public static class UnitTest {
161
162
@ Rule
162
163
public ExpectedException exception = ExpectedException .none ();
163
-
164
+
164
165
@ Test
165
166
public void testInvalidDelayValues () {
166
167
final SwingScheduler scheduler = new SwingScheduler ();
@@ -174,34 +175,44 @@ public void testInvalidDelayValues() {
174
175
175
176
exception .expect (IllegalArgumentException .class );
176
177
scheduler .schedulePeriodically (action , 1L + Integer .MAX_VALUE , 100L , TimeUnit .MILLISECONDS );
177
-
178
+
178
179
exception .expect (IllegalArgumentException .class );
179
180
scheduler .schedulePeriodically (action , 100L , 1L + Integer .MAX_VALUE / 1000 , TimeUnit .SECONDS );
180
181
}
181
-
182
+
182
183
@ Test
183
184
public void testPeriodicScheduling () throws Exception {
184
185
final SwingScheduler scheduler = new SwingScheduler ();
185
186
187
+ final CountDownLatch latch = new CountDownLatch (4 );
188
+
186
189
final Action0 innerAction = mock (Action0 .class );
187
190
final Action0 unsubscribe = mock (Action0 .class );
188
191
final Func0 <Subscription > action = new Func0 <Subscription >() {
189
192
@ Override
190
193
public Subscription call () {
191
- innerAction .call ();
192
- assertTrue (SwingUtilities .isEventDispatchThread ());
193
- return Subscriptions .create (unsubscribe );
194
+ try {
195
+ innerAction .call ();
196
+ assertTrue (SwingUtilities .isEventDispatchThread ());
197
+ return Subscriptions .create (unsubscribe );
198
+ } finally {
199
+ latch .countDown ();
200
+ }
194
201
}
195
202
};
196
-
203
+
197
204
Subscription sub = scheduler .schedulePeriodically (action , 50 , 200 , TimeUnit .MILLISECONDS );
198
- Thread .sleep (840 );
205
+
206
+ if (!latch .await (5000 , TimeUnit .MILLISECONDS )) {
207
+ fail ("timed out waiting for tasks to execute" );
208
+ }
209
+
199
210
sub .unsubscribe ();
200
211
waitForEmptyEventQueue ();
201
212
verify (innerAction , times (4 )).call ();
202
213
verify (unsubscribe , times (4 )).call ();
203
214
}
204
-
215
+
205
216
@ Test
206
217
public void testNestedActions () throws Exception {
207
218
final SwingScheduler scheduler = new SwingScheduler ();
0 commit comments