diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java index f966f01365..bc11aa5425 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableRefCount.java @@ -141,7 +141,11 @@ void timeout(RefConnection rc) { if (source instanceof Disposable) { ((Disposable)source).dispose(); } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(connectionObject); + if (connectionObject == null) { + rc.disconnectedEarly = true; + } else { + ((ResettableConnectable)source).resetIf(connectionObject); + } } } } @@ -160,6 +164,8 @@ static final class RefConnection extends AtomicReference boolean connected; + boolean disconnectedEarly; + RefConnection(FlowableRefCount parent) { this.parent = parent; } @@ -172,6 +178,11 @@ public void run() { @Override public void accept(Disposable t) throws Exception { DisposableHelper.replace(this, t); + synchronized (parent) { + if (disconnectedEarly) { + ((ResettableConnectable)parent.source).resetIf(t); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java index 3dced24de6..5abc174350 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRefCount.java @@ -135,10 +135,15 @@ void timeout(RefConnection rc) { connection = null; Disposable connectionObject = rc.get(); DisposableHelper.dispose(rc); + if (source instanceof Disposable) { ((Disposable)source).dispose(); } else if (source instanceof ResettableConnectable) { - ((ResettableConnectable)source).resetIf(connectionObject); + if (connectionObject == null) { + rc.disconnectedEarly = true; + } else { + ((ResettableConnectable)source).resetIf(connectionObject); + } } } } @@ -157,6 +162,8 @@ static final class RefConnection extends AtomicReference boolean connected; + boolean disconnectedEarly; + RefConnection(ObservableRefCount parent) { this.parent = parent; } @@ -169,6 +176,11 @@ public void run() { @Override public void accept(Disposable t) throws Exception { DisposableHelper.replace(this, t); + synchronized (parent) { + if (disconnectedEarly) { + ((ResettableConnectable)parent.source).resetIf(t); + } + } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 289170b254..673a0f4add 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -1394,4 +1394,19 @@ public void timeoutDisposesSource() { assertTrue(((Disposable)o.source).isDisposed()); } + + @Test + public void disconnectBeforeConnect() { + BehaviorProcessor processor = BehaviorProcessor.create(); + + Flowable flowable = processor + .replay(1) + .refCount(); + + flowable.takeUntil(Flowable.just(1)).test(); + + processor.onNext(2); + + flowable.take(1).test().assertResult(2); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index ea69a1d500..0f0d930d8d 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -1345,4 +1345,19 @@ public void timeoutDisposesSource() { assertTrue(((Disposable)o.source).isDisposed()); } + + @Test + public void disconnectBeforeConnect() { + BehaviorSubject subject = BehaviorSubject.create(); + + Observable observable = subject + .replay(1) + .refCount(); + + observable.takeUntil(Observable.just(1)).test(); + + subject.onNext(2); + + observable.take(1).test().assertResult(2); + } }