diff --git a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java index e50246a988..5eb11d3e4e 100644 --- a/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java +++ b/src/main/java/io/reactivex/internal/operators/parallel/ParallelFromPublisher.java @@ -116,7 +116,7 @@ public void onSubscribe(Subscription s) { @SuppressWarnings("unchecked") QueueSubscription qs = (QueueSubscription) s; - int m = qs.requestFusion(QueueSubscription.ANY); + int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY); if (m == QueueSubscription.SYNC) { sourceMode = m; diff --git a/src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java b/src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java index 874fd62fba..69ccfc3db6 100644 --- a/src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java @@ -14,15 +14,22 @@ package io.reactivex.parallel; import static org.junit.Assert.*; + +import java.util.*; +import java.util.concurrent.*; + import org.junit.Test; -import org.reactivestreams.Subscriber; +import org.reactivestreams.*; -import io.reactivex.Flowable; +import io.reactivex.*; import io.reactivex.exceptions.*; -import io.reactivex.functions.Function; +import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.fuseable.QueueSubscription; +import io.reactivex.internal.subscribers.BasicFuseableSubscriber; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.processors.UnicastProcessor; +import io.reactivex.schedulers.Schedulers; public class ParallelFromPublisherTest { @@ -53,6 +60,53 @@ public void fusedFilterBecomesEmpty() { .assertResult(); } + static final class StripBoundary extends Flowable implements FlowableTransformer { + + final Flowable source; + + StripBoundary(Flowable source) { + this.source = source; + } + + @Override + public Publisher apply(Flowable upstream) { + return new StripBoundary(upstream); + } + + @Override + protected void subscribeActual(Subscriber s) { + source.subscribe(new StripBoundarySubscriber(s)); + } + + static final class StripBoundarySubscriber extends BasicFuseableSubscriber { + + StripBoundarySubscriber(Subscriber actual) { + super(actual); + } + + @Override + public void onNext(T t) { + actual.onNext(t); + } + + @Override + public int requestFusion(int mode) { + QueueSubscription fs = qs; + if (fs != null) { + int m = fs.requestFusion(mode & ~QueueSubscription.BOUNDARY); + this.sourceMode = m; + return m; + } + return QueueSubscription.NONE; + } + + @Override + public T poll() throws Exception { + return qs.poll(); + } + } + } + @Test public void syncFusedMapCrash() { Flowable.just(1) @@ -62,6 +116,7 @@ public Object apply(Integer v) throws Exception { throw new TestException(); } }) + .compose(new StripBoundary(null)) .parallel() .sequential() .test() @@ -81,6 +136,7 @@ public Object apply(Integer v) throws Exception { throw new TestException(); } }) + .compose(new StripBoundary(null)) .parallel() .sequential() .test() @@ -88,4 +144,45 @@ public Object apply(Integer v) throws Exception { assertFalse(up.hasSubscribers()); } + + @Test + public void boundaryConfinement() { + final Set between = new HashSet(); + final ConcurrentHashMap processing = new ConcurrentHashMap(); + + Flowable.range(1, 10) + .observeOn(Schedulers.single(), false, 1) + .doOnNext(new Consumer() { + @Override + public void accept(Integer v) throws Exception { + between.add(Thread.currentThread().getName()); + } + }) + .parallel(2, 1) + .runOn(Schedulers.computation(), 1) + .map(new Function() { + @Override + public Object apply(Integer v) throws Exception { + processing.putIfAbsent(Thread.currentThread().getName(), ""); + return v; + } + }) + .sequential() + .test() + .awaitDone(5, TimeUnit.SECONDS) + .assertSubscribed() + .assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + .assertComplete() + .assertNoErrors() + ; + + assertEquals(between.toString(), 1, between.size()); + assertTrue(between.toString(), between.iterator().next().contains("RxSingleScheduler")); + + Map map = processing; // AnimalSniffer: CHM.keySet() in Java 8 returns KeySetView + + for (String e : map.keySet()) { + assertTrue(map.toString(), e.contains("RxComputationThreadPool")); + } + } }