Skip to content

Commit e3d04e3

Browse files
Merge pull request #369 from benjchristensen/reduce-and-scan-covariance
Remove covariance of scan/reduce
2 parents 128e598 + 3501fe7 commit e3d04e3

File tree

4 files changed

+65
-31
lines changed

4 files changed

+65
-31
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3025,7 +3025,7 @@ public Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction)
30253025
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
30263026
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
30273027
*/
3028-
public Observable<T> reduce(Func2<? super T, ? super T, ? extends T> accumulator) {
3028+
public Observable<T> reduce(Func2<T, T, T> accumulator) {
30293029
return create(OperationScan.scan(this, accumulator)).takeLast(1);
30303030
}
30313031

@@ -3206,7 +3206,7 @@ public ConnectableObservable<T> publish() {
32063206
*
32073207
* @see #reduce(Func2)
32083208
*/
3209-
public Observable<T> aggregate(Func2<? super T, ? super T, ? extends T> accumulator) {
3209+
public Observable<T> aggregate(Func2<T, T, T> accumulator) {
32103210
return reduce(accumulator);
32113211
}
32123212

@@ -3233,7 +3233,7 @@ public Observable<T> aggregate(Func2<? super T, ? super T, ? extends T> accumula
32333233
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229154(v%3Dvs.103).aspx">MSDN: Observable.Aggregate</a>
32343234
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
32353235
*/
3236-
public <R> Observable<R> reduce(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
3236+
public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulator) {
32373237
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
32383238
}
32393239

@@ -3244,7 +3244,7 @@ public <R> Observable<R> reduce(R initialValue, Func2<? super R, ? super T, ? ex
32443244
*
32453245
* @see #reduce(Object, Func2)
32463246
*/
3247-
public <R> Observable<R> aggregate(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
3247+
public <R> Observable<R> aggregate(R initialValue, Func2<R, ? super T, R> accumulator) {
32483248
return reduce(initialValue, accumulator);
32493249
}
32503250

@@ -3267,7 +3267,7 @@ public <R> Observable<R> aggregate(R initialValue, Func2<? super R, ? super T, ?
32673267
* @return an Observable that emits the results of each call to the accumulator function
32683268
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
32693269
*/
3270-
public Observable<T> scan(Func2<? super T, ? super T, ? extends T> accumulator) {
3270+
public Observable<T> scan(Func2<T, T, T> accumulator) {
32713271
return create(OperationScan.scan(this, accumulator));
32723272
}
32733273

@@ -3328,7 +3328,7 @@ public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
33283328
* @return an Observable that emits the results of each call to the accumulator function
33293329
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
33303330
*/
3331-
public <R> Observable<R> scan(R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
3331+
public <R> Observable<R> scan(R initialValue, Func2<R, ? super T, R> accumulator) {
33323332
return create(OperationScan.scan(this, initialValue, accumulator));
33333333
}
33343334

rxjava-core/src/main/java/rx/operators/OperationScan.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public final class OperationScan {
5555
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
5656
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212007%28v=vs.103%29.aspx">Observable.Scan(TSource, TAccumulate) Method (IObservable(TSource), TAccumulate, Func(TAccumulate, TSource, TAccumulate))</a>
5757
*/
58-
public static <T, R> OnSubscribeFunc<R> scan(Observable<? extends T> sequence, R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
58+
public static <T, R> OnSubscribeFunc<R> scan(Observable<? extends T> sequence, R initialValue, Func2<R, ? super T, R> accumulator) {
5959
return new Accumulator<T, R>(sequence, initialValue, accumulator);
6060
}
6161

@@ -70,17 +70,17 @@ public static <T, R> OnSubscribeFunc<R> scan(Observable<? extends T> sequence, R
7070
* @return An observable sequence whose elements are the result of accumulating the output from the list of Observables.
7171
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx">Observable.Scan(TSource) Method (IObservable(TSource), Func(TSource, TSource, TSource))</a>
7272
*/
73-
public static <T> OnSubscribeFunc<T> scan(Observable<? extends T> sequence, Func2<? super T, ? super T, ? extends T> accumulator) {
73+
public static <T> OnSubscribeFunc<T> scan(Observable<? extends T> sequence, Func2<T, T, T> accumulator) {
7474
return new AccuWithoutInitialValue<T>(sequence, accumulator);
7575
}
7676

7777
private static class AccuWithoutInitialValue<T> implements OnSubscribeFunc<T> {
7878
private final Observable<? extends T> sequence;
79-
private final Func2<? super T, ? super T, ? extends T> accumulatorFunction;
79+
private final Func2<T, T, T> accumulatorFunction;
8080

8181
private AccumulatingObserver<T, T> accumulatingObserver;
8282

83-
private AccuWithoutInitialValue(Observable<? extends T> sequence, Func2<? super T, ? super T, ? extends T> accumulator) {
83+
private AccuWithoutInitialValue(Observable<? extends T> sequence, Func2<T, T, T> accumulator) {
8484
this.sequence = sequence;
8585
this.accumulatorFunction = accumulator;
8686
}
@@ -116,9 +116,9 @@ public void onCompleted() {
116116
private static class Accumulator<T, R> implements OnSubscribeFunc<R> {
117117
private final Observable<? extends T> sequence;
118118
private final R initialValue;
119-
private final Func2<? super R, ? super T, ? extends R> accumulatorFunction;
119+
private final Func2<R, ? super T, R> accumulatorFunction;
120120

121-
private Accumulator(Observable<? extends T> sequence, R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
121+
private Accumulator(Observable<? extends T> sequence, R initialValue, Func2<R, ? super T, R> accumulator) {
122122
this.sequence = sequence;
123123
this.initialValue = initialValue;
124124
this.accumulatorFunction = accumulator;
@@ -133,11 +133,11 @@ public Subscription onSubscribe(final Observer<? super R> observer) {
133133

134134
private static class AccumulatingObserver<T, R> implements Observer<T> {
135135
private final Observer<? super R> observer;
136-
private final Func2<? super R, ? super T, ? extends R> accumulatorFunction;
136+
private final Func2<R, ? super T, R> accumulatorFunction;
137137

138138
private R acc;
139139

140-
private AccumulatingObserver(Observer<? super R> observer, R initialValue, Func2<? super R, ? super T, ? extends R> accumulator) {
140+
private AccumulatingObserver(Observer<? super R> observer, R initialValue, Func2<R, ? super T, R> accumulator) {
141141
this.observer = observer;
142142
this.accumulatorFunction = accumulator;
143143

rxjava-core/src/test/java/rx/CovarianceTest.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
import org.junit.Test;
66

7+
import rx.util.functions.Func2;
8+
79
/**
810
* Test super/extends of generics.
911
*
@@ -21,6 +23,25 @@ public void testCovarianceOfFrom() {
2123
// Observable.<HorrorMovie>from(new Movie()); // may not compile
2224
}
2325

26+
@Test
27+
public void testSortedList() {
28+
Func2<Media, Media, Integer> SORT_FUNCTION = new Func2<Media, Media, Integer>() {
29+
30+
@Override
31+
public Integer call(Media t1, Media t2) {
32+
return 1;
33+
}
34+
};
35+
36+
// this one would work without the covariance generics
37+
Observable<Media> o = Observable.from(new Movie(), new TVSeason(), new Album());
38+
o.toSortedList(SORT_FUNCTION);
39+
40+
// this one would NOT work without the covariance generics
41+
Observable<Movie> o2 = Observable.from(new Movie(), new ActionMovie(), new HorrorMovie());
42+
o2.toSortedList(SORT_FUNCTION);
43+
}
44+
2445
/*
2546
* Most tests are moved into their applicable classes such as [Operator]Tests.java
2647
*/
@@ -34,6 +55,15 @@ static class Movie extends Media {
3455
static class HorrorMovie extends Movie {
3556
}
3657

58+
static class ActionMovie extends Movie {
59+
}
60+
61+
static class Album extends Media {
62+
}
63+
64+
static class TVSeason extends Media {
65+
}
66+
3767
static class Rating {
3868
}
3969

rxjava-core/src/test/java/rx/ReduceTests.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ public Integer call(Integer t1, Integer t2) {
2525
assertEquals(6, value);
2626
}
2727

28+
@SuppressWarnings("unused")
2829
@Test
2930
public void reduceWithObjects() {
3031
Observable<Movie> horrorMovies = Observable.<Movie> from(new HorrorMovie());
@@ -41,9 +42,15 @@ public Movie call(Movie t1, Movie t2) {
4142
Observable<Movie> reduceResult2 = horrorMovies.reduce(chooseSecondMovie);
4243
}
4344

45+
/**
46+
* Reduce consumes and produces T so can't do covariance.
47+
*
48+
* https://github.com/Netflix/RxJava/issues/360#issuecomment-24203016
49+
*/
50+
@SuppressWarnings("unused")
4451
@Test
4552
public void reduceWithCovariantObjects() {
46-
Observable<HorrorMovie> horrorMovies = Observable.from(new HorrorMovie());
53+
Observable<Movie> horrorMovies = Observable.<Movie> from(new HorrorMovie());
4754

4855
Func2<Movie, Movie, Movie> chooseSecondMovie =
4956
new Func2<Movie, Movie, Movie>() {
@@ -52,36 +59,33 @@ public Movie call(Movie t1, Movie t2) {
5259
}
5360
};
5461

55-
Observable<Movie> reduceResult = Observable.create(OperationScan.scan(horrorMovies, chooseSecondMovie)).takeLast(1);
56-
57-
//TODO this isn't compiling
58-
// Observable<Movie> reduceResult2 = horrorMovies.reduce(chooseSecondMovie);
62+
Observable<Movie> reduceResult2 = horrorMovies.reduce(chooseSecondMovie);
5963
}
6064

65+
/**
66+
* Reduce consumes and produces T so can't do covariance.
67+
*
68+
* https://github.com/Netflix/RxJava/issues/360#issuecomment-24203016
69+
*/
6170
@Test
6271
public void reduceCovariance() {
63-
Observable<HorrorMovie> horrorMovies = Observable.from(new HorrorMovie());
64-
65-
// do something with horrorMovies, relying on the fact that all are HorrorMovies
66-
// and not just any Movies...
67-
68-
// pass it to library (works because it takes Observable<? extends Movie>)
72+
// must type it to <Movie>
73+
Observable<Movie> horrorMovies = Observable.<Movie> from(new HorrorMovie());
6974
libraryFunctionActingOnMovieObservables(horrorMovies);
7075
}
7176

72-
public void libraryFunctionActingOnMovieObservables(Observable<? extends Movie> obs) {
77+
/*
78+
* This accepts <Movie> instead of <? super Movie> since `reduce` can't handle covariants
79+
*/
80+
public void libraryFunctionActingOnMovieObservables(Observable<Movie> obs) {
7381
Func2<Movie, Movie, Movie> chooseSecondMovie =
7482
new Func2<Movie, Movie, Movie>() {
7583
public Movie call(Movie t1, Movie t2) {
7684
return t2;
7785
}
7886
};
7987

80-
Observable<Movie> reduceResult = Observable.create(OperationScan.scan(obs, chooseSecondMovie)).takeLast(1);
81-
82-
//TODO this isn't compiling
83-
// Observable<Movie> reduceResult2 = obs.reduce(chooseSecondMovie);
84-
// do something with reduceResult...
88+
obs.reduce(chooseSecondMovie);
8589
}
8690

8791
}

0 commit comments

Comments
 (0)