From 68367092136798a60f27e3a0c13b09ebb8725e85 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 10 Jul 2014 20:29:35 -0700 Subject: [PATCH 1/3] Zip with Backpressure Support This supports both upstream and downstream backpressure. --- .../rx/internal/operators/OperatorZip.java | 242 +++++++++++------- .../java/rx/internal/util/RxRingBuffer.java | 26 +- .../src/test/java/rx/BackpressureTests.java | 44 ++-- .../internal/operators/OperatorZipTest.java | 215 +++++++++++++--- 4 files changed, 367 insertions(+), 160 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorZip.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorZip.java index 9c33bc328f..6d56c10234 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorZip.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorZip.java @@ -15,13 +15,15 @@ */ package rx.internal.operators; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Observable; import rx.Observable.Operator; import rx.Observer; +import rx.Producer; import rx.Subscriber; +import rx.exceptions.MissingBackpressureException; import rx.exceptions.OnErrorThrowable; import rx.functions.Func2; import rx.functions.Func3; @@ -33,6 +35,7 @@ import rx.functions.Func9; import rx.functions.FuncN; import rx.functions.Functions; +import rx.internal.util.RxRingBuffer; import rx.subscriptions.CompositeSubscription; /** @@ -48,7 +51,9 @@ *

* The resulting Observable returned from zip will invoke onNext as many times as the * number of onNext invocations of the source Observable that emits the fewest items. - * @param the result type + * + * @param + * the result type */ public final class OperatorZip implements Operator[]> { /* @@ -104,69 +109,106 @@ public OperatorZip(Func9 f) { @SuppressWarnings("rawtypes") @Override - public Subscriber call(final Subscriber observer) { - return new Subscriber(observer) { + public Subscriber call(final Subscriber child) { + final Zip zipper = new Zip(child, zipFunction); + final ZipProducer producer = new ZipProducer(zipper); + child.setProducer(producer); + final ZipSubscriber subscriber = new ZipSubscriber(child, zipper, producer); + return subscriber; + } - boolean started = false; + private final class ZipSubscriber extends Subscriber { - @Override - public void onCompleted() { - if (!started) { - // this means we have not received a valid onNext before termination so we emit the onCompleted - observer.onCompleted(); - } - } + final Subscriber child; + final Zip zipper; + final ZipProducer producer; - @Override - public void onError(Throwable e) { - observer.onError(e); + public ZipSubscriber(Subscriber child, Zip zipper, ZipProducer producer) { + super(child); + this.child = child; + this.zipper = zipper; + this.producer = producer; + } + + boolean started = false; + + @Override + public void onCompleted() { + if (!started) { + // this means we have not received a valid onNext before termination so we emit the onCompleted + child.onCompleted(); } + } - @Override - public void onNext(Observable[] observables) { - if (observables == null || observables.length == 0) { - observer.onCompleted(); - } else { - started = true; - new Zip(observables, observer, zipFunction).zip(); - } + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(Observable[] observables) { + if (observables == null || observables.length == 0) { + child.onCompleted(); + } else { + started = true; + zipper.start(observables, producer); } + } + + } + + private static final class ZipProducer extends AtomicLong implements Producer { + + private Zip zipper; + + public ZipProducer(Zip zipper) { + this.zipper = zipper; + } + + @Override + public void request(long n) { + addAndGet(n); + // try and claim emission if no other threads are doing so + zipper.tick(); + } - }; } - static final NotificationLite on = NotificationLite.instance(); private static final class Zip { - @SuppressWarnings("rawtypes") - final Observable[] os; - final Object[] observers; - final Observer observer; - final FuncN zipFunction; - final CompositeSubscription childSubscription = new CompositeSubscription(); + private final Observer child; + private final FuncN zipFunction; + private final CompositeSubscription childSubscription = new CompositeSubscription(); + volatile long counter; @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater COUNTER_UPDATER - = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter"); + static final AtomicLongFieldUpdater COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter"); + + static final int THRESHOLD = (int) (RxRingBuffer.SIZE * 0.7); + int emitted = 0; // not volatile/synchronized as accessed inside COUNTER_UPDATER block + + /* initialized when started in `start` */ + private Object[] observers; + private AtomicLong requested; @SuppressWarnings("rawtypes") - public Zip(Observable[] os, final Subscriber observer, FuncN zipFunction) { - this.os = os; - this.observer = observer; + public Zip(final Subscriber child, FuncN zipFunction) { + this.child = child; this.zipFunction = zipFunction; + child.add(childSubscription); + } + + @SuppressWarnings("unchecked") + public void start(@SuppressWarnings("rawtypes") Observable[] os, AtomicLong requested) { observers = new Object[os.length]; + this.requested = requested; for (int i = 0; i < os.length; i++) { - InnerObserver io = new InnerObserver(); + InnerSubscriber io = new InnerSubscriber(); observers[i] = io; childSubscription.add(io); } - observer.add(childSubscription); - } - - @SuppressWarnings("unchecked") - public void zip() { for (int i = 0; i < os.length; i++) { - os[i].unsafeSubscribe((InnerObserver) observers[i]); + os[i].unsafeSubscribe((InnerSubscriber) observers[i]); } } @@ -179,51 +221,64 @@ public void zip() { */ @SuppressWarnings("unchecked") void tick() { + if (observers == null) { + // nothing yet to do (initial request from Producer) + return; + } if (COUNTER_UPDATER.getAndIncrement(this) == 0) { do { - final Object[] vs = new Object[observers.length]; - boolean allHaveValues = true; - for (int i = 0; i < observers.length; i++) { - Object n = ((InnerObserver) observers[i]).items.peek(); - - if (n == null) { - allHaveValues = false; - continue; - } + // we only emit if requested > 0 + if (requested.get() > 0) { + final Object[] vs = new Object[observers.length]; + boolean allHaveValues = true; + for (int i = 0; i < observers.length; i++) { + RxRingBuffer buffer = ((InnerSubscriber) observers[i]).items; + Object n = buffer.peek(); + + if (n == null) { + allHaveValues = false; + continue; + } - switch (on.kind(n)) { - case OnNext: - vs[i] = on.getValue(n); - break; - case OnCompleted: - observer.onCompleted(); - // we need to unsubscribe from all children since children are - // independently subscribed - childSubscription.unsubscribe(); - return; - default: - // shouldn't get here - } - } - if (allHaveValues) { - try { - // all have something so emit - observer.onNext(zipFunction.call(vs)); - } catch (Throwable e) { - observer.onError(OnErrorThrowable.addValueAsLastCause(e, vs)); - return; - } - // now remove them - for (Object obj : observers) { - InnerObserver io = (InnerObserver)obj; - io.items.poll(); - // eagerly check if the next item on this queue is an onComplete - if (on.isCompleted(io.items.peek())) { - // it is an onComplete so shut down - observer.onCompleted(); - // we need to unsubscribe from all children since children are independently subscribed + if (buffer.isCompleted(n)) { + child.onCompleted(); + // we need to unsubscribe from all children since children are + // independently subscribed childSubscription.unsubscribe(); return; + } else { + vs[i] = buffer.getValue(n); + } + } + if (allHaveValues) { + try { + // all have something so emit + child.onNext(zipFunction.call(vs)); + // we emitted so decrement the requested counter + requested.decrementAndGet(); + emitted++; + } catch (Throwable e) { + child.onError(OnErrorThrowable.addValueAsLastCause(e, vs)); + return; + } + // now remove them + for (Object obj : observers) { + RxRingBuffer buffer = ((InnerSubscriber) obj).items; + buffer.poll(); + // eagerly check if the next item on this queue is an onComplete + if (buffer.isCompleted(buffer.peek())) { + // it is an onComplete so shut down + child.onCompleted(); + // we need to unsubscribe from all children since children are independently subscribed + childSubscription.unsubscribe(); + return; + } + } + if (emitted > THRESHOLD) { + for (Object obj : observers) { + ((InnerSubscriber) obj).request(emitted); + } + emitted = 0; } } } @@ -235,27 +290,36 @@ void tick() { // used to observe each Observable we are zipping together // it collects all items in an internal queue @SuppressWarnings("rawtypes") - final class InnerObserver extends Subscriber { + final class InnerSubscriber extends Subscriber { // Concurrent* since we need to read it from across threads - final ConcurrentLinkedQueue items = new ConcurrentLinkedQueue(); + final RxRingBuffer items = RxRingBuffer.getSpmcInstance(); + + @Override + public void onStart() { + request(RxRingBuffer.SIZE); + } @SuppressWarnings("unchecked") @Override public void onCompleted() { - items.add(on.completed()); + items.onCompleted(); tick(); } @Override public void onError(Throwable e) { - // emit error and shut down - observer.onError(e); + // emit error immediately and shut down + child.onError(e); } @SuppressWarnings("unchecked") @Override public void onNext(Object t) { - items.add(on.next(t)); + try { + items.onNext(t); + } catch (MissingBackpressureException e) { + onError(e); + } tick(); } }; diff --git a/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java b/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java index a1c74d2636..2684e21b1b 100644 --- a/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/rxjava-core/src/main/java/rx/internal/util/RxRingBuffer.java @@ -276,9 +276,9 @@ public Object poll() { Object o; o = queue.poll(); /* - * benjchristensen July 10 2014 => The check for 'queue.size() == 0' came from a very rare concurrency bug where poll() + * benjchristensen July 10 2014 => The check for 'queue.isEmpty()' came from a very rare concurrency bug where poll() * is invoked, then an "onNext + onCompleted/onError" arrives before hitting the if check below. In that case, - * "o == null" and there is a terminal state, but now "queue.size() > 0" and we should NOT return the terminalState. + * "o == null" and there is a terminal state, but now "queue.isEmpty()" and we should NOT return the terminalState. * * The queue.size() check is a double-check that works to handle this, without needing to synchronize poll with on* * or needing to enqueue terminalState. @@ -286,17 +286,27 @@ public Object poll() { * This did make me consider eliminating the 'terminalState' ref and enqueuing it ... but then that requires * a +1 of the size, or -1 of how many onNext can be sent. See comment on 'terminalState' above for why it * is currently the way it is. - * - * This performs fine as long as we don't use a queue implementation where the size() impl has to scan the whole list, - * such as ConcurrentLinkedQueue. */ - if (o == null && terminalState != null && queue.size() == 0) { + if (o == null && terminalState != null && queue.isEmpty()) { o = terminalState; // once emitted we clear so a poll loop will finish terminalState = null; } return o; } + + public Object peek() { + if (queue == null) { + // we are unsubscribed and have released the undelrying queue + return null; + } + Object o; + o = queue.peek(); + if (o == null && terminalState != null && queue.isEmpty()) { + o = terminalState; + } + return o; + } public boolean isCompleted(Object o) { return on.isCompleted(o); @@ -306,6 +316,10 @@ public boolean isError(Object o) { return on.isError(o); } + public Object getValue(Object o) { + return on.getValue(o); + } + public boolean accept(Object o, Observer child) { return on.accept(child, o); } diff --git a/rxjava-core/src/test/java/rx/BackpressureTests.java b/rxjava-core/src/test/java/rx/BackpressureTests.java index 4ad1fe89d3..bf357675eb 100644 --- a/rxjava-core/src/test/java/rx/BackpressureTests.java +++ b/rxjava-core/src/test/java/rx/BackpressureTests.java @@ -24,7 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.junit.Ignore; import org.junit.Test; import rx.Observable.OnSubscribe; @@ -75,9 +74,6 @@ public Integer call(Integer i) { assertTrue(c.get() < RxRingBuffer.SIZE * 2); } - // currently the first starves the second - // is it possible to make 2 synchronous streams fairly merge without the first starving all others? - @Ignore @Test public void testMergeSync() { int NUM = (int) ((int) RxRingBuffer.SIZE * 4.1); @@ -92,12 +88,13 @@ public void testMergeSync() { System.out.println("Expected: " + NUM + " got: " + ts.getOnNextEvents().size()); System.out.println("testMergeSync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get()); assertEquals(NUM, ts.getOnNextEvents().size()); - assertTrue(c1.get() < RxRingBuffer.SIZE * 3); - assertTrue(c2.get() < RxRingBuffer.SIZE * 3); + // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1) + // TODO is it possible to make this deterministic rather than one possibly starving the other? + // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit + assertTrue(c1.get() < RxRingBuffer.SIZE * 5); + assertTrue(c2.get() < RxRingBuffer.SIZE * 5); } - // currently the first starves the second - @Ignore @Test public void testMergeAsync() { int NUM = (int) ((int) RxRingBuffer.SIZE * 4.1); @@ -113,12 +110,13 @@ public void testMergeAsync() { ts.assertNoErrors(); System.out.println("testMergeAsync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get()); assertEquals(NUM, ts.getOnNextEvents().size()); - assertTrue(c1.get() < RxRingBuffer.SIZE * 3); - assertTrue(c2.get() < RxRingBuffer.SIZE * 3); + // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1) + // TODO is it possible to make this deterministic rather than one possibly starving the other? + // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit + assertTrue(c1.get() < RxRingBuffer.SIZE * 5); + assertTrue(c2.get() < RxRingBuffer.SIZE * 5); } - // currently the first starves the second - @Ignore @Test public void testMergeAsyncThenObserveOn() { int NUM = (int) ((int) RxRingBuffer.SIZE * 4.1); @@ -134,8 +132,11 @@ public void testMergeAsyncThenObserveOn() { ts.assertNoErrors(); System.out.println("testMergeAsyncThenObserveOn => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get()); assertEquals(NUM, ts.getOnNextEvents().size()); - assertTrue(c1.get() < RxRingBuffer.SIZE * 3); - assertTrue(c2.get() < RxRingBuffer.SIZE * 3); + // either one can starve the other, but neither should be capable of doing more than 5 batches (taking 4.1) + // TODO is it possible to make this deterministic rather than one possibly starving the other? + // benjchristensen => In general I'd say it's not worth trying to make it so, as "fair" algoritms generally take a performance hit + assertTrue(c1.get() < RxRingBuffer.SIZE * 5); + assertTrue(c2.get() < RxRingBuffer.SIZE * 5); } @Test @@ -182,8 +183,7 @@ public Observable call(Integer i) { assertTrue(c.get() <= RxRingBuffer.SIZE * 2); } - @Ignore - @Test(timeout = 2000) + @Test public void testZipSync() { int NUM = (int) ((int) RxRingBuffer.SIZE * 4.1); AtomicInteger c1 = new AtomicInteger(); @@ -206,12 +206,11 @@ public Integer call(Integer t1, Integer t2) { ts.assertNoErrors(); System.out.println("testZipSync => Received: " + ts.getOnNextEvents().size() + " Emitted: " + c1.get() + " / " + c2.get()); assertEquals(NUM, ts.getOnNextEvents().size()); - assertTrue(c1.get() < RxRingBuffer.SIZE * 3); - assertTrue(c2.get() < RxRingBuffer.SIZE * 3); + assertTrue(c1.get() < RxRingBuffer.SIZE * 5); + assertTrue(c2.get() < RxRingBuffer.SIZE * 5); } - @Ignore - @Test(timeout = 2000) + @Test public void testZipAsync() { int NUM = (int) ((int) RxRingBuffer.SIZE * 2.1); AtomicInteger c1 = new AtomicInteger(); @@ -312,7 +311,7 @@ public void testUserSubscriberUsingRequestSync() { public void onStart() { request(100); } - + @Override public void onCompleted() { @@ -358,8 +357,7 @@ public void testUserSubscriberUsingRequestAsync() throws InterruptedException { public void onStart() { request(100); } - - + @Override public void onCompleted() { latch.countDown(); diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorZipTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorZipTest.java index 6fdfec985a..01cf19ce90 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorZipTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorZipTest.java @@ -16,6 +16,7 @@ package rx.internal.operators; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.inOrder; @@ -28,6 +29,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; @@ -49,7 +51,9 @@ import rx.functions.Func3; import rx.functions.FuncN; import rx.functions.Functions; +import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; public class OperatorZipTest { @@ -103,7 +107,7 @@ public void testCollectionSizeDifferentThanFunction() { @SuppressWarnings("unchecked") /* mock calls don't do generics */ @Test - public void testZippingDifferentLengthObservableSequences1() { + public void testStartpingDifferentLengthObservableSequences1() { Observer w = mock(Observer.class); TestObservable w1 = new TestObservable(); @@ -136,7 +140,7 @@ public void testZippingDifferentLengthObservableSequences1() { } @Test - public void testZippingDifferentLengthObservableSequences2() { + public void testStartpingDifferentLengthObservableSequences2() { @SuppressWarnings("unchecked") Observer w = mock(Observer.class); @@ -436,7 +440,7 @@ public void testAggregatorEarlyCompletion() { @SuppressWarnings("unchecked") /* mock calls don't do generics */ @Test - public void testZip2Types() { + public void testStart2Types() { Func2 zipr = getConcatStringIntegerZipr(); /* define a Observer to receive aggregated events */ @@ -455,7 +459,7 @@ public void testZip2Types() { @SuppressWarnings("unchecked") /* mock calls don't do generics */ @Test - public void testZip3Types() { + public void testStart3Types() { Func3 zipr = getConcatStringIntegerIntArrayZipr(); /* define a Observer to receive aggregated events */ @@ -753,7 +757,7 @@ public void testSecondFails() { } @Test - public void testZipWithOnCompletedTwice() { + public void testStartWithOnCompletedTwice() { // issue: https://groups.google.com/forum/#!topic/rxjava/79cWTv3TFp0 // The problem is the original "zip" implementation does not wrap // an internal observer with a SafeObserver. However, in the "zip", @@ -798,7 +802,7 @@ public void onNext(Integer args) { } @Test - public void testZip() { + public void testStart() { Observable os = OBSERVABLE_OF_5_INTEGERS .zip(OBSERVABLE_OF_5_INTEGERS, new Func2() { @@ -825,11 +829,10 @@ public void call(String s) { } @Test - public void testZipAsync() throws InterruptedException { + public void testStartAsync() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); - final CountDownLatch infiniteObservables = new CountDownLatch(2); - Observable os = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables) - .zip(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(infiniteObservables), new Func2() { + Observable os = ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)).onBackpressureBuffer() + .zip(ASYNC_OBSERVABLE_OF_INFINITE_INTEGERS(new CountDownLatch(1)).onBackpressureBuffer(), new Func2() { @Override public String call(Integer a, Integer b) { @@ -837,40 +840,20 @@ public String call(Integer a, Integer b) { } }).take(5); - final ArrayList list = new ArrayList(); - os.subscribe(new Observer() { - - @Override - public void onCompleted() { - latch.countDown(); - } + TestSubscriber ts = new TestSubscriber(); + os.subscribe(ts); - @Override - public void onError(Throwable e) { - e.printStackTrace(); - latch.countDown(); - } + ts.awaitTerminalEvent(); + ts.assertNoErrors(); - @Override - public void onNext(String s) { - System.out.println(s); - list.add(s); - } - }); - - latch.await(2000, TimeUnit.MILLISECONDS); - if (!infiniteObservables.await(2000, TimeUnit.MILLISECONDS)) { - throw new RuntimeException("didn't unsubscribe"); - } - - assertEquals(5, list.size()); - assertEquals("1-1", list.get(0)); - assertEquals("2-2", list.get(1)); - assertEquals("5-5", list.get(4)); + assertEquals(5, ts.getOnNextEvents().size()); + assertEquals("1-1", ts.getOnNextEvents().get(0)); + assertEquals("2-2", ts.getOnNextEvents().get(1)); + assertEquals("5-5", ts.getOnNextEvents().get(4)); } @Test - public void testZipInfiniteAndFinite() throws InterruptedException { + public void testStartInfiniteAndFinite() throws InterruptedException { final CountDownLatch latch = new CountDownLatch(1); final CountDownLatch infiniteObservable = new CountDownLatch(1); Observable os = OBSERVABLE_OF_5_INTEGERS @@ -974,7 +957,7 @@ public void call(String s) { } @Test - public void testZipEmptyObservables() { + public void testStartEmptyObservables() { Observable o = Observable.zip(Observable. empty(), Observable. empty(), new Func2() { @@ -999,7 +982,7 @@ public void call(String s) { } @Test - public void testZipEmptyList() { + public void testStartEmptyList() { final Object invoked = new Object(); Collection> observables = Collections.emptyList(); @@ -1023,7 +1006,7 @@ public Object call(final Object... args) { * and last() expects at least a single response. */ @Test(expected = NoSuchElementException.class) - public void testZipEmptyListBlocking() { + public void testStartEmptyListBlocking() { final Object invoked = new Object(); Collection> observables = Collections.emptyList(); @@ -1039,6 +1022,154 @@ public Object call(final Object... args) { o.toBlocking().last(); } + @Test + public void testBackpressureSync() { + AtomicInteger generatedA = new AtomicInteger(); + AtomicInteger generatedB = new AtomicInteger(); + Observable o1 = createInfiniteObservable(generatedA); + Observable o2 = createInfiniteObservable(generatedB); + + TestSubscriber ts = new TestSubscriber(); + Observable.zip(o1, o2, new Func2() { + + @Override + public String call(Integer t1, Integer t2) { + return t1 + "-" + t2; + } + + }).take(RxRingBuffer.SIZE * 2).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + assertTrue(generatedA.get() < (RxRingBuffer.SIZE * 3)); + assertTrue(generatedB.get() < (RxRingBuffer.SIZE * 3)); + } + + @Test + public void testBackpressureAsync() { + AtomicInteger generatedA = new AtomicInteger(); + AtomicInteger generatedB = new AtomicInteger(); + Observable o1 = createInfiniteObservable(generatedA).subscribeOn(Schedulers.computation()); + Observable o2 = createInfiniteObservable(generatedB).subscribeOn(Schedulers.computation()); + + TestSubscriber ts = new TestSubscriber(); + Observable.zip(o1, o2, new Func2() { + + @Override + public String call(Integer t1, Integer t2) { + return t1 + "-" + t2; + } + + }).take(RxRingBuffer.SIZE * 2).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + assertTrue(generatedA.get() < (RxRingBuffer.SIZE * 3)); + assertTrue(generatedB.get() < (RxRingBuffer.SIZE * 3)); + } + + @Test + public void testDownstreamBackpressureRequestsWithFiniteSyncObservables() { + AtomicInteger generatedA = new AtomicInteger(); + AtomicInteger generatedB = new AtomicInteger(); + Observable o1 = createInfiniteObservable(generatedA).take(RxRingBuffer.SIZE * 2); + Observable o2 = createInfiniteObservable(generatedB).take(RxRingBuffer.SIZE * 2); + + TestSubscriber ts = new TestSubscriber(); + Observable.zip(o1, o2, new Func2() { + + @Override + public String call(Integer t1, Integer t2) { + return t1 + "-" + t2; + } + + }).observeOn(Schedulers.computation()).take(RxRingBuffer.SIZE * 2).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get()); + assertTrue(generatedA.get() < (RxRingBuffer.SIZE * 3)); + assertTrue(generatedB.get() < (RxRingBuffer.SIZE * 3)); + } + + @Test + public void testDownstreamBackpressureRequestsWithInfiniteAsyncObservables() { + AtomicInteger generatedA = new AtomicInteger(); + AtomicInteger generatedB = new AtomicInteger(); + Observable o1 = createInfiniteObservable(generatedA).subscribeOn(Schedulers.computation()); + Observable o2 = createInfiniteObservable(generatedB).subscribeOn(Schedulers.computation()); + + TestSubscriber ts = new TestSubscriber(); + Observable.zip(o1, o2, new Func2() { + + @Override + public String call(Integer t1, Integer t2) { + return t1 + "-" + t2; + } + + }).observeOn(Schedulers.computation()).take(RxRingBuffer.SIZE * 2).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get()); + assertTrue(generatedA.get() < (RxRingBuffer.SIZE * 4)); + assertTrue(generatedB.get() < (RxRingBuffer.SIZE * 4)); + } + + @Test + public void testDownstreamBackpressureRequestsWithInfiniteSyncObservables() { + AtomicInteger generatedA = new AtomicInteger(); + AtomicInteger generatedB = new AtomicInteger(); + Observable o1 = createInfiniteObservable(generatedA); + Observable o2 = createInfiniteObservable(generatedB); + + TestSubscriber ts = new TestSubscriber(); + Observable.zip(o1, o2, new Func2() { + + @Override + public String call(Integer t1, Integer t2) { + return t1 + "-" + t2; + } + + }).observeOn(Schedulers.computation()).take(RxRingBuffer.SIZE * 2).subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + assertEquals(RxRingBuffer.SIZE * 2, ts.getOnNextEvents().size()); + System.out.println("Generated => A: " + generatedA.get() + " B: " + generatedB.get()); + assertTrue(generatedA.get() < (RxRingBuffer.SIZE * 4)); + assertTrue(generatedB.get() < (RxRingBuffer.SIZE * 4)); + } + + private Observable createInfiniteObservable(final AtomicInteger generated) { + Observable observable = Observable.from(new Iterable() { + @Override + public Iterator iterator() { + return new Iterator() { + + @Override + public void remove() { + } + + @Override + public Integer next() { + return generated.getAndIncrement(); + } + + @Override + public boolean hasNext() { + return true; + } + }; + } + }); + return observable; + } + Observable OBSERVABLE_OF_5_INTEGERS = OBSERVABLE_OF_5_INTEGERS(new AtomicInteger()); Observable OBSERVABLE_OF_5_INTEGERS(final AtomicInteger numEmitted) { From bf5f1cb7a4106516c97bb7351dc80cea77f51eee Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 17 Jul 2014 11:19:09 -0700 Subject: [PATCH 2/3] OperatorAll & Backpressure Similar to filter, it needs to request(1) each time it filters an onNext. --- .../src/main/java/rx/internal/operators/OperatorAll.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorAll.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorAll.java index ea260d3d1e..0e8382c49b 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorAll.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorAll.java @@ -45,6 +45,9 @@ public void onNext(T t) { child.onNext(false); child.onCompleted(); unsubscribe(); + } else { + // if we drop values we must replace them upstream as downstream won't receive and request more + request(1); } } From b068f5d8b657cd8d75c1ea10e963d90be0b8624b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 17 Jul 2014 12:43:29 -0700 Subject: [PATCH 3/3] Debugging Test --- .../operators/OnSubscribeCacheTest.java | 54 +++++++------------ 1 file changed, 20 insertions(+), 34 deletions(-) diff --git a/rxjava-core/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java b/rxjava-core/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java index fdf3202cc4..e6a053ee91 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java @@ -34,6 +34,7 @@ import rx.functions.Func2; import rx.internal.operators.OnSubscribeCache; import rx.observers.TestObserver; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.subjects.AsyncSubject; import rx.subjects.BehaviorSubject; @@ -93,7 +94,8 @@ public void call(String v) { } assertEquals(1, counter.get()); } - void testWithCustomSubjectAndRepeat(Subject subject, Integer... expected) { + + private void testWithCustomSubjectAndRepeat(Subject subject, Integer... expected) { Observable source0 = Observable.from(1, 2, 3) .subscribeOn(Schedulers.io()) .flatMap(new Func1>() { @@ -107,9 +109,9 @@ public Integer call(Long t1) { }); } }); - + Observable source1 = Observable.create(new OnSubscribeCache(source0, subject)); - + Observable source2 = source1 .repeat(4) .zip(Observable.timer(0, 10, TimeUnit.MILLISECONDS, Schedulers.newThread()), new Func2() { @@ -117,52 +119,36 @@ public Integer call(Long t1) { public Integer call(Integer t1, Long t2) { return t1; } - - }); - final CountDownLatch cdl = new CountDownLatch(1); - TestObserver test = new TestObserver(new Observer() { - @Override - public void onNext(Integer t) { - } - @Override - public void onError(Throwable e) { - cdl.countDown(); - } - - @Override - public void onCompleted() { - cdl.countDown(); - } - }); - source2.subscribe(test); + }); + TestSubscriber ts = new TestSubscriber(); + source2.subscribe(ts); - try { - cdl.await(20, TimeUnit.SECONDS); - } catch (InterruptedException ex) { - fail("Interrupted"); - } - - test.assertReceivedOnNext(Arrays.asList(expected)); - test.assertTerminalEvent(); - assertTrue(test.getOnErrorEvents().isEmpty()); + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + System.out.println(ts.getOnNextEvents()); + ts.assertReceivedOnNext(Arrays.asList(expected)); } + @Test(timeout = 10000) public void testWithAsyncSubjectAndRepeat() { - testWithCustomSubjectAndRepeat(AsyncSubject.create(), 3, 3, 3, 3); + testWithCustomSubjectAndRepeat(AsyncSubject. create(), 3, 3, 3, 3); } + @Test(timeout = 10000) public void testWithBehaviorSubjectAndRepeat() { // BehaviorSubject just completes when repeated testWithCustomSubjectAndRepeat(BehaviorSubject.create(0), 0, 1, 2, 3); } + @Test(timeout = 10000) public void testWithPublishSubjectAndRepeat() { // PublishSubject just completes when repeated - testWithCustomSubjectAndRepeat(PublishSubject.create(), 1, 2, 3); + testWithCustomSubjectAndRepeat(PublishSubject. create(), 1, 2, 3); } - @Test(timeout = 10000) + + @Test public void testWithReplaySubjectAndRepeat() { - testWithCustomSubjectAndRepeat(ReplaySubject.create(), 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3); + testWithCustomSubjectAndRepeat(ReplaySubject. create(), 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3); } }