From a2556443f808f3a517eae7bc4ea32db92a863a3b Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 25 Nov 2014 12:11:15 +0100 Subject: [PATCH 1/2] Fixed redo & groupBy backpressure management --- .../internal/operators/OnSubscribeRedo.java | 25 +++-- .../internal/operators/OperatorGroupBy.java | 2 +- .../internal/operators/OperatorRetryTest.java | 92 ++++++++++++++++++- 3 files changed, 103 insertions(+), 16 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeRedo.java b/src/main/java/rx/internal/operators/OnSubscribeRedo.java index 3a795aafd0..3e7a2c4a29 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRedo.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRedo.java @@ -190,7 +190,7 @@ private OnSubscribeRedo(Observable source, Func1 child) { final AtomicBoolean isLocked = new AtomicBoolean(true); - final AtomicBoolean isStarted = new AtomicBoolean(false); + final AtomicBoolean resumeBoundary = new AtomicBoolean(true); // incremented when requests are made, decremented when requests are fulfilled final AtomicLong consumerCapacity = new AtomicLong(0l); final AtomicReference currentProducer = new AtomicReference(); @@ -300,6 +300,8 @@ public void onNext(Object t) { if (!isLocked.get() && !child.isUnsubscribed()) { if (consumerCapacity.get() > 0) { worker.schedule(subscribeToSource); + } else { + resumeBoundary.compareAndSet(false, true); } } } @@ -315,22 +317,17 @@ public void setProducer(Producer producer) { child.setProducer(new Producer() { @Override - public void request(long n) { - if (isStarted.compareAndSet(false, true)) { - consumerCapacity.set(n); + public void request(final long n) { + long c = consumerCapacity.getAndAdd(n); + Producer producer = currentProducer.get(); + if (producer != null) { + producer.request(c + n); + } else + if (c == 0 && resumeBoundary.compareAndSet(true, false)) { worker.schedule(subscribeToSource); - } else { - if (consumerCapacity.getAndAdd(n) == 0) { - // restart - worker.schedule(subscribeToSource); - } else { - if (currentProducer.get() != null) { - currentProducer.get().request(n); - } - } } } }); - + } } diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index 12231721b4..f934856164 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -306,7 +306,7 @@ private void pollQueue(GroupState groupState) { } private void requestMoreIfNecessary() { - if (REQUESTED.get(this) == 0) { + if (REQUESTED.get(this) == 0 && terminated == 0) { long toRequest = MAX_QUEUE_SIZE - BUFFERED_COUNT.get(this); if (toRequest > 0 && REQUESTED.compareAndSet(this, 0, toRequest)) { request(toRequest); diff --git a/src/test/java/rx/internal/operators/OperatorRetryTest.java b/src/test/java/rx/internal/operators/OperatorRetryTest.java index 01ad91c49e..6b7361c0d3 100644 --- a/src/test/java/rx/internal/operators/OperatorRetryTest.java +++ b/src/test/java/rx/internal/operators/OperatorRetryTest.java @@ -38,6 +38,7 @@ import rx.functions.Func1; import rx.functions.Func2; import rx.internal.util.RxRingBuffer; +import rx.observables.GroupedObservable; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.subjects.PublishSubject; @@ -405,10 +406,14 @@ public static class FuncWithErrors implements Observable.OnSubscribe { public void call(Subscriber o) { o.onNext("beginningEveryTime"); if (count.getAndIncrement() < numFailures) { + System.out.println("FuncWithErrors @ " + count.get()); o.onError(new RuntimeException("forced failure: " + count.get())); } else { + System.out.println("FuncWithErrors @ onSuccessOnly"); o.onNext("onSuccessOnly"); + System.out.println("FuncWithErrors @ onCompleted"); o.onCompleted(); + System.out.println("FuncWithErrors !"); } } } @@ -663,7 +668,7 @@ public void testTimeoutWithRetry() { assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get()); } - @Test + @Test(timeout = 3000) public void testRetryWithBackpressure() { @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); @@ -684,5 +689,90 @@ public void testRetryWithBackpressure() { inOrder.verify(observer, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } + @Test(timeout = 3000) + public void testIssue1900() throws InterruptedException { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + final int NUM_MSG = 1034; + final AtomicInteger count = new AtomicInteger(); + + Observable origin = Observable.range(0, NUM_MSG) + .map(new Func1() { + @Override + public String call(Integer t1) { + return "msg: " + count.incrementAndGet(); + } + }); + + origin.retry() + .groupBy(new Func1() { + @Override + public String call(String t1) { + return t1; + } + }) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(GroupedObservable t1) { + return t1.take(1); + } + }) + .unsafeSubscribe(new TestSubscriber(observer)); + + InOrder inOrder = inOrder(observer); + // should show 3 attempts + inOrder.verify(observer, times(NUM_MSG)).onNext(any(java.lang.String.class)); + // // should have no errors + inOrder.verify(observer, never()).onError(any(Throwable.class)); + // should have a single success + //inOrder.verify(observer, times(1)).onNext("onSuccessOnly"); + // should have a single successful onCompleted + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + @Test/*(timeout = 3000)*/ + public void testIssue1900SourceNotSupportingBackpressure() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + final int NUM_MSG = 1034; + final AtomicInteger count = new AtomicInteger(); + + Observable origin = Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(Subscriber o) { + for(int i=0; i() { + @Override + public String call(String t1) { + return t1; + } + }) + .flatMap(new Func1, Observable>() { + @Override + public Observable call(GroupedObservable t1) { + return t1.take(1); + } + }) + .unsafeSubscribe(new TestSubscriber(observer)); + + InOrder inOrder = inOrder(observer); + // should show 3 attempts + inOrder.verify(observer, times(NUM_MSG)).onNext(any(java.lang.String.class)); + // // should have no errors + inOrder.verify(observer, never()).onError(any(Throwable.class)); + // should have a single success + //inOrder.verify(observer, times(1)).onNext("onSuccessOnly"); + // should have a single successful onCompleted + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } } From 60656e53c955e70bacd773f540c151b4aa36c188 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 25 Nov 2014 12:13:22 +0100 Subject: [PATCH 2/2] Restore timeout on the test. --- src/test/java/rx/internal/operators/OperatorRetryTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/rx/internal/operators/OperatorRetryTest.java b/src/test/java/rx/internal/operators/OperatorRetryTest.java index 6b7361c0d3..747d022c4f 100644 --- a/src/test/java/rx/internal/operators/OperatorRetryTest.java +++ b/src/test/java/rx/internal/operators/OperatorRetryTest.java @@ -730,7 +730,7 @@ public Observable call(GroupedObservable t1) { inOrder.verify(observer, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } - @Test/*(timeout = 3000)*/ + @Test(timeout = 3000) public void testIssue1900SourceNotSupportingBackpressure() { @SuppressWarnings("unchecked") Observer observer = mock(Observer.class);