Skip to content

Commit

Permalink
Reworked streams API using coroutine context element RequestStrategy …
Browse files Browse the repository at this point in the history
…for better control over requestN frames
  • Loading branch information
whyoleg committed Nov 25, 2020
1 parent a56c2ea commit b34de79
Show file tree
Hide file tree
Showing 20 changed files with 313 additions and 189 deletions.
17 changes: 8 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,9 @@ From [RSocket protocol](https://github.com/rsocket/rsocket/blob/master/Protocol.
This is a credit-based model where the Requester grants the Responder credit for the number of PAYLOADs it can send.
It is sometimes referred to as "request-n" or "request(n)".

`kotlinx.coroutines` doesn't truly support `request(n)` semantic, but it has `Flow.buffer(n)` operator
which can be used to achieve something similar:
`kotlinx.coroutines` doesn't truly support `request(n)` semantic, but it has flexible `CoroutineContext`
which can be used to achieve something similar. `rsocket-kotlin` contains `RequestStrategy` coroutine context element, which defines,
strategy for sending of `requestN` frames.

Example:

Expand All @@ -220,13 +221,11 @@ val client: RSocket = TODO()
//and stream
val stream: Flow<Payload> = client.requestStream(Payload("data"))

//now we can use buffer to tell underlying transport to request values in chunks
val bufferedStream: Flow<Payload> = stream.buffer(10) //here buffer is 10, if `buffer` operator is not used buffer is by default 64

//now you can collect as any other `Flow`
//just after collection first request for 10 elements will be sent
//after 10 elements collected, 10 more elements will be requested, and so on
bufferedStream.collect { payload: Payload ->
//now we can use `flowOn` to add request strategy to context of flow
//here we use prefetch strategy which will send requestN for 10 elements, when, there is 5 elements left to collect
//so on call `collect`, requestStream frame with requestN will be sent, and then, after 5 elements will be collected
//new requestN with 5 will be sent, so collect will be smooth
stream.flowOn(PrefetchStrategy(requestSize = 10, requestOn = 5)).collect { payload: Payload ->
println(payload.data.readText())
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import java.util.concurrent.locks.*

@BenchmarkMode(Mode.Throughput)
@Fork(value = 2)
@Warmup(iterations = 10, time = 10)
@Measurement(iterations = 7, time = 10)
@Warmup(iterations = 5, time = 5)
@Measurement(iterations = 5, time = 5)
@State(Scope.Benchmark)
abstract class RSocketBenchmark<Payload : Any> {

Expand All @@ -40,7 +40,7 @@ abstract class RSocketBenchmark<Payload : Any> {

@TearDown(Level.Iteration)
fun awaitToBeConsumed() {
LockSupport.parkNanos(5000)
LockSupport.parkNanos(2000)
}

abstract fun createPayload(size: Int): Payload
Expand All @@ -58,10 +58,10 @@ abstract class RSocketBenchmark<Payload : Any> {
fun requestResponseBlocking(bh: Blackhole) = blocking(bh, ::requestResponse)

@Benchmark
fun requestResponseParallel(bh: Blackhole) = parallel(bh, 500, ::requestResponse)
fun requestResponseParallel(bh: Blackhole) = parallel(bh, 1000, ::requestResponse)

@Benchmark
fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 500, ::requestResponse)
fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 1000, ::requestResponse)


@Benchmark
Expand All @@ -78,10 +78,10 @@ abstract class RSocketBenchmark<Payload : Any> {
fun requestChannelBlocking(bh: Blackhole) = blocking(bh, ::requestChannel)

@Benchmark
fun requestChannelParallel(bh: Blackhole) = parallel(bh, 3, ::requestChannel)
fun requestChannelParallel(bh: Blackhole) = parallel(bh, 10, ::requestChannel)

@Benchmark
fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 3, ::requestChannel)
fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 10, ::requestChannel)


private suspend fun requestResponse(bh: Blackhole) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,37 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.*

@OptIn(ExperimentalStreamsApi::class, ExperimentalCoroutinesApi::class)
class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
private val requestStrategy = PrefetchStrategy(64, 0)

lateinit var client: RSocket
lateinit var server: Job

lateinit var payload: Payload
lateinit var payloadsFlow: Flow<Payload>

fun payloadCopy(): Payload = payload.copy()

override fun setup() {
payload = createPayload(payloadSize)
payloadsFlow = flow { repeat(5000) { emit(payload.copy()) } }
payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } }

val localServer = LocalServer()
server = RSocketServer().bind(localServer) {
RSocketRequestHandler {
requestResponse {
it.release()
payload
payloadCopy()
}
requestStream {
it.release()
payloadsFlow
}
requestChannel { it }
requestChannel { it.flowOn(requestStrategy) }
}
}
return runBlocking {
client = runBlocking {
RSocketConnector().connect(localServer)
}
}
Expand All @@ -72,10 +76,10 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
payload.release()
}

override suspend fun doRequestResponse(): Payload = client.requestResponse(payload.copy())
override suspend fun doRequestResponse(): Payload = client.requestResponse(payloadCopy())

override suspend fun doRequestStream(): Flow<Payload> = client.requestStream(payload.copy())
override suspend fun doRequestStream(): Flow<Payload> = client.requestStream(payloadCopy()).flowOn(requestStrategy)

override suspend fun doRequestChannel(): Flow<Payload> = client.requestChannel(payloadsFlow)
override suspend fun doRequestChannel(): Flow<Payload> = client.requestChannel(payloadsFlow).flowOn(requestStrategy)

}
7 changes: 5 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ subprojects {
extensions.configure<KotlinMultiplatformExtension> {
val isTestProject = project.name == "rsocket-test"
val isLibProject = project.name.startsWith("rsocket")
val isPlaygroundProject = project.name == "playground"
val isExampleProject = "examples" in project.path

sourceSets.all {
languageSettings.apply {
Expand All @@ -206,7 +208,7 @@ subprojects {

useExperimentalAnnotation("kotlin.RequiresOptIn")

if (name.contains("test", ignoreCase = true) || isTestProject) {
if (name.contains("test", ignoreCase = true) || isTestProject || isPlaygroundProject) {
useExperimentalAnnotation("kotlin.time.ExperimentalTime")
useExperimentalAnnotation("kotlin.ExperimentalStdlibApi")

Expand All @@ -221,6 +223,7 @@ subprojects {

useExperimentalAnnotation("io.rsocket.kotlin.TransportApi")
useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalMetadataApi")
useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalStreamsApi")
}
}
}
Expand All @@ -233,7 +236,7 @@ subprojects {
}

//fix atomicfu for examples and playground
if ("examples" in project.path || project.name == "playground") {
if (isExampleProject || isPlaygroundProject) {
sourceSets["commonMain"].dependencies {
implementation("org.jetbrains.kotlinx:atomicfu:$kotlinxAtomicfuVersion")
}
Expand Down
5 changes: 2 additions & 3 deletions examples/interactions/src/jvmMain/kotlin/ReconnectExample.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.local.*
import kotlinx.atomicfu.*
Expand Down Expand Up @@ -62,7 +61,7 @@ fun main(): Unit = runBlocking {

//do request
try {
rSocket.requestStream(Payload("Hello", "World")).buffer(3).collect {
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).collect {
val index = it.data.readText().substringAfter("Payload: ").toInt()
println("Client receives index: $index")
}
Expand All @@ -72,7 +71,7 @@ fun main(): Unit = runBlocking {

//do request just after it

rSocket.requestStream(Payload("Hello", "World")).buffer(3).take(3).collect {
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).take(3).collect {
val index = it.data.readText().substringAfter("Payload: ").toInt()
println("Client receives index: $index after reconnection")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*
Expand Down Expand Up @@ -62,7 +61,7 @@ fun main(): Unit = runBlocking {
})

//do request
rSocket.requestStream(Payload("Hello", "World")).buffer(3).take(3).collect {
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).take(3).collect {
val index = it.data.readText().substringAfter("Payload: ").toInt()
println("Client receives index: $index")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
Expand All @@ -27,7 +26,7 @@ fun main(): Unit = runBlocking {
RSocketServer().bind(server) {
RSocketRequestHandler {
requestChannel { request ->
request.buffer(3).take(3).flatMapConcat { payload ->
request.flowOn(PrefetchStrategy(3, 0)).take(3).flatMapConcat { payload ->
val data = payload.data.readText()
flow {
repeat(3) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
Expand Down Expand Up @@ -44,7 +43,7 @@ fun main(): Unit = runBlocking {
val response = rSocket.requestStream(Payload("Hello", "World"))

response
.buffer(2) //use buffer as first operator to use RequestN semantic, so request by 2 elements
.flowOn(PrefetchStrategy(2, 0))
.map { it.data.readText().substringAfter("Payload: ").toInt() }
.take(2)
.collect {
Expand Down
10 changes: 6 additions & 4 deletions playground/src/commonMain/kotlin/Stub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ suspend fun RSocket.doSomething() {
// launch { rSocket.fireAndForget(Payload(byteArrayOf(1, 1, 1), byteArrayOf(2, 2, 2))) }
// launch { rSocket.metadataPush(byteArrayOf(1, 2, 3)) }
var i = 0
requestStream(buildPayload {
data(byteArrayOf(1, 1, 1))
metadata(byteArrayOf(2, 2, 2))
}).buffer(10000).collect {
requestStream(
buildPayload {
data(byteArrayOf(1, 1, 1))
metadata(byteArrayOf(2, 2, 2))
}
).flowOn(PrefetchStrategy(10000, 0)).collect {
println(it.data.readBytes().contentToString())
if (++i == 10000) error("")
}
Expand Down
51 changes: 51 additions & 0 deletions playground/src/commonMain/kotlin/streams.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import io.rsocket.kotlin.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

@ExperimentalStreamsApi
private suspend fun s() {
val flow = flow {
val strategy = coroutineContext[RequestStrategy]!!.provide()
var i = strategy.firstRequest()
println("INIT: $i")
var r = 0
while (i > 0) {
emit(r++)
val n = strategy.nextRequest()
println("")
if (n > 0) i += n
i--
}
}

flow.flowOn(PrefetchStrategy(64, 16)).onEach { println(it) }.launchIn(GlobalScope)

val ch = Channel<Int>()

flow.flowOn(ChannelStrategy(ch)).onEach { println(it) }.launchIn(GlobalScope)

delay(100)
ch.send(5)
delay(100)
ch.send(5)
delay(100)
ch.send(5)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ package io.rsocket.kotlin
@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This is an API which is used to implement transport for RSocket, such as WS or TCP. " +
"This API can change in future in non backwards-incompatible manner."
"This API can change in future in non backwards-compatible manner."
)
public annotation class TransportApi

@Retention(value = AnnotationRetention.BINARY)
@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This is an API to work with metadata. This API can change in future in non backwards-incompatible manner."
message = "This is an API to work with metadata. This API can change in future in non backwards-compatible manner."
)
public annotation class ExperimentalMetadataApi

@Retention(value = AnnotationRetention.BINARY)
@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This is an API to customize request strategy of streams. This API can change in future in non backwards-compatible manner."
)
public annotation class ExperimentalStreamsApi
Loading

0 comments on commit b34de79

Please sign in to comment.