diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 27a97b4b3d..70a5655011 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -15,12 +15,12 @@ import java.util.*; import java.util.concurrent.*; +import rx.annotations.Experimental; import rx.exceptions.*; import rx.functions.*; import rx.internal.operators.*; import rx.internal.util.ScalarSynchronousObservable; import rx.internal.util.UtilityFunctions; - import rx.observables.*; import rx.observers.SafeSubscriber; import rx.plugins.*; @@ -182,6 +182,7 @@ public void call(Subscriber o) { * @return the source Observable, transformed by the transformer function * @see RxJava wiki: Implementing Your Own Operators */ + @SuppressWarnings("unchecked") public Observable compose(Transformer transformer) { return ((Transformer) transformer).call(this); } @@ -5054,6 +5055,47 @@ public final Observable onBackpressureDrop() { return lift(new OperatorOnBackpressureDrop()); } + /** + * Instructs an Observable that is emitting items faster than its observer can consume them is to + * block the producer thread. + *

+ * The producer side can emit up to {@code maxQueueLength} onNext elements without blocking, but the + * consumer side considers the amount its downstream requested through {@code Producer.request(n)} + * and doesn't emit more than requested even if more is available. For example, using + * {@code onBackpressureBlock(384).observeOn(Schedulers.io())} will not throw a MissingBackpressureException. + *

+ * Note that if the upstream Observable does support backpressure, this operator ignores that capability + * and doesn't propagate any backpressure requests from downstream. + * + * @param maxQueueLength the maximum number of items the producer can emit without blocking + * @return the source Observable modified to block {@code onNext} notifications on overflow + * @see RxJava wiki: Backpressure + * @Experimental The behavior of this can change at any time. + */ + @Experimental + public final Observable onBackpressureBlock(int maxQueueLength) { + return lift(new OperatorOnBackpressureBlock(maxQueueLength)); + } + /** + * Instructs an Observable that is emitting items faster than its observer can consume them is to + * block the producer thread if the number of undelivered onNext events reaches the system-wide ring buffer size. + *

+ * The producer side can emit up to the system-wide ring buffer size onNext elements without blocking, but the + * consumer side considers the amount its downstream requested through {@code Producer.request(n)} + * and doesn't emit more than requested even if available. + *

+ * Note that if the upstream Observable does support backpressure, this operator ignores that capability + * and doesn't propagate any backpressure requests from downstream. + * + * @return the source Observable modified to block {@code onNext} notifications on overflow + * @see RxJava wiki: Backpressure + * @Experimental The behavior of this can change at any time. + */ + @Experimental + public final Observable onBackpressureBlock() { + return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE); + } + /** * Instructs an Observable to pass control to another Observable rather than invoking * {@link Observer#onError onError} if it encounters an error. diff --git a/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java new file mode 100644 index 0000000000..bb328788e9 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorOnBackpressureBlock.java @@ -0,0 +1,157 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; + +import rx.Observable.Operator; +import rx.Producer; +import rx.Subscriber; + +/** + * Operator that blocks the producer thread in case a backpressure is needed. + */ +public class OperatorOnBackpressureBlock implements Operator { + final int max; + public OperatorOnBackpressureBlock(int max) { + this.max = max; + } + @Override + public Subscriber call(Subscriber child) { + BlockingSubscriber s = new BlockingSubscriber(max, child); + s.init(); + return s; + } + + static final class BlockingSubscriber extends Subscriber { + final NotificationLite nl = NotificationLite.instance(); + final BlockingQueue queue; + final Subscriber child; + /** Guarded by this. */ + long requestedCount; + /** Guarded by this. */ + boolean emitting; + volatile boolean terminated; + /** Set before terminated, read after terminated. */ + Throwable exception; + public BlockingSubscriber(int max, Subscriber child) { + this.queue = new ArrayBlockingQueue(max); + this.child = child; + } + void init() { + child.add(this); + child.setProducer(new Producer() { + @Override + public void request(long n) { + synchronized (BlockingSubscriber.this) { + if (n == Long.MAX_VALUE || requestedCount == Long.MAX_VALUE) { + requestedCount = Long.MAX_VALUE; + } else { + requestedCount += n; + } + } + drain(); + } + }); + } + @Override + public void onNext(T t) { + try { + queue.put(nl.next(t)); + drain(); + } catch (InterruptedException ex) { + if (!isUnsubscribed()) { + onError(ex); + } + } + } + @Override + public void onError(Throwable e) { + if (!terminated) { + exception = e; + terminated = true; + drain(); + } + } + @Override + public void onCompleted() { + terminated = true; + drain(); + } + void drain() { + long n; + synchronized (this) { + if (emitting) { + return; + } + emitting = true; + n = requestedCount; + } + boolean skipFinal = false; + try { + while (true) { + int emitted = 0; + while (n > 0) { + Object o = queue.poll(); + if (o == null) { + if (terminated) { + if (exception != null) { + child.onError(exception); + } else { + child.onCompleted(); + } + return; + } + break; + } else { + child.onNext(nl.getValue(o)); + n--; + emitted++; + } + } + synchronized (this) { + // if no backpressure below + if (requestedCount == Long.MAX_VALUE) { + // no new data arrived since the last poll + if (queue.peek() == null) { + skipFinal = true; + emitting = false; + return; + } + n = Long.MAX_VALUE; + } else { + if (emitted == 0) { + skipFinal = true; + emitting = false; + return; + } + requestedCount -= emitted; + n = requestedCount; + } + } + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + } +} diff --git a/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java b/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java new file mode 100644 index 0000000000..b247b15e6b --- /dev/null +++ b/src/test/java/rx/internal/operators/OnBackpressureBlockTest.java @@ -0,0 +1,262 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.operators; + +import static org.mockito.Mockito.*; + +import java.util.Arrays; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Observer; +import rx.Subscriber; +import rx.exceptions.MissingBackpressureException; +import rx.exceptions.TestException; +import rx.internal.util.RxRingBuffer; +import rx.observers.TestObserver; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +/** + * Test the onBackpressureBlock() behavior. + */ +public class OnBackpressureBlockTest { + static final int WAIT = 200; + + @Test(timeout = 1000) + public void testSimpleBelowCapacity() { + Observable source = Observable.just(1).onBackpressureBlock(10); + + TestObserver o = new TestObserver(); + source.subscribe(o); + + o.assertReceivedOnNext(Arrays.asList(1)); + o.assertTerminalEvent(); + assertTrue(o.getOnErrorEvents().isEmpty()); + } + @Test(timeout = 10000) + public void testSimpleAboveCapacity() throws InterruptedException { + Observable source = Observable.range(1, 11).subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(10); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + o.requestMore(10); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + + o.requestMore(10); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11)); + + o.assertTerminalEvent(); + assertTrue(o.getOnErrorEvents().isEmpty()); + } + + @Test(timeout = 3000) + public void testNoMissingBackpressureException() { + final int NUM_VALUES = RxRingBuffer.SIZE * 3; + Observable source = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t1) { + for (int i = 0; i < NUM_VALUES; i++) { + t1.onNext(i); + } + t1.onCompleted(); + } + }).subscribeOn(Schedulers.newThread()); + + @SuppressWarnings("unchecked") + Observer o = mock(Observer.class); + + TestSubscriber s = new TestSubscriber(o); + + source.onBackpressureBlock(RxRingBuffer.SIZE).observeOn(Schedulers.newThread()).subscribe(s); + + s.awaitTerminalEvent(); + + verify(o, never()).onError(any(MissingBackpressureException.class)); + + s.assertNoErrors(); + verify(o, times(NUM_VALUES)).onNext(any(Integer.class)); + verify(o).onCompleted(); + } + @Test(timeout = 10000) + public void testBlockedProducerCanBeUnsubscribed() throws InterruptedException { + Observable source = Observable.range(1, 11).subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + + o.requestMore(5); + + Thread.sleep(WAIT); + + o.unsubscribe(); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5)); + o.assertNoErrors(); + assertTrue(o.getOnCompletedEvents().isEmpty()); + } + @Test(timeout = 10000) + public void testExceptionIsDelivered() throws InterruptedException { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + + o.requestMore(7); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + assertTrue(o.getOnCompletedEvents().isEmpty()); + + o.requestMore(3); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + o.assertNoErrors(); + assertTrue(o.getOnCompletedEvents().isEmpty()); + + o.requestMore(10); + + Thread.sleep(WAIT); + + o.assertTerminalEvent(); + assertEquals(1, o.getOnErrorEvents().size()); + assertTrue(o.getOnErrorEvents().get(0) instanceof TestException); + } + @Test(timeout = 10000) + public void testExceptionIsDeliveredAfterValues() throws InterruptedException { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + + o.requestMore(7); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + assertTrue(o.getOnCompletedEvents().isEmpty()); + + o.requestMore(7); + + Thread.sleep(WAIT); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + assertEquals(1, o.getOnErrorEvents().size()); + assertTrue(o.getOnErrorEvents().get(0) instanceof TestException); + assertTrue(o.getOnCompletedEvents().isEmpty()); + } + @Test(timeout = 10000) + public void testTakeWorksWithSubscriberRequesting() { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5).take(7); + + TestSubscriber o = new TestSubscriber() { + @Override + public void onStart() { + request(0); // make sure it doesn't start in unlimited mode + } + }; + source.subscribe(o); + + o.requestMore(7); + + o.awaitTerminalEvent(); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + o.assertTerminalEvent(); + } + @Test(timeout = 10000) + public void testTakeWorksSubscriberRequestUnlimited() { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(5).take(7); + + TestSubscriber o = new TestSubscriber(); + source.subscribe(o); + + o.awaitTerminalEvent(); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + o.assertTerminalEvent(); + } + @Test(timeout = 10000) + public void testTakeWorksSubscriberRequestUnlimitedBufferedException() { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException("Forced failure"))) + .subscribeOn(Schedulers.newThread()) + .onBackpressureBlock(11).take(7); + + TestSubscriber o = new TestSubscriber(); + source.subscribe(o); + + o.awaitTerminalEvent(); + + o.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7)); + o.assertNoErrors(); + o.assertTerminalEvent(); + } +}