diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java index 0e3fe58b83..d9d6ffa517 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySelector.java @@ -71,6 +71,8 @@ static final class WindowBoundaryMainSubscriber final AtomicLong windows = new AtomicLong(); + final AtomicBoolean stopWindows = new AtomicBoolean(); + WindowBoundaryMainSubscriber(Subscriber> actual, Publisher open, Function> close, int bufferSize) { super(actual, new MpscLinkedQueue()); @@ -89,14 +91,13 @@ public void onSubscribe(Subscription s) { downstream.onSubscribe(this); - if (cancelled) { + if (stopWindows.get()) { return; } OperatorWindowBoundaryOpenSubscriber os = new OperatorWindowBoundaryOpenSubscriber(this); if (boundary.compareAndSet(null, os)) { - windows.getAndIncrement(); s.request(Long.MAX_VALUE); open.subscribe(os); } @@ -177,7 +178,12 @@ public void request(long n) { @Override public void cancel() { - cancelled = true; + if (stopWindows.compareAndSet(false, true)) { + DisposableHelper.dispose(boundary); + if (windows.decrementAndGet() == 0) { + upstream.cancel(); + } + } } void dispose() { @@ -236,7 +242,7 @@ void drainLoop() { continue; } - if (cancelled) { + if (stopWindows.get()) { continue; } @@ -250,7 +256,7 @@ void drainLoop() { produced(1); } } else { - cancelled = true; + cancel(); a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests")); continue; } @@ -260,7 +266,7 @@ void drainLoop() { try { p = ObjectHelper.requireNonNull(close.apply(wo.open), "The publisher supplied is null"); } catch (Throwable e) { - cancelled = true; + cancel(); a.onError(e); continue; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java index 1e2a2ea052..d8e745e213 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySelector.java @@ -69,6 +69,8 @@ static final class WindowBoundaryMainObserver final AtomicLong windows = new AtomicLong(); + final AtomicBoolean stopWindows = new AtomicBoolean(); + WindowBoundaryMainObserver(Observer> actual, ObservableSource open, Function> close, int bufferSize) { super(actual, new MpscLinkedQueue()); @@ -87,14 +89,13 @@ public void onSubscribe(Disposable d) { downstream.onSubscribe(this); - if (cancelled) { + if (stopWindows.get()) { return; } OperatorWindowBoundaryOpenObserver os = new OperatorWindowBoundaryOpenObserver(this); if (boundary.compareAndSet(null, os)) { - windows.getAndIncrement(); open.subscribe(os); } } @@ -164,12 +165,17 @@ void error(Throwable t) { @Override public void dispose() { - cancelled = true; + if (stopWindows.compareAndSet(false, true)) { + DisposableHelper.dispose(boundary); + if (windows.decrementAndGet() == 0) { + upstream.dispose(); + } + } } @Override public boolean isDisposed() { - return cancelled; + return stopWindows.get(); } void disposeBoundary() { @@ -229,7 +235,7 @@ void drainLoop() { continue; } - if (cancelled) { + if (stopWindows.get()) { continue; } @@ -244,7 +250,7 @@ void drainLoop() { p = ObjectHelper.requireNonNull(close.apply(wo.open), "The ObservableSource supplied is null"); } catch (Throwable e) { Exceptions.throwIfFatal(e); - cancelled = true; + stopWindows.set(true); a.onError(e); continue; } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java index c1d825afed..1d27381129 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithStartEndFlowableTest.java @@ -17,12 +17,13 @@ import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.*; import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; @@ -254,8 +255,8 @@ public Flowable apply(Integer t) { ts.dispose(); - // FIXME subject has subscribers because of the open window - assertTrue(open.hasSubscribers()); + // Disposing the outer sequence stops the opening of new windows + assertFalse(open.hasSubscribers()); // FIXME subject has subscribers because of the open window assertTrue(close.hasSubscribers()); } @@ -430,4 +431,58 @@ protected void subscribeActual( RxJavaPlugins.reset(); } } + + static Flowable flowableDisposed(final AtomicBoolean ref) { + return Flowable.just(1).concatWith(Flowable.never()) + .doOnCancel(new Action() { + @Override + public void run() throws Exception { + ref.set(true); + } + }); + } + + @Test + public void mainAndBoundaryDisposeOnNoWindows() { + AtomicBoolean mainDisposed = new AtomicBoolean(); + AtomicBoolean openDisposed = new AtomicBoolean(); + final AtomicBoolean closeDisposed = new AtomicBoolean(); + + flowableDisposed(mainDisposed) + .window(flowableDisposed(openDisposed), new Function>() { + @Override + public Flowable apply(Integer v) throws Exception { + return flowableDisposed(closeDisposed); + } + }) + .test() + .assertSubscribed() + .assertNoErrors() + .assertNotComplete() + .dispose(); + + assertTrue(mainDisposed.get()); + assertTrue(openDisposed.get()); + assertTrue(closeDisposed.get()); + } + + @Test + @SuppressWarnings("unchecked") + public void mainWindowMissingBackpressure() { + PublishProcessor source = PublishProcessor.create(); + PublishProcessor boundary = PublishProcessor.create(); + + TestSubscriber> ts = source.window(boundary, Functions.justFunction(Flowable.never())) + .test(0L) + ; + + ts.assertEmpty(); + + boundary.onNext(1); + + ts.assertFailure(MissingBackpressureException.class); + + assertFalse(source.hasSubscribers()); + assertFalse(boundary.hasSubscribers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java index d1426a5a61..c4f7fa6409 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithStartEndObservableTest.java @@ -17,6 +17,7 @@ import java.util.*; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.*; @@ -256,8 +257,8 @@ public Observable apply(Integer t) { to.dispose(); - // FIXME subject has subscribers because of the open window - assertTrue(open.hasObservers()); + // Disposing the outer sequence stops the opening of new windows + assertFalse(open.hasObservers()); // FIXME subject has subscribers because of the open window assertTrue(close.hasObservers()); } @@ -423,4 +424,38 @@ protected void subscribeActual( RxJavaPlugins.reset(); } } + + static Observable observableDisposed(final AtomicBoolean ref) { + return Observable.just(1).concatWith(Observable.never()) + .doOnDispose(new Action() { + @Override + public void run() throws Exception { + ref.set(true); + } + }); + } + + @Test + public void mainAndBoundaryDisposeOnNoWindows() { + AtomicBoolean mainDisposed = new AtomicBoolean(); + AtomicBoolean openDisposed = new AtomicBoolean(); + final AtomicBoolean closeDisposed = new AtomicBoolean(); + + observableDisposed(mainDisposed) + .window(observableDisposed(openDisposed), new Function>() { + @Override + public ObservableSource apply(Integer v) throws Exception { + return observableDisposed(closeDisposed); + } + }) + .test() + .assertSubscribed() + .assertNoErrors() + .assertNotComplete() + .dispose(); + + assertTrue(mainDisposed.get()); + assertTrue(openDisposed.get()); + assertTrue(closeDisposed.get()); + } }