Skip to content

Commit

Permalink
fix some issues
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Mar 12, 2024
1 parent 5e5620b commit eb4677e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,13 @@ internal suspend fun FlowCollector<Payload>.emitAllWithRequestN(
while (true) {
val result = payloads.receiveCatching()
if (result.isClosed) return result.exceptionOrNull()
emit(result.getOrThrow()) // will never throw
val payload = result.getOrThrow() // will never throw
try {
emit(payload)
} catch (cause: Throwable) {
payload.close()
throw cause
}

@OptIn(DelicateCoroutinesApi::class)
if (requestNs.isClosedForSend) continue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class TestConnection : RSocketTransportSession.Sequential, RSocketClientTarget {

init {
coroutineContext.job.invokeOnCompletion {
sendChannel.close(it)
sendChannel.cancelWithCause(it)
receiveChannel.cancelWithCause(it)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ class RSocketTest : SuspendTest, TestWithLeakCheck {
}
requestChannel { init, payloads ->
init.close()
payloads.onEach { it.close() }.launchIn(this)
flow { repeat(10) { emitOrClose(payload("server got -> [$it]")) } }
flow {
coroutineScope {
payloads.onEach { it.close() }.launchIn(this)
repeat(10) { emitOrClose(payload("server got -> [$it]")) }
}
}
}
}
}
Expand Down

0 comments on commit eb4677e

Please sign in to comment.