From 9cd0c4ee2cf003b21190fd2132b7ba7ef27f3a21 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Tue, 14 Nov 2023 08:08:12 +0100
Subject: [PATCH] Protect against user diagnostics, better rebalance events (#1102)

Diagnostics is a feature of zio-kafka that allows users to listen to key
events. Since zio-kafka calls out to the user's implementation of the
Diagnostics trait, there are no guarantees on how well it behaves. This is
even more important inside the rebalance listener, where we (soon, with
#1098) run on the same-thread-runtime and cannot afford to be switched to
another thread by ZIO operations that are normally safe to use.

To protect against these issues, the user's diagnostics implementation is
run on a separate fiber, feeding from a queue of events.

In addition, the rebalance events are replaced by a single event which is
emitted from outside the rebalance listener. The new event gives the full
picture of a rebalance, including which streams were ended. Previously it
was not clear which rebalance events belonged to the same rebalance.

**Breaking change**

Since the rebalance events are changed, this is a breaking change.
---
 .../scala/zio/kafka/consumer/Consumer.scala   | 17 +++++++++---
 .../diagnostics/DiagnosticEvent.scala         | 12 ++++-----
 .../consumer/diagnostics/Diagnostics.scala    | 27 +++++++++++++++++--
 .../zio/kafka/consumer/internal/Runloop.scala | 15 +++++++----
 4 files changed, 54 insertions(+), 17 deletions(-)

diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
index a76169763b..b14e759e66 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -10,6 +10,7 @@ import org.apache.kafka.common._
 import zio._
 import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
 import zio.kafka.consumer.diagnostics.Diagnostics
+import zio.kafka.consumer.diagnostics.Diagnostics.ConcurrentDiagnostics
 import zio.kafka.consumer.internal.{ ConsumerAccess, RunloopAccess }
 import zio.kafka.serde.{ Deserializer, Serde }
 import zio.kafka.utils.SslHelper
@@ -175,15 +176,23 @@ object Consumer {
       } yield consumer
     }
 
+  /**
+   * A new consumer.
+   *
+   * @param diagnostics
+   *   an optional callback for key events in the consumer life-cycle. The callbacks will be executed in a separate
+   *   fiber. Since the events are queued, failure to handle these events in a timely manner leads to out of memory errors.
+   */
   def make(
     settings: ConsumerSettings,
     diagnostics: Diagnostics = Diagnostics.NoOp
   ): ZIO[Scope, Throwable, Consumer] =
     for {
-      _              <- ZIO.addFinalizer(diagnostics.emit(Finalization.ConsumerFinalized))
-      _              <- SslHelper.validateEndpoint(settings.driverSettings)
-      consumerAccess <- ConsumerAccess.make(settings)
-      runloopAccess  <- RunloopAccess.make(settings, consumerAccess, diagnostics)
+      wrappedDiagnostics <- ConcurrentDiagnostics.make(diagnostics)
+      _                  <- ZIO.addFinalizer(wrappedDiagnostics.emit(Finalization.ConsumerFinalized))
+      _                  <- SslHelper.validateEndpoint(settings.driverSettings)
+      consumerAccess     <- ConsumerAccess.make(settings)
+      runloopAccess      <- RunloopAccess.make(settings, consumerAccess, wrappedDiagnostics)
     } yield new ConsumerLive(consumerAccess, runloopAccess)
 
   /**
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
index a268c5c6cb..ea97c6f80d 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/DiagnosticEvent.scala
@@ -20,12 +20,12 @@ object DiagnosticEvent {
     final case class Failure(offsets: Map[TopicPartition, OffsetAndMetadata], cause: Throwable) extends Commit
   }
 
-  sealed trait Rebalance extends DiagnosticEvent
-  object Rebalance {
-    final case class Revoked(partitions: Set[TopicPartition]) extends Rebalance
-    final case class Assigned(partitions: Set[TopicPartition]) extends Rebalance
-    final case class Lost(partitions: Set[TopicPartition]) extends Rebalance
-  }
+  final case class Rebalance(
+    revoked: Set[TopicPartition],
+    assigned: Set[TopicPartition],
+    lost: Set[TopicPartition],
+    ended: Set[TopicPartition]
+  ) extends DiagnosticEvent
 
   sealed trait Finalization extends DiagnosticEvent
   object Finalization {
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala
index 3c5bbe7bee..653ce92395 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/diagnostics/Diagnostics.scala
@@ -1,6 +1,8 @@
 package zio.kafka.consumer.diagnostics
 
-import zio.{ Queue, Scope, UIO, ZIO }
+import zio.stream.ZStream
+import zio._
+import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization.ConsumerFinalized
 
 trait Diagnostics {
   def emit(event: => DiagnosticEvent): UIO[Unit]
@@ -10,11 +12,32 @@ object Diagnostics {
     override def emit(event: => DiagnosticEvent): UIO[Unit] = ZIO.unit
   }
 
-  final case class SlidingQueue private (queue: Queue[DiagnosticEvent]) extends Diagnostics {
+  final case class SlidingQueue private[Diagnostics] (queue: Queue[DiagnosticEvent]) extends Diagnostics {
     override def emit(event: => DiagnosticEvent): UIO[Unit] = queue.offer(event).unit
   }
+
   object SlidingQueue {
     def make(queueSize: Int = 16): ZIO[Scope, Nothing, SlidingQueue] =
       ZIO.acquireRelease(Queue.sliding[DiagnosticEvent](queueSize))(_.shutdown).map(SlidingQueue(_))
   }
+
+  object ConcurrentDiagnostics {
+
+    /**
+     * @return
+     *   a `Diagnostics` that runs the wrapped `Diagnostics` concurrently in a separate fiber. Events are emitted to
+     *   the fiber via an unbounded queue.
+     */
+    def make(wrapped: Diagnostics): ZIO[Scope, Nothing, Diagnostics] =
+      if (wrapped == Diagnostics.NoOp) ZIO.succeed(Diagnostics.NoOp)
+      else {
+        for {
+          queue <- ZIO.acquireRelease(Queue.unbounded[DiagnosticEvent])(_.shutdown)
+          fib   <- ZStream.fromQueue(queue).tap(wrapped.emit(_)).takeUntil(_ == ConsumerFinalized).runDrain.forkScoped
+          _     <- ZIO.addFinalizer(queue.offer(ConsumerFinalized) *> fib.await)
+        } yield new Diagnostics {
+          override def emit(event: => DiagnosticEvent): UIO[Unit] = queue.offer(event).unit
+        }
+      }
+  }
 }
diff --git a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
index d982a3c01a..be10411280 100644
--- a/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
+++ b/zio-kafka/src/main/scala/zio/kafka/consumer/internal/Runloop.scala
@@ -6,7 +6,7 @@ import org.apache.kafka.common.errors.RebalanceInProgressException
 import zio._
 import zio.kafka.consumer.Consumer.{ CommitTimeout, OffsetRetrieval }
 import zio.kafka.consumer._
-import zio.kafka.consumer.diagnostics.DiagnosticEvent.Finalization
+import zio.kafka.consumer.diagnostics.DiagnosticEvent.{ Finalization, Rebalance }
 import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
 import zio.kafka.consumer.fetch.FetchStrategy
 import zio.kafka.consumer.internal.ConsumerAccess.ByteArrayKafkaConsumer
@@ -91,7 +91,6 @@ private[consumer] final class Runloop private (
       onAssigned = (assignedTps, _) =>
         for {
           _              <- ZIO.logDebug(s"${assignedTps.size} partitions are assigned")
-          _              <- diagnostics.emit(DiagnosticEvent.Rebalance.Assigned(assignedTps))
           rebalanceEvent <- lastRebalanceEvent.get
           state          <- currentStateRef.get
           streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
@@ -103,7 +102,6 @@ private[consumer] final class Runloop private (
       onRevoked = (revokedTps, _) =>
         for {
           _              <- ZIO.logDebug(s"${revokedTps.size} partitions are revoked")
-          _              <- diagnostics.emit(DiagnosticEvent.Rebalance.Revoked(revokedTps))
           rebalanceEvent <- lastRebalanceEvent.get
           state          <- currentStateRef.get
           streamsToEnd = if (restartStreamsOnRebalancing && !rebalanceEvent.wasInvoked) state.assignedStreams
@@ -115,7 +113,6 @@ private[consumer] final class Runloop private (
       onLost = (lostTps, _) =>
         for {
           _              <- ZIO.logDebug(s"${lostTps.size} partitions are lost")
-          _              <- diagnostics.emit(DiagnosticEvent.Rebalance.Lost(lostTps))
           rebalanceEvent <- lastRebalanceEvent.get
           state          <- currentStateRef.get
           lostStreams = state.assignedStreams.filter(control => lostTps.contains(control.tp))
@@ -307,7 +304,7 @@ private[consumer] final class Runloop private (
             _ <- ZIO.logDebug(
                    s"Starting poll with ${state.pendingRequests.size} pending requests and" +
                      s" ${state.pendingCommits.size} pending commits," +
-                     s" resuming ${partitionsToFetch} partitions"
+                     s" resuming $partitionsToFetch partitions"
                  )
             _ <- currentStateRef.set(state)
             pollResult <-
@@ -387,6 +384,14 @@ private[consumer] final class Runloop private (
             _ <- committedOffsetsRef.update(_.keepPartitions(updatedAssignedStreams.map(_.tp).toSet)): Task[Unit]
+            _ <- diagnostics.emit(
+                   Rebalance(
+                     revoked = revokedTps,
+                     assigned = assignedTps,
+                     lost = lostTps,
+                     ended = endedStreams.map(_.tp).toSet
+                   )
+                 )
           } yield Runloop.PollResult(
             records = polledRecords,
             ignoreRecordsForTps = ignoreRecordsForTps,
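
Usage sketch (not part of the patch): how a user of the changed API might observe the new
combined Rebalance event and hand the listener to Consumer.make, which now wraps it in
ConcurrentDiagnostics. The object names, broker address, and group id below are made up for
illustration; only Diagnostics, DiagnosticEvent.Rebalance, ConsumerSettings, and Consumer.make
come from zio-kafka itself.

import zio._
import zio.kafka.consumer.{ Consumer, ConsumerSettings }
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }

object DiagnosticsExample {

  // Hypothetical user-supplied listener that only reacts to the new combined
  // Rebalance event and ignores all other diagnostic events.
  object RebalanceLogger extends Diagnostics {
    override def emit(event: => DiagnosticEvent): UIO[Unit] =
      event match {
        case r: DiagnosticEvent.Rebalance =>
          ZIO.logInfo(
            s"Rebalance: assigned=${r.assigned}, revoked=${r.revoked}, lost=${r.lost}, ended streams=${r.ended}"
          )
        case _ => ZIO.unit
      }
  }

  // Illustrative settings; any reachable bootstrap server and group id will do.
  val settings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092")).withGroupId("diagnostics-example")

  // Consumer.make wraps the given Diagnostics in ConcurrentDiagnostics, so the
  // logging above runs on its own fiber and cannot delay the rebalance listener.
  val consumer: ZIO[Scope, Throwable, Consumer] =
    Consumer.make(settings, diagnostics = RebalanceLogger)
}

Because the wrapper feeds an unbounded queue, emit should stay cheap; a listener that falls
behind lets events pile up, as the updated scaladoc on Consumer.make warns.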