diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 89468cec2b..c80e8be95d 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -6164,7 +6164,7 @@ public final > Observable buffer(Callable< @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable cache() { - return ObservableCache.from(this); + return cacheWithInitialCapacity(16); } /** @@ -6222,7 +6222,8 @@ public final Observable cache() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Observable cacheWithInitialCapacity(int initialCapacity) { - return ObservableCache.from(this, initialCapacity); + ObjectHelper.verifyPositive(initialCapacity, "initialCapacity"); + return RxJavaPlugins.onAssembly(new ObservableCache(this, initialCapacity)); } /** diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java index db4ccecfa1..830b0984ac 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCache.java @@ -19,7 +19,7 @@ import io.reactivex.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; -import io.reactivex.internal.util.*; +import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.plugins.RxJavaPlugins; /** @@ -28,45 +28,93 @@ * * @param the source element type */ -public final class FlowableCache extends AbstractFlowableWithUpstream { - /** The cache and replay state. */ - final CacheState state; +public final class FlowableCache extends AbstractFlowableWithUpstream +implements FlowableSubscriber { + /** + * The subscription to the source should happen at most once. + */ final AtomicBoolean once; /** - * Private constructor because state needs to be shared between the Observable body and - * the onSubscribe function. - * @param source the upstream source whose signals to cache - * @param capacityHint the capacity hint + * The number of items per cached nodes. + */ + final int capacityHint; + + /** + * The current known array of subscriber state to notify. + */ + final AtomicReference[]> subscribers; + + /** + * A shared instance of an empty array of subscribers to avoid creating + * a new empty array when all subscribers cancel. + */ + @SuppressWarnings("rawtypes") + static final CacheSubscription[] EMPTY = new CacheSubscription[0]; + /** + * A shared instance indicating the source has no more events and there + * is no need to remember subscribers anymore. + */ + @SuppressWarnings("rawtypes") + static final CacheSubscription[] TERMINATED = new CacheSubscription[0]; + + /** + * The total number of elements in the list available for reads. + */ + volatile long size; + + /** + * The starting point of the cached items. */ + final Node head; + + /** + * The current tail of the linked structure holding the items. + */ + Node tail; + + /** + * How many items have been put into the tail node so far. + */ + int tailOffset; + + /** + * If {@link #subscribers} is {@link #TERMINATED}, this holds the terminal error if not null. + */ + Throwable error; + + /** + * True if the source has terminated. + */ + volatile boolean done; + + /** + * Constructs an empty, non-connected cache. + * @param source the source to subscribe to for the first incoming subscriber + * @param capacityHint the number of items expected (reduce allocation frequency) + */ + @SuppressWarnings("unchecked") public FlowableCache(Flowable source, int capacityHint) { super(source); - this.state = new CacheState(source, capacityHint); + this.capacityHint = capacityHint; this.once = new AtomicBoolean(); + Node n = new Node(capacityHint); + this.head = n; + this.tail = n; + this.subscribers = new AtomicReference[]>(EMPTY); } @Override protected void subscribeActual(Subscriber t) { - // we can connect first because we replay everything anyway - ReplaySubscription rp = new ReplaySubscription(t, state); - t.onSubscribe(rp); - - boolean doReplay = true; - if (state.addChild(rp)) { - if (rp.requested.get() == ReplaySubscription.CANCELLED) { - state.removeChild(rp); - doReplay = false; - } - } + CacheSubscription consumer = new CacheSubscription(t, this); + t.onSubscribe(consumer); + add(consumer); - // we ensure a single connection here to save an instance field of AtomicBoolean in state. if (!once.get() && once.compareAndSet(false, true)) { - state.connect(); - } - - if (doReplay) { - rp.replay(); + source.subscribe(this); + } else { + replay(consumer); } } @@ -75,7 +123,7 @@ protected void subscribeActual(Subscriber t) { * @return true if already connected */ /* public */boolean isConnected() { - return state.isConnected; + return once.get(); } /** @@ -83,208 +131,248 @@ protected void subscribeActual(Subscriber t) { * @return true if the cache has Subscribers */ /* public */ boolean hasSubscribers() { - return state.subscribers.get().length != 0; + return subscribers.get().length != 0; } /** * Returns the number of events currently cached. * @return the number of currently cached event count */ - /* public */ int cachedEventCount() { - return state.size(); + /* public */ long cachedEventCount() { + return size; } /** - * Contains the active child subscribers and the values to replay. - * - * @param the value type of the cached items + * Atomically adds the consumer to the {@link #subscribers} copy-on-write array + * if the source has not yet terminated. + * @param consumer the consumer to add */ - static final class CacheState extends LinkedArrayList implements FlowableSubscriber { - /** The source observable to connect to. */ - final Flowable source; - /** Holds onto the subscriber connected to source. */ - final AtomicReference connection = new AtomicReference(); - /** Guarded by connection (not this). */ - final AtomicReference[]> subscribers; - /** The default empty array of subscribers. */ - @SuppressWarnings("rawtypes") - static final ReplaySubscription[] EMPTY = new ReplaySubscription[0]; - /** The default empty array of subscribers. */ - @SuppressWarnings("rawtypes") - static final ReplaySubscription[] TERMINATED = new ReplaySubscription[0]; - - /** Set to true after connection. */ - volatile boolean isConnected; - /** - * Indicates that the source has completed emitting values or the - * Observable was forcefully terminated. - */ - boolean sourceDone; + void add(CacheSubscription consumer) { + for (;;) { + CacheSubscription[] current = subscribers.get(); + if (current == TERMINATED) { + return; + } + int n = current.length; - @SuppressWarnings("unchecked") - CacheState(Flowable source, int capacityHint) { - super(capacityHint); - this.source = source; - this.subscribers = new AtomicReference[]>(EMPTY); + @SuppressWarnings("unchecked") + CacheSubscription[] next = new CacheSubscription[n + 1]; + System.arraycopy(current, 0, next, 0, n); + next[n] = consumer; + + if (subscribers.compareAndSet(current, next)) { + return; + } } - /** - * Adds a ReplaySubscription to the subscribers array atomically. - * @param p the target ReplaySubscription wrapping a downstream Subscriber with state - * @return true if the ReplaySubscription was added or false if the cache is already terminated - */ - public boolean addChild(ReplaySubscription p) { - // guarding by connection to save on allocating another object - // thus there are two distinct locks guarding the value-addition and child come-and-go - for (;;) { - ReplaySubscription[] a = subscribers.get(); - if (a == TERMINATED) { - return false; - } - int n = a.length; - @SuppressWarnings("unchecked") - ReplaySubscription[] b = new ReplaySubscription[n + 1]; - System.arraycopy(a, 0, b, 0, n); - b[n] = p; - if (subscribers.compareAndSet(a, b)) { - return true; + } + + /** + * Atomically removes the consumer from the {@link #subscribers} copy-on-write array. + * @param consumer the consumer to remove + */ + @SuppressWarnings("unchecked") + void remove(CacheSubscription consumer) { + for (;;) { + CacheSubscription[] current = subscribers.get(); + int n = current.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (current[i] == consumer) { + j = i; + break; } } + + if (j < 0) { + return; + } + CacheSubscription[] next; + + if (n == 1) { + next = EMPTY; + } else { + next = new CacheSubscription[n - 1]; + System.arraycopy(current, 0, next, 0, j); + System.arraycopy(current, j + 1, next, j, n - j - 1); + } + + if (subscribers.compareAndSet(current, next)) { + return; + } + } + } + + /** + * Replays the contents of this cache to the given consumer based on its + * current state and number of items requested by it. + * @param consumer the consumer to continue replaying items to + */ + void replay(CacheSubscription consumer) { + // make sure there is only one replay going on at a time + if (consumer.getAndIncrement() != 0) { + return; } - /** - * Removes the ReplaySubscription (if present) from the subscribers array atomically. - * @param p the target ReplaySubscription wrapping a downstream Subscriber with state - */ - @SuppressWarnings("unchecked") - public void removeChild(ReplaySubscription p) { - for (;;) { - ReplaySubscription[] a = subscribers.get(); - int n = a.length; - if (n == 0) { - return; - } - int j = -1; - for (int i = 0; i < n; i++) { - if (a[i].equals(p)) { - j = i; - break; - } - } - if (j < 0) { - return; - } - ReplaySubscription[] b; - if (n == 1) { - b = EMPTY; + // see if there were more replay request in the meantime + int missed = 1; + // read out state into locals upfront to avoid being re-read due to volatile reads + long index = consumer.index; + int offset = consumer.offset; + Node node = consumer.node; + AtomicLong requested = consumer.requested; + Subscriber downstream = consumer.downstream; + int capacity = capacityHint; + + for (;;) { + // first see if the source has terminated, read order matters! + boolean sourceDone = done; + // and if the number of items is the same as this consumer has received + boolean empty = size == index; + + // if the source is done and we have all items so far, terminate the consumer + if (sourceDone && empty) { + // release the node object to avoid leaks through retained consumers + consumer.node = null; + // if error is not null then the source failed + Throwable ex = error; + if (ex != null) { + downstream.onError(ex); } else { - b = new ReplaySubscription[n - 1]; - System.arraycopy(a, 0, b, 0, j); - System.arraycopy(a, j + 1, b, j, n - j - 1); + downstream.onComplete(); } - if (subscribers.compareAndSet(a, b)) { + return; + } + + // there are still items not sent to the consumer + if (!empty) { + // see how many items the consumer has requested in total so far + long consumerRequested = requested.get(); + // MIN_VALUE indicates a cancelled consumer, we stop replaying + if (consumerRequested == Long.MIN_VALUE) { + // release the node object to avoid leaks through retained consumers + consumer.node = null; return; } - } - } + // if the consumer has requested more and there is more, we will emit an item + if (consumerRequested != index) { + + // if the offset in the current node has reached the node capacity + if (offset == capacity) { + // switch to the subsequent node + node = node.next; + // reset the in-node offset + offset = 0; + } - @Override - public void onSubscribe(Subscription s) { - SubscriptionHelper.setOnce(connection, s, Long.MAX_VALUE); - } + // emit the cached item + downstream.onNext(node.values[offset]); - /** - * Connects the cache to the source. - * Make sure this is called only once. - */ - public void connect() { - source.subscribe(this); - isConnected = true; - } + // move the node offset forward + offset++; + // move the total consumed item count forward + index++; - @Override - public void onNext(T t) { - if (!sourceDone) { - Object o = NotificationLite.next(t); - add(o); - for (ReplaySubscription rp : subscribers.get()) { - rp.replay(); + // retry for the next item/terminal event if any + continue; } } - } - @SuppressWarnings("unchecked") - @Override - public void onError(Throwable e) { - if (!sourceDone) { - sourceDone = true; - Object o = NotificationLite.error(e); - add(o); - SubscriptionHelper.cancel(connection); - for (ReplaySubscription rp : subscribers.getAndSet(TERMINATED)) { - rp.replay(); - } - } else { - RxJavaPlugins.onError(e); + // commit the changed references back + consumer.index = index; + consumer.offset = offset; + consumer.node = node; + // release the changes and see if there were more replay request in the meantime + missed = consumer.addAndGet(-missed); + if (missed == 0) { + break; } } + } - @SuppressWarnings("unchecked") - @Override - public void onComplete() { - if (!sourceDone) { - sourceDone = true; - Object o = NotificationLite.complete(); - add(o); - SubscriptionHelper.cancel(connection); - for (ReplaySubscription rp : subscribers.getAndSet(TERMINATED)) { - rp.replay(); - } - } + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + } + + @Override + public void onNext(T t) { + int tailOffset = this.tailOffset; + // if the current tail node is full, create a fresh node + if (tailOffset == capacityHint) { + Node n = new Node(tailOffset); + n.values[0] = t; + this.tailOffset = 1; + tail.next = n; + tail = n; + } else { + tail.values[tailOffset] = t; + this.tailOffset = tailOffset + 1; + } + size++; + for (CacheSubscription consumer : subscribers.get()) { + replay(consumer); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable t) { + if (done) { + RxJavaPlugins.onError(t); + return; + } + error = t; + done = true; + for (CacheSubscription consumer : subscribers.getAndSet(TERMINATED)) { + replay(consumer); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + done = true; + for (CacheSubscription consumer : subscribers.getAndSet(TERMINATED)) { + replay(consumer); } } /** - * Keeps track of the current request amount and the replay position for a child Subscriber. - * - * @param + * Hosts the downstream consumer and its current requested and replay states. + * {@code this} holds the work-in-progress counter for the serialized replay. + * @param the value type */ - static final class ReplaySubscription - extends AtomicInteger implements Subscription { + static final class CacheSubscription extends AtomicInteger + implements Subscription { - private static final long serialVersionUID = -2557562030197141021L; - private static final long CANCELLED = Long.MIN_VALUE; - /** The actual child subscriber. */ - final Subscriber child; - /** The cache state object. */ - final CacheState state; + private static final long serialVersionUID = 6770240836423125754L; + + final Subscriber downstream; + + final FlowableCache parent; - /** - * Number of items requested and also the cancelled indicator if - * it contains {@link #CANCELLED}. - */ final AtomicLong requested; - /** - * Contains the reference to the buffer segment in replay. - * Accessed after reading state.size() and when emitting == true. - */ - Object[] currentBuffer; - /** - * Contains the index into the currentBuffer where the next value is expected. - * Accessed after reading state.size() and when emitting == true. - */ - int currentIndexInBuffer; - /** - * Contains the absolute index up until the values have been replayed so far. - */ - int index; + Node node; - /** Number of items emitted so far. */ - long emitted; + int offset; - ReplaySubscription(Subscriber child, CacheState state) { - this.child = child; - this.state = state; + long index; + + /** + * Constructs a new instance with the actual downstream consumer and + * the parent cache object. + * @param downstream the actual consumer + * @param parent the parent that holds onto the cached items + */ + CacheSubscription(Subscriber downstream, FlowableCache parent) { + this.downstream = downstream; + this.parent = parent; + this.node = parent.head; this.requested = new AtomicLong(); } @@ -292,99 +380,38 @@ static final class ReplaySubscription public void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.addCancel(requested, n); - replay(); + parent.replay(this); } } @Override public void cancel() { - if (requested.getAndSet(CANCELLED) != CANCELLED) { - state.removeChild(this); + if (requested.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); } } + } + + /** + * Represents a segment of the cached item list as + * part of a linked-node-list structure. + * @param the element type + */ + static final class Node { /** - * Continue replaying available values if there are requests for them. + * The array of values held by this node. */ - public void replay() { - if (getAndIncrement() != 0) { - return; - } - - int missed = 1; - final Subscriber child = this.child; - AtomicLong rq = requested; - long e = emitted; - - for (;;) { + final T[] values; - long r = rq.get(); - - if (r == CANCELLED) { - return; - } - - // read the size, if it is non-zero, we can safely read the head and - // read values up to the given absolute index - int s = state.size(); - if (s != 0) { - Object[] b = currentBuffer; - - // latch onto the very first buffer now that it is available. - if (b == null) { - b = state.head(); - currentBuffer = b; - } - final int n = b.length - 1; - int j = index; - int k = currentIndexInBuffer; - - while (j < s && e != r) { - if (rq.get() == CANCELLED) { - return; - } - if (k == n) { - b = (Object[])b[n]; - k = 0; - } - Object o = b[k]; - - if (NotificationLite.accept(o, child)) { - return; - } - - k++; - j++; - e++; - } - - if (rq.get() == CANCELLED) { - return; - } - - if (r == e) { - Object o = b[k]; - if (NotificationLite.isComplete(o)) { - child.onComplete(); - return; - } else - if (NotificationLite.isError(o)) { - child.onError(NotificationLite.getError(o)); - return; - } - } - - index = j; - currentIndexInBuffer = k; - currentBuffer = b; - } + /** + * The next node if not null. + */ + volatile Node next; - emitted = e; - missed = addAndGet(-missed); - if (missed == 0) { - break; - } - } + @SuppressWarnings("unchecked") + Node(int capacityHint) { + this.values = (T[])new Object[capacityHint]; } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java index 3bfe796efc..fdc3477b75 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCache.java @@ -17,10 +17,6 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; -import io.reactivex.internal.disposables.SequentialDisposable; -import io.reactivex.internal.functions.ObjectHelper; -import io.reactivex.internal.util.*; -import io.reactivex.plugins.RxJavaPlugins; /** * An observable which auto-connects to another observable, caches the elements @@ -28,61 +24,94 @@ * * @param the source element type */ -public final class ObservableCache extends AbstractObservableWithUpstream { - /** The cache and replay state. */ - final CacheState state; +public final class ObservableCache extends AbstractObservableWithUpstream +implements Observer { + /** + * The subscription to the source should happen at most once. + */ final AtomicBoolean once; /** - * Creates a cached Observable with a default capacity hint of 16. - * @param the value type - * @param source the source Observable to cache - * @return the CachedObservable instance + * The number of items per cached nodes. */ - public static Observable from(Observable source) { - return from(source, 16); - } + final int capacityHint; /** - * Creates a cached Observable with the given capacity hint. - * @param the value type - * @param source the source Observable to cache - * @param capacityHint the hint for the internal buffer size - * @return the CachedObservable instance + * The current known array of observer state to notify. */ - public static Observable from(Observable source, int capacityHint) { - ObjectHelper.verifyPositive(capacityHint, "capacityHint"); - CacheState state = new CacheState(source, capacityHint); - return RxJavaPlugins.onAssembly(new ObservableCache(source, state)); - } + final AtomicReference[]> observers; + + /** + * A shared instance of an empty array of observers to avoid creating + * a new empty array when all observers dispose. + */ + @SuppressWarnings("rawtypes") + static final CacheDisposable[] EMPTY = new CacheDisposable[0]; + /** + * A shared instance indicating the source has no more events and there + * is no need to remember observers anymore. + */ + @SuppressWarnings("rawtypes") + static final CacheDisposable[] TERMINATED = new CacheDisposable[0]; + + /** + * The total number of elements in the list available for reads. + */ + volatile long size; + + /** + * The starting point of the cached items. + */ + final Node head; + + /** + * The current tail of the linked structure holding the items. + */ + Node tail; + + /** + * How many items have been put into the tail node so far. + */ + int tailOffset; + + /** + * If {@link #observers} is {@link #TERMINATED}, this holds the terminal error if not null. + */ + Throwable error; + + /** + * True if the source has terminated. + */ + volatile boolean done; /** - * Private constructor because state needs to be shared between the Observable body and - * the onSubscribe function. - * @param source the source Observable to cache - * @param state the cache state object + * Constructs an empty, non-connected cache. + * @param source the source to subscribe to for the first incoming observer + * @param capacityHint the number of items expected (reduce allocation frequency) */ - private ObservableCache(Observable source, CacheState state) { + @SuppressWarnings("unchecked") + public ObservableCache(Observable source, int capacityHint) { super(source); - this.state = state; + this.capacityHint = capacityHint; this.once = new AtomicBoolean(); + Node n = new Node(capacityHint); + this.head = n; + this.tail = n; + this.observers = new AtomicReference[]>(EMPTY); } @Override protected void subscribeActual(Observer t) { - // we can connect first because we replay everything anyway - ReplayDisposable rp = new ReplayDisposable(t, state); - t.onSubscribe(rp); - - state.addChild(rp); + CacheDisposable consumer = new CacheDisposable(t, this); + t.onSubscribe(consumer); + add(consumer); - // we ensure a single connection here to save an instance field of AtomicBoolean in state. if (!once.get() && once.compareAndSet(false, true)) { - state.connect(); + source.subscribe(this); + } else { + replay(consumer); } - - rp.replay(); } /** @@ -90,290 +119,281 @@ protected void subscribeActual(Observer t) { * @return true if already connected */ /* public */boolean isConnected() { - return state.isConnected; + return once.get(); } /** * Returns true if there are observers subscribed to this observable. - * @return true if the cache has downstream Observers + * @return true if the cache has observers */ /* public */ boolean hasObservers() { - return state.observers.get().length != 0; + return observers.get().length != 0; } /** * Returns the number of events currently cached. - * @return the current number of elements in the cache + * @return the number of currently cached event count */ - /* public */ int cachedEventCount() { - return state.size(); + /* public */ long cachedEventCount() { + return size; } /** - * Contains the active child observers and the values to replay. - * - * @param + * Atomically adds the consumer to the {@link #observers} copy-on-write array + * if the source has not yet terminated. + * @param consumer the consumer to add */ - static final class CacheState extends LinkedArrayList implements Observer { - /** The source observable to connect to. */ - final Observable source; - /** Holds onto the subscriber connected to source. */ - final SequentialDisposable connection; - /** Guarded by connection (not this). */ - final AtomicReference[]> observers; - /** The default empty array of observers. */ - @SuppressWarnings("rawtypes") - static final ReplayDisposable[] EMPTY = new ReplayDisposable[0]; - /** The default empty array of observers. */ - @SuppressWarnings("rawtypes") - static final ReplayDisposable[] TERMINATED = new ReplayDisposable[0]; - - /** Set to true after connection. */ - volatile boolean isConnected; - /** - * Indicates that the source has completed emitting values or the - * Observable was forcefully terminated. - */ - boolean sourceDone; + void add(CacheDisposable consumer) { + for (;;) { + CacheDisposable[] current = observers.get(); + if (current == TERMINATED) { + return; + } + int n = current.length; - @SuppressWarnings("unchecked") - CacheState(Observable source, int capacityHint) { - super(capacityHint); - this.source = source; - this.observers = new AtomicReference[]>(EMPTY); - this.connection = new SequentialDisposable(); - } - /** - * Adds a ReplayDisposable to the observers array atomically. - * @param p the target ReplayDisposable wrapping a downstream Observer with additional state - * @return true if the disposable was added, false otherwise - */ - public boolean addChild(ReplayDisposable p) { - // guarding by connection to save on allocating another object - // thus there are two distinct locks guarding the value-addition and child come-and-go - for (;;) { - ReplayDisposable[] a = observers.get(); - if (a == TERMINATED) { - return false; - } - int n = a.length; - - @SuppressWarnings("unchecked") - ReplayDisposable[] b = new ReplayDisposable[n + 1]; - System.arraycopy(a, 0, b, 0, n); - b[n] = p; - if (observers.compareAndSet(a, b)) { - return true; - } + @SuppressWarnings("unchecked") + CacheDisposable[] next = new CacheDisposable[n + 1]; + System.arraycopy(current, 0, next, 0, n); + next[n] = consumer; + + if (observers.compareAndSet(current, next)) { + return; } } - /** - * Removes the ReplayDisposable (if present) from the observers array atomically. - * @param p the target ReplayDisposable wrapping a downstream Observer with additional state - */ - @SuppressWarnings("unchecked") - public void removeChild(ReplayDisposable p) { - for (;;) { - ReplayDisposable[] a = observers.get(); - int n = a.length; - if (n == 0) { - return; - } - int j = -1; - for (int i = 0; i < n; i++) { - if (a[i].equals(p)) { - j = i; - break; - } - } - if (j < 0) { - return; - } - ReplayDisposable[] b; - if (n == 1) { - b = EMPTY; - } else { - b = new ReplayDisposable[n - 1]; - System.arraycopy(a, 0, b, 0, j); - System.arraycopy(a, j + 1, b, j, n - j - 1); - } - if (observers.compareAndSet(a, b)) { - return; + } + + /** + * Atomically removes the consumer from the {@link #observers} copy-on-write array. + * @param consumer the consumer to remove + */ + @SuppressWarnings("unchecked") + void remove(CacheDisposable consumer) { + for (;;) { + CacheDisposable[] current = observers.get(); + int n = current.length; + if (n == 0) { + return; + } + + int j = -1; + for (int i = 0; i < n; i++) { + if (current[i] == consumer) { + j = i; + break; } } - } - @Override - public void onSubscribe(Disposable d) { - connection.update(d); + if (j < 0) { + return; + } + CacheDisposable[] next; + + if (n == 1) { + next = EMPTY; + } else { + next = new CacheDisposable[n - 1]; + System.arraycopy(current, 0, next, 0, j); + System.arraycopy(current, j + 1, next, j, n - j - 1); + } + + if (observers.compareAndSet(current, next)) { + return; + } } + } - /** - * Connects the cache to the source. - * Make sure this is called only once. - */ - public void connect() { - source.subscribe(this); - isConnected = true; + /** + * Replays the contents of this cache to the given consumer based on its + * current state and number of items requested by it. + * @param consumer the consumer to continue replaying items to + */ + void replay(CacheDisposable consumer) { + // make sure there is only one replay going on at a time + if (consumer.getAndIncrement() != 0) { + return; } - @Override - public void onNext(T t) { - if (!sourceDone) { - Object o = NotificationLite.next(t); - add(o); - for (ReplayDisposable rp : observers.get()) { - rp.replay(); - } + // see if there were more replay request in the meantime + int missed = 1; + // read out state into locals upfront to avoid being re-read due to volatile reads + long index = consumer.index; + int offset = consumer.offset; + Node node = consumer.node; + Observer downstream = consumer.downstream; + int capacity = capacityHint; + + for (;;) { + // if the consumer got disposed, clear the node and quit + if (consumer.disposed) { + consumer.node = null; + return; } - } - @SuppressWarnings("unchecked") - @Override - public void onError(Throwable e) { - if (!sourceDone) { - sourceDone = true; - Object o = NotificationLite.error(e); - add(o); - connection.dispose(); - for (ReplayDisposable rp : observers.getAndSet(TERMINATED)) { - rp.replay(); + // first see if the source has terminated, read order matters! + boolean sourceDone = done; + // and if the number of items is the same as this consumer has received + boolean empty = size == index; + + // if the source is done and we have all items so far, terminate the consumer + if (sourceDone && empty) { + // release the node object to avoid leaks through retained consumers + consumer.node = null; + // if error is not null then the source failed + Throwable ex = error; + if (ex != null) { + downstream.onError(ex); + } else { + downstream.onComplete(); } + return; } - } - @SuppressWarnings("unchecked") - @Override - public void onComplete() { - if (!sourceDone) { - sourceDone = true; - Object o = NotificationLite.complete(); - add(o); - connection.dispose(); - for (ReplayDisposable rp : observers.getAndSet(TERMINATED)) { - rp.replay(); + // there are still items not sent to the consumer + if (!empty) { + // if the offset in the current node has reached the node capacity + if (offset == capacity) { + // switch to the subsequent node + node = node.next; + // reset the in-node offset + offset = 0; } + + // emit the cached item + downstream.onNext(node.values[offset]); + + // move the node offset forward + offset++; + // move the total consumed item count forward + index++; + + // retry for the next item/terminal event if any + continue; + } + + // commit the changed references back + consumer.index = index; + consumer.offset = offset; + consumer.node = node; + // release the changes and see if there were more replay request in the meantime + missed = consumer.addAndGet(-missed); + if (missed == 0) { + break; } } } + @Override + public void onSubscribe(Disposable d) { + // we can't do much with the upstream disposable + } + + @Override + public void onNext(T t) { + int tailOffset = this.tailOffset; + // if the current tail node is full, create a fresh node + if (tailOffset == capacityHint) { + Node n = new Node(tailOffset); + n.values[0] = t; + this.tailOffset = 1; + tail.next = n; + tail = n; + } else { + tail.values[tailOffset] = t; + this.tailOffset = tailOffset + 1; + } + size++; + for (CacheDisposable consumer : observers.get()) { + replay(consumer); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable t) { + error = t; + done = true; + for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { + replay(consumer); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + done = true; + for (CacheDisposable consumer : observers.getAndSet(TERMINATED)) { + replay(consumer); + } + } + /** - * Keeps track of the current request amount and the replay position for a child Observer. - * - * @param + * Hosts the downstream consumer and its current requested and replay states. + * {@code this} holds the work-in-progress counter for the serialized replay. + * @param the value type */ - static final class ReplayDisposable - extends AtomicInteger + static final class CacheDisposable extends AtomicInteger implements Disposable { - private static final long serialVersionUID = 7058506693698832024L; - /** The actual child subscriber. */ - final Observer child; - /** The cache state object. */ - final CacheState state; + private static final long serialVersionUID = 6770240836423125754L; - /** - * Contains the reference to the buffer segment in replay. - * Accessed after reading state.size() and when emitting == true. - */ - Object[] currentBuffer; - /** - * Contains the index into the currentBuffer where the next value is expected. - * Accessed after reading state.size() and when emitting == true. - */ - int currentIndexInBuffer; - /** - * Contains the absolute index up until the values have been replayed so far. - */ - int index; + final Observer downstream; - /** Set if the ReplayDisposable has been cancelled/disposed. */ - volatile boolean cancelled; + final ObservableCache parent; - ReplayDisposable(Observer child, CacheState state) { - this.child = child; - this.state = state; - } + Node node; - @Override - public boolean isDisposed() { - return cancelled; + int offset; + + long index; + + volatile boolean disposed; + + /** + * Constructs a new instance with the actual downstream consumer and + * the parent cache object. + * @param downstream the actual consumer + * @param parent the parent that holds onto the cached items + */ + CacheDisposable(Observer downstream, ObservableCache parent) { + this.downstream = downstream; + this.parent = parent; + this.node = parent.head; } @Override public void dispose() { - if (!cancelled) { - cancelled = true; - state.removeChild(this); + if (!disposed) { + disposed = true; + parent.remove(this); } } - /** - * Continue replaying available values if there are requests for them. - */ - public void replay() { - // make sure there is only a single thread emitting - if (getAndIncrement() != 0) { - return; - } - - final Observer child = this.child; - int missed = 1; - - for (;;) { + @Override + public boolean isDisposed() { + return disposed; + } + } - if (cancelled) { - return; - } + /** + * Represents a segment of the cached item list as + * part of a linked-node-list structure. + * @param the element type + */ + static final class Node { - // read the size, if it is non-zero, we can safely read the head and - // read values up to the given absolute index - int s = state.size(); - if (s != 0) { - Object[] b = currentBuffer; - - // latch onto the very first buffer now that it is available. - if (b == null) { - b = state.head(); - currentBuffer = b; - } - final int n = b.length - 1; - int j = index; - int k = currentIndexInBuffer; - - while (j < s) { - if (cancelled) { - return; - } - if (k == n) { - b = (Object[])b[n]; - k = 0; - } - Object o = b[k]; - - if (NotificationLite.accept(o, child)) { - return; - } - - k++; - j++; - } - - if (cancelled) { - return; - } - - index = j; - currentIndexInBuffer = k; - currentBuffer = b; + /** + * The array of values held by this node. + */ + final T[] values; - } + /** + * The next node if not null. + */ + volatile Node next; - missed = addAndGet(-missed); - if (missed == 0) { - break; - } - } + @SuppressWarnings("unchecked") + Node(int capacityHint) { + this.values = (T[])new Object[capacityHint]; } } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java index 6a427124bd..4208b18dec 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCacheTest.java @@ -139,7 +139,7 @@ public void testUnsubscribeSource() throws Exception { f.subscribe(); f.subscribe(); f.subscribe(); - verify(unsubscribe, times(1)).run(); + verify(unsubscribe, never()).run(); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index 0cc39b5c03..dcf7eea347 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -951,11 +951,11 @@ public void accept(String v) { @Test public void testUnsubscribeSource() throws Exception { Action unsubscribe = mock(Action.class); - Flowable f = Flowable.just(1).doOnCancel(unsubscribe).cache(); + Flowable f = Flowable.just(1).doOnCancel(unsubscribe).replay().autoConnect(); f.subscribe(); f.subscribe(); f.subscribe(); - verify(unsubscribe, times(1)).run(); + verify(unsubscribe, never()).run(); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java index e2aeb6a3f5..989206f156 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableCacheTest.java @@ -35,7 +35,7 @@ public class ObservableCacheTest { @Test public void testColdReplayNoBackpressure() { - ObservableCache source = (ObservableCache)ObservableCache.from(Observable.range(0, 1000)); + ObservableCache source = new ObservableCache(Observable.range(0, 1000), 16); assertFalse("Source is connected!", source.isConnected()); @@ -120,7 +120,7 @@ public void testUnsubscribeSource() throws Exception { public void testTake() { TestObserver to = new TestObserver(); - ObservableCache cached = (ObservableCache)ObservableCache.from(Observable.range(1, 100)); + ObservableCache cached = new ObservableCache(Observable.range(1, 1000), 16); cached.take(10).subscribe(to); to.assertNoErrors(); @@ -136,7 +136,7 @@ public void testAsync() { for (int i = 0; i < 100; i++) { TestObserver to1 = new TestObserver(); - ObservableCache cached = (ObservableCache)ObservableCache.from(source); + ObservableCache cached = new ObservableCache(source, 16); cached.observeOn(Schedulers.computation()).subscribe(to1); @@ -160,7 +160,7 @@ public void testAsyncComeAndGo() { Observable source = Observable.interval(1, 1, TimeUnit.MILLISECONDS) .take(1000) .subscribeOn(Schedulers.io()); - ObservableCache cached = (ObservableCache)ObservableCache.from(source); + ObservableCache cached = new ObservableCache(source, 16); Observable output = cached.observeOn(Schedulers.computation()); @@ -351,4 +351,23 @@ public void run() { .assertSubscribed().assertValueCount(500).assertComplete().assertNoErrors(); } } + + @Test + public void cancelledUpFront() { + final AtomicInteger call = new AtomicInteger(); + Observable f = Observable.fromCallable(new Callable() { + @Override + public Object call() throws Exception { + return call.incrementAndGet(); + } + }).concatWith(Observable.never()) + .cache(); + + f.test().assertValuesOnly(1); + + f.test(true) + .assertEmpty(); + + assertEquals(1, call.get()); + } }