diff --git a/src/main/java/io/reactivex/subjects/ReplaySubject.java b/src/main/java/io/reactivex/subjects/ReplaySubject.java index 29634ff1c4..271a1f7b56 100644 --- a/src/main/java/io/reactivex/subjects/ReplaySubject.java +++ b/src/main/java/io/reactivex/subjects/ReplaySubject.java @@ -13,7 +13,6 @@ package io.reactivex.subjects; -import io.reactivex.annotations.Nullable; import java.lang.reflect.Array; import java.util.*; import java.util.concurrent.TimeUnit; @@ -21,7 +20,7 @@ import io.reactivex.Observer; import io.reactivex.Scheduler; -import io.reactivex.annotations.CheckReturnValue; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.util.NotificationLite; @@ -94,8 +93,9 @@ * in a non-blocking and thread-safe manner via {@link #hasValue()}, {@link #getValue()}, * {@link #getValues()} or {@link #getValues(Object[])}. *

- * Note that due to concurrency requirements, a size-bounded {@code ReplaySubject} may hold strong references to more - * source emissions than specified. + * Note that due to concurrency requirements, a size- and time-bounded {@code ReplaySubject} may hold strong references to more + * source emissions than specified while it isn't terminated yet. Use the {@link #cleanupBuffer()} to allow + * such inaccessible items to be cleaned up by GC once no consumer references it anymore. *

*
Scheduler:
*
{@code ReplaySubject} does not operate by default on a particular {@link io.reactivex.Scheduler} and @@ -415,6 +415,24 @@ public T getValue() { return buffer.getValue(); } + /** + * Makes sure the item cached by the head node in a bounded + * ReplaySubject is released (as it is never part of a replay). + *

+ * By default, live bounded buffers will remember one item before + * the currently receivable one to ensure subscribers can always + * receive a continuous sequence of items. A terminated ReplaySubject + * automatically releases this inaccessible item. + *

+ * The method must be called sequentially, similar to the standard + * {@code onXXX} methods. + * @since 2.1.11 - experimental + */ + @Experimental + public void cleanupBuffer() { + buffer.trimHead(); + } + /** An empty array to avoid allocation in getValues(). */ private static final Object[] EMPTY_ARRAY = new Object[0]; @@ -563,6 +581,12 @@ interface ReplayBuffer { * @return true if successful */ boolean compareAndSet(Object expected, Object next); + + /** + * Make sure an old inaccessible head value is released + * in a bounded buffer. + */ + void trimHead(); } static final class ReplayDisposable extends AtomicInteger implements Disposable { @@ -619,10 +643,16 @@ public void add(T value) { @Override public void addFinal(Object notificationLite) { buffer.add(notificationLite); + trimHead(); size++; done = true; } + @Override + public void trimHead() { + // no-op in this type of buffer + } + @Override @Nullable @SuppressWarnings("unchecked") @@ -839,9 +869,24 @@ public void addFinal(Object notificationLite) { size++; t.lazySet(n); // releases both the tail and size + trimHead(); done = true; } + /** + * Replace a non-empty head node with an empty one to + * allow the GC of the inaccessible old value. + */ + @Override + public void trimHead() { + Node h = head; + if (h.value != null) { + Node n = new Node(null); + n.lazySet(h.get()); + head = n; + } + } + @Override @Nullable @SuppressWarnings("unchecked") @@ -1047,12 +1092,24 @@ void trimFinal() { for (;;) { TimedNode next = h.get(); if (next.get() == null) { - head = h; + if (h.value != null) { + TimedNode lasth = new TimedNode(null, 0L); + lasth.lazySet(h.get()); + head = lasth; + } else { + head = h; + } break; } if (next.time > limit) { - head = h; + if (h.value != null) { + TimedNode lasth = new TimedNode(null, 0L); + lasth.lazySet(h.get()); + head = lasth; + } else { + head = h; + } break; } @@ -1085,6 +1142,20 @@ public void addFinal(Object notificationLite) { done = true; } + /** + * Replace a non-empty head node with an empty one to + * allow the GC of the inaccessible old value. + */ + @Override + public void trimHead() { + TimedNode h = head; + if (h.value != null) { + TimedNode n = new TimedNode(null, 0); + n.lazySet(h.get()); + head = n; + } + } + @Override @Nullable @SuppressWarnings("unchecked") diff --git a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java index 7331edf615..ccb5047cff 100644 --- a/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java +++ b/src/test/java/io/reactivex/subjects/ReplaySubjectTest.java @@ -30,6 +30,7 @@ import io.reactivex.functions.Function; import io.reactivex.observers.*; import io.reactivex.schedulers.*; +import io.reactivex.subjects.ReplaySubject.*; public class ReplaySubjectTest extends SubjectTest { @@ -1184,4 +1185,92 @@ public void timedNoOutdatedData() { source.test().assertResult(); } + + @Test + public void noHeadRetentionCompleteSize() { + ReplaySubject source = ReplaySubject.createWithSize(1); + + source.onNext(1); + source.onNext(2); + source.onComplete(); + + SizeBoundReplayBuffer buf = (SizeBoundReplayBuffer)source.buffer; + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } + + + @Test + public void noHeadRetentionSize() { + ReplaySubject source = ReplaySubject.createWithSize(1); + + source.onNext(1); + source.onNext(2); + + SizeBoundReplayBuffer buf = (SizeBoundReplayBuffer)source.buffer; + + assertNotNull(buf.head.value); + + source.cleanupBuffer(); + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } + + @Test + public void noHeadRetentionCompleteTime() { + ReplaySubject source = ReplaySubject.createWithTime(1, TimeUnit.MINUTES, Schedulers.computation()); + + source.onNext(1); + source.onNext(2); + source.onComplete(); + + SizeAndTimeBoundReplayBuffer buf = (SizeAndTimeBoundReplayBuffer)source.buffer; + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } + + @Test + public void noHeadRetentionTime() { + TestScheduler sch = new TestScheduler(); + + ReplaySubject source = ReplaySubject.createWithTime(1, TimeUnit.MILLISECONDS, sch); + + source.onNext(1); + + sch.advanceTimeBy(2, TimeUnit.MILLISECONDS); + + source.onNext(2); + + SizeAndTimeBoundReplayBuffer buf = (SizeAndTimeBoundReplayBuffer)source.buffer; + + assertNotNull(buf.head.value); + + source.cleanupBuffer(); + + assertNull(buf.head.value); + + Object o = buf.head; + + source.cleanupBuffer(); + + assertSame(o, buf.head); + } }