diff --git a/src/main/java/rx/subjects/ReplaySubject.java b/src/main/java/rx/subjects/ReplaySubject.java index 73ddf8585a..badf09a948 100644 --- a/src/main/java/rx/subjects/ReplaySubject.java +++ b/src/main/java/rx/subjects/ReplaySubject.java @@ -838,7 +838,7 @@ public void evict(NodeList t1) { @Override public boolean test(Object value, long now) { - return true; // size gets never stale + return false; // size gets never stale } @Override diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index 6a6e48f068..8e6dddce8c 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -87,6 +87,58 @@ public void testBufferedReplay() { } } + @Test + public void testBufferedWindowReplay() { + PublishSubject source = PublishSubject.create(); + TestScheduler scheduler = new TestScheduler(); + ConnectableObservable co = source.replay(3, 100, TimeUnit.MILLISECONDS, scheduler); + co.connect(); + + { + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + source.onNext(1); + scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); + source.onNext(2); + scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); + source.onNext(3); + scheduler.advanceTimeBy(10, TimeUnit.MILLISECONDS); + + inOrder.verify(observer1, times(1)).onNext(1); + inOrder.verify(observer1, times(1)).onNext(2); + inOrder.verify(observer1, times(1)).onNext(3); + + source.onNext(4); + source.onNext(5); + scheduler.advanceTimeBy(90, TimeUnit.MILLISECONDS); + + inOrder.verify(observer1, times(1)).onNext(4); + + inOrder.verify(observer1, times(1)).onNext(5); + + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + + } + + { + @SuppressWarnings("unchecked") + Observer observer1 = mock(Observer.class); + InOrder inOrder = inOrder(observer1); + + co.subscribe(observer1); + + inOrder.verify(observer1, times(1)).onNext(4); + inOrder.verify(observer1, times(1)).onNext(5); + inOrder.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + } + } + @Test public void testWindowedReplay() { TestScheduler scheduler = new TestScheduler();