From c38a7806224772d508a8302c0fd3ae85d0ea5957 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 11 Dec 2014 21:53:45 +0100 Subject: [PATCH 1/7] Fixed race & late termination condition. --- .../OperatorOnBackpressureBlock.java | 38 ++++++-- .../operators/OnBackpressureBlockTest.java | 93 ++++++++++++++++++- 2 files changed, 117 insertions(+), 14 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java index bb328788e9..e106bac70d 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java @@ -58,6 +58,9 @@ void init() { child.setProducer(new Producer() { @Override public void request(long n) { + if (n == 0) { + return; + } synchronized (BlockingSubscriber.this) { if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) { requestedCount = Long.MAX_VALUE; @@ -95,28 +98,41 @@ public void onCompleted() { } void drain() { long n; + boolean term; synchronized (this) { if (emitting) { return; } emitting = true; n = requestedCount; + term = terminated; } boolean skipFinal = false; try { + Subscriber child = this.child; + BlockingQueue queue = this.queue; while (true) { int emitted = 0; - while (n > 0) { - Object o = queue.poll(); - if (o == null) { - if (terminated) { - if (exception != null) { - child.onError(exception); + while (n > 0 || term) { + Object o; + if (term) { + o = queue.peek(); + if (o == null) { + Throwable e = exception; + if (e != null) { + child.onError(e); } else { child.onCompleted(); } + skipFinal = true; return; } + if (n == 0) { + break; + } + } + o = queue.poll(); + if (o == null) { break; } else { child.onNext(nl.getValue(o)); @@ -125,23 +141,25 @@ void drain() { } } synchronized (this) { + term = terminated; + boolean more = queue.peek() != null; // if no backpressure below if (requestedCount == Long.MAX_VALUE) { // no new data arrived since the last poll - if (queue.peek() == null) { + if (!more && !term) { skipFinal = true; emitting = false; return; } n = Long.MAX_VALUE; } else { - if (emitted == 0) { + requestedCount -= emitted; + n = requestedCount; + if ((n == 0 || !more) && (!term || more)) { skipFinal = true; emitting = false; return; } - requestedCount -= emitted; - n = requestedCount; } } } diff --git a/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java b/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java index b247b15e6b..47d3cebd71 100644 --- a/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java +++ b/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java @@ -16,11 +16,13 @@ package rx.internal.operators; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; import static org.mockito.Mockito.*; import java.util.Arrays; - -import static org.junit.Assert.*; +import java.util.Collections; import org.junit.Test; @@ -34,6 +36,7 @@ import rx.observers.TestObserver; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; +import rx.subjects.PublishSubject; /** * Test the onBackpressureBlock() behavior. @@ -161,13 +164,15 @@ public void onStart() { Thread.sleep(WAIT); o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - o.assertNoErrors(); - assertTrue(o.getOnCompletedEvents().isEmpty()); + o.assertTerminalEvent(); + assertEquals(1, o.getOnErrorEvents().size()); + assertTrue(o.getOnErrorEvents().get(0) instanceof TestException); o.requestMore(10); Thread.sleep(WAIT); + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); o.assertTerminalEvent(); assertEquals(1, o.getOnErrorEvents().size()); assertTrue(o.getOnErrorEvents().get(0) instanceof TestException); @@ -259,4 +264,84 @@ public void testTakeWorksSubscriberRequestUnlimitedBufferedException() { o.assertNoErrors(); o.assertTerminalEvent(); } + @Test(timeout = 10000) + public void testOnCompletedDoesntWaitIfNoEvents() { + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + Observable.empty().onBackpressureBlock(2).subscribe(o); + + o.assertNoErrors(); + o.assertTerminalEvent(); + o.assertReceivedOnNext(Collections.emptyList()); + } + @Test(timeout = 10000) + public void testOnCompletedDoesWaitIfEvents() { + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + Observable.just(1).onBackpressureBlock(2).subscribe(o); + + o.assertReceivedOnNext(Collections.emptyList()); + assertTrue(o.getOnErrorEvents().isEmpty()); + assertTrue(o.getOnCompletedEvents().isEmpty()); + } + @Test(timeout = 10000) + public void testOnCompletedDoesntWaitIfNoEvents2() { + final PublishSubject ps = PublishSubject.create(); + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + @Override + public void onNext(Integer t) { + super.onNext(t); + ps.onCompleted(); // as if an async completion arrived while in the loop + } + }; + ps.onBackpressureBlock(2).unsafeSubscribe(o); + ps.onNext(1); + o.requestMore(1); + + o.assertNoErrors(); + o.assertTerminalEvent(); + o.assertReceivedOnNext(Arrays.asList(1)); + } + @Test(timeout = 10000) + public void testOnCompletedDoesntWaitIfNoEvents3() { + final PublishSubject ps = PublishSubject.create(); + TestSubscriber o = new TestSubscriber() { + boolean once = true; + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + @Override + public void onNext(Integer t) { + super.onNext(t); + if (once) { + once = false; + ps.onNext(2); + ps.onCompleted(); // as if an async completion arrived while in the loop + requestMore(1); + } + } + }; + ps.onBackpressureBlock(3).unsafeSubscribe(o); + ps.onNext(1); + o.requestMore(1); + + o.assertNoErrors(); + o.assertTerminalEvent(); + o.assertReceivedOnNext(Arrays.asList(1, 2)); + } } From f6fd5deffde08b35a6b01a3ac3c6f58066b3a1ee Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 11 Dec 2014 23:59:00 +0100 Subject: [PATCH 2/7] Factored out the backpressure management into an experimental class and reimplemented Buffer and Block strategies with it. --- .../OperatorOnBackpressureBlock.java | 132 ++-------- .../OperatorOnBackpressureBuffer.java | 188 +++++++------- .../util/BackpressureDrainManager.java | 238 ++++++++++++++++++ 3 files changed, 351 insertions(+), 207 deletions(-) create mode 100644 src/main/java/rx/internal/util/BackpressureDrainManager.java diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java index e106bac70d..71a5fc4993 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java @@ -20,8 +20,8 @@ import java.util.concurrent.BlockingQueue; import rx.Observable.Operator; -import rx.Producer; import rx.Subscriber; +import rx.internal.util.BackpressureDrainManager; /** * Operator that blocks the producer thread in case a backpressure is needed. @@ -38,45 +38,25 @@ public Subscriber call(Subscriber child) { return s; } - static final class BlockingSubscriber extends Subscriber { + static final class BlockingSubscriber extends Subscriber implements BackpressureDrainManager.BackpressureQueueCallback { final NotificationLite nl = NotificationLite.instance(); final BlockingQueue queue; final Subscriber child; - /** Guarded by this. */ - long requestedCount; - /** Guarded by this. */ - boolean emitting; - volatile boolean terminated; - /** Set before terminated, read after terminated. */ - Throwable exception; + final BackpressureDrainManager manager; public BlockingSubscriber(int max, Subscriber child) { this.queue = new ArrayBlockingQueue(max); this.child = child; + this.manager = new BackpressureDrainManager(this); } void init() { child.add(this); - child.setProducer(new Producer() { - @Override - public void request(long n) { - if (n == 0) { - return; - } - synchronized (BlockingSubscriber.this) { - if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) { - requestedCount = Long.MAX_VALUE; - } else { - requestedCount += n; - } - } - drain(); - } - }); + child.setProducer(manager); } @Override public void onNext(T t) { try { queue.put(nl.next(t)); - drain(); + manager.drain(); } catch (InterruptedException ex) { if (!isUnsubscribed()) { onError(ex); @@ -85,91 +65,31 @@ public void onNext(T t) { } @Override public void onError(Throwable e) { - if (!terminated) { - exception = e; - terminated = true; - drain(); - } + manager.terminateAndDrain(e); } @Override public void onCompleted() { - terminated = true; - drain(); + manager.terminateAndDrain(); } - void drain() { - long n; - boolean term; - synchronized (this) { - if (emitting) { - return; - } - emitting = true; - n = requestedCount; - term = terminated; - } - boolean skipFinal = false; - try { - Subscriber child = this.child; - BlockingQueue queue = this.queue; - while (true) { - int emitted = 0; - while (n > 0 || term) { - Object o; - if (term) { - o = queue.peek(); - if (o == null) { - Throwable e = exception; - if (e != null) { - child.onError(e); - } else { - child.onCompleted(); - } - skipFinal = true; - return; - } - if (n == 0) { - break; - } - } - o = queue.poll(); - if (o == null) { - break; - } else { - child.onNext(nl.getValue(o)); - n--; - emitted++; - } - } - synchronized (this) { - term = terminated; - boolean more = queue.peek() != null; - // if no backpressure below - if (requestedCount == Long.MAX_VALUE) { - // no new data arrived since the last poll - if (!more && !term) { - skipFinal = true; - emitting = false; - return; - } - n = Long.MAX_VALUE; - } else { - requestedCount -= emitted; - n = requestedCount; - if ((n == 0 || !more) && (!term || more)) { - skipFinal = true; - emitting = false; - return; - } - } - } - } - } finally { - if (!skipFinal) { - synchronized (this) { - emitting = false; - } - } + @Override + public boolean accept(Object value) { + return nl.accept(child, value); + } + @Override + public void complete(Throwable exception) { + if (exception != null) { + child.onError(exception); + } else { + child.onCompleted(); } } + @Override + public Object peek() { + return queue.peek(); + } + @Override + public Object poll() { + return queue.poll(); + } } } diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java index 2ddb582f9e..1517736dbd 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java @@ -15,7 +15,6 @@ */ package rx.internal.operators; -import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -25,11 +24,10 @@ import rx.Subscriber; import rx.exceptions.MissingBackpressureException; import rx.functions.Action0; +import rx.internal.util.BackpressureDrainManager; public class OperatorOnBackpressureBuffer implements Operator { - private final NotificationLite on = NotificationLite.instance(); - private final Long capacity; private final Action0 onOverflow; @@ -52,122 +50,110 @@ public OperatorOnBackpressureBuffer(long capacity, Action0 onOverflow) { @Override public Subscriber call(final Subscriber child) { - // TODO get a different queue implementation - final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); - final AtomicLong capacity = (this.capacity == null) ? null : new AtomicLong(this.capacity); - final AtomicLong wip = new AtomicLong(); - final AtomicLong requested = new AtomicLong(); - - child.setProducer(new Producer() { - - @Override - public void request(long n) { - if (requested.getAndAdd(n) == 0) { - pollQueue(wip, requested, capacity, queue, child); - } - } - }); // don't pass through subscriber as we are async and doing queue draining // a parent being unsubscribed should not affect the children - Subscriber parent = new Subscriber() { + BufferSubscriber parent = new BufferSubscriber(child, capacity, onOverflow); - private AtomicBoolean saturated = new AtomicBoolean(false); + // if child unsubscribes it should unsubscribe the parent, but not the other way around + child.add(parent); + child.setProducer(parent.manager()); - @Override - public void onStart() { - request(Long.MAX_VALUE); - } + return parent; + } + private static final class BufferSubscriber extends Subscriber implements BackpressureDrainManager.BackpressureQueueCallback { + // TODO get a different queue implementation + private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue(); + private final Long baseCapacity; + private final AtomicLong capacity; + private final Subscriber child; + private final AtomicBoolean saturated = new AtomicBoolean(false); + private final BackpressureDrainManager manager; + private final NotificationLite on = NotificationLite.instance(); + private final Action0 onOverflow; + + public BufferSubscriber(final Subscriber child, Long capacity, Action0 onOverflow) { + this.child = child; + this.baseCapacity = capacity; + this.capacity = capacity != null ? new AtomicLong(capacity) : null; + this.onOverflow = onOverflow; + this.manager = new BackpressureDrainManager(this); + } + @Override + public void onStart() { + request(Long.MAX_VALUE); + } - @Override - public void onCompleted() { - if (!saturated.get()) { - queue.offer(on.completed()); - pollQueue(wip, requested, capacity, queue, child); - } + @Override + public void onCompleted() { + if (!saturated.get()) { + manager.terminateAndDrain(); } + } - @Override - public void onError(Throwable e) { - if (!saturated.get()) { - queue.offer(on.error(e)); - pollQueue(wip, requested, capacity, queue, child); - } + @Override + public void onError(Throwable e) { + if (!saturated.get()) { + manager.terminateAndDrain(e); } + } - @Override - public void onNext(T t) { - if (!assertCapacity()) { - return; - } - queue.offer(on.next(t)); - pollQueue(wip, requested, capacity, queue, child); + @Override + public void onNext(T t) { + if (!assertCapacity()) { + return; } + queue.offer(on.next(t)); + manager.drain(); + } - private boolean assertCapacity() { - if (capacity == null) { - return true; - } - - long currCapacity; - do { - currCapacity = capacity.get(); - if (currCapacity <= 0) { - if (saturated.compareAndSet(false, true)) { - unsubscribe(); - child.onError(new MissingBackpressureException( - "Overflowed buffer of " - + OperatorOnBackpressureBuffer.this.capacity)); - if (onOverflow != null) { - onOverflow.call(); - } - } - return false; - } - // ensure no other thread stole our slot, or retry - } while (!capacity.compareAndSet(currCapacity, currCapacity - 1)); - return true; + @Override + public boolean accept(Object value) { + return on.accept(child, value); + } + @Override + public void complete(Throwable exception) { + if (exception != null) { + child.onError(exception); + } else { + child.onCompleted(); } - }; - - // if child unsubscribes it should unsubscribe the parent, but not the other way around - child.add(parent); + } + @Override + public Object peek() { + return queue.peek(); + } + @Override + public Object poll() { + return queue.poll(); + } - return parent; - } + private boolean assertCapacity() { + if (capacity == null) { + return true; + } - private void pollQueue(AtomicLong wip, AtomicLong requested, AtomicLong capacity, Queue queue, Subscriber child) { - // TODO can we do this without putting everything in the queue first so we can fast-path the case when we don't need to queue? - if (requested.get() > 0) { - // only one draining at a time - try { - /* - * This needs to protect against concurrent execution because `request` and `on*` events can come concurrently. - */ - if (wip.getAndIncrement() == 0) { - while (true) { - if (requested.getAndDecrement() != 0) { - Object o = queue.poll(); - if (o == null) { - // nothing in queue - requested.incrementAndGet(); - return; - } - if (capacity != null) { // it's bounded - capacity.incrementAndGet(); - } - on.accept(child, o); - } else { - // we hit the end ... so increment back to 0 again - requested.incrementAndGet(); - return; + long currCapacity; + do { + currCapacity = capacity.get(); + if (currCapacity <= 0) { + if (saturated.compareAndSet(false, true)) { + unsubscribe(); + child.onError(new MissingBackpressureException( + "Overflowed buffer of " + + baseCapacity)); + if (onOverflow != null) { + onOverflow.call(); } } + return false; } - - } finally { - wip.decrementAndGet(); - } + // ensure no other thread stole our slot, or retry + } while (!capacity.compareAndSet(currCapacity, currCapacity - 1)); + return true; + } + protected Producer manager() { + return manager; } } } diff --git a/src/main/java/rx/internal/util/BackpressureDrainManager.java b/src/main/java/rx/internal/util/BackpressureDrainManager.java new file mode 100644 index 0000000000..78a4d29c3e --- /dev/null +++ b/src/main/java/rx/internal/util/BackpressureDrainManager.java @@ -0,0 +1,238 @@ +/* + * Copyright 2011 David Karnok + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.util; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import rx.Producer; +import rx.annotations.Experimental; + +/** + * Manages the producer-backpressure-consumer interplay by + * matching up available elements with requested elements and/or + * terminal events. + */ +@Experimental +public final class BackpressureDrainManager implements Producer { + /** + * Interface representing the minimal callbacks required + * to operate the drain part of a backpressure system. + */ + public interface BackpressureQueueCallback { + /** + * Override this method to peek for the next element, + * null meaning no next element available now. + *

It will be called plain and while holding this object's monitor. + * @return the next element or null if no next element available + */ + Object peek(); + /** + * Override this method to poll (consume) the next element, + * null meaning no next element available now. + * @return the next element or null if no next element available + */ + Object poll(); + /** + * Override this method to deliver an element to downstream. + * The logic ensures that this happens only in the right conditions. + * @param value the value to deliver, not null + * @return true indicates that one should terminate the emission loop unconditionally + * and not deliver any further elements or terminal events. + */ + boolean accept(Object value); + /** + * Override this method to deliver a normal or exceptional + * terminal event. + * @param exception if not null, contains the terminal exception + */ + void complete(Throwable exception); + } + + /** The request counter, updated via REQUESTED_COUNTER. */ + protected volatile long requestedCount; + /** Atomically updates the the requestedCount field. */ + protected static final AtomicLongFieldUpdater REQUESTED_COUNT + = AtomicLongFieldUpdater.newUpdater(BackpressureDrainManager.class, "requestedCount"); + /** Indicates if one is in emitting phase, guarded by this. */ + protected boolean emitting; + /** Indicates a terminal state. */ + protected volatile boolean terminated; + /** Indicates an error state, barrier is provided via terminated. */ + protected Throwable exception; + /** The callbacks to manage the drain. */ + protected final BackpressureQueueCallback actual; + /** + * Constructs a backpressure drain manager with 0 requesedCount, + * no terminal event and not emitting. + * @param actual he queue callback to check for new element availability + */ + public BackpressureDrainManager(BackpressureQueueCallback actual) { + this.actual = actual; + } + /** + * Checks if a terminal state has been reached. + * @return true if a terminal state has been reached + */ + public final boolean isTerminated() { + return terminated; + } + /** + * Move into a terminal state. + * Call drain() anytime after. + */ + public final void terminate() { + terminated = true; + } + /** + * Move into a terminal state with an exception. + * Call drain() anytime after. + *

Serialized access is expected with respect to + * element emission. + * @param error the exception to deliver + */ + public final void terminate(Throwable error) { + if (!terminated) { + exception = error; + terminated = true; + } + } + /** + * Move into a terminal state and drain. + */ + public final void terminateAndDrain() { + terminated = true; + drain(); + } + /** + * Move into a terminal state with an exception and drain. + *

Serialized access is expected with respect to + * element emission. + * @param error the exception to deliver + */ + public final void terminateAndDrain(Throwable error) { + if (!terminated) { + exception = error; + terminated = true; + drain(); + } + } + @Override + public final void request(long n) { + if (n == 0) { + return; + } + boolean mayDrain; + long r; + long u; + do { + r = requestedCount; + mayDrain = r == 0; + if (r == Long.MAX_VALUE) { + mayDrain = true; + break; + } + if (n == Long.MAX_VALUE) { + u = n; + mayDrain = true; + } else { + u = r + n; + } + } while (!REQUESTED_COUNT.compareAndSet(this, r, u)); + // since we implement producer, we have to call drain + // on a 0-n request transition + if (mayDrain) { + drain(); + } + } + /** + * Try to drain the "queued" elements and terminal events + * by considering the available and requested event counts. + */ + public final void drain() { + long n; + boolean term; + synchronized (this) { + if (emitting) { + return; + } + emitting = true; + term = terminated; + } + n = requestedCount; + boolean skipFinal = false; + try { + BackpressureQueueCallback a = actual; + while (true) { + int emitted = 0; + while (n > 0 || term) { + Object o; + if (term) { + o = a.peek(); + if (o == null) { + skipFinal = true; + Throwable e = exception; + a.complete(e); + return; + } + if (n == 0) { + break; + } + } + o = a.poll(); + if (o == null) { + break; + } else { + if (a.accept(o)) { + skipFinal = true; + return; + } + n--; + emitted++; + } + } + synchronized (this) { + term = terminated; + boolean more = a.peek() != null; + // if no backpressure below + if (requestedCount == Long.MAX_VALUE) { + // no new data arrived since the last poll + if (!more && !term) { + skipFinal = true; + emitting = false; + return; + } + n = Long.MAX_VALUE; + } else { + n = REQUESTED_COUNT.addAndGet(this, -emitted); + if ((n == 0 || !more) && (!term || more)) { + skipFinal = true; + emitting = false; + return; + } + } + } + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + + } +} From a82eff91bebb55fce4fa32225e327170a0ad9246 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 18 Dec 2014 09:29:52 +0100 Subject: [PATCH 3/7] Fixed potential request value overflow. --- .../java/rx/internal/util/BackpressureDrainManager.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/main/java/rx/internal/util/BackpressureDrainManager.java b/src/main/java/rx/internal/util/BackpressureDrainManager.java index 78a4d29c3e..f898805e33 100644 --- a/src/main/java/rx/internal/util/BackpressureDrainManager.java +++ b/src/main/java/rx/internal/util/BackpressureDrainManager.java @@ -142,14 +142,17 @@ public final void request(long n) { r = requestedCount; mayDrain = r == 0; if (r == Long.MAX_VALUE) { - mayDrain = true; break; } if (n == Long.MAX_VALUE) { u = n; mayDrain = true; } else { - u = r + n; + if (r > Long.MAX_VALUE - n) { + u = Long.MAX_VALUE; + } else { + u = r + n; + } } } while (!REQUESTED_COUNT.compareAndSet(this, r, u)); // since we implement producer, we have to call drain From c653ae33d4d996bbedec220973f559e48b8680e7 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 20 Jan 2015 11:59:35 +0100 Subject: [PATCH 4/7] Fixed file comment, larger timeout for a test --- .../rx/internal/util/BackpressureDrainManager.java | 13 ++++++------- .../operators/OperatorOnBackpressureBufferTest.java | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/main/java/rx/internal/util/BackpressureDrainManager.java b/src/main/java/rx/internal/util/BackpressureDrainManager.java index f898805e33..7e521d0bba 100644 --- a/src/main/java/rx/internal/util/BackpressureDrainManager.java +++ b/src/main/java/rx/internal/util/BackpressureDrainManager.java @@ -1,19 +1,18 @@ -/* - * Copyright 2011 David Karnok - * +/** + * Copyright 2014 Netflix, Inc. + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ - package rx.internal.util; import java.util.concurrent.atomic.AtomicLongFieldUpdater; diff --git a/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java b/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java index 34cb73cfc0..ff2d8e0c6f 100644 --- a/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java +++ b/src/test/java/rx/internal/operators/OperatorOnBackpressureBufferTest.java @@ -47,7 +47,7 @@ public void testNoBackpressureSupport() { ts.assertNoErrors(); } - @Test(timeout = 500) + @Test(timeout = 2000) public void testFixBackpressureWithBuffer() throws InterruptedException { final CountDownLatch l1 = new CountDownLatch(100); final CountDownLatch l2 = new CountDownLatch(150); From a3b454d849bb461e9ce09d967cfec31b1ebc0c8e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 26 Jan 2015 13:28:59 +0100 Subject: [PATCH 5/7] Fixed line delimiters. --- .../util/BackpressureDrainManager.java | 480 +++++++++--------- 1 file changed, 240 insertions(+), 240 deletions(-) diff --git a/src/main/java/rx/internal/util/BackpressureDrainManager.java b/src/main/java/rx/internal/util/BackpressureDrainManager.java index 7e521d0bba..f4a95573e7 100644 --- a/src/main/java/rx/internal/util/BackpressureDrainManager.java +++ b/src/main/java/rx/internal/util/BackpressureDrainManager.java @@ -1,240 +1,240 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.util; - -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -import rx.Producer; -import rx.annotations.Experimental; - -/** - * Manages the producer-backpressure-consumer interplay by - * matching up available elements with requested elements and/or - * terminal events. - */ -@Experimental -public final class BackpressureDrainManager implements Producer { - /** - * Interface representing the minimal callbacks required - * to operate the drain part of a backpressure system. - */ - public interface BackpressureQueueCallback { - /** - * Override this method to peek for the next element, - * null meaning no next element available now. - *

It will be called plain and while holding this object's monitor. - * @return the next element or null if no next element available - */ - Object peek(); - /** - * Override this method to poll (consume) the next element, - * null meaning no next element available now. - * @return the next element or null if no next element available - */ - Object poll(); - /** - * Override this method to deliver an element to downstream. - * The logic ensures that this happens only in the right conditions. - * @param value the value to deliver, not null - * @return true indicates that one should terminate the emission loop unconditionally - * and not deliver any further elements or terminal events. - */ - boolean accept(Object value); - /** - * Override this method to deliver a normal or exceptional - * terminal event. - * @param exception if not null, contains the terminal exception - */ - void complete(Throwable exception); - } - - /** The request counter, updated via REQUESTED_COUNTER. */ - protected volatile long requestedCount; - /** Atomically updates the the requestedCount field. */ - protected static final AtomicLongFieldUpdater REQUESTED_COUNT - = AtomicLongFieldUpdater.newUpdater(BackpressureDrainManager.class, "requestedCount"); - /** Indicates if one is in emitting phase, guarded by this. */ - protected boolean emitting; - /** Indicates a terminal state. */ - protected volatile boolean terminated; - /** Indicates an error state, barrier is provided via terminated. */ - protected Throwable exception; - /** The callbacks to manage the drain. */ - protected final BackpressureQueueCallback actual; - /** - * Constructs a backpressure drain manager with 0 requesedCount, - * no terminal event and not emitting. - * @param actual he queue callback to check for new element availability - */ - public BackpressureDrainManager(BackpressureQueueCallback actual) { - this.actual = actual; - } - /** - * Checks if a terminal state has been reached. - * @return true if a terminal state has been reached - */ - public final boolean isTerminated() { - return terminated; - } - /** - * Move into a terminal state. - * Call drain() anytime after. - */ - public final void terminate() { - terminated = true; - } - /** - * Move into a terminal state with an exception. - * Call drain() anytime after. - *

Serialized access is expected with respect to - * element emission. - * @param error the exception to deliver - */ - public final void terminate(Throwable error) { - if (!terminated) { - exception = error; - terminated = true; - } - } - /** - * Move into a terminal state and drain. - */ - public final void terminateAndDrain() { - terminated = true; - drain(); - } - /** - * Move into a terminal state with an exception and drain. - *

Serialized access is expected with respect to - * element emission. - * @param error the exception to deliver - */ - public final void terminateAndDrain(Throwable error) { - if (!terminated) { - exception = error; - terminated = true; - drain(); - } - } - @Override - public final void request(long n) { - if (n == 0) { - return; - } - boolean mayDrain; - long r; - long u; - do { - r = requestedCount; - mayDrain = r == 0; - if (r == Long.MAX_VALUE) { - break; - } - if (n == Long.MAX_VALUE) { - u = n; - mayDrain = true; - } else { - if (r > Long.MAX_VALUE - n) { - u = Long.MAX_VALUE; - } else { - u = r + n; - } - } - } while (!REQUESTED_COUNT.compareAndSet(this, r, u)); - // since we implement producer, we have to call drain - // on a 0-n request transition - if (mayDrain) { - drain(); - } - } - /** - * Try to drain the "queued" elements and terminal events - * by considering the available and requested event counts. - */ - public final void drain() { - long n; - boolean term; - synchronized (this) { - if (emitting) { - return; - } - emitting = true; - term = terminated; - } - n = requestedCount; - boolean skipFinal = false; - try { - BackpressureQueueCallback a = actual; - while (true) { - int emitted = 0; - while (n > 0 || term) { - Object o; - if (term) { - o = a.peek(); - if (o == null) { - skipFinal = true; - Throwable e = exception; - a.complete(e); - return; - } - if (n == 0) { - break; - } - } - o = a.poll(); - if (o == null) { - break; - } else { - if (a.accept(o)) { - skipFinal = true; - return; - } - n--; - emitted++; - } - } - synchronized (this) { - term = terminated; - boolean more = a.peek() != null; - // if no backpressure below - if (requestedCount == Long.MAX_VALUE) { - // no new data arrived since the last poll - if (!more && !term) { - skipFinal = true; - emitting = false; - return; - } - n = Long.MAX_VALUE; - } else { - n = REQUESTED_COUNT.addAndGet(this, -emitted); - if ((n == 0 || !more) && (!term || more)) { - skipFinal = true; - emitting = false; - return; - } - } - } - } - } finally { - if (!skipFinal) { - synchronized (this) { - emitting = false; - } - } - } - - } -} +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.util; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import rx.Producer; +import rx.annotations.Experimental; + +/** + * Manages the producer-backpressure-consumer interplay by + * matching up available elements with requested elements and/or + * terminal events. + */ +@Experimental +public final class BackpressureDrainManager implements Producer { + /** + * Interface representing the minimal callbacks required + * to operate the drain part of a backpressure system. + */ + public interface BackpressureQueueCallback { + /** + * Override this method to peek for the next element, + * null meaning no next element available now. + *

It will be called plain and while holding this object's monitor. + * @return the next element or null if no next element available + */ + Object peek(); + /** + * Override this method to poll (consume) the next element, + * null meaning no next element available now. + * @return the next element or null if no next element available + */ + Object poll(); + /** + * Override this method to deliver an element to downstream. + * The logic ensures that this happens only in the right conditions. + * @param value the value to deliver, not null + * @return true indicates that one should terminate the emission loop unconditionally + * and not deliver any further elements or terminal events. + */ + boolean accept(Object value); + /** + * Override this method to deliver a normal or exceptional + * terminal event. + * @param exception if not null, contains the terminal exception + */ + void complete(Throwable exception); + } + + /** The request counter, updated via REQUESTED_COUNTER. */ + protected volatile long requestedCount; + /** Atomically updates the the requestedCount field. */ + protected static final AtomicLongFieldUpdater REQUESTED_COUNT + = AtomicLongFieldUpdater.newUpdater(BackpressureDrainManager.class, "requestedCount"); + /** Indicates if one is in emitting phase, guarded by this. */ + protected boolean emitting; + /** Indicates a terminal state. */ + protected volatile boolean terminated; + /** Indicates an error state, barrier is provided via terminated. */ + protected Throwable exception; + /** The callbacks to manage the drain. */ + protected final BackpressureQueueCallback actual; + /** + * Constructs a backpressure drain manager with 0 requesedCount, + * no terminal event and not emitting. + * @param actual he queue callback to check for new element availability + */ + public BackpressureDrainManager(BackpressureQueueCallback actual) { + this.actual = actual; + } + /** + * Checks if a terminal state has been reached. + * @return true if a terminal state has been reached + */ + public final boolean isTerminated() { + return terminated; + } + /** + * Move into a terminal state. + * Call drain() anytime after. + */ + public final void terminate() { + terminated = true; + } + /** + * Move into a terminal state with an exception. + * Call drain() anytime after. + *

Serialized access is expected with respect to + * element emission. + * @param error the exception to deliver + */ + public final void terminate(Throwable error) { + if (!terminated) { + exception = error; + terminated = true; + } + } + /** + * Move into a terminal state and drain. + */ + public final void terminateAndDrain() { + terminated = true; + drain(); + } + /** + * Move into a terminal state with an exception and drain. + *

Serialized access is expected with respect to + * element emission. + * @param error the exception to deliver + */ + public final void terminateAndDrain(Throwable error) { + if (!terminated) { + exception = error; + terminated = true; + drain(); + } + } + @Override + public final void request(long n) { + if (n == 0) { + return; + } + boolean mayDrain; + long r; + long u; + do { + r = requestedCount; + mayDrain = r == 0; + if (r == Long.MAX_VALUE) { + break; + } + if (n == Long.MAX_VALUE) { + u = n; + mayDrain = true; + } else { + if (r > Long.MAX_VALUE - n) { + u = Long.MAX_VALUE; + } else { + u = r + n; + } + } + } while (!REQUESTED_COUNT.compareAndSet(this, r, u)); + // since we implement producer, we have to call drain + // on a 0-n request transition + if (mayDrain) { + drain(); + } + } + /** + * Try to drain the "queued" elements and terminal events + * by considering the available and requested event counts. + */ + public final void drain() { + long n; + boolean term; + synchronized (this) { + if (emitting) { + return; + } + emitting = true; + term = terminated; + } + n = requestedCount; + boolean skipFinal = false; + try { + BackpressureQueueCallback a = actual; + while (true) { + int emitted = 0; + while (n > 0 || term) { + Object o; + if (term) { + o = a.peek(); + if (o == null) { + skipFinal = true; + Throwable e = exception; + a.complete(e); + return; + } + if (n == 0) { + break; + } + } + o = a.poll(); + if (o == null) { + break; + } else { + if (a.accept(o)) { + skipFinal = true; + return; + } + n--; + emitted++; + } + } + synchronized (this) { + term = terminated; + boolean more = a.peek() != null; + // if no backpressure below + if (requestedCount == Long.MAX_VALUE) { + // no new data arrived since the last poll + if (!more && !term) { + skipFinal = true; + emitting = false; + return; + } + n = Long.MAX_VALUE; + } else { + n = REQUESTED_COUNT.addAndGet(this, -emitted); + if ((n == 0 || !more) && (!term || more)) { + skipFinal = true; + emitting = false; + return; + } + } + } + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + + } +} From 34bf40aa7536e63db5a81302deb98d497b0240ef Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 26 Jan 2015 13:33:45 +0100 Subject: [PATCH 6/7] Added capacity increase on poll. --- .../rx/internal/operators/OperatorOnBackpressureBuffer.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java index 1517736dbd..d323122f8a 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java @@ -125,7 +125,11 @@ public Object peek() { } @Override public Object poll() { - return queue.poll(); + Object value = queue.poll(); + if (capacity != null) { + capacity.incrementAndGet(); + } + return value; } private boolean assertCapacity() { From 047cc2863b6201fa9b1e3f3c79d545a87798f6bf Mon Sep 17 00:00:00 2001 From: David Karnok Date: Mon, 26 Jan 2015 16:41:04 +0100 Subject: [PATCH 7/7] Added value null check --- .../rx/internal/operators/OperatorOnBackpressureBuffer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java index d323122f8a..e35c489d5c 100644 --- a/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBuffer.java @@ -126,7 +126,7 @@ public Object peek() { @Override public Object poll() { Object value = queue.poll(); - if (capacity != null) { + if (capacity != null && value != null) { capacity.incrementAndGet(); } return value;