From e215ff3df6095d473e6bfcd2e62ef4b4f7b070eb Mon Sep 17 00:00:00 2001 From: David Karnok Date: Wed, 22 Mar 2017 20:54:34 +0100 Subject: [PATCH] 2.x: fix window() with time+size emission problems (#5213) --- .../flowable/FlowableWindowTimed.java | 21 ++-- .../observable/ObservableWindowTimed.java | 21 ++-- .../flowable/FlowableWindowWithTimeTest.java | 103 ++++++++++++++++++ .../ObservableWindowWithTimeTest.java | 103 ++++++++++++++++++ 4 files changed, 233 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index e5b7c09cc8..28193e6a77 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -281,6 +281,7 @@ static final class WindowExactBoundedSubscriber final int bufferSize; final boolean restartTimerOnMaxSize; final long maxSize; + final Scheduler.Worker worker; long count; @@ -290,8 +291,6 @@ static final class WindowExactBoundedSubscriber UnicastProcessor window; - Scheduler.Worker worker; - volatile boolean terminated; final SequentialDisposable timer = new SequentialDisposable(); @@ -307,6 +306,11 @@ static final class WindowExactBoundedSubscriber this.bufferSize = bufferSize; this.maxSize = maxSize; this.restartTimerOnMaxSize = restartTimerOnMaxSize; + if (restartTimerOnMaxSize) { + worker = scheduler.createWorker(); + } else { + worker = null; + } } @Override @@ -342,10 +346,7 @@ public void onSubscribe(Subscription s) { Disposable d; ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this); if (restartTimerOnMaxSize) { - Scheduler.Worker sw = scheduler.createWorker(); - worker = sw; - sw.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit); - d = sw; + d = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit); } else { d = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit); } @@ -451,6 +452,10 @@ public void cancel() { public void dispose() { DisposableHelper.dispose(timer); + Worker w = worker; + if (w != null) { + w.dispose(); + } } void drainLoop() { @@ -495,9 +500,9 @@ void drainLoop() { if (isHolder) { ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o; - if (producerIndex == consumerIndexHolder.index) { + if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) { w.onComplete(); - + count = 0; w = UnicastProcessor.create(bufferSize); window = w; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java index 64295aa3ae..ee3a28fd62 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableWindowTimed.java @@ -254,6 +254,8 @@ static final class WindowExactBoundedObserver final boolean restartTimerOnMaxSize; final long maxSize; + final Scheduler.Worker worker; + long count; long producerIndex; @@ -262,7 +264,6 @@ static final class WindowExactBoundedObserver UnicastSubject window; - Scheduler.Worker worker; volatile boolean terminated; @@ -279,6 +280,11 @@ static final class WindowExactBoundedObserver this.bufferSize = bufferSize; this.maxSize = maxSize; this.restartTimerOnMaxSize = restartTimerOnMaxSize; + if (restartTimerOnMaxSize) { + worker = scheduler.createWorker(); + } else { + worker = null; + } } @Override @@ -302,10 +308,7 @@ public void onSubscribe(Disposable s) { Disposable d; ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this); if (restartTimerOnMaxSize) { - Scheduler.Worker sw = scheduler.createWorker(); - worker = sw; - sw.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit); - d = sw; + d = worker.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit); } else { d = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit); } @@ -394,6 +397,10 @@ public boolean isDisposed() { void disposeTimer() { DisposableHelper.dispose(timer); + Worker w = worker; + if (w != null) { + w.dispose(); + } } void drainLoop() { @@ -438,9 +445,9 @@ void drainLoop() { if (isHolder) { ConsumerIndexHolder consumerIndexHolder = (ConsumerIndexHolder) o; - if (producerIndex == consumerIndexHolder.index) { + if (restartTimerOnMaxSize || producerIndex == consumerIndexHolder.index) { w.onComplete(); - + count = 0; w = UnicastSubject.create(bufferSize); window = w; diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java index 3e3e1e3dc6..74bcd72152 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableWindowWithTimeTest.java @@ -704,4 +704,107 @@ public void sizeTimeTimeout() { ts.values().get(0).test().assertResult(); } + + @Test + public void periodicWindowCompletion() { + TestScheduler scheduler = new TestScheduler(); + FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, false) + .test(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(21) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void periodicWindowCompletionRestartTimer() { + TestScheduler scheduler = new TestScheduler(); + FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, true) + .test(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(21) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void periodicWindowCompletionBounded() { + TestScheduler scheduler = new TestScheduler(); + FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, false) + .test(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(21) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void periodicWindowCompletionRestartTimerBounded() { + TestScheduler scheduler = new TestScheduler(); + FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true) + .test(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(21) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void periodicWindowCompletionRestartTimerBoundedSomeData() { + TestScheduler scheduler = new TestScheduler(); + FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 2, true) + .test(); + + ps.onNext(1); + ps.onNext(2); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(22) + .assertNoErrors() + .assertNotComplete(); + } + @Test + public void countRestartsOnTimeTick() { + TestScheduler scheduler = new TestScheduler(); + FlowableProcessor ps = PublishProcessor.create(); + + TestSubscriber> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true) + .test(); + + // window #1 + ps.onNext(1); + ps.onNext(2); + + scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS); + + // window #2 + ps.onNext(3); + ps.onNext(4); + ps.onNext(5); + ps.onNext(6); + + ts.assertValueCount(2) + .assertNoErrors() + .assertNotComplete(); + } } + diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java index 5a6b326951..d481991a24 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableWindowWithTimeTest.java @@ -603,4 +603,107 @@ public void sizeTimeTimeout() { ts.values().get(0).test().assertResult(); } + + @Test + public void periodicWindowCompletion() { + TestScheduler scheduler = new TestScheduler(); + Subject ps = PublishSubject.create(); + + TestObserver> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, false) + .test(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(21) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void periodicWindowCompletionRestartTimer() { + TestScheduler scheduler = new TestScheduler(); + Subject ps = PublishSubject.create(); + + TestObserver> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, Long.MAX_VALUE, true) + .test(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(21) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void periodicWindowCompletionBounded() { + TestScheduler scheduler = new TestScheduler(); + Subject ps = PublishSubject.create(); + + TestObserver> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, false) + .test(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(21) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void periodicWindowCompletionRestartTimerBounded() { + TestScheduler scheduler = new TestScheduler(); + Subject ps = PublishSubject.create(); + + TestObserver> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true) + .test(); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(21) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void periodicWindowCompletionRestartTimerBoundedSomeData() { + TestScheduler scheduler = new TestScheduler(); + Subject ps = PublishSubject.create(); + + TestObserver> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 2, true) + .test(); + + ps.onNext(1); + ps.onNext(2); + + scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); + + ts.assertValueCount(22) + .assertNoErrors() + .assertNotComplete(); + } + + @Test + public void countRestartsOnTimeTick() { + TestScheduler scheduler = new TestScheduler(); + Subject ps = PublishSubject.create(); + + TestObserver> ts = ps.window(5, TimeUnit.MILLISECONDS, scheduler, 5, true) + .test(); + + // window #1 + ps.onNext(1); + ps.onNext(2); + + scheduler.advanceTimeBy(5, TimeUnit.MILLISECONDS); + + // window #2 + ps.onNext(3); + ps.onNext(4); + ps.onNext(5); + ps.onNext(6); + + ts.assertValueCount(2) + .assertNoErrors() + .assertNotComplete(); + } }