Skip to content

Commit

Permalink
Add test testing: Starting a new consumption session with a subscript…
Browse files Browse the repository at this point in the history
…ion which is invalid with the previous ones should fail with `InvalidSubscriptionUnion` error
  • Loading branch information
guizmaii committed Jun 17, 2023
1 parent a75bf02 commit bfbc610
Showing 1 changed file with 88 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import zio.test.Assertion._
import zio.test.TestAspect._
import zio.test._

import java.util.concurrent.atomic.AtomicInteger

//noinspection SimplifyAssertInspection
object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
override val kafkaPrefix: String = "subscriptionsspec"

Expand Down Expand Up @@ -88,22 +91,94 @@ object SubscriptionsSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
_ <- produceMany(topic1, kvs)
_ <- produceMany(topic2, kvs)

result <-
(Consumer
consumer_0 =
Consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
.runCollect zipPar
Consumer
.plainStream(
Subscription.manual(topic2, 1),
Serde.string,
Serde.string
)
.runCollect)
.provideSomeLayer[Kafka & Scope](consumer(client, Some(group)))
.unit
.exit
.runCollect

consumer_1 =
Consumer
.plainStream(
Subscription.manual(topic2, 1), // invalid with the previous subscription
Serde.string,
Serde.string
)
.runCollect

result <- (consumer_0 zipPar consumer_1)
.provideSomeLayer[Kafka & Scope](consumer(client, Some(group)))
.unit
.exit
} yield assert(result)(fails(isSubtype[InvalidSubscriptionUnion](anything)))
},
test(
"gives an error when attempting to subscribe using a manual subscription when there is already a topic subscription and doesn't fail the already running consuming session"
) {
val numberOfMessages = 20
val kvs = (0 to numberOfMessages).toList.map(i => (s"key$i", s"msg$i"))
for {
topic1 <- randomTopic
client <- randomClient
group <- randomGroup

_ <- produceMany(topic1, kvs)

counter = new AtomicInteger(1)

firstMessagesRef <- Ref.make(("", ""))
finalizersRef <- Ref.make(Chunk.empty[String])

consumer_0 =
Consumer
.plainStream(Subscription.topics(topic1), Serde.string, Serde.string)
// Here we delay each message to be sure that `consumer_1` will fail while `consumer_0` is still running
.mapZIO { r =>
firstMessagesRef.updateSome { case ("", v) =>
("First consumer_0 message", v)
} *>
ZIO
.logDebug(s"Consumed ${counter.getAndIncrement()} records")
.delay(10.millis)
.as(r)
}
.take(numberOfMessages.toLong)
.runCollect
.exit
.zipLeft(finalizersRef.update(_ :+ "consumer_0 finalized"))

consumer_1 =
Consumer
.plainStream(
Subscription.manual(topic1, 1), // invalid with the previous subscription
Serde.string,
Serde.string
)
.tapError { _ =>
firstMessagesRef.updateSome { case (v, "") =>
(v, "consumer_1 error")
}
}
.runCollect
.exit
.zipLeft(finalizersRef.update(_ :+ "consumer_1 finalized"))

consumerInstance <- consumer(client, Some(group)).build

fiber_0 <- consumer_0.provideEnvironment(consumerInstance).fork
_ <- ZIO.unit.delay(100.millis) // Wait to be sure that `consumer_0` is running
fiber_1 <- consumer_1.provideEnvironment(consumerInstance).fork

result_0 <- fiber_0.join
result_1 <- fiber_1.join

finalizingOrder <- finalizersRef.get
firstMessages <- firstMessagesRef.get
} yield assert(result_0)(succeeds(hasSize(equalTo(numberOfMessages)))) &&
assert(result_1)(fails(isSubtype[InvalidSubscriptionUnion](anything))) &&
// Here we check that `consumer_0` was running when `consumer_1` failed
assert(firstMessages)(equalTo(("First consumer_0 message", "consumer_1 error"))) &&
assert(finalizingOrder)(equalTo(Chunk("consumer_1 finalized", "consumer_0 finalized")))
} @@ nonFlaky(5),
test("distributes records (randomly) from overlapping subscriptions over all subscribers") {
val kvs = (1 to 500).toList.map(i => (s"key$i", s"msg$i"))
for {
Expand Down

0 comments on commit bfbc610

Please sign in to comment.