Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Jun 26, 2023
1 parent 374b9e3 commit 3151537
Show file tree
Hide file tree
Showing 8 changed files with 542 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.operation

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.payload.*

// returns true if stream can be closed
internal abstract class FrameReceiver {
private var hasPayload: Boolean = false
private var hasMetadata: Boolean = false
private val data = BytePacketBuilder(NoPool)
private val metadata = BytePacketBuilder(NoPool)

private fun appendFragment(fragment: Payload) {
hasPayload = true
data.writePacket(fragment.data)
when (val meta = fragment.metadata) {
null -> {}
else -> {
hasMetadata = true
metadata.writePacket(meta)
}
}
}

private fun assemblePayload(): Payload? {
if (!hasPayload) return null

val payload = Payload(
data = data.build(),
metadata = when {
hasMetadata -> metadata.build()
else -> null
}
)
hasMetadata = false
hasPayload = false
return payload
}

// todo non-suspend variant?
// TODO: if there are no fragments saved and there are no following - we can ignore going through buffer
suspend fun receivePayload(fragment: Payload?, complete: Boolean, follows: Boolean): Boolean {
fragment?.let(::appendFragment)
return when {
complete -> when (val payload = assemblePayload()) {
null -> receiveComplete()
else -> receiveNextComplete(payload)
}

else -> when {
follows -> true // not all fragments received
else -> when (val payload = assemblePayload()) {
null -> error("protocol violation: next or complete flags should be set")
else -> receiveNext(payload)
}
}
}
}

abstract suspend fun receiveNext(payload: Payload): Boolean
abstract suspend fun receiveNextComplete(payload: Payload): Boolean
abstract fun receiveComplete(): Boolean

abstract fun receiveError(cause: Throwable): Boolean
abstract fun receiveCancel(): Boolean
abstract fun receiveRequestN(n: Int): Boolean
}

private object NoPool : ObjectPool<ChunkBuffer> {
override val capacity: Int
get() = error("should not be called")

override fun borrow(): ChunkBuffer {
error("should not be called")
}

override fun dispose() {
error("should not be called")
}

override fun recycle(instance: ChunkBuffer) {
error("should not be called")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.operation

import io.ktor.utils.io.core.*

internal abstract class FrameSender : Closeable {
suspend fun sendCancel() {

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.operation

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

internal class Requester(
override val coroutineContext: CoroutineContext,
private val executor: RequesterOperationExecutor,
) : RSocket {

// TODO: may be move `metadataPush` somewhere else from RSocket? near keep alive and lease sending/receiving?
override suspend fun metadataPush(metadata: ByteReadPacket) {
super.metadataPush(metadata)
}

override suspend fun fireAndForget(payload: Payload): Unit = executeSingleOperation(payload) { cont ->
RequesterFireAndForgetOperation(payload, cont)
}

override suspend fun requestResponse(payload: Payload): Payload = executeSingleOperation(payload) { cont ->
RequesterRequestResponseOperation(payload, cont)
}

override fun requestStream(payload: Payload): Flow<Payload> = executeFlowOperation(payload) { initialRequest, requests, channel ->
RequesterRequestStreamOperation(payload, initialRequest, requests, channel)
}

override fun requestChannel(initPayload: Payload, payloads: Flow<Payload>): Flow<Payload> =
executeFlowOperation(initPayload) { initialRequest, requests, channel ->
RequesterRequestChannelOperation(initPayload, payloads, initialRequest, requests, channel)
}

private suspend inline fun <T> executeSingleOperation(
payload: Payload,
crossinline createOperation: (
cont: CancellableContinuation<T>,
) -> RequesterOperation,
): T {
ensureActiveOrClose(payload)
return suspendCancellableCoroutine { cont ->
val operation = createOperation(cont)
cont.invokeOnCancellation(operation::cancel)
executor.execute(operation)
}
}

@OptIn(ExperimentalStreamsApi::class)
private inline fun executeFlowOperation(
payload: Payload,
crossinline createOperation: (
initialRequest: Int,
requests: ReceiveChannel<Int>,
channel: SendChannel<Payload>,
) -> RequesterOperation,
): Flow<Payload> = requestFlow { strategy, initialRequest ->
ensureActiveOrClose(payload)
val channel = channelForCloseable<Payload>(Channel.UNLIMITED) //TODO: should be configurable
val requests = Channel<Int>(Channel.UNLIMITED)
val operation = createOperation(initialRequest, requests, channel)
var error: Throwable? = null
try {
executor.execute(operation)
while (true) {
val result = channel.receiveCatching()
if (result.isClosed) {
error = result.exceptionOrNull()
break
}
emit(result.getOrThrow())
val next = strategy.nextRequest()
if (next > 0) requests.trySend(next)
}
} catch (cause: Throwable) {
error = cause
// scope of flow is canceled or emit failed
operation.cancel(cause)
throw cause
} finally {
channel.cancelWithCause(error)
requests.cancelWithCause(error)
}
error?.let { throw it }
}

// TODO: after removing CoroutineScope from RSocket move this to OperationExecutor
private suspend inline fun ensureActiveOrClose(closeable: Closeable) {
if (currentCoroutineContext().isActive && isActive) return
closeable.close()
currentCoroutineContext().ensureActive()
ensureActive()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.operation

import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.*

//TODO: may be just throw error on receiving frames?
internal class RequesterFireAndForgetOperation(
private val payload: Payload,
private val completionContinuation: CancellableContinuation<Unit>,
) : RequesterOperation() {
override suspend fun receiveNext(payload: Payload): Boolean {
payload.close()
return false // no-op - should not happen
}

override suspend fun receiveNextComplete(payload: Payload): Boolean {
payload.close()
return false // no-op - should not happen
}

override fun receiveComplete(): Boolean {
return false // no-op - should not happen
}

override fun receiveError(cause: Throwable): Boolean {
return false // no-op - should not happen
}

override fun receiveCancel(): Boolean {
return false // no-op - should not happen
}

override fun receiveRequestN(n: Int): Boolean {
return false // no-op - should not happen
}

override fun cancel(cause: Throwable?) {
TODO("FAF CANCEL")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.operation

import kotlinx.coroutines.*

internal abstract class RequesterOperation {
abstract fun execute(connectionScope: CoroutineScope, sender: FrameSender): FrameReceiver
abstract fun cancel(cause: Throwable?) //TODO is cause needed?
}

internal sealed class RequesterOperationExecutor {
abstract fun execute(operation: RequesterOperation)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2015-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.rsocket.kotlin.operation

import io.rsocket.kotlin.payload.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.*

internal class RequesterRequestChannelOperation(
private val initPayload: Payload,
private val payloads: Flow<Payload>,
private val initialRequest: Int,
private val requests: ReceiveChannel<Int>,
private val responsePayloadsChannel: SendChannel<Payload>,
) : RequesterOperation() {

override suspend fun receiveNext(payload: Payload): Boolean {
responsePayloadsChannel.send(payload)
return false
}

override suspend fun receiveNextComplete(payload: Payload): Boolean {
responsePayloadsChannel.send(payload)
return receiveComplete()
}

override fun receiveComplete(): Boolean {
responsePayloadsChannel.close()
return true
}

override fun receiveError(cause: Throwable): Boolean {
// TODO: decide on behavior - may be we need to call cancel, so to stop stream right now, and do not collect elements
// also we need to stop sending `requestN` after this
responsePayloadsChannel.close(cause)
return true
}

override fun cancel(cause: Throwable?) {
TODO("Not yet implemented")
}

override fun receiveCancel(): Boolean {
return false // no-op - should not happen
}

override fun receiveRequestN(n: Int): Boolean {
return false // no-op - should not happen
}
}
Loading

0 comments on commit 3151537

Please sign in to comment.