From bfbc610bfa9ddb79032de5a9cab492698f55dd4e Mon Sep 17 00:00:00 2001 From: jules Ivanic Date: Sat, 17 Jun 2023 19:07:30 +0400 Subject: [PATCH] Add test testing: Starting a new consumption session with a subscription which is invalid with the previous ones should fail with `InvalidSubscriptionUnion` error --- .../kafka/consumer/SubscriptionsSpec.scala | 101 +++++++++++++++--- 1 file changed, 88 insertions(+), 13 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala index 7fdf5ea187..14dc6352c5 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/SubscriptionsSpec.scala @@ -13,6 +13,9 @@ import zio.test.Assertion._ import zio.test.TestAspect._ import zio.test._ +import java.util.concurrent.atomic.AtomicInteger + +//noinspection SimplifyAssertInspection object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { override val kafkaPrefix: String = "subscriptionsspec" @@ -88,22 +91,94 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { _ <- produceMany(topic1, kvs) _ <- produceMany(topic2, kvs) - result <- - (Consumer + consumer_0 = + Consumer .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) - .runCollect zipPar - Consumer - .plainStream( - Subscription.manual(topic2, 1), - Serde.string, - Serde.string - ) - .runCollect) - .provideSomeLayer[Kafka & Scope](consumer(client, Some(group))) - .unit - .exit + .runCollect + + consumer_1 = + Consumer + .plainStream( + Subscription.manual(topic2, 1), // invalid with the previous subscription + Serde.string, + Serde.string + ) + .runCollect + + result <- (consumer_0 zipPar consumer_1) + .provideSomeLayer[Kafka & Scope](consumer(client, Some(group))) + .unit + .exit } yield assert(result)(fails(isSubtype[InvalidSubscriptionUnion](anything))) }, + test( + "gives an error when attempting to subscribe using a manual subscription when there is already a topic subscription and doesn't fail the already running consuming session" + ) { + val numberOfMessages = 20 + val kvs = (0 to numberOfMessages).toList.map(i => (s"key$i", s"msg$i")) + for { + topic1 <- randomTopic + client <- randomClient + group <- randomGroup + + _ <- produceMany(topic1, kvs) + + counter = new AtomicInteger(1) + + firstMessagesRef <- Ref.make(("", "")) + finalizersRef <- Ref.make(Chunk.empty[String]) + + consumer_0 = + Consumer + .plainStream(Subscription.topics(topic1), Serde.string, Serde.string) + // Here we delay each message to be sure that `consumer_1` will fail while `consumer_0` is still running + .mapZIO { r => + firstMessagesRef.updateSome { case ("", v) => + ("First consumer_0 message", v) + } *> + ZIO + .logDebug(s"Consumed ${counter.getAndIncrement()} records") + .delay(10.millis) + .as(r) + } + .take(numberOfMessages.toLong) + .runCollect + .exit + .zipLeft(finalizersRef.update(_ :+ "consumer_0 finalized")) + + consumer_1 = + Consumer + .plainStream( + Subscription.manual(topic1, 1), // invalid with the previous subscription + Serde.string, + Serde.string + ) + .tapError { _ => + firstMessagesRef.updateSome { case (v, "") => + (v, "consumer_1 error") + } + } + .runCollect + .exit + .zipLeft(finalizersRef.update(_ :+ "consumer_1 finalized")) + + consumerInstance <- consumer(client, Some(group)).build + + fiber_0 <- consumer_0.provideEnvironment(consumerInstance).fork + _ <- ZIO.unit.delay(100.millis) // Wait to be sure that `consumer_0` is running + fiber_1 <- consumer_1.provideEnvironment(consumerInstance).fork + + result_0 <- fiber_0.join + result_1 <- fiber_1.join + + finalizingOrder <- finalizersRef.get + firstMessages <- firstMessagesRef.get + } yield assert(result_0)(succeeds(hasSize(equalTo(numberOfMessages)))) && + assert(result_1)(fails(isSubtype[InvalidSubscriptionUnion](anything))) && + // Here we check that `consumer_0` was running when `consumer_1` failed + assert(firstMessages)(equalTo(("First consumer_0 message", "consumer_1 error"))) && + assert(finalizingOrder)(equalTo(Chunk("consumer_1 finalized", "consumer_0 finalized"))) + } @@ nonFlaky(5), test("distributes records (randomly) from overlapping subscriptions over all subscribers") { val kvs = (1 to 500).toList.map(i => (s"key$i", s"msg$i")) for {