diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundary.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundary.java index d78570ee22..a0fb1f9f10 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundary.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundary.java @@ -18,167 +18,199 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.disposables.DisposableHelper; -import io.reactivex.internal.observers.QueueDrainObserver; import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.util.NotificationLite; -import io.reactivex.observers.*; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.observers.DisposableObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.UnicastSubject; public final class ObservableWindowBoundary extends AbstractObservableWithUpstream> { final ObservableSource other; - final int bufferSize; + final int capacityHint; - public ObservableWindowBoundary(ObservableSource source, ObservableSource other, int bufferSize) { + public ObservableWindowBoundary(ObservableSource source, ObservableSource other, int capacityHint) { super(source); this.other = other; - this.bufferSize = bufferSize; + this.capacityHint = capacityHint; } @Override - public void subscribeActual(Observer> t) { - source.subscribe(new WindowBoundaryMainObserver(new SerializedObserver>(t), other, bufferSize)); + public void subscribeActual(Observer> observer) { + WindowBoundaryMainObserver parent = new WindowBoundaryMainObserver(observer, capacityHint); + + observer.onSubscribe(parent); + other.subscribe(parent.boundaryObserver); + + source.subscribe(parent); } static final class WindowBoundaryMainObserver - extends QueueDrainObserver> - implements Disposable { + extends AtomicInteger + implements Observer, Disposable, Runnable { - final ObservableSource other; - final int bufferSize; + private static final long serialVersionUID = 2233020065421370272L; - Disposable s; + final Observer> downstream; - final AtomicReference boundary = new AtomicReference(); + final int capacityHint; - UnicastSubject window; + final WindowBoundaryInnerObserver boundaryObserver; - static final Object NEXT = new Object(); + final AtomicReference upstream; - final AtomicLong windows = new AtomicLong(); + final AtomicInteger windows; - WindowBoundaryMainObserver(Observer> actual, ObservableSource other, - int bufferSize) { - super(actual, new MpscLinkedQueue()); - this.other = other; - this.bufferSize = bufferSize; - windows.lazySet(1); - } + final MpscLinkedQueue queue; - @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; + final AtomicThrowable errors; - Observer> a = actual; - a.onSubscribe(this); + final AtomicBoolean stopWindows; - if (cancelled) { - return; - } + static final Object NEXT_WINDOW = new Object(); - UnicastSubject w = UnicastSubject.create(bufferSize); + volatile boolean done; - window = w; + UnicastSubject window; - a.onNext(w); + WindowBoundaryMainObserver(Observer> downstream, int capacityHint) { + this.downstream = downstream; + this.capacityHint = capacityHint; + this.boundaryObserver = new WindowBoundaryInnerObserver(this); + this.upstream = new AtomicReference(); + this.windows = new AtomicInteger(1); + this.queue = new MpscLinkedQueue(); + this.errors = new AtomicThrowable(); + this.stopWindows = new AtomicBoolean(); + } - WindowBoundaryInnerObserver inner = new WindowBoundaryInnerObserver(this); + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(upstream, d)) { - if (boundary.compareAndSet(null, inner)) { - windows.getAndIncrement(); - other.subscribe(inner); - } + innerNext(); } } @Override public void onNext(T t) { - if (fastEnter()) { - UnicastSubject w = window; - - w.onNext(t); + queue.offer(t); + drain(); + } - if (leave(-1) == 0) { - return; - } + @Override + public void onError(Throwable e) { + boundaryObserver.dispose(); + if (errors.addThrowable(e)) { + done = true; + drain(); } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; - } + RxJavaPlugins.onError(e); } - drainLoop(); } @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - error = t; + public void onComplete() { + boundaryObserver.dispose(); done = true; - if (enter()) { - drainLoop(); - } + drain(); + } - if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); + @Override + public void dispose() { + if (stopWindows.compareAndSet(false, true)) { + boundaryObserver.dispose(); + if (windows.decrementAndGet() == 0) { + DisposableHelper.dispose(upstream); + } } - - actual.onError(t); } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - if (enter()) { - drainLoop(); - } + public boolean isDisposed() { + return stopWindows.get(); + } + @Override + public void run() { if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); + DisposableHelper.dispose(upstream); } + } - actual.onComplete(); - + void innerNext() { + queue.offer(NEXT_WINDOW); + drain(); } - @Override - public void dispose() { - cancelled = true; + void innerError(Throwable e) { + DisposableHelper.dispose(upstream); + if (errors.addThrowable(e)) { + done = true; + drain(); + } else { + RxJavaPlugins.onError(e); + } } - @Override - public boolean isDisposed() { - return cancelled; + void innerComplete() { + DisposableHelper.dispose(upstream); + done = true; + drain(); } - void drainLoop() { - final MpscLinkedQueue q = (MpscLinkedQueue)queue; - final Observer> a = actual; + @SuppressWarnings("unchecked") + void drain() { + if (getAndIncrement() != 0) { + return; + } + int missed = 1; - UnicastSubject w = window; + Observer> downstream = this.downstream; + MpscLinkedQueue queue = this.queue; + AtomicThrowable errors = this.errors; + for (;;) { for (;;) { + if (windows.get() == 0) { + queue.clear(); + window = null; + return; + } + + UnicastSubject w = window; + boolean d = done; - Object o = q.poll(); + if (d && errors.get() != null) { + queue.clear(); + Throwable ex = errors.terminate(); + if (w != null) { + window = null; + w.onError(ex); + } + downstream.onError(ex); + return; + } + + Object v = queue.poll(); - boolean empty = o == null; + boolean empty = v == null; if (d && empty) { - DisposableHelper.dispose(boundary); - Throwable e = error; - if (e != null) { - w.onError(e); + Throwable ex = errors.terminate(); + if (ex == null) { + if (w != null) { + window = null; + w.onComplete(); + } + downstream.onComplete(); } else { - w.onComplete(); + if (w != null) { + window = null; + w.onError(ex); + } + downstream.onError(ex); } return; } @@ -187,48 +219,35 @@ void drainLoop() { break; } - if (o == NEXT) { - w.onComplete(); - - if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); - return; - } - - if (cancelled) { - continue; - } - - w = UnicastSubject.create(bufferSize); + if (v != NEXT_WINDOW) { + w.onNext((T)v); + continue; + } - windows.getAndIncrement(); + if (w != null) { + window = null; + w.onComplete(); + } + if (!stopWindows.get()) { + w = UnicastSubject.create(capacityHint, this); window = w; + windows.getAndIncrement(); - a.onNext(w); - - continue; + downstream.onNext(w); } - - w.onNext(NotificationLite.getValue(o)); } - missed = leave(-missed); + missed = addAndGet(-missed); if (missed == 0) { - return; + break; } } } - - void next() { - queue.offer(NEXT); - if (enter()) { - drainLoop(); - } - } } static final class WindowBoundaryInnerObserver extends DisposableObserver { + final WindowBoundaryMainObserver parent; boolean done; @@ -242,7 +261,7 @@ public void onNext(B t) { if (done) { return; } - parent.next(); + parent.innerNext(); } @Override @@ -252,7 +271,7 @@ public void onError(Throwable t) { return; } done = true; - parent.onError(t); + parent.innerError(t); } @Override @@ -261,7 +280,7 @@ public void onComplete() { return; } done = true; - parent.onComplete(); + parent.innerComplete(); } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java index 47b0841262..c2d319032b 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowBoundarySupplier.java @@ -21,179 +21,213 @@ import io.reactivex.exceptions.Exceptions; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.observers.QueueDrainObserver; import io.reactivex.internal.queue.MpscLinkedQueue; -import io.reactivex.internal.util.NotificationLite; -import io.reactivex.observers.*; +import io.reactivex.internal.util.AtomicThrowable; +import io.reactivex.observers.DisposableObserver; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.UnicastSubject; public final class ObservableWindowBoundarySupplier extends AbstractObservableWithUpstream> { final Callable> other; - final int bufferSize; + final int capacityHint; public ObservableWindowBoundarySupplier( ObservableSource source, - Callable> other, int bufferSize) { + Callable> other, int capacityHint) { super(source); this.other = other; - this.bufferSize = bufferSize; + this.capacityHint = capacityHint; } @Override - public void subscribeActual(Observer> t) { - source.subscribe(new WindowBoundaryMainObserver(new SerializedObserver>(t), other, bufferSize)); + public void subscribeActual(Observer> observer) { + WindowBoundaryMainObserver parent = new WindowBoundaryMainObserver(observer, capacityHint, other); + + source.subscribe(parent); } static final class WindowBoundaryMainObserver - extends QueueDrainObserver> - implements Disposable { + extends AtomicInteger + implements Observer, Disposable, Runnable { - final Callable> other; - final int bufferSize; + private static final long serialVersionUID = 2233020065421370272L; - Disposable s; + final Observer> downstream; - final AtomicReference boundary = new AtomicReference(); + final int capacityHint; - UnicastSubject window; + final AtomicReference> boundaryObserver; - static final Object NEXT = new Object(); + static final WindowBoundaryInnerObserver BOUNDARY_DISPOSED = new WindowBoundaryInnerObserver(null); - final AtomicLong windows = new AtomicLong(); + final AtomicInteger windows; - WindowBoundaryMainObserver(Observer> actual, Callable> other, - int bufferSize) { - super(actual, new MpscLinkedQueue()); - this.other = other; - this.bufferSize = bufferSize; - windows.lazySet(1); - } + final MpscLinkedQueue queue; - @Override - public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; + final AtomicThrowable errors; - Observer> a = actual; - a.onSubscribe(this); + final AtomicBoolean stopWindows; - if (cancelled) { - return; - } + final Callable> other; - ObservableSource p; + static final Object NEXT_WINDOW = new Object(); - try { - p = ObjectHelper.requireNonNull(other.call(), "The first window ObservableSource supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - s.dispose(); - a.onError(e); - return; - } + Disposable upstream; - UnicastSubject w = UnicastSubject.create(bufferSize); + volatile boolean done; - window = w; - - a.onNext(w); + UnicastSubject window; - WindowBoundaryInnerObserver inner = new WindowBoundaryInnerObserver(this); + WindowBoundaryMainObserver(Observer> downstream, int capacityHint, Callable> other) { + this.downstream = downstream; + this.capacityHint = capacityHint; + this.boundaryObserver = new AtomicReference>(); + this.windows = new AtomicInteger(1); + this.queue = new MpscLinkedQueue(); + this.errors = new AtomicThrowable(); + this.stopWindows = new AtomicBoolean(); + this.other = other; + } - if (boundary.compareAndSet(null, inner)) { - windows.getAndIncrement(); - p.subscribe(inner); - } + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(upstream, d)) { + upstream = d; + downstream.onSubscribe(this); + queue.offer(NEXT_WINDOW); + drain(); } } @Override public void onNext(T t) { - if (fastEnter()) { - UnicastSubject w = window; - - w.onNext(t); + queue.offer(t); + drain(); + } - if (leave(-1) == 0) { - return; - } + @Override + public void onError(Throwable e) { + disposeBoundary(); + if (errors.addThrowable(e)) { + done = true; + drain(); } else { - queue.offer(NotificationLite.next(t)); - if (!enter()) { - return; - } + RxJavaPlugins.onError(e); } - drainLoop(); } @Override - public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - error = t; + public void onComplete() { + disposeBoundary(); done = true; - if (enter()) { - drainLoop(); - } + drain(); + } - if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); + @Override + public void dispose() { + if (stopWindows.compareAndSet(false, true)) { + disposeBoundary(); + if (windows.decrementAndGet() == 0) { + upstream.dispose(); + } } + } - actual.onError(t); + @SuppressWarnings({ "rawtypes", "unchecked" }) + void disposeBoundary() { + Disposable d = boundaryObserver.getAndSet((WindowBoundaryInnerObserver)BOUNDARY_DISPOSED); + if (d != null && d != BOUNDARY_DISPOSED) { + d.dispose(); + } } @Override - public void onComplete() { - if (done) { - return; - } - done = true; - if (enter()) { - drainLoop(); - } + public boolean isDisposed() { + return stopWindows.get(); + } + @Override + public void run() { if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); + upstream.dispose(); } + } - actual.onComplete(); - + void innerNext(WindowBoundaryInnerObserver sender) { + boundaryObserver.compareAndSet(sender, null); + queue.offer(NEXT_WINDOW); + drain(); } - @Override - public void dispose() { - cancelled = true; + void innerError(Throwable e) { + upstream.dispose(); + if (errors.addThrowable(e)) { + done = true; + drain(); + } else { + RxJavaPlugins.onError(e); + } } - @Override - public boolean isDisposed() { - return cancelled; + void innerComplete() { + upstream.dispose(); + done = true; + drain(); } - void drainLoop() { - final MpscLinkedQueue q = (MpscLinkedQueue)queue; - final Observer> a = actual; + @SuppressWarnings("unchecked") + void drain() { + if (getAndIncrement() != 0) { + return; + } + int missed = 1; - UnicastSubject w = window; + Observer> downstream = this.downstream; + MpscLinkedQueue queue = this.queue; + AtomicThrowable errors = this.errors; + for (;;) { for (;;) { + if (windows.get() == 0) { + queue.clear(); + window = null; + return; + } + + UnicastSubject w = window; + boolean d = done; - Object o = q.poll(); - boolean empty = o == null; + if (d && errors.get() != null) { + queue.clear(); + Throwable ex = errors.terminate(); + if (w != null) { + window = null; + w.onError(ex); + } + downstream.onError(ex); + return; + } + + Object v = queue.poll(); + + boolean empty = v == null; if (d && empty) { - DisposableHelper.dispose(boundary); - Throwable e = error; - if (e != null) { - w.onError(e); + Throwable ex = errors.terminate(); + if (ex == null) { + if (w != null) { + window = null; + w.onComplete(); + } + downstream.onComplete(); } else { - w.onComplete(); + if (w != null) { + window = null; + w.onError(ex); + } + downstream.onError(ex); } return; } @@ -202,62 +236,48 @@ void drainLoop() { break; } - if (o == NEXT) { - w.onComplete(); + if (v != NEXT_WINDOW) { + w.onNext((T)v); + continue; + } - if (windows.decrementAndGet() == 0) { - DisposableHelper.dispose(boundary); - return; - } + if (w != null) { + window = null; + w.onComplete(); + } - if (cancelled) { - continue; - } + if (!stopWindows.get()) { + w = UnicastSubject.create(capacityHint, this); + window = w; + windows.getAndIncrement(); - ObservableSource p; + ObservableSource otherSource; try { - p = ObjectHelper.requireNonNull(other.call(), "The ObservableSource supplied is null"); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - DisposableHelper.dispose(boundary); - a.onError(e); - return; + otherSource = ObjectHelper.requireNonNull(other.call(), "The other Callable returned a null ObservableSource"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + errors.addThrowable(ex); + done = true; + continue; } - w = UnicastSubject.create(bufferSize); + WindowBoundaryInnerObserver bo = new WindowBoundaryInnerObserver(this); - windows.getAndIncrement(); + if (boundaryObserver.compareAndSet(null, bo)) { + otherSource.subscribe(bo); - window = w; - - a.onNext(w); - - WindowBoundaryInnerObserver b = new WindowBoundaryInnerObserver(this); - - if (boundary.compareAndSet(boundary.get(), b)) { - p.subscribe(b); + downstream.onNext(w); } - - continue; } - - w.onNext(NotificationLite.getValue(o)); } - missed = leave(-missed); + missed = addAndGet(-missed); if (missed == 0) { - return; + break; } } } - - void next() { - queue.offer(NEXT); - if (enter()) { - drainLoop(); - } - } } static final class WindowBoundaryInnerObserver extends DisposableObserver { @@ -276,7 +296,7 @@ public void onNext(B t) { } done = true; dispose(); - parent.next(); + parent.innerNext(this); } @Override @@ -286,7 +306,7 @@ public void onError(Throwable t) { return; } done = true; - parent.onError(t); + parent.innerError(t); } @Override @@ -295,7 +315,7 @@ public void onComplete() { return; } done = true; - parent.onComplete(); + parent.innerComplete(); } } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java index 8319a74d97..6c084f01d6 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithObservableTest.java @@ -19,17 +19,19 @@ import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; import org.junit.Test; import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; +import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; import io.reactivex.observers.*; +import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subjects.*; public class ObservableWindowWithObservableTest { @@ -346,8 +348,8 @@ public Observable call() { boundary.onComplete(); - // FIXME source still active because the open window - assertTrue(source.hasObservers()); + + assertFalse(source.hasObservers()); assertFalse(boundary.hasObservers()); ts.assertComplete(); @@ -374,10 +376,13 @@ public Observable call() { ts.dispose(); - // FIXME source has subscribers because the open window assertTrue(source.hasObservers()); - // FIXME boundary has subscribers because the open window - assertTrue(boundary.hasObservers()); + + assertFalse(boundary.hasObservers()); + + ts.values().get(0).test(true); + + assertFalse(source.hasObservers()); ts.assertNotComplete(); ts.assertNoErrors(); @@ -638,4 +643,627 @@ public Observable> apply(Observable f) } }); } + + @Test + public void upstreamDisposedWhenOutputsDisposed() { + PublishSubject source = PublishSubject.create(); + PublishSubject boundary = PublishSubject.create(); + + TestObserver to = source.window(boundary) + .take(1) + .flatMap(new Function, ObservableSource>() { + @Override + public ObservableSource apply( + Observable w) throws Exception { + return w.take(1); + } + }) + .test(); + + source.onNext(1); + + assertFalse("source not disposed", source.hasObservers()); + assertFalse("boundary not disposed", boundary.hasObservers()); + + to.assertResult(1); + } + + @Test + public void mainAndBoundaryBothError() { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> ref = new AtomicReference>(); + + TestObserver> to = Observable.error(new TestException("main")) + .window(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + }) + .test(); + + to + .assertValueCount(1) + .assertError(TestException.class) + .assertErrorMessage("main") + .assertNotComplete(); + + ref.get().onError(new TestException("inner")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void mainCompleteBoundaryErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + refMain.get().onComplete(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ref.get().onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to + .assertValueCount(1) + .assertTerminated(); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void mainNextBoundaryNextRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + refMain.get().onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ref.get().onNext(1); + } + }; + + TestHelper.race(r1, r2); + + to + .assertValueCount(2) + .assertNotComplete() + .assertNoErrors(); + } + } + + @Test + public void takeOneAnotherBoundary() { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + }) + .test(); + + to.assertValueCount(1) + .assertNotTerminated() + .cancel(); + + ref.get().onNext(1); + + to.assertValueCount(1) + .assertNotTerminated(); + } + + @Test + public void disposeMainBoundaryCompleteRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + final TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + final AtomicInteger counter = new AtomicInteger(); + observer.onSubscribe(new Disposable() { + + @Override + public void dispose() { + // about a microsecond + for (int i = 0; i < 100; i++) { + counter.incrementAndGet(); + } + } + + @Override + public boolean isDisposed() { + return false; + } + }); + ref.set(observer); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + Observer o = ref.get(); + o.onNext(1); + o.onComplete(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void disposeMainBoundaryErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + final TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + final AtomicInteger counter = new AtomicInteger(); + observer.onSubscribe(new Disposable() { + + @Override + public void dispose() { + // about a microsecond + for (int i = 0; i < 100; i++) { + counter.incrementAndGet(); + } + } + + @Override + public boolean isDisposed() { + return false; + } + }); + ref.set(observer); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + Observer o = ref.get(); + o.onNext(1); + o.onError(ex); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void boundarySupplierDoubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeObservable(new Function, Observable>>() { + @Override + public Observable> apply(Observable f) + throws Exception { + return f.window(Functions.justCallable(Observable.never())).takeLast(1); + } + }); + } + + @Test + public void selectorUpstreamDisposedWhenOutputsDisposed() { + PublishSubject source = PublishSubject.create(); + PublishSubject boundary = PublishSubject.create(); + + TestObserver to = source.window(Functions.justCallable(boundary)) + .take(1) + .flatMap(new Function, ObservableSource>() { + @Override + public ObservableSource apply( + Observable w) throws Exception { + return w.take(1); + } + }) + .test(); + + source.onNext(1); + + assertFalse("source not disposed", source.hasObservers()); + assertFalse("boundary not disposed", boundary.hasObservers()); + + to.assertResult(1); + } + + @Test + public void supplierMainAndBoundaryBothError() { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> ref = new AtomicReference>(); + + TestObserver> to = Observable.error(new TestException("main")) + .window(Functions.justCallable(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + })) + .test(); + + to + .assertValueCount(1) + .assertError(TestException.class) + .assertErrorMessage("main") + .assertNotComplete(); + + ref.get().onError(new TestException("inner")); + + TestHelper.assertUndeliverable(errors, 0, TestException.class, "inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void supplierMainCompleteBoundaryErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(Functions.justCallable(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + })) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + refMain.get().onComplete(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ref.get().onError(ex); + } + }; + + TestHelper.race(r1, r2); + + to + .assertValueCount(1) + .assertTerminated(); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } + + @Test + public void supplierMainNextBoundaryNextRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(Functions.justCallable(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + })) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + refMain.get().onNext(1); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + ref.get().onNext(1); + } + }; + + TestHelper.race(r1, r2); + + to + .assertValueCount(2) + .assertNotComplete() + .assertNoErrors(); + } + } + + @Test + public void supplierTakeOneAnotherBoundary() { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(Functions.justCallable(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + ref.set(observer); + } + })) + .test(); + + to.assertValueCount(1) + .assertNotTerminated() + .cancel(); + + ref.get().onNext(1); + + to.assertValueCount(1) + .assertNotTerminated(); + } + + @Test + public void supplierDisposeMainBoundaryCompleteRace() { + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + final TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(Functions.justCallable(new Observable() { + @Override + protected void subscribeActual(Observer observer) { + final AtomicInteger counter = new AtomicInteger(); + observer.onSubscribe(new Disposable() { + + @Override + public void dispose() { + // about a microsecond + for (int i = 0; i < 100; i++) { + counter.incrementAndGet(); + } + } + + @Override + public boolean isDisposed() { + return false; + } + }); + ref.set(observer); + } + })) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + Observer o = ref.get(); + o.onNext(1); + o.onComplete(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void supplierDisposeMainBoundaryErrorRace() { + final TestException ex = new TestException(); + + for (int i = 0; i < TestHelper.RACE_LONG_LOOPS; i++) { + List errors = TestHelper.trackPluginErrors(); + try { + final AtomicReference> refMain = new AtomicReference>(); + final AtomicReference> ref = new AtomicReference>(); + + final TestObserver> to = new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + refMain.set(observer); + } + } + .window(new Callable>() { + int count; + @Override + public ObservableSource call() throws Exception { + if (++count > 1) { + return Observable.never(); + } + return (new Observable() { + @Override + protected void subscribeActual(Observer observer) { + final AtomicInteger counter = new AtomicInteger(); + observer.onSubscribe(new Disposable() { + + @Override + public void dispose() { + // about a microsecond + for (int i = 0; i < 100; i++) { + counter.incrementAndGet(); + } + } + + @Override + public boolean isDisposed() { + return false; + } + }); + ref.set(observer); + } + }); + } + }) + .test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + to.cancel(); + } + }; + Runnable r2 = new Runnable() { + @Override + public void run() { + Observer o = ref.get(); + o.onNext(1); + o.onError(ex); + } + }; + + TestHelper.race(r1, r2); + + if (!errors.isEmpty()) { + TestHelper.assertUndeliverable(errors, 0, TestException.class); + } + } finally { + RxJavaPlugins.reset(); + } + } + } }