Skip to content

Commit 64f433b

Browse files
Merge pull request #2 from jmhofer/reactivate-core-tests
Reactivate core tests and combineLatest
2 parents 077b148 + e3e148a commit 64f433b

File tree

5 files changed

+41
-14
lines changed

5 files changed

+41
-14
lines changed

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,7 @@ subprojects {
6363
}
6464
}
6565

66+
project(':rxjava-core') {
67+
sourceSets.test.java.srcDir 'src/test/java'
68+
}
69+

rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import rx.subscriptions.CompositeSubscription;
3939
import rx.subscriptions.Subscriptions;
4040
import rx.util.functions.Action0;
41-
import rx.util.functions.Func0;
4241
import rx.util.functions.Func2;
4342

4443
/**
@@ -187,14 +186,12 @@ public void testPeriodicScheduling() throws Exception {
187186
final CountDownLatch latch = new CountDownLatch(4);
188187

189188
final Action0 innerAction = mock(Action0.class);
190-
final Action0 unsubscribe = mock(Action0.class);
191-
final Func0<Subscription> action = new Func0<Subscription>() {
189+
final Action0 action = new Action0() {
192190
@Override
193-
public Subscription call() {
191+
public void call() {
194192
try {
195193
innerAction.call();
196194
assertTrue(SwingUtilities.isEventDispatchThread());
197-
return Subscriptions.create(unsubscribe);
198195
} finally {
199196
latch.countDown();
200197
}
@@ -210,7 +207,6 @@ public Subscription call() {
210207
sub.unsubscribe();
211208
waitForEmptyEventQueue();
212209
verify(innerAction, times(4)).call();
213-
verify(unsubscribe, times(4)).call();
214210
}
215211

216212
@Test

rxjava-contrib/rxjava-swing/src/main/java/rx/observables/SwingObservable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import javax.swing.AbstractButton;
2727

2828
import rx.Observable;
29-
import static rx.Observable.filter;
3029
import rx.swing.sources.AbstractButtonSource;
3130
import rx.swing.sources.ComponentEventSource;
3231
import rx.swing.sources.KeyEventSource;
@@ -68,7 +67,7 @@ public static Observable<KeyEvent> fromKeyEvents(Component component) {
6867
* @return Observable of key events.
6968
*/
7069
public static Observable<KeyEvent> fromKeyEvents(Component component, final Set<Integer> keyCodes) {
71-
return filter(fromKeyEvents(component), new Func1<KeyEvent, Boolean>() {
70+
return fromKeyEvents(component).filter(new Func1<KeyEvent, Boolean>() {
7271
@Override
7372
public Boolean call(KeyEvent event) {
7473
return keyCodes.contains(event.getKeyCode());

rxjava-contrib/rxjava-swing/src/main/java/rx/swing/sources/KeyEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void call() {
8585
* @see SwingObservable.fromKeyEvents(Component, Set)
8686
*/
8787
public static Observable<Set<Integer>> currentlyPressedKeysOf(Component component) {
88-
return Observable.<KeyEvent, Set<Integer>>scan(fromKeyEventsOf(component), new HashSet<Integer>(), new Func2<Set<Integer>, KeyEvent, Set<Integer>>() {
88+
return fromKeyEventsOf(component).<Set<Integer>>scan(new HashSet<Integer>(), new Func2<Set<Integer>, KeyEvent, Set<Integer>>() {
8989
@Override
9090
public Set<Integer> call(Set<Integer> pressedKeys, KeyEvent event) {
9191
Set<Integer> afterEvent = new HashSet<Integer>(pressedKeys);

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,6 @@
1515
*/
1616
package rx;
1717

18-
import static org.junit.Assert.*;
19-
import static org.mockito.Matchers.*;
20-
import static org.mockito.Mockito.*;
21-
2218
import java.util.ArrayList;
2319
import java.util.Arrays;
2420
import java.util.Collection;
@@ -27,14 +23,14 @@
2723
import java.util.concurrent.Future;
2824
import java.util.concurrent.TimeUnit;
2925

30-
3126
import rx.concurrency.Schedulers;
3227
import rx.observables.BlockingObservable;
3328
import rx.observables.ConnectableObservable;
3429
import rx.observables.GroupedObservable;
3530
import rx.operators.OperationAll;
3631
import rx.operators.OperationBuffer;
3732
import rx.operators.OperationCache;
33+
import rx.operators.OperationCombineLatest;
3834
import rx.operators.OperationConcat;
3935
import rx.operators.OperationDefer;
4036
import rx.operators.OperationDematerialize;
@@ -1085,6 +1081,38 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
10851081
return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
10861082
}
10871083

1084+
/**
1085+
* Combines the given observables, emitting an event containing an aggregation of the latest values of each of the source observables
1086+
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
1087+
* <p>
1088+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
1089+
*
1090+
* @param w0
1091+
* The first source observable.
1092+
* @param w1
1093+
* The second source observable.
1094+
* @param combineFunction
1095+
* The aggregation function used to combine the source observable values.
1096+
* @return An Observable that combines the source Observables with the given combine function
1097+
*/
1098+
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineFunction) {
1099+
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
1100+
}
1101+
1102+
/**
1103+
* @see #combineLatest(Observable, Observable, Func2)
1104+
*/
1105+
public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<T0, T1, T2, R> combineFunction) {
1106+
return create(OperationCombineLatest.combineLatest(w0, w1, w2, combineFunction));
1107+
}
1108+
1109+
/**
1110+
* @see #combineLatest(Observable, Observable, Func2)
1111+
*/
1112+
public static <R, T0, T1, T2, T3> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<T0, T1, T2, T3, R> combineFunction) {
1113+
return create(OperationCombineLatest.combineLatest(w0, w1, w2, w3, combineFunction));
1114+
}
1115+
10881116
/**
10891117
* Creates an Observable which produces buffers of collected values.
10901118
*

0 commit comments

Comments
 (0)