diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java index 811ced15f3..151541f63f 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java @@ -21,7 +21,7 @@ import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.disposables.Disposable; -import io.reactivex.rxjava3.exceptions.MissingBackpressureException; +import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.internal.queue.SpscArrayQueue; import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper; import io.reactivex.rxjava3.internal.util.*; @@ -62,7 +62,7 @@ static final class BlockingFlowableIterator long produced; volatile boolean done; - Throwable error; + volatile Throwable error; BlockingFlowableIterator(int batchSize) { this.queue = new SpscArrayQueue(batchSize); @@ -75,6 +75,13 @@ static final class BlockingFlowableIterator @Override public boolean hasNext() { for (;;) { + if (isDisposed()) { + Throwable e = error; + if (e != null) { + throw ExceptionHelper.wrapOrThrow(e); + } + return false; + } boolean d = done; boolean empty = queue.isEmpty(); if (d) { @@ -90,7 +97,7 @@ public boolean hasNext() { BlockingHelper.verifyNonBlocking(); lock.lock(); try { - while (!done && queue.isEmpty()) { + while (!done && queue.isEmpty() && !isDisposed()) { condition.await(); } } catch (InterruptedException ex) { @@ -175,6 +182,7 @@ public void remove() { @Override public void dispose() { SubscriptionHelper.cancel(this); + signalConsumer(); // Just in case it is currently blocking in hasNext. } @Override diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java index b190896f2b..0ba2da1720 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableIterable.java @@ -53,7 +53,7 @@ static final class BlockingObservableIterator final Condition condition; volatile boolean done; - Throwable error; + volatile Throwable error; BlockingObservableIterator(int batchSize) { this.queue = new SpscLinkedArrayQueue(batchSize); @@ -64,6 +64,13 @@ static final class BlockingObservableIterator @Override public boolean hasNext() { for (;;) { + if (isDisposed()) { + Throwable e = error; + if (e != null) { + throw ExceptionHelper.wrapOrThrow(e); + } + return false; + } boolean d = done; boolean empty = queue.isEmpty(); if (d) { @@ -80,7 +87,7 @@ public boolean hasNext() { BlockingHelper.verifyNonBlocking(); lock.lock(); try { - while (!done && queue.isEmpty()) { + while (!done && queue.isEmpty() && !isDisposed()) { condition.await(); } } finally { @@ -146,6 +153,7 @@ public void remove() { @Override public void dispose() { DisposableHelper.dispose(this); + signalConsumer(); // Just in case it is currently blocking in hasNext. } @Override diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java index abb445c149..20bfcf9962 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableToIteratorTest.java @@ -16,14 +16,18 @@ import static org.junit.Assert.*; import java.util.*; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.reactivestreams.*; import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.exceptions.*; import io.reactivex.rxjava3.internal.operators.flowable.BlockingFlowableIterable.BlockingFlowableIterator; import io.reactivex.rxjava3.internal.subscriptions.BooleanSubscription; +import io.reactivex.rxjava3.processors.PublishProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; public class BlockingFlowableToIteratorTest extends RxJavaTest { @@ -163,4 +167,28 @@ protected void subscribeActual(Subscriber s) { it.next(); } + + @Test(expected = NoSuchElementException.class) + public void disposedIteratorHasNextReturns() { + Iterator it = PublishProcessor.create() + .blockingIterable().iterator(); + ((Disposable)it).dispose(); + assertFalse(it.hasNext()); + it.next(); + } + + @Test + public void asyncDisposeUnblocks() { + final Iterator it = PublishProcessor.create() + .blockingIterable().iterator(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + ((Disposable)it).dispose(); + } + }, 1, TimeUnit.SECONDS); + + assertFalse(it.hasNext()); + } } diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java index 938f757a0c..194619b8bc 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableNextTest.java @@ -28,7 +28,6 @@ import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.internal.operators.observable.BlockingObservableNext.NextObserver; import io.reactivex.rxjava3.plugins.RxJavaPlugins; -import io.reactivex.rxjava3.processors.BehaviorProcessor; import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subjects.*; import io.reactivex.rxjava3.testsupport.TestHelper; @@ -333,9 +332,9 @@ public void singleSourceManyIterators() throws InterruptedException { @Test public void synchronousNext() { - assertEquals(1, BehaviorProcessor.createDefault(1).take(1).blockingSingle().intValue()); - assertEquals(2, BehaviorProcessor.createDefault(2).blockingIterable().iterator().next().intValue()); - assertEquals(3, BehaviorProcessor.createDefault(3).blockingNext().iterator().next().intValue()); + assertEquals(1, BehaviorSubject.createDefault(1).take(1).blockingSingle().intValue()); + assertEquals(2, BehaviorSubject.createDefault(2).blockingIterable().iterator().next().intValue()); + assertEquals(3, BehaviorSubject.createDefault(3).blockingNext().iterator().next().intValue()); } @Test diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java index d904ed30dd..3a98208d70 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/observable/BlockingObservableToIteratorTest.java @@ -16,15 +16,18 @@ import static org.junit.Assert.*; import java.util.*; +import java.util.concurrent.TimeUnit; import org.junit.Test; import io.reactivex.rxjava3.core.*; import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observer; -import io.reactivex.rxjava3.disposables.Disposables; +import io.reactivex.rxjava3.disposables.*; import io.reactivex.rxjava3.exceptions.TestException; import io.reactivex.rxjava3.internal.operators.observable.BlockingObservableIterable.BlockingObservableIterator; +import io.reactivex.rxjava3.schedulers.Schedulers; +import io.reactivex.rxjava3.subjects.PublishSubject; public class BlockingObservableToIteratorTest extends RxJavaTest { @@ -104,4 +107,28 @@ public void remove() { BlockingObservableIterator it = new BlockingObservableIterator(128); it.remove(); } + + @Test(expected = NoSuchElementException.class) + public void disposedIteratorHasNextReturns() { + Iterator it = PublishSubject.create() + .blockingIterable().iterator(); + ((Disposable)it).dispose(); + assertFalse(it.hasNext()); + it.next(); + } + + @Test + public void asyncDisposeUnblocks() { + final Iterator it = PublishSubject.create() + .blockingIterable().iterator(); + + Schedulers.single().scheduleDirect(new Runnable() { + @Override + public void run() { + ((Disposable)it).dispose(); + } + }, 1, TimeUnit.SECONDS); + + assertFalse(it.hasNext()); + } }