diff --git a/build.gradle b/build.gradle index 9da4ae5757..f3f16d4eb4 100644 --- a/build.gradle +++ b/build.gradle @@ -54,7 +54,7 @@ targetCompatibility = JavaVersion.VERSION_1_6 def junitVersion = "4.12" def reactiveStreamsVersion = "1.0.2" def mockitoVersion = "2.1.0" -def jmhLibVersion = "1.19" +def jmhLibVersion = "1.20" def testNgVersion = "6.11" def guavaVersion = "24.0-jre" def jacocoVersion = "0.8.0" @@ -201,6 +201,7 @@ jmh { jmhVersion = jmhLibVersion humanOutputFile = null includeTests = false + jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"] if (project.hasProperty("jmh")) { include = ".*" + project.jmh + ".*" diff --git a/src/jmh/java/io/reactivex/FlowableFlatMapCompletablePerf.java b/src/jmh/java/io/reactivex/FlowableFlatMapCompletablePerf.java new file mode 100644 index 0000000000..ec846d35eb --- /dev/null +++ b/src/jmh/java/io/reactivex/FlowableFlatMapCompletablePerf.java @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.functions.Action; +import io.reactivex.internal.functions.Functions; +import io.reactivex.schedulers.Schedulers; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableFlatMapCompletablePerf implements Action { + + @Param({"1", "10", "100", "1000", "10000", "100000", "1000000"}) + int items; + + @Param({"1", "8", "32", "128", "256"}) + int maxConcurrency; + + @Param({"1", "10", "100", "1000"}) + int work; + + Completable flatMapCompletable; + + Flowable flatMap; + + @Override + public void run() throws Exception { + Blackhole.consumeCPU(work); + } + + @Setup + public void setup() { + Integer[] array = new Integer[items]; + Arrays.fill(array, 777); + + flatMapCompletable = Flowable.fromArray(array) + .flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency); + + flatMap = Flowable.fromArray(array) + .flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency); + } + +// @Benchmark + public Object flatMap(Blackhole bh) { + return flatMap.subscribeWith(new PerfAsyncConsumer(bh)).await(items); + } + + @Benchmark + public Object flatMapCompletable(Blackhole bh) { + return flatMapCompletable.subscribeWith(new PerfAsyncConsumer(bh)).await(items); + } +} diff --git a/src/jmh/java/io/reactivex/FlowableFlatMapCompletableSyncPerf.java b/src/jmh/java/io/reactivex/FlowableFlatMapCompletableSyncPerf.java new file mode 100644 index 0000000000..047bad30a3 --- /dev/null +++ b/src/jmh/java/io/reactivex/FlowableFlatMapCompletableSyncPerf.java @@ -0,0 +1,63 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.infra.Blackhole; + +import io.reactivex.internal.functions.Functions; + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = 5) +@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS) +@OutputTimeUnit(TimeUnit.SECONDS) +@Fork(value = 1) +@State(Scope.Thread) +public class FlowableFlatMapCompletableSyncPerf { + + @Param({"1", "10", "100", "1000", "10000", "100000", "1000000"}) + int items; + + @Param({"1", "8", "32", "128", "256"}) + int maxConcurrency; + + Completable flatMapCompletable; + + Flowable flatMap; + + @Setup + public void setup() { + Integer[] array = new Integer[items]; + Arrays.fill(array, 777); + + flatMapCompletable = Flowable.fromArray(array) + .flatMapCompletable(Functions.justFunction(Completable.complete()), false, maxConcurrency); + + flatMap = Flowable.fromArray(array) + .flatMap(Functions.justFunction(Completable.complete().toFlowable()), false, maxConcurrency); + } + + @Benchmark + public Object flatMap(Blackhole bh) { + return flatMap.subscribeWith(new PerfConsumer(bh)); + } + + @Benchmark + public Object flatMapCompletable(Blackhole bh) { + return flatMapCompletable.subscribeWith(new PerfConsumer(bh)); + } +} diff --git a/src/jmh/java/io/reactivex/PerfAsyncConsumer.java b/src/jmh/java/io/reactivex/PerfAsyncConsumer.java index 8db609b4b5..0b99202afc 100644 --- a/src/jmh/java/io/reactivex/PerfAsyncConsumer.java +++ b/src/jmh/java/io/reactivex/PerfAsyncConsumer.java @@ -68,8 +68,9 @@ public void onComplete() { /** * Wait for the terminal signal. * @param count if less than 1001, a spin-wait is used + * @return this */ - public void await(int count) { + public PerfAsyncConsumer await(int count) { if (count <= 1000) { while (getCount() != 0) { } } else { @@ -79,6 +80,7 @@ public void await(int count) { throw new RuntimeException(ex); } } + return this; } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java index c91e7b8ef9..d132f56f7b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java @@ -25,6 +25,7 @@ import io.reactivex.disposables.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.fuseable.*; import io.reactivex.observers.*; import io.reactivex.processors.PublishProcessor; @@ -523,4 +524,21 @@ public CompletableSource apply(Integer v) throws Exception { .test() .assertFailure(TestException.class); } + + @Test + public void asyncMaxConcurrency() { + for (int itemCount = 1; itemCount <= 100000; itemCount *= 10) { + for (int concurrency = 1; concurrency <= 256; concurrency *= 2) { + Flowable.range(1, itemCount) + .flatMapCompletable( + Functions.justFunction(Completable.complete() + .subscribeOn(Schedulers.computation())) + , false, concurrency) + .test() + .withTag("itemCount=" + itemCount + ", concurrency=" + concurrency) + .awaitDone(5, TimeUnit.SECONDS) + .assertResult(); + } + } + } }