From cd91082e27b8757006e665dd25eed0d37bc1ae53 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 21 Oct 2016 18:57:27 +0200 Subject: [PATCH 1/2] 2.x: distinctUntilChanged to store the selected key instead of the value --- src/main/java/io/reactivex/Flowable.java | 8 +- src/main/java/io/reactivex/Observable.java | 6 +- .../internal/functions/Functions.java | 26 ---- .../FlowableDistinctUntilChanged.java | 122 +++++++++++------- .../ObservableDistinctUntilChanged.java | 67 ++++++---- .../FlowableDistinctUntilChangedTest.java | 30 ++++- .../ObservableDistinctUntilChangedTest.java | 28 +++- 7 files changed, 174 insertions(+), 113 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 8fb991144a..7533c5a991 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -25,7 +25,7 @@ import io.reactivex.internal.functions.*; import io.reactivex.internal.fuseable.ScalarCallable; import io.reactivex.internal.operators.flowable.*; -import io.reactivex.internal.operators.observable.*; +import io.reactivex.internal.operators.observable.ObservableFromPublisher; import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.subscribers.*; import io.reactivex.internal.util.*; @@ -7266,7 +7266,7 @@ public final Flowable distinct(Function keySelector, @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final Flowable distinctUntilChanged() { - return new FlowableDistinctUntilChanged(this, Functions.equalsPredicate()); + return distinctUntilChanged(Functions.identity()); } /** @@ -7294,7 +7294,7 @@ public final Flowable distinctUntilChanged() { @SchedulerSupport(SchedulerSupport.NONE) public final Flowable distinctUntilChanged(Function keySelector) { ObjectHelper.requireNonNull(keySelector, "keySelector is null"); - return new FlowableDistinctUntilChanged(this, Functions.equalsPredicate(keySelector)); + return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged(this, keySelector, ObjectHelper.equalsPredicate())); } /** @@ -7321,7 +7321,7 @@ public final Flowable distinctUntilChanged(Function keySele @SchedulerSupport(SchedulerSupport.NONE) public final Flowable distinctUntilChanged(BiPredicate comparer) { ObjectHelper.requireNonNull(comparer, "comparer is null"); - return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged(this, comparer)); + return RxJavaPlugins.onAssembly(new FlowableDistinctUntilChanged(this, Functions.identity(), comparer)); } /** diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 40354b8163..746a342a39 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -6333,7 +6333,7 @@ public final Observable distinct(Function keySelector, Call */ @SchedulerSupport(SchedulerSupport.NONE) public final Observable distinctUntilChanged() { - return new ObservableDistinctUntilChanged(this, Functions.equalsPredicate()); + return distinctUntilChanged(Functions.identity()); } /** @@ -6357,7 +6357,7 @@ public final Observable distinctUntilChanged() { @SchedulerSupport(SchedulerSupport.NONE) public final Observable distinctUntilChanged(Function keySelector) { ObjectHelper.requireNonNull(keySelector, "keySelector is null"); - return new ObservableDistinctUntilChanged(this, Functions.equalsPredicate(keySelector)); + return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged(this, keySelector, ObjectHelper.equalsPredicate())); } /** @@ -6380,7 +6380,7 @@ public final Observable distinctUntilChanged(Function keySe @SchedulerSupport(SchedulerSupport.NONE) public final Observable distinctUntilChanged(BiPredicate comparer) { ObjectHelper.requireNonNull(comparer, "comparer is null"); - return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged(this, comparer)); + return RxJavaPlugins.onAssembly(new ObservableDistinctUntilChanged(this, Functions.identity(), comparer)); } /** diff --git a/src/main/java/io/reactivex/internal/functions/Functions.java b/src/main/java/io/reactivex/internal/functions/Functions.java index e3dc91b2f1..9a6337316c 100644 --- a/src/main/java/io/reactivex/internal/functions/Functions.java +++ b/src/main/java/io/reactivex/internal/functions/Functions.java @@ -639,32 +639,6 @@ public static Function, List> listSorter(final Comparator(comparator); } - static final BiPredicate DEFAULT_EQUALS_PREDICATE = equalsPredicate(Functions.identity()); - - @SuppressWarnings("unchecked") - public static BiPredicate equalsPredicate() { - return (BiPredicate)DEFAULT_EQUALS_PREDICATE; - } - - static final class KeyedEqualsPredicate implements BiPredicate { - final Function keySelector; - - KeyedEqualsPredicate(Function keySelector) { - this.keySelector = keySelector; - } - - @Override - public boolean test(T t1, T t2) throws Exception { - K k1 = ObjectHelper.requireNonNull(keySelector.apply(t1), "The keySelector returned a null key"); - K k2 = ObjectHelper.requireNonNull(keySelector.apply(t2), "The keySelector returned a null key"); - return ObjectHelper.equals(k1, k2); - } - } - - public static BiPredicate equalsPredicate(Function keySelector) { - return new KeyedEqualsPredicate(keySelector); - } - public static final Consumer REQUEST_MAX = new Consumer() { @Override public void accept(Subscription t) throws Exception { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java index 6ed807ffe1..f19a796f52 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java @@ -15,16 +15,19 @@ import org.reactivestreams.*; -import io.reactivex.functions.BiPredicate; +import io.reactivex.functions.*; import io.reactivex.internal.fuseable.ConditionalSubscriber; import io.reactivex.internal.subscribers.*; -public final class FlowableDistinctUntilChanged extends AbstractFlowableWithUpstream { +public final class FlowableDistinctUntilChanged extends AbstractFlowableWithUpstream { - final BiPredicate comparer; + final Function keySelector; + + final BiPredicate comparer; - public FlowableDistinctUntilChanged(Publisher source, BiPredicate comparer) { + public FlowableDistinctUntilChanged(Publisher source, Function keySelector, BiPredicate comparer) { super(source); + this.keySelector = keySelector; this.comparer = comparer; } @@ -32,24 +35,29 @@ public FlowableDistinctUntilChanged(Publisher source, BiPredicate s) { if (s instanceof ConditionalSubscriber) { ConditionalSubscriber cs = (ConditionalSubscriber) s; - source.subscribe(new DistinctUntilChangedConditionalSubscriber(cs, comparer)); + source.subscribe(new DistinctUntilChangedConditionalSubscriber(cs, keySelector, comparer)); } else { - source.subscribe(new DistinctUntilChangedSubscriber(s, comparer)); + source.subscribe(new DistinctUntilChangedSubscriber(s, keySelector, comparer)); } } - static final class DistinctUntilChangedSubscriber extends BasicFuseableSubscriber + static final class DistinctUntilChangedSubscriber extends BasicFuseableSubscriber implements ConditionalSubscriber { - final BiPredicate comparer; - T last; + final Function keySelector; + + final BiPredicate comparer; + + K last; boolean hasValue; DistinctUntilChangedSubscriber(Subscriber actual, - BiPredicate comparer) { + Function keySelector, + BiPredicate comparer) { super(actual); + this.keySelector = keySelector; this.comparer = comparer; } @@ -70,23 +78,25 @@ public boolean tryOnNext(T t) { return true; } - if (hasValue) { - boolean equal; - try { - equal = comparer.test(last, t); - } catch (Throwable ex) { - fail(ex); - return false; - } - last = t; - if (equal) { - return false; + K key; + + try { + key = keySelector.apply(t); + if (hasValue) { + boolean equal = comparer.test(last, key); + last = key; + if (equal) { + return false; + } + } else { + hasValue = true; + last = key; } - actual.onNext(t); - return true; + } catch (Throwable ex) { + fail(ex); + return true; } - hasValue = true; - last = t; + actual.onNext(t); return true; } @@ -103,17 +113,18 @@ public T poll() throws Exception { if (v == null) { return null; } + K key = keySelector.apply(v); if (!hasValue) { hasValue = true; - last = v; + last = key; return v; } - if (!comparer.test(last, v)) { - last = v; + if (!comparer.test(last, key)) { + last = key; return v; } - last = v; + last = key; if (sourceMode != SYNC) { s.request(1); } @@ -122,17 +133,21 @@ public T poll() throws Exception { } - static final class DistinctUntilChangedConditionalSubscriber extends BasicFuseableConditionalSubscriber { + static final class DistinctUntilChangedConditionalSubscriber extends BasicFuseableConditionalSubscriber { - final BiPredicate comparer; + final Function keySelector; + + final BiPredicate comparer; - T last; + K last; boolean hasValue; DistinctUntilChangedConditionalSubscriber(ConditionalSubscriber actual, - BiPredicate comparer) { + Function keySelector, + BiPredicate comparer) { super(actual); + this.keySelector = keySelector; this.comparer = comparer; } @@ -152,20 +167,27 @@ public boolean tryOnNext(T t) { return actual.tryOnNext(t); } - if (hasValue) { - boolean equal; - try { - equal = comparer.test(last, t); - } catch (Throwable ex) { - fail(ex); - return false; + K key; + + try { + key = keySelector.apply(t); + if (hasValue) { + boolean equal = comparer.test(last, key); + last = key; + if (equal) { + return false; + } + } else { + hasValue = true; + last = key; } - last = t; - return !equal && actual.tryOnNext(t); + } catch (Throwable ex) { + fail(ex); + return true; } - hasValue = true; - last = t; - return actual.tryOnNext(t); + + actual.onNext(t); + return true; } @Override @@ -180,16 +202,18 @@ public T poll() throws Exception { if (v == null) { return null; } + K key = keySelector.apply(v); if (!hasValue) { hasValue = true; - last = v; + last = key; return v; } - if (!comparer.test(last, v)) { - last = v; + + if (!comparer.test(last, key)) { + last = key; return v; } - last = v; + last = key; if (sourceMode != SYNC) { s.request(1); } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java index ea1e77502f..10a5299deb 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java @@ -14,34 +14,41 @@ package io.reactivex.internal.operators.observable; import io.reactivex.*; -import io.reactivex.functions.BiPredicate; +import io.reactivex.functions.*; import io.reactivex.internal.observers.BasicFuseableObserver; -public final class ObservableDistinctUntilChanged extends AbstractObservableWithUpstream { +public final class ObservableDistinctUntilChanged extends AbstractObservableWithUpstream { - final BiPredicate comparer; + final Function keySelector; + + final BiPredicate comparer; - public ObservableDistinctUntilChanged(ObservableSource source, BiPredicate comparer) { + public ObservableDistinctUntilChanged(ObservableSource source, Function keySelector, BiPredicate comparer) { super(source); + this.keySelector = keySelector; this.comparer = comparer; } @Override protected void subscribeActual(Observer s) { - source.subscribe(new DistinctUntilChangedObserver(s, comparer)); + source.subscribe(new DistinctUntilChangedObserver(s, keySelector, comparer)); } - static final class DistinctUntilChangedObserver extends BasicFuseableObserver { + static final class DistinctUntilChangedObserver extends BasicFuseableObserver { - final BiPredicate comparer; + final Function keySelector; + + final BiPredicate comparer; - T last; + K last; boolean hasValue; DistinctUntilChangedObserver(Observer actual, - BiPredicate comparer) { + Function keySelector, + BiPredicate comparer) { super(actual); + this.keySelector = keySelector; this.comparer = comparer; } @@ -55,24 +62,27 @@ public void onNext(T t) { return; } - if (hasValue) { - boolean equal; - try { - equal = comparer.test(last, t); - } catch (Throwable ex) { - fail(ex); - return; - } - last = t; - if (equal) { - return; + K key; + + try { + key = keySelector.apply(t); + if (hasValue) { + boolean equal = comparer.test(last, key); + last = key; + if (equal) { + return; + } + } else { + hasValue = true; + last = key; } - actual.onNext(t); - return; + } catch (Throwable ex) { + fail(ex); + return; } - hasValue = true; - last = t; + actual.onNext(t); + return; } @Override @@ -87,17 +97,18 @@ public T poll() throws Exception { if (v == null) { return null; } + K key = keySelector.apply(v); if (!hasValue) { hasValue = true; - last = v; + last = key; return v; } - if (!comparer.test(last, v)) { - last = v; + if (!comparer.test(last, key)) { + last = key; return v; } - last = v; + last = key; } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java index cb8c5aa978..928cc4cfdd 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java @@ -30,7 +30,7 @@ import io.reactivex.internal.fuseable.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.processors.UnicastProcessor; +import io.reactivex.processors.*; import io.reactivex.subscribers.*; public class FlowableDistinctUntilChangedTest { @@ -340,5 +340,31 @@ public boolean test(Integer a, Integer b) throws Exception { } finally { RxJavaPlugins.reset(); } - } + } + + class Mutable { + int value; + } + + @Test + public void mutableWithSelector() { + Mutable m = new Mutable(); + + PublishProcessor pp = PublishProcessor.create(); + + TestSubscriber ts = pp.distinctUntilChanged(new Function() { + @Override + public Object apply(Mutable m) throws Exception { + return m.value; + } + }) + .test(); + + pp.onNext(m); + m.value = 1; + pp.onNext(m); + pp.onComplete(); + + ts.assertResult(m, m); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChangedTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChangedTest.java index 434d6b70a8..6375f886a6 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChangedTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChangedTest.java @@ -29,7 +29,7 @@ import io.reactivex.internal.fuseable.QueueDisposable; import io.reactivex.observers.*; import io.reactivex.plugins.RxJavaPlugins; -import io.reactivex.subjects.UnicastSubject; +import io.reactivex.subjects.*; public class ObservableDistinctUntilChangedTest { @@ -248,4 +248,30 @@ public boolean test(Integer a, Integer b) throws Exception { RxJavaPlugins.reset(); } } + + class Mutable { + int value; + } + + @Test + public void mutableWithSelector() { + Mutable m = new Mutable(); + + PublishSubject pp = PublishSubject.create(); + + TestObserver ts = pp.distinctUntilChanged(new Function() { + @Override + public Object apply(Mutable m) throws Exception { + return m.value; + } + }) + .test(); + + pp.onNext(m); + m.value = 1; + pp.onNext(m); + pp.onComplete(); + + ts.assertResult(m, m); + } } \ No newline at end of file From 0a7db2abb0b67a2d37068e59ac4552b4cdc981da Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 21 Oct 2016 19:40:33 +0200 Subject: [PATCH 2/2] Fix null test and whitespaces --- .../flowable/FlowableDistinctUntilChanged.java | 14 +++++++------- .../observable/ObservableDistinctUntilChanged.java | 8 ++++---- .../io/reactivex/flowable/FlowableNullTests.java | 4 ++-- .../flowable/FlowableDistinctUntilChangedTest.java | 4 ++-- .../ObservableDistinctUntilChangedTest.java | 4 ++-- .../reactivex/observable/ObservableNullTests.java | 4 ++-- 6 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java index f19a796f52..c0155b9aa7 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChanged.java @@ -22,7 +22,7 @@ public final class FlowableDistinctUntilChanged extends AbstractFlowableWithUpstream { final Function keySelector; - + final BiPredicate comparer; public FlowableDistinctUntilChanged(Publisher source, Function keySelector, BiPredicate comparer) { @@ -46,7 +46,7 @@ static final class DistinctUntilChangedSubscriber extends BasicFuseableSub final Function keySelector; - + final BiPredicate comparer; K last; @@ -79,7 +79,7 @@ public boolean tryOnNext(T t) { } K key; - + try { key = keySelector.apply(t); if (hasValue) { @@ -96,7 +96,7 @@ public boolean tryOnNext(T t) { fail(ex); return true; } - + actual.onNext(t); return true; } @@ -136,7 +136,7 @@ public T poll() throws Exception { static final class DistinctUntilChangedConditionalSubscriber extends BasicFuseableConditionalSubscriber { final Function keySelector; - + final BiPredicate comparer; K last; @@ -168,7 +168,7 @@ public boolean tryOnNext(T t) { } K key; - + try { key = keySelector.apply(t); if (hasValue) { @@ -185,7 +185,7 @@ public boolean tryOnNext(T t) { fail(ex); return true; } - + actual.onNext(t); return true; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java index 10a5299deb..ae24b357f9 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChanged.java @@ -20,7 +20,7 @@ public final class ObservableDistinctUntilChanged extends AbstractObservableWithUpstream { final Function keySelector; - + final BiPredicate comparer; public ObservableDistinctUntilChanged(ObservableSource source, Function keySelector, BiPredicate comparer) { @@ -37,7 +37,7 @@ protected void subscribeActual(Observer s) { static final class DistinctUntilChangedObserver extends BasicFuseableObserver { final Function keySelector; - + final BiPredicate comparer; K last; @@ -63,7 +63,7 @@ public void onNext(T t) { } K key; - + try { key = keySelector.apply(t); if (hasValue) { @@ -80,7 +80,7 @@ public void onNext(T t) { fail(ex); return; } - + actual.onNext(t); return; } diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 16e52117c2..01cbf4f2c2 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -1167,14 +1167,14 @@ public void distinctUntilChangedBiPredicateNull() { just1.distinctUntilChanged((BiPredicate)null); } - @Test(expected = NullPointerException.class) + @Test public void distinctUntilChangedFunctionReturnsNull() { Flowable.range(1, 2).distinctUntilChanged(new Function() { @Override public Object apply(Integer v) { return null; } - }).blockingSubscribe(); + }).test().assertResult(1); } @Test(expected = NullPointerException.class) diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java index 928cc4cfdd..52fd6e336f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDistinctUntilChangedTest.java @@ -349,9 +349,9 @@ class Mutable { @Test public void mutableWithSelector() { Mutable m = new Mutable(); - + PublishProcessor pp = PublishProcessor.create(); - + TestSubscriber ts = pp.distinctUntilChanged(new Function() { @Override public Object apply(Mutable m) throws Exception { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChangedTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChangedTest.java index 6375f886a6..48ac37cafd 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChangedTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDistinctUntilChangedTest.java @@ -256,9 +256,9 @@ class Mutable { @Test public void mutableWithSelector() { Mutable m = new Mutable(); - + PublishSubject pp = PublishSubject.create(); - + TestObserver ts = pp.distinctUntilChanged(new Function() { @Override public Object apply(Mutable m) throws Exception { diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index 8aedbb83a8..5dda9022cf 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -1257,14 +1257,14 @@ public void distinctUntilChangedBiPredicateNull() { just1.distinctUntilChanged((BiPredicate)null); } - @Test(expected = NullPointerException.class) + @Test public void distinctUntilChangedFunctionReturnsNull() { Observable.range(1, 2).distinctUntilChanged(new Function() { @Override public Object apply(Integer v) { return null; } - }).blockingSubscribe(); + }).test().assertResult(1); } @Test(expected = NullPointerException.class)