Skip to content

Commit

Permalink
Remove usages of deprecated in ktor APIs
Browse files Browse the repository at this point in the history
* introduce BufferPool as a placeholder for something that will (or will not) available after migration to ktor 3.0 and kotlinx-io
  • Loading branch information
whyoleg committed Mar 2, 2024
1 parent 45f7158 commit 2ccd4c7
Show file tree
Hide file tree
Showing 78 changed files with 366 additions and 428 deletions.
91 changes: 45 additions & 46 deletions rsocket-core/api/rsocket-core.api

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,28 +17,25 @@
package io.rsocket.kotlin

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import kotlinx.coroutines.*

/**
* That interface isn't stable for inheritance.
*/
@TransportApi
public interface Connection : CoroutineScope {
public val pool: ObjectPool<ChunkBuffer> get() = ChunkBuffer.Pool

public suspend fun send(packet: ByteReadPacket)
public suspend fun receive(): ByteReadPacket
}

@OptIn(TransportApi::class)
internal suspend inline fun <T> Connection.receiveFrame(block: (frame: Frame) -> T): T =
internal suspend inline fun <T> Connection.receiveFrame(pool: BufferPool, block: (frame: Frame) -> T): T =
receive().readFrame(pool).closeOnError(block)

@OptIn(TransportApi::class)
internal suspend fun Connection.sendFrame(frame: Frame) {
internal suspend fun Connection.sendFrame(pool: BufferPool, frame: Frame) {
frame.toPacket(pool).closeOnError { send(it) }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
Expand All @@ -32,6 +33,7 @@ public class RSocketConnector internal constructor(
private val connectionConfigProvider: () -> ConnectionConfig,
private val acceptor: ConnectionAcceptor,
private val reconnectPredicate: ReconnectPredicate?,
private val bufferPool: BufferPool,
) {

public suspend fun connect(transport: ClientTransport): RSocket = when (reconnectPredicate) {
Expand Down Expand Up @@ -68,9 +70,10 @@ public class RSocketConnector internal constructor(
maxFragmentSize = maxFragmentSize,
interceptors = interceptors,
connectionConfig = connectionConfig,
acceptor = acceptor
acceptor = acceptor,
bufferPool = bufferPool
)
connection.sendFrame(setupFrame)
connection.sendFrame(bufferPool, setupFrame)
return requester
} catch (cause: Throwable) {
connectionConfig.setupPayload.close()
Expand All @@ -82,5 +85,5 @@ public class RSocketConnector internal constructor(

private fun Connection.wrapConnection(): Connection =
interceptors.wrapConnection(this)
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"), bufferPool)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@ package io.rsocket.kotlin.core

import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.keepalive.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.payload.*
Expand All @@ -35,6 +36,9 @@ public class RSocketConnectorBuilder internal constructor() {
field = value
}

@Deprecated("Only for tests in rsocket", level = DeprecationLevel.ERROR)
public var bufferPool: BufferPool = BufferPool.Default

private val connectionConfig: ConnectionConfigBuilder = ConnectionConfigBuilder()
private val interceptors: InterceptorsBuilder = InterceptorsBuilder()
private var acceptor: ConnectionAcceptor? = null
Expand Down Expand Up @@ -108,7 +112,8 @@ public class RSocketConnectorBuilder internal constructor() {
interceptors.build(),
connectionConfig.producer(),
acceptor ?: defaultAcceptor,
reconnectPredicate
reconnectPredicate,
@Suppress("DEPRECATION_ERROR") bufferPool
)

private companion object {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import io.rsocket.kotlin.*
import io.rsocket.kotlin.frame.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.logging.*
import io.rsocket.kotlin.transport.*
import kotlinx.coroutines.*
Expand All @@ -29,6 +30,7 @@ public class RSocketServer internal constructor(
private val loggerFactory: LoggerFactory,
private val maxFragmentSize: Int,
private val interceptors: Interceptors,
private val bufferPool: BufferPool,
) {

@DelicateCoroutinesApi
Expand All @@ -47,7 +49,7 @@ public class RSocketServer internal constructor(
}
}

private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame { setupFrame ->
private suspend fun Connection.bind(acceptor: ConnectionAcceptor): Job = receiveFrame(bufferPool) { setupFrame ->
when {
setupFrame !is SetupFrame -> failSetup(RSocketError.Setup.Invalid("Invalid setup frame: ${setupFrame.type}"))
setupFrame.version != Version.Current -> failSetup(RSocketError.Setup.Unsupported("Unsupported version: ${setupFrame.version}"))
Expand All @@ -64,7 +66,8 @@ public class RSocketServer internal constructor(
payloadMimeType = setupFrame.payloadMimeType,
setupPayload = setupFrame.payload
),
acceptor = acceptor
acceptor = acceptor,
bufferPool = bufferPool
)
coroutineContext.job
} catch (e: Throwable) {
Expand All @@ -75,13 +78,13 @@ public class RSocketServer internal constructor(

@Suppress("SuspendFunctionOnCoroutineScope")
private suspend fun Connection.failSetup(error: RSocketError.Setup): Nothing {
sendFrame(ErrorFrame(0, error))
sendFrame(bufferPool, ErrorFrame(0, error))
cancel("Connection establishment failed", error)
throw error
}

private fun Connection.wrapConnection(): Connection =
interceptors.wrapConnection(this)
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"))
.logging(loggerFactory.logger("io.rsocket.kotlin.frame"), bufferPool)

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package io.rsocket.kotlin.core

import io.rsocket.kotlin.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.logging.*

public class RSocketServerBuilder internal constructor() {
Expand All @@ -30,14 +31,22 @@ public class RSocketServerBuilder internal constructor() {
field = value
}

@Deprecated("Only for tests in rsocket", level = DeprecationLevel.ERROR)
public var bufferPool: BufferPool = BufferPool.Default

private val interceptors: InterceptorsBuilder = InterceptorsBuilder()

public fun interceptors(configure: InterceptorsBuilder.() -> Unit) {
interceptors.configure()
}

@OptIn(RSocketLoggingApi::class)
internal fun build(): RSocketServer = RSocketServer(loggerFactory, maxFragmentSize, interceptors.build())
internal fun build(): RSocketServer = RSocketServer(
loggerFactory,
maxFragmentSize,
interceptors.build(),
@Suppress("DEPRECATION_ERROR") bufferPool
)
}

public fun RSocketServer(configure: RSocketServerBuilder.() -> Unit = {}): RSocketServer {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,8 @@
package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*
import io.rsocket.kotlin.payload.*

internal class ExtensionFrame(
Expand Down Expand Up @@ -49,7 +48,7 @@ internal class ExtensionFrame(
}
}

internal fun ByteReadPacket.readExtension(pool: ObjectPool<ChunkBuffer>, streamId: Int, flags: Int): ExtensionFrame {
internal fun ByteReadPacket.readExtension(pool: BufferPool, streamId: Int, flags: Int): ExtensionFrame {
val extendedType = readInt()
val payload = readPayload(pool, flags)
return ExtensionFrame(streamId, extendedType, payload)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,8 @@
package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*

private const val FlagsMask: Int = 1023
private const val FrameTypeShift: Int = 10
Expand All @@ -33,9 +32,9 @@ internal sealed class Frame : Closeable {
protected abstract fun StringBuilder.appendFlags()
protected abstract fun StringBuilder.appendSelf()

internal fun toPacket(pool: ObjectPool<ChunkBuffer>): ByteReadPacket {
internal fun toPacket(pool: BufferPool): ByteReadPacket {
check(type.canHaveMetadata || !(flags check Flags.Metadata)) { "bad value for metadata flag" }
return buildPacket(pool) {
return pool.buildPacket {
writeInt(streamId)
writeShort((type.encodedType shl FrameTypeShift or flags).toShort())
writeSelf()
Expand All @@ -54,7 +53,7 @@ internal sealed class Frame : Closeable {
}
}

internal fun ByteReadPacket.readFrame(pool: ObjectPool<ChunkBuffer>): Frame = use {
internal fun ByteReadPacket.readFrame(pool: BufferPool): Frame = use {
val streamId = readInt()
val typeAndFlags = readShort().toInt() and 0xFFFF
val flags = typeAndFlags and FlagsMask
Expand All @@ -75,9 +74,11 @@ internal fun ByteReadPacket.readFrame(pool: ObjectPool<ChunkBuffer>): Frame = us
FrameType.RequestFnF,
FrameType.RequestResponse,
-> readRequest(pool, type, streamId, flags, withInitial = false)

FrameType.RequestStream,
FrameType.RequestChannel,
-> readRequest(pool, type, streamId, flags, withInitial = true)

FrameType.Reserved -> error("Reserved")
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,8 @@
package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*

private const val RespondFlag = 128

Expand Down Expand Up @@ -51,7 +50,7 @@ internal class KeepAliveFrame(
}
}

internal fun ByteReadPacket.readKeepAlive(pool: ObjectPool<ChunkBuffer>, flags: Int): KeepAliveFrame {
internal fun ByteReadPacket.readKeepAlive(pool: BufferPool, flags: Int): KeepAliveFrame {
val respond = flags check RespondFlag
val lastPosition = readLong()
val data = readPacket(pool)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,8 @@
package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*

internal class LeaseFrame(
val ttl: Int,
Expand Down Expand Up @@ -50,7 +49,7 @@ internal class LeaseFrame(
}
}

internal fun ByteReadPacket.readLease(pool: ObjectPool<ChunkBuffer>, flags: Int): LeaseFrame {
internal fun ByteReadPacket.readLease(pool: BufferPool, flags: Int): LeaseFrame {
val ttl = readInt()
val numberOfRequests = readInt()
val metadata = if (flags check Flags.Metadata) readMetadata(pool) else null
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,8 @@
package io.rsocket.kotlin.frame

import io.ktor.utils.io.core.*
import io.ktor.utils.io.core.internal.*
import io.ktor.utils.io.pool.*
import io.rsocket.kotlin.frame.io.*
import io.rsocket.kotlin.internal.io.*

internal class MetadataPushFrame(
val metadata: ByteReadPacket,
Expand All @@ -45,5 +44,5 @@ internal class MetadataPushFrame(
}
}

internal fun ByteReadPacket.readMetadataPush(pool: ObjectPool<ChunkBuffer>): MetadataPushFrame =
internal fun ByteReadPacket.readMetadataPush(pool: BufferPool): MetadataPushFrame =
MetadataPushFrame(readPacket(pool))
Loading

0 comments on commit 2ccd4c7

Please sign in to comment.