diff --git a/src/main/java/rx/internal/operators/OnSubscribeRange.java b/src/main/java/rx/internal/operators/OnSubscribeRange.java index bcfbe0736b..383d17f28f 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeRange.java +++ b/src/main/java/rx/internal/operators/OnSubscribeRange.java @@ -15,11 +15,10 @@ */ package rx.internal.operators; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; +import rx.*; import rx.Observable.OnSubscribe; -import rx.Producer; -import rx.Subscriber; /** * Emit ints from start to end inclusive. @@ -39,13 +38,13 @@ public void call(final Subscriber o) { o.setProducer(new RangeProducer(o, start, end)); } - private static final class RangeProducer implements Producer { + private static final class RangeProducer extends AtomicLong implements Producer { + /** */ + private static final long serialVersionUID = 4114392207069098388L; + private final Subscriber o; - // accessed by REQUESTED_UPDATER - private volatile long requested; - private static final AtomicLongFieldUpdater REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(RangeProducer.class, "requested"); - private long index; private final int end; + private long index; private RangeProducer(Subscriber o, int start, int end) { this.o = o; @@ -55,54 +54,79 @@ private RangeProducer(Subscriber o, int start, int end) { @Override public void request(long n) { - if (requested == Long.MAX_VALUE) { + if (get() == Long.MAX_VALUE) { // already started with fast-path return; } - if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) { + if (n == Long.MAX_VALUE && compareAndSet(0L, Long.MAX_VALUE)) { // fast-path without backpressure - for (long i = index; i <= end; i++) { + fastpath(); + } else if (n > 0L) { + long c = BackpressureUtils.getAndAddRequest(this, n); + if (c == 0L) { + // backpressure is requested + slowpath(n); + } + } + } + + /** + * + */ + void slowpath(long r) { + long idx = index; + while (true) { + /* + * This complicated logic is done to avoid touching the volatile `index` and `requested` values + * during the loop itself. If they are touched during the loop the performance is impacted significantly. + */ + long fs = end - idx + 1; + long e = Math.min(fs, r); + final boolean complete = fs <= r; + + fs = e + idx; + final Subscriber o = this.o; + + for (long i = idx; i != fs; i++) { if (o.isUnsubscribed()) { return; } o.onNext((int) i); } - if (!o.isUnsubscribed()) { + + if (complete) { + if (o.isUnsubscribed()) { + return; + } o.onCompleted(); + return; } - } else if (n > 0) { - // backpressure is requested - long _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER,this, n); - if (_c == 0) { - while (true) { - /* - * This complicated logic is done to avoid touching the volatile `index` and `requested` values - * during the loop itself. If they are touched during the loop the performance is impacted significantly. - */ - long r = requested; - long idx = index; - long numLeft = end - idx + 1; - long e = Math.min(numLeft, r); - boolean completeOnFinish = numLeft <= r; - long stopAt = e + idx; - for (long i = idx; i < stopAt; i++) { - if (o.isUnsubscribed()) { - return; - } - o.onNext((int) i); - } - index = stopAt; - - if (completeOnFinish) { - o.onCompleted(); - return; - } - if (REQUESTED_UPDATER.addAndGet(this, -e) == 0) { - // we're done emitting the number requested so return - return; - } - } + + idx = fs; + index = fs; + + r = addAndGet(-e); + if (r == 0L) { + // we're done emitting the number requested so return + return; + } + } + } + + /** + * + */ + void fastpath() { + final long end = this.end + 1L; + final Subscriber o = this.o; + for (long i = index; i != end; i++) { + if (o.isUnsubscribed()) { + return; } + o.onNext((int) i); + } + if (!o.isUnsubscribed()) { + o.onCompleted(); } } } diff --git a/src/perf/java/rx/operators/OperatorRangePerf.java b/src/perf/java/rx/operators/OperatorRangePerf.java index 85ca76e46d..52fd0af7ff 100644 --- a/src/perf/java/rx/operators/OperatorRangePerf.java +++ b/src/perf/java/rx/operators/OperatorRangePerf.java @@ -17,18 +17,11 @@ import java.util.concurrent.TimeUnit; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Mode; -import org.openjdk.jmh.annotations.OutputTimeUnit; -import org.openjdk.jmh.annotations.Param; -import org.openjdk.jmh.annotations.Scope; -import org.openjdk.jmh.annotations.Setup; -import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.*; import org.openjdk.jmh.infra.Blackhole; -import rx.Observable; -import rx.Subscriber; +import rx.*; +import rx.internal.operators.OnSubscribeRange; @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.SECONDS) @@ -50,7 +43,7 @@ public static class InputUsingRequest { @Setup public void setup(final Blackhole bh) { - observable = Observable.range(0, size); + observable = Observable.create(new OnSubscribeRange(0, size)); this.bh = bh; } @@ -98,7 +91,7 @@ public static class InputWithoutRequest { @Setup public void setup(final Blackhole bh) { - observable = Observable.range(0, size); + observable = Observable.create(new OnSubscribeRange(0, size)); this.bh = bh; } diff --git a/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java b/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java index 6d4d97d019..acc2f6ff75 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeRangeTest.java @@ -249,4 +249,23 @@ public void onNext(Integer t) { }}); assertTrue(completed.get()); } + + @Test(timeout = 1000) + public void testNearMaxValueWithoutBackpressure() { + TestSubscriber ts = TestSubscriber.create(); + Observable.range(Integer.MAX_VALUE - 1, 2).subscribe(ts); + + ts.assertCompleted(); + ts.assertNoErrors(); + ts.assertValues(Integer.MAX_VALUE - 1, Integer.MAX_VALUE); + } + @Test(timeout = 1000) + public void testNearMaxValueWithBackpressure() { + TestSubscriber ts = TestSubscriber.create(3); + Observable.range(Integer.MAX_VALUE - 1, 2).subscribe(ts); + + ts.assertCompleted(); + ts.assertNoErrors(); + ts.assertValues(Integer.MAX_VALUE - 1, Integer.MAX_VALUE); + } }