From 9fa9cea2a42a5ad3d779250846b6048f1b01d3bf Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Mon, 8 Jun 2015 16:22:27 +1000 Subject: [PATCH] ensure iterator.hasNext is not called unnecessarily as per #3006 --- .../operators/OnSubscribeFromIterable.java | 44 +++++---- .../OnSubscribeFromIterableTest.java | 93 +++++++++++++++++++ 2 files changed, 120 insertions(+), 17 deletions(-) diff --git a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java index 766b624416..2aad771b57 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java +++ b/src/main/java/rx/internal/operators/OnSubscribeFromIterable.java @@ -71,14 +71,19 @@ public void request(long n) { } if (n == Long.MAX_VALUE && REQUESTED_UPDATER.compareAndSet(this, 0, Long.MAX_VALUE)) { // fast-path without backpressure - while (it.hasNext()) { + + while (true) { if (o.isUnsubscribed()) { return; + } else if (it.hasNext()) { + o.onNext(it.next()); + } else if (!o.isUnsubscribed()) { + o.onCompleted(); + return; + } else { + // is unsubscribed + return; } - o.onNext(it.next()); - } - if (!o.isUnsubscribed()) { - o.onCompleted(); } } else if (n > 0) { // backpressure is requested @@ -86,27 +91,32 @@ public void request(long n) { if (_c == 0) { while (true) { /* - * This complicated logic is done to avoid touching the volatile `requested` value - * during the loop itself. If it is touched during the loop the performance is impacted significantly. + * This complicated logic is done to avoid touching the + * volatile `requested` value during the loop itself. If + * it is touched during the loop the performance is + * impacted significantly. */ long r = requested; long numToEmit = r; - while (it.hasNext() && --numToEmit >= 0) { + while (true) { if (o.isUnsubscribed()) { return; - } - o.onNext(it.next()); - - } - - if (!it.hasNext()) { - if (!o.isUnsubscribed()) { + } else if (it.hasNext()) { + if (--numToEmit >= 0) { + o.onNext(it.next()); + } else + break; + } else if (!o.isUnsubscribed()) { o.onCompleted(); + return; + } else { + // is unsubscribed + return; } - return; } if (REQUESTED_UPDATER.addAndGet(this, -r) == 0) { - // we're done emitting the number requested so return + // we're done emitting the number requested so + // return return; } diff --git a/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java b/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java index 91bf65bf4d..a75e733951 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeFromIterableTest.java @@ -15,6 +15,7 @@ */ package rx.internal.operators; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; @@ -29,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -221,4 +223,95 @@ public void onNext(Object t) { assertTrue(completed.get()); } + @Test + public void testDoesNotCallIteratorHasNextMoreThanRequiredWithBackpressure() { + final AtomicBoolean called = new AtomicBoolean(false); + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + int count = 1; + + @Override + public void remove() { + // ignore + } + + @Override + public boolean hasNext() { + if (count > 1) { + called.set(true); + return false; + } else + return true; + } + + @Override + public Integer next() { + return count++; + } + + }; + } + }; + Observable.from(iterable).take(1).subscribe(); + assertFalse(called.get()); + } + + @Test + public void testDoesNotCallIteratorHasNextMoreThanRequiredFastPath() { + final AtomicBoolean called = new AtomicBoolean(false); + Iterable iterable = new Iterable() { + + @Override + public Iterator iterator() { + return new Iterator() { + + @Override + public void remove() { + // ignore + } + + int count = 1; + + @Override + public boolean hasNext() { + if (count > 1) { + called.set(true); + return false; + } else + return true; + } + + @Override + public Integer next() { + return count++; + } + + }; + } + }; + Observable.from(iterable).subscribe(new Subscriber() { + + @Override + public void onCompleted() { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onNext(Integer t) { + // unsubscribe on first emission + unsubscribe(); + } + }); + assertFalse(called.get()); + } + }