Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Streams with flexible requestN semantic #118

Merged
merged 2 commits into from
Dec 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,9 @@ From [RSocket protocol](https://github.com/rsocket/rsocket/blob/master/Protocol.
This is a credit-based model where the Requester grants the Responder credit for the number of PAYLOADs it can send.
It is sometimes referred to as "request-n" or "request(n)".

`kotlinx.coroutines` doesn't truly support `request(n)` semantic, but it has `Flow.buffer(n)` operator
which can be used to achieve something similar:
`kotlinx.coroutines` doesn't truly support `request(n)` semantic, but it has flexible `CoroutineContext`
which can be used to achieve something similar. `rsocket-kotlin` contains `RequestStrategy` coroutine context element, which defines,
strategy for sending of `requestN` frames.

Example:

Expand All @@ -220,13 +221,11 @@ val client: RSocket = TODO()
//and stream
val stream: Flow<Payload> = client.requestStream(Payload("data"))

//now we can use buffer to tell underlying transport to request values in chunks
val bufferedStream: Flow<Payload> = stream.buffer(10) //here buffer is 10, if `buffer` operator is not used buffer is by default 64

//now you can collect as any other `Flow`
//just after collection first request for 10 elements will be sent
//after 10 elements collected, 10 more elements will be requested, and so on
bufferedStream.collect { payload: Payload ->
//now we can use `flowOn` to add request strategy to context of flow
//here we use prefetch strategy which will send requestN for 10 elements, when, there is 5 elements left to collect
//so on call `collect`, requestStream frame with requestN will be sent, and then, after 5 elements will be collected
//new requestN with 5 will be sent, so collect will be smooth
stream.flowOn(PrefetchStrategy(requestSize = 10, requestOn = 5)).collect { payload: Payload ->
println(payload.data.readText())
}
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import java.util.concurrent.locks.*

@BenchmarkMode(Mode.Throughput)
@Fork(value = 2)
@Warmup(iterations = 10, time = 10)
@Measurement(iterations = 7, time = 10)
@Warmup(iterations = 5, time = 5)
@Measurement(iterations = 5, time = 5)
@State(Scope.Benchmark)
abstract class RSocketBenchmark<Payload : Any> {

Expand All @@ -40,7 +40,7 @@ abstract class RSocketBenchmark<Payload : Any> {

@TearDown(Level.Iteration)
fun awaitToBeConsumed() {
LockSupport.parkNanos(5000)
LockSupport.parkNanos(2000)
}

abstract fun createPayload(size: Int): Payload
Expand All @@ -58,10 +58,10 @@ abstract class RSocketBenchmark<Payload : Any> {
fun requestResponseBlocking(bh: Blackhole) = blocking(bh, ::requestResponse)

@Benchmark
fun requestResponseParallel(bh: Blackhole) = parallel(bh, 500, ::requestResponse)
fun requestResponseParallel(bh: Blackhole) = parallel(bh, 1000, ::requestResponse)

@Benchmark
fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 500, ::requestResponse)
fun requestResponseConcurrent(bh: Blackhole) = concurrent(bh, 1000, ::requestResponse)


@Benchmark
Expand All @@ -78,10 +78,10 @@ abstract class RSocketBenchmark<Payload : Any> {
fun requestChannelBlocking(bh: Blackhole) = blocking(bh, ::requestChannel)

@Benchmark
fun requestChannelParallel(bh: Blackhole) = parallel(bh, 3, ::requestChannel)
fun requestChannelParallel(bh: Blackhole) = parallel(bh, 10, ::requestChannel)

@Benchmark
fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 3, ::requestChannel)
fun requestChannelConcurrent(bh: Blackhole) = concurrent(bh, 10, ::requestChannel)


private suspend fun requestResponse(bh: Blackhole) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,33 +25,37 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.random.*

@OptIn(ExperimentalStreamsApi::class, ExperimentalCoroutinesApi::class)
class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
private val requestStrategy = PrefetchStrategy(64, 0)

lateinit var client: RSocket
lateinit var server: Job

lateinit var payload: Payload
lateinit var payloadsFlow: Flow<Payload>

fun payloadCopy(): Payload = payload.copy()
Copy link
Member

@yschimke yschimke Nov 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Outside this review...

This sort of pattern, reuse of a payload probably deserves some nicer more natural mechanism. This is likely a source of bugs under error conditions which will be missed on the happy path. How do we improve this? e.g. payload.permanent()?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and how payload.permanent() will work? have just possibility to reuse data? and release after all?
@altavir has much experience working with kotlin with IO, and now researching adding rsocket-kotlin to his own projects. Maybe hi have some ideas on better payload handling?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just from my side. We do allocate a ByteBuf once, and then retain it every time we use it in benchmarks. Sounds like a copy but not really, thus not sure Kotlin IO can do the same


override fun setup() {
payload = createPayload(payloadSize)
payloadsFlow = flow { repeat(5000) { emit(payload.copy()) } }
payloadsFlow = flow { repeat(5000) { emit(payloadCopy()) } }

val localServer = LocalServer()
server = RSocketServer().bind(localServer) {
RSocketRequestHandler {
requestResponse {
it.release()
payload
payloadCopy()
}
requestStream {
it.release()
payloadsFlow
}
requestChannel { it }
requestChannel { it.flowOn(requestStrategy) }
}
}
return runBlocking {
client = runBlocking {
RSocketConnector().connect(localServer)
}
}
Expand All @@ -72,10 +76,10 @@ class RSocketKotlinBenchmark : RSocketBenchmark<Payload>() {
payload.release()
}

override suspend fun doRequestResponse(): Payload = client.requestResponse(payload.copy())
override suspend fun doRequestResponse(): Payload = client.requestResponse(payloadCopy())

override suspend fun doRequestStream(): Flow<Payload> = client.requestStream(payload.copy())
override suspend fun doRequestStream(): Flow<Payload> = client.requestStream(payloadCopy()).flowOn(requestStrategy)

override suspend fun doRequestChannel(): Flow<Payload> = client.requestChannel(payloadsFlow)
override suspend fun doRequestChannel(): Flow<Payload> = client.requestChannel(payloadsFlow).flowOn(requestStrategy)

}
7 changes: 5 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ subprojects {
extensions.configure<KotlinMultiplatformExtension> {
val isTestProject = project.name == "rsocket-test"
val isLibProject = project.name.startsWith("rsocket")
val isPlaygroundProject = project.name == "playground"
val isExampleProject = "examples" in project.path

sourceSets.all {
languageSettings.apply {
Expand All @@ -206,7 +208,7 @@ subprojects {

useExperimentalAnnotation("kotlin.RequiresOptIn")

if (name.contains("test", ignoreCase = true) || isTestProject) {
if (name.contains("test", ignoreCase = true) || isTestProject || isPlaygroundProject) {
useExperimentalAnnotation("kotlin.time.ExperimentalTime")
useExperimentalAnnotation("kotlin.ExperimentalStdlibApi")

Expand All @@ -221,6 +223,7 @@ subprojects {

useExperimentalAnnotation("io.rsocket.kotlin.TransportApi")
useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalMetadataApi")
useExperimentalAnnotation("io.rsocket.kotlin.ExperimentalStreamsApi")
}
}
}
Expand All @@ -233,7 +236,7 @@ subprojects {
}

//fix atomicfu for examples and playground
if ("examples" in project.path || project.name == "playground") {
if (isExampleProject || isPlaygroundProject) {
sourceSets["commonMain"].dependencies {
implementation("org.jetbrains.kotlinx:atomicfu:$kotlinxAtomicfuVersion")
}
Expand Down
5 changes: 2 additions & 3 deletions examples/interactions/src/jvmMain/kotlin/ReconnectExample.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.local.*
import kotlinx.atomicfu.*
Expand Down Expand Up @@ -62,7 +61,7 @@ fun main(): Unit = runBlocking {

//do request
try {
rSocket.requestStream(Payload("Hello", "World")).buffer(3).collect {
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).collect {
val index = it.data.readText().substringAfter("Payload: ").toInt()
println("Client receives index: $index")
}
Expand All @@ -72,7 +71,7 @@ fun main(): Unit = runBlocking {

//do request just after it

rSocket.requestStream(Payload("Hello", "World")).buffer(3).take(3).collect {
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).take(3).collect {
val index = it.data.readText().substringAfter("Payload: ").toInt()
println("Client receives index: $index after reconnection")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*
Expand Down Expand Up @@ -62,7 +61,7 @@ fun main(): Unit = runBlocking {
})

//do request
rSocket.requestStream(Payload("Hello", "World")).buffer(3).take(3).collect {
rSocket.requestStream(Payload("Hello", "World")).flowOn(PrefetchStrategy(3, 0)).take(3).collect {
val index = it.data.readText().substringAfter("Payload: ").toInt()
println("Client receives index: $index")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
Expand All @@ -27,7 +26,7 @@ fun main(): Unit = runBlocking {
RSocketServer().bind(server) {
RSocketRequestHandler {
requestChannel { request ->
request.buffer(3).take(3).flatMapConcat { payload ->
request.flowOn(PrefetchStrategy(3, 0)).take(3).flatMapConcat { payload ->
val data = payload.data.readText()
flow {
repeat(3) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import io.rsocket.kotlin.*
import io.rsocket.kotlin.core.*
import io.rsocket.kotlin.payload.*
import io.rsocket.kotlin.transport.local.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
Expand Down Expand Up @@ -44,7 +43,7 @@ fun main(): Unit = runBlocking {
val response = rSocket.requestStream(Payload("Hello", "World"))

response
.buffer(2) //use buffer as first operator to use RequestN semantic, so request by 2 elements
.flowOn(PrefetchStrategy(2, 0))
.map { it.data.readText().substringAfter("Payload: ").toInt() }
.take(2)
.collect {
Expand Down
10 changes: 6 additions & 4 deletions playground/src/commonMain/kotlin/Stub.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ suspend fun RSocket.doSomething() {
// launch { rSocket.fireAndForget(Payload(byteArrayOf(1, 1, 1), byteArrayOf(2, 2, 2))) }
// launch { rSocket.metadataPush(byteArrayOf(1, 2, 3)) }
var i = 0
requestStream(buildPayload {
data(byteArrayOf(1, 1, 1))
metadata(byteArrayOf(2, 2, 2))
}).buffer(10000).collect {
requestStream(
buildPayload {
data(byteArrayOf(1, 1, 1))
metadata(byteArrayOf(2, 2, 2))
}
).flowOn(PrefetchStrategy(10000, 0)).collect {
println(it.data.readBytes().contentToString())
if (++i == 10000) error("")
}
Expand Down
51 changes: 51 additions & 0 deletions playground/src/commonMain/kotlin/streams.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import io.rsocket.kotlin.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

@ExperimentalStreamsApi
private suspend fun s() {
val flow = flow {
val strategy = coroutineContext[RequestStrategy]!!.provide()
var i = strategy.firstRequest()
println("INIT: $i")
var r = 0
while (i > 0) {
emit(r++)
val n = strategy.nextRequest()
println("")
if (n > 0) i += n
i--
}
}

flow.flowOn(PrefetchStrategy(64, 16)).onEach { println(it) }.launchIn(GlobalScope)

val ch = Channel<Int>()

flow.flowOn(ChannelStrategy(ch)).onEach { println(it) }.launchIn(GlobalScope)

delay(100)
ch.send(5)
delay(100)
ch.send(5)
delay(100)
ch.send(5)
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ package io.rsocket.kotlin
@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This is an API which is used to implement transport for RSocket, such as WS or TCP. " +
"This API can change in future in non backwards-incompatible manner."
"This API can change in future in non backwards-compatible manner."
)
public annotation class TransportApi

@Retention(value = AnnotationRetention.BINARY)
@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This is an API to work with metadata. This API can change in future in non backwards-incompatible manner."
message = "This is an API to work with metadata. This API can change in future in non backwards-compatible manner."
)
public annotation class ExperimentalMetadataApi

@Retention(value = AnnotationRetention.BINARY)
@RequiresOptIn(
level = RequiresOptIn.Level.WARNING,
message = "This is an API to customize request strategy of streams. This API can change in future in non backwards-compatible manner."
)
public annotation class ExperimentalStreamsApi
Loading