From 31515370d156e7f487c17e1aef92631ebb9e3761 Mon Sep 17 00:00:00 2001 From: Oleg Yukhnevich Date: Mon, 26 Jun 2023 20:47:36 +0300 Subject: [PATCH] save --- .../rsocket/kotlin/operation/FrameReceiver.kt | 102 ++++++++++++++++ .../rsocket/kotlin/operation/FrameSender.kt | 25 ++++ .../io/rsocket/kotlin/operation/Requester.kt | 115 ++++++++++++++++++ .../RequesterFireAndForgetOperation.kt | 56 +++++++++ .../kotlin/operation/RequesterOperation.kt | 28 +++++ .../RequesterRequestChannelOperation.kt | 64 ++++++++++ .../RequesterRequestResponseOperation.kt | 89 ++++++++++++++ .../RequesterRequestStreamOperation.kt | 63 ++++++++++ 8 files changed, 542 insertions(+) create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/FrameReceiver.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/FrameSender.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/Requester.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterFireAndForgetOperation.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterOperation.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt create mode 100644 rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestStreamOperation.kt diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/FrameReceiver.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/FrameReceiver.kt new file mode 100644 index 00000000..fe08e377 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/FrameReceiver.kt @@ -0,0 +1,102 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.operation + +import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.payload.* + +// returns true if stream can be closed +internal abstract class FrameReceiver { + private var hasPayload: Boolean = false + private var hasMetadata: Boolean = false + private val data = BytePacketBuilder(NoPool) + private val metadata = BytePacketBuilder(NoPool) + + private fun appendFragment(fragment: Payload) { + hasPayload = true + data.writePacket(fragment.data) + when (val meta = fragment.metadata) { + null -> {} + else -> { + hasMetadata = true + metadata.writePacket(meta) + } + } + } + + private fun assemblePayload(): Payload? { + if (!hasPayload) return null + + val payload = Payload( + data = data.build(), + metadata = when { + hasMetadata -> metadata.build() + else -> null + } + ) + hasMetadata = false + hasPayload = false + return payload + } + + // todo non-suspend variant? + // TODO: if there are no fragments saved and there are no following - we can ignore going through buffer + suspend fun receivePayload(fragment: Payload?, complete: Boolean, follows: Boolean): Boolean { + fragment?.let(::appendFragment) + return when { + complete -> when (val payload = assemblePayload()) { + null -> receiveComplete() + else -> receiveNextComplete(payload) + } + + else -> when { + follows -> true // not all fragments received + else -> when (val payload = assemblePayload()) { + null -> error("protocol violation: next or complete flags should be set") + else -> receiveNext(payload) + } + } + } + } + + abstract suspend fun receiveNext(payload: Payload): Boolean + abstract suspend fun receiveNextComplete(payload: Payload): Boolean + abstract fun receiveComplete(): Boolean + + abstract fun receiveError(cause: Throwable): Boolean + abstract fun receiveCancel(): Boolean + abstract fun receiveRequestN(n: Int): Boolean +} + +private object NoPool : ObjectPool { + override val capacity: Int + get() = error("should not be called") + + override fun borrow(): ChunkBuffer { + error("should not be called") + } + + override fun dispose() { + error("should not be called") + } + + override fun recycle(instance: ChunkBuffer) { + error("should not be called") + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/FrameSender.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/FrameSender.kt new file mode 100644 index 00000000..b2a5e6f6 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/FrameSender.kt @@ -0,0 +1,25 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.operation + +import io.ktor.utils.io.core.* + +internal abstract class FrameSender : Closeable { + suspend fun sendCancel() { + + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/Requester.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/Requester.kt new file mode 100644 index 00000000..d5af8cf3 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/Requester.kt @@ -0,0 +1,115 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.operation + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.internal.io.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* +import kotlin.coroutines.* + +internal class Requester( + override val coroutineContext: CoroutineContext, + private val executor: RequesterOperationExecutor, +) : RSocket { + + // TODO: may be move `metadataPush` somewhere else from RSocket? near keep alive and lease sending/receiving? + override suspend fun metadataPush(metadata: ByteReadPacket) { + super.metadataPush(metadata) + } + + override suspend fun fireAndForget(payload: Payload): Unit = executeSingleOperation(payload) { cont -> + RequesterFireAndForgetOperation(payload, cont) + } + + override suspend fun requestResponse(payload: Payload): Payload = executeSingleOperation(payload) { cont -> + RequesterRequestResponseOperation(payload, cont) + } + + override fun requestStream(payload: Payload): Flow = executeFlowOperation(payload) { initialRequest, requests, channel -> + RequesterRequestStreamOperation(payload, initialRequest, requests, channel) + } + + override fun requestChannel(initPayload: Payload, payloads: Flow): Flow = + executeFlowOperation(initPayload) { initialRequest, requests, channel -> + RequesterRequestChannelOperation(initPayload, payloads, initialRequest, requests, channel) + } + + private suspend inline fun executeSingleOperation( + payload: Payload, + crossinline createOperation: ( + cont: CancellableContinuation, + ) -> RequesterOperation, + ): T { + ensureActiveOrClose(payload) + return suspendCancellableCoroutine { cont -> + val operation = createOperation(cont) + cont.invokeOnCancellation(operation::cancel) + executor.execute(operation) + } + } + + @OptIn(ExperimentalStreamsApi::class) + private inline fun executeFlowOperation( + payload: Payload, + crossinline createOperation: ( + initialRequest: Int, + requests: ReceiveChannel, + channel: SendChannel, + ) -> RequesterOperation, + ): Flow = requestFlow { strategy, initialRequest -> + ensureActiveOrClose(payload) + val channel = channelForCloseable(Channel.UNLIMITED) //TODO: should be configurable + val requests = Channel(Channel.UNLIMITED) + val operation = createOperation(initialRequest, requests, channel) + var error: Throwable? = null + try { + executor.execute(operation) + while (true) { + val result = channel.receiveCatching() + if (result.isClosed) { + error = result.exceptionOrNull() + break + } + emit(result.getOrThrow()) + val next = strategy.nextRequest() + if (next > 0) requests.trySend(next) + } + } catch (cause: Throwable) { + error = cause + // scope of flow is canceled or emit failed + operation.cancel(cause) + throw cause + } finally { + channel.cancelWithCause(error) + requests.cancelWithCause(error) + } + error?.let { throw it } + } + + // TODO: after removing CoroutineScope from RSocket move this to OperationExecutor + private suspend inline fun ensureActiveOrClose(closeable: Closeable) { + if (currentCoroutineContext().isActive && isActive) return + closeable.close() + currentCoroutineContext().ensureActive() + ensureActive() + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterFireAndForgetOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterFireAndForgetOperation.kt new file mode 100644 index 00000000..f30de7a7 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterFireAndForgetOperation.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.operation + +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.* + +//TODO: may be just throw error on receiving frames? +internal class RequesterFireAndForgetOperation( + private val payload: Payload, + private val completionContinuation: CancellableContinuation, +) : RequesterOperation() { + override suspend fun receiveNext(payload: Payload): Boolean { + payload.close() + return false // no-op - should not happen + } + + override suspend fun receiveNextComplete(payload: Payload): Boolean { + payload.close() + return false // no-op - should not happen + } + + override fun receiveComplete(): Boolean { + return false // no-op - should not happen + } + + override fun receiveError(cause: Throwable): Boolean { + return false // no-op - should not happen + } + + override fun receiveCancel(): Boolean { + return false // no-op - should not happen + } + + override fun receiveRequestN(n: Int): Boolean { + return false // no-op - should not happen + } + + override fun cancel(cause: Throwable?) { + TODO("FAF CANCEL") + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterOperation.kt new file mode 100644 index 00000000..d327a59c --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterOperation.kt @@ -0,0 +1,28 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.operation + +import kotlinx.coroutines.* + +internal abstract class RequesterOperation { + abstract fun execute(connectionScope: CoroutineScope, sender: FrameSender): FrameReceiver + abstract fun cancel(cause: Throwable?) //TODO is cause needed? +} + +internal sealed class RequesterOperationExecutor { + abstract fun execute(operation: RequesterOperation) +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt new file mode 100644 index 00000000..5bacec86 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestChannelOperation.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.operation + +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.* + +internal class RequesterRequestChannelOperation( + private val initPayload: Payload, + private val payloads: Flow, + private val initialRequest: Int, + private val requests: ReceiveChannel, + private val responsePayloadsChannel: SendChannel, +) : RequesterOperation() { + + override suspend fun receiveNext(payload: Payload): Boolean { + responsePayloadsChannel.send(payload) + return false + } + + override suspend fun receiveNextComplete(payload: Payload): Boolean { + responsePayloadsChannel.send(payload) + return receiveComplete() + } + + override fun receiveComplete(): Boolean { + responsePayloadsChannel.close() + return true + } + + override fun receiveError(cause: Throwable): Boolean { + // TODO: decide on behavior - may be we need to call cancel, so to stop stream right now, and do not collect elements + // also we need to stop sending `requestN` after this + responsePayloadsChannel.close(cause) + return true + } + + override fun cancel(cause: Throwable?) { + TODO("Not yet implemented") + } + + override fun receiveCancel(): Boolean { + return false // no-op - should not happen + } + + override fun receiveRequestN(n: Int): Boolean { + return false // no-op - should not happen + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt new file mode 100644 index 00000000..810582e8 --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestResponseOperation.kt @@ -0,0 +1,89 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.operation + +import io.rsocket.kotlin.payload.* +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlin.coroutines.* + +internal class RequesterRequestResponseOperation( + payload: Payload, + private val responsePayloadContinuation: CancellableContinuation, +) : RequesterOperation() { + private var cancelled = atomic(false) + private var requestFrameSent by atomic(false) + private var payload: Payload? by atomic(payload) + private var sendJob by atomic(null) + + override fun execute( + connectionScope: CoroutineScope, + sender: FrameSender, + ): FrameReceiver { + check(!cancelled.value) + checkNotNull(payload) + sendJob = scope.launch { + //send frame + } + } + + override fun cancel(cause: Throwable?) { + if (!cancelled.compareAndSet(false, true)) return + + sendJob?.cancel("Request cancelled", cause) + sendJob = null + if (requestFrameSent) { + // need to send cancel frame + // close stream + } + } + + private class Receiver( + private val responsePayloadContinuation: CancellableContinuation, + ) : FrameReceiver() { + // for request response even if there is no `complete` flag, we still should handle it + override suspend fun receiveNext(payload: Payload): Boolean { + return receiveNextComplete(payload) + } + + @OptIn(ExperimentalCoroutinesApi::class) + override suspend fun receiveNextComplete(payload: Payload): Boolean { + responsePayloadContinuation.resume(payload) { + payload.close() + } + return true // close stream + } + + override fun receiveError(cause: Throwable): Boolean { + responsePayloadContinuation.resumeWithException(cause) + return true // close stream + } + + override fun receiveComplete(): Boolean { + return false // no-op - should not happen + } + + override fun receiveRequestN(n: Int): Boolean { + return false // no-op - should not happen + } + + override fun receiveCancel(): Boolean { + return false // no-op - should not happen + } + + } +} diff --git a/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestStreamOperation.kt b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestStreamOperation.kt new file mode 100644 index 00000000..2b6847af --- /dev/null +++ b/rsocket-core/src/commonMain/kotlin/io/rsocket/kotlin/operation/RequesterRequestStreamOperation.kt @@ -0,0 +1,63 @@ +/* + * Copyright 2015-2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.rsocket.kotlin.operation + +import io.rsocket.kotlin.* +import io.rsocket.kotlin.payload.* +import kotlinx.coroutines.channels.* + +internal class RequesterRequestStreamOperation( + private val payload: Payload, + private val initialRequest: Int, + private val requests: ReceiveChannel, + private val responsePayloadsChannel: SendChannel, +) : RequesterOperation() { + + override suspend fun receiveNext(payload: Payload): Boolean { + responsePayloadsChannel.send(payload) + return false + } + + override suspend fun receiveNextComplete(payload: Payload): Boolean { + responsePayloadsChannel.send(payload) + return receiveComplete() + } + + override fun receiveComplete(): Boolean { + responsePayloadsChannel.close() + return true + } + + override fun receiveError(cause: Throwable): Boolean { + // TODO: decide on behavior - may be we need to call cancel, so to stop stream right now, and do not collect elements + // also we need to stop sending `requestN` after this + responsePayloadsChannel.close(cause) + return true + } + + override fun cancel(cause: Throwable?) { + TODO("Not yet implemented") + } + + override fun receiveCancel(): Boolean { + return false // no-op - should not happen + } + + override fun receiveRequestN(n: Int): Boolean { + return false // no-op - should not happen + } +}