diff --git a/.github/workflows/gradle-main.yml b/.github/workflows/gradle-main.yml index 8a6453215..d1411d2d0 100644 --- a/.github/workflows/gradle-main.yml +++ b/.github/workflows/gradle-main.yml @@ -86,6 +86,15 @@ jobs: distributions-cache-enabled: false dependencies-cache-enabled: false configuration-cache-enabled: false + - name: Test rsocket-transport-nodejs module + if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure()) + timeout-minutes: 15 + uses: gradle/gradle-build-action@v1 + with: + arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info + distributions-cache-enabled: false + dependencies-cache-enabled: false + configuration-cache-enabled: false - name: Publish Test Report if: always() diff --git a/.github/workflows/gradle-pr.yml b/.github/workflows/gradle-pr.yml index 6850360f8..f211596d2 100644 --- a/.github/workflows/gradle-pr.yml +++ b/.github/workflows/gradle-pr.yml @@ -81,6 +81,15 @@ jobs: distributions-cache-enabled: false dependencies-cache-enabled: false configuration-cache-enabled: false + - name: Test rsocket-transport-nodejs module + if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure()) + timeout-minutes: 15 + uses: gradle/gradle-build-action@v1 + with: + arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info + distributions-cache-enabled: false + dependencies-cache-enabled: false + configuration-cache-enabled: false - name: Publish Test Report if: always() diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index efc4e55a1..f3d3cbd53 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -80,6 +80,15 @@ jobs: distributions-cache-enabled: false dependencies-cache-enabled: false configuration-cache-enabled: false + - name: Test rsocket-transport-nodejs module + if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure()) + timeout-minutes: 15 + uses: gradle/gradle-build-action@v1 + with: + arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info + distributions-cache-enabled: false + dependencies-cache-enabled: false + configuration-cache-enabled: false - name: Publish Test Report if: always() diff --git a/build.gradle.kts b/build.gradle.kts index 575fa39aa..2ea54225e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -65,10 +65,12 @@ subprojects { project.name != "rsocket-transport-ktor" && project.name != "rsocket-transport-ktor-client" + val jsOnly = project.name == "rsocket-transport-nodejs-tcp" + val nodejsOnly = project.name == "rsocket-transport-nodejs-tcp" if (!isAutoConfigurable) return@configure - jvm { + if (!jsOnly) jvm { testRuns.all { executionTask.configure { // ActiveProcessorCount is used here, to make sure local setup is similar as on CI @@ -90,7 +92,7 @@ subprojects { } } } - browser { + if (!nodejsOnly) browser { testTask { useKarma { useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d")) @@ -100,6 +102,8 @@ subprojects { } } + if (jsOnly) return@configure + //native targets configuration val linuxTargets = listOf(linuxX64()) val mingwTargets = if (supportMingw) listOf(mingwX64()) else emptyList() @@ -317,7 +321,7 @@ subprojects { } tasks.matching { it.name == "generatePomFileForKotlinMultiplatformPublication" }.configureEach { - dependsOn(tasks["generatePomFileForJvmPublication"]) + tasks.findByName("generatePomFileForJvmPublication")?.let { dependsOn(it) } } } } diff --git a/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt b/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt deleted file mode 100644 index eef0aeadf..000000000 --- a/examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt +++ /dev/null @@ -1,160 +0,0 @@ -/* - * Copyright 2015-2020 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import io.ktor.utils.io.core.* -import io.ktor.utils.io.js.* -import io.rsocket.kotlin.* -import io.rsocket.kotlin.core.* -import io.rsocket.kotlin.payload.* -import io.rsocket.kotlin.transport.* -import kotlinx.coroutines.* -import kotlinx.coroutines.channels.* -import net.* -import org.khronos.webgl.* - -fun main() { - //create transport - val serverTransport = NodeJsTcpServerTransport(9000) { - println("Server started") - } - - //start server - RSocketServer().bind(serverTransport) { - RSocketRequestHandler { - requestResponse { - println("Received: ${it.data.readText()}") - buildPayload { data("Hello from nodejs") } - } - } - } -} - -//TODO fun interface with lambda fail on IR -@OptIn(TransportApi::class) -fun NodeJsTcpServerTransport(port: Int, onStart: () -> Unit = {}): ServerTransport = object : ServerTransport { - override fun start(accept: suspend (Connection) -> Unit): Server = - //create nodejs TCP server - createServer { - //wrap TCP connection with RSocket connection and start server - GlobalScope.launch { accept(NodeJsTcpConnection(it)) } - }.listen(port, onStart) -} - -// nodejs TCP transport connection - may not work in all cases, not tested properly -@OptIn(ExperimentalCoroutinesApi::class, TransportApi::class) -class NodeJsTcpConnection(private val socket: Socket) : Connection { - override val job: Job = Job() - - private val sendChannel = Channel(8) - private val receiveChannel = Channel(8) - - init { - //setup closing of job/socket - socket.on("close") { _ -> job.cancel() } - job.invokeOnCompletion { if (!socket.writableEnded) socket.end(it?.message ?: "Closed") } - - handleSending() - handleReceiving() - } - - // get packets from send channel and put them in socket - private fun handleSending() { - GlobalScope.launch(job) { - sendChannel.consumeEach { packet -> - val buffer = buildPacket { - writeLength(packet.remaining.toInt()) - writePacket(packet) - }.readArrayBuffer() - socket.write(Uint8Array(buffer)) - } - } - } - - // get buffers from socket and put them in receive channel - private fun handleReceiving() { - - fun savePacket(packet: ByteReadPacket) { - GlobalScope.launch(job) { receiveChannel.send(packet) } - } - - var expectedFrameLength = 0 - val packetBuilder = BytePacketBuilder() - - fun buildAndSend(from: ByteReadPacket) { - val packet = buildPacket { - writePacket(from, expectedFrameLength) - } - expectedFrameLength = 0 - savePacket(packet) - } - - //returns true if length read and awaiting for more data to read frame - //returns false if no bytes to read length - fun loopUntilEnoughBytes(): Boolean { - // loop while packetBuilder has enough bytes to read length or length and frame - while (expectedFrameLength == 0) { - if (packetBuilder.size >= 3) { - val tempPacket = packetBuilder.build() - expectedFrameLength = tempPacket.readLength() - - //if enough data to read frame - if (tempPacket.remaining >= expectedFrameLength) buildAndSend(tempPacket) - - packetBuilder.writePacket(tempPacket) //write rest back - } else return false - } - return true - } - - // subscribe on nodejs TCP data events - socket.on("data") { buffer: Buffer -> - //put buffer data to packetBuilder to work with it - packetBuilder.writeFully(buffer.buffer) - if (!loopUntilEnoughBytes()) return@on - - //if length received and enough data for frame - if (packetBuilder.size >= expectedFrameLength) { - val tempPacket = packetBuilder.build() - buildAndSend(tempPacket) - packetBuilder.writePacket(tempPacket) //write rest back - - loopUntilEnoughBytes() //if builder has more bytes - handle them - } - } - } - - override suspend fun send(packet: ByteReadPacket) { - sendChannel.send(packet) - } - - override suspend fun receive(): ByteReadPacket { - return receiveChannel.receive() - } -} - -private fun ByteReadPacket.readLength(): Int { - val b = readByte().toInt() and 0xFF shl 16 - val b1 = readByte().toInt() and 0xFF shl 8 - val b2 = readByte().toInt() and 0xFF - return b or b1 or b2 -} - -private fun BytePacketBuilder.writeLength(length: Int) { - require(length and 0xFFFFFF.inv() == 0) { "Length is larger than 24 bits" } - writeByte((length shr 16).toByte()) - writeByte((length shr 8).toByte()) - writeByte(length.toByte()) -} diff --git a/gradle.properties b/gradle.properties index 1834584b9..24e69dafc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -23,7 +23,6 @@ kotlinxCoroutinesVersion=1.5.2-native-mt kotlinxAtomicfuVersion=0.17.0 kotlinxSerializationVersion=1.3.1 kotlinxBenchmarkVersion=0.4.0 -kotlinxNodejsVersion=0.0.7 rsocketJavaVersion=1.1.1 turbineVersion=0.7.0 artifactoryVersion=4.25.1 diff --git a/examples/nodejs-tcp-transport/build.gradle.kts b/rsocket-transport-nodejs-tcp/build.gradle.kts similarity index 73% rename from examples/nodejs-tcp-transport/build.gradle.kts rename to rsocket-transport-nodejs-tcp/build.gradle.kts index 46d3903e2..c73084cef 100644 --- a/examples/nodejs-tcp-transport/build.gradle.kts +++ b/rsocket-transport-nodejs-tcp/build.gradle.kts @@ -16,22 +16,18 @@ plugins { kotlin("multiplatform") -} + id("kotlinx-atomicfu") -val kotlinxNodejsVersion: String by rootProject + signing + `maven-publish` + id("com.jfrog.artifactory") +} kotlin { - js(IR) { - nodejs { - binaries.executable() - } - } - sourceSets { val jsMain by getting { dependencies { - implementation(project(":rsocket-core")) - implementation("org.jetbrains.kotlinx:kotlinx-nodejs:$kotlinxNodejsVersion") + api(project(":rsocket-core")) } } } diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt new file mode 100644 index 000000000..f5c991f70 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/FrameWithLengthAssembler.kt @@ -0,0 +1,44 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.ktor.utils.io.core.* +import io.rsocket.kotlin.frame.io.* + +internal fun ByteReadPacket.withLength(): ByteReadPacket = buildPacket { + @Suppress("INVISIBLE_MEMBER") writeLength(this@withLength.remaining.toInt()) + writePacket(this@withLength) +} + +internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPacket) -> Unit) { + private var expectedFrameLength = 0 //TODO atomic for native + private val packetBuilder: BytePacketBuilder = BytePacketBuilder() + inline fun write(write: BytePacketBuilder.() -> Unit) { + packetBuilder.write() + loop() + } + + private fun loop() { + while (true) when { + expectedFrameLength == 0 && packetBuilder.size < 3 -> return // no length + expectedFrameLength == 0 -> withTemp { // has length + expectedFrameLength = @Suppress("INVISIBLE_MEMBER") it.readLength() + if (it.remaining >= expectedFrameLength) build(it) // if has length and frame + } + packetBuilder.size < expectedFrameLength -> return // not enough bytes to read frame + else -> withTemp { build(it) } // enough bytes to read frame + } + } + + private fun build(from: ByteReadPacket) { + val frame = buildPacket { + writePacket(from, expectedFrameLength) + } + expectedFrameLength = 0 + onFrame(frame) + } + + private inline fun withTemp(block: (tempPacket: ByteReadPacket) -> Unit) { + val tempPacket = packetBuilder.build() + block(tempPacket) + packetBuilder.writePacket(tempPacket) + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt new file mode 100644 index 000000000..6fdeb5d90 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpClientTransport.kt @@ -0,0 +1,25 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.nodejs.tcp.internal.* +import kotlinx.coroutines.* +import kotlin.coroutines.* + +public class TcpClientTransport( + private val port: Int, + private val hostname: String, + private val pool: ObjectPool = ChunkBuffer.Pool, + coroutineContext: CoroutineContext = EmptyCoroutineContext +) : ClientTransport { + + override val coroutineContext: CoroutineContext = coroutineContext + SupervisorJob(coroutineContext[Job]) + + @TransportApi + override suspend fun connect(): Connection { + val socket = connect(port, hostname) + return TcpConnection(coroutineContext, pool, socket) + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt new file mode 100644 index 000000000..fb3bb7b24 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpConnection.kt @@ -0,0 +1,54 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.ktor.utils.io.core.* +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.js.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.internal.* +import io.rsocket.kotlin.transport.nodejs.tcp.internal.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import org.khronos.webgl.* +import kotlin.coroutines.* + +@TransportApi +internal class TcpConnection( + override val coroutineContext: CoroutineContext, + override val pool: ObjectPool, + private val socket: Socket +) : Connection { + + private val sendChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel(8) + private val receiveChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel(Channel.UNLIMITED) + + init { + launch { + sendChannel.consumeEach { packet -> + socket.write(Uint8Array(packet.withLength().readArrayBuffer())) + } + } + + coroutineContext.job.invokeOnCompletion { + when (it) { + null -> socket.destroy() + else -> socket.destroy(Error(it.message, it.cause)) + } + } + + val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } //TODO + socket.on( + onData = { frameAssembler.write { writeFully(it.buffer) } }, + onError = { coroutineContext.job.cancel("Socket error", it) }, + onClose = { if (!it) coroutineContext.job.cancel("Socket closed") } + ) + } + + override suspend fun send(packet: ByteReadPacket) { + sendChannel.send(packet) + } + + override suspend fun receive(): ByteReadPacket { + return receiveChannel.receive() + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt new file mode 100644 index 000000000..5346d6050 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpServerTransport.kt @@ -0,0 +1,32 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.ktor.utils.io.core.internal.* +import io.ktor.utils.io.pool.* +import io.rsocket.kotlin.* +import io.rsocket.kotlin.transport.* +import io.rsocket.kotlin.transport.nodejs.tcp.internal.* +import kotlinx.coroutines.* +import kotlin.coroutines.* + +public class TcpServer internal constructor( + public val job: Job, private val server: Server +) { + public suspend fun close(): Unit = suspendCancellableCoroutine { cont -> + server.close { cont.resume(Unit) } + } +} + +public class TcpServerTransport( + private val port: Int, private val hostname: String, private val pool: ObjectPool = ChunkBuffer.Pool +) : ServerTransport { + @TransportApi + override fun CoroutineScope.start(accept: suspend CoroutineScope.(Connection) -> Unit): TcpServer { + val supervisorJob = SupervisorJob(coroutineContext[Job]) + val server = createServer(port, hostname, { supervisorJob.cancel() }) { + launch(supervisorJob) { + accept(TcpConnection(coroutineContext, pool, it)) + } + } + return TcpServer(supervisorJob, server) + } +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/internal/ext.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/internal/ext.kt new file mode 100644 index 000000000..c52cc48d9 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/internal/ext.kt @@ -0,0 +1,23 @@ +package io.rsocket.kotlin.transport.nodejs.tcp.internal + +import org.khronos.webgl.* + +internal fun Socket.on( + onData: (data: Uint8Array) -> Unit, + onError: (error: Error) -> Unit, + onClose: (hadError: Boolean) -> Unit +) { + on("data", onData) + on("error", onError) + on("close", onClose) +} + +internal fun createServer( + port: Int, + hostname: String, + onClose: () -> Unit, + listener: (Socket) -> Unit +): Server = createServer(listener).apply { + on("close", onClose) + listen(port, hostname) +} diff --git a/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/internal/nodejs.net.kt b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/internal/nodejs.net.kt new file mode 100644 index 000000000..3f4f7b536 --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsMain/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/internal/nodejs.net.kt @@ -0,0 +1,23 @@ +@file:JsModule("net") +@file:JsNonModule + +package io.rsocket.kotlin.transport.nodejs.tcp.internal + +import org.khronos.webgl.* + +internal external fun connect(port: Int, host: String): Socket +internal external fun createServer(connectionListener: (socket: Socket) -> Unit): Server + +internal external interface Server { + fun listen(port: Int, hostname: String) + fun close(callback: (err: Error) -> Unit) + fun on(event: String /* "close" */, listener: () -> Unit) +} + +internal external interface Socket { + fun write(buffer: Uint8Array): Boolean + fun destroy(error: Error = definedExternally) + fun on(event: String /* "close" */, listener: (had_error: Boolean) -> Unit): Socket + fun on(event: String /* "data" */, listener: (data: Uint8Array) -> Unit): Socket + fun on(event: String /* "error" */, listener: (err: Error) -> Unit): Socket +} diff --git a/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt new file mode 100644 index 000000000..d32c9144a --- /dev/null +++ b/rsocket-transport-nodejs-tcp/src/jsTest/kotlin/io/rsocket/kotlin/transport/nodejs/tcp/TcpTransportTest.kt @@ -0,0 +1,35 @@ +package io.rsocket.kotlin.transport.nodejs.tcp + +import io.rsocket.kotlin.test.* +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlin.random.* + +object PortProvider { + private val port = atomic(Random.nextInt(20, 90) * 100) + fun next(): Int = port.incrementAndGet() +} + + +class TcpTransportTest : TransportTest() { + private val testJob = Job() + + private lateinit var server: TcpServer + + override suspend fun before() { + val port = PortProvider.next() + server = SERVER.bindIn( + CoroutineScope(testJob + CoroutineExceptionHandler { c, e -> println("$c -> $e") }), + TcpServerTransport(port, "127.0.0.1", InUseTrackingPool), + ACCEPTOR + ) + client = CONNECTOR.connect(TcpClientTransport(port, "127.0.0.1", InUseTrackingPool, testJob)) + } + + override suspend fun after() { + delay(100) //TODO close race + super.after() + testJob.cancelAndJoin() + server.close() + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index 266f39be1..ece5be2a6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -71,3 +71,5 @@ include("rsocket-transport-ktor-client") include("rsocket-transport-ktor-server") project(":rsocket-transport-ktor-client").projectDir = file("rsocket-transport-ktor/rsocket-transport-ktor-client") project(":rsocket-transport-ktor-server").projectDir = file("rsocket-transport-ktor/rsocket-transport-ktor-server") + +include("rsocket-transport-nodejs-tcp")