Skip to content

Commit

Permalink
working nodejs TCP server and client
Browse files Browse the repository at this point in the history
* not published yet, because of transitive dependency on kotlinx.nodejs, which is only on jcenter
  • Loading branch information
olme04 committed Dec 13, 2021
1 parent 1115e7f commit 7e580b9
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 168 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/gradle-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ jobs:
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false
- name: Test rsocket-transport-nodejs module
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
timeout-minutes: 15
uses: gradle/gradle-build-action@v1
with:
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false

- name: Publish Test Report
if: always()
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/gradle-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ jobs:
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false
- name: Test rsocket-transport-nodejs module
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
timeout-minutes: 15
uses: gradle/gradle-build-action@v1
with:
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false

- name: Publish Test Report
if: always()
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ jobs:
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false
- name: Test rsocket-transport-nodejs module
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
timeout-minutes: 15
uses: gradle/gradle-build-action@v1
with:
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false

- name: Publish Test Report
if: always()
Expand Down
8 changes: 6 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,12 @@ subprojects {
project.name != "rsocket-transport-ktor" &&
project.name != "rsocket-transport-ktor-client"

val jsOnly = project.name == "rsocket-transport-nodejs-tcp"
val nodejsOnly = project.name == "rsocket-transport-nodejs-tcp"

if (!isAutoConfigurable) return@configure

jvm {
if (!jsOnly) jvm {
testRuns.all {
executionTask.configure {
// ActiveProcessorCount is used here, to make sure local setup is similar as on CI
Expand All @@ -97,7 +99,7 @@ subprojects {
}
}
}
browser {
if (!nodejsOnly) browser {
testTask {
useKarma {
useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d"))
Expand All @@ -107,6 +109,8 @@ subprojects {
}
}

if (jsOnly) return@configure

//native targets configuration
val linuxTargets = listOf(linuxX64())
val mingwTargets = if (supportMingw) listOf(mingwX64()) else emptyList()
Expand Down
160 changes: 0 additions & 160 deletions examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,12 @@

plugins {
kotlin("multiplatform")
id("kotlinx-atomicfu")
}

val kotlinxNodejsVersion: String by rootProject

kotlin {
js(IR) {
nodejs {
binaries.executable()
}
}

sourceSets {
val jsMain by getting {
dependencies {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.rsocket.kotlin.transport.nodejs.tcp

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.io.*

internal fun ByteReadPacket.withLength(): ByteReadPacket = buildPacket {
@Suppress("INVISIBLE_MEMBER") writeLength(this@withLength.remaining.toInt())
writePacket(this@withLength)
}

internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPacket) -> Unit) {
private var expectedFrameLength = 0 //TODO atomic for native
private val packetBuilder: BytePacketBuilder = BytePacketBuilder()
inline fun write(write: BytePacketBuilder.() -> Unit) {
packetBuilder.write()
loop()
}

private fun loop() {
while (true) when {
expectedFrameLength == 0 && packetBuilder.size < 3 -> return // no length
expectedFrameLength == 0 -> withTemp { // has length
expectedFrameLength = @Suppress("INVISIBLE_MEMBER") it.readLength()
if (it.remaining >= expectedFrameLength) build(it) // if has length and frame
}
packetBuilder.size < expectedFrameLength -> return // not enough bytes to read frame
else -> withTemp { build(it) } // enough bytes to read frame
}
}

private fun build(from: ByteReadPacket) {
val frame = buildPacket {
writePacket(from, expectedFrameLength)
}
expectedFrameLength = 0
onFrame(frame)
}

private inline fun withTemp(block: (tempPacket: ByteReadPacket) -> Unit) {
val tempPacket = packetBuilder.build()
block(tempPacket)
packetBuilder.writePacket(tempPacket)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.rsocket.kotlin.transport.nodejs.tcp

import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public class TcpClientTransport(
private val port: Int,
private val hostname: String,
private val pool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : ClientTransport {

override val coroutineContext: CoroutineContext = coroutineContext + SupervisorJob(coroutineContext[Job])

@TransportApi
override suspend fun connect(): Connection {
val socket = net.connect(port, hostname)
return TcpConnection(coroutineContext, pool, socket)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.rsocket.kotlin.transport.nodejs.tcp

import Buffer
import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.js.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import net.*
import org.khronos.webgl.*
import kotlin.coroutines.*

@TransportApi
internal class TcpConnection(
override val coroutineContext: CoroutineContext,
override val pool: ObjectPool<ChunkBuffer>,
private val socket: Socket
) : Connection {

private val sendChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel<ByteReadPacket>(8)
private val receiveChannel = @Suppress("INVISIBLE_MEMBER") SafeChannel<ByteReadPacket>(Channel.UNLIMITED)

init {
launch {
sendChannel.consumeEach { packet ->
socket.write(Uint8Array(packet.withLength().readArrayBuffer()))
}
}

coroutineContext.job.invokeOnCompletion {
when (it) {
null -> socket.destroy()
else -> socket.destroy(Error(it.message, it.cause))
}
}

val frameAssembler = FrameWithLengthAssembler { receiveChannel.trySend(it) } //TODO
socket.on("data") { buffer: Buffer ->
frameAssembler.write { writeFully(buffer.buffer) }
}
socket.on("error") { error: Error ->
coroutineContext.job.cancel("Socket error", error)
}
socket.on("close") { hadError: Boolean ->
if (!hadError) coroutineContext.job.cancel("Socket closed")
}
}

override suspend fun send(packet: ByteReadPacket) {
sendChannel.send(packet)
}

override suspend fun receive(): ByteReadPacket {
return receiveChannel.receive()
}
}
Loading

0 comments on commit 7e580b9

Please sign in to comment.