From 488fcd08f857b887ae5d8b69c8ac02bf7c989804 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 16 Feb 2017 16:48:37 +0100 Subject: [PATCH] 2.x: fix Maybe.concat() subscribe-after-cancel, verify others --- .../operators/maybe/MaybeConcatArray.java | 5 +- .../maybe/MaybeConcatArrayDelayError.java | 5 +- .../operators/maybe/MaybeConcatIterable.java | 5 +- .../flowable/FlowableConcatTest.java | 84 +++++++++++++++++++ .../operators/maybe/MaybeConcatArrayTest.java | 41 +++++++++ .../maybe/MaybeConcatIterableTest.java | 42 ++++++++++ .../observable/ObservableConcatTest.java | 83 ++++++++++++++++++ .../operators/single/SingleConcatTest.java | 43 ++++++++++ 8 files changed, 302 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArray.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArray.java index 259af34c48..b67534ade9 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArray.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArray.java @@ -114,9 +114,10 @@ void drain() { AtomicReference c = current; Subscriber a = actual; + Disposable cancelled = disposables; for (;;) { - if (disposables.isDisposed()) { + if (cancelled.isDisposed()) { c.lazySet(null); return; } @@ -141,7 +142,7 @@ void drain() { c.lazySet(null); } - if (goNextSource) { + if (goNextSource && !cancelled.isDisposed()) { int i = index; if (i == sources.length) { a.onComplete(); diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayDelayError.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayDelayError.java index 0bf703eb64..ebff9986d9 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayDelayError.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayDelayError.java @@ -124,9 +124,10 @@ void drain() { AtomicReference c = current; Subscriber a = actual; + Disposable cancelled = disposables; for (;;) { - if (disposables.isDisposed()) { + if (cancelled.isDisposed()) { c.lazySet(null); return; } @@ -151,7 +152,7 @@ void drain() { c.lazySet(null); } - if (goNextSource) { + if (goNextSource && !cancelled.isDisposed()) { int i = index; if (i == sources.length) { Throwable ex = errors.get(); diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatIterable.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatIterable.java index 707cb0fc2d..1a3c286c71 100644 --- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatIterable.java +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeConcatIterable.java @@ -126,9 +126,10 @@ void drain() { AtomicReference c = current; Subscriber a = actual; + Disposable cancelled = disposables; for (;;) { - if (disposables.isDisposed()) { + if (cancelled.isDisposed()) { c.lazySet(null); return; } @@ -153,7 +154,7 @@ void drain() { c.lazySet(null); } - if (goNextSource) { + if (goNextSource && !cancelled.isDisposed()) { boolean b; try { diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java index a0fc52d12d..82c8765f85 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatTest.java @@ -1542,4 +1542,88 @@ public Publisher apply(Integer v) throws Exception { .test() .assertFailure(TestException.class); } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscription() { + final int[] calls = { 0 }; + + Flowable source = Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + calls[0]++; + s.onNext(1); + s.onComplete(); + } + }, BackpressureStrategy.MISSING); + + Flowable.concatArray(source, source).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionDelayError() { + final int[] calls = { 0 }; + + Flowable source = Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + calls[0]++; + s.onNext(1); + s.onComplete(); + } + }, BackpressureStrategy.MISSING); + + Flowable.concatArrayDelayError(source, source).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionIterable() { + final int[] calls = { 0 }; + + Flowable source = Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + calls[0]++; + s.onNext(1); + s.onComplete(); + } + }, BackpressureStrategy.MISSING); + + Flowable.concat(Arrays.asList(source, source)).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionDelayErrorIterable() { + final int[] calls = { 0 }; + + Flowable source = Flowable.create(new FlowableOnSubscribe() { + @Override + public void subscribe(FlowableEmitter s) throws Exception { + calls[0]++; + s.onNext(1); + s.onComplete(); + } + }, BackpressureStrategy.MISSING); + + Flowable.concatDelayError(Arrays.asList(source, source)).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java index 479c29140e..04773bcedf 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatArrayTest.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.util.List; +import static org.junit.Assert.*; import org.junit.Test; import io.reactivex.*; @@ -156,4 +157,44 @@ protected void subscribeActual(MaybeObserver observer) { RxJavaPlugins.reset(); } } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscription() { + final int[] calls = { 0 }; + + Maybe source = Maybe.create(new MaybeOnSubscribe() { + @Override + public void subscribe(MaybeEmitter s) throws Exception { + calls[0]++; + s.onSuccess(1); + } + }); + + Maybe.concatArray(source, source).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionDelayError() { + final int[] calls = { 0 }; + + Maybe source = Maybe.create(new MaybeOnSubscribe() { + @Override + public void subscribe(MaybeEmitter s) throws Exception { + calls[0]++; + s.onSuccess(1); + } + }); + + Maybe.concatArrayDelayError(source, source).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } } diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java index aeb5d6d963..4bc41e8af7 100644 --- a/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeConcatIterableTest.java @@ -13,6 +13,8 @@ package io.reactivex.internal.operators.maybe; +import static org.junit.Assert.assertEquals; + import java.util.*; import org.junit.Test; @@ -121,4 +123,44 @@ public Maybe apply(Integer v) throws Exception { .test() .assertFailure(NullPointerException.class); } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscription() { + final int[] calls = { 0 }; + + Maybe source = Maybe.create(new MaybeOnSubscribe() { + @Override + public void subscribe(MaybeEmitter s) throws Exception { + calls[0]++; + s.onSuccess(1); + } + }); + + Maybe.concat(Arrays.asList(source, source)).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionDelayError() { + final int[] calls = { 0 }; + + Maybe source = Maybe.create(new MaybeOnSubscribe() { + @Override + public void subscribe(MaybeEmitter s) throws Exception { + calls[0]++; + s.onSuccess(1); + } + }); + + Maybe.concatDelayError(Arrays.asList(source, source)).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java index bb0983def9..a2489dcff1 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableConcatTest.java @@ -958,4 +958,87 @@ public ObservableSource apply(Object v) throws Exception { } + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscription() { + final int[] calls = { 0 }; + + Observable source = Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter s) throws Exception { + calls[0]++; + s.onNext(1); + s.onComplete(); + } + }); + + Observable.concatArray(source, source).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionDelayError() { + final int[] calls = { 0 }; + + Observable source = Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter s) throws Exception { + calls[0]++; + s.onNext(1); + s.onComplete(); + } + }); + + Observable.concatArrayDelayError(source, source).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionIterable() { + final int[] calls = { 0 }; + + Observable source = Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter s) throws Exception { + calls[0]++; + s.onNext(1); + s.onComplete(); + } + }); + + Observable.concat(Arrays.asList(source, source)).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionDelayErrorIterable() { + final int[] calls = { 0 }; + + Observable source = Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(ObservableEmitter s) throws Exception { + calls[0]++; + s.onNext(1); + s.onComplete(); + } + }); + + Observable.concatDelayError(Arrays.asList(source, source)).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java index 9100a9589b..8068f9e5bd 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleConcatTest.java @@ -13,6 +13,8 @@ package io.reactivex.internal.operators.single; +import static org.junit.Assert.assertEquals; + import java.util.Arrays; import org.junit.Test; @@ -81,4 +83,45 @@ public void concatObservable() { .assertComplete(); } } + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscription() { + final int[] calls = { 0 }; + + Single source = Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + calls[0]++; + s.onSuccess(1); + } + }); + + Single.concatArray(source, source).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } + + + @SuppressWarnings("unchecked") + @Test + public void noSubsequentSubscriptionIterable() { + final int[] calls = { 0 }; + + Single source = Single.create(new SingleOnSubscribe() { + @Override + public void subscribe(SingleEmitter s) throws Exception { + calls[0]++; + s.onSuccess(1); + } + }); + + Single.concat(Arrays.asList(source, source)).firstElement() + .test() + .assertResult(1); + + assertEquals(1, calls[0]); + } }