Skip to content

Commit

Permalink
Working nodejs TCP server and client (#191)
Browse files Browse the repository at this point in the history
* Run tests for new transport
* Don't depend on kotlinx.nodejs
* Support publishing plain JS module
  • Loading branch information
olme04 authored Jan 12, 2022
1 parent ebdb0d9 commit 7a9ab2e
Show file tree
Hide file tree
Showing 15 changed files with 278 additions and 174 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/gradle-main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,15 @@ jobs:
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false
- name: Test rsocket-transport-nodejs module
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
timeout-minutes: 15
uses: gradle/gradle-build-action@v1
with:
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false

- name: Publish Test Report
if: always()
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/gradle-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ jobs:
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false
- name: Test rsocket-transport-nodejs module
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
timeout-minutes: 15
uses: gradle/gradle-build-action@v1
with:
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false

- name: Publish Test Report
if: always()
Expand Down
9 changes: 9 additions & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ jobs:
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false
- name: Test rsocket-transport-nodejs module
if: (matrix.target == 'jsIrNode' || matrix.target == 'jsLegacyNode') && (success() || failure())
timeout-minutes: 15
uses: gradle/gradle-build-action@v1
with:
arguments: rsocket-transport-nodejs-tcp:${{ matrix.target }}Test --scan --info
distributions-cache-enabled: false
dependencies-cache-enabled: false
configuration-cache-enabled: false

- name: Publish Test Report
if: always()
Expand Down
10 changes: 7 additions & 3 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,12 @@ subprojects {
project.name != "rsocket-transport-ktor" &&
project.name != "rsocket-transport-ktor-client"

val jsOnly = project.name == "rsocket-transport-nodejs-tcp"
val nodejsOnly = project.name == "rsocket-transport-nodejs-tcp"

if (!isAutoConfigurable) return@configure

jvm {
if (!jsOnly) jvm {
testRuns.all {
executionTask.configure {
// ActiveProcessorCount is used here, to make sure local setup is similar as on CI
Expand All @@ -90,7 +92,7 @@ subprojects {
}
}
}
browser {
if (!nodejsOnly) browser {
testTask {
useKarma {
useConfigDirectory(rootDir.resolve("js").resolve("karma.config.d"))
Expand All @@ -100,6 +102,8 @@ subprojects {
}
}

if (jsOnly) return@configure

//native targets configuration
val linuxTargets = listOf(linuxX64())
val mingwTargets = if (supportMingw) listOf(mingwX64()) else emptyList()
Expand Down Expand Up @@ -317,7 +321,7 @@ subprojects {
}

tasks.matching { it.name == "generatePomFileForKotlinMultiplatformPublication" }.configureEach {
dependsOn(tasks["generatePomFileForJvmPublication"])
tasks.findByName("generatePomFileForJvmPublication")?.let { dependsOn(it) }
}
}
}
Expand Down
160 changes: 0 additions & 160 deletions examples/nodejs-tcp-transport/src/jsMain/kotlin/Server.kt

This file was deleted.

1 change: 0 additions & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ kotlinxCoroutinesVersion=1.5.2-native-mt
kotlinxAtomicfuVersion=0.17.0
kotlinxSerializationVersion=1.3.1
kotlinxBenchmarkVersion=0.4.0
kotlinxNodejsVersion=0.0.7
rsocketJavaVersion=1.1.1
turbineVersion=0.7.0
artifactoryVersion=4.25.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,18 @@

plugins {
kotlin("multiplatform")
}
id("kotlinx-atomicfu")

val kotlinxNodejsVersion: String by rootProject
signing
`maven-publish`
id("com.jfrog.artifactory")
}

kotlin {
js(IR) {
nodejs {
binaries.executable()
}
}

sourceSets {
val jsMain by getting {
dependencies {
implementation(project(":rsocket-core"))
implementation("org.jetbrains.kotlinx:kotlinx-nodejs:$kotlinxNodejsVersion")
api(project(":rsocket-core"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.rsocket.kotlin.transport.nodejs.tcp

import io.ktor.utils.io.core.*
import io.rsocket.kotlin.frame.io.*

internal fun ByteReadPacket.withLength(): ByteReadPacket = buildPacket {
@Suppress("INVISIBLE_MEMBER") writeLength(this@withLength.remaining.toInt())
writePacket(this@withLength)
}

internal class FrameWithLengthAssembler(private val onFrame: (frame: ByteReadPacket) -> Unit) {
private var expectedFrameLength = 0 //TODO atomic for native
private val packetBuilder: BytePacketBuilder = BytePacketBuilder()
inline fun write(write: BytePacketBuilder.() -> Unit) {
packetBuilder.write()
loop()
}

private fun loop() {
while (true) when {
expectedFrameLength == 0 && packetBuilder.size < 3 -> return // no length
expectedFrameLength == 0 -> withTemp { // has length
expectedFrameLength = @Suppress("INVISIBLE_MEMBER") it.readLength()
if (it.remaining >= expectedFrameLength) build(it) // if has length and frame
}
packetBuilder.size < expectedFrameLength -> return // not enough bytes to read frame
else -> withTemp { build(it) } // enough bytes to read frame
}
}

private fun build(from: ByteReadPacket) {
val frame = buildPacket {
writePacket(from, expectedFrameLength)
}
expectedFrameLength = 0
onFrame(frame)
}

private inline fun withTemp(block: (tempPacket: ByteReadPacket) -> Unit) {
val tempPacket = packetBuilder.build()
block(tempPacket)
packetBuilder.writePacket(tempPacket)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.rsocket.kotlin.transport.nodejs.tcp

import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.*
import io.rsocket.kotlin.transport.*
import io.rsocket.kotlin.transport.nodejs.tcp.internal.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

public class TcpClientTransport(
private val port: Int,
private val hostname: String,
private val pool: ObjectPool<ChunkBuffer> = ChunkBuffer.Pool,
coroutineContext: CoroutineContext = EmptyCoroutineContext
) : ClientTransport {

override val coroutineContext: CoroutineContext = coroutineContext + SupervisorJob(coroutineContext[Job])

@TransportApi
override suspend fun connect(): Connection {
val socket = connect(port, hostname)
return TcpConnection(coroutineContext, pool, socket)
}
}
Loading

0 comments on commit 7a9ab2e

Please sign in to comment.