diff --git a/.github/workflows/ci-benchmarks.yml b/.github/workflows/ci-benchmarks.yml new file mode 100644 index 00000000..406bd74d --- /dev/null +++ b/.github/workflows/ci-benchmarks.yml @@ -0,0 +1,37 @@ +name: Benchmarks CI +on: + push: + +jobs: + benchmark: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + os: [ ubuntu-latest, macos-latest ] + steps: + - uses: actions/checkout@v4 + - uses: ./.github/actions/setup-gradle + - run: ./gradlew assembleBenchmarks + + - run: > + ./gradlew + FastCsvLocalRequestChannelBenchmark + FastCsvLocalRequestResponseBenchmark + FastCsvLocalRequestStreamBenchmark + --dry-run + + - run: > + ./gradlew + FastCsvLocalRequestChannelBenchmark + FastCsvLocalRequestResponseBenchmark + FastCsvLocalRequestStreamBenchmark + --no-parallel + --max-workers=1 + + - if: always() && !cancelled() + uses: actions/upload-artifact@v4 + with: + name: benchmark-reports-${{ matrix.os }} + path: "benchmarks/**/build/reports/benchmarks/**/*.csv" + retention-days: 1 diff --git a/.github/workflows/ci-samples.yml b/.github/workflows/ci-samples.yml index 73abb2f4..1dec2167 100644 --- a/.github/workflows/ci-samples.yml +++ b/.github/workflows/ci-samples.yml @@ -19,7 +19,5 @@ jobs: steps: - uses: actions/checkout@v4 - uses: ./.github/actions/setup-gradle - with: - cache-read-only: true - run: ./gradlew build --continue working-directory: samples/${{ matrix.sample }} diff --git a/benchmarks/build.gradle.kts b/benchmarks/build.gradle.kts deleted file mode 100644 index 3d070c14..00000000 --- a/benchmarks/build.gradle.kts +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright 2015-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import kotlinx.benchmark.gradle.* -import org.gradle.kotlin.dsl.benchmark -import org.jetbrains.kotlin.gradle.plugin.mpp.* - -plugins { - rsocket.multiplatform - - alias(libs.plugins.kotlin.allopen) - alias(libs.plugins.kotlinx.benchmark) -} - -val jmhVersionOverride: String by rootProject - -kotlin { - val jvm = jvm() //common jvm source set - val kotlinJvm = jvm("kotlin") //kotlin benchmark - val javaJvm = jvm("java") - - sourceSets { - val commonMain by getting { - dependencies { - implementation(libs.kotlinx.benchmark) - implementation(libs.kotlinx.coroutines.core) - } - } - - val jvmMain by getting - - val kotlinMain by getting { - dependsOn(jvmMain) - dependencies { - implementation(projects.rsocketCore) - implementation(projects.rsocketTransportLocal) - } - } - - val javaMain by getting { - dependsOn(jvmMain) - dependencies { - implementation(libs.kotlinx.coroutines.reactor) - implementation(libs.rsocket.java.core) - implementation(libs.rsocket.java.transport.local) - } - } - } -} - -allOpen { - annotation("org.openjdk.jmh.annotations.State") -} - -benchmark { - targets { - register("jvm") - register("kotlin") - register("java") - } - - targets.withType { jmhVersion = libs.versions.jmh.get() } -} - -tasks.register("jmhProfilers") { - group = "benchmark" - description = "Lists the available JMH profilers" - classpath = (kotlin.targets["jvm"].compilations["main"] as KotlinJvmCompilation).runtimeDependencyFiles - main = "org.openjdk.jmh.Main" - args("-lprof") -} - -fun registerJmhGCTask(target: String): TaskProvider<*> = tasks.register("${target}BenchmarkGC") { - group = "benchmark" - - val buildFolder = buildDir.resolve("benchmarks").resolve(target) - val compilation = (kotlin.targets[target].compilations["main"] as KotlinJvmCompilation) - classpath( - file(buildFolder.resolve("classes")), - file(buildFolder.resolve("resources")), - compilation.runtimeDependencyFiles, - compilation.output.allOutputs - ) - main = "org.openjdk.jmh.Main" - - dependsOn("${target}BenchmarkCompile") - args("-prof", "gc") - args("-jvmArgsPrepend", "-Xmx3072m") - args("-jvmArgsPrepend", "-Xms3072m") - args("-foe", "true") //fail-on-error - args("-v", "NORMAL") //verbosity [SILENT, NORMAL, EXTRA] - args("-rf", "json") - args("-rff", project.file("build/reports/benchmarks/$target/result.json").also { it.parentFile.mkdirs() }) -} - -val t1 = registerJmhGCTask("java") -val t2 = registerJmhGCTask("kotlin") - -tasks.register("benchmarkGC") { - group = "benchmark" - dependsOn(t1.get()) - dependsOn(t2.get()) -} diff --git a/benchmarks/rsocket-java/build.gradle.kts b/benchmarks/rsocket-java/build.gradle.kts new file mode 100644 index 00000000..c1770636 --- /dev/null +++ b/benchmarks/rsocket-java/build.gradle.kts @@ -0,0 +1,47 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import kotlinx.benchmark.gradle.* +import rsocketbuild.* + +plugins { + id("rsocketbuild.multiplatform-benchmarks") +} + +kotlin { + jvmTarget() + + sourceSets { + jvmMain.dependencies { + implementation(projects.benchmarksShared) + + implementation(libs.kotlinx.coroutines.reactor) + implementation(libs.rsocket.java.transport.local) + implementation(libs.rsocket.java.transport.netty) + } + } +} + +benchmark { + targets { + register("jvm") { + this as JvmBenchmarkTarget + jmhVersion = libs.versions.jmh.get() + } + } + + registerBenchmarks("RSocketJava", listOf("local", "tcp", "ws")) +} diff --git a/benchmarks/rsocket-java/src/jvmMain/kotlin/LocalRSocketJavaBenchmark.kt b/benchmarks/rsocket-java/src/jvmMain/kotlin/LocalRSocketJavaBenchmark.kt new file mode 100644 index 00000000..262f64f8 --- /dev/null +++ b/benchmarks/rsocket-java/src/jvmMain/kotlin/LocalRSocketJavaBenchmark.kt @@ -0,0 +1,25 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.java + +import io.rsocket.transport.* +import io.rsocket.transport.local.* + +class LocalRSocketJavaBenchmark : RSocketJavaBenchmark() { + override val serverTransport: ServerTransport<*> = LocalServerTransport.create("local") + override val clientTransport: ClientTransport = LocalClientTransport.create("local") +} diff --git a/benchmarks/src/javaMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketJavaBenchmark.kt b/benchmarks/rsocket-java/src/jvmMain/kotlin/RSocketJavaBenchmark.kt similarity index 51% rename from benchmarks/src/javaMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketJavaBenchmark.kt rename to benchmarks/rsocket-java/src/jvmMain/kotlin/RSocketJavaBenchmark.kt index 96685d7d..d1334408 100644 --- a/benchmarks/src/javaMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketJavaBenchmark.kt +++ b/benchmarks/rsocket-java/src/jvmMain/kotlin/RSocketJavaBenchmark.kt @@ -1,5 +1,5 @@ /* - * Copyright 2015-2022 the original author or authors. + * Copyright 2015-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,32 +14,56 @@ * limitations under the License. */ -package io.rsocket.kotlin.benchmarks +package io.rsocket.kotlin.benchmarks.java import io.rsocket.* import io.rsocket.core.* import io.rsocket.frame.decoder.* -import io.rsocket.transport.local.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.transport.* import io.rsocket.util.* +import kotlinx.benchmark.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.reactivestreams.* import reactor.core.publisher.* import kotlin.random.* -class RSocketJavaBenchmark : RSocketBenchmark() { +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = WARMUP, time = WARMUP_DURATION) +@Measurement(iterations = ITERATION, time = ITERATION_DURATION) +@State(Scope.Benchmark) +abstract class RSocketJavaBenchmark : RSocketBenchmark() { + protected abstract val clientTransport: ClientTransport + protected abstract val serverTransport: ServerTransport<*> + + private lateinit var payload: Payload + private lateinit var payloadMono: Mono + private lateinit var payloadsFlux: Flux + private lateinit var payloadsFlow: Flow + private lateinit var client: RSocket + private lateinit var server: Closeable - lateinit var client: RSocket - lateinit var server: Closeable + override fun createPayload(size: Int): Payload = if (size == 0) EmptyPayload.INSTANCE else ByteBufPayload.create( + ByteArray(size / 2).also { Random.nextBytes(it) }, + ByteArray(size / 2).also { Random.nextBytes(it) } + ) + + override fun createPayloadCopy(): Payload = payload.retain() + + override fun releasePayload(payload: Payload) { + payload.release() + } - lateinit var payload: Payload - lateinit var payloadMono: Mono - lateinit var payloadsFlux: Flux - lateinit var payloadsFlow: Flow + override fun consumePayload(bh: Blackhole, value: Payload) = bh.consume(value) + override suspend fun doRequestResponse(): Payload = client.requestResponse(payload.retain()).awaitSingle() + override fun doRequestStream(): Flow = client.requestStream(payload.retain()).asFlow() + override fun doRequestChannel(): Flow = client.requestChannel(payloadsFlow.asPublisher()).asFlow() + + @Setup override fun setup() { payload = createPayload(payloadSize) - payloadMono = Mono.fromSupplier(payload::retain) payloadsFlux = Flux.range(0, 5000).map { payload.retain() } payloadsFlow = flow { repeat(5000) { emit(payload.retain()) } } @@ -61,33 +85,48 @@ class RSocketJavaBenchmark : RSocketBenchmark() { }) } .payloadDecoder(PayloadDecoder.ZERO_COPY) - .bind(LocalServerTransport.create("server")) + .bind(serverTransport) .block()!! client = RSocketConnector.create() .payloadDecoder(PayloadDecoder.ZERO_COPY) - .connect(LocalClientTransport.create("server")) + .connect(clientTransport) .block()!! } + @TearDown override fun cleanup() { client.dispose() server.dispose() } - override fun createPayload(size: Int): Payload = if (size == 0) EmptyPayload.INSTANCE else ByteBufPayload.create( - ByteArray(size / 2).also { Random.nextBytes(it) }, - ByteArray(size / 2).also { Random.nextBytes(it) } - ) + @Param("0") + override var payloadSize: Int = 0 - override fun releasePayload(payload: Payload) { - payload.release() - } + @Benchmark + override fun requestResponseBlocking(bh: Blackhole) = super.requestResponseBlocking(bh) - override suspend fun doRequestResponse(): Payload = client.requestResponse(payload.retain()).awaitSingle() + @Benchmark + override fun requestResponseParallel(bh: Blackhole) = super.requestResponseParallel(bh) + + @Benchmark + override fun requestResponseConcurrent(bh: Blackhole) = super.requestResponseConcurrent(bh) + + @Benchmark + override fun requestStreamBlocking(bh: Blackhole) = super.requestStreamBlocking(bh) + + @Benchmark + override fun requestStreamParallel(bh: Blackhole) = super.requestStreamParallel(bh) + + @Benchmark + override fun requestStreamConcurrent(bh: Blackhole) = super.requestStreamConcurrent(bh) - override suspend fun doRequestStream(): Flow = client.requestStream(payload.retain()).asFlow() + @Benchmark + override fun requestChannelBlocking(bh: Blackhole) = super.requestChannelBlocking(bh) - override suspend fun doRequestChannel(): Flow = client.requestChannel(payloadsFlow.asPublisher()).asFlow() + @Benchmark + override fun requestChannelParallel(bh: Blackhole) = super.requestChannelParallel(bh) + @Benchmark + override fun requestChannelConcurrent(bh: Blackhole) = super.requestChannelConcurrent(bh) } diff --git a/benchmarks/rsocket-java/src/jvmMain/kotlin/TcpRSocketJavaBenchmark.kt b/benchmarks/rsocket-java/src/jvmMain/kotlin/TcpRSocketJavaBenchmark.kt new file mode 100644 index 00000000..f733dc44 --- /dev/null +++ b/benchmarks/rsocket-java/src/jvmMain/kotlin/TcpRSocketJavaBenchmark.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.java + +import io.rsocket.transport.* +import io.rsocket.transport.netty.client.* +import io.rsocket.transport.netty.server.* + +class TcpRSocketJavaBenchmark : RSocketJavaBenchmark() { + override val serverTransport: ServerTransport<*> = TcpServerTransport.create(9000) + override val clientTransport: ClientTransport = TcpClientTransport.create(9000) +} diff --git a/benchmarks/rsocket-java/src/jvmMain/kotlin/WsRSocketJavaBenchmark.kt b/benchmarks/rsocket-java/src/jvmMain/kotlin/WsRSocketJavaBenchmark.kt new file mode 100644 index 00000000..a618da9b --- /dev/null +++ b/benchmarks/rsocket-java/src/jvmMain/kotlin/WsRSocketJavaBenchmark.kt @@ -0,0 +1,26 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.java + +import io.rsocket.transport.* +import io.rsocket.transport.netty.client.* +import io.rsocket.transport.netty.server.* + +class WsRSocketJavaBenchmark : RSocketJavaBenchmark() { + override val serverTransport: ServerTransport<*> = WebsocketServerTransport.create(9000) + override val clientTransport: ClientTransport = WebsocketClientTransport.create(9000) +} diff --git a/benchmarks/rsocket-kotlin-0_16/build.gradle.kts b/benchmarks/rsocket-kotlin-0_16/build.gradle.kts new file mode 100644 index 00000000..fb5b7b76 --- /dev/null +++ b/benchmarks/rsocket-kotlin-0_16/build.gradle.kts @@ -0,0 +1,57 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import kotlinx.benchmark.gradle.* +import org.gradle.kotlin.dsl.benchmark +import rsocketbuild.* + +plugins { + id("rsocketbuild.multiplatform-benchmarks") +} + +kotlin { + jvmTarget() + + // no mingw because of cio + macosX64() + macosArm64() + linuxX64() + + sourceSets { + commonMain.dependencies { + implementation(projects.benchmarksShared) + implementation("io.rsocket.kotlin:rsocket-transport-local:0.16.0") + implementation("io.rsocket.kotlin:rsocket-transport-ktor-tcp:0.16.0") + } + } +} + +benchmark { + targets { + register("jvm") { + this as JvmBenchmarkTarget + jmhVersion = libs.versions.jmh.get() + } + register("macosArm64") + register("macosX64") + register("linuxX64") + } + registerBenchmarks("RSocketKotlin_0_16_", listOf("local", "ktorTcp")) { transport -> + if (transport == "local") { + param("dispatcher", "DEFAULT", "UNCONFINED") + } + } +} diff --git a/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/KtorTcpRSocketKotlin_0_16_Benchmark.kt b/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/KtorTcpRSocketKotlin_0_16_Benchmark.kt new file mode 100644 index 00000000..4ab75ea4 --- /dev/null +++ b/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/KtorTcpRSocketKotlin_0_16_Benchmark.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.network.sockets.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlin.random.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = WARMUP, time = WARMUP_DURATION) +@Measurement(iterations = ITERATION, time = ITERATION_DURATION) +@State(Scope.Benchmark) +class KtorTcpRSocketKotlin_0_16_Benchmark : RSocketKotlin_0_16_Benchmark() { + @Param("0") + override var payloadSize: Int = 0 + + override val serverDispatcher: CoroutineDispatcher = Dispatchers.IO + + override val serverTransport: ServerTransport<*> by lazy { + TcpServerTransport(port = 9000 + Random.nextInt(100)) + } + + override suspend fun clientTransport(server: Any?): ClientTransport { + return TcpClientTransport( + (server as TcpServer).serverSocket.await().localAddress as InetSocketAddress + ) + } + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + } +} diff --git a/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/LocalRSocketKotlin_0_16_Benchmark.kt b/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/LocalRSocketKotlin_0_16_Benchmark.kt new file mode 100644 index 00000000..32bb351c --- /dev/null +++ b/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/LocalRSocketKotlin_0_16_Benchmark.kt @@ -0,0 +1,51 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.local.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = WARMUP, time = WARMUP_DURATION) +@Measurement(iterations = ITERATION, time = ITERATION_DURATION) +@State(Scope.Benchmark) +class LocalRSocketKotlin_0_16_Benchmark : RSocketKotlin_0_16_Benchmark() { + @Param("") + var dispatcher: String = "" + + override val serverDispatcher: CoroutineDispatcher by lazy { + when (dispatcher) { + "DEFAULT" -> Dispatchers.Default + "UNCONFINED" -> Dispatchers.Unconfined + else -> error("wrong parameter 'dispatcher=$dispatcher'") + } + } + override val serverTransport: ServerTransport<*> by lazy { + LocalServerTransport() + } + + override suspend fun clientTransport(server: Any?): ClientTransport = server as LocalServer + + @Setup + override fun setup() = super.setup() + + @TearDown + override fun cleanup() = super.cleanup() +} diff --git a/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/RSocketKotlin_0_16_Benchmark.kt b/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/RSocketKotlin_0_16_Benchmark.kt new file mode 100644 index 00000000..fe3fbb12 --- /dev/null +++ b/benchmarks/rsocket-kotlin-0_16/src/commonMain/kotlin/RSocketKotlin_0_16_Benchmark.kt @@ -0,0 +1,119 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.transport.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlin.coroutines.* +import kotlin.random.* + +@State(Scope.Benchmark) +@Suppress("ClassName") +@OptIn(ExperimentalStreamsApi::class) +abstract class RSocketKotlin_0_16_Benchmark : RSocketBenchmark() { + protected abstract suspend fun clientTransport(server: Any?): ClientTransport + protected abstract val serverTransport: ServerTransport<*> + protected abstract val serverDispatcher: CoroutineDispatcher + + private val requestStrategy = PrefetchStrategy(64, 0) + + protected val benchJob = Job() + private lateinit var client: RSocket + private lateinit var payload: Payload + private lateinit var payloadsFlow: Flow + + override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload( + data = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }), + metadata = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }) + ) + + override fun createPayloadCopy(): Payload = payload.copy() + override fun consumePayload(bh: Blackhole, value: Payload) = bh.consume(value) + override fun releasePayload(payload: Payload) = payload.close() + + override suspend fun doRequestResponse(): Payload = client.requestResponse(createPayloadCopy()) + override fun doRequestStream(): Flow = client.requestStream(createPayloadCopy()).flowOn(requestStrategy) + override fun doRequestChannel(): Flow = client.requestChannel(createPayloadCopy(), payloadsFlow).flowOn(requestStrategy) + + override fun setup(): Unit = runBlocking { + payload = createPayload(payloadSize) + payloadsFlow = flow { repeat(5000) { emit(createPayloadCopy()) } } + + val server = RSocketServer().bindIn(CoroutineScope(benchJob + serverDispatcher), serverTransport) { + object : RSocket { + override val coroutineContext: CoroutineContext = Job() + override suspend fun requestResponse(payload: Payload): Payload { + payload.close() + return createPayloadCopy() + } + + override fun requestStream(payload: Payload): Flow { + payload.close() + return payloadsFlow + } + + override fun requestChannel(initPayload: Payload, payloads: Flow): Flow { + initPayload.close() + return payloads.flowOn(requestStrategy) + } + } + } + client = RSocketConnector().connect(clientTransport(server)) + } + + override fun cleanup(): Unit = runBlocking { + client.coroutineContext.job.cancelAndJoin() + benchJob.cancelAndJoin() + } + + @Param("0") + override var payloadSize: Int = 0 + + @Benchmark + override fun requestResponseBlocking(bh: Blackhole) = super.requestResponseBlocking(bh) + + @Benchmark + override fun requestResponseParallel(bh: Blackhole) = super.requestResponseParallel(bh) + + @Benchmark + override fun requestResponseConcurrent(bh: Blackhole) = super.requestResponseConcurrent(bh) + + @Benchmark + override fun requestStreamBlocking(bh: Blackhole) = super.requestStreamBlocking(bh) + + @Benchmark + override fun requestStreamParallel(bh: Blackhole) = super.requestStreamParallel(bh) + + @Benchmark + override fun requestStreamConcurrent(bh: Blackhole) = super.requestStreamConcurrent(bh) + + @Benchmark + override fun requestChannelBlocking(bh: Blackhole) = super.requestChannelBlocking(bh) + + @Benchmark + override fun requestChannelParallel(bh: Blackhole) = super.requestChannelParallel(bh) + + @Benchmark + override fun requestChannelConcurrent(bh: Blackhole) = super.requestChannelConcurrent(bh) +} diff --git a/benchmarks/rsocket-kotlin/build.gradle.kts b/benchmarks/rsocket-kotlin/build.gradle.kts new file mode 100644 index 00000000..7e90cabe --- /dev/null +++ b/benchmarks/rsocket-kotlin/build.gradle.kts @@ -0,0 +1,100 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import kotlinx.benchmark.gradle.* +import org.gradle.kotlin.dsl.benchmark +import rsocketbuild.* + +plugins { + id("rsocketbuild.multiplatform-benchmarks") +} + +kotlin { + jvmTarget() + + // no mingw because of cio + macosX64() + macosArm64() + linuxX64() + + sourceSets { + commonMain.dependencies { + implementation(projects.benchmarksShared) + + implementation(projects.rsocketTransportLocal) + implementation(projects.rsocketTransportKtorTcp) + implementation(projects.rsocketTransportKtorWebsocketClient) + implementation(projects.rsocketTransportKtorWebsocketServer) + + // ktor engines + implementation(libs.ktor.server.cio) + implementation(libs.ktor.client.cio) + } + jvmMain.dependencies { + implementation(projects.rsocketTransportNettyTcp) + implementation(projects.rsocketTransportNettyWebsocket) + implementation(projects.rsocketTransportNettyQuic) + implementation(libs.netty.codec.quic.map { + val javaOsName = System.getProperty("os.name") + val javaOsArch = System.getProperty("os.arch") + val suffix = when { + javaOsName.contains("mac", ignoreCase = true) -> "osx" + javaOsName.contains("linux", ignoreCase = true) -> "linux" + javaOsName.contains("windows", ignoreCase = true) -> "windows" + else -> error("Unknown os.name: $javaOsName") + } + "-" + when (javaOsArch) { + "x86_64", "amd64" -> "x86_64" + "arm64", "aarch64" -> "aarch_64" + else -> error("Unknown os.arch: $javaOsArch") + } + "$it:$suffix" + }) + implementation(libs.bouncycastle) + } + } +} + +benchmark { + targets { + register("jvm") { + this as JvmBenchmarkTarget + jmhVersion = libs.versions.jmh.get() + } + register("macosArm64") + register("macosX64") + register("linuxX64") + } + + registerBenchmarks( + "RSocketKotlin", + listOf( + "local", + "ktorTcp", +// "ktorWs", + "nettyTcp", +// "nettyWs", + "nettyQuic" + ) + ) { transport -> + if (transport == "local") { + param("dispatcher", "DEFAULT", "UNCONFINED") + param("channels", "S", "M") + } + if (transport == "ktorTcp") { + param("dispatcher", "DEFAULT", "UNCONFINED") + } + } +} diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..b617c774 --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/KtorTcpRSocketKotlinBenchmark.kt @@ -0,0 +1,71 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.network.selector.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = WARMUP, time = WARMUP_DURATION) +@Measurement(iterations = ITERATION, time = ITERATION_DURATION) +@State(Scope.Benchmark) +class KtorTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() { + + @Param("DEFAULT") + var dispatcher: String = "" + + private val dispatcherV by lazy { + when (dispatcher) { + "DEFAULT" -> Dispatchers.Default + "UNCONFINED" -> Dispatchers.Unconfined + else -> error("wrong parameter 'dispatcher=$dispatcher'") + } + } + + private val selector by lazy { + SelectorManager(Dispatchers.IO) + } + + override val serverTarget: RSocketServerTarget<*> by lazy { + KtorTcpServerTransport(benchJob) { + dispatcher(dispatcherV) + selectorManager(selector, manage = false) + }.target() + } + + override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget { + return KtorTcpClientTransport(benchJob) { + dispatcher(dispatcherV) + selectorManager(selector, manage = false) + }.target((serverInstance as KtorTcpServerInstance).localAddress) + } + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + selector.close() + } +} diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..8d91fd6d --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/LocalRSocketKotlinBenchmark.kt @@ -0,0 +1,65 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.local.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = WARMUP, time = WARMUP_DURATION) +@Measurement(iterations = ITERATION, time = ITERATION_DURATION) +@State(Scope.Benchmark) +class LocalRSocketKotlinBenchmark : RSocketKotlinBenchmark() { + @Param("S") + var channels: String = "" + + @Param("DEFAULT") + var dispatcher: String = "" + + private val dispatcherV by lazy { + when (dispatcher) { + "DEFAULT" -> Dispatchers.Default + "UNCONFINED" -> Dispatchers.Unconfined + else -> error("wrong parameter 'dispatcher=$dispatcher'") + } + } + override val serverTarget: RSocketServerTarget<*> by lazy { + LocalServerTransport(benchJob) { + dispatcher(dispatcherV) + when (channels) { + "S" -> sequential() + "M" -> multiplexed() + else -> error("wrong parameter 'channels=$channels'") + } + }.target("local") + } + + override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget { + return LocalClientTransport(benchJob) { + dispatcher(dispatcherV) + }.target("local") + } + + @Setup + override fun setup() = super.setup() + + @TearDown + override fun cleanup() = super.cleanup() +} diff --git a/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt new file mode 100644 index 00000000..d1384d7f --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/commonMain/kotlin/RSocketKotlinBenchmark.kt @@ -0,0 +1,113 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.rsocket.kotlin.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.core.* +import io.rsocket.kotlin.payload.* +import io.rsocket.kotlin.transport.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.io.* +import kotlin.random.* + +@State(Scope.Benchmark) +@OptIn(ExperimentalStreamsApi::class) +abstract class RSocketKotlinBenchmark : RSocketBenchmark() { + protected abstract fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget + protected abstract val serverTarget: RSocketServerTarget<*> + + private val requestStrategy = PrefetchStrategy(64, 0) + + protected val benchJob = Job() + private lateinit var client: RSocket + private lateinit var payload: Payload + private lateinit var payloadsFlow: Flow + + override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload( + data = Buffer().apply { write(Random.nextBytes(size / 2)) }, + metadata = Buffer().apply { write(Random.nextBytes(size / 2)) } + ) + + override fun createPayloadCopy(): Payload = payload.copy() + override fun consumePayload(bh: Blackhole, value: Payload) = bh.consume(value) + override fun releasePayload(payload: Payload) = payload.close() + + override suspend fun doRequestResponse(): Payload = client.requestResponse(createPayloadCopy()) + override fun doRequestStream(): Flow = client.requestStream(createPayloadCopy()).flowOn(requestStrategy) + override fun doRequestChannel(): Flow = client.requestChannel(createPayloadCopy(), payloadsFlow).flowOn(requestStrategy) + + override fun setup(): Unit = runBlocking { + payload = createPayload(payloadSize) + payloadsFlow = flow { repeat(5000) { emit(createPayloadCopy()) } } + + val serverInstance = RSocketServer().startServer(serverTarget) { + RSocketRequestHandler { + requestResponse { + it.close() + createPayloadCopy() + } + requestStream { + it.close() + payloadsFlow + } + requestChannel { init, payloads -> + init.close() + payloads.flowOn(requestStrategy) + } + } + } + client = RSocketConnector().connect(clientTarget(serverInstance)) + } + + override fun cleanup(): Unit = runBlocking { + client.coroutineContext.job.cancelAndJoin() + benchJob.cancelAndJoin() + } + + @Param("0") + override var payloadSize: Int = 0 + + @Benchmark + override fun requestResponseBlocking(bh: Blackhole) = super.requestResponseBlocking(bh) + + @Benchmark + override fun requestResponseParallel(bh: Blackhole) = super.requestResponseParallel(bh) + + @Benchmark + override fun requestResponseConcurrent(bh: Blackhole) = super.requestResponseConcurrent(bh) + + @Benchmark + override fun requestStreamBlocking(bh: Blackhole) = super.requestStreamBlocking(bh) + + @Benchmark + override fun requestStreamParallel(bh: Blackhole) = super.requestStreamParallel(bh) + + @Benchmark + override fun requestStreamConcurrent(bh: Blackhole) = super.requestStreamConcurrent(bh) + + @Benchmark + override fun requestChannelBlocking(bh: Blackhole) = super.requestChannelBlocking(bh) + + @Benchmark + override fun requestChannelParallel(bh: Blackhole) = super.requestChannelParallel(bh) + + @Benchmark + override fun requestChannelConcurrent(bh: Blackhole) = super.requestChannelConcurrent(bh) +} diff --git a/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..14db5c2f --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyQuicRSocketKotlinBenchmark.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.network.selector.* +import io.netty.channel.nio.* +import io.netty.handler.ssl.util.* +import io.netty.incubator.codec.quic.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import io.rsocket.kotlin.transport.netty.quic.* +import io.rsocket.kotlin.transport.netty.tcp.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = WARMUP, time = WARMUP_DURATION) +@Measurement(iterations = ITERATION, time = ITERATION_DURATION) +@State(Scope.Benchmark) +class NettyQuicRSocketKotlinBenchmark : RSocketKotlinBenchmark() { + private val certificates = SelfSignedCertificate() + + private val protos = arrayOf("hq-29") + + private val sharedGroup by lazy { + NioEventLoopGroup() + } + + override val serverTarget: RSocketServerTarget<*> by lazy { + NettyQuicServerTransport(benchJob) { + eventLoopGroup(sharedGroup, manage = false) + ssl { + keyManager(certificates.privateKey(), null, certificates.certificate()) + applicationProtocols(*protos) + } + codec { + tokenHandler(InsecureQuicTokenHandler.INSTANCE) + } + }.target("127.0.0.1") + } + + override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget { + return NettyQuicClientTransport(benchJob) { + eventLoopGroup(sharedGroup, manage = false) + ssl { + trustManager(InsecureTrustManagerFactory.INSTANCE) + applicationProtocols(*protos) + } + }.target( + (serverInstance as NettyQuicServerInstance).localAddress + ) + } + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + sharedGroup.shutdownGracefully().await(1000) + } +} diff --git a/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt new file mode 100644 index 00000000..5c03df13 --- /dev/null +++ b/benchmarks/rsocket-kotlin/src/jvmMain/kotlin/NettyTcpRSocketKotlinBenchmark.kt @@ -0,0 +1,52 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks.kotlin + +import io.ktor.network.selector.* +import io.netty.channel.nio.* +import io.rsocket.kotlin.benchmarks.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.ktor.tcp.* +import io.rsocket.kotlin.transport.netty.tcp.* +import kotlinx.benchmark.* +import kotlinx.coroutines.* + +@BenchmarkMode(Mode.Throughput) +@Warmup(iterations = WARMUP, time = WARMUP_DURATION) +@Measurement(iterations = ITERATION, time = ITERATION_DURATION) +@State(Scope.Benchmark) +class NettyTcpRSocketKotlinBenchmark : RSocketKotlinBenchmark() { + override val serverTarget: RSocketServerTarget<*> by lazy { + NettyTcpServerTransport(benchJob).target() + } + + override fun clientTarget(serverInstance: RSocketServerInstance): RSocketClientTarget { + return NettyTcpClientTransport(benchJob).target( + (serverInstance as NettyTcpServerInstance).localAddress + ) + } + + @Setup + override fun setup() { + super.setup() + } + + @TearDown + override fun cleanup() { + super.cleanup() + } +} diff --git a/benchmarks/shared/build.gradle.kts b/benchmarks/shared/build.gradle.kts new file mode 100644 index 00000000..09089f0b --- /dev/null +++ b/benchmarks/shared/build.gradle.kts @@ -0,0 +1,37 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import rsocketbuild.* + +plugins { + id("rsocketbuild.multiplatform-base") +} + +kotlin { + jvmTarget() + + // no mingw because of cio + macosX64() + macosArm64() + linuxX64() + + sourceSets { + commonMain.dependencies { + api(libs.kotlinx.coroutines.core) + api(libs.kotlinx.benchmark) + } + } +} diff --git a/benchmarks/shared/src/commonMain/kotlin/RSocketBenchmark.kt b/benchmarks/shared/src/commonMain/kotlin/RSocketBenchmark.kt new file mode 100644 index 00000000..2396eea0 --- /dev/null +++ b/benchmarks/shared/src/commonMain/kotlin/RSocketBenchmark.kt @@ -0,0 +1,107 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.benchmarks + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* + +const val ITERATION = 5 +const val ITERATION_DURATION = 3 + +const val WARMUP = 5 +const val WARMUP_DURATION = 3 + +abstract class RSocketBenchmark { + + // payload operations + abstract val payloadSize: Int + abstract fun createPayload(size: Int): Payload + abstract fun createPayloadCopy(): Payload + abstract fun releasePayload(payload: Payload) + abstract fun consumePayload(bh: Blackhole, value: Payload) + private fun usePayload(bh: Blackhole, payload: Payload) { + releasePayload(payload) + consumePayload(bh, payload) + } + + // lifecycle + + abstract fun setup() + abstract fun cleanup() + + // benchmarks + + // Benchmark annotation doesn't inherit on jvm + open fun requestResponseBlocking(bh: Blackhole) = blocking(bh, 1000, ::requestResponse) + open fun requestResponseParallel(bh: Blackhole) = parallel(bh, 1000, ::requestResponse) + open fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 1000, ::requestResponse) + + open fun requestStreamBlocking(bh: Blackhole) = blocking(bh, 100, ::requestStream) + open fun requestStreamParallel(bh: Blackhole) = parallel(bh, 100, ::requestStream) + open fun requestStreamConcurrent(bh: Blackhole) = concurrent(bh, 100, ::requestStream) + + open fun requestChannelBlocking(bh: Blackhole) = blocking(bh, 10, ::requestChannel) + open fun requestChannelParallel(bh: Blackhole) = parallel(bh, 10, ::requestChannel) + open fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 10, ::requestChannel) + + // operations + + abstract suspend fun doRequestResponse(): Payload + abstract fun doRequestStream(): Flow + abstract fun doRequestChannel(): Flow + + private suspend fun requestResponse(bh: Blackhole) { + doRequestResponse().also { usePayload(bh, it) } + } + + private suspend fun requestStream(bh: Blackhole) { + doRequestStream().collect { usePayload(bh, it) } + } + + private suspend fun requestChannel(bh: Blackhole) { + doRequestChannel().collect { usePayload(bh, it) } + } + + // execution strategies + + // plain blocking + private inline fun blocking( + bh: Blackhole, + p: Int, + crossinline block: suspend (bh: Blackhole) -> Unit, + ): Unit = runBlocking { + repeat(p) { block(bh) } + } + + // Run every request in a separate coroutine which will be dispatched on Default dispatcher (thread amount = cores amount) + private inline fun parallel( + bh: Blackhole, + p: Int, + crossinline block: suspend (bh: Blackhole) -> Unit, + ): Unit = runBlocking(Dispatchers.Default) { + repeat(p) { launch { block(bh) } } + } + + // Run every request in separate coroutine, but on single thread dispatcher + private inline fun concurrent( + bh: Blackhole, + p: Int, + crossinline block: suspend (bh: Blackhole) -> Unit, + ): Unit = runBlocking { + repeat(p) { launch { block(bh) } } + } +} diff --git a/benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt b/benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt deleted file mode 100644 index c00ae1ac..00000000 --- a/benchmarks/src/jvmMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketBenchmark.kt +++ /dev/null @@ -1,138 +0,0 @@ -/* - * Copyright 2015-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -@file:OptIn(DelicateCoroutinesApi::class) - -package io.rsocket.kotlin.benchmarks - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import org.openjdk.jmh.annotations.* -import org.openjdk.jmh.infra.* -import java.util.concurrent.locks.* - -@BenchmarkMode(Mode.Throughput) -@Fork(value = 2) -@Warmup(iterations = 5, time = 5) -@Measurement(iterations = 5, time = 5) -@State(Scope.Benchmark) -abstract class RSocketBenchmark { - - @Param("0", "64", "1024", "131072", "1048576", "15728640") - var payloadSize: Int = 0 - - @Setup - abstract fun setup() - - @TearDown - abstract fun cleanup() - - @TearDown(Level.Iteration) - fun awaitToBeConsumed() { - LockSupport.parkNanos(2000) - } - - abstract fun createPayload(size: Int): Payload - - abstract fun releasePayload(payload: Payload) - - abstract suspend fun doRequestResponse(): Payload - - abstract suspend fun doRequestStream(): Flow - - abstract suspend fun doRequestChannel(): Flow - - - @Benchmark - fun requestResponseBlocking(bh: Blackhole) = blocking(bh, ::requestResponse) - - @Benchmark - fun requestResponseParallel(bh: Blackhole) = parallel(bh, 1000, ::requestResponse) - - @Benchmark - fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 1000, ::requestResponse) - - - @Benchmark - fun requestStreamBlocking(bh: Blackhole) = blocking(bh, ::requestStream) - - @Benchmark - fun requestStreamParallel(bh: Blackhole) = parallel(bh, 10, ::requestStream) - - @Benchmark - fun requestStreamConcurrent(bh: Blackhole) = concurrent(bh, 10, ::requestStream) - - - @Benchmark - fun requestChannelBlocking(bh: Blackhole) = blocking(bh, ::requestChannel) - - @Benchmark - fun requestChannelParallel(bh: Blackhole) = parallel(bh, 10, ::requestChannel) - - @Benchmark - fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 10, ::requestChannel) - - - private suspend fun requestResponse(bh: Blackhole) { - doRequestResponse().also { - releasePayload(it) - bh.consume(it) - } - } - - private suspend fun requestStream(bh: Blackhole) { - doRequestStream().collect { - releasePayload(it) - bh.consume(it) - } - } - - private suspend fun requestChannel(bh: Blackhole) { - doRequestChannel().collect { - releasePayload(it) - bh.consume(it) - } - } - - //plain blocking - private inline fun blocking(bh: Blackhole, crossinline block: suspend (bh: Blackhole) -> Unit): Unit = runBlocking { - block(bh) - } - - //Run every request in separate coroutine which will be dispatched on Default dispatcher (threads amount = cores amount) - private inline fun parallel(bh: Blackhole, p: Int, crossinline block: suspend (bh: Blackhole) -> Unit): Unit = - runBlocking { - (0..p).map { - GlobalScope.async { block(bh) } - }.awaitAll() - } - - //Run every request in separate coroutine, but on single thread dispatcher: - // - do request 1 - // - suspend on awaiting of result 1 - // - do request 2 - // - suspend on awaiting of result 2 - // - receive result on request 1 - // - receive result on request 2 - // - .... - //working with requests is single threaded but concurrent - private inline fun concurrent(bh: Blackhole, p: Int, crossinline block: suspend (bh: Blackhole) -> Unit): Unit = - runBlocking { - (0..p).map { - async { block(bh) } - }.awaitAll() - } -} diff --git a/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt b/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt deleted file mode 100644 index 310d0078..00000000 --- a/benchmarks/src/kotlinMain/kotlin/io/rsocket/kotlin/benchmarks/RSocketKotlinBenchmark.kt +++ /dev/null @@ -1,87 +0,0 @@ -/* - * Copyright 2015-2022 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.rsocket.kotlin.benchmarks - -import io.ktor.utils.io.core.* -import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.payload.* -import io.rsocket.kotlin.transport.local.* -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import kotlin.random.* - -@OptIn(ExperimentalStreamsApi::class, DelicateCoroutinesApi::class) -class RSocketKotlinBenchmark : RSocketBenchmark() { - private val requestStrategy = PrefetchStrategy(64, 0) - - private val benchJob = Job() - lateinit var client: RSocket - - lateinit var payload: Payload - lateinit var payloadsFlow: Flow - - fun payloadCopy(): Payload = payload.copy() - - override fun setup() { - payload = createPayload(payloadSize) - payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } } - val server = RSocketServer().bindIn(CoroutineScope(benchJob + Dispatchers.Unconfined), LocalServerTransport()) { - RSocketRequestHandler { - requestResponse { - it.close() - payloadCopy() - } - requestStream { - it.close() - payloadsFlow - } - requestChannel { init, payloads -> - init.close() - payloads.flowOn(requestStrategy) - } - } - } - client = runBlocking { - RSocketConnector().connect(server) - } - } - - override fun cleanup() { - runBlocking { - client.coroutineContext.job.cancelAndJoin() - benchJob.cancelAndJoin() - } - } - - override fun createPayload(size: Int): Payload = if (size == 0) Payload.Empty else Payload( - data = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }), - metadata = ByteReadPacket(ByteArray(size / 2).also { Random.nextBytes(it) }) - ) - - override fun releasePayload(payload: Payload) { - payload.close() - } - - override suspend fun doRequestResponse(): Payload = client.requestResponse(payloadCopy()) - - override suspend fun doRequestStream(): Flow = client.requestStream(payloadCopy()).flowOn(requestStrategy) - - override suspend fun doRequestChannel(): Flow = - client.requestChannel(payloadCopy(), payloadsFlow).flowOn(requestStrategy) - -} diff --git a/build-logic/build.gradle.kts b/build-logic/build.gradle.kts index c1efa7d6..062d04bd 100644 --- a/build-logic/build.gradle.kts +++ b/build-logic/build.gradle.kts @@ -20,6 +20,8 @@ plugins { dependencies { implementation(libs.kotlin.gradle.plugin) + implementation(libs.kotlin.allopen.gradle.plugin) implementation(libs.kotlinx.bcv.gradle.plugin) + implementation(libs.kotlinx.benchmark.gradle.plugin) implementation(libs.maven.publish.gradle.plugin) } diff --git a/build-logic/src/main/kotlin/rsocketbuild.multiplatform-benchmarks.gradle.kts b/build-logic/src/main/kotlin/rsocketbuild.multiplatform-benchmarks.gradle.kts new file mode 100644 index 00000000..a2ddd84b --- /dev/null +++ b/build-logic/src/main/kotlin/rsocketbuild.multiplatform-benchmarks.gradle.kts @@ -0,0 +1,25 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { + id("rsocketbuild.multiplatform-base") + kotlin("plugin.allopen") + id("org.jetbrains.kotlinx.benchmark") +} + +allOpen { + annotation("org.openjdk.jmh.annotations.State") +} diff --git a/build-logic/src/main/kotlin/rsocketbuild/benchmarks.kt b/build-logic/src/main/kotlin/rsocketbuild/benchmarks.kt new file mode 100644 index 00000000..18ee8301 --- /dev/null +++ b/build-logic/src/main/kotlin/rsocketbuild/benchmarks.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2015-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rsocketbuild + +import kotlinx.benchmark.gradle.* +import org.gradle.kotlin.dsl.support.* + +fun BenchmarksExtension.registerBenchmarks( + implName: String, + transports: List, + customize: BenchmarkConfiguration.(transport: String) -> Unit = {}, +) { + fun register( + operation: String, + transport: String, + format: String, + kind: String, + ) { + configurations.register( + listOf(kind, format, transport, operation).joinToString("", transform = String::uppercaseFirstChar) + ) { + reportFormat = format + when (kind) { + "fast" -> param("payloadSize", "0") + "full" -> param("payloadSize", "0", "64", "4096") + else -> error("wrong `kind`=$kind") + } + + include("${transport.uppercaseFirstChar()}${implName}Benchmark.${operation}Blocking") + include("${transport.uppercaseFirstChar()}${implName}Benchmark.${operation}Parallel") + include("${transport.uppercaseFirstChar()}${implName}Benchmark.${operation}Concurrent") + + customize(transport) + } + } + + val operations = listOf("requestResponse", "requestStream", "requestChannel") + val formats = listOf("text", "csv", "json") + val kinds = listOf("fast", "full") + transports.forEach { transport -> + operations.forEach { operation -> + formats.forEach { format -> + kinds.forEach { kind -> + register(operation, transport, format, kind) + } + } + } + } +} + +//if (transport == "local") { +// param("dispatcher", "DEFAULT", "UNCONFINED") +//// param("channels", "S", "M") +//} \ No newline at end of file diff --git a/build-settings/src/main/kotlin/rsocketsettings.repositories.settings.gradle.kts b/build-settings/src/main/kotlin/rsocketsettings.repositories.settings.gradle.kts index 60646f00..500f2b97 100644 --- a/build-settings/src/main/kotlin/rsocketsettings.repositories.settings.gradle.kts +++ b/build-settings/src/main/kotlin/rsocketsettings.repositories.settings.gradle.kts @@ -26,5 +26,6 @@ pluginManagement { dependencyResolutionManagement { repositories { mavenCentral() + gradlePluginPortal() } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 32d551fc..aa33af56 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -4,7 +4,7 @@ kotlin = "2.1.0" kotlinx-io = "0.6.0" kotlinx-atomicfu = "0.26.0" kotlinx-coroutines = "1.9.0" -kotlinx-benchmark = "0.4.8" +kotlinx-benchmark = "0.4.13" kotlinx-bcv = "0.16.3" ktor = "3.0.2" @@ -17,9 +17,9 @@ bouncycastle = "1.79" turbine = "1.2.0" -rsocket-java = "1.1.3" +rsocket-java = "1.1.4" -jmh = "1.36" +jmh = "1.37" maven-publish = "0.30.0" @@ -57,13 +57,17 @@ turbine = { module = "app.cash.turbine:turbine", version.ref = "turbine" } rsocket-java-core = { module = 'io.rsocket:rsocket-core', version.ref = "rsocket-java" } rsocket-java-transport-local = { module = 'io.rsocket:rsocket-transport-local', version.ref = "rsocket-java" } +rsocket-java-transport-netty = { module = 'io.rsocket:rsocket-transport-netty', version.ref = "rsocket-java" } kotlin-gradle-plugin = { module = "org.jetbrains.kotlin:kotlin-gradle-plugin", version.ref = "kotlin" } +kotlin-allopen-gradle-plugin = { module = "org.jetbrains.kotlin:kotlin-allopen", version.ref = "kotlin" } kotlinx-atomicfu-gradle-plugin = { module = "org.jetbrains.kotlinx:atomicfu-gradle-plugin", version.ref = "kotlinx-atomicfu" } kotlinx-bcv-gradle-plugin = { module = "org.jetbrains.kotlinx:binary-compatibility-validator", version.ref = "kotlinx-bcv" } +kotlinx-benchmark-gradle-plugin = { module = "org.jetbrains.kotlinx:kotlinx-benchmark-plugin", version.ref = "kotlinx-benchmark" } maven-publish-gradle-plugin = { module = "com.vanniktech:gradle-maven-publish-plugin", version.ref = "maven-publish" } [plugins] kotlin-multiplatform = { id = "org.jetbrains.kotlin.multiplatform", version.ref = "kotlin" } +kotlin-plugin-allopen = { id = "org.jetbrains.kotlin.plugin.allopen", version.ref = "kotlin" } kotlinx-benchmark = { id = "org.jetbrains.kotlinx.benchmark", version.ref = "kotlinx-benchmark" } kotlinx-atomicfu = { id = "org.jetbrains.kotlinx.atomicfu", version.ref = "kotlinx-atomicfu" } \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index e278615f..c10d3f1c 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -56,4 +56,11 @@ projects("rsocket-kotlin") { module("rsocket-ktor-client") module("rsocket-ktor-server") } + + folder("benchmarks") { + module("shared") + module("rsocket-kotlin-0_16") + module("rsocket-kotlin") + module("rsocket-java") + } }