Skip to content

Commit

Permalink
enable all transport tests, remove delays, cleanup code
Browse files Browse the repository at this point in the history
  • Loading branch information
whyoleg committed Mar 12, 2024
1 parent c6e36b4 commit 5e1faf7
Showing 1 changed file with 56 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,25 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
@Test
fun fireAndForget10() = test {
(1..10).map { async { client.fireAndForget(payload(it)) } }.awaitAll()
delay(1000) //TODO: leak check
// delay(1000) //TODO: leak check
}

@Test
open fun largePayloadFireAndForget10() = test {
(1..10).map { async { client.fireAndForget(requesterLargePayload) } }.awaitAll()
delay(1000) //TODO: leak check
// delay(1000) //TODO: leak check
}

@Test
fun metadataPush10() = test {
(1..10).map { async { client.metadataPush(packet(requesterData)) } }.awaitAll()
delay(1000) //TODO: leak check
// delay(1000) //TODO: leak check
}

@Test
open fun largePayloadMetadataPush10() = test {
(1..10).map { async { client.metadataPush(packet(requesterLargeData)) } }.awaitAll()
delay(1000) //TODO: leak check
// delay(1000) //TODO: leak check
}

@Test
Expand All @@ -89,106 +89,116 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {

@Test
fun requestChannel1() = test(10.seconds) {
val list = client.requestChannel(payload(0), flowOf(payload(0))).onEach { it.close() }.toList()
assertEquals(1, list.size)
val count =
client.requestChannel(payload(0), flowOf(payload(0)))
.onEach { it.close() }
.count()
assertEquals(1, count)
}

@Test
fun requestChannel3() = test {
val request = flow {
repeat(3) { emit(payload(it)) }
}
val list =
client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(3, 0)).onEach { it.close() }.toList()
assertEquals(3, list.size)
val count =
client.requestChannel(payload(0), request)
.flowOn(PrefetchStrategy(3, 0))
.onEach { it.close() }
.count()
assertEquals(3, count)
}

@Test
open fun largePayloadRequestChannel200() = test {
val request = flow {
repeat(200) { emit(requesterLargePayload) }
}
val list =
val count =
client.requestChannel(requesterLargePayload, request)
.flowOn(PrefetchStrategy(Int.MAX_VALUE, 0))
.onEach { it.close() }
.toList()
assertEquals(200, list.size)
.count()
assertEquals(200, count)
}

@Test
@Ignore //flaky, ignore for now
//@Ignore //flaky, ignore for now
fun requestChannel20000() = test {
val request = flow {
repeat(20_000) { emit(payload(7)) }
}
val list = client.requestChannel(payload(7), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).onEach {
val count = client.requestChannel(payload(7), request).flowOn(PrefetchStrategy(Int.MAX_VALUE, 0)).onEach {
assertEquals(requesterData, it.data.readText())
assertEquals(requesterMetadata, it.metadata?.readText())
}.toList()
assertEquals(20_000, list.size)
}.count()
assertEquals(20_000, count)
}

@Test
@Ignore //flaky, ignore for now
//@Ignore //flaky, ignore for now
fun requestChannel200000() = test {
val request = flow {
repeat(200_000) { emit(payload(it)) }
}
val list =
client.requestChannel(payload(0), request).flowOn(PrefetchStrategy(10000, 0)).onEach { it.close() }.toList()
assertEquals(200_000, list.size)
val count =
client.requestChannel(payload(0), request)
.flowOn(PrefetchStrategy(10000, 0))
.onEach { it.close() }
.count()
assertEquals(200_000, count)
}

@Test
@Ignore //flaky, ignore for now
//@Ignore //flaky, ignore for now
fun requestChannel16x256() = test {
val request = flow {
repeat(256) {
emit(payload(it))
}
}
(0..16).map {
async(Dispatchers.Default) {
val list = client.requestChannel(payload(0), request).onEach { it.close() }.toList()
assertEquals(256, list.size)
(0..16).map { r ->
async {
val count = client.requestChannel(payload(0), request).onEach { it.close() }.count()
assertEquals(256, count)
}
}.awaitAll()
}

@Test
@Ignore //flaky, ignore for now
//@Ignore //flaky, ignore for now
fun requestChannel256x512() = test {
val request = flow {
repeat(512) {
emit(payload(it))
}
}
(0..256).map {
async(Dispatchers.Default) {
val list = client.requestChannel(payload(0), request).onEach { it.close() }.toList()
assertEquals(512, list.size)
async {
val count = client.requestChannel(payload(0), request).onEach { it.close() }.count()
assertEquals(512, count)
}
}.awaitAll()
}

@Test
@Ignore //flaky, ignore for now
//@Ignore //flaky, ignore for now
fun requestChannel500NoLeak() = test {
val request = flow {
repeat(10_000) { emitOrClose(payload(3)) }
}
val list =
val count =
client
.requestChannel(payload(3), request)
.flowOn(PrefetchStrategy(Int.MAX_VALUE, 0))
.take(500)
.onEach {
assertEquals(requesterData, it.data.readText())
assertEquals(requesterMetadata, it.metadata?.readText())
}.toList()
assertEquals(500, list.size)
delay(1000) //TODO: leak check
}
.count()
assertEquals(500, count)
//(1000) //TODO: leak check
}

@Test
Expand All @@ -212,42 +222,42 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
}

@Test
@Ignore //flaky, ignore for now
//@Ignore //flaky, ignore for now
fun requestResponse10000() = test {
(1..10000).map { async { client.requestResponse(payload(3)).let(Companion::checkPayload) } }.awaitAll()
}

@Test
@Ignore //flaky, ignore for now
//@Ignore //flaky, ignore for now
fun requestResponse100000() = test {
repeat(100000) { client.requestResponse(payload(3)).let(Companion::checkPayload) }
}

@Test
fun requestStream5() = test {
val list =
client.requestStream(payload(3)).flowOn(PrefetchStrategy(5, 0)).take(5).onEach { checkPayload(it) }.toList()
assertEquals(5, list.size)
val count =
client.requestStream(payload(3)).flowOn(PrefetchStrategy(5, 0)).take(5).onEach { checkPayload(it) }.count()
assertEquals(5, count)
}

@Test
fun requestStream10000() = test {
val list = client.requestStream(payload(3)).onEach { checkPayload(it) }.toList()
assertEquals(10000, list.size)
val count = client.requestStream(payload(3)).onEach { checkPayload(it) }.count()
assertEquals(10000, count)
}

@Test
@Ignore //flaky, ignore for now
//@Ignore //flaky, ignore for now
fun requestStream500NoLeak() = test {
val list =
val count =
client
.requestStream(payload(3))
.flowOn(PrefetchStrategy(Int.MAX_VALUE, 0))
.take(500)
.onEach { checkPayload(it) }
.toList()
assertEquals(500, list.size)
delay(1000) //TODO: leak check
.count()
assertEquals(500, count)
// delay(1000) //TODO: leak check
}

companion object {
Expand Down

0 comments on commit 5e1faf7

Please sign in to comment.