From be8d144e65cf8dc66661ec4451e7b717b52a70b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 17 Mar 2016 23:18:52 +0100 Subject: [PATCH] 1.x: fix counted buffer and window backpressure --- src/main/java/rx/Observable.java | 16 +- .../internal/operators/BackpressureUtils.java | 210 ++++++- .../operators/OperatorBufferWithSize.java | 396 ++++++++----- .../operators/OperatorWindowWithSize.java | 560 +++++++++++++----- .../rx/internal/operators/UnicastSubject.java | 57 +- .../java/rx/observers/TestSubscriber.java | 19 +- .../operators/OperatorBufferTest.java | 85 ++- .../operators/OperatorWindowWithSizeTest.java | 67 +++ 8 files changed, 1082 insertions(+), 328 deletions(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 7a2d91a2af..ed08939889 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -9744,8 +9744,8 @@ public final Observable> window(Func0 *
*
Backpressure Support:
- *
The operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables - * but each of them will emit at most {@code count} elements.
+ *
The operator honors backpressure of its inner and outer subscribers, however, the inner Observable uses an + * unbounded buffer that may hold at most {@code count} elements.
*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
*
@@ -9754,6 +9754,7 @@ public final Observable> window(Func0ReactiveX operators documentation: Window */ public final Observable> window(int count) { @@ -9769,8 +9770,8 @@ public final Observable> window(int count) { * *
*
Backpressure Support:
- *
The operator honors backpressure on its outer subscriber, ignores backpressure in its inner Observables - * but each of them will emit at most {@code count} elements.
+ *
The operator honors backpressure of its inner and outer subscribers, however, the inner Observable uses an + * unbounded buffer that may hold at most {@code count} elements.
*
Scheduler:
*
This version of {@code window} does not operate by default on a particular {@link Scheduler}.
*
@@ -9782,9 +9783,16 @@ public final Observable> window(int count) { * {@code count} are equal this is the same operation as {@link #window(int)}. * @return an Observable that emits windows every {@code skip} items containing at most {@code count} items * from the source Observable + * @throws IllegalArgumentException if either count or skip is non-positive * @see ReactiveX operators documentation: Window */ public final Observable> window(int count, int skip) { + if (count <= 0) { + throw new IllegalArgumentException("count > 0 required but it was " + count); + } + if (skip <= 0) { + throw new IllegalArgumentException("skip > 0 required but it was " + skip); + } return lift(new OperatorWindowWithSize(count, skip)); } diff --git a/src/main/java/rx/internal/operators/BackpressureUtils.java b/src/main/java/rx/internal/operators/BackpressureUtils.java index 0d4adef0a8..cfbe282901 100644 --- a/src/main/java/rx/internal/operators/BackpressureUtils.java +++ b/src/main/java/rx/internal/operators/BackpressureUtils.java @@ -15,8 +15,10 @@ */ package rx.internal.operators; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.Queue; +import java.util.concurrent.atomic.*; + +import rx.Subscriber; /** * Utility functions for use with backpressure. @@ -32,6 +34,8 @@ private BackpressureUtils() { * addition once the addition is successful (uses CAS semantics). If * overflows then sets {@code requested} field to {@code Long.MAX_VALUE}. * + * @param the type of the target object on which the field updater operates + * * @param requested * atomic field updater for a request count * @param object @@ -103,6 +107,208 @@ public static long addCap(long a, long b) { return u; } + /** + * Masks the most significant bit, i.e., 0x8000_0000_0000_0000L. + */ + static final long COMPLETED_MASK = Long.MIN_VALUE; + /** + * Masks the request amount bits, i.e., 0x7FFF_FFFF_FFFF_FFFF. + */ + static final long REQUESTED_MASK = Long.MAX_VALUE; + + /** + * Signals the completion of the main sequence and switches to post-completion replay mode. + * + *

+ * Don't modify the queue after calling this method! + * + *

+ * Post-completion backpressure handles the case when a source produces values based on + * requests when it is active but more values are available even after its completion. + * In this case, the onCompleted() can't just emit the contents of the queue but has to + * coordinate with the requested amounts. This requires two distinct modes: active and + * completed. In active mode, requests flow through and the queue is not accessed but + * in completed mode, requests no-longer reach the upstream but help in draining the queue. + *

+ * The algorithm utilizes the most significant bit (bit 63) of a long value (AtomicLong) since + * request amount only goes up to Long.MAX_VALUE (bits 0-62) and negative values aren't + * allowed. + * + * @param the value type to emit + * @param requested the holder of current requested amount + * @param queue the queue holding values to be emitted after completion + * @param actual the subscriber to receive the values + */ + public static void postCompleteDone(AtomicLong requested, Queue queue, Subscriber actual) { + for (;;) { + long r = requested.get(); + + // switch to completed mode only once + if ((r & COMPLETED_MASK) != 0L) { + return; + } + + // + long u = r | COMPLETED_MASK; + + if (requested.compareAndSet(r, u)) { + // if we successfully switched to post-complete mode and there + // are requests available start draining the queue + if (r != 0L) { + // if the switch happened when there was outstanding requests, start draining + postCompleteDrain(requested, queue, actual); + } + return; + } + } + } + + /** + * Accumulates requests (validated) and handles the completed mode draining of the queue based on the requests. + * + *

+ * Post-completion backpressure handles the case when a source produces values based on + * requests when it is active but more values are available even after its completion. + * In this case, the onCompleted() can't just emit the contents of the queue but has to + * coordinate with the requested amounts. This requires two distinct modes: active and + * completed. In active mode, requests flow through and the queue is not accessed but + * in completed mode, requests no-longer reach the upstream but help in draining the queue. + * + * @param the value type to emit + * @param requested the holder of current requested amount + * @param n the value requested; + * @param queue the queue holding values to be emitted after completion + * @param actual the subscriber to receive the values + * @return true if in the active mode and the request amount of n can be relayed to upstream, false if + * in the post-completed mode and the queue is draining. + */ + public static boolean postCompleteRequest(AtomicLong requested, long n, Queue queue, Subscriber actual) { + if (n < 0L) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + if (n == 0) { + return (requested.get() & COMPLETED_MASK) == 0; + } + + for (;;) { + long r = requested.get(); + + // mask of the completed flag + long c = r & COMPLETED_MASK; + // mask of the requested amount + long u = r & REQUESTED_MASK; + + // add the current requested amount and the new requested amount + // cap at Long.MAX_VALUE; + long v = addCap(u, n); + + // restore the completed flag + v |= c; + + if (requested.compareAndSet(r, v)) { + // if there was no outstanding request before and in + // the post-completed state, start draining + if (r == COMPLETED_MASK) { + postCompleteDrain(requested, queue, actual); + return false; + } + // returns true for active mode and false if the completed flag was set + return c == 0L; + } + } + } + + /** + * Drains the queue based on the outstanding requests in post-completed mode (only!). + * + * @param the value type to emit + * @param requested the holder of current requested amount + * @param queue the queue holding values to be emitted after completion + * @param actual the subscriber to receive the values + */ + static void postCompleteDrain(AtomicLong requested, Queue queue, Subscriber subscriber) { + + long r = requested.get(); + /* + * Since we are supposed to be in the post-complete state, + * requested will have its top bit set. + * To allow direct comparison, we start with an emission value which has also + * this flag set, then increment it as usual. + * Since COMPLETED_MASK is essentially Long.MIN_VALUE, + * there won't be any overflow or sign flip. + */ + long e = COMPLETED_MASK; + + for (;;) { + + /* + * This is an improved queue-drain algorithm with a specialization + * in which we know the queue won't change anymore (i.e., done is always true + * when looking at the classical algorithm and there is no error). + * + * Note that we don't check for cancellation or emptyness upfront for two reasons: + * 1) if e != r, the loop will do this and we quit appropriately + * 2) if e == r, then either there was no outstanding requests or we emitted the requested amount + * and the execution simply falls to the e == r check below which checks for emptyness anyway. + */ + + while (e != r) { + if (subscriber.isUnsubscribed()) { + return; + } + + T v = queue.poll(); + + if (v == null) { + subscriber.onCompleted(); + return; + } + + subscriber.onNext(v); + + e++; + } + + /* + * If the emission count reaches the requested amount the same time the queue becomes empty + * this will make sure the subscriber is completed immediately instead of on the next request. + * This is also true if there are no outstanding requests (this the while loop doesn't run) + * and the queue is empty from the start. + */ + if (e == r) { + if (subscriber.isUnsubscribed()) { + return; + } + if (queue.isEmpty()) { + subscriber.onCompleted(); + return; + } + } + + /* + * Fast flow: see if more requests have arrived in the meantime. + * This avoids an atomic add (~40 cycles) and resumes the emission immediately. + */ + r = requested.get(); + + if (r == e) { + /* + * Atomically decrement the requested amount by the emission amount. + * We can't use the full emission value because of the completed flag, + * however, due to two's complement representation, the flag on requested + * is preserved. + */ + r = requested.addAndGet(-(e & REQUESTED_MASK)); + // The requested amount actually reached zero, quit + if (r == COMPLETED_MASK) { + return; + } + // reset the emission count + e = COMPLETED_MASK; + } + } + } + /** * Atomically subtracts a value from the requested amount unless it's at Long.MAX_VALUE. * @param requested the requested amount holder diff --git a/src/main/java/rx/internal/operators/OperatorBufferWithSize.java b/src/main/java/rx/internal/operators/OperatorBufferWithSize.java index e08fd440c2..6475547563 100644 --- a/src/main/java/rx/internal/operators/OperatorBufferWithSize.java +++ b/src/main/java/rx/internal/operators/OperatorBufferWithSize.java @@ -15,16 +15,13 @@ */ package rx.internal.operators; -import java.util.ArrayList; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; +import java.util.*; +import java.util.concurrent.atomic.*; +import rx.*; import rx.Observable; import rx.Observable.Operator; -import rx.Producer; -import rx.Subscriber; -import rx.exceptions.Exceptions; +import rx.exceptions.MissingBackpressureException; /** * This operation takes @@ -66,167 +63,274 @@ public OperatorBufferWithSize(int count, int skip) { @Override public Subscriber call(final Subscriber> child) { - if (count == skip) { - return new Subscriber(child) { - List buffer; - - @Override - public void setProducer(final Producer producer) { - child.setProducer(new Producer() { - - private volatile boolean infinite = false; - - @Override - public void request(long n) { - if (infinite) { - return; - } - if (n >= Long.MAX_VALUE / count) { - // n == Long.MAX_VALUE or n * count >= Long.MAX_VALUE - infinite = true; - producer.request(Long.MAX_VALUE); - } else { - producer.request(n * count); - } - } - }); - } + if (skip == count) { + BufferExact parent = new BufferExact(child, count); + + child.add(parent); + child.setProducer(parent.createProducer()); + + return parent; + } + if (skip > count) { + BufferSkip parent = new BufferSkip(child, count, skip); + + child.add(parent); + child.setProducer(parent.createProducer()); + + return parent; + } + BufferOverlap parent = new BufferOverlap(child, count, skip); + + child.add(parent); + child.setProducer(parent.createProducer()); + + return parent; + } + + static final class BufferExact extends Subscriber { + final Subscriber> actual; + final int count; + List buffer; + + public BufferExact(Subscriber> actual, int count) { + this.actual = actual; + this.count = count; + this.request(0L); + } + + @Override + public void onNext(T t) { + List b = buffer; + if (b == null) { + b = new ArrayList(count); + buffer = b; + } + + b.add(t); + + if (b.size() == count) { + buffer = null; + actual.onNext(b); + } + } + + @Override + public void onError(Throwable e) { + buffer = null; + actual.onError(e); + } + + @Override + public void onCompleted() { + List b = buffer; + if (b != null) { + actual.onNext(b); + } + actual.onCompleted(); + } + + Producer createProducer() { + return new Producer() { @Override - public void onNext(T t) { - if (buffer == null) { - buffer = new ArrayList(count); + public void request(long n) { + if (n < 0L) { + throw new IllegalArgumentException("n >= required but it was " + n); } - buffer.add(t); - if (buffer.size() == count) { - List oldBuffer = buffer; - buffer = null; - child.onNext(oldBuffer); + if (n != 0L) { + long u = BackpressureUtils.multiplyCap(n, count); + BufferExact.this.request(u); } } + }; + } + } + + static final class BufferSkip extends Subscriber { + final Subscriber> actual; + final int count; + final int skip; + + long index; + + List buffer; - @Override - public void onError(Throwable e) { - buffer = null; - child.onError(e); - } - - @Override - public void onCompleted() { - List oldBuffer = buffer; + public BufferSkip(Subscriber> actual, int count, int skip) { + this.actual = actual; + this.count = count; + this.skip = skip; + this.request(0L); + } + + @Override + public void onNext(T t) { + long i = index; + List b = buffer; + if (i == 0) { + b = new ArrayList(count); + buffer = b; + } + i++; + if (i == skip) { + index = 0; + } else { + index = i; + } + + if (b != null) { + b.add(t); + + if (b.size() == count) { buffer = null; - if (oldBuffer != null) { - try { - child.onNext(oldBuffer); - } catch (Throwable t) { - Exceptions.throwOrReport(t, this); - return; - } - } - child.onCompleted(); + actual.onNext(b); } - }; + } } - return new Subscriber(child) { - final List> chunks = new LinkedList>(); - int index; - - @Override - public void setProducer(final Producer producer) { - child.setProducer(new Producer() { - - private volatile boolean firstRequest = true; - private volatile boolean infinite = false; - - private void requestInfinite() { - infinite = true; - producer.request(Long.MAX_VALUE); - } - - @Override - public void request(long n) { - if (n == 0) { - return; - } - if (n < 0) { - throw new IllegalArgumentException("request a negative number: " + n); - } - if (infinite) { - return; - } - if (n == Long.MAX_VALUE) { - requestInfinite(); - } else { - if (firstRequest) { - firstRequest = false; - if (n - 1 >= (Long.MAX_VALUE - count) / skip) { - // count + skip * (n - 1) >= Long.MAX_VALUE - requestInfinite(); - return; - } - // count = 5, skip = 2, n = 3 - // * * * * * - // * * * * * - // * * * * * - // request = 5 + 2 * ( 3 - 1) - producer.request(count + skip * (n - 1)); - } else { - if (n >= Long.MAX_VALUE / skip) { - // skip * n >= Long.MAX_VALUE - requestInfinite(); - return; - } - // count = 5, skip = 2, n = 3 - // (* * *) * * - // ( *) * * * * - // * * * * * - // request = skip * n - // "()" means the items already emitted before this request - producer.request(skip * n); - } - } - } - }); + + @Override + public void onError(Throwable e) { + buffer = null; + actual.onError(e); + } + + @Override + public void onCompleted() { + List b = buffer; + if (b != null) { + buffer = null; + actual.onNext(b); } + actual.onCompleted(); + } + + Producer createProducer() { + return new BufferSkipProducer(); + } + + final class BufferSkipProducer + extends AtomicBoolean + implements Producer { + /** */ + private static final long serialVersionUID = 3428177408082367154L; @Override - public void onNext(T t) { - if (index++ % skip == 0) { - chunks.add(new ArrayList(count)); + public void request(long n) { + if (n < 0) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); } - - Iterator> it = chunks.iterator(); - while (it.hasNext()) { - List chunk = it.next(); - chunk.add(t); - if (chunk.size() == count) { - it.remove(); - child.onNext(chunk); + if (n != 0) { + BufferSkip parent = BufferSkip.this; + if (!get() && compareAndSet(false, true)) { + long u = BackpressureUtils.multiplyCap(n, parent.count); + long v = BackpressureUtils.multiplyCap(parent.skip - parent.count, n - 1); + long w = BackpressureUtils.addCap(u, v); + parent.request(w); + } else { + long u = BackpressureUtils.multiplyCap(n, parent.skip); + parent.request(u); } } } + } + } + + static final class BufferOverlap extends Subscriber { + final Subscriber> actual; + final int count; + final int skip; + + long index; + + final ArrayDeque> queue; + + final AtomicLong requested; + + long produced; - @Override - public void onError(Throwable e) { - chunks.clear(); - child.onError(e); + public BufferOverlap(Subscriber> actual, int count, int skip) { + this.actual = actual; + this.count = count; + this.skip = skip; + this.queue = new ArrayDeque>(); + this.requested = new AtomicLong(); + this.request(0L); + } + + @Override + public void onNext(T t) { + long i = index; + if (i == 0) { + List b = new ArrayList(count); + queue.offer(b); + } + i++; + if (i == skip) { + index = 0; + } else { + index = i; + } + + for (List list : queue) { + list.add(t); + } + + List b = queue.peek(); + if (b != null && b.size() == count) { + queue.poll(); + produced++; + actual.onNext(b); + } + } + + @Override + public void onError(Throwable e) { + queue.clear(); + + actual.onError(e); + } + + @Override + public void onCompleted() { + long p = produced; + + if (p != 0L) { + if (p > requested.get()) { + actual.onError(new MissingBackpressureException("More produced than requested? " + p)); + return; + } + requested.addAndGet(-p); } + + BackpressureUtils.postCompleteDone(requested, queue, actual); + } + + Producer createProducer() { + return new BufferOverlapProducer(); + } + + final class BufferOverlapProducer extends AtomicBoolean implements Producer { + + /** */ + private static final long serialVersionUID = -4015894850868853147L; + @Override - public void onCompleted() { - try { - for (List chunk : chunks) { - try { - child.onNext(chunk); - } catch (Throwable t) { - Exceptions.throwOrReport(t, this); - return; + public void request(long n) { + BufferOverlap parent = BufferOverlap.this; + if (BackpressureUtils.postCompleteRequest(parent.requested, n, parent.queue, parent.actual)) { + if (n != 0L) { + if (!get() && compareAndSet(false, true)) { + long u = BackpressureUtils.multiplyCap(parent.skip, n - 1); + long v = BackpressureUtils.addCap(u, parent.count); + + parent.request(v); + } else { + long u = BackpressureUtils.multiplyCap(parent.skip, n); + parent.request(u); } } - child.onCompleted(); - } finally { - chunks.clear(); } } - }; + + } } } diff --git a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java index 48d80d9cfb..3538991526 100644 --- a/src/main/java/rx/internal/operators/OperatorWindowWithSize.java +++ b/src/main/java/rx/internal/operators/OperatorWindowWithSize.java @@ -16,12 +16,14 @@ package rx.internal.operators; import java.util.*; +import java.util.concurrent.atomic.*; import rx.*; -import rx.Observable.Operator; import rx.Observable; -import rx.Observer; +import rx.Observable.Operator; import rx.functions.Action0; +import rx.internal.util.atomic.SpscLinkedArrayQueue; +import rx.subjects.Subject; import rx.subscriptions.Subscriptions; /** @@ -48,215 +50,455 @@ public OperatorWindowWithSize(int size, int skip) { @Override public Subscriber call(Subscriber> child) { if (skip == size) { - ExactSubscriber e = new ExactSubscriber(child); - e.init(); - return e; + WindowExact parent = new WindowExact(child, size); + + child.add(parent.cancel); + child.setProducer(parent.createProducer()); + + return parent; + } else + if (skip > size) { + WindowSkip parent = new WindowSkip(child, size, skip); + + child.add(parent.cancel); + child.setProducer(parent.createProducer()); + + return parent; } - InexactSubscriber ie = new InexactSubscriber(child); - ie.init(); - return ie; + + WindowOverlap parent = new WindowOverlap(child, size, skip); + + child.add(parent.cancel); + child.setProducer(parent.createProducer()); + + return parent; + } - /** Subscriber with exact, non-overlapping window bounds. */ - final class ExactSubscriber extends Subscriber { - final Subscriber> child; - int count; - UnicastSubject window; - volatile boolean noWindow = true; - public ExactSubscriber(Subscriber> child) { - /** - * See https://github.com/ReactiveX/RxJava/issues/1546 - * We cannot compose through a Subscription because unsubscribing - * applies to the outer, not the inner. - */ - this.child = child; - /* - * Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) - */ + + static final class WindowExact extends Subscriber implements Action0 { + final Subscriber> actual; + + final int size; + + final AtomicInteger wip; + + final Subscription cancel; + + int index; + + Subject window; + + public WindowExact(Subscriber> actual, int size) { + this.actual = actual; + this.size = size; + this.wip = new AtomicInteger(1); + this.cancel = Subscriptions.create(this); + this.add(cancel); + this.request(0); } - void init() { - child.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - // if no window we unsubscribe up otherwise wait until window ends - if (noWindow) { - unsubscribe(); - } - } + + @Override + public void onNext(T t) { + int i = index; + + Subject w = window; + if (i == 0) { + wip.getAndIncrement(); - })); - child.setProducer(new Producer() { + w = UnicastSubject.create(size, this); + window = w; + + actual.onNext(w); + } + i++; + + w.onNext(t); + + if (i == size) { + index = 0; + window = null; + w.onCompleted(); + } else { + index = i; + } + } + + @Override + public void onError(Throwable e) { + Subject w = window; + + if (w != null) { + window = null; + w.onError(e); + } + actual.onError(e); + } + + @Override + public void onCompleted() { + Subject w = window; + + if (w != null) { + window = null; + w.onCompleted(); + } + actual.onCompleted(); + } + + Producer createProducer() { + return new Producer() { @Override public void request(long n) { - if (n > 0) { - long u = n * size; - if (((u >>> 31) != 0) && (u / n != size)) { - u = Long.MAX_VALUE; - } - requestMore(u); + if (n < 0L) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + if (n != 0L) { + long u = BackpressureUtils.multiplyCap(size, n); + WindowExact.this.request(u); } } - }); + }; } - void requestMore(long n) { - request(n); + @Override + public void call() { + if (wip.decrementAndGet() == 0) { + unsubscribe(); + } } - + } + + static final class WindowSkip extends Subscriber implements Action0 { + final Subscriber> actual; + + final int size; + + final int skip; + + final AtomicInteger wip; + + final Subscription cancel; + + int index; + + Subject window; + + public WindowSkip(Subscriber> actual, int size, int skip) { + this.actual = actual; + this.size = size; + this.skip = skip; + this.wip = new AtomicInteger(1); + this.cancel = Subscriptions.create(this); + this.add(cancel); + this.request(0); + } + @Override public void onNext(T t) { - if (window == null) { - noWindow = false; - window = UnicastSubject.create(); - child.onNext(window); + int i = index; + + Subject w = window; + if (i == 0) { + wip.getAndIncrement(); + + w = UnicastSubject.create(size, this); + window = w; + + actual.onNext(w); } - window.onNext(t); - if (++count % size == 0) { - window.onCompleted(); + i++; + + if (w != null) { + w.onNext(t); + } + + if (i == size) { + index = i; window = null; - noWindow = true; - if (child.isUnsubscribed()) { - unsubscribe(); - } + w.onCompleted(); + } else + if (i == skip) { + index = 0; + } else { + index = i; } + } - + @Override public void onError(Throwable e) { - if (window != null) { - window.onError(e); + Subject w = window; + + if (w != null) { + window = null; + w.onError(e); } - child.onError(e); + actual.onError(e); } - + @Override public void onCompleted() { - if (window != null) { - window.onCompleted(); + Subject w = window; + + if (w != null) { + window = null; + w.onCompleted(); } - child.onCompleted(); + actual.onCompleted(); } - } - - /** Subscriber with inexact, possibly overlapping or skipping windows. */ - final class InexactSubscriber extends Subscriber { - final Subscriber> child; - int count; - final List> chunks = new LinkedList>(); - volatile boolean noWindow = true; - - public InexactSubscriber(Subscriber> child) { - /** - * See https://github.com/ReactiveX/RxJava/issues/1546 - * We cannot compose through a Subscription because unsubscribing - * applies to the outer, not the inner. - */ - this.child = child; + + Producer createProducer() { + return new WindowSkipProducer(); } + + @Override + public void call() { + if (wip.decrementAndGet() == 0) { + unsubscribe(); + } + } + + final class WindowSkipProducer extends AtomicBoolean implements Producer { + /** */ + private static final long serialVersionUID = 4625807964358024108L; - void init() { - /* - * Add unsubscribe hook to child to get unsubscribe on outer (unsubscribing on next window, not on the inner window itself) - */ - child.add(Subscriptions.create(new Action0() { - - @Override - public void call() { - // if no window we unsubscribe up otherwise wait until window ends - if (noWindow) { - unsubscribe(); - } + @Override + public void request(long n) { + if (n < 0L) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); } - - })); - - child.setProducer(new Producer() { - @Override - public void request(long n) { - if (n > 0) { - long u = n * size; - if (((u >>> 31) != 0) && (u / n != size)) { - u = Long.MAX_VALUE; - } - requestMore(u); + if (n != 0L) { + WindowSkip parent = WindowSkip.this; + if (!get() && compareAndSet(false, true)) { + long u = BackpressureUtils.multiplyCap(n, parent.size); + long v = BackpressureUtils.multiplyCap(parent.skip - parent.size, n - 1); + long w = BackpressureUtils.addCap(u, v); + parent.request(w); + } else { + long u = BackpressureUtils.multiplyCap(n, parent.skip); + parent.request(u); } } - }); + } } + } + + static final class WindowOverlap extends Subscriber implements Action0 { + final Subscriber> actual; - void requestMore(long n) { - request(n); - } + final int size; + + final int skip; + + final AtomicInteger wip; + + final Subscription cancel; + + final ArrayDeque> windows; + final AtomicLong requested; + + final AtomicInteger drainWip; + + final Queue> queue; + + Throwable error; + + volatile boolean done; + + int index; + + int produced; + + public WindowOverlap(Subscriber> actual, int size, int skip) { + this.actual = actual; + this.size = size; + this.skip = skip; + this.wip = new AtomicInteger(1); + this.windows = new ArrayDeque>(); + this.drainWip = new AtomicInteger(); + this.requested = new AtomicLong(); + this.cancel = Subscriptions.create(this); + this.add(cancel); + this.request(0); + int maxWindows = (size + (skip - 1)) / skip; + this.queue = new SpscLinkedArrayQueue>(maxWindows); + } + @Override public void onNext(T t) { - if (count++ % skip == 0) { - if (!child.isUnsubscribed()) { - if (chunks.isEmpty()) { - noWindow = false; - } - CountedSubject cs = createCountedSubject(); - chunks.add(cs); - child.onNext(cs.producer); - } + int i = index; + + ArrayDeque> q = windows; + + if (i == 0 && !actual.isUnsubscribed()) { + wip.getAndIncrement(); + + Subject w = UnicastSubject.create(16, this); + q.offer(w); + + queue.offer(w); + drain(); } - Iterator> it = chunks.iterator(); - while (it.hasNext()) { - CountedSubject cs = it.next(); - cs.consumer.onNext(t); - if (++cs.count == size) { - it.remove(); - cs.consumer.onCompleted(); - } + for (Subject w : windows) { + w.onNext(t); } - if (chunks.isEmpty()) { - noWindow = true; - if (child.isUnsubscribed()) { - unsubscribe(); + + int p = produced + 1; + + if (p == size) { + produced = p - skip; + + Subject w = q.poll(); + if (w != null) { + w.onCompleted(); } + } else { + produced = p; + } + + i++; + if (i == skip) { + index = 0; + } else { + index = i; } } - + @Override public void onError(Throwable e) { - List> list = new ArrayList>(chunks); - chunks.clear(); - noWindow = true; - for (CountedSubject cs : list) { - cs.consumer.onError(e); + for (Subject w : windows) { + w.onError(e); } - child.onError(e); + windows.clear(); + + error = e; + done = true; + drain(); } - + @Override public void onCompleted() { - List> list = new ArrayList>(chunks); - chunks.clear(); - noWindow = true; - for (CountedSubject cs : list) { - cs.consumer.onCompleted(); + for (Subject w : windows) { + w.onCompleted(); + } + windows.clear(); + + done = true; + drain(); + } + + Producer createProducer() { + return new WindowOverlapProducer(); + } + + @Override + public void call() { + if (wip.decrementAndGet() == 0) { + unsubscribe(); } - child.onCompleted(); } + + void drain() { + AtomicInteger dw = drainWip; + if (dw.getAndIncrement() != 0) { + return; + } - CountedSubject createCountedSubject() { - final UnicastSubject bus = UnicastSubject.create(); - return new CountedSubject(bus, bus); + final Subscriber> a = actual; + final Queue> q = queue; + + int missed = 1; + + for (;;) { + + long r = requested.get(); + long e = 0L; + + while (e != r) { + boolean d = done; + Subject v = q.poll(); + boolean empty = v == null; + + if (checkTerminated(d, empty, a, q)) { + return; + } + + if (empty) { + break; + } + + a.onNext(v); + + e++; + } + + if (e == r) { + if (checkTerminated(done, q.isEmpty(), a, q)) { + return; + } + } + + if (e != 0 && r != Long.MAX_VALUE) { + requested.addAndGet(-e); + } + + missed = dw.addAndGet(-missed); + if (missed == 0) { + break; + } + } } - } - /** - * Record to store the subject and the emission count. - * @param the subject's in-out type - */ - static final class CountedSubject { - final Observer consumer; - final Observable producer; - int count; + + boolean checkTerminated(boolean d, boolean empty, Subscriber> a, Queue> q) { + if (a.isUnsubscribed()) { + q.clear(); + return true; + } + if (d) { + Throwable e = error; + if (e != null) { + q.clear(); + a.onError(e); + return true; + } else + if (empty) { + a.onCompleted(); + return true; + } + } + return false; + } + + final class WindowOverlapProducer extends AtomicBoolean implements Producer { + /** */ + private static final long serialVersionUID = 4625807964358024108L; - public CountedSubject(Observer consumer, Observable producer) { - this.consumer = consumer; - this.producer = producer; + @Override + public void request(long n) { + if (n < 0L) { + throw new IllegalArgumentException("n >= 0 required but it was " + n); + } + if (n != 0L) { + + WindowOverlap parent = WindowOverlap.this; + + if (!get() && compareAndSet(false, true)) { + long u = BackpressureUtils.multiplyCap(parent.skip, n - 1); + long v = BackpressureUtils.addCap(u, parent.size); + + parent.request(v); + } else { + long u = BackpressureUtils.multiplyCap(parent.skip, n); + WindowOverlap.this.request(u); + } + + BackpressureUtils.getAndAddRequest(parent.requested, n); + parent.drain(); + } + } } } + } diff --git a/src/main/java/rx/internal/operators/UnicastSubject.java b/src/main/java/rx/internal/operators/UnicastSubject.java index 5fb21b65f6..569745358e 100644 --- a/src/main/java/rx/internal/operators/UnicastSubject.java +++ b/src/main/java/rx/internal/operators/UnicastSubject.java @@ -32,12 +32,15 @@ * amount. In this case, the buffered values are no longer retained. If the Subscriber * requests a limited amount, queueing is involved and only those values are retained which * weren't requested by the Subscriber at that time. + * + * @param the input and output value type */ public final class UnicastSubject extends Subject { /** * Constructs an empty UnicastSubject instance with the default capacity hint of 16 elements. * + * @param the input and output value type * @return the created UnicastSubject instance */ public static UnicastSubject create() { @@ -48,14 +51,34 @@ public static UnicastSubject create() { *

The capacity hint determines the internal queue's island size: the larger * it is the less frequent allocation will happen if there is no subscriber * or the subscriber hasn't caught up. + * @param the input and output value type * @param capacityHint the capacity hint for the internal queue * @return the created BufferUntilSubscriber instance */ public static UnicastSubject create(int capacityHint) { - State state = new State(capacityHint); + State state = new State(capacityHint, null); return new UnicastSubject(state); } - + + /** + * Constructs an empty UnicastSubject instance with a capacity hint and + * an Action0 instance to call if the subject reaches its terminal state + * or the single Subscriber unsubscribes mid-sequence. + *

The capacity hint determines the internal queue's island size: the larger + * it is the less frequent allocation will happen if there is no subscriber + * or the subscriber hasn't caught up. + * @param the input and output value type + * @param capacityHint the capacity hint for the internal queue + * @param onTerminated the optional callback to call when subject reaches its terminal state + * or the single Subscriber unsubscribes mid-sequence. It will be called + * at most once. + * @return the created BufferUntilSubscriber instance + */ + public static UnicastSubject create(int capacityHint, Action0 onTerminated) { + State state = new State(capacityHint, onTerminated); + return new UnicastSubject(state); + } + final State state; private UnicastSubject(State state) { @@ -97,6 +120,8 @@ static final class State extends AtomicLong implements Producer, Observer, final Queue queue; /** JCTools queues don't accept nulls. */ final NotificationLite nl; + /** Atomically set to true on terminal condition. */ + final AtomicReference terminateOnce; /** In case the source emitted an error. */ Throwable error; /** Indicates the source has terminated. */ @@ -111,10 +136,14 @@ static final class State extends AtomicLong implements Producer, Observer, * Constructor. * @param capacityHint indicates how large each island in the Spsc queue should be to * reduce allocation frequency + * @param onTerminated the action to call when the subject reaches its terminal state or + * the single subscriber unsubscribes. */ - public State(int capacityHint) { + public State(int capacityHint, Action0 onTerminated) { this.nl = NotificationLite.instance(); this.subscriber = new AtomicReference>(); + this.terminateOnce = onTerminated != null ? new AtomicReference(onTerminated) : null; + Queue q; if (capacityHint > 1) { q = UnsafeAccess.isUnsafeAvailable() @@ -161,6 +190,9 @@ public void onNext(T t) { @Override public void onError(Throwable e) { if (!done) { + + doTerminate(); + error = e; done = true; if (!caughtUp) { @@ -179,6 +211,9 @@ public void onError(Throwable e) { @Override public void onCompleted() { if (!done) { + + doTerminate(); + done = true; if (!caughtUp) { boolean stillReplay = false; @@ -292,6 +327,9 @@ void replay() { */ @Override public void call() { + + doTerminate(); + done = true; synchronized (this) { if (emitting) { @@ -328,5 +366,18 @@ boolean checkTerminated(boolean done, boolean empty, Subscriber s) { } return false; } + + /** + * Call the optional termination action at most once. + */ + void doTerminate() { + AtomicReference ref = this.terminateOnce; + if (ref != null) { + Action0 a = ref.get(); + if (a != null && ref.compareAndSet(a, null)) { + a.call(); + } + } + } } } \ No newline at end of file diff --git a/src/main/java/rx/observers/TestSubscriber.java b/src/main/java/rx/observers/TestSubscriber.java index 6b9cf90d3e..17655ab91f 100644 --- a/src/main/java/rx/observers/TestSubscriber.java +++ b/src/main/java/rx/observers/TestSubscriber.java @@ -25,14 +25,13 @@ /** * A {@code TestSubscriber} is a variety of {@link Subscriber} that you can use for unit testing, to perform * assertions, inspect received events, or wrap a mocked {@code Subscriber}. + * @param the value type */ public class TestSubscriber extends Subscriber { private final TestObserver testObserver; private final CountDownLatch latch = new CountDownLatch(1); private volatile Thread lastSeenThread; - /** Holds the initial request value. */ - private final long initialRequest; /** The shared no-op observer. */ private static final Observer INERT = new Observer() { @@ -78,7 +77,9 @@ public TestSubscriber(Observer delegate, long initialRequest) { throw new NullPointerException(); } this.testObserver = new TestObserver(delegate); - this.initialRequest = initialRequest; + if (initialRequest >= 0L) { + this.request(initialRequest); + } } /** @@ -112,6 +113,7 @@ public TestSubscriber() { /** * Factory method to construct a TestSubscriber with an initial request of Long.MAX_VALUE and no delegation. + * @param the value type * @return the created TestSubscriber instance * @since 1.1.0 */ @@ -121,6 +123,7 @@ public static TestSubscriber create() { /** * Factory method to construct a TestSubscriber with the given initial request amount and no delegation. + * @param the value type * @param initialRequest the initial request amount, negative values revert to the default unbounded mode * @return the created TestSubscriber instance * @since 1.1.0 @@ -132,6 +135,7 @@ public static TestSubscriber create(long initialRequest) { /** * Factory method to construct a TestSubscriber which delegates events to the given Observer and * issues the given initial request amount. + * @param the value type * @param delegate the observer to delegate events to * @param initialRequest the initial request amount, negative values revert to the default unbounded mode * @return the created TestSubscriber instance @@ -145,6 +149,7 @@ public static TestSubscriber create(Observer delegate, long initialReq /** * Factory method to construct a TestSubscriber which delegates events to the given Observer and * an issues an initial request of Long.MAX_VALUE. + * @param the value type * @param delegate the observer to delegate events to * @return the created TestSubscriber instance * @throws NullPointerException if delegate is null @@ -157,6 +162,7 @@ public static TestSubscriber create(Subscriber delegate) { /** * Factory method to construct a TestSubscriber which delegates events to the given Subscriber and * an issues an initial request of Long.MAX_VALUE. + * @param the value type * @param delegate the subscriber to delegate events to * @return the created TestSubscriber instance * @throws NullPointerException if delegate is null @@ -166,13 +172,6 @@ public static TestSubscriber create(Observer delegate) { return new TestSubscriber(delegate); } - @Override - public void onStart() { - if (initialRequest >= 0) { - requestMore(initialRequest); - } - } - /** * Notifies the Subscriber that the {@code Observable} has finished sending push-based notifications. *

diff --git a/src/test/java/rx/internal/operators/OperatorBufferTest.java b/src/test/java/rx/internal/operators/OperatorBufferTest.java index 75cddb8ad1..ad8b1bc9c7 100644 --- a/src/test/java/rx/internal/operators/OperatorBufferTest.java +++ b/src/test/java/rx/internal/operators/OperatorBufferTest.java @@ -949,10 +949,11 @@ public void call(final Subscriber s) { @Override public void request(long n) { - requested.set(n); - s.onNext(1); - s.onNext(2); - s.onNext(3); + if (BackpressureUtils.getAndAddRequest(requested, n) == 0) { + s.onNext(1); + s.onNext(2); + s.onNext(3); + } } }); @@ -1015,4 +1016,80 @@ public void onCompleted() { assertFalse(s.isUnsubscribed()); } + + @SuppressWarnings("unchecked") + @Test + public void testPostCompleteBackpressure() { + Observable> source = Observable.range(1, 10).buffer(3, 1); + + TestSubscriber> ts = TestSubscriber.create(0L); + + source.subscribe(ts); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertNoErrors(); + + ts.requestMore(7); + + ts.assertValues( + Arrays.asList(1, 2, 3), + Arrays.asList(2, 3, 4), + Arrays.asList(3, 4, 5), + Arrays.asList(4, 5, 6), + Arrays.asList(5, 6, 7), + Arrays.asList(6, 7, 8), + Arrays.asList(7, 8, 9) + ); + ts.assertNotCompleted(); + ts.assertNoErrors(); + + ts.requestMore(1); + + ts.assertValues( + Arrays.asList(1, 2, 3), + Arrays.asList(2, 3, 4), + Arrays.asList(3, 4, 5), + Arrays.asList(4, 5, 6), + Arrays.asList(5, 6, 7), + Arrays.asList(6, 7, 8), + Arrays.asList(7, 8, 9), + Arrays.asList(8, 9, 10) + ); + ts.assertNotCompleted(); + ts.assertNoErrors(); + + ts.requestMore(1); + + ts.assertValues( + Arrays.asList(1, 2, 3), + Arrays.asList(2, 3, 4), + Arrays.asList(3, 4, 5), + Arrays.asList(4, 5, 6), + Arrays.asList(5, 6, 7), + Arrays.asList(6, 7, 8), + Arrays.asList(7, 8, 9), + Arrays.asList(8, 9, 10), + Arrays.asList(9, 10) + ); + ts.assertNotCompleted(); + ts.assertNoErrors(); + + ts.requestMore(1); + + ts.assertValues( + Arrays.asList(1, 2, 3), + Arrays.asList(2, 3, 4), + Arrays.asList(3, 4, 5), + Arrays.asList(4, 5, 6), + Arrays.asList(5, 6, 7), + Arrays.asList(6, 7, 8), + Arrays.asList(7, 8, 9), + Arrays.asList(8, 9, 10), + Arrays.asList(9, 10), + Arrays.asList(10) + ); + ts.assertCompleted(); + ts.assertNoErrors(); + } } diff --git a/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java b/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java index 9dade31fbc..3677e83e0a 100644 --- a/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java +++ b/src/test/java/rx/internal/operators/OperatorWindowWithSizeTest.java @@ -324,4 +324,71 @@ public Observable> call(Observable t) { ts.assertNoErrors(); ts.assertCompleted(); } + + @Test + public void testBackpressureOuterOverlap() { + Observable> source = Observable.range(1, 10).window(3, 1); + + TestSubscriber> ts = TestSubscriber.create(0L); + + source.subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(1); + + ts.assertValueCount(1); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(7); + + ts.assertValueCount(8); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(3); + + ts.assertValueCount(10); + ts.assertCompleted(); + ts.assertNoErrors(); + } + + @Test(expected = IllegalArgumentException.class) + public void testCountInvalid() { + Observable.range(1, 10).window(0, 1); + } + + @Test(expected = IllegalArgumentException.class) + public void testSkipInvalid() { + Observable.range(1, 10).window(3, 0); + } + @Test + public void testTake1Overlapping() { + Observable> source = Observable.range(1, 10).window(3, 1).take(1); + + TestSubscriber> ts = TestSubscriber.create(0L); + + source.subscribe(ts); + + ts.assertNoValues(); + ts.assertNoErrors(); + ts.assertNotCompleted(); + + ts.requestMore(2); + + ts.assertValueCount(1); + ts.assertCompleted(); + ts.assertNoErrors(); + + TestSubscriber ts1 = TestSubscriber.create(); + + ts.getOnNextEvents().get(0).subscribe(ts1); + + ts1.assertValues(1, 2, 3); + ts1.assertCompleted(); + ts1.assertNoErrors(); + } } \ No newline at end of file