Skip to content

Commit

Permalink
2.x: make parallel() a fusion-async-boundary
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd committed Oct 17, 2017
1 parent 1ad6647 commit 45a04ca
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public void onSubscribe(Subscription s) {
@SuppressWarnings("unchecked")
QueueSubscription<T> qs = (QueueSubscription<T>) s;

int m = qs.requestFusion(QueueSubscription.ANY);
int m = qs.requestFusion(QueueSubscription.ANY | QueueSubscription.BOUNDARY);

if (m == QueueSubscription.SYNC) {
sourceMode = m;
Expand Down
103 changes: 100 additions & 3 deletions src/test/java/io/reactivex/parallel/ParallelFromPublisherTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@
package io.reactivex.parallel;

import static org.junit.Assert.*;

import java.util.*;
import java.util.concurrent.*;

import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.*;

import io.reactivex.Flowable;
import io.reactivex.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.fuseable.QueueSubscription;
import io.reactivex.internal.subscribers.BasicFuseableSubscriber;
import io.reactivex.internal.subscriptions.BooleanSubscription;
import io.reactivex.processors.UnicastProcessor;
import io.reactivex.schedulers.Schedulers;

public class ParallelFromPublisherTest {

Expand Down Expand Up @@ -53,6 +60,53 @@ public void fusedFilterBecomesEmpty() {
.assertResult();
}

static final class StripBoundary<T> extends Flowable<T> implements FlowableTransformer<T, T> {

final Flowable<T> source;

StripBoundary(Flowable<T> source) {
this.source = source;
}

@Override
public Publisher<T> apply(Flowable<T> upstream) {
return new StripBoundary<T>(upstream);
}

@Override
protected void subscribeActual(Subscriber<? super T> s) {
source.subscribe(new StripBoundarySubscriber<T>(s));
}

static final class StripBoundarySubscriber<T> extends BasicFuseableSubscriber<T, T> {

StripBoundarySubscriber(Subscriber<? super T> actual) {
super(actual);
}

@Override
public void onNext(T t) {
actual.onNext(t);
}

@Override
public int requestFusion(int mode) {
QueueSubscription<T> fs = qs;
if (fs != null) {
int m = fs.requestFusion(mode & ~QueueSubscription.BOUNDARY);
this.sourceMode = m;
return m;
}
return QueueSubscription.NONE;
}

@Override
public T poll() throws Exception {
return qs.poll();
}
}
}

@Test
public void syncFusedMapCrash() {
Flowable.just(1)
Expand All @@ -62,6 +116,7 @@ public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(new StripBoundary<Object>(null))
.parallel()
.sequential()
.test()
Expand All @@ -81,11 +136,53 @@ public Object apply(Integer v) throws Exception {
throw new TestException();
}
})
.compose(new StripBoundary<Object>(null))
.parallel()
.sequential()
.test()
.assertFailure(TestException.class);

assertFalse(up.hasSubscribers());
}

@Test
public void boundaryConfinement() {
final Set<String> between = new HashSet<String>();
final ConcurrentHashMap<String, String> processing = new ConcurrentHashMap<String, String>();

Flowable.range(1, 10)
.observeOn(Schedulers.single(), false, 1)
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer v) throws Exception {
between.add(Thread.currentThread().getName());
}
})
.parallel(2, 1)
.runOn(Schedulers.computation(), 1)
.map(new Function<Integer, Object>() {
@Override
public Object apply(Integer v) throws Exception {
processing.putIfAbsent(Thread.currentThread().getName(), "");
return v;
}
})
.sequential()
.test()
.awaitDone(5, TimeUnit.SECONDS)
.assertSubscribed()
.assertValueSet(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
.assertComplete()
.assertNoErrors()
;

assertEquals(between.toString(), 1, between.size());
assertTrue(between.toString(), between.iterator().next().contains("RxSingleScheduler"));

Map<String, String> map = processing; // AnimalSniffer: CHM.keySet() in Java 8 returns KeySetView

for (String e : map.keySet()) {
assertTrue(map.toString(), e.contains("RxComputationThreadPool"));
}
}
}

0 comments on commit 45a04ca

Please sign in to comment.