From a845273cb48e719a66dbba82cbee06fcb91aa719 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 8 May 2016 21:35:32 +0200 Subject: [PATCH] 1.x: ReplaySubject now supports backpressure In addition, the behavior of time-limited mode has been changed. Late subscribers will now skip stale data. --- src/main/java/rx/subjects/ReplaySubject.java | 1753 +++++++++-------- .../java/rx/subjects/ReplaySubjectTest.java | 188 +- 2 files changed, 1066 insertions(+), 875 deletions(-) diff --git a/src/main/java/rx/subjects/ReplaySubject.java b/src/main/java/rx/subjects/ReplaySubject.java index f1b693dc9a5..4c143c03800 100644 --- a/src/main/java/rx/subjects/ReplaySubject.java +++ b/src/main/java/rx/subjects/ReplaySubject.java @@ -16,22 +16,16 @@ package rx.subjects; import java.lang.reflect.Array; -import java.util.ArrayList; -import java.util.List; +import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; +import rx.*; import rx.Observer; -import rx.Scheduler; import rx.annotations.Beta; import rx.exceptions.Exceptions; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.internal.operators.NotificationLite; -import rx.internal.util.UtilityFunctions; -import rx.schedulers.Timestamped; -import rx.subjects.ReplaySubject.NodeList.Node; -import rx.subjects.SubjectSubscriptionManager.SubjectObserver; +import rx.internal.operators.BackpressureUtils; +import rx.internal.util.RxJavaPluginUtils; /** * Subject that buffers all items it observes and replays them to any {@link Observer} that subscribes. @@ -91,69 +85,12 @@ public static ReplaySubject create() { * @return the created subject */ public static ReplaySubject create(int capacity) { - final UnboundedReplayState state = new UnboundedReplayState(capacity); - SubjectSubscriptionManager ssm = new SubjectSubscriptionManager(); - ssm.onStart = new Action1>() { - @Override - public void call(SubjectObserver o) { - // replay history for this observer using the subscribing thread - int lastIndex = state.replayObserverFromIndex(0, o); - - // now that it is caught up add to observers - o.index(lastIndex); - } - }; - ssm.onAdded = new Action1>() { - @Override - public void call(SubjectObserver o) { - synchronized (o) { - if (!o.first || o.emitting) { - return; - } - o.first = false; - o.emitting = true; - } - boolean skipFinal = false; - try { - //noinspection UnnecessaryLocalVariable - Avoid re-read from outside this scope - final UnboundedReplayState localState = state; - for (;;) { - int idx = o.index(); - int sidx = localState.get(); - if (idx != sidx) { - Integer j = localState.replayObserverFromIndex(idx, o); - o.index(j); - } - synchronized (o) { - if (sidx == localState.get()) { - o.emitting = false; - skipFinal = true; - break; - } - } - } - } finally { - if (!skipFinal) { - synchronized (o) { - o.emitting = false; - } - } - } - } - }; - ssm.onTerminated = new Action1>() { - @Override - public void call(SubjectObserver o) { - Integer idx = o.index(); - if (idx == null) { - idx = 0; - } - // we will finish replaying if there is anything left - state.replayObserverFromIndex(idx, o); - } - }; - - return new ReplaySubject(ssm, ssm, state); + if (capacity <= 0) { + throw new IllegalArgumentException("capacity > 0 required but it was " + capacity); + } + ReplayBuffer buffer = new ReplayUnboundedBuffer(capacity); + ReplayState state = new ReplayState(buffer); + return new ReplaySubject(state); } /** * Creates an unbounded replay subject with the bounded-implementation for testing purposes. @@ -169,12 +106,9 @@ public void call(SubjectObserver o) { * @return the created subject */ /* public */ static ReplaySubject createUnbounded() { - final BoundedState state = new BoundedState( - new EmptyEvictionPolicy(), - UtilityFunctions.identity(), - UtilityFunctions.identity() - ); - return createWithState(state, new DefaultOnAdd(state)); + ReplayBuffer buffer = new ReplaySizeBoundBuffer(Integer.MAX_VALUE); + ReplayState state = new ReplayState(buffer); + return new ReplaySubject(state); } /** * Creates a size-bounded replay subject. @@ -197,12 +131,9 @@ public void call(SubjectObserver o) { * @return the created subject */ public static ReplaySubject createWithSize(int size) { - final BoundedState state = new BoundedState( - new SizeEvictionPolicy(size), - UtilityFunctions.identity(), - UtilityFunctions.identity() - ); - return createWithState(state, new DefaultOnAdd(state)); + ReplayBuffer buffer = new ReplaySizeBoundBuffer(size); + ReplayState state = new ReplayState(buffer); + return new ReplaySubject(state); } /** * Creates a time-bounded replay subject. @@ -237,12 +168,7 @@ public static ReplaySubject createWithSize(int size) { * @return the created subject */ public static ReplaySubject createWithTime(long time, TimeUnit unit, final Scheduler scheduler) { - final BoundedState state = new BoundedState( - new TimeEvictionPolicy(unit.toMillis(time), scheduler), - new AddTimestamped(scheduler), - new RemoveTimestamped() - ); - return createWithState(state, new TimedOnAdd(state, scheduler)); + return createWithTimeAndSize(time, unit, Integer.MAX_VALUE, scheduler); } /** * Creates a time- and size-bounded replay subject. @@ -279,904 +205,1077 @@ public static ReplaySubject createWithTime(long time, TimeUnit unit, fina * @return the created subject */ public static ReplaySubject createWithTimeAndSize(long time, TimeUnit unit, int size, final Scheduler scheduler) { - final BoundedState state = new BoundedState( - new PairEvictionPolicy( - new SizeEvictionPolicy(size), - new TimeEvictionPolicy(unit.toMillis(time), scheduler) - ), - new AddTimestamped(scheduler), - new RemoveTimestamped() - ); - return createWithState(state, new TimedOnAdd(state, scheduler)); - } - /** - * Creates a bounded replay subject with the given state shared between the subject and the - * {@link OnSubscribe} functions. - * - * @param - * the type of items observed and emitted by the Subject - * @param state - * the shared state - * @return the created subject - */ - static ReplaySubject createWithState(final BoundedState state, - Action1> onStart) { - SubjectSubscriptionManager ssm = new SubjectSubscriptionManager(); - ssm.onStart = onStart; - ssm.onAdded = new Action1>() { - @Override - public void call(SubjectObserver o) { - synchronized (o) { - if (!o.first || o.emitting) { - return; - } - o.first = false; - o.emitting = true; - } - boolean skipFinal = false; - try { - for (;;) { - NodeList.Node idx = o.index(); - NodeList.Node sidx = state.tail(); - if (idx != sidx) { - NodeList.Node j = state.replayObserverFromIndex(idx, o); - o.index(j); - } - synchronized (o) { - if (sidx == state.tail()) { - o.emitting = false; - skipFinal = true; - break; - } - } - } - } finally { - if (!skipFinal) { - synchronized (o) { - o.emitting = false; - } - } - } - } - }; - ssm.onTerminated = new Action1>() { - - @Override - public void call(SubjectObserver t1) { - NodeList.Node l = t1.index(); - if (l == null) { - l = state.head(); - } - state.replayObserverFromIndex(l, t1); - } - - }; - - return new ReplaySubject(ssm, ssm, state); + ReplayBuffer buffer = new ReplaySizeAndTimeBoundBuffer(size, unit.toMillis(time), scheduler); + ReplayState state = new ReplayState(buffer); + return new ReplaySubject(state); } /** The state storing the history and the references. */ - final ReplayState state; - /** The manager of subscribers. */ - final SubjectSubscriptionManager ssm; - ReplaySubject(OnSubscribe onSubscribe, SubjectSubscriptionManager ssm, ReplayState state) { - super(onSubscribe); - this.ssm = ssm; + final ReplayState state; + + ReplaySubject(ReplayState state) { + super(state); this.state = state; } @Override public void onNext(T t) { - if (ssm.active) { - state.next(t); - for (SubjectSubscriptionManager.SubjectObserver o : ssm.observers()) { - if (caughtUp(o)) { - o.onNext(t); - } - } - } + state.onNext(t); } @Override public void onError(final Throwable e) { - if (ssm.active) { - state.error(e); - List errors = null; - for (SubjectObserver o : ssm.terminate(NotificationLite.instance().error(e))) { - try { - if (caughtUp(o)) { - o.onError(e); - } - } catch (Throwable e2) { - if (errors == null) { - errors = new ArrayList(); - } - errors.add(e2); - } - } - - Exceptions.throwIfAny(errors); - } + state.onError(e); } @Override public void onCompleted() { - if (ssm.active) { - state.complete(); - for (SubjectObserver o : ssm.terminate(NotificationLite.instance().completed())) { - if (caughtUp(o)) { - o.onCompleted(); - } - } - } + state.onCompleted(); } /** * @return Returns the number of subscribers. */ /* Support test. */int subscriberCount() { - return ssm.get().observers.length; + return state.get().length; } @Override public boolean hasObservers() { - return ssm.observers().length > 0; + return state.get().length != 0; } - private boolean caughtUp(SubjectObserver o) { - if (!o.caughtUp) { - if (state.replayObserver(o)) { - o.caughtUp = true; - o.index(null); // once caught up, no need for the index anymore - } - return false; - } else { - // it was caught up so proceed the "raw route" - return true; + /** + * Check if the Subject has terminated with an exception. + * @return true if the subject has received a throwable through {@code onError}. + */ + @Beta + public boolean hasThrowable() { + return state.isTerminated() && state.buffer.error() != null; + } + /** + * Check if the Subject has terminated normally. + * @return true if the subject completed normally via {@code onCompleted} + */ + @Beta + public boolean hasCompleted() { + return state.isTerminated() && state.buffer.error() == null; + } + /** + * Returns the Throwable that terminated the Subject. + * @return the Throwable that terminated the Subject or {@code null} if the + * subject hasn't terminated yet or it terminated normally. + */ + @Beta + public Throwable getThrowable() { + if (state.isTerminated()) { + return state.buffer.error(); } + return null; + } + /** + * Returns the current number of items (non-terminal events) available for replay. + * @return the number of items available + */ + @Beta + public int size() { + return state.buffer.size(); + } + /** + * @return true if the Subject holds at least one non-terminal event available for replay + */ + @Beta + public boolean hasAnyValue() { + return !state.buffer.isEmpty(); + } + @Beta + public boolean hasValue() { + return hasAnyValue(); + } + /** + * Returns a snapshot of the currently buffered non-terminal events into + * the provided {@code a} array or creates a new array if it has not enough capacity. + * @param a the array to fill in + * @return the array {@code a} if it had enough capacity or a new array containing the available values + */ + @Beta + public T[] getValues(T[] a) { + return state.buffer.toArray(a); } - // ********************* - // State implementations - // ********************* + /** An empty array to trigger getValues() to return a new array. */ + private static final Object[] EMPTY_ARRAY = new Object[0]; /** - * The unbounded replay state. - * @param the input and output type + * Returns a snapshot of the currently buffered non-terminal events. + *

The operation is threadsafe. + * + * @return a snapshot of the currently buffered non-terminal events. + * @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number) */ - static final class UnboundedReplayState extends AtomicInteger implements ReplayState { - private final NotificationLite nl = NotificationLite.instance(); - /** The buffer. */ - private final ArrayList list; - /** The termination flag. */ - private volatile boolean terminated; - public UnboundedReplayState(int initialCapacity) { - list = new ArrayList(initialCapacity); - } - - @Override - public void next(T n) { - if (!terminated) { - list.add(nl.next(n)); - getAndIncrement(); // release index - } + @SuppressWarnings("unchecked") + @Beta + public Object[] getValues() { + T[] r = getValues((T[])EMPTY_ARRAY); + if (r == EMPTY_ARRAY) { + return new Object[0]; // don't leak the default empty array. } + return r; + } + + @Beta + public T getValue() { + return state.buffer.last(); + } + + /** + * Holds onto the array of Subscriber-wrapping ReplayProducers and + * the buffer that holds values to be replayed; it manages + * subscription and signal dispatching. + * + * @param the value type + */ + static final class ReplayState + extends AtomicReference[]> + implements OnSubscribe, Observer { - public void accept(Observer o, int idx) { - nl.accept(o, list.get(idx)); - } + /** */ + private static final long serialVersionUID = 5952362471246910544L; - @Override - public void complete() { - if (!terminated) { - terminated = true; - list.add(nl.completed()); - getAndIncrement(); // release index - } + final ReplayBuffer buffer; + + @SuppressWarnings("rawtypes") + static final ReplayProducer[] EMPTY = new ReplayProducer[0]; + @SuppressWarnings("rawtypes") + static final ReplayProducer[] TERMINATED = new ReplayProducer[0]; + + @SuppressWarnings("unchecked") + public ReplayState(ReplayBuffer buffer) { + this.buffer = buffer; + lazySet(EMPTY); } + @Override - public void error(Throwable e) { - if (!terminated) { - terminated = true; - list.add(nl.error(e)); - getAndIncrement(); // release index + public void call(Subscriber t) { + ReplayProducer rp = new ReplayProducer(t, this); + t.add(rp); + t.setProducer(rp); + + if (add(rp)) { + if (rp.isUnsubscribed()) { + remove(rp); + return; + } } + buffer.drain(rp); } - - @Override - public boolean terminated() { - return terminated; - } - - @Override - public boolean replayObserver(SubjectObserver observer) { - - synchronized (observer) { - observer.first = false; - if (observer.emitting) { + + boolean add(ReplayProducer rp) { + for (;;) { + ReplayProducer[] a = get(); + if (a == TERMINATED) { return false; } - } - - Integer lastEmittedLink = observer.index(); - if (lastEmittedLink != null) { - int l = replayObserverFromIndex(lastEmittedLink, observer); - observer.index(l); - return true; - } else { - throw new IllegalStateException("failed to find lastEmittedLink for: " + observer); + + int n = a.length; + + @SuppressWarnings("unchecked") + ReplayProducer[] b = new ReplayProducer[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = rp; + + if (compareAndSet(a, b)) { + return true; + } } } - - @Override - public Integer replayObserverFromIndex(Integer idx, SubjectObserver observer) { - int i = idx; - while (i < get()) { - accept(observer, i); - i++; + + @SuppressWarnings("unchecked") + void remove(ReplayProducer rp) { + for (;;) { + ReplayProducer[] a = get(); + if (a == TERMINATED || a == EMPTY) { + return; + } + + int n = a.length; + + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i] == rp) { + j = i; + break; + } + } + + if (j < 0) { + return; + } + + ReplayProducer[] b; + if (n == 1) { + b = EMPTY; + } else { + b = new ReplayProducer[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + } + if (compareAndSet(a, b)) { + return; + } } - - return i; } @Override - public Integer replayObserverFromIndexTest(Integer idx, SubjectObserver observer, long now) { - return replayObserverFromIndex(idx, observer); - } - - @Override - public int size() { - int idx = get(); // acquire - if (idx > 0) { - Object o = list.get(idx - 1); - if (nl.isCompleted(o) || nl.isError(o)) { - return idx - 1; // do not report a terminal event as part of size + public void onNext(T t) { + ReplayBuffer b = buffer; + + b.next(t); + for (ReplayProducer rp : get()) { + if (rp.caughtUp) { + rp.actual.onNext(t); + } else { + if (b.drain(rp)) { + rp.caughtUp = true; + rp.node = null; + } } } - return idx; } - @Override - public boolean isEmpty() { - return size() == 0; - } - @Override + @SuppressWarnings("unchecked") - public T[] toArray(T[] a) { - int s = size(); - if (s > 0) { - if (s > a.length) { - a = (T[])Array.newInstance(a.getClass().getComponentType(), s); - } - for (int i = 0; i < s; i++) { - a[i] = (T)list.get(i); - } - if (a.length > s) { - a[s] = null; + @Override + public void onError(Throwable e) { + ReplayBuffer b = buffer; + + b.error(e); + List errors = null; + for (ReplayProducer rp : getAndSet(TERMINATED)) { + try { + if (rp.caughtUp) { + rp.actual.onError(e); + } else { + if (b.drain(rp)) { + rp.caughtUp = true; + rp.node = null; + } + } + } catch (Throwable ex) { + if (errors == null) { + errors = new ArrayList(); + } + errors.add(ex); } - } else - if (a.length > 0) { - a[0] = null; } - return a; + + Exceptions.throwIfAny(errors); } + + @SuppressWarnings("unchecked") @Override - public T latest() { - int idx = get(); - if (idx > 0) { - Object o = list.get(idx - 1); - if (nl.isCompleted(o) || nl.isError(o)) { - if (idx > 1) { - return nl.getValue(list.get(idx - 2)); + public void onCompleted() { + ReplayBuffer b = buffer; + + b.complete(); + for (ReplayProducer rp : getAndSet(TERMINATED)) { + if (rp.caughtUp) { + rp.actual.onCompleted(); + } else { + if (b.drain(rp)) { + rp.caughtUp = true; + rp.node = null; } - return null; } - return nl.getValue(o); } - return null; + } + + + boolean isTerminated() { + return get() == TERMINATED; } } + /** + * The base interface for buffering signals to be replayed to individual + * Subscribers. + * + * @param the value type + */ + interface ReplayBuffer { + + void next(T t); + + void error(Throwable e); + + void complete(); + + boolean drain(ReplayProducer rp); + + boolean isComplete(); + + Throwable error(); + + T last(); + + int size(); + + boolean isEmpty(); + + T[] toArray(T[] a); + } - /** - * The bounded replay state. - * @param the input and output type + /** + * An unbounded ReplayBuffer implementation that uses linked-arrays + * to avoid copy-on-grow situation with ArrayList. + * + * @param the value type */ - static final class BoundedState implements ReplayState> { - final NodeList list; - final EvictionPolicy evictionPolicy; - final Func1 enterTransform; - final Func1 leaveTransform; - final NotificationLite nl = NotificationLite.instance(); - volatile boolean terminated; - volatile NodeList.Node tail; + static final class ReplayUnboundedBuffer implements ReplayBuffer { + final int capacity; + + volatile int size; + + final Object[] head; - public BoundedState(EvictionPolicy evictionPolicy, Func1 enterTransform, - Func1 leaveTransform) { - this.list = new NodeList(); - this.tail = list.tail; - this.evictionPolicy = evictionPolicy; - this.enterTransform = enterTransform; - this.leaveTransform = leaveTransform; + Object[] tail; + + int tailIndex; + + volatile boolean done; + Throwable error; + + public ReplayUnboundedBuffer(int capacity) { + this.capacity = capacity; + this.tail = this.head = new Object[capacity + 1]; } + @Override - public void next(T value) { - if (!terminated) { - list.addLast(enterTransform.call(nl.next(value))); - evictionPolicy.evict(list); - tail = list.tail; + public void next(T t) { + if (done) { + return; } - } - @Override - public void complete() { - if (!terminated) { - terminated = true; - list.addLast(enterTransform.call(nl.completed())); - evictionPolicy.evictFinal(list); - tail = list.tail; + int i = tailIndex; + Object[] a = tail; + if (i == a.length - 1) { + Object[] b = new Object[a.length]; + b[0] = t; + tailIndex = 1; + a[i] = b; + tail = b; + } else { + a[i] = t; + tailIndex = i + 1; } + size++; } + @Override public void error(Throwable e) { - if (!terminated) { - terminated = true; - list.addLast(enterTransform.call(nl.error(e))); - // don't evict the terminal value - evictionPolicy.evictFinal(list); - tail = list.tail; - } - } - public void accept(Observer o, NodeList.Node node) { - nl.accept(o, leaveTransform.call(node.value)); - } - /** - * Accept only non-stale nodes. - * @param o the target observer - * @param node the node to accept or reject - * @param now the current time - */ - public void acceptTest(Observer o, NodeList.Node node, long now) { - Object v = node.value; - if (!evictionPolicy.test(v, now)) { - nl.accept(o, leaveTransform.call(v)); - } - } - public Node head() { - return list.head; - } - public Node tail() { - return tail; - } - @Override - public boolean replayObserver(SubjectObserver observer) { - synchronized (observer) { - observer.first = false; - if (observer.emitting) { - return false; - } + if (done) { + RxJavaPluginUtils.handleException(e); + return; } - - NodeList.Node lastEmittedLink = observer.index(); - NodeList.Node l = replayObserverFromIndex(lastEmittedLink, observer); - observer.index(l); - return true; + error = e; + done = true; } @Override - public NodeList.Node replayObserverFromIndex( - NodeList.Node l, SubjectObserver observer) { - while (l != tail()) { - accept(observer, l.next); - l = l.next; - } - return l; - } - @Override - public NodeList.Node replayObserverFromIndexTest( - NodeList.Node l, SubjectObserver observer, long now) { - while (l != tail()) { - acceptTest(observer, l.next, now); - l = l.next; - } - return l; + public void complete() { + done = true; } @Override - public boolean terminated() { - return terminated; - } - - @Override - public int size() { - int size = 0; - NodeList.Node l = head(); - NodeList.Node next = l.next; - while (next != null) { - size++; - l = next; - next = next.next; + public boolean drain(ReplayProducer rp) { + if (rp.getAndIncrement() != 0) { + return false; } - if (l.value != null) { - Object value = leaveTransform.call(l.value); - if (value != null && (nl.isError(value) || nl.isCompleted(value))) { - return size - 1; + + int missed = 1; + + final Subscriber a = rp.actual; + final int n = capacity; + + for (;;) { + + long r = rp.requested.get(); + long e = 0L; + + Object[] node = (Object[])rp.node; + if (node == null) { + node = head; + } + int tailIndex = rp.tailIndex; + int index = rp.index; + + while (e != r) { + if (rp.isUnsubscribed()) { + rp.node = null; + return false; + } + + boolean d = done; + boolean empty = index == size; + + if (d && empty) { + rp.node = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onCompleted(); + } + return false; + } + + if (empty) { + break; + } + + if (tailIndex == n) { + node = (Object[])node[tailIndex]; + tailIndex = 0; + } + + @SuppressWarnings("unchecked") + T v = (T)node[tailIndex]; + + a.onNext(v); + + e++; + tailIndex++; + index++; + } + + if (e == r) { + if (rp.isUnsubscribed()) { + rp.node = null; + return false; + } + + boolean d = done; + boolean empty = index == size; + + if (d && empty) { + rp.node = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onCompleted(); + } + return false; + } + } + + if (e != 0L) { + if (r != Long.MAX_VALUE) { + BackpressureUtils.produced(rp.requested, e); + } + } + + rp.index = index; + rp.tailIndex = tailIndex; + rp.node = node; + + missed = rp.addAndGet(-missed); + if (missed == 0) { + return r == Long.MAX_VALUE; } } - return size; } + @Override - public boolean isEmpty() { - NodeList.Node l = head(); - NodeList.Node next = l.next; - if (next == null) { - return true; - } - Object value = leaveTransform.call(next.value); - return nl.isError(value) || nl.isCompleted(value); + public boolean isComplete() { + return done; } + @Override - @SuppressWarnings("unchecked") - public T[] toArray(T[] a) { - List list = new ArrayList(); - NodeList.Node next = head().next; - while (next != null) { - Object o = leaveTransform.call(next.value); + public Throwable error() { + return error; + } - if (next.next == null && (nl.isError(o) || nl.isCompleted(o))) { - break; - } else { - list.add((T)o); - } - next = next.next; + @SuppressWarnings("unchecked") + @Override + public T last() { + // we don't have a volatile read on tail and tailIndex + // so we have to traverse the linked structure up until + // we read size / capacity nodes and index into the array + // via size % capacity + int s = size; + if (s == 0) { + return null; } - return list.toArray(a); + Object[] h = head; + int n = capacity; + + while (s >= n) { + h = (Object[])h[n]; + s -= n; + } + + return (T)h[s - 1]; } + @Override - public T latest() { - Node h = head().next; - if (h == null) { - return null; + public int size() { + return size; + } + + @Override + public boolean isEmpty() { + return size == 0; + } + + @SuppressWarnings("unchecked") + @Override + public T[] toArray(T[] a) { + int s = size; + if (a.length < s) { + a = (T[])Array.newInstance(a.getClass().getComponentType(), s); } - Node p = null; - while (h != tail()) { - p = h; - h = h.next; + + Object[] h = head; + int n = capacity; + + int j = 0; + + while (j + n < s) { + System.arraycopy(h, 0, a, j, n); + j += n; + h = (Object[])h[n]; } - Object value = leaveTransform.call(h.value); - if (nl.isError(value) || nl.isCompleted(value)) { - if (p != null) { - value = leaveTransform.call(p.value); - return nl.getValue(value); - } - return null; + + System.arraycopy(h, 0, a, j, s - j); + + if (a.length > s) { + a[s] = null; } - return nl.getValue(value); + + return a; } } - // ************** - // API interfaces - // ************** - - /** - * General API for replay state management. - * @param the input and output type - * @param the index type - */ - interface ReplayState { - /** @return true if the subject has reached a terminal state. */ - boolean terminated(); - /** - * Replay contents to the given observer. - * @param observer the receiver of events - * @return true if the subject has caught up - */ - boolean replayObserver(SubjectObserver observer); - /** - * Replay the buffered values from an index position and return a new index - * @param idx the current index position - * @param observer the receiver of events - * @return the new index position - */ - I replayObserverFromIndex( - I idx, SubjectObserver observer); - /** - * Replay the buffered values from an index position while testing for stale entries and return a new index - * @param idx the current index position - * @param observer the receiver of events - * @return the new index position - */ - I replayObserverFromIndexTest( - I idx, SubjectObserver observer, long now); - /** - * Add an OnNext value to the buffer - * @param value the value to add - */ - void next(T value); - /** - * Add an OnError exception and terminate the subject - * @param e the exception to add - */ - void error(Throwable e); - /** - * Add an OnCompleted exception and terminate the subject - */ - void complete(); - /** - * @return the number of non-terminal values in the replay buffer. - */ - int size(); - /** - * @return true if the replay buffer is empty of non-terminal values - */ - boolean isEmpty(); + static final class ReplaySizeBoundBuffer implements ReplayBuffer { + final int limit; - /** - * Copy the current values (minus any terminal value) from the buffer into the array - * or create a new array if there isn't enough room. - * @param a the array to fill in - * @return the array or a new array containing the current values - */ - T[] toArray(T[] a); - /** - * Returns the latest value that has been buffered or null if no such value - * present. - * @return the latest value buffered or null if none - */ - T latest(); - } - - /** Interface to manage eviction checking. */ - interface EvictionPolicy { - /** - * Subscribe-time checking for stale entries. - * @param value the value to test - * @param now the current time - * @return true if the value may be evicted - */ - boolean test(Object value, long now); - /** - * Evict values from the list. - * @param list the node list - */ - void evict(NodeList list); - /** - * Evict values from the list except the very last which is considered - * a terminal event - * @param list the node list - */ - void evictFinal(NodeList list); - } + volatile Node head; + + Node tail; - - // ************************ - // Callback implementations - // ************************ - - /** - * Remove elements from the beginning of the list if the size exceeds some threshold. - */ - static final class SizeEvictionPolicy implements EvictionPolicy { - final int maxSize; + int size; + + volatile boolean done; + Throwable error; - public SizeEvictionPolicy(int maxSize) { - this.maxSize = maxSize; + public ReplaySizeBoundBuffer(int limit) { + this.limit = limit; + Node n = new Node(null); + this.tail = n; + this.head = n; } - + @Override - public void evict(NodeList t1) { - while (t1.size() > maxSize) { - t1.removeFirst(); + public void next(T value) { + Node n = new Node(value); + tail.set(n); + tail = n; + int s = size; + if (s == limit) { + head = head.get(); + } else { + size = s + 1; } } @Override - public boolean test(Object value, long now) { - return false; // size gets never stale + public void error(Throwable ex) { + error = ex; + done = true; } - + @Override - public void evictFinal(NodeList t1) { - while (t1.size() > maxSize + 1) { - t1.removeFirst(); - } - } - } - /** - * Remove elements from the beginning of the list if the Timestamped value is older than - * a threshold. - */ - static final class TimeEvictionPolicy implements EvictionPolicy { - final long maxAgeMillis; - final Scheduler scheduler; - - public TimeEvictionPolicy(long maxAgeMillis, Scheduler scheduler) { - this.maxAgeMillis = maxAgeMillis; - this.scheduler = scheduler; + public void complete() { + done = true; } - + @Override - public void evict(NodeList t1) { - long now = scheduler.now(); - while (!t1.isEmpty()) { - NodeList.Node n = t1.head.next; - if (test(n.value, now)) { - t1.removeFirst(); - } else { - break; + public boolean drain(ReplayProducer rp) { + if (rp.getAndIncrement() != 0) { + return false; + } + + final Subscriber a = rp.actual; + + int missed = 1; + + for (;;) { + + long r = rp.requested.get(); + long e = 0L; + + @SuppressWarnings("unchecked") + Node node = (Node)rp.node; + if (node == null) { + node = head; + } + + while (e != r) { + if (rp.isUnsubscribed()) { + rp.node = null; + return false; + } + + boolean d = done; + Node next = node.get(); + boolean empty = next == null; + + if (d && empty) { + rp.node = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onCompleted(); + } + return false; + } + + if (empty) { + break; + } + + a.onNext(next.value); + + e++; + node = next; + } + + if (e == r) { + if (rp.isUnsubscribed()) { + rp.node = null; + return false; + } + + boolean d = done; + boolean empty = node.get() == null; + + if (d && empty) { + rp.node = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onCompleted(); + } + return false; + } + } + + if (e != 0L) { + if (r != Long.MAX_VALUE) { + BackpressureUtils.produced(rp.requested, e); + } + } + + rp.node = node; + + missed = rp.addAndGet(-missed); + if (missed == 0) { + return r == Long.MAX_VALUE; } } } - - @Override - public void evictFinal(NodeList t1) { - long now = scheduler.now(); - while (t1.size > 1) { - NodeList.Node n = t1.head.next; - if (test(n.value, now)) { - t1.removeFirst(); - } else { - break; - } + + static final class Node extends AtomicReference> { + /** */ + private static final long serialVersionUID = 3713592843205853725L; + + final T value; + + public Node(T value) { + this.value = value; } } @Override - public boolean test(Object value, long now) { - Timestamped ts = (Timestamped)value; - return ts.getTimestampMillis() <= now - maxAgeMillis; + public boolean isComplete() { + return done; } - - } - /** - * Pairs up two eviction policy callbacks. - */ - static final class PairEvictionPolicy implements EvictionPolicy { - final EvictionPolicy first; - final EvictionPolicy second; - - public PairEvictionPolicy(EvictionPolicy first, EvictionPolicy second) { - this.first = first; - this.second = second; + + @Override + public Throwable error() { + return error; } - + @Override - public void evict(NodeList t1) { - first.evict(t1); - second.evict(t1); + public T last() { + Node h = head; + Node n; + while ((n = h.get()) != null) { + h = n; + } + return h.value; } - + @Override - public void evictFinal(NodeList t1) { - first.evictFinal(t1); - second.evictFinal(t1); + public int size() { + int s = 0; + Node n = head.get(); + while (n != null && s != Integer.MAX_VALUE) { + n = n.get(); + s++; + } + return s; } @Override - public boolean test(Object value, long now) { - return first.test(value, now) || second.test(value, now); + public boolean isEmpty() { + return head.get() == null; + } + + @Override + public T[] toArray(T[] a) { + List list = new ArrayList(); + + Node n = head.get(); + while (n != null) { + list.add(n.value); + n = n.get(); + } + return list.toArray(a); } + } - - /** Maps the values to Timestamped. */ - static final class AddTimestamped implements Func1 { + + static final class ReplaySizeAndTimeBoundBuffer implements ReplayBuffer { + final int limit; + + final long maxAgeMillis; + final Scheduler scheduler; + + volatile TimedNode head; + + TimedNode tail; + + int size; - public AddTimestamped(Scheduler scheduler) { + volatile boolean done; + Throwable error; + + public ReplaySizeAndTimeBoundBuffer(int limit, long maxAgeMillis, Scheduler scheduler) { + this.limit = limit; + TimedNode n = new TimedNode(null, 0L); + this.tail = n; + this.head = n; + this.maxAgeMillis = maxAgeMillis; this.scheduler = scheduler; } @Override - public Object call(Object t1) { - return new Timestamped(scheduler.now(), t1); + public void next(T value) { + long now = scheduler.now(); + + TimedNode n = new TimedNode(value, now); + tail.set(n); + tail = n; + + now -= maxAgeMillis; + + int s = size; + TimedNode h0 = head; + TimedNode h = h0; + + if (s == limit) { + h = h.get(); + } else { + s++; + } + + while ((n = h.get()) != null) { + if (n.timestamp > now) { + break; + } + h = n; + s--; + } + + size = s; + if (h != h0) { + head = h; + } } - } - /** Maps timestamped values back to raw objects. */ - static final class RemoveTimestamped implements Func1 { @Override - @SuppressWarnings("unchecked") - public Object call(Object t1) { - return ((Timestamped)t1).getValue(); + public void error(Throwable ex) { + evictFinal(); + error = ex; + done = true; } - } - /** - * Default action of simply replaying the buffer on subscribe. - * @param the input and output value type - */ - static final class DefaultOnAdd implements Action1> { - final BoundedState state; - public DefaultOnAdd(BoundedState state) { - this.state = state; - } - @Override - public void call(SubjectObserver t1) { - NodeList.Node l = state.replayObserverFromIndex(state.head(), t1); - t1.index(l); + public void complete() { + evictFinal(); + done = true; } - } - /** - * Action of replaying non-stale entries of the buffer on subscribe - * @param the input and output value - */ - static final class TimedOnAdd implements Action1> { - final BoundedState state; - final Scheduler scheduler; + void evictFinal() { + long now = scheduler.now() - maxAgeMillis; + + TimedNode h0 = head; + TimedNode h = h0; + TimedNode n; + + while ((n = h.get()) != null) { + if (n.timestamp > now) { + break; + } + h = n; + } + + if (h0 != h) { + head = h; + } + } - public TimedOnAdd(BoundedState state, Scheduler scheduler) { - this.state = state; - this.scheduler = scheduler; + TimedNode latestHead() { + long now = scheduler.now() - maxAgeMillis; + TimedNode h = head; + TimedNode n; + while ((n = h.get()) != null) { + if (n.timestamp > now) { + break; + } + h = n; + } + return h; } @Override - public void call(SubjectObserver t1) { - NodeList.Node l; - if (!state.terminated) { - // ignore stale entries if still active - l = state.replayObserverFromIndexTest(state.head(), t1, scheduler.now()); - } else { - // accept all if terminated - l = state.replayObserverFromIndex(state.head(), t1); + public boolean drain(ReplayProducer rp) { + if (rp.getAndIncrement() != 0) { + return false; + } + + final Subscriber a = rp.actual; + + int missed = 1; + + for (;;) { + + long r = rp.requested.get(); + long e = 0L; + + @SuppressWarnings("unchecked") + TimedNode node = (TimedNode)rp.node; + if (node == null) { + node = latestHead(); + } + + while (e != r) { + if (rp.isUnsubscribed()) { + rp.node = null; + return false; + } + + boolean d = done; + TimedNode next = node.get(); + boolean empty = next == null; + + if (d && empty) { + rp.node = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onCompleted(); + } + return false; + } + + if (empty) { + break; + } + + a.onNext(next.value); + + e++; + node = next; + } + + if (e == r) { + if (rp.isUnsubscribed()) { + rp.node = null; + return false; + } + + boolean d = done; + boolean empty = node.get() == null; + + if (d && empty) { + rp.node = null; + Throwable ex = error; + if (ex != null) { + a.onError(ex); + } else { + a.onCompleted(); + } + return false; + } + } + + if (e != 0L) { + if (r != Long.MAX_VALUE) { + BackpressureUtils.produced(rp.requested, e); + } + } + + rp.node = node; + + missed = rp.addAndGet(-missed); + if (missed == 0) { + return r == Long.MAX_VALUE; + } } - t1.index(l); } - - } - /** - * A singly-linked list with volatile next node pointer. - * @param the value type - */ - static final class NodeList { - /** - * The node containing the value and references to neighbours. - * @param the value type - */ - static final class Node { - /** The managed value. */ + + static final class TimedNode extends AtomicReference> { + /** */ + private static final long serialVersionUID = 3713592843205853725L; + final T value; - /** The hard reference to the next node. */ - volatile Node next; - Node(T value) { + + final long timestamp; + + public TimedNode(T value, long timestamp) { this.value = value; + this.timestamp = timestamp; } } - /** The head of the list. */ - final Node head = new Node(null); - /** The tail of the list. */ - Node tail = head; - /** The number of elements in the list. */ - int size; - - public void addLast(T value) { - Node t = tail; - Node t2 = new Node(value); - t.next = t2; - tail = t2; - size++; - } - public T removeFirst() { - if (head.next == null) { - throw new IllegalStateException("Empty!"); - } - Node t = head.next; - head.next = t.next; - if (head.next == null) { - tail = head; - } - size--; - return t.value; - } - public boolean isEmpty() { - return size == 0; + + @Override + public boolean isComplete() { + return done; } - public int size() { - return size; + + @Override + public Throwable error() { + return error; } - public void clear() { - tail = head; - size = 0; + + @Override + public T last() { + TimedNode h = latestHead(); + TimedNode n; + while ((n = h.get()) != null) { + h = n; + } + return h.value; } - } - /** Empty eviction policy. */ - static final class EmptyEvictionPolicy implements EvictionPolicy { + @Override - public boolean test(Object value, long now) { - return true; + public int size() { + int s = 0; + TimedNode n = latestHead().get(); + while (n != null && s != Integer.MAX_VALUE) { + n = n.get(); + s++; + } + return s; } + @Override - public void evict(NodeList list) { + public boolean isEmpty() { + return latestHead().get() == null; } + @Override - public void evictFinal(NodeList list) { + public T[] toArray(T[] a) { + List list = new ArrayList(); + + TimedNode n = latestHead().get(); + while (n != null) { + list.add(n.value); + n = n.get(); + } + return list.toArray(a); } - } - /** - * Check if the Subject has terminated with an exception. - * @return true if the subject has received a throwable through {@code onError}. - */ - @Beta - public boolean hasThrowable() { - NotificationLite nl = ssm.nl; - Object o = ssm.getLatest(); - return nl.isError(o); - } - /** - * Check if the Subject has terminated normally. - * @return true if the subject completed normally via {@code onCompleted} - */ - @Beta - public boolean hasCompleted() { - NotificationLite nl = ssm.nl; - Object o = ssm.getLatest(); - return o != null && !nl.isError(o); + } + /** - * Returns the Throwable that terminated the Subject. - * @return the Throwable that terminated the Subject or {@code null} if the - * subject hasn't terminated yet or it terminated normally. + * A producer and subscription implementation that tracks the current + * replay position of a particular subscriber. + *

+ * The this holds the current work-in-progress indicator used by serializing + * replays. + * + * @param the value type */ - @Beta - public Throwable getThrowable() { - NotificationLite nl = ssm.nl; - Object o = ssm.getLatest(); - if (nl.isError(o)) { - return nl.getError(o); + static final class ReplayProducer + extends AtomicInteger + implements Producer, Subscription { + /** */ + private static final long serialVersionUID = -5006209596735204567L; + + /** The wrapped Subscriber instance. */ + final Subscriber actual; + + /** Holds the current requested amount. */ + final AtomicLong requested; + + /** Holds the back-reference to the replay state object. */ + final ReplayState state; + + /** + * Indicates this Subscriber runs unbounded and the source-triggered + * buffer.drain() has emitted all available values. + *

+ * This field has to be read and written from the source emitter's thread only. + */ + boolean caughtUp; + + /** + * Unbounded buffer.drain() uses this field to remember the absolute index of + * values replayed to this Subscriber. + */ + int index; + + /** + * Unbounded buffer.drain() uses this index within its current node to indicate + * how many items were replayed from that particular node so far. + */ + int tailIndex; + + /** + * Stores the current replay node of the buffer to be used by buffer.drain(). + */ + Object node; + + public ReplayProducer(Subscriber actual, ReplayState state) { + this.actual = actual; + this.requested = new AtomicLong(); + this.state = state; } - return null; - } - /** - * Returns the current number of items (non-terminal events) available for replay. - * @return the number of items available - */ - @Beta - public int size() { - return state.size(); - } - /** - * @return true if the Subject holds at least one non-terminal event available for replay - */ - @Beta - public boolean hasAnyValue() { - return !state.isEmpty(); - } - @Beta - public boolean hasValue() { - return hasAnyValue(); - } - /** - * Returns a snapshot of the currently buffered non-terminal events into - * the provided {@code a} array or creates a new array if it has not enough capacity. - * @param a the array to fill in - * @return the array {@code a} if it had enough capacity or a new array containing the available values - */ - @Beta - public T[] getValues(T[] a) { - return state.toArray(a); - } - - /** An empty array to trigger getValues() to return a new array. */ - private static final Object[] EMPTY_ARRAY = new Object[0]; - - /** - * Returns a snapshot of the currently buffered non-terminal events. - *

The operation is threadsafe. - * - * @return a snapshot of the currently buffered non-terminal events. - * @since (If this graduates from being an Experimental class method, replace this parenthetical with the release number) - */ - @SuppressWarnings("unchecked") - @Beta - public Object[] getValues() { - T[] r = getValues((T[])EMPTY_ARRAY); - if (r == EMPTY_ARRAY) { - return new Object[0]; // don't leak the default empty array. + + @Override + public void unsubscribe() { + state.remove(this); } - return r; - } - - @Beta - public T getValue() { - return state.latest(); + + @Override + public boolean isUnsubscribed() { + return actual.isUnsubscribed(); + } + + @Override + public void request(long n) { + if (n > 0L) { + BackpressureUtils.getAndAddRequest(requested, n); + state.buffer.drain(this); + } else if (n < 0L) { + throw new IllegalArgumentException("n >= required but it was " + n); + } + } + } } diff --git a/src/test/java/rx/subjects/ReplaySubjectTest.java b/src/test/java/rx/subjects/ReplaySubjectTest.java index cd04fc02ccf..7d5b5b87857 100644 --- a/src/test/java/rx/subjects/ReplaySubjectTest.java +++ b/src/test/java/rx/subjects/ReplaySubjectTest.java @@ -443,52 +443,6 @@ public void onCompleted() { verify(o, never()).onError(any(Throwable.class)); } @Test - public void testNodeListSimpleAddRemove() { - ReplaySubject.NodeList list = new ReplaySubject.NodeList(); - - assertEquals(0, list.size()); - - // add and remove one - - list.addLast(1); - - assertEquals(1, list.size()); - - assertEquals((Integer)1, list.removeFirst()); - - assertEquals(0, list.size()); - - // add and remove one again - - list.addLast(1); - - assertEquals(1, list.size()); - - assertEquals((Integer)1, list.removeFirst()); - - // add and remove two items - - list.addLast(1); - list.addLast(2); - - assertEquals(2, list.size()); - - assertEquals((Integer)1, list.removeFirst()); - assertEquals((Integer)2, list.removeFirst()); - - assertEquals(0, list.size()); - // clear two items - - list.addLast(1); - list.addLast(2); - - assertEquals(2, list.size()); - - list.clear(); - - assertEquals(0, list.size()); - } - @Test public void testReplay1AfterTermination() { ReplaySubject source = ReplaySubject.createWithSize(1); @@ -555,7 +509,7 @@ public void testReplayTimestampedAfterTermination() { verify(o, never()).onNext(1); verify(o, never()).onNext(2); - verify(o).onNext(3); + verify(o, never()).onNext(3); // late subscribers no longer replay stale data verify(o).onCompleted(); verify(o, never()).onError(any(Throwable.class)); } @@ -844,9 +798,12 @@ public void testSizeAndHasAnyValueTimeBounded() { for (int i = 0; i < 1000; i++) { rs.onNext(i); - ts.advanceTimeBy(2, TimeUnit.SECONDS); + ts.advanceTimeBy(500, TimeUnit.MILLISECONDS); assertEquals(1, rs.size()); assertTrue(rs.hasAnyValue()); + ts.advanceTimeBy(1500, TimeUnit.MILLISECONDS); + assertEquals(0, rs.size()); // stale data no longer peekable + assertFalse(rs.hasAnyValue()); } rs.onCompleted(); @@ -1049,4 +1006,139 @@ public void testReplaySubjectBoundedError() { assertArrayEquals(new Integer[] { null }, async.getValues(new Integer[] { 0 })); assertArrayEquals(new Integer[] { null, 0 }, async.getValues(new Integer[] { 0, 0 })); } + + void backpressureLive(ReplaySubject rs) { + TestSubscriber ts = TestSubscriber.create(0); + + rs.subscribe(ts); + + for (int i = 1; i <= 5; i++) { + rs.onNext(i); + } + + ts.assertNoValues(); + + ts.requestMore(2); + + ts.assertValues(1, 2); + + ts.requestMore(6); + + ts.assertValues(1, 2, 3, 4, 5); + + for (int i = 6; i <= 10; i++) { + rs.onNext(i); + } + + ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8); + + rs.onCompleted(); + + ts.assertNotCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + void backpressureOffline(ReplaySubject rs) { + TestSubscriber ts = TestSubscriber.create(0); + + for (int i = 1; i <= 10; i++) { + rs.onNext(i); + } + rs.onCompleted(); + + rs.subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(2); + + ts.assertValues(1, 2); + + ts.requestMore(6); + + ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8); + + ts.assertNotCompleted(); + + ts.requestMore(2); + + ts.assertValues(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + void backpressureOffline5(ReplaySubject rs) { + TestSubscriber ts = TestSubscriber.create(0); + + for (int i = 1; i <= 10; i++) { + rs.onNext(i); + } + rs.onCompleted(); + + rs.subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(2); + + ts.assertValues(6, 7); + + ts.requestMore(2); + + ts.assertValues(6, 7, 8, 9); + + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.assertValues(6, 7, 8, 9, 10); + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void backpressureUnboundedLive() { + backpressureLive(ReplaySubject.create()); + } + + @Test + public void backpressureSizeBoundLive() { + backpressureLive(ReplaySubject.createWithSize(1)); + backpressureLive(ReplaySubject.createWithSize(5)); + backpressureLive(ReplaySubject.createWithSize(10)); + backpressureLive(ReplaySubject.createWithSize(100)); + } + + @Test + public void backpressureSizeAndTimeLive() { + backpressureLive(ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, 1, Schedulers.immediate())); + backpressureLive(ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, 5, Schedulers.immediate())); + backpressureLive(ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, 10, Schedulers.immediate())); + backpressureLive(ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, 100, Schedulers.immediate())); + } + + @Test + public void backpressureUnboundedOffline() { + backpressureOffline(ReplaySubject.create()); + } + + @Test + public void backpressureSizeBoundOffline() { + backpressureOffline5(ReplaySubject.createWithSize(5)); + backpressureOffline(ReplaySubject.createWithSize(10)); + backpressureOffline(ReplaySubject.createWithSize(100)); + } + + @Test + public void backpressureSizeAndTimeOffline() { + backpressureOffline5(ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, 5, Schedulers.immediate())); + backpressureOffline(ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, 10, Schedulers.immediate())); + backpressureOffline(ReplaySubject.createWithTimeAndSize(1, TimeUnit.DAYS, 100, Schedulers.immediate())); + } + }