From 7e580b91162fe0d67114011174d79a345e872a3e Mon Sep 17 00:00:00 2001 From: olme04 Date: Mon, 6 Dec 2021 17:35:00 +0300 Subject: [PATCH] working nodejs TCP server and client * not published yet, because of transitive dependency on kotlinx.nodejs, which is only on jcenter --- .github/workflows/gradle-main.yml | 9 + .github/workflows/gradle-pr.yml | 9 + .github/workflows/run-tests.yml | 9 + build.gradle.kts | 8 +- .../src/jsMain/kotlin/Server.kt | 160 ------------------ .../build.gradle.kts | 7 +- .../nodejs/tcp/FrameWithLengthAssembler.kt | 44 +++++ .../nodejs/tcp/TcpClientTransport.kt | 24 +++ .../transport/nodejs/tcp/TcpConnection.kt | 59 +++++++ .../nodejs/tcp/TcpServerTransport.kt | 23 +++ .../transport/nodejs/tcp/TcpTransportTest.kt | 41 +++++ settings.gradle.kts | 7 + 12 files changed, 232 insertions(+), 168 deletions(-) delete mode 100644 examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt rename {examples/nodejs-tcp-transport => rsocket-transport-nodejs-tcp}/build.gradle.kts (92%) create mode 100644 rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt create mode 100644 rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt create mode 100644 rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt create mode 100644 rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt create mode 100644 rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt diff --git a/.github/workflows/gradle-main.yml b/.github/workflows/gradle-main.yml index de72755c1..a361dee71 100644 --- a/.github/workflows/gradle-main.yml +++ b/.github/workflows/gradle-main.yml @@ -86,6 +86,15 @@ jobs: distributions-cache-enabled: false dependencies-cache-enabled: false configuration-cache-enabled: false + - name: Test rsocket-transport-nodejs module + if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure()) + timeout-minutes: 15 + uses: gradle/gradle-build-action@v1 + with: + arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info + distributions-cache-enabled: false + dependencies-cache-enabled: false + configuration-cache-enabled: false - name: Publish Test Report if: always() diff --git a/.github/workflows/gradle-pr.yml b/.github/workflows/gradle-pr.yml index 0432b7119..cb686fd93 100644 --- a/.github/workflows/gradle-pr.yml +++ b/.github/workflows/gradle-pr.yml @@ -81,6 +81,15 @@ jobs: distributions-cache-enabled: false dependencies-cache-enabled: false configuration-cache-enabled: false + - name: Test rsocket-transport-nodejs module + if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure()) + timeout-minutes: 15 + uses: gradle/gradle-build-action@v1 + with: + arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info + distributions-cache-enabled: false + dependencies-cache-enabled: false + configuration-cache-enabled: false - name: Publish Test Report if: always() diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index efc4e55a1..f3d3cbd53 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -80,6 +80,15 @@ jobs: distributions-cache-enabled: false dependencies-cache-enabled: false configuration-cache-enabled: false + - name: Test rsocket-transport-nodejs module + if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure()) + timeout-minutes: 15 + uses: gradle/gradle-build-action@v1 + with: + arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info + distributions-cache-enabled: false + dependencies-cache-enabled: false + configuration-cache-enabled: false - name: Publish Test Report if: always() diff --git a/build.gradle.kts b/build.gradle.kts index 03a63cfc1..2b2f6724d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -72,10 +72,12 @@ subprojects { project.name != "rsocket-transport-ktor" && project.name != "rsocket-transport-ktor-client" + val jsOnly = project.name == "rsocket-transport-nodejs-tcp" + val nodejsOnly = project.name == "rsocket-transport-nodejs-tcp" if (!isAutoConfigurable) return@configure - jvm { + if (!jsOnly) jvm { testRuns.all { executionTask.configure { // ActiveProcessorCount is used here, to make sure local setup is similar as on CI @@ -97,7 +99,7 @@ subprojects { } } } - browser { + if (!nodejsOnly) browser { testTask { useKarma { useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d")) @@ -107,6 +109,8 @@ subprojects { } } + if (jsOnly) return@configure + //native targets configuration val linuxTargets = listOf(linuxX64()) val mingwTargets = if (supportMingw) listOf(mingwX64()) else emptyList() diff --git a/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt b/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt deleted file mode 100644 index eef0aeadf..000000000 --- a/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import io.ktor.utils.io.core.* -import io.ktor.utils.io.js.* -import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.payload.* -import io.rsocket.kotlin.transport.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import net.* -import org.khronos.webgl.* - -fun main() { - //create transport - val serverTransport = NodeJsTcpServerTransport(9000) { - println("Server started") - } - - //start server - RSocketServer().bind(serverTransport) { - RSocketRequestHandler { - requestResponse { - println("Received: ${it.data.readText()}") - buildPayload { data("Hello from nodejs") } - } - } - } -} - -//TODO fun interface with lambda fail on IR -@OptIn(TransportApi::class) -fun NodeJsTcpServerTransport(port: Int, onStart: () -> Unit = {}): ServerTransport = object : ServerTransport { - override fun start(accept: suspend (Connection) -> Unit): Server = - //create nodejs TCP server - createServer { - //wrap TCP connection with RSocket connection and start server - GlobalScope.launch { accept(NodeJsTcpConnection(it)) } - }.listen(port, onStart) -} - -// nodejs TCP transport connection - may not work in all cases, not tested properly -@OptIn(ExperimentalCoroutinesApi::class, TransportApi::class) -class NodeJsTcpConnection(private val socket: Socket) : Connection { - override val job: Job = Job() - - private val sendChannel = Channel(8) - private val receiveChannel = Channel(8) - - init { - //setup closing of job/socket - socket.on("close") { _ -> job.cancel() } - job.invokeOnCompletion { if (!socket.writableEnded) socket.end(it?.message ?: "Closed") } - - handleSending() - handleReceiving() - } - - // get packets from send channel and put them in socket - private fun handleSending() { - GlobalScope.launch(job) { - sendChannel.consumeEach { packet -> - val buffer = buildPacket { - writeLength(packet.remaining.toInt()) - writePacket(packet) - }.readArrayBuffer() - socket.write(Uint8Array(buffer)) - } - } - } - - // get buffers from socket and put them in receive channel - private fun handleReceiving() { - - fun savePacket(packet: ByteReadPacket) { - GlobalScope.launch(job) { receiveChannel.send(packet) } - } - - var expectedFrameLength = 0 - val packetBuilder = BytePacketBuilder() - - fun buildAndSend(from: ByteReadPacket) { - val packet = buildPacket { - writePacket(from, expectedFrameLength) - } - expectedFrameLength = 0 - savePacket(packet) - } - - //returns true if length read and awaiting for more data to read frame - //returns false if no bytes to read length - fun loopUntilEnoughBytes(): Boolean { - // loop while packetBuilder has enough bytes to read length or length and frame - while (expectedFrameLength == 0) { - if (packetBuilder.size >= 3) { - val tempPacket = packetBuilder.build() - expectedFrameLength = tempPacket.readLength() - - //if enough data to read frame - if (tempPacket.remaining >= expectedFrameLength) buildAndSend(tempPacket) - - packetBuilder.writePacket(tempPacket) //write rest back - } else return false - } - return true - } - - // subscribe on nodejs TCP data events - socket.on("data") { buffer: Buffer -> - //put buffer data to packetBuilder to work with it - packetBuilder.writeFully(buffer.buffer) - if (!loopUntilEnoughBytes()) return@on - - //if length received and enough data for frame - if (packetBuilder.size >= expectedFrameLength) { - val tempPacket = packetBuilder.build() - buildAndSend(tempPacket) - packetBuilder.writePacket(tempPacket) //write rest back - - loopUntilEnoughBytes() //if builder has more bytes - handle them - } - } - } - - override suspend fun send(packet: ByteReadPacket) { - sendChannel.send(packet) - } - - override suspend fun receive(): ByteReadPacket { - return receiveChannel.receive() - } -} - -private fun ByteReadPacket.readLength(): Int { - val b = readByte().toInt() and 0xFF shl 16 - val b1 = readByte().toInt() and 0xFF shl 8 - val b2 = readByte().toInt() and 0xFF - return b or b1 or b2 -} - -private fun BytePacketBuilder.writeLength(length: Int) { - require(length and 0xFFFFFF.inv() == 0) { "Length is larger than 24 bits" } - writeByte((length shr 16).toByte()) - writeByte((length shr 8).toByte()) - writeByte(length.toByte()) -} diff --git a/examples/nodejs-tcp-transport/build.gradle.kts b/rsocket-transport-nodejs-tcp/build.gradle.kts similarity index 92% rename from examples/nodejs-tcp-transport/build.gradle.kts rename to rsocket-transport-nodejs-tcp/build.gradle.kts index 46d3903e2..be09ee344 100644 --- a/examples/nodejs-tcp-transport/build.gradle.kts +++ b/rsocket-transport-nodejs-tcp/build.gradle.kts @@ -16,17 +16,12 @@ plugins { kotlin("multiplatform") + id("kotlinx-atomicfu") } val kotlinxNodejsVersion: String by rootProject kotlin { - js(IR) { - nodejs { - binaries.executable() - } - } - sourceSets { val jsMain by getting { dependencies { diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt new file mode 100644 index 000000000..f5c991f70 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt @@ -0,0 +1,44 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.frame.io.* + +internal fun ByteReadPacket.withLength(): ByteReadPacket = buildPacket { + @Suppress("INVISIBLE_MEMBER") writeLength(this@withLength.remaining.toInt()) + writePacket(this@withLength) +} + +internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPacket) -> Unit) { + private var expectedFrameLength = 0 //TODO atomic for native + private val packetBuilder: BytePacketBuilder = BytePacketBuilder() + inline fun write(write: BytePacketBuilder.() -> Unit) { + packetBuilder.write() + loop() + } + + private fun loop() { + while (true) when { + expectedFrameLength == 0 && packetBuilder.size < 3 -> return // no length + expectedFrameLength == 0 -> withTemp { // has length + expectedFrameLength = @Suppress("INVISIBLE_MEMBER") it.readLength() + if (it.remaining >= expectedFrameLength) build(it) // if has length and frame + } + packetBuilder.size < expectedFrameLength -> return // not enough bytes to read frame + else -> withTemp { build(it) } // enough bytes to read frame + } + } + + private fun build(from: ByteReadPacket) { + val frame = buildPacket { + writePacket(from, expectedFrameLength) + } + expectedFrameLength = 0 + onFrame(frame) + } + + private inline fun withTemp(block: (tempPacket: ByteReadPacket) -> Unit) { + val tempPacket = packetBuilder.build() + block(tempPacket) + packetBuilder.writePacket(tempPacket) + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt new file mode 100644 index 000000000..26cbd1c3e --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt @@ -0,0 +1,24 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.transport.* +import kotlinx.coroutines.* +import kotlin.coroutines.* + +public class TcpClientTransport( + private val port: Int, + private val hostname: String, + private val pool: ObjectPool = ChunkBuffer.Pool, + coroutineContext: CoroutineContext = EmptyCoroutineContext +) : ClientTransport { + + override val coroutineContext: CoroutineContext = coroutineContext + SupervisorJob(coroutineContext[Job]) + + @TransportApi + override suspend fun connect(): Connection { + val socket = net.connect(port, hostname) + return TcpConnection(coroutineContext, pool, socket) + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt new file mode 100644 index 000000000..cfce05d00 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt @@ -0,0 +1,59 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import Buffer +import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.js.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import net.* +import org.khronos.webgl.* +import kotlin.coroutines.* + +@TransportApi +internal class TcpConnection( + override val coroutineContext: CoroutineContext, + override val pool: ObjectPool, + private val socket: Socket +) : Connection { + + private val sendChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel(8) + private val receiveChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel(Channel.UNLIMITED) + + init { + launch { + sendChannel.consumeEach { packet -> + socket.write(Uint8Array(packet.withLength().readArrayBuffer())) + } + } + + coroutineContext.job.invokeOnCompletion { + when (it) { + null -> socket.destroy() + else -> socket.destroy(Error(it.message, it.cause)) + } + } + + val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } //TODO + socket.on("data") { buffer: Buffer -> + frameAssembler.write { writeFully(buffer.buffer) } + } + socket.on("error") { error: Error -> + coroutineContext.job.cancel("Socket error", error) + } + socket.on("close") { hadError: Boolean -> + if (!hadError) coroutineContext.job.cancel("Socket closed") + } + } + + override suspend fun send(packet: ByteReadPacket) { + sendChannel.send(packet) + } + + override suspend fun receive(): ByteReadPacket { + return receiveChannel.receive() + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt new file mode 100644 index 000000000..f7c4423a1 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt @@ -0,0 +1,23 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.transport.* +import kotlinx.coroutines.* +import net.* + +public class TcpServerTransport( + private val port: Int, + private val hostname: String, + private val pool: ObjectPool = ChunkBuffer.Pool +) : ServerTransport { + @TransportApi + override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit): Server { + return createServer { socket: Socket -> + launch { + accept(TcpConnection(coroutineContext, pool, socket)) + } + }.listen(port, hostname) + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt new file mode 100644 index 000000000..c6e32e35b --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt @@ -0,0 +1,41 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.rsocket.kotlin.test.* +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import net.* +import kotlin.coroutines.* +import kotlin.random.* + +object PortProvider { + private val port = atomic(Random.nextInt(20, 90) * 100) + fun next(): Int = port.incrementAndGet() +} + + +class TcpTransportTest : TransportTest() { + private val testJob = Job() + + private lateinit var server: Server + + override suspend fun before() { + val port = PortProvider.next() + server = SERVER.bindIn( + CoroutineScope(testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") }), + TcpServerTransport(port, "127.0.0.1", InUseTrackingPool), + ACCEPTOR + ) + client = CONNECTOR.connect(TcpClientTransport(port, "127.0.0.1", InUseTrackingPool, testJob)) + } + + override suspend fun after() { + delay(100) //TODO close race + super.after() + testJob.cancelAndJoin() + suspendCoroutine { cont -> + server.close { + cont.resume(Unit) + } + } + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 266f39be1..36ccd2847 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -41,6 +41,11 @@ pluginManagement { dependencyResolutionManagement { repositories { mavenCentral() + jcenter { + content { + includeModule("org.jetbrains.kotlinx", "kotlinx-nodejs") + } + } } } @@ -71,3 +76,5 @@ include("rsocket-transport-ktor-client") include("rsocket-transport-ktor-server") project(":rsocket-transport-ktor-client").projectDir = file("rsocket-transport-ktor/rsocket-transport-ktor-client") project(":rsocket-transport-ktor-server").projectDir = file("rsocket-transport-ktor/rsocket-transport-ktor-server") + +include("rsocket-transport-nodejs-tcp")