From 5e1faf757607193f959ddde11c5050553fd04f15 Mon Sep 17 00:00:00 2001 From: Oleg Yukhnevich Date: Tue, 12 Mar 2024 20:48:22 +0200 Subject: [PATCH] enable all transport tests, remove delays, cleanup code --- .../kotlin/transport/tests/TransportTest.kt | 102 ++++++++++-------- 1 file changed, 56 insertions(+), 46 deletions(-) diff --git a/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt index 24d3ed6d..729bab7e 100644 --- a/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt +++ b/rsocket-transport-tests/src/commonMain/kotlin/io/rsocket/kotlin/transport/tests/TransportTest.kt @@ -60,25 +60,25 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { @Test fun fireAndForget10() = test { (1..10).map { async { client.fireAndForget(payload(it)) } }.awaitAll() - delay(1000) //TODO: leak check +// delay(1000) //TODO: leak check } @Test open fun largePayloadFireAndForget10() = test { (1..10).map { async { client.fireAndForget(requesterLargePayload) } }.awaitAll() - delay(1000) //TODO: leak check +// delay(1000) //TODO: leak check } @Test fun metadataPush10() = test { (1..10).map { async { client.metadataPush(packet(requesterData)) } }.awaitAll() - delay(1000) //TODO: leak check +// delay(1000) //TODO: leak check } @Test open fun largePayloadMetadataPush10() = test { (1..10).map { async { client.metadataPush(packet(requesterLargeData)) } }.awaitAll() - delay(1000) //TODO: leak check +// delay(1000) //TODO: leak check } @Test @@ -89,8 +89,11 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { @Test fun requestChannel1() = test(10.seconds) { - val list = client.requestChannel(payload(0), flowOf(payload(0))).onEach { it.close() }.toList() - assertEquals(1, list.size) + val count = + client.requestChannel(payload(0), flowOf(payload(0))) + .onEach { it.close() } + .count() + assertEquals(1, count) } @Test @@ -98,9 +101,12 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { val request = flow { repeat(3) { emit(payload(it)) } } - val list = - client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(3, 0)).onEach { it.close() }.toList() - assertEquals(3, list.size) + val count = + client.requestChannel(payload(0), request) + .flowOn(PrefetchStrategy(3, 0)) + .onEach { it.close() } + .count() + assertEquals(3, count) } @Test @@ -108,56 +114,59 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { val request = flow { repeat(200) { emit(requesterLargePayload) } } - val list = + val count = client.requestChannel(requesterLargePayload, request) .flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)) .onEach { it.close() } - .toList() - assertEquals(200, list.size) + .count() + assertEquals(200, count) } @Test - @Ignore //flaky, ignore for now + //@Ignore //flaky, ignore for now fun requestChannel20000() = test { val request = flow { repeat(20_000) { emit(payload(7)) } } - val list = client.requestChannel(payload(7), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).onEach { + val count = client.requestChannel(payload(7), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).onEach { assertEquals(requesterData, it.data.readText()) assertEquals(requesterMetadata, it.metadata?.readText()) - }.toList() - assertEquals(20_000, list.size) + }.count() + assertEquals(20_000, count) } @Test - @Ignore //flaky, ignore for now + //@Ignore //flaky, ignore for now fun requestChannel200000() = test { val request = flow { repeat(200_000) { emit(payload(it)) } } - val list = - client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(10000, 0)).onEach { it.close() }.toList() - assertEquals(200_000, list.size) + val count = + client.requestChannel(payload(0), request) + .flowOn(PrefetchStrategy(10000, 0)) + .onEach { it.close() } + .count() + assertEquals(200_000, count) } @Test - @Ignore //flaky, ignore for now + //@Ignore //flaky, ignore for now fun requestChannel16x256() = test { val request = flow { repeat(256) { emit(payload(it)) } } - (0..16).map { - async(Dispatchers.Default) { - val list = client.requestChannel(payload(0), request).onEach { it.close() }.toList() - assertEquals(256, list.size) + (0..16).map { r -> + async { + val count = client.requestChannel(payload(0), request).onEach { it.close() }.count() + assertEquals(256, count) } }.awaitAll() } @Test - @Ignore //flaky, ignore for now + //@Ignore //flaky, ignore for now fun requestChannel256x512() = test { val request = flow { repeat(512) { @@ -165,20 +174,20 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { } } (0..256).map { - async(Dispatchers.Default) { - val list = client.requestChannel(payload(0), request).onEach { it.close() }.toList() - assertEquals(512, list.size) + async { + val count = client.requestChannel(payload(0), request).onEach { it.close() }.count() + assertEquals(512, count) } }.awaitAll() } @Test - @Ignore //flaky, ignore for now + //@Ignore //flaky, ignore for now fun requestChannel500NoLeak() = test { val request = flow { repeat(10_000) { emitOrClose(payload(3)) } } - val list = + val count = client .requestChannel(payload(3), request) .flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)) @@ -186,9 +195,10 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { .onEach { assertEquals(requesterData, it.data.readText()) assertEquals(requesterMetadata, it.metadata?.readText()) - }.toList() - assertEquals(500, list.size) - delay(1000) //TODO: leak check + } + .count() + assertEquals(500, count) + //(1000) //TODO: leak check } @Test @@ -212,42 +222,42 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck { } @Test - @Ignore //flaky, ignore for now + //@Ignore //flaky, ignore for now fun requestResponse10000() = test { (1..10000).map { async { client.requestResponse(payload(3)).let(Companion::checkPayload) } }.awaitAll() } @Test - @Ignore //flaky, ignore for now + //@Ignore //flaky, ignore for now fun requestResponse100000() = test { repeat(100000) { client.requestResponse(payload(3)).let(Companion::checkPayload) } } @Test fun requestStream5() = test { - val list = - client.requestStream(payload(3)).flowOn(PrefetchStrategy(5, 0)).take(5).onEach { checkPayload(it) }.toList() - assertEquals(5, list.size) + val count = + client.requestStream(payload(3)).flowOn(PrefetchStrategy(5, 0)).take(5).onEach { checkPayload(it) }.count() + assertEquals(5, count) } @Test fun requestStream10000() = test { - val list = client.requestStream(payload(3)).onEach { checkPayload(it) }.toList() - assertEquals(10000, list.size) + val count = client.requestStream(payload(3)).onEach { checkPayload(it) }.count() + assertEquals(10000, count) } @Test - @Ignore //flaky, ignore for now + //@Ignore //flaky, ignore for now fun requestStream500NoLeak() = test { - val list = + val count = client .requestStream(payload(3)) .flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)) .take(500) .onEach { checkPayload(it) } - .toList() - assertEquals(500, list.size) - delay(1000) //TODO: leak check + .count() + assertEquals(500, count) +// delay(1000) //TODO: leak check } companion object {