From 6065cc9139d1fa0eb19fcd2376be4d95083b4e5f Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 3 Feb 2017 19:10:33 +0100 Subject: [PATCH] 2.x: fix replay() cancel/dispose NPE --- .../operators/flowable/FlowableReplay.java | 14 +++++------ .../observable/ObservableReplay.java | 13 +++++------ .../flowable/FlowablePublishTest.java | 21 +++++++++++++++++ .../flowable/FlowableReplayTest.java | 20 ++++++++++++++++ .../observable/ObservablePublishTest.java | 21 +++++++++++++++++ .../observable/ObservableReplayTest.java | 23 ++++++++++++++++++- 6 files changed, 97 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index b9d2338b77..ce0c33054c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -336,7 +336,10 @@ public void connect(Consumer connection) { } @SuppressWarnings("rawtypes") - static final class ReplaySubscriber implements Subscriber, Disposable { + static final class ReplaySubscriber + extends AtomicReference + implements Subscriber, Disposable { + private static final long serialVersionUID = 7224554242710036740L; /** Holds notifications from upstream. */ final ReplayBuffer buffer; /** Indicates this Subscriber received a terminal event. */ @@ -361,8 +364,6 @@ static final class ReplaySubscriber implements Subscriber, Disposable { long maxChildRequested; /** Counts the outstanding upstream requests until the producer arrives. */ long maxUpstreamRequested; - /** The upstream producer. */ - volatile Subscription subscription; @SuppressWarnings("unchecked") ReplaySubscriber(ReplayBuffer buffer) { @@ -386,7 +387,7 @@ public void dispose() { // current.compareAndSet(ReplaySubscriber.this, null); // we don't care if it fails because it means the current has // been replaced in the meantime - subscription.cancel(); + SubscriptionHelper.cancel(this); } /** @@ -476,8 +477,7 @@ void remove(InnerSubscription p) { @Override public void onSubscribe(Subscription p) { - if (SubscriptionHelper.validate(subscription, p)) { - subscription = p; + if (SubscriptionHelper.setOnce(this, p)) { manageRequests(); for (InnerSubscription rp : subscribers.get()) { buffer.replay(rp); @@ -548,7 +548,7 @@ void manageRequests() { } long ur = maxUpstreamRequested; - Subscription p = subscription; + Subscription p = get(); long diff = maxTotalRequests - ri; if (diff != 0L) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index 382254f864..14884030a2 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -316,7 +316,10 @@ public void connect(Consumer connection) { } @SuppressWarnings("rawtypes") - static final class ReplayObserver implements Observer, Disposable { + static final class ReplayObserver + extends AtomicReference + implements Observer, Disposable { + private static final long serialVersionUID = -533785617179540163L; /** Holds notifications from upstream. */ final ReplayBuffer buffer; /** Indicates this Observer received a terminal event. */ @@ -335,9 +338,6 @@ static final class ReplayObserver implements Observer, Disposable { */ final AtomicBoolean shouldConnect; - /** The upstream producer. */ - volatile Disposable subscription; - ReplayObserver(ReplayBuffer buffer) { this.buffer = buffer; @@ -358,7 +358,7 @@ public void dispose() { // current.compareAndSet(ReplayObserver.this, null); // we don't care if it fails because it means the current has // been replaced in the meantime - subscription.dispose(); + DisposableHelper.dispose(this); } /** @@ -444,8 +444,7 @@ void remove(InnerDisposable producer) { @Override public void onSubscribe(Disposable p) { - if (DisposableHelper.validate(this.subscription, p)) { - subscription = p; + if (DisposableHelper.setOnce(this, p)) { replay(); } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java index e449c04d56..a713a3a920 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishTest.java @@ -873,4 +873,25 @@ public void subscribe(FlowableEmitter s) throws Exception { .test(0L) .assertFailure(MissingBackpressureException.class); } + + @Test + public void delayedUpstreamOnSubscribe() { + final Subscriber[] sub = { null }; + + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + sub[0] = s; + } + } + .publish() + .connect() + .dispose(); + + BooleanSubscription bs = new BooleanSubscription(); + + sub[0].onSubscribe(bs); + + assertTrue(bs.isCancelled()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index ff10a30a23..68da937bee 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -1711,4 +1711,24 @@ public void testSizedTruncation() { Assert.assertFalse(buf.hasError()); } + @Test + public void delayedUpstreamOnSubscribe() { + final Subscriber[] sub = { null }; + + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + sub[0] = s; + } + } + .replay() + .connect() + .dispose(); + + BooleanSubscription bs = new BooleanSubscription(); + + sub[0].onSubscribe(bs); + + assertTrue(bs.isCancelled()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java index df1b3d1d72..b19d26edb8 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservablePublishTest.java @@ -699,4 +699,25 @@ public ObservableSource apply(Observable v) throws Exception { assertFalse(ps.hasObservers()); } + + @Test + public void delayedUpstreamOnSubscribe() { + final Observer[] sub = { null }; + + new Observable() { + @Override + protected void subscribeActual(Observer s) { + sub[0] = s; + } + } + .publish() + .connect() + .dispose(); + + Disposable bs = Disposables.empty(); + + sub[0].onSubscribe(bs); + + assertTrue(bs.isDisposed()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index 0d87564aed..7d3fa0504d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -21,7 +21,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; -import io.reactivex.annotations.NonNull; import org.junit.*; import org.mockito.InOrder; @@ -29,6 +28,7 @@ import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.Scheduler.Worker; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; @@ -1490,4 +1490,25 @@ public void onNext(Integer t) { to.assertValues(1); } + + @Test + public void delayedUpstreamOnSubscribe() { + final Observer[] sub = { null }; + + new Observable() { + @Override + protected void subscribeActual(Observer s) { + sub[0] = s; + } + } + .replay() + .connect() + .dispose(); + + Disposable bs = Disposables.empty(); + + sub[0].onSubscribe(bs); + + assertTrue(bs.isDisposed()); + } }