From 958b0a57a9ea3b3eddcf04115626be47b48417fc Mon Sep 17 00:00:00 2001 From: Steven Vroonland Date: Sun, 21 Apr 2024 09:46:00 +0200 Subject: [PATCH] Update pendingRequests and assignedStreams --- .../test/scala/zio/kafka/consumer/ConsumerSpec.scala | 2 +- .../src/main/scala/zio/kafka/consumer/Consumer.scala | 7 ++++--- .../scala/zio/kafka/consumer/internal/Runloop.scala | 11 ++++++++++- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala index ea5d45a670..90ca5e7fa7 100644 --- a/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala +++ b/zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala @@ -374,7 +374,7 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom { Serde.string ) { stream => stream - .flatMapPar(Int.MaxValue) { case (tp, partitionStream) => + .flatMapPar(Int.MaxValue) { case (_, partitionStream) => partitionStream.mapConcatZIO { record => for { nr <- messagesReceived.updateAndGet(_ + 1) diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala index 19a8eaecdd..f571ee3817 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala @@ -697,7 +697,9 @@ private[consumer] final class ConsumerLive private[consumer] ( ): ZIO[R, Throwable, Any] = partitionedStreamWithGracefulShutdown(subscription, keyDeserializer, valueDeserializer, shutdownTimeout) { partitionedStream => - withStream(partitionedStream.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2)) + withStream( + partitionedStream.flatMapPar(n = Int.MaxValue, bufferSize = bufferSize)(_._2) + ) } /** @@ -724,8 +726,7 @@ private[consumer] final class ConsumerLive private[consumer] ( for { control <- streamControl fib <- withStream(control.stream) - .onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen")) - .forkDaemon + .onInterrupt(ZIO.logError("withStream in runWithGracefulShutdown interrupted, this should not happen")).forkScoped result <- fib.join.onInterrupt( control.stop *> fib.join diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala index 30c56f134d..54de0de14e 100644 --- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala +++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala @@ -651,7 +651,16 @@ private[consumer] final class Runloop private ( case RunloopCommand.EndStreamsBySubscription(subscription, cont) => ZIO.foreachDiscard( state.assignedStreams.filter(stream => Subscription.subscriptionMatches(subscription, stream.tp)) - )(_.end) *> cont.succeed(()).as(state) + )(_.end) *> cont + .succeed(()) + .as( + state.copy( + pendingRequests = + state.pendingRequests.filterNot(req => Subscription.subscriptionMatches(subscription, req.tp)), + assignedStreams = + state.assignedStreams.filterNot(stream => Subscription.subscriptionMatches(subscription, stream.tp)) + ) + ) case RunloopCommand.RemoveSubscription(subscription) => state.subscriptionState match {