Skip to content

Commit 49a4b2d

Browse files
author
jmhofer
committed
added variance to Func2, too
1 parent 8e45528 commit 49a4b2d

File tree

18 files changed

+64
-60
lines changed

18 files changed

+64
-60
lines changed

rxjava-contrib/rxjava-android/src/main/java/rx/android/concurrency/HandlerThreadScheduler.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
package rx.android.concurrency;
22

33
import android.os.Handler;
4+
45
import org.junit.Test;
56
import org.junit.runner.RunWith;
67
import org.mockito.ArgumentCaptor;
78
import org.robolectric.RobolectricTestRunner;
89
import org.robolectric.annotation.Config;
10+
911
import rx.Scheduler;
1012
import rx.Subscription;
1113
import rx.operators.SafeObservableSubscription;
@@ -39,7 +41,7 @@ public HandlerThreadScheduler(Handler handler) {
3941
* See {@link #schedule(Object, rx.util.functions.Func2, long, java.util.concurrent.TimeUnit)}
4042
*/
4143
@Override
42-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
44+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
4345
return schedule(state, action, 0L, TimeUnit.MILLISECONDS);
4446
}
4547

@@ -56,7 +58,7 @@ public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscr
5658
* @return A Subscription from which one can unsubscribe from.
5759
*/
5860
@Override
59-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
61+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
6062
final SafeObservableSubscription subscription = new SafeObservableSubscription();
6163
final Scheduler _scheduler = this;
6264
handler.postDelayed(new Runnable() {
@@ -76,6 +78,7 @@ public static final class UnitTest {
7678
public void shouldScheduleImmediateActionOnHandlerThread() {
7779
final Handler handler = mock(Handler.class);
7880
final Object state = new Object();
81+
@SuppressWarnings("unchecked")
7982
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
8083

8184
Scheduler scheduler = new HandlerThreadScheduler(handler);
@@ -94,6 +97,7 @@ public void shouldScheduleImmediateActionOnHandlerThread() {
9497
public void shouldScheduleDelayedActionOnHandlerThread() {
9598
final Handler handler = mock(Handler.class);
9699
final Object state = new Object();
100+
@SuppressWarnings("unchecked")
97101
final Func2<Scheduler, Object, Subscription> action = mock(Func2.class);
98102

99103
Scheduler scheduler = new HandlerThreadScheduler(handler);

rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private SwingScheduler() {
5555
}
5656

5757
@Override
58-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
58+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
5959
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
6060
EventQueue.invokeLater(new Runnable() {
6161
@Override
@@ -75,7 +75,7 @@ public void call() {
7575
}
7676

7777
@Override
78-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
78+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
7979
final AtomicReference<Subscription> sub = new AtomicReference<Subscription>();
8080
long delay = unit.toMillis(dueTime);
8181
assertThatTheDelayIsValidForTheSwingTimer(delay);
@@ -113,7 +113,7 @@ public void call() {
113113
}
114114

115115
@Override
116-
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
116+
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
117117
final AtomicReference<Timer> timer = new AtomicReference<Timer>();
118118

119119
final long delay = unit.toMillis(period);

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -918,7 +918,7 @@ public static <T> Observable<T> from(Future<T> future, long timeout, TimeUnit un
918918
* Observable
919919
* @return an Observable that emits the zipped results
920920
*/
921-
public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> reduceFunction) {
921+
public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Func2<? super T0, ? super T1, ? extends R> reduceFunction) {
922922
return create(OperationZip.zip(w0, w1, reduceFunction));
923923
}
924924

@@ -964,7 +964,7 @@ public Boolean call(T first, T second) {
964964
* @return an Observable that emits Booleans that indicate whether the corresponding items
965965
* emitted by the source Observables are equal
966966
*/
967-
public static <T> Observable<Boolean> sequenceEqual(Observable<T> first, Observable<T> second, Func2<T, T, Boolean> equality) {
967+
public static <T> Observable<Boolean> sequenceEqual(Observable<T> first, Observable<T> second, Func2<? super T, ? super T, Boolean> equality) {
968968
return zip(first, second, equality);
969969
}
970970

@@ -1041,7 +1041,7 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
10411041
* The aggregation function used to combine the source observable values.
10421042
* @return An Observable that combines the source Observables with the given combine function
10431043
*/
1044-
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<T0, T1, R> combineFunction) {
1044+
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<? super T0, ? super T1, ? extends R> combineFunction) {
10451045
return create(OperationCombineLatest.combineLatest(w0, w1, combineFunction));
10461046
}
10471047

@@ -1606,7 +1606,7 @@ public Observable<T> onErrorReturn(Func1<? super Throwable, ? extends T> resumeF
16061606
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
16071607
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
16081608
*/
1609-
public Observable<T> reduce(Func2<T, T, T> accumulator) {
1609+
public Observable<T> reduce(Func2<? super T, ? super T, ? extends T> accumulator) {
16101610
return create(OperationScan.scan(this, accumulator)).takeLast(1);
16111611
}
16121612

@@ -1663,7 +1663,7 @@ public ConnectableObservable<T> publish() {
16631663
*
16641664
* @see #reduce(Func2)
16651665
*/
1666-
public Observable<T> aggregate(Func2<T, T, T> accumulator) {
1666+
public Observable<T> aggregate(Func2<? super T, ? super T, ? extends T> accumulator) {
16671667
return reduce(accumulator);
16681668
}
16691669

@@ -1690,7 +1690,7 @@ public Observable<T> aggregate(Func2<T, T, T> accumulator) {
16901690
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
16911691
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
16921692
*/
1693-
public <R> Observable<R> reduce(R initialValue, Func2<R, T, R> accumulator) {
1693+
public <R> Observable<R> reduce(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
16941694
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
16951695
}
16961696

@@ -1701,7 +1701,7 @@ public <R> Observable<R> reduce(R initialValue, Func2<R, T, R> accumulator) {
17011701
*
17021702
* @see #reduce(Object, Func2)
17031703
*/
1704-
public <R> Observable<R> aggregate(R initialValue, Func2<R, T, R> accumulator) {
1704+
public <R> Observable<R> aggregate(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
17051705
return reduce(initialValue, accumulator);
17061706
}
17071707

@@ -1724,7 +1724,7 @@ public <R> Observable<R> aggregate(R initialValue, Func2<R, T, R> accumulator) {
17241724
* @return an Observable that emits the results of each call to the accumulator function
17251725
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
17261726
*/
1727-
public Observable<T> scan(Func2<T, T, T> accumulator) {
1727+
public Observable<T> scan(Func2<? super T, ? super T, ? extends T> accumulator) {
17281728
return create(OperationScan.scan(this, accumulator));
17291729
}
17301730

@@ -1785,7 +1785,7 @@ public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
17851785
* @return an Observable that emits the results of each call to the accumulator function
17861786
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
17871787
*/
1788-
public <R> Observable<R> scan(R initialValue, Func2<R, T, R> accumulator) {
1788+
public <R> Observable<R> scan(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
17891789
return create(OperationScan.scan(this, initialValue, accumulator));
17901790
}
17911791

@@ -1870,7 +1870,7 @@ public Observable<T> takeWhile(final Func1<? super T, Boolean> predicate) {
18701870
* @return an Observable that emits items from the source Observable so long as the predicate
18711871
* continues to return <code>true</code> for each item, then completes
18721872
*/
1873-
public Observable<T> takeWhileWithIndex(final Func2<T, Integer, Boolean> predicate) {
1873+
public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer, Boolean> predicate) {
18741874
return create(OperationTakeWhile.takeWhileWithIndex(this, predicate));
18751875
}
18761876

@@ -1956,7 +1956,7 @@ public Observable<List<T>> toSortedList() {
19561956
* an Integer that indicates their sort order
19571957
* @return an Observable that emits the items from the source Observable in sorted order
19581958
*/
1959-
public Observable<List<T>> toSortedList(Func2<T, T, Integer> sortFunction) {
1959+
public Observable<List<T>> toSortedList(Func2<? super T, ? super T, Integer> sortFunction) {
19601960
return create(OperationToObservableSortedList.toSortedList(this, sortFunction));
19611961
}
19621962

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ public abstract class Scheduler {
6262
* Action to schedule.
6363
* @return a subscription to be able to unsubscribe from action.
6464
*/
65-
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
65+
public abstract <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action);
6666

6767
/**
6868
* Schedules a cancelable action to be executed in delayTime.
@@ -77,7 +77,7 @@ public abstract class Scheduler {
7777
* Time unit of the delay time.
7878
* @return a subscription to be able to unsubscribe from action.
7979
*/
80-
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
80+
public abstract <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit);
8181

8282
/**
8383
* Schedules a cancelable action to be executed periodically.
@@ -96,7 +96,7 @@ public abstract class Scheduler {
9696
* The time unit the interval above is given in.
9797
* @return A subscription to be able to unsubscribe from action.
9898
*/
99-
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
99+
public <T> Subscription schedulePeriodically(T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
100100
final long periodInNanos = unit.toNanos(period);
101101
final AtomicBoolean complete = new AtomicBoolean();
102102

@@ -140,7 +140,7 @@ public void call() {
140140
* Time the action is to be executed. If in the past it will be executed immediately.
141141
* @return a subscription to be able to unsubscribe from action.
142142
*/
143-
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, Date dueTime) {
143+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, Date dueTime) {
144144
long scheduledTime = dueTime.getTime();
145145
long timeInFuture = scheduledTime - now();
146146
if (timeInFuture <= 0) {
@@ -162,7 +162,7 @@ public Subscription schedule(final Action0 action) {
162162
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
163163

164164
@Override
165-
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
165+
public Subscription call(Scheduler scheduler, Void state) {
166166
action.call();
167167
return Subscriptions.empty();
168168
}
@@ -180,7 +180,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
180180
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
181181

182182
@Override
183-
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
183+
public Subscription call(Scheduler scheduler, Void state) {
184184
action.call();
185185
return Subscriptions.empty();
186186
}
@@ -204,7 +204,7 @@ public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @Suppr
204204
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
205205
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
206206
@Override
207-
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
207+
public Subscription call(Scheduler scheduler, Void state) {
208208
action.call();
209209
return Subscriptions.empty();
210210
}

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@ private CurrentThreadScheduler() {
4747
private final AtomicInteger counter = new AtomicInteger(0);
4848

4949
@Override
50-
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
50+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
5151
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
5252
enqueue(discardableAction, now());
5353
return discardableAction;
5454
}
5555

5656
@Override
57-
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
57+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
5858
long execTime = now() + unit.toMillis(dueTime);
5959

6060
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, new SleepingAction<T>(action, this, execTime));

rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,13 @@
2727
* Combines standard {@link Subscription#unsubscribe()} functionality with ability to skip execution if an unsubscribe occurs before the {@link #call()} method is invoked.
2828
*/
2929
/* package */class DiscardableAction<T> implements Func1<Scheduler, Subscription>, Subscription {
30-
private final Func2<Scheduler, T, Subscription> underlying;
30+
private final Func2<? super Scheduler, ? super T, ? extends Subscription> underlying;
3131
private final T state;
3232

3333
private final SafeObservableSubscription wrapper = new SafeObservableSubscription();
3434
private final AtomicBoolean ready = new AtomicBoolean(true);
3535

36-
public DiscardableAction(T state, Func2<Scheduler, T, Subscription> underlying) {
36+
public DiscardableAction(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> underlying) {
3737
this.state = state;
3838
this.underlying = underlying;
3939
}

rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
4545
}
4646

4747
@Override
48-
public <T> Subscription schedulePeriodically(final T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
48+
public <T> Subscription schedulePeriodically(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long initialDelay, long period, TimeUnit unit) {
4949
if (executor instanceof ScheduledExecutorService) {
5050
final CompositeSubscription subscriptions = new CompositeSubscription();
5151

@@ -66,7 +66,7 @@ public void run() {
6666
}
6767

6868
@Override
69-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
69+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
7070
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
7171
final Scheduler _scheduler = this;
7272
// all subscriptions that may need to be unsubscribed
@@ -113,7 +113,7 @@ public void run() {
113113
}
114114

115115
@Override
116-
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
116+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
117117
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
118118
final Scheduler _scheduler = this;
119119
// all subscriptions that may need to be unsubscribed

rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,12 @@ private ImmediateScheduler() {
4141
}
4242

4343
@Override
44-
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
44+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
4545
return action.call(this, state);
4646
}
4747

4848
@Override
49-
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
49+
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long dueTime, TimeUnit unit) {
5050
// since we are executing immediately on this thread we must cause this thread to sleep
5151
long execTime = now() + unit.toMillis(dueTime);
5252

rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public static NewThreadScheduler getInstance() {
3636
}
3737

3838
@Override
39-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
39+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action) {
4040
final SafeObservableSubscription subscription = new SafeObservableSubscription();
4141
final Scheduler _scheduler = this;
4242

@@ -53,7 +53,7 @@ public void run() {
5353
}
5454

5555
@Override
56-
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delay, TimeUnit unit) {
56+
public <T> Subscription schedule(final T state, final Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delay, TimeUnit unit) {
5757
// we will use the system scheduler since it doesn't make sense to launch a new Thread and then sleep
5858
// we will instead schedule the event then launch the thread after the delay has passed
5959
final Scheduler _scheduler = this;

rxjava-core/src/main/java/rx/concurrency/SleepingAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@
2020
import rx.util.functions.Func2;
2121

2222
/* package */class SleepingAction<T> implements Func2<Scheduler, T, Subscription> {
23-
private final Func2<Scheduler, T, Subscription> underlying;
23+
private final Func2<? super Scheduler, ? super T, ? extends Subscription> underlying;
2424
private final Scheduler scheduler;
2525
private final long execTime;
2626

27-
public SleepingAction(Func2<Scheduler, T, Subscription> underlying, Scheduler scheduler, long execTime) {
27+
public SleepingAction(Func2<? super Scheduler, ? super T, ? extends Subscription> underlying, Scheduler scheduler, long execTime) {
2828
this.underlying = underlying;
2929
this.scheduler = scheduler;
3030
this.execTime = execTime;

0 commit comments

Comments
 (0)