Skip to content

Commit

Permalink
2.x: Add PublishProcessor JMH perf comparison (#5675)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Oct 17, 2017
1 parent b0bc478 commit 1ad6647
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 2 deletions.
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ targetCompatibility = JavaVersion.VERSION_1_6
def junitVersion = "4.12"
def reactiveStreamsVersion = "1.0.1"
def mockitoVersion = "2.1.0"
def jmhVersion = "1.16"
def jmhLibVersion = "1.19"
def testNgVersion = "6.11"

// --------------------------------------
Expand Down Expand Up @@ -192,8 +192,9 @@ publishing.publications.all {
}

jmh {
jmhVersion = "1.19"
jmhVersion = jmhLibVersion
humanOutputFile = null
includeTests = false

if (project.hasProperty("jmh")) {
include = ".*" + project.jmh + ".*"
Expand Down
56 changes: 56 additions & 0 deletions src/jmh/java/io/reactivex/PerfBoundedSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.concurrent.CountDownLatch;

import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Subscription;

/**
* Performance subscriber with a one-time request from the upstream.
*/
public class PerfBoundedSubscriber extends CountDownLatch implements FlowableSubscriber<Object> {

final Blackhole bh;

final long request;

public PerfBoundedSubscriber(Blackhole bh, long request) {
super(1);
this.bh = bh;
this.request = request;
}

@Override
public void onSubscribe(Subscription s) {
s.request(request);
}

@Override
public void onComplete() {
countDown();
}

@Override
public void onError(Throwable e) {
countDown();
}

@Override
public void onNext(Object t) {
bh.consume(t);
}

}
109 changes: 109 additions & 0 deletions src/jmh/java/io/reactivex/PublishProcessorPerf.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex;

import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;

import io.reactivex.processors.PublishProcessor;
import io.reactivex.subjects.PublishSubject;

@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 5)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@State(Scope.Thread)
public class PublishProcessorPerf {

PublishProcessor<Integer> unbounded;

PublishProcessor<Integer> bounded;

PublishSubject<Integer> subject;

@Setup
public void setup(Blackhole bh) {
unbounded = PublishProcessor.create();
unbounded.subscribe(new PerfConsumer(bh));

bounded = PublishProcessor.create();
bounded.subscribe(new PerfBoundedSubscriber(bh, 1000 * 1000));

subject = PublishSubject.create();
subject.subscribe(new PerfConsumer(bh));
}

@Benchmark
public void unbounded1() {
unbounded.onNext(1);
}

@Benchmark
public void unbounded1k() {
for (int i = 0; i < 1000; i++) {
unbounded.onNext(1);
}
}

@Benchmark
public void unbounded1m() {
for (int i = 0; i < 1000000; i++) {
unbounded.onNext(1);
}
}

@Benchmark
public void bounded1() {
bounded.onNext(1);
}


@Benchmark
public void bounded1k() {
for (int i = 0; i < 1000; i++) {
bounded.onNext(1);
}
}

@Benchmark
public void bounded1m() {
for (int i = 0; i < 1000000; i++) {
bounded.onNext(1);
}
}


@Benchmark
public void subject1() {
subject.onNext(1);
}


@Benchmark
public void subject1k() {
for (int i = 0; i < 1000; i++) {
subject.onNext(1);
}
}

@Benchmark
public void subject1m() {
for (int i = 0; i < 1000000; i++) {
subject.onNext(1);
}
}
}

0 comments on commit 1ad6647

Please sign in to comment.