From cefaade09e65000cde60fc2e85f8fbfcc9cea74c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 7 Mar 2018 13:17:43 +0100 Subject: [PATCH] 2.x: Fix excess item retention in the other replay components --- .../operators/flowable/FlowableReplay.java | 11 +- .../observable/ObservableReplay.java | 12 +- .../reactivex/processors/ReplayProcessor.java | 66 ++++++- .../flowable/FlowableReplayTest.java | 170 +++++++++++++++++- .../observable/ObservableReplayTest.java | 155 ++++++++++++++++ .../processors/ReplayProcessorTest.java | 142 ++++++++++++++- 6 files changed, 548 insertions(+), 8 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 3005eb11c8..7574a93d4b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -806,6 +806,15 @@ public final void complete() { truncateFinal(); } + final void trimHead() { + Node head = get(); + if (head.value != null) { + Node n = new Node(null, 0L); + n.lazySet(head.get()); + set(n); + } + } + @Override public final void replay(InnerSubscription output) { synchronized (output) { @@ -909,7 +918,7 @@ void truncate() { * based on its properties (i.e., truncate but the very last node). */ void truncateFinal() { - + trimHead(); } /* test */ final void collect(Collection output) { Node n = getHead(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java index c48e83b74b..26efd9be5a 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java @@ -619,6 +619,16 @@ final void removeFirst() { // can't null out the head's value because of late replayers would see null setFirst(next); } + + final void trimHead() { + Node head = get(); + if (head.value != null) { + Node n = new Node(null); + n.lazySet(head.get()); + set(n); + } + } + /* test */ final void removeSome(int n) { Node head = get(); while (n > 0) { @@ -733,7 +743,7 @@ Object leaveTransform(Object value) { * based on its properties (i.e., truncate but the very last node). */ void truncateFinal() { - + trimHead(); } /* test */ final void collect(Collection output) { Node n = getHead(); diff --git a/src/main/java/io/reactivex/processors/ReplayProcessor.java b/src/main/java/io/reactivex/processors/ReplayProcessor.java index cdfcb0a3af..069e6628fd 100644 --- a/src/main/java/io/reactivex/processors/ReplayProcessor.java +++ b/src/main/java/io/reactivex/processors/ReplayProcessor.java @@ -21,7 +21,7 @@ import org.reactivestreams.*; import io.reactivex.Scheduler; -import io.reactivex.annotations.CheckReturnValue; +import io.reactivex.annotations.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; @@ -362,6 +362,24 @@ public Throwable getThrowable() { return null; } + /** + * Makes sure the item cached by the head node in a bounded + * ReplayProcessor is released (as it is never part of a replay). + *

+ * By default, live bounded buffers will remember one item before + * the currently receivable one to ensure subscribers can always + * receive a continuous sequence of items. A terminated ReplayProcessor + * automatically releases this inaccessible item. + *

+ * The method must be called sequentially, similar to the standard + * {@code onXXX} methods. + * @since 2.1.11 - experimental + */ + @Experimental + public void cleanupBuffer() { + buffer.trimHead(); + } + /** * Returns a single value the Subject currently has or null if no such value exists. *

The method is thread-safe. @@ -499,6 +517,12 @@ interface ReplayBuffer { boolean isDone(); Throwable getError(); + + /** + * Make sure an old inaccessible head value is released + * in a bounded buffer. + */ + void trimHead(); } static final class ReplaySubscription extends AtomicInteger implements Subscription { @@ -568,6 +592,11 @@ public void complete() { done = true; } + @Override + public void trimHead() { + // not applicable for an unbounded buffer + } + @Override public T getValue() { int s = size; @@ -771,14 +800,25 @@ public void next(T value) { @Override public void error(Throwable ex) { error = ex; + trimHead(); done = true; } @Override public void complete() { + trimHead(); done = true; } + @Override + public void trimHead() { + if (head.value != null) { + Node n = new Node(null); + n.lazySet(head.get()); + head = n; + } + } + @Override public boolean isDone() { return done; @@ -992,12 +1032,22 @@ void trimFinal() { for (;;) { TimedNode next = h.get(); if (next == null) { - head = h; + if (h.value != null) { + head = new TimedNode(null, 0L); + } else { + head = h; + } break; } if (next.time > limit) { - head = h; + if (h.value != null) { + TimedNode n = new TimedNode(null, 0L); + n.lazySet(h.get()); + head = n; + } else { + head = h; + } break; } @@ -1005,6 +1055,16 @@ void trimFinal() { } } + + @Override + public void trimHead() { + if (head.value != null) { + TimedNode n = new TimedNode(null, 0L); + n.lazySet(head.get()); + head = n; + } + } + @Override public void next(T value) { TimedNode n = new TimedNode(value, scheduler.now(unit)); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index 3c0c1ffc22..d081ea1099 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -21,13 +21,13 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import io.reactivex.annotations.NonNull; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.Scheduler.Worker; +import io.reactivex.annotations.NonNull; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.flowables.ConnectableFlowable; @@ -1775,4 +1775,172 @@ public void badRequest() { .replay() ); } + + @Test + public void noHeadRetentionCompleteSize() { + PublishProcessor source = PublishProcessor.create(); + + FlowableReplay co = (FlowableReplay)source + .replay(1); + + // the backpressure coordination would not accept items from source otherwise + co.test(); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + source.onComplete(); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionErrorSize() { + PublishProcessor source = PublishProcessor.create(); + + FlowableReplay co = (FlowableReplay)source + .replay(1); + + co.test(); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionSize() { + PublishProcessor source = PublishProcessor.create(); + + FlowableReplay co = (FlowableReplay)source + .replay(1); + + co.test(); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + + assertNotNull(buf.get().value); + + buf.trimHead(); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionCompleteTime() { + PublishProcessor source = PublishProcessor.create(); + + FlowableReplay co = (FlowableReplay)source + .replay(1, TimeUnit.MINUTES, Schedulers.computation()); + + co.test(); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + source.onComplete(); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionErrorTime() { + PublishProcessor source = PublishProcessor.create(); + + FlowableReplay co = (FlowableReplay)source + .replay(1, TimeUnit.MINUTES, Schedulers.computation()); + + co.test(); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionTime() { + TestScheduler sch = new TestScheduler(); + + PublishProcessor source = PublishProcessor.create(); + + FlowableReplay co = (FlowableReplay)source + .replay(1, TimeUnit.MILLISECONDS, sch); + + co.test(); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + + sch.advanceTimeBy(2, TimeUnit.MILLISECONDS); + + source.onNext(2); + + assertNotNull(buf.get().value); + + buf.trimHead(); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java index d8b9c75ef2..0d64c4308f 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableReplayTest.java @@ -1561,4 +1561,159 @@ public void replaySelectorConnectableReturnsNull() { .test() .assertFailureAndMessage(NullPointerException.class, "The connectableFactory returned a null ConnectableObservable"); } + + @Test + public void noHeadRetentionCompleteSize() { + PublishSubject source = PublishSubject.create(); + + ObservableReplay co = (ObservableReplay)source + .replay(1); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + source.onComplete(); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionErrorSize() { + PublishSubject source = PublishSubject.create(); + + ObservableReplay co = (ObservableReplay)source + .replay(1); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionSize() { + PublishSubject source = PublishSubject.create(); + + ObservableReplay co = (ObservableReplay)source + .replay(1); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + + assertNotNull(buf.get().value); + + buf.trimHead(); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionCompleteTime() { + PublishSubject source = PublishSubject.create(); + + ObservableReplay co = (ObservableReplay)source + .replay(1, TimeUnit.MINUTES, Schedulers.computation()); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + source.onComplete(); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionErrorTime() { + PublishSubject source = PublishSubject.create(); + + ObservableReplay co = (ObservableReplay)source + .replay(1, TimeUnit.MINUTES, Schedulers.computation()); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } + + @Test + public void noHeadRetentionTime() { + TestScheduler sch = new TestScheduler(); + + PublishSubject source = PublishSubject.create(); + + ObservableReplay co = (ObservableReplay)source + .replay(1, TimeUnit.MILLISECONDS, sch); + + co.connect(); + + BoundedReplayBuffer buf = (BoundedReplayBuffer)(co.current.get().buffer); + + source.onNext(1); + + sch.advanceTimeBy(2, TimeUnit.MILLISECONDS); + + source.onNext(2); + + assertNotNull(buf.get().value); + + buf.trimHead(); + + assertNull(buf.get().value); + + Object o = buf.get(); + + buf.trimHead(); + + assertSame(o, buf.get()); + } } diff --git a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java index cd8f6de326..a80d1d0b83 100644 --- a/src/test/java/io/reactivex/processors/ReplayProcessorTest.java +++ b/src/test/java/io/reactivex/processors/ReplayProcessorTest.java @@ -30,6 +30,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.processors.ReplayProcessor.*; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; @@ -539,7 +540,7 @@ public void testReplayTimestampedDirectly() { // FIXME RS subscribers can't throw // @Test // public void testOnErrorThrowsDoesntPreventDelivery() { -// ReplaySubject ps = ReplaySubject.create(); +// ReplayProcessor ps = ReplayProcessor.create(); // // ps.subscribe(); // TestSubscriber ts = new TestSubscriber(); @@ -561,7 +562,7 @@ public void testReplayTimestampedDirectly() { // */ // @Test // public void testOnErrorThrowsDoesntPreventDelivery2() { -// ReplaySubject ps = ReplaySubject.create(); +// ReplayProcessor ps = ReplayProcessor.create(); // // ps.subscribe(); // ps.subscribe(); @@ -1540,4 +1541,141 @@ public void timeAndSizeBoundCancelAfterOne() { source.subscribeWith(take1AndCancel()) .assertResult(1); } + + @Test + public void noHeadRetentionCompleteSize() { + ReplayProcessor source = ReplayProcessor.createWithSize(1); + + source.onNext(1); + source.onNext(2); + source.onComplete(); + + SizeBoundReplayBuffer buf = (SizeBoundReplayBuffer)source.buffer; + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } + + @Test + public void noHeadRetentionErrorSize() { + ReplayProcessor source = ReplayProcessor.createWithSize(1); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + SizeBoundReplayBuffer buf = (SizeBoundReplayBuffer)source.buffer; + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } + + @Test + public void unboundedCleanupBufferNoOp() { + ReplayProcessor source = ReplayProcessor.create(1); + + source.onNext(1); + source.onNext(2); + + source.cleanupBuffer(); + + source.test().assertValuesOnly(1, 2); + } + + @Test + public void noHeadRetentionSize() { + ReplayProcessor source = ReplayProcessor.createWithSize(1); + + source.onNext(1); + source.onNext(2); + + SizeBoundReplayBuffer buf = (SizeBoundReplayBuffer)source.buffer; + + assertNotNull(buf.head.value); + + source.cleanupBuffer(); + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } + + @Test + public void noHeadRetentionCompleteTime() { + ReplayProcessor source = ReplayProcessor.createWithTime(1, TimeUnit.MINUTES, Schedulers.computation()); + + source.onNext(1); + source.onNext(2); + source.onComplete(); + + SizeAndTimeBoundReplayBuffer buf = (SizeAndTimeBoundReplayBuffer)source.buffer; + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } + + @Test + public void noHeadRetentionErrorTime() { + ReplayProcessor source = ReplayProcessor.createWithTime(1, TimeUnit.MINUTES, Schedulers.computation()); + + source.onNext(1); + source.onNext(2); + source.onError(new TestException()); + + SizeAndTimeBoundReplayBuffer buf = (SizeAndTimeBoundReplayBuffer)source.buffer; + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } + + @Test + public void noHeadRetentionTime() { + TestScheduler sch = new TestScheduler(); + + ReplayProcessor source = ReplayProcessor.createWithTime(1, TimeUnit.MILLISECONDS, sch); + + source.onNext(1); + + sch.advanceTimeBy(2, TimeUnit.MILLISECONDS); + + source.onNext(2); + + SizeAndTimeBoundReplayBuffer buf = (SizeAndTimeBoundReplayBuffer)source.buffer; + + assertNotNull(buf.head.value); + + source.cleanupBuffer(); + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } }