From 5c210338bc984b6a1f2d739cc839c91e0fa76345 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 24 Oct 2019 15:01:01 +0200 Subject: [PATCH] 2.x: Fix window(time) possible interrupts while terminating --- .../flowable/FlowableWindowTimed.java | 40 +-- .../observable/ObservableWindowTimed.java | 35 +-- .../flowable/FlowableWindowWithTimeTest.java | 243 ++++++++++++++++- .../ObservableWindowWithTimeTest.java | 244 +++++++++++++++++- 4 files changed, 503 insertions(+), 59 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index 00f783a39e..2db0eba8cc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -160,7 +160,6 @@ public void onError(Throwable t) { } downstream.onError(t); - dispose(); } @Override @@ -171,7 +170,6 @@ public void onComplete() { } downstream.onComplete(); - dispose(); } @Override @@ -184,22 +182,15 @@ public void cancel() { cancelled = true; } - public void dispose() { - DisposableHelper.dispose(timer); - } - @Override public void run() { - if (cancelled) { terminated = true; - dispose(); } queue.offer(NEXT); if (enter()) { drainLoop(); } - } void drainLoop() { @@ -221,13 +212,13 @@ void drainLoop() { if (d && (o == null || o == NEXT)) { window = null; q.clear(); - dispose(); Throwable err = error; if (err != null) { w.onError(err); } else { w.onComplete(); } + timer.dispose(); return; } @@ -251,8 +242,8 @@ void drainLoop() { window = null; queue.clear(); upstream.cancel(); - dispose(); a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); + timer.dispose(); return; } } else { @@ -396,7 +387,7 @@ public void onNext(T t) { window = null; upstream.cancel(); downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests")); - dispose(); + disposeTimer(); return; } } else { @@ -424,7 +415,6 @@ public void onError(Throwable t) { } downstream.onError(t); - dispose(); } @Override @@ -435,7 +425,6 @@ public void onComplete() { } downstream.onComplete(); - dispose(); } @Override @@ -448,8 +437,8 @@ public void cancel() { cancelled = true; } - public void dispose() { - DisposableHelper.dispose(timer); + public void disposeTimer() { + timer.dispose(); Worker w = worker; if (w != null) { w.dispose(); @@ -468,7 +457,7 @@ void drainLoop() { if (terminated) { upstream.cancel(); q.clear(); - dispose(); + disposeTimer(); return; } @@ -488,7 +477,7 @@ void drainLoop() { } else { w.onComplete(); } - dispose(); + disposeTimer(); return; } @@ -515,7 +504,7 @@ void drainLoop() { queue.clear(); upstream.cancel(); a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); - dispose(); + disposeTimer(); return; } } @@ -554,7 +543,7 @@ void drainLoop() { window = null; upstream.cancel(); downstream.onError(new MissingBackpressureException("Could not deliver window due to lack of requests")); - dispose(); + disposeTimer(); return; } } else { @@ -585,7 +574,6 @@ public void run() { p.queue.offer(this); } else { p.terminated = true; - p.dispose(); } if (p.enter()) { p.drainLoop(); @@ -682,7 +670,6 @@ public void onError(Throwable t) { } downstream.onError(t); - dispose(); } @Override @@ -693,7 +680,6 @@ public void onComplete() { } downstream.onComplete(); - dispose(); } @Override @@ -706,10 +692,6 @@ public void cancel() { cancelled = true; } - public void dispose() { - worker.dispose(); - } - void complete(UnicastProcessor w) { queue.offer(new SubjectWork(w, false)); if (enter()) { @@ -730,9 +712,9 @@ void drainLoop() { for (;;) { if (terminated) { upstream.cancel(); - dispose(); q.clear(); ws.clear(); + worker.dispose(); return; } @@ -756,7 +738,7 @@ void drainLoop() { } } ws.clear(); - dispose(); + worker.dispose(); return; } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java index 406d8f03ff..5214d2b225 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java @@ -15,14 +15,13 @@ import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.observers.QueueDrainObserver; import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.util.NotificationLite; @@ -85,7 +84,7 @@ static final class WindowExactUnboundedObserver UnicastSubject window; - final AtomicReference timer = new AtomicReference(); + final SequentialDisposable timer = new SequentialDisposable(); static final Object NEXT = new Object(); @@ -114,7 +113,7 @@ public void onSubscribe(Disposable d) { if (!cancelled) { Disposable task = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit); - DisposableHelper.replace(timer, task); + timer.replace(task); } } } @@ -146,7 +145,6 @@ public void onError(Throwable t) { drainLoop(); } - disposeTimer(); downstream.onError(t); } @@ -157,7 +155,6 @@ public void onComplete() { drainLoop(); } - disposeTimer(); downstream.onComplete(); } @@ -171,15 +168,10 @@ public boolean isDisposed() { return cancelled; } - void disposeTimer() { - DisposableHelper.dispose(timer); - } - @Override public void run() { if (cancelled) { terminated = true; - disposeTimer(); } queue.offer(NEXT); if (enter()) { @@ -206,13 +198,13 @@ void drainLoop() { if (d && (o == null || o == NEXT)) { window = null; q.clear(); - disposeTimer(); Throwable err = error; if (err != null) { w.onError(err); } else { w.onComplete(); } + timer.dispose(); return; } @@ -266,7 +258,7 @@ static final class WindowExactBoundedObserver volatile boolean terminated; - final AtomicReference timer = new AtomicReference(); + final SequentialDisposable timer = new SequentialDisposable(); WindowExactBoundedObserver( Observer> actual, @@ -312,7 +304,7 @@ public void onSubscribe(Disposable d) { task = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit); } - DisposableHelper.replace(timer, task); + timer.replace(task); } } @@ -370,7 +362,6 @@ public void onError(Throwable t) { } downstream.onError(t); - disposeTimer(); } @Override @@ -381,7 +372,6 @@ public void onComplete() { } downstream.onComplete(); - disposeTimer(); } @Override @@ -428,13 +418,13 @@ void drainLoop() { if (d && (empty || isHolder)) { window = null; q.clear(); - disposeTimer(); Throwable err = error; if (err != null) { w.onError(err); } else { w.onComplete(); } + disposeTimer(); return; } @@ -507,7 +497,6 @@ public void run() { p.queue.offer(this); } else { p.terminated = true; - p.disposeTimer(); } if (p.enter()) { p.drainLoop(); @@ -592,7 +581,6 @@ public void onError(Throwable t) { } downstream.onError(t); - disposeWorker(); } @Override @@ -603,7 +591,6 @@ public void onComplete() { } downstream.onComplete(); - disposeWorker(); } @Override @@ -616,10 +603,6 @@ public boolean isDisposed() { return cancelled; } - void disposeWorker() { - worker.dispose(); - } - void complete(UnicastSubject w) { queue.offer(new SubjectWork(w, false)); if (enter()) { @@ -640,9 +623,9 @@ void drainLoop() { for (;;) { if (terminated) { upstream.dispose(); - disposeWorker(); q.clear(); ws.clear(); + worker.dispose(); return; } @@ -665,8 +648,8 @@ void drainLoop() { w.onComplete(); } } - disposeWorker(); ws.clear(); + worker.dispose(); return; } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java index 37d8928571..56e1a736e6 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java @@ -16,7 +16,7 @@ import static org.junit.Assert.*; import java.util.*; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.*; import org.junit.*; @@ -918,5 +918,244 @@ public void nextWindowMissingBackpressureDrainOnTime() { .assertError(MissingBackpressureException.class) .assertNotComplete(); } -} + @Test + public void exactTimeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeAndSizeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(100, TimeUnit.MILLISECONDS, 10) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeAndSizeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(100, TimeUnit.MILLISECONDS, 10) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void skipTimeAndSizeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(90, 100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void skipTimeAndSizeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishProcessor pp = PublishProcessor.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + pp.window(90, 100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Flowable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + pp.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + pp.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java index fbd90088ed..8500d52948 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java @@ -16,8 +16,8 @@ import static org.junit.Assert.*; import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; import org.junit.*; @@ -708,4 +708,244 @@ public void countRestartsOnTimeTick() { .assertNoErrors() .assertNotComplete(); } + + @Test + public void exactTimeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeAndSizeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(100, TimeUnit.MILLISECONDS, 10) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void exactTimeAndSizeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(100, TimeUnit.MILLISECONDS, 10) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void skipTimeAndSizeBoundNoInterruptWindowOutputOnComplete() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(90, 100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onComplete(); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } + + @Test + public void skipTimeAndSizeBoundNoInterruptWindowOutputOnError() throws Exception { + final AtomicBoolean isInterrupted = new AtomicBoolean(); + + final PublishSubject ps = PublishSubject.create(); + + final CountDownLatch doOnNextDone = new CountDownLatch(1); + final CountDownLatch secondWindowProcessing = new CountDownLatch(1); + + ps.window(90, 100, TimeUnit.MILLISECONDS) + .doOnNext(new Consumer>() { + int count; + @Override + public void accept(Observable v) throws Exception { + System.out.println(Thread.currentThread()); + if (count++ == 1) { + secondWindowProcessing.countDown(); + try { + Thread.sleep(200); + isInterrupted.set(Thread.interrupted()); + } catch (InterruptedException ex) { + isInterrupted.set(true); + } + doOnNextDone.countDown(); + } + } + }) + .test(); + + ps.onNext(1); + + assertTrue(secondWindowProcessing.await(5, TimeUnit.SECONDS)); + + ps.onError(new TestException()); + + assertTrue(doOnNextDone.await(5, TimeUnit.SECONDS)); + + assertFalse("The doOnNext got interrupted!", isInterrupted.get()); + } }