Skip to content

Commit

Permalink
2.x: Perf measure of Flowable flatMap & flatMapCompletable (#5902)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Mar 9, 2018
1 parent 7bdcb59 commit 4e50ea4
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 2 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ targetCompatibility = JavaVersion.VERSION_1_6
def junitVersion = "4.12"
def reactiveStreamsVersion = "1.0.2"
def mockitoVersion = "2.1.0"
def jmhLibVersion = "1.19"
def jmhLibVersion = "1.20"
def testNgVersion = "6.11"
def guavaVersion = "24.0-jre"
def jacocoVersion = "0.8.0"
Expand Down Expand Up @@ -201,6 +201,7 @@ jmh {
jmhVersion = jmhLibVersion
humanOutputFile = null
includeTests = false
jvmArgsAppend = ["-Djmh.separateClasspathJAR=true"]

if (project.hasProperty("jmh")) {
include = ".*" + project.jmh + ".*"
Expand Down
73 changes: 73 additions & 0 deletions src/jmh/java/io/reactivex/FlowableFlatMapCompletablePerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import io.reactivex.functions.Action;
import io.reactivex.internal.functions.Functions;
import io.reactivex.schedulers.Schedulers;

@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 5)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class FlowableFlatMapCompletablePerf implements Action {

@Param({"1", "10", "100", "1000", "10000", "100000", "1000000"})
int items;

@Param({"1", "8", "32", "128", "256"})
int maxConcurrency;

@Param({"1", "10", "100", "1000"})
int work;

Completable flatMapCompletable;

Flowable<Object> flatMap;

@Override
public void run() throws Exception {
Blackhole.consumeCPU(work);
}

@Setup
public void setup() {
Integer[] array = new Integer[items];
Arrays.fill(array, 777);

flatMapCompletable = Flowable.fromArray(array)
.flatMapCompletable(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation())), false, maxConcurrency);

flatMap = Flowable.fromArray(array)
.flatMap(Functions.justFunction(Completable.fromAction(this).subscribeOn(Schedulers.computation()).toFlowable()), false, maxConcurrency);
}

// @Benchmark
public Object flatMap(Blackhole bh) {
return flatMap.subscribeWith(new PerfAsyncConsumer(bh)).await(items);
}

@Benchmark
public Object flatMapCompletable(Blackhole bh) {
return flatMapCompletable.subscribeWith(new PerfAsyncConsumer(bh)).await(items);
}
}
63 changes: 63 additions & 0 deletions src/jmh/java/io/reactivex/FlowableFlatMapCompletableSyncPerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import io.reactivex.internal.functions.Functions;

@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 5)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class FlowableFlatMapCompletableSyncPerf {

@Param({"1", "10", "100", "1000", "10000", "100000", "1000000"})
int items;

@Param({"1", "8", "32", "128", "256"})
int maxConcurrency;

Completable flatMapCompletable;

Flowable<Object> flatMap;

@Setup
public void setup() {
Integer[] array = new Integer[items];
Arrays.fill(array, 777);

flatMapCompletable = Flowable.fromArray(array)
.flatMapCompletable(Functions.justFunction(Completable.complete()), false, maxConcurrency);

flatMap = Flowable.fromArray(array)
.flatMap(Functions.justFunction(Completable.complete().toFlowable()), false, maxConcurrency);
}

@Benchmark
public Object flatMap(Blackhole bh) {
return flatMap.subscribeWith(new PerfConsumer(bh));
}

@Benchmark
public Object flatMapCompletable(Blackhole bh) {
return flatMapCompletable.subscribeWith(new PerfConsumer(bh));
}
}
4 changes: 3 additions & 1 deletion src/jmh/java/io/reactivex/PerfAsyncConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ public void onComplete() {
/**
* Wait for the terminal signal.
* @param count if less than 1001, a spin-wait is used
* @return this
*/
public void await(int count) {
public PerfAsyncConsumer await(int count) {
if (count <= 1000) {
while (getCount() != 0) { }
} else {
Expand All @@ -79,6 +80,7 @@ public void await(int count) {
throw new RuntimeException(ex);
}
}
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.reactivex.disposables.*;
import io.reactivex.exceptions.*;
import io.reactivex.functions.Function;
import io.reactivex.internal.functions.Functions;
import io.reactivex.internal.fuseable.*;
import io.reactivex.observers.*;
import io.reactivex.processors.PublishProcessor;
Expand Down Expand Up @@ -523,4 +524,21 @@ public CompletableSource apply(Integer v) throws Exception {
.test()
.assertFailure(TestException.class);
}

@Test
public void asyncMaxConcurrency() {
for (int itemCount = 1; itemCount <= 100000; itemCount *= 10) {
for (int concurrency = 1; concurrency <= 256; concurrency *= 2) {
Flowable.range(1, itemCount)
.flatMapCompletable(
Functions.justFunction(Completable.complete()
.subscribeOn(Schedulers.computation()))
, false, concurrency)
.test()
.withTag("itemCount=" + itemCount + ", concurrency=" + concurrency)
.awaitDone(5, TimeUnit.SECONDS)
.assertResult();
}
}
}
}

0 comments on commit 4e50ea4

Please sign in to comment.