diff --git a/rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorNext.java b/rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorNext.java index 40c03efcab..7f58d18534 100644 --- a/rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorNext.java +++ b/rxjava-core/src/main/java/rx/internal/operators/BlockingOperatorNext.java @@ -19,6 +19,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Notification; @@ -46,11 +47,7 @@ public static Iterable next(final Observable items) { @Override public Iterator iterator() { NextObserver nextObserver = new NextObserver(); - final NextIterator nextIterator = new NextIterator(nextObserver); - - items.materialize().subscribe(nextObserver); - - return nextIterator; + return new NextIterator(items, nextObserver); } }; @@ -59,28 +56,19 @@ public Iterator iterator() { // test needs to access the observer.waiting flag non-blockingly. /* private */static final class NextIterator implements Iterator { - private final NextObserver observer; + private final NextObserver observer; + private final Observable items; private T next; private boolean hasNext = true; private boolean isNextConsumed = true; private Throwable error = null; + private boolean started = false; - private NextIterator(NextObserver observer) { + private NextIterator(Observable items, NextObserver observer) { + this.items = items; this.observer = observer; } - - // in tests, set the waiting flag without blocking for the next value to - // allow lockstepping instead of multi-threading - /** - * In tests, set the waiting flag without blocking for the next value to - * allow lockstepping instead of multi-threading - * @param value use 1 to enter into the waiting state - */ - void setWaiting(int value) { - observer.setWaiting(value); - } - @Override public boolean hasNext() { if (error != null) { @@ -102,6 +90,13 @@ public boolean hasNext() { private boolean moveToNext() { try { + if (!started) { + started = true; + // if not started, start now + observer.setWaiting(1); + items.materialize().subscribe(observer); + } + Notification nextNotification = observer.takeNext(); if (nextNotification.isOnNext()) { isNextConsumed = false; diff --git a/rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorNextTest.java b/rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorNextTest.java index 5ed0fcbc59..2c4b36dd80 100644 --- a/rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorNextTest.java +++ b/rxjava-core/src/test/java/rx/internal/operators/BlockingOperatorNextTest.java @@ -37,6 +37,7 @@ import rx.internal.operators.BlockingOperatorNext; import rx.observables.BlockingObservable; import rx.schedulers.Schedulers; +import rx.subjects.BehaviorSubject; import rx.subjects.PublishSubject; import rx.subjects.Subject; @@ -295,26 +296,27 @@ public void run() { @Test /* (timeout = 8000) */ public void testSingleSourceManyIterators() throws InterruptedException { - PublishSubject ps = PublishSubject.create(); - BlockingObservable source = ps.take(10).toBlocking(); + Observable o = Observable.interval(10, TimeUnit.MILLISECONDS); + PublishSubject terminal = PublishSubject.create(); + BlockingObservable source = o.takeUntil(terminal).toBlocking(); Iterable iter = source.next(); for (int j = 0; j < 3; j++) { BlockingOperatorNext.NextIterator it = (BlockingOperatorNext.NextIterator)iter.iterator(); - for (long i = 0; i < 9; i++) { - // hasNext has to set the waiting to true, otherwise, all onNext will be skipped - it.setWaiting(1); - ps.onNext(i); + for (long i = 0; i < 10; i++) { Assert.assertEquals(true, it.hasNext()); - Assert.assertEquals(j + "th iteration", Long.valueOf(i), it.next()); + Assert.assertEquals(j + "th iteration next", Long.valueOf(i), it.next()); } - it.setWaiting(1); - ps.onNext(9L); - - Assert.assertEquals(j + "th iteration", false, it.hasNext()); + terminal.onNext(null); } - + } + + @Test + public void testSynchronousNext() { + assertEquals(1, BehaviorSubject.create(1).take(1).toBlocking().single().intValue()); + assertEquals(2, BehaviorSubject.create(2).toBlocking().toIterable().iterator().next().intValue()); + assertEquals(3, BehaviorSubject.create(3).toBlocking().next().iterator().next().intValue()); } }