diff --git a/build.gradle b/build.gradle index 12a5b04978..9da4ae5757 100644 --- a/build.gradle +++ b/build.gradle @@ -57,6 +57,7 @@ def mockitoVersion = "2.1.0" def jmhLibVersion = "1.19" def testNgVersion = "6.11" def guavaVersion = "24.0-jre" +def jacocoVersion = "0.8.0" // -------------------------------------- repositories { @@ -257,7 +258,7 @@ task testng(type: Test) { check.dependsOn testng jacoco { - toolVersion = "0.7.9" // See http://www.eclemma.org/jacoco/. + toolVersion = jacocoVersion // See http://www.eclemma.org/jacoco/. } task GCandMem(dependsOn: "check") doLast { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java index 58cec89232..8c8164f76f 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundary.java @@ -17,186 +17,210 @@ import org.reactivestreams.*; -import io.reactivex.Flowable; -import io.reactivex.disposables.Disposable; +import io.reactivex.*; import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.subscribers.QueueDrainSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.NotificationLite; +import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.UnicastProcessor; -import io.reactivex.subscribers.*; +import io.reactivex.subscribers.DisposableSubscriber; public final class FlowableWindowBoundary extends AbstractFlowableWithUpstream> { final Publisher other; - final int bufferSize; + final int capacityHint; - public FlowableWindowBoundary(Flowable source, Publisher other, int bufferSize) { + public FlowableWindowBoundary(Flowable source, Publisher other, int capacityHint) { super(source); this.other = other; - this.bufferSize = bufferSize; + this.capacityHint = capacityHint; } @Override - protected void subscribeActual(Subscriber> s) { - source.subscribe( - new WindowBoundaryMainSubscriber( - new SerializedSubscriber>(s), other, bufferSize)); + protected void subscribeActual(Subscriber> subscriber) { + WindowBoundaryMainSubscriber parent = new WindowBoundaryMainSubscriber(subscriber, capacityHint); + + subscriber.onSubscribe(parent); + + parent.innerNext(); + + other.subscribe(parent.boundarySubscriber); + + source.subscribe(parent); } static final class WindowBoundaryMainSubscriber - extends QueueDrainSubscriber> - implements Subscription { + extends AtomicInteger + implements FlowableSubscriber, Subscription, Runnable { - final Publisher other; - final int bufferSize; + private static final long serialVersionUID = 2233020065421370272L; - Subscription s; + final Subscriber> downstream; - final AtomicReference boundary = new AtomicReference(); + final int capacityHint; - UnicastProcessor window; + final WindowBoundaryInnerSubscriber boundarySubscriber; - static final Object NEXT = new Object(); + final AtomicReference upstream; - final AtomicLong windows = new AtomicLong(); + final AtomicInteger windows; - WindowBoundaryMainSubscriber(Subscriber> actual, Publisher other, - int bufferSize) { - super(actual, new MpscLinkedQueue()); - this.other = other; - this.bufferSize = bufferSize; - windows.lazySet(1); - } + final MpscLinkedQueue queue; - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; + final AtomicThrowable errors; - Subscriber> a = actual; - a.onSubscribe(this); + final AtomicBoolean stopWindows; - if (cancelled) { - return; - } + final AtomicLong requested; - UnicastProcessor w = UnicastProcessor.create(bufferSize); + static final Object NEXT_WINDOW = new Object(); - long r = requested(); - if (r != 0L) { - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } - } else { - s.cancel(); - a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests")); - return; - } + volatile boolean done; - window = w; + UnicastProcessor window; - WindowBoundaryInnerSubscriber inner = new WindowBoundaryInnerSubscriber(this); + long emitted; + + WindowBoundaryMainSubscriber(Subscriber> downstream, int capacityHint) { + this.downstream = downstream; + this.capacityHint = capacityHint; + this.boundarySubscriber = new WindowBoundaryInnerSubscriber(this); + this.upstream = new AtomicReference(); + this.windows = new AtomicInteger(1); + this.queue = new MpscLinkedQueue(); + this.errors = new AtomicThrowable(); + this.stopWindows = new AtomicBoolean(); + this.requested = new AtomicLong(); + } - if (boundary.compareAndSet(null, inner)) { - windows.getAndIncrement(); - s.request(Long.MAX_VALUE); - other.subscribe(inner); - } + @Override + public void onSubscribe(Subscription d) { + if (SubscriptionHelper.setOnce(upstream, d)) { + d.request(Long.MAX_VALUE); } } @Override public void onNext(T t) { - if (fastEnter()) { - UnicastProcessor w = window; - - w.onNext(t); + queue.offer(t); + drain(); + } - if (leave(-1) == 0) { - return; - } + @Override + public void onError(Throwable e) { + boundarySubscriber.dispose(); + if (errors.addThrowable(e)) { + done = true; + drain(); } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; - } + RxJavaPlugins.onError(e); } - drainLoop(); } @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - error = t; + public void onComplete() { + boundarySubscriber.dispose(); done = true; - if (enter()) { - drainLoop(); - } + drain(); + } - if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); + @Override + public void cancel() { + if (stopWindows.compareAndSet(false, true)) { + boundarySubscriber.dispose(); + if (windows.decrementAndGet() == 0) { + SubscriptionHelper.cancel(upstream); + } } - - actual.onError(t); } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - if (enter()) { - drainLoop(); - } + public void request(long n) { + BackpressureHelper.add(requested, n); + } + @Override + public void run() { if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); + SubscriptionHelper.cancel(upstream); } + } - actual.onComplete(); - + void innerNext() { + queue.offer(NEXT_WINDOW); + drain(); } - @Override - public void request(long n) { - requested(n); + void innerError(Throwable e) { + SubscriptionHelper.cancel(upstream); + if (errors.addThrowable(e)) { + done = true; + drain(); + } else { + RxJavaPlugins.onError(e); + } } - @Override - public void cancel() { - cancelled = true; + void innerComplete() { + SubscriptionHelper.cancel(upstream); + done = true; + drain(); } - void drainLoop() { - final SimplePlainQueue q = queue; - final Subscriber> a = actual; + @SuppressWarnings("unchecked") + void drain() { + if (getAndIncrement() != 0) { + return; + } + int missed = 1; - UnicastProcessor w = window; + Subscriber> downstream = this.downstream; + MpscLinkedQueue queue = this.queue; + AtomicThrowable errors = this.errors; + long emitted = this.emitted; + for (;;) { for (;;) { + if (windows.get() == 0) { + queue.clear(); + window = null; + return; + } + + UnicastProcessor w = window; + boolean d = done; - Object o = q.poll(); + if (d && errors.get() != null) { + queue.clear(); + Throwable ex = errors.terminate(); + if (w != null) { + window = null; + w.onError(ex); + } + downstream.onError(ex); + return; + } - boolean empty = o == null; + Object v = queue.poll(); + + boolean empty = v == null; if (d && empty) { - DisposableHelper.dispose(boundary); - Throwable e = error; - if (e != null) { - w.onError(e); + Throwable ex = errors.terminate(); + if (ex == null) { + if (w != null) { + window = null; + w.onComplete(); + } + downstream.onComplete(); } else { - w.onComplete(); + if (w != null) { + window = null; + w.onError(ex); + } + downstream.onError(ex); } return; } @@ -205,59 +229,44 @@ void drainLoop() { break; } - if (o == NEXT) { - w.onComplete(); - - if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); - return; - } - - if (cancelled) { - continue; - } + if (v != NEXT_WINDOW) { + w.onNext((T)v); + continue; + } - w = UnicastProcessor.create(bufferSize); + if (w != null) { + window = null; + w.onComplete(); + } - long r = requested(); - if (r != 0L) { - windows.getAndIncrement(); + if (!stopWindows.get()) { + w = UnicastProcessor.create(capacityHint, this); + window = w; + windows.getAndIncrement(); - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } + if (emitted != requested.get()) { + emitted++; + downstream.onNext(w); } else { - // don't emit new windows - cancelled = true; - a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests")); - continue; + SubscriptionHelper.cancel(upstream); + boundarySubscriber.dispose(); + errors.addThrowable(new MissingBackpressureException("Could not deliver a window due to lack of requests")); + done = true; } - - window = w; - continue; } - - w.onNext(NotificationLite.getValue(o)); } - missed = leave(-missed); + this.emitted = emitted; + missed = addAndGet(-missed); if (missed == 0) { - return; + break; } } } - - void next() { - queue.offer(NEXT); - if (enter()) { - drainLoop(); - } - } - } static final class WindowBoundaryInnerSubscriber extends DisposableSubscriber { + final WindowBoundaryMainSubscriber parent; boolean done; @@ -271,7 +280,7 @@ public void onNext(B t) { if (done) { return; } - parent.next(); + parent.innerNext(); } @Override @@ -281,7 +290,7 @@ public void onError(Throwable t) { return; } done = true; - parent.onError(t); + parent.innerError(t); } @Override @@ -290,7 +299,7 @@ public void onComplete() { return; } done = true; - parent.onComplete(); + parent.innerComplete(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java index f90d74ad2a..faaf1e7384 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowBoundarySupplier.java @@ -18,201 +18,224 @@ import org.reactivestreams.*; -import io.reactivex.Flowable; +import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; -import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.subscribers.QueueDrainSubscriber; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.NotificationLite; +import io.reactivex.internal.util.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.UnicastProcessor; -import io.reactivex.subscribers.*; +import io.reactivex.subscribers.DisposableSubscriber; public final class FlowableWindowBoundarySupplier extends AbstractFlowableWithUpstream> { final Callable> other; - final int bufferSize; + final int capacityHint; public FlowableWindowBoundarySupplier(Flowable source, - Callable> other, int bufferSize) { + Callable> other, int capacityHint) { super(source); this.other = other; - this.bufferSize = bufferSize; + this.capacityHint = capacityHint; } @Override - protected void subscribeActual(Subscriber> s) { - source.subscribe(new WindowBoundaryMainSubscriber( - new SerializedSubscriber>(s), other, bufferSize)); + protected void subscribeActual(Subscriber> subscriber) { + WindowBoundaryMainSubscriber parent = new WindowBoundaryMainSubscriber(subscriber, capacityHint, other); + + source.subscribe(parent); } static final class WindowBoundaryMainSubscriber - extends QueueDrainSubscriber> - implements Subscription { + extends AtomicInteger + implements FlowableSubscriber, Subscription, Runnable { - final Callable> other; - final int bufferSize; + private static final long serialVersionUID = 2233020065421370272L; - Subscription s; + final Subscriber> downstream; - final AtomicReference boundary = new AtomicReference(); + final int capacityHint; - UnicastProcessor window; + final AtomicReference> boundarySubscriber; - static final Object NEXT = new Object(); + static final WindowBoundaryInnerSubscriber BOUNDARY_DISPOSED = new WindowBoundaryInnerSubscriber(null); - final AtomicLong windows = new AtomicLong(); + final AtomicInteger windows; - WindowBoundaryMainSubscriber(Subscriber> actual, Callable> other, - int bufferSize) { - super(actual, new MpscLinkedQueue()); - this.other = other; - this.bufferSize = bufferSize; - windows.lazySet(1); - } + final MpscLinkedQueue queue; - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; + final AtomicThrowable errors; - Subscriber> a = actual; - a.onSubscribe(this); + final AtomicBoolean stopWindows; - if (cancelled) { - return; - } + final Callable> other; - Publisher p; + static final Object NEXT_WINDOW = new Object(); - try { - p = ObjectHelper.requireNonNull(other.call(), "The first window publisher supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.cancel(); - a.onError(e); - return; - } + final AtomicLong requested; - UnicastProcessor w = UnicastProcessor.create(bufferSize); + Subscription upstream; - long r = requested(); - if (r != 0L) { - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } - } else { - s.cancel(); - a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests")); - return; - } + volatile boolean done; - window = w; + UnicastProcessor window; - WindowBoundaryInnerSubscriber inner = new WindowBoundaryInnerSubscriber(this); + long emitted; - if (boundary.compareAndSet(null, inner)) { - windows.getAndIncrement(); - s.request(Long.MAX_VALUE); - p.subscribe(inner); - } - } + WindowBoundaryMainSubscriber(Subscriber> downstream, int capacityHint, Callable> other) { + this.downstream = downstream; + this.capacityHint = capacityHint; + this.boundarySubscriber = new AtomicReference>(); + this.windows = new AtomicInteger(1); + this.queue = new MpscLinkedQueue(); + this.errors = new AtomicThrowable(); + this.stopWindows = new AtomicBoolean(); + this.other = other; + this.requested = new AtomicLong(); } @Override - public void onNext(T t) { - if (done) { - return; + public void onSubscribe(Subscription d) { + if (SubscriptionHelper.validate(upstream, d)) { + upstream = d; + downstream.onSubscribe(this); + queue.offer(NEXT_WINDOW); + drain(); + d.request(Long.MAX_VALUE); } - if (fastEnter()) { - UnicastProcessor w = window; + } - w.onNext(t); + @Override + public void onNext(T t) { + queue.offer(t); + drain(); + } - if (leave(-1) == 0) { - return; - } + @Override + public void onError(Throwable e) { + disposeBoundary(); + if (errors.addThrowable(e)) { + done = true; + drain(); } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; - } + RxJavaPlugins.onError(e); } - drainLoop(); } @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - error = t; + public void onComplete() { + disposeBoundary(); done = true; - if (enter()) { - drainLoop(); - } + drain(); + } - if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); + @Override + public void cancel() { + if (stopWindows.compareAndSet(false, true)) { + disposeBoundary(); + if (windows.decrementAndGet() == 0) { + upstream.cancel(); + } } - - actual.onError(t); } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - if (enter()) { - drainLoop(); + public void request(long n) { + BackpressureHelper.add(requested, n); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + void disposeBoundary() { + Disposable d = boundarySubscriber.getAndSet((WindowBoundaryInnerSubscriber)BOUNDARY_DISPOSED); + if (d != null && d != BOUNDARY_DISPOSED) { + d.dispose(); } + } + @Override + public void run() { if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); + upstream.cancel(); } + } - actual.onComplete(); - + void innerNext(WindowBoundaryInnerSubscriber sender) { + boundarySubscriber.compareAndSet(sender, null); + queue.offer(NEXT_WINDOW); + drain(); } - @Override - public void request(long n) { - requested(n); + void innerError(Throwable e) { + upstream.cancel(); + if (errors.addThrowable(e)) { + done = true; + drain(); + } else { + RxJavaPlugins.onError(e); + } } - @Override - public void cancel() { - cancelled = true; + void innerComplete() { + upstream.cancel(); + done = true; + drain(); } - void drainLoop() { - final SimplePlainQueue q = queue; - final Subscriber> a = actual; + @SuppressWarnings("unchecked") + void drain() { + if (getAndIncrement() != 0) { + return; + } + int missed = 1; - UnicastProcessor w = window; + Subscriber> downstream = this.downstream; + MpscLinkedQueue queue = this.queue; + AtomicThrowable errors = this.errors; + long emitted = this.emitted; + for (;;) { for (;;) { + if (windows.get() == 0) { + queue.clear(); + window = null; + return; + } + + UnicastProcessor w = window; + boolean d = done; - Object o = q.poll(); + if (d && errors.get() != null) { + queue.clear(); + Throwable ex = errors.terminate(); + if (w != null) { + window = null; + w.onError(ex); + } + downstream.onError(ex); + return; + } + + Object v = queue.poll(); - boolean empty = o == null; + boolean empty = v == null; if (d && empty) { - DisposableHelper.dispose(boundary); - Throwable e = error; - if (e != null) { - w.onError(e); + Throwable ex = errors.terminate(); + if (ex == null) { + if (w != null) { + window = null; + w.onComplete(); + } + downstream.onComplete(); } else { - w.onComplete(); + if (w != null) { + window = null; + w.onError(ex); + } + downstream.onError(ex); } return; } @@ -221,73 +244,57 @@ void drainLoop() { break; } - if (o == NEXT) { + if (v != NEXT_WINDOW) { + w.onNext((T)v); + continue; + } + + if (w != null) { + window = null; w.onComplete(); + } - if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); - return; - } + if (!stopWindows.get()) { + if (emitted != requested.get()) { + w = UnicastProcessor.create(capacityHint, this); + window = w; + windows.getAndIncrement(); - if (cancelled) { - continue; - } + Publisher otherSource; - Publisher p; + try { + otherSource = ObjectHelper.requireNonNull(other.call(), "The other Callable returned a null Publisher"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + errors.addThrowable(ex); + done = true; + continue; + } - try { - p = ObjectHelper.requireNonNull(other.call(), "The publisher supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - DisposableHelper.dispose(boundary); - a.onError(e); - return; - } + WindowBoundaryInnerSubscriber bo = new WindowBoundaryInnerSubscriber(this); - w = UnicastProcessor.create(bufferSize); + if (boundarySubscriber.compareAndSet(null, bo)) { + otherSource.subscribe(bo); - long r = requested(); - if (r != 0L) { - windows.getAndIncrement(); - - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); + emitted++; + downstream.onNext(w); } } else { - // don't emit new windows - cancelled = true; - a.onError(new MissingBackpressureException("Could not deliver new window due to lack of requests")); - continue; - } - - window = w; - - WindowBoundaryInnerSubscriber b = new WindowBoundaryInnerSubscriber(this); - - if (boundary.compareAndSet(boundary.get(), b)) { - p.subscribe(b); + upstream.cancel(); + disposeBoundary(); + errors.addThrowable(new MissingBackpressureException("Could not deliver a window due to lack of requests")); + done = true; } - - continue; } - - w.onNext(NotificationLite.getValue(o)); } - missed = leave(-missed); + this.emitted = emitted; + missed = addAndGet(-missed); if (missed == 0) { - return; + break; } } } - - void next() { - queue.offer(NEXT); - if (enter()) { - drainLoop(); - } - } } static final class WindowBoundaryInnerSubscriber extends DisposableSubscriber { @@ -305,8 +312,8 @@ public void onNext(B t) { return; } done = true; - cancel(); - parent.next(); + dispose(); + parent.innerNext(this); } @Override @@ -316,7 +323,7 @@ public void onError(Throwable t) { return; } done = true; - parent.onError(t); + parent.innerError(t); } @Override @@ -325,7 +332,7 @@ public void onComplete() { return; } done = true; - parent.onComplete(); + parent.innerComplete(); } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java index 7ceccf678f..d9912f0b01 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithFlowableTest.java @@ -19,7 +19,7 @@ import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.Test; import org.reactivestreams.*; @@ -28,6 +28,8 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.*; import io.reactivex.subscribers.*; @@ -198,7 +200,7 @@ public void onComplete() { } @Test - public void testWindowViaFlowableSourceThrows() { + public void testWindowViaFlowableThrows() { PublishProcessor source = PublishProcessor.create(); PublishProcessor boundary = PublishProcessor.create(); @@ -343,8 +345,8 @@ public Flowable call() { boundary.onComplete(); - // FIXME source still active because the open window - assertTrue(source.hasSubscribers()); + + assertFalse(source.hasSubscribers()); assertFalse(boundary.hasSubscribers()); ts.assertComplete(); @@ -371,10 +373,14 @@ public Flowable call() { ts.dispose(); - // FIXME source has subscribers because the open window + assertTrue(source.hasSubscribers()); - // FIXME boundary has subscribers because the open window - assertTrue(boundary.hasSubscribers()); + + assertFalse(boundary.hasSubscribers()); + + ts.values().get(0).test().cancel(); + + assertFalse(source.hasSubscribers()); ts.assertNotComplete(); ts.assertNoErrors(); @@ -503,8 +509,18 @@ public Flowable apply(Flowable v) throws Exception { TestHelper.checkBadSourceFlowable(new Function, Object>() { @Override - public Object apply(Flowable o) throws Exception { - return Flowable.just(1).window(Functions.justCallable(o)).flatMap(new Function, Flowable>() { + public Object apply(final Flowable o) throws Exception { + return Flowable.just(1).window(new Callable>() { + int count; + @Override + public Publisher call() throws Exception { + if (++count > 1) { + return Flowable.never(); + } + return o; + } + }) + .flatMap(new Function, Flowable>() { @Override public Flowable apply(Flowable v) throws Exception { return v; @@ -716,4 +732,625 @@ public Publisher> apply(Flowable f) } }); } + + @Test + public void upstreamDisposedWhenOutputsDisposed() { + PublishProcessor source = PublishProcessor.create(); + PublishProcessor boundary = PublishProcessor.create(); + + TestSubscriber to = source.window(boundary) + .take(1) + .flatMap(new Function, Flowable>() { + @Override + public Flowable apply( + Flowable w) throws Exception { + return w.take(1); + } + }) + .test(); + + source.onNext(1); + + assertFalse("source not disposed", source.hasSubscribers()); + assertFalse("boundary not disposed", boundary.hasSubscribers()); + + to.assertResult(1); + } + + + @Test + public void mainAndBoundaryBothError() { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> ref = new AtomicReference>(); + + TestSubscriber> to = Flowable.error(new TestException("main")) + .window(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + }) + .test(); + + to + .assertValueCount(1) + .assertError(TestException.class) + .assertErrorMessage("main") + .assertNotComplete(); + + ref.get().onError(new TestException("inner")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void mainCompleteBoundaryErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + refMain.get().onComplete(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ref.get().onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to + .assertValueCount(1) + .assertTerminated(); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void mainNextBoundaryNextRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + refMain.get().onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ref.get().onNext(1); + } + }; + + TestHelper.race(r1, r2); + + to + .assertValueCount(2) + .assertNotComplete() + .assertNoErrors(); + } + } + + @Test + public void takeOneAnotherBoundary() { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + }) + .test(); + + to.assertValueCount(1) + .assertNotTerminated() + .cancel(); + + ref.get().onNext(1); + + to.assertValueCount(1) + .assertNotTerminated(); + } + + @Test + public void disposeMainBoundaryCompleteRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + final TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + final AtomicInteger counter = new AtomicInteger(); + observer.onSubscribe(new Subscription() { + + @Override + public void cancel() { + // about a microsecond + for (int i = 0; i < 100; i++) { + counter.incrementAndGet(); + } + } + + @Override + public void request(long n) { + } + }); + ref.set(observer); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + Subscriber o = ref.get(); + o.onNext(1); + o.onComplete(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void disposeMainBoundaryErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + final TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + final AtomicInteger counter = new AtomicInteger(); + observer.onSubscribe(new Subscription() { + + @Override + public void cancel() { + // about a microsecond + for (int i = 0; i < 100; i++) { + counter.incrementAndGet(); + } + } + + @Override + public void request(long n) { + } + }); + ref.set(observer); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + Subscriber o = ref.get(); + o.onNext(1); + o.onError(ex); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void boundarySupplierDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Flowable>>() { + @Override + public Flowable> apply(Flowable f) + throws Exception { + return f.window(Functions.justCallable(Flowable.never())).takeLast(1); + } + }); + } + + @Test + public void selectorUpstreamDisposedWhenOutputsDisposed() { + PublishProcessor source = PublishProcessor.create(); + PublishProcessor boundary = PublishProcessor.create(); + + TestSubscriber to = source.window(Functions.justCallable(boundary)) + .take(1) + .flatMap(new Function, Flowable>() { + @Override + public Flowable apply( + Flowable w) throws Exception { + return w.take(1); + } + }) + .test(); + + source.onNext(1); + + assertFalse("source not disposed", source.hasSubscribers()); + assertFalse("boundary not disposed", boundary.hasSubscribers()); + + to.assertResult(1); + } + + @Test + public void supplierMainAndBoundaryBothError() { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> ref = new AtomicReference>(); + + TestSubscriber> to = Flowable.error(new TestException("main")) + .window(Functions.justCallable(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + })) + .test(); + + to + .assertValueCount(1) + .assertError(TestException.class) + .assertErrorMessage("main") + .assertNotComplete(); + + ref.get().onError(new TestException("inner")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void supplierMainCompleteBoundaryErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(Functions.justCallable(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + })) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + refMain.get().onComplete(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ref.get().onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to + .assertValueCount(1) + .assertTerminated(); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void supplierMainNextBoundaryNextRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(Functions.justCallable(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + })) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + refMain.get().onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ref.get().onNext(1); + } + }; + + TestHelper.race(r1, r2); + + to + .assertValueCount(2) + .assertNotComplete() + .assertNoErrors(); + } + } + + @Test + public void supplierTakeOneAnotherBoundary() { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(Functions.justCallable(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + ref.set(observer); + } + })) + .test(); + + to.assertValueCount(1) + .assertNotTerminated() + .cancel(); + + ref.get().onNext(1); + + to.assertValueCount(1) + .assertNotTerminated(); + } + + @Test + public void supplierDisposeMainBoundaryCompleteRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + final TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(Functions.justCallable(new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + final AtomicInteger counter = new AtomicInteger(); + observer.onSubscribe(new Subscription() { + + @Override + public void cancel() { + // about a microsecond + for (int i = 0; i < 100; i++) { + counter.incrementAndGet(); + } + } + + @Override + public void request(long n) { + } + }); + ref.set(observer); + } + })) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + Subscriber o = ref.get(); + o.onNext(1); + o.onComplete(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void supplierDisposeMainBoundaryErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + final TestSubscriber> to = new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + refMain.set(observer); + } + } + .window(new Callable>() { + int count; + @Override + public Flowable call() throws Exception { + if (++count > 1) { + return Flowable.never(); + } + return (new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + final AtomicInteger counter = new AtomicInteger(); + observer.onSubscribe(new Subscription() { + + @Override + public void cancel() { + // about a microsecond + for (int i = 0; i < 100; i++) { + counter.incrementAndGet(); + } + } + + @Override + public void request(long n) { + } + }); + ref.set(observer); + } + }); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + Subscriber o = ref.get(); + o.onNext(1); + o.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + }