Skip to content

Commit 0d6c8e3

Browse files
authored
2.x: distinctUntilChanged to store the selected key instead of the value (#4747)
* 2.x: distinctUntilChanged to store the selected key instead of the value * Fix null test and whitespaces
1 parent a779687 commit 0d6c8e3

File tree

9 files changed

+178
-117
lines changed

9 files changed

+178
-117
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import io.reactivex.internal.functions.*;
2626
import io.reactivex.internal.fuseable.ScalarCallable;
2727
import io.reactivex.internal.operators.flowable.*;
28-
import io.reactivex.internal.operators.observable.*;
28+
import io.reactivex.internal.operators.observable.ObservableFromPublisher;
2929
import io.reactivex.internal.schedulers.ImmediateThinScheduler;
3030
import io.reactivex.internal.subscribers.*;
3131
import io.reactivex.internal.util.*;
@@ -7266,7 +7266,7 @@ public final <K> Flowable<T> distinct(Function<? super T, K> keySelector,
72667266
@BackpressureSupport(BackpressureKind.FULL)
72677267
@SchedulerSupport(SchedulerSupport.NONE)
72687268
public final Flowable<T> distinctUntilChanged() {
7269-
return new FlowableDistinctUntilChanged<T>(this, Functions.equalsPredicate());
7269+
return distinctUntilChanged(Functions.identity());
72707270
}
72717271

72727272
/**
@@ -7294,7 +7294,7 @@ public final Flowable<T> distinctUntilChanged() {
72947294
@SchedulerSupport(SchedulerSupport.NONE)
72957295
public final <K> Flowable<T> distinctUntilChanged(Function<? super T, K> keySelector) {
72967296
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
7297-
return new FlowableDistinctUntilChanged<T>(this, Functions.equalsPredicate(keySelector));
7297+
return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged<T, K>(this, keySelector, ObjectHelper.equalsPredicate()));
72987298
}
72997299

73007300
/**
@@ -7321,7 +7321,7 @@ public final <K> Flowable<T> distinctUntilChanged(Function<? super T, K> keySele
73217321
@SchedulerSupport(SchedulerSupport.NONE)
73227322
public final Flowable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer) {
73237323
ObjectHelper.requireNonNull(comparer, "comparer is null");
7324-
return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged<T>(this, comparer));
7324+
return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged<T, T>(this, Functions.<T>identity(), comparer));
73257325
}
73267326

73277327
/**

src/main/java/io/reactivex/Observable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6333,7 +6333,7 @@ public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Call
63336333
*/
63346334
@SchedulerSupport(SchedulerSupport.NONE)
63356335
public final Observable<T> distinctUntilChanged() {
6336-
return new ObservableDistinctUntilChanged<T>(this, Functions.equalsPredicate());
6336+
return distinctUntilChanged(Functions.identity());
63376337
}
63386338

63396339
/**
@@ -6357,7 +6357,7 @@ public final Observable<T> distinctUntilChanged() {
63576357
@SchedulerSupport(SchedulerSupport.NONE)
63586358
public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector) {
63596359
ObjectHelper.requireNonNull(keySelector, "keySelector is null");
6360-
return new ObservableDistinctUntilChanged<T>(this, Functions.equalsPredicate(keySelector));
6360+
return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged<T, K>(this, keySelector, ObjectHelper.equalsPredicate()));
63616361
}
63626362

63636363
/**
@@ -6380,7 +6380,7 @@ public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySe
63806380
@SchedulerSupport(SchedulerSupport.NONE)
63816381
public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer) {
63826382
ObjectHelper.requireNonNull(comparer, "comparer is null");
6383-
return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged<T>(this, comparer));
6383+
return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged<T, T>(this, Functions.<T>identity(), comparer));
63846384
}
63856385

63866386
/**

src/main/java/io/reactivex/internal/functions/Functions.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -639,32 +639,6 @@ public static <T> Function<List<T>, List<T>> listSorter(final Comparator<? super
639639
return new ListSorter<T>(comparator);
640640
}
641641

642-
static final BiPredicate<Object, Object> DEFAULT_EQUALS_PREDICATE = equalsPredicate(Functions.identity());
643-
644-
@SuppressWarnings("unchecked")
645-
public static <T> BiPredicate<T, T> equalsPredicate() {
646-
return (BiPredicate<T, T>)DEFAULT_EQUALS_PREDICATE;
647-
}
648-
649-
static final class KeyedEqualsPredicate<T, K> implements BiPredicate<T, T> {
650-
final Function<? super T, K> keySelector;
651-
652-
KeyedEqualsPredicate(Function<? super T, K> keySelector) {
653-
this.keySelector = keySelector;
654-
}
655-
656-
@Override
657-
public boolean test(T t1, T t2) throws Exception {
658-
K k1 = ObjectHelper.requireNonNull(keySelector.apply(t1), "The keySelector returned a null key");
659-
K k2 = ObjectHelper.requireNonNull(keySelector.apply(t2), "The keySelector returned a null key");
660-
return ObjectHelper.equals(k1, k2);
661-
}
662-
}
663-
664-
public static <T, K> BiPredicate<T, T> equalsPredicate(Function<? super T, K> keySelector) {
665-
return new KeyedEqualsPredicate<T, K>(keySelector);
666-
}
667-
668642
public static final Consumer<Subscription> REQUEST_MAX = new Consumer<Subscription>() {
669643
@Override
670644
public void accept(Subscription t) throws Exception {

src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java

Lines changed: 73 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -15,41 +15,49 @@
1515

1616
import org.reactivestreams.*;
1717

18-
import io.reactivex.functions.BiPredicate;
18+
import io.reactivex.functions.*;
1919
import io.reactivex.internal.fuseable.ConditionalSubscriber;
2020
import io.reactivex.internal.subscribers.*;
2121

22-
public final class FlowableDistinctUntilChanged<T> extends AbstractFlowableWithUpstream<T, T> {
22+
public final class FlowableDistinctUntilChanged<T, K> extends AbstractFlowableWithUpstream<T, T> {
2323

24-
final BiPredicate<? super T, ? super T> comparer;
24+
final Function<? super T, K> keySelector;
2525

26-
public FlowableDistinctUntilChanged(Publisher<T> source, BiPredicate<? super T, ? super T> comparer) {
26+
final BiPredicate<? super K, ? super K> comparer;
27+
28+
public FlowableDistinctUntilChanged(Publisher<T> source, Function<? super T, K> keySelector, BiPredicate<? super K, ? super K> comparer) {
2729
super(source);
30+
this.keySelector = keySelector;
2831
this.comparer = comparer;
2932
}
3033

3134
@Override
3235
protected void subscribeActual(Subscriber<? super T> s) {
3336
if (s instanceof ConditionalSubscriber) {
3437
ConditionalSubscriber<? super T> cs = (ConditionalSubscriber<? super T>) s;
35-
source.subscribe(new DistinctUntilChangedConditionalSubscriber<T>(cs, comparer));
38+
source.subscribe(new DistinctUntilChangedConditionalSubscriber<T, K>(cs, keySelector, comparer));
3639
} else {
37-
source.subscribe(new DistinctUntilChangedSubscriber<T>(s, comparer));
40+
source.subscribe(new DistinctUntilChangedSubscriber<T, K>(s, keySelector, comparer));
3841
}
3942
}
4043

41-
static final class DistinctUntilChangedSubscriber<T> extends BasicFuseableSubscriber<T, T>
44+
static final class DistinctUntilChangedSubscriber<T, K> extends BasicFuseableSubscriber<T, T>
4245
implements ConditionalSubscriber<T> {
4346

44-
final BiPredicate<? super T, ? super T> comparer;
4547

46-
T last;
48+
final Function<? super T, K> keySelector;
49+
50+
final BiPredicate<? super K, ? super K> comparer;
51+
52+
K last;
4753

4854
boolean hasValue;
4955

5056
DistinctUntilChangedSubscriber(Subscriber<? super T> actual,
51-
BiPredicate<? super T, ? super T> comparer) {
57+
Function<? super T, K> keySelector,
58+
BiPredicate<? super K, ? super K> comparer) {
5259
super(actual);
60+
this.keySelector = keySelector;
5361
this.comparer = comparer;
5462
}
5563

@@ -70,23 +78,25 @@ public boolean tryOnNext(T t) {
7078
return true;
7179
}
7280

73-
if (hasValue) {
74-
boolean equal;
75-
try {
76-
equal = comparer.test(last, t);
77-
} catch (Throwable ex) {
78-
fail(ex);
79-
return false;
80-
}
81-
last = t;
82-
if (equal) {
83-
return false;
81+
K key;
82+
83+
try {
84+
key = keySelector.apply(t);
85+
if (hasValue) {
86+
boolean equal = comparer.test(last, key);
87+
last = key;
88+
if (equal) {
89+
return false;
90+
}
91+
} else {
92+
hasValue = true;
93+
last = key;
8494
}
85-
actual.onNext(t);
86-
return true;
95+
} catch (Throwable ex) {
96+
fail(ex);
97+
return true;
8798
}
88-
hasValue = true;
89-
last = t;
99+
90100
actual.onNext(t);
91101
return true;
92102
}
@@ -103,17 +113,18 @@ public T poll() throws Exception {
103113
if (v == null) {
104114
return null;
105115
}
116+
K key = keySelector.apply(v);
106117
if (!hasValue) {
107118
hasValue = true;
108-
last = v;
119+
last = key;
109120
return v;
110121
}
111122

112-
if (!comparer.test(last, v)) {
113-
last = v;
123+
if (!comparer.test(last, key)) {
124+
last = key;
114125
return v;
115126
}
116-
last = v;
127+
last = key;
117128
if (sourceMode != SYNC) {
118129
s.request(1);
119130
}
@@ -122,17 +133,21 @@ public T poll() throws Exception {
122133

123134
}
124135

125-
static final class DistinctUntilChangedConditionalSubscriber<T> extends BasicFuseableConditionalSubscriber<T, T> {
136+
static final class DistinctUntilChangedConditionalSubscriber<T, K> extends BasicFuseableConditionalSubscriber<T, T> {
137+
138+
final Function<? super T, K> keySelector;
126139

127-
final BiPredicate<? super T, ? super T> comparer;
140+
final BiPredicate<? super K, ? super K> comparer;
128141

129-
T last;
142+
K last;
130143

131144
boolean hasValue;
132145

133146
DistinctUntilChangedConditionalSubscriber(ConditionalSubscriber<? super T> actual,
134-
BiPredicate<? super T, ? super T> comparer) {
147+
Function<? super T, K> keySelector,
148+
BiPredicate<? super K, ? super K> comparer) {
135149
super(actual);
150+
this.keySelector = keySelector;
136151
this.comparer = comparer;
137152
}
138153

@@ -152,20 +167,27 @@ public boolean tryOnNext(T t) {
152167
return actual.tryOnNext(t);
153168
}
154169

155-
if (hasValue) {
156-
boolean equal;
157-
try {
158-
equal = comparer.test(last, t);
159-
} catch (Throwable ex) {
160-
fail(ex);
161-
return false;
170+
K key;
171+
172+
try {
173+
key = keySelector.apply(t);
174+
if (hasValue) {
175+
boolean equal = comparer.test(last, key);
176+
last = key;
177+
if (equal) {
178+
return false;
179+
}
180+
} else {
181+
hasValue = true;
182+
last = key;
162183
}
163-
last = t;
164-
return !equal && actual.tryOnNext(t);
184+
} catch (Throwable ex) {
185+
fail(ex);
186+
return true;
165187
}
166-
hasValue = true;
167-
last = t;
168-
return actual.tryOnNext(t);
188+
189+
actual.onNext(t);
190+
return true;
169191
}
170192

171193
@Override
@@ -180,16 +202,18 @@ public T poll() throws Exception {
180202
if (v == null) {
181203
return null;
182204
}
205+
K key = keySelector.apply(v);
183206
if (!hasValue) {
184207
hasValue = true;
185-
last = v;
208+
last = key;
186209
return v;
187210
}
188-
if (!comparer.test(last, v)) {
189-
last = v;
211+
212+
if (!comparer.test(last, key)) {
213+
last = key;
190214
return v;
191215
}
192-
last = v;
216+
last = key;
193217
if (sourceMode != SYNC) {
194218
s.request(1);
195219
}

0 commit comments

Comments
 (0)