Skip to content

Commit

Permalink
Merge pull request #1901 from akarnokd/RedoRequestFix
Browse files Browse the repository at this point in the history
Fixed redo & groupBy backpressure management
  • Loading branch information
benjchristensen committed Nov 29, 2014
2 parents 053e506 + 60656e5 commit 37314b0
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 16 deletions.
25 changes: 11 additions & 14 deletions src/main/java/rx/internal/operators/OnSubscribeRedo.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ private OnSubscribeRedo(Observable<T> source, Func1<? super Observable<? extends
@Override
public void call(final Subscriber<? super T> child) {
final AtomicBoolean isLocked = new AtomicBoolean(true);
final AtomicBoolean isStarted = new AtomicBoolean(false);
final AtomicBoolean resumeBoundary = new AtomicBoolean(true);
// incremented when requests are made, decremented when requests are fulfilled
final AtomicLong consumerCapacity = new AtomicLong(0l);
final AtomicReference<Producer> currentProducer = new AtomicReference<Producer>();
Expand Down Expand Up @@ -300,6 +300,8 @@ public void onNext(Object t) {
if (!isLocked.get() && !child.isUnsubscribed()) {
if (consumerCapacity.get() > 0) {
worker.schedule(subscribeToSource);
} else {
resumeBoundary.compareAndSet(false, true);
}
}
}
Expand All @@ -315,22 +317,17 @@ public void setProducer(Producer producer) {
child.setProducer(new Producer() {

@Override
public void request(long n) {
if (isStarted.compareAndSet(false, true)) {
consumerCapacity.set(n);
public void request(final long n) {
long c = consumerCapacity.getAndAdd(n);
Producer producer = currentProducer.get();
if (producer != null) {
producer.request(c + n);
} else
if (c == 0 && resumeBoundary.compareAndSet(true, false)) {
worker.schedule(subscribeToSource);
} else {
if (consumerCapacity.getAndAdd(n) == 0) {
// restart
worker.schedule(subscribeToSource);
} else {
if (currentProducer.get() != null) {
currentProducer.get().request(n);
}
}
}
}
});

}
}
2 changes: 1 addition & 1 deletion src/main/java/rx/internal/operators/OperatorGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ private void pollQueue(GroupState<K, T> groupState) {
}

private void requestMoreIfNecessary() {
if (REQUESTED.get(this) == 0) {
if (REQUESTED.get(this) == 0 && terminated == 0) {
long toRequest = MAX_QUEUE_SIZE - BUFFERED_COUNT.get(this);
if (toRequest > 0 && REQUESTED.compareAndSet(this, 0, toRequest)) {
request(toRequest);
Expand Down
92 changes: 91 additions & 1 deletion src/test/java/rx/internal/operators/OperatorRetryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import rx.functions.Func1;
import rx.functions.Func2;
import rx.internal.util.RxRingBuffer;
import rx.observables.GroupedObservable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
Expand Down Expand Up @@ -405,10 +406,14 @@ public static class FuncWithErrors implements Observable.OnSubscribe<String> {
public void call(Subscriber<? super String> o) {
o.onNext("beginningEveryTime");
if (count.getAndIncrement() < numFailures) {
System.out.println("FuncWithErrors @ " + count.get());
o.onError(new RuntimeException("forced failure: " + count.get()));
} else {
System.out.println("FuncWithErrors @ onSuccessOnly");
o.onNext("onSuccessOnly");
System.out.println("FuncWithErrors @ onCompleted");
o.onCompleted();
System.out.println("FuncWithErrors !");
}
}
}
Expand Down Expand Up @@ -663,7 +668,7 @@ public void testTimeoutWithRetry() {
assertEquals("Start 6 threads, retry 5 then fail on 6", 6, so.efforts.get());
}

@Test
@Test(timeout = 3000)
public void testRetryWithBackpressure() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand All @@ -684,5 +689,90 @@ public void testRetryWithBackpressure() {
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}
@Test(timeout = 3000)
public void testIssue1900() throws InterruptedException {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
final int NUM_MSG = 1034;
final AtomicInteger count = new AtomicInteger();

Observable<String> origin = Observable.range(0, NUM_MSG)
.map(new Func1<Integer, String>() {
@Override
public String call(Integer t1) {
return "msg: " + count.incrementAndGet();
}
});

origin.retry()
.groupBy(new Func1<String, String>() {
@Override
public String call(String t1) {
return t1;
}
})
.flatMap(new Func1<GroupedObservable<String,String>, Observable<String>>() {
@Override
public Observable<String> call(GroupedObservable<String, String> t1) {
return t1.take(1);
}
})
.unsafeSubscribe(new TestSubscriber<String>(observer));

InOrder inOrder = inOrder(observer);
// should show 3 attempts
inOrder.verify(observer, times(NUM_MSG)).onNext(any(java.lang.String.class));
// // should have no errors
inOrder.verify(observer, never()).onError(any(Throwable.class));
// should have a single success
//inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
// should have a single successful onCompleted
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}
@Test(timeout = 3000)
public void testIssue1900SourceNotSupportingBackpressure() {
@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
final int NUM_MSG = 1034;
final AtomicInteger count = new AtomicInteger();

Observable<String> origin = Observable.create(new Observable.OnSubscribe<String>() {

@Override
public void call(Subscriber<? super String> o) {
for(int i=0; i<NUM_MSG; i++) {
o.onNext("msg:" + count.incrementAndGet());
}
o.onCompleted();
}
});

origin.retry()
.groupBy(new Func1<String, String>() {
@Override
public String call(String t1) {
return t1;
}
})
.flatMap(new Func1<GroupedObservable<String,String>, Observable<String>>() {
@Override
public Observable<String> call(GroupedObservable<String, String> t1) {
return t1.take(1);
}
})
.unsafeSubscribe(new TestSubscriber<String>(observer));

InOrder inOrder = inOrder(observer);
// should show 3 attempts
inOrder.verify(observer, times(NUM_MSG)).onNext(any(java.lang.String.class));
// // should have no errors
inOrder.verify(observer, never()).onError(any(Throwable.class));
// should have a single success
//inOrder.verify(observer, times(1)).onNext("onSuccessOnly");
// should have a single successful onCompleted
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

}

0 comments on commit 37314b0

Please sign in to comment.