From 212af0a0f2fbcaaac792de4c4041eff35127437f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 27 Jul 2014 00:26:30 +0800 Subject: [PATCH 1/3] Fix issue #1522 --- .../internal/operators/OperatorTakeLast.java | 37 ++++-- .../operators/OperatorTakeLastTest.java | 116 ++++++++++++++++++ 2 files changed, 141 insertions(+), 12 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java index 9b1ac935eb..dc2fae4490 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java @@ -108,11 +108,21 @@ void startEmitting() { @Override public void request(long n) { - long _c = 0; + long _c; if (n == Long.MAX_VALUE) { - requested = Long.MAX_VALUE; + _c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE); } else { - _c = REQUESTED_UPDATER.getAndAdd(this, n); + for (;;) { + _c = requested; + if (_c == Long.MAX_VALUE) { + // If `requested` is Long.MAX_VALUE, `c+n` will be overflow. + // Therefore, always check before setting to `c+n` + return; + } + if (REQUESTED_UPDATER.compareAndSet(this, _c, _c + n)) { + break; + } + } } if (!emittingStarted) { // we haven't started yet, so record what was requested and return @@ -122,16 +132,20 @@ public void request(long n) { } void emit(long previousRequested) { - if (requested < 0) { + if (requested == Long.MAX_VALUE) { // fast-path without backpressure - try { - for (Object value : deque) { - notification.accept(subscriber, value); + if (previousRequested == 0) { + try { + for (Object value : deque) { + notification.accept(subscriber, value); + } + } catch (Throwable e) { + subscriber.onError(e); + } finally { + deque.clear(); } - } catch (Throwable e) { - subscriber.onError(e); - } finally { - deque.clear(); + } else { + // backpressure path will handle Long.MAX_VALUE and emit the rest events. } } else { // backpressure is requested @@ -160,7 +174,6 @@ void emit(long previousRequested) { // we're done emitting the number requested so return return; } - } } } diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java index 1db81cf9fd..13b626bf6e 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorTakeLastTest.java @@ -30,7 +30,9 @@ import rx.Observable; import rx.Observer; +import rx.Subscriber; import rx.functions.Func1; +import rx.functions.Functions; import rx.internal.util.RxRingBuffer; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -148,4 +150,118 @@ public Integer call(Integer i) { }; } + @Test + public void testIssue1522() { + // https://github.com/Netflix/RxJava/issues/1522 + assertEquals(0, Observable + .empty() + .count() + .filter(Functions.alwaysFalse()) + .toList() + .toBlocking().single().size()); + } + + @Test + public void testIgnoreRequest1() { + // If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown. + Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(Long.MAX_VALUE); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer integer) { + request(Long.MAX_VALUE); + } + }); + } + + @Test + public void testIgnoreRequest2() { + // If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown. + Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer integer) { + request(1); + } + }); + } + + @Test(timeout = 30000) + public void testIgnoreRequest3() { + // If `takeLast` does not ignore `request` properly, it will enter an infinite loop. + Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer integer) { + request(Long.MAX_VALUE); + } + }); + } + + + @Test + public void testIgnoreRequest4() { + // If `takeLast` does not ignore `request` properly, StackOverflowError will be thrown. + Observable.range(0, 100000).takeLast(100000).subscribe(new Subscriber() { + + @Override + public void onStart() { + request(Long.MAX_VALUE); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + } + + @Override + public void onNext(Integer integer) { + request(1); + } + }); + } } From a2a7a29cd37bda4c2195129369f2d40ccfcdebd0 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 29 Jul 2014 19:00:46 +0800 Subject: [PATCH 2/3] Optimize the fix for takeLast and handle the race condition --- .../internal/operators/OperatorTakeLast.java | 34 +++++++++++-------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java index dc2fae4490..b5fce7216f 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorTakeLast.java @@ -108,21 +108,14 @@ void startEmitting() { @Override public void request(long n) { + if (requested == Long.MAX_VALUE) { + return; + } long _c; if (n == Long.MAX_VALUE) { _c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE); } else { - for (;;) { - _c = requested; - if (_c == Long.MAX_VALUE) { - // If `requested` is Long.MAX_VALUE, `c+n` will be overflow. - // Therefore, always check before setting to `c+n` - return; - } - if (REQUESTED_UPDATER.compareAndSet(this, _c, _c + n)) { - break; - } - } + _c = REQUESTED_UPDATER.getAndAdd(this, n); } if (!emittingStarted) { // we haven't started yet, so record what was requested and return @@ -169,10 +162,21 @@ void emit(long previousRequested) { emitted++; } } - - if (REQUESTED_UPDATER.addAndGet(this, -emitted) == 0) { - // we're done emitting the number requested so return - return; + for (;;) { + long oldRequested = requested; + long newRequested = oldRequested - emitted; + if (oldRequested == Long.MAX_VALUE) { + // became unbounded during the loop + // continue the outer loop to emit the rest events. + break; + } + if (REQUESTED_UPDATER.compareAndSet(this, oldRequested, newRequested)) { + if (newRequested == 0) { + // we're done emitting the number requested so return + return; + } + break; + } } } } From c1ec1f4a4ee2dad56ecd0fee959e54d72c5c9734 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 29 Jul 2014 21:18:21 +0800 Subject: [PATCH 3/3] Make "single" support backpressure Signed-off-by: zsxwing --- .../rx/internal/operators/OperatorSingle.java | 3 + .../operators/OperatorSingleTest.java | 55 ++++++++++++++++++- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java b/rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java index 329314c34b..fd3fd54b37 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java +++ b/rxjava-core/src/main/java/rx/internal/operators/OperatorSingle.java @@ -60,6 +60,9 @@ public void onNext(T value) { } else { this.value = value; isNonEmpty = true; + // Issue: https://github.com/Netflix/RxJava/pull/1527 + // Because we cache a value and don't emit now, we need to request another one. + request(1); } } diff --git a/rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java b/rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java index a900e4c706..6910390a51 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/OperatorSingleTest.java @@ -15,10 +15,9 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; +import static org.mockito.Mockito.*; import java.util.NoSuchElementException; @@ -27,7 +26,9 @@ import rx.Observable; import rx.Observer; +import rx.Subscriber; import rx.functions.Func1; +import rx.functions.Func2; public class OperatorSingleTest { @@ -241,4 +242,52 @@ public Boolean call(Integer t1) { inOrder.verify(observer, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } + + @Test + public void testSingleWithBackpressure() { + Observable observable = Observable.from(1, 2).single(); + + Subscriber subscriber = spy(new Subscriber() { + + @Override + public void onStart() { + request(1); + } + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer integer) { + request(1); + } + }); + observable.subscribe(subscriber); + + InOrder inOrder = inOrder(subscriber); + inOrder.verify(subscriber, times(1)).onError(isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test(timeout = 30000) + public void testIssue1527() throws InterruptedException { + //https://github.com/Netflix/RxJava/pull/1527 + Observable source = Observable.from(1, 2, 3, 4, 5, 6); + Observable reduced = source.reduce(new Func2() { + @Override + public Integer call(Integer i1, Integer i2) { + return i1 + i2; + } + }); + + Integer r = reduced.toBlocking().first(); + assertEquals(21, r.intValue()); + } }