diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 372bb36c7c..ff20df3c22 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1010,7 +1010,7 @@ public final Completable cache() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Completable compose(CompletableTransformer transformer) { - return wrap(transformer.apply(this)); + return wrap(ObjectHelper.requireNonNull(transformer, "transformer is null").apply(this)); } /** @@ -1866,7 +1866,7 @@ private Completable timeout0(long timeout, TimeUnit unit, Scheduler scheduler, C @SchedulerSupport(SchedulerSupport.NONE) public final U to(Function converter) { try { - return converter.apply(this); + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 8a2f850e25..2f30236606 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -6592,7 +6592,7 @@ public final Single collectInto(final U initialItem, BiConsumer Flowable compose(FlowableTransformer composer) { - return fromPublisher(((FlowableTransformer) composer).apply(this)); + return fromPublisher(((FlowableTransformer) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this)); } /** @@ -14543,7 +14543,7 @@ public final Flowable> timestamp(final TimeUnit unit, final Scheduler s @SchedulerSupport(SchedulerSupport.NONE) public final R to(Function, R> converter) { try { - return converter.apply(this); + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 1065dc06f0..779fb79ae9 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2092,7 +2092,7 @@ public final Maybe cast(final Class clazz) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Maybe compose(MaybeTransformer transformer) { - return wrap(((MaybeTransformer) transformer).apply(this)); + return wrap(((MaybeTransformer) ObjectHelper.requireNonNull(transformer, "transformer is null")).apply(this)); } /** @@ -3065,7 +3065,7 @@ public final Maybe ofType(final Class clazz) { @SchedulerSupport(SchedulerSupport.NONE) public final R to(Function, R> convert) { try { - return convert.apply(this); + return ObjectHelper.requireNonNull(convert, "convert is null").apply(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 6cf91a7df6..2d1e36dd54 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -5921,7 +5921,7 @@ public final Single collectInto(final U initialValue, BiConsumer Observable compose(ObservableTransformer composer) { - return wrap(((ObservableTransformer) composer).apply(this)); + return wrap(((ObservableTransformer) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this)); } /** @@ -12226,7 +12226,7 @@ public final Observable> timestamp(final TimeUnit unit, final Scheduler @SchedulerSupport(SchedulerSupport.NONE) public final R to(Function, R> converter) { try { - return converter.apply(this); + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index f516143c72..d0e65336e2 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1548,7 +1548,7 @@ public final Single hide() { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final Single compose(SingleTransformer transformer) { - return wrap(((SingleTransformer) transformer).apply(this)); + return wrap(((SingleTransformer) ObjectHelper.requireNonNull(transformer, "transformer is null")).apply(this)); } /** @@ -2953,7 +2953,7 @@ private Single timeout0(final long timeout, final TimeUnit unit, final Schedu @SchedulerSupport(SchedulerSupport.NONE) public final R to(Function, R> convert) { try { - return convert.apply(this); + return ObjectHelper.requireNonNull(convert, "convert is null").apply(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index b2eab028c1..f7484bc9d5 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -49,7 +49,7 @@ public abstract class ParallelFlowable { * of items must be equal to the parallelism level of this ParallelFlowable * @see #parallelism() */ - public abstract void subscribe(Subscriber[] subscribers); + public abstract void subscribe(@NonNull Subscriber[] subscribers); /** * Returns the number of expected parallel Subscribers. @@ -64,7 +64,7 @@ public abstract class ParallelFlowable { * @param subscribers the array of Subscribers * @return true if the number of subscribers equals to the parallelism level */ - protected final boolean validate(Subscriber[] subscribers) { + protected final boolean validate(@NonNull Subscriber[] subscribers) { int p = parallelism(); if (subscribers.length != p) { Throwable iae = new IllegalArgumentException("parallelism = " + p + ", subscribers = " + subscribers.length); @@ -84,7 +84,7 @@ protected final boolean validate(Subscriber[] subscribers) { * @return the ParallelFlowable instance */ @CheckReturnValue - public static ParallelFlowable from(Publisher source) { + public static ParallelFlowable from(@NonNull Publisher source) { return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize()); } @@ -96,7 +96,7 @@ public static ParallelFlowable from(Publisher source) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public static ParallelFlowable from(Publisher source, int parallelism) { + public static ParallelFlowable from(@NonNull Publisher source, int parallelism) { return from(source, parallelism, Flowable.bufferSize()); } @@ -112,7 +112,7 @@ public static ParallelFlowable from(Publisher source, int pa * @return the new ParallelFlowable instance */ @CheckReturnValue - public static ParallelFlowable from(Publisher source, + public static ParallelFlowable from(@NonNull Publisher source, int parallelism, int prefetch) { ObjectHelper.requireNonNull(source, "source"); ObjectHelper.verifyPositive(parallelism, "parallelism"); @@ -130,7 +130,7 @@ public static ParallelFlowable from(Publisher source, * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable map(Function mapper) { + public final ParallelFlowable map(@NonNull Function mapper) { ObjectHelper.requireNonNull(mapper, "mapper"); return RxJavaPlugins.onAssembly(new ParallelMap(this, mapper)); } @@ -143,7 +143,7 @@ public final ParallelFlowable map(Function mapper * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable filter(Predicate predicate) { + public final ParallelFlowable filter(@NonNull Predicate predicate) { ObjectHelper.requireNonNull(predicate, "predicate"); return RxJavaPlugins.onAssembly(new ParallelFilter(this, predicate)); } @@ -168,7 +168,7 @@ public final ParallelFlowable filter(Predicate predicate) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable runOn(Scheduler scheduler) { + public final ParallelFlowable runOn(@NonNull Scheduler scheduler) { return runOn(scheduler, Flowable.bufferSize()); } @@ -194,7 +194,7 @@ public final ParallelFlowable runOn(Scheduler scheduler) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable runOn(Scheduler scheduler, int prefetch) { + public final ParallelFlowable runOn(@NonNull Scheduler scheduler, int prefetch) { ObjectHelper.requireNonNull(scheduler, "scheduler"); ObjectHelper.verifyPositive(prefetch, "prefetch"); return RxJavaPlugins.onAssembly(new ParallelRunOn(this, scheduler, prefetch)); @@ -209,7 +209,7 @@ public final ParallelFlowable runOn(Scheduler scheduler, int prefetch) { * @return the new Flowable instance emitting the reduced value or empty if the ParallelFlowable was empty */ @CheckReturnValue - public final Flowable reduce(BiFunction reducer) { + public final Flowable reduce(@NonNull BiFunction reducer) { ObjectHelper.requireNonNull(reducer, "reducer"); return RxJavaPlugins.onAssembly(new ParallelReduceFull(this, reducer)); } @@ -226,7 +226,7 @@ public final Flowable reduce(BiFunction reducer) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable reduce(Callable initialSupplier, BiFunction reducer) { + public final ParallelFlowable reduce(@NonNull Callable initialSupplier, @NonNull BiFunction reducer) { ObjectHelper.requireNonNull(initialSupplier, "initialSupplier"); ObjectHelper.requireNonNull(reducer, "reducer"); return RxJavaPlugins.onAssembly(new ParallelReduce(this, initialSupplier, reducer)); @@ -341,7 +341,7 @@ public final Flowable sequentialDelayError(int prefetch) { * @return the new Flowable instance */ @CheckReturnValue - public final Flowable sorted(Comparator comparator) { + public final Flowable sorted(@NonNull Comparator comparator) { return sorted(comparator, 16); } @@ -356,7 +356,7 @@ public final Flowable sorted(Comparator comparator) { * @return the new Flowable instance */ @CheckReturnValue - public final Flowable sorted(Comparator comparator, int capacityHint) { + public final Flowable sorted(@NonNull Comparator comparator, int capacityHint) { ObjectHelper.requireNonNull(comparator, "comparator is null"); ObjectHelper.verifyPositive(capacityHint, "capacityHint"); int ch = capacityHint / parallelism() + 1; @@ -375,7 +375,7 @@ public final Flowable sorted(Comparator comparator, int capacityHi * @return the new Px instannce */ @CheckReturnValue - public final Flowable> toSortedList(Comparator comparator) { + public final Flowable> toSortedList(@NonNull Comparator comparator) { return toSortedList(comparator, 16); } /** @@ -388,7 +388,7 @@ public final Flowable> toSortedList(Comparator comparator) { * @return the new Px instannce */ @CheckReturnValue - public final Flowable> toSortedList(Comparator comparator, int capacityHint) { + public final Flowable> toSortedList(@NonNull Comparator comparator, int capacityHint) { ObjectHelper.requireNonNull(comparator, "comparator is null"); ObjectHelper.verifyPositive(capacityHint, "capacityHint"); @@ -408,7 +408,7 @@ public final Flowable> toSortedList(Comparator comparator, in * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable doOnNext(Consumer onNext) { + public final ParallelFlowable doOnNext(@NonNull Consumer onNext) { ObjectHelper.requireNonNull(onNext, "onNext is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, onNext, @@ -430,7 +430,7 @@ public final ParallelFlowable doOnNext(Consumer onNext) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable doAfterNext(Consumer onAfterNext) { + public final ParallelFlowable doAfterNext(@NonNull Consumer onAfterNext) { ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), @@ -451,7 +451,7 @@ public final ParallelFlowable doAfterNext(Consumer onAfterNext) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable doOnError(Consumer onError) { + public final ParallelFlowable doOnError(@NonNull Consumer onError) { ObjectHelper.requireNonNull(onError, "onError is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), @@ -472,7 +472,7 @@ public final ParallelFlowable doOnError(Consumer onError) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable doOnComplete(Action onComplete) { + public final ParallelFlowable doOnComplete(@NonNull Action onComplete) { ObjectHelper.requireNonNull(onComplete, "onComplete is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), @@ -493,7 +493,7 @@ public final ParallelFlowable doOnComplete(Action onComplete) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable doAfterTerminated(Action onAfterTerminate) { + public final ParallelFlowable doAfterTerminated(@NonNull Action onAfterTerminate) { ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), @@ -514,7 +514,7 @@ public final ParallelFlowable doAfterTerminated(Action onAfterTerminate) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable doOnSubscribe(Consumer onSubscribe) { + public final ParallelFlowable doOnSubscribe(@NonNull Consumer onSubscribe) { ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), @@ -535,7 +535,7 @@ public final ParallelFlowable doOnSubscribe(Consumer on * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable doOnRequest(LongConsumer onRequest) { + public final ParallelFlowable doOnRequest(@NonNull LongConsumer onRequest) { ObjectHelper.requireNonNull(onRequest, "onRequest is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), @@ -556,7 +556,7 @@ public final ParallelFlowable doOnRequest(LongConsumer onRequest) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable doOnCancel(Action onCancel) { + public final ParallelFlowable doOnCancel(@NonNull Action onCancel) { ObjectHelper.requireNonNull(onCancel, "onCancel is null"); return RxJavaPlugins.onAssembly(new ParallelPeek(this, Functions.emptyConsumer(), @@ -580,7 +580,7 @@ public final ParallelFlowable doOnCancel(Action onCancel) { * @return the new ParallelFlowable instance */ @CheckReturnValue - public final ParallelFlowable collect(Callable collectionSupplier, BiConsumer collector) { + public final ParallelFlowable collect(@NonNull Callable collectionSupplier, @NonNull BiConsumer collector) { ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null"); ObjectHelper.requireNonNull(collector, "collector is null"); return RxJavaPlugins.onAssembly(new ParallelCollect(this, collectionSupplier, collector)); @@ -595,7 +595,7 @@ public final ParallelFlowable collect(Callable collectionSup * @return the new ParallelFlowable instance */ @CheckReturnValue - public static ParallelFlowable fromArray(Publisher... publishers) { + public static ParallelFlowable fromArray(@NonNull Publisher... publishers) { if (publishers.length == 0) { throw new IllegalArgumentException("Zero publishers not supported"); } @@ -611,9 +611,9 @@ public static ParallelFlowable fromArray(Publisher... publishers) { * @return the value returned by the converter function */ @CheckReturnValue - public final U to(Function, U> converter) { + public final U to(@NonNull Function, U> converter) { try { - return converter.apply(this); + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); @@ -629,8 +629,8 @@ public final U to(Function, U> converter) { * @return the ParallelFlowable returned by the function */ @CheckReturnValue - public final ParallelFlowable compose(Function, ParallelFlowable> composer) { - return RxJavaPlugins.onAssembly(to(composer)); + public final ParallelFlowable compose(@NonNull ParallelTransformer composer) { + return RxJavaPlugins.onAssembly(ObjectHelper.requireNonNull(composer, "composer is null").apply(this)); } /** @@ -643,7 +643,7 @@ public final ParallelFlowable compose(Function ParallelFlowable flatMap(Function> mapper) { + public final ParallelFlowable flatMap(@NonNull Function> mapper) { return flatMap(mapper, false, Integer.MAX_VALUE, Flowable.bufferSize()); } @@ -659,7 +659,7 @@ public final ParallelFlowable flatMap(Function ParallelFlowable flatMap( - Function> mapper, boolean delayError) { + @NonNull Function> mapper, boolean delayError) { return flatMap(mapper, delayError, Integer.MAX_VALUE, Flowable.bufferSize()); } @@ -677,7 +677,7 @@ public final ParallelFlowable flatMap( */ @CheckReturnValue public final ParallelFlowable flatMap( - Function> mapper, boolean delayError, int maxConcurrency) { + @NonNull Function> mapper, boolean delayError, int maxConcurrency) { return flatMap(mapper, delayError, maxConcurrency, Flowable.bufferSize()); } @@ -695,7 +695,7 @@ public final ParallelFlowable flatMap( */ @CheckReturnValue public final ParallelFlowable flatMap( - Function> mapper, + @NonNull Function> mapper, boolean delayError, int maxConcurrency, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency"); @@ -714,7 +714,7 @@ public final ParallelFlowable flatMap( */ @CheckReturnValue public final ParallelFlowable concatMap( - Function> mapper) { + @NonNull Function> mapper) { return concatMap(mapper, 2); } @@ -730,7 +730,7 @@ public final ParallelFlowable concatMap( */ @CheckReturnValue public final ParallelFlowable concatMap( - Function> mapper, + @NonNull Function> mapper, int prefetch) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); @@ -750,7 +750,7 @@ public final ParallelFlowable concatMap( */ @CheckReturnValue public final ParallelFlowable concatMapDelayError( - Function> mapper, + @NonNull Function> mapper, boolean tillTheEnd) { return concatMapDelayError(mapper, 2, tillTheEnd); } @@ -768,7 +768,7 @@ public final ParallelFlowable concatMapDelayError( */ @CheckReturnValue public final ParallelFlowable concatMapDelayError( - Function> mapper, + @NonNull Function> mapper, int prefetch, boolean tillTheEnd) { ObjectHelper.requireNonNull(mapper, "mapper is null"); ObjectHelper.verifyPositive(prefetch, "prefetch"); diff --git a/src/main/java/io/reactivex/parallel/ParallelTransformer.java b/src/main/java/io/reactivex/parallel/ParallelTransformer.java new file mode 100644 index 0000000000..f6837bf948 --- /dev/null +++ b/src/main/java/io/reactivex/parallel/ParallelTransformer.java @@ -0,0 +1,36 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import io.reactivex.annotations.*; + +/** + * Interface to compose ParallelFlowable. + * + * @param the upstream value type + * @param the downstream value type + * + * @since 2.0.8 - experimental + */ +@Experimental +public interface ParallelTransformer { + /** + * Applies a function to the upstream ParallelFlowable and returns a ParallelFlowable with + * optionally different element type. + * @param upstream the upstream ParallelFlowable instance + * @return the transformed ParallelFlowable instance + */ + @NonNull + ParallelFlowable apply(@NonNull ParallelFlowable upstream); +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java index 3a69a87565..d0993cd10a 100644 --- a/src/test/java/io/reactivex/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java @@ -24,6 +24,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.parallel.*; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.schedulers.Schedulers; @@ -58,6 +59,11 @@ public void checkCompletable() { checkClass(Completable.class); } + @Test(timeout = 30000) + public void checkParallelFlowable() { + checkClass(ParallelFlowable.class); + } + // --------------------------------------------------------------------------------------- // --------------------------------------------------------------------------------------- @@ -546,6 +552,8 @@ public void checkCompletable() { defaultValues.put(Object[].class, new Object[] { new Object(), new Object() }); defaultValues.put(Future.class, new FutureTask(Functions.EMPTY_RUNNABLE, 1)); + defaultValues.put(ParallelFlowable.class, ParallelFlowable.from(Flowable.never())); + defaultValues.put(Subscriber[].class, new Subscriber[] { new AllFunctionals() }); // ----------------------------------------------------------------------------------- defaultInstances = new HashMap, List>(); @@ -572,7 +580,9 @@ public void checkCompletable() { addDefaultInstance(Maybe.class, Maybe.just(1), "Just(1)"); addDefaultInstance(Maybe.class, Maybe.just(1).hide(), "Just(1).Hide()"); - } + + addDefaultInstance(ParallelFlowable.class, Flowable.just(1).parallel(), "Just(1)"); +} static void addIgnore(ParamIgnore ignore) { String key = ignore.toString(); @@ -836,9 +846,14 @@ static final class AllFunctionals FlowableTransformer, ObservableTransformer, SingleTransformer, MaybeTransformer, CompletableTransformer, Subscriber, FlowableSubscriber, Observer, SingleObserver, MaybeObserver, CompletableObserver, FlowableOperator, ObservableOperator, SingleOperator, MaybeOperator, CompletableOperator, - Comparator + Comparator, ParallelTransformer { + @Override + public ParallelFlowable apply(ParallelFlowable upstream) { + return null; + } + @Override public Object apply(Object t1, Object t2, Object t3, Object t4, Object t5, Object t6, Object t7, Object t8, Object t9) throws Exception { diff --git a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java index c46d6a07c5..dcf3e81e69 100644 --- a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java @@ -1115,9 +1115,9 @@ public Flowable apply(ParallelFlowable pf) throws Exception { public void compose() { Flowable.range(1, 5) .parallel() - .compose(new Function, ParallelFlowable>() { + .compose(new ParallelTransformer() { @Override - public ParallelFlowable apply(ParallelFlowable pf) throws Exception { + public ParallelFlowable apply(ParallelFlowable pf) { return pf.map(new Function() { @Override public Integer apply(Integer v) throws Exception {