Wait for commits to complete in rebalance listener and on unsubscribe
This PR adds the setting `rebalanceSafeCommits`. When set to `true`, we hold up a rebalance until the offsets for revoked partitions are committed. This allows the client that gets the partition assigned next to start at exactly the right offset, preventing duplicate processing.

When `rebalanceSafeCommits` is set to `false`, the previous behavior is active: the old stream finishes its work in the background while the new stream is already starting. This causes duplicate processing but can be a bit more performant since rebalances are not held up.
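
As a minimal usage sketch (the bootstrap servers and group id are placeholders; `withRebalanceSafeCommits` is the setter added in this PR):

```scala
import zio.kafka.consumer.ConsumerSettings

// Placeholder bootstrap servers and group id; the last line enables the new behavior.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("my-group")
    .withRebalanceSafeCommits(true)
```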

In order to handle commits while waiting for the streams to end, commits are added to a separate queue. To make sure the main poll loop knows about these commits, we also place a `CommitAvailable` value on the command queue.
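
Roughly, the hand-off could look like the sketch below; the `Commit` and `CommitAvailable` shapes are illustrative stand-ins, not the actual Runloop types (the Runloop diff is not rendered on this page):

```scala
import org.apache.kafka.common.TopicPartition
import zio._

// Illustrative shapes only; the real Runloop types may differ.
final case class Commit(offsets: Map[TopicPartition, Long], done: Promise[Throwable, Unit])
sealed trait Command
object Command { case object CommitAvailable extends Command }

def commit(
  commitQueue: Queue[Commit],
  commandQueue: Queue[Command]
)(offsets: Map[TopicPartition, Long]): Task[Unit] =
  for {
    done <- Promise.make[Throwable, Unit]
    _    <- commitQueue.offer(Commit(offsets, done))    // park the commit for the poll loop
    _    <- commandQueue.offer(Command.CommitAvailable) // wake up the poll loop
    _    <- done.await                                  // completes once the broker acknowledged the commit
  } yield ()
```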

The rebalance listener's ZIO effects (and other callbacks) are executed on the calling thread. This is required because the underlying Java client demands single-threaded access: it rejects concurrent invocations unless they come from the same thread, that is, the thread that executes the poll. (Remember, the rebalance listener and other callbacks are invoked by the Java client during a call to poll or commit.) Note, though, that ZIO operations that involve time are not possible, since these cause the workflow to shift to another thread.
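
As a rough sketch of what this means at the Java boundary (the names and wiring below are illustrative assumptions, not the actual Runloop code), the listener runs the ZIO effect to completion on the thread that called poll:

```scala
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import zio._
import java.util.{ Collection => JCollection }
import scala.jdk.CollectionConverters._

// Illustrative: run the listener's ZIO effect synchronously on the Kafka poll thread.
def javaRebalanceListener(
  runtime: Runtime[Any],
  onRevoked: Set[TopicPartition] => Task[Unit]
): ConsumerRebalanceListener =
  new ConsumerRebalanceListener {
    override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit =
      Unsafe.unsafe { implicit unsafe =>
        // Executes here, on the calling thread; effects that need the clock would
        // shift to another thread and therefore cannot be used.
        runtime.unsafe.run(onRevoked(partitions.asScala.toSet)).getOrThrowFiberFailure()
      }
    override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
  }
```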

Once we know that all commits have been sent to the broker, we need to wait until their callbacks have been called. We do this by calling `commitSync` with an empty map of offsets: `commitSync` guarantees that all previously sent commits complete before it returns. (NOTE: this requires a patched Kafka client.)
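
As an illustration at the Java-client level (a sketch that relies on the patched-client note above, not the actual Runloop code):

```scala
import java.util.Collections
import org.apache.kafka.clients.consumer.{ KafkaConsumer, OffsetAndMetadata }
import org.apache.kafka.common.TopicPartition

// An empty commitSync returns only after the callbacks of previously sent async
// commits have run (with the patched client mentioned above).
def awaitPreviousCommits(consumer: KafkaConsumer[Array[Byte], Array[Byte]]): Unit =
  consumer.commitSync(Collections.emptyMap[TopicPartition, OffsetAndMetadata]())
```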

The rebalance listener no longer manipulates the state. The rebalance event only collects changes; the main poll loop then uses those changes to construct the next state. When the rebalance listener was not invoked, we skip some tasks for a small performance improvement.
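
For illustration only, such a change-collecting event could look roughly like the following; the field and method names are assumptions, not the actual Runloop types:

```scala
import org.apache.kafka.common.TopicPartition

// Illustrative accumulator: the listener only records what happened; the poll
// loop later folds these sets into the next state.
final case class RebalanceEvent(
  wasInvoked: Boolean = false,
  assignedTps: Set[TopicPartition] = Set.empty,
  revokedTps: Set[TopicPartition] = Set.empty,
  lostTps: Set[TopicPartition] = Set.empty
) {
  def onAssigned(tps: Set[TopicPartition]): RebalanceEvent =
    copy(wasInvoked = true, assignedTps = assignedTps ++ tps)
  def onRevoked(tps: Set[TopicPartition]): RebalanceEvent =
    copy(wasInvoked = true, revokedTps = revokedTps ++ tps)
  def onLost(tps: Set[TopicPartition]): RebalanceEvent =
    copy(wasInvoked = true, lostTps = lostTps ++ tps)
}
```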

Added a WIP `RebalanceSafeCommitConsumerSpec` as a testbed for improved unit tests (and also to test the new commit-on-rebalance feature).

Also:
 - End all streams when only `onLost` is invoked.
 - Disable zio-intellij's inspection `SimplifyWhenInspection` in Runloop because its suggestion is not equivalent (performance-wise).
 - Configure a low `max.poll.records` (100 instead of 1000) for tests. There was a concurrency bug that only failed a test with the lower setting (see the configuration sketch after this list). The benchmarks continue to use 1000 for `max.poll.records`.
 - Prevent deadlock by doing unsubscribe with a single consumer access.
 - Await stream end on unsubscribe and shutdown as well.
 - Removed method `Runloop.awaitShutdown`; it is not used, not accessible, and its semantics are questionable.
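
Configuration sketch for the lowered `max.poll.records` mentioned above (the bootstrap servers are placeholders; `withProperties` is the regular settings method that also appears in the diff below):

```scala
import zio.kafka.consumer.ConsumerSettings

// Tests use a low max.poll.records so that concurrency bugs surface more easily.
val testSettings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withProperties(Map("max.poll.records" -> "100"))
```
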
erikvanoosten committed May 24, 2023
1 parent d941b29 commit ac7f61b
Showing 12 changed files with 1,641 additions and 238 deletions.
@@ -478,6 +478,9 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
)
},
test("commit offsets for all consumed messages") {
//
// TODO: find out whether the test description is wrong (it doesn't seem to commit), or the test is wrong
//
val nrMessages = 50
val messages = (1 to nrMessages).toList.map(i => (s"key$i", s"msg$i"))


Large diffs are not rendered by default.

@@ -102,6 +102,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
`max.poll.records`: Int = 100, // setting this higher can cause concurrency bugs to go unnoticed
runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout,
properties: Map[String, String] = Map.empty
@@ -123,6 +124,7 @@
)
.withOffsetRetrieval(offsetRetrieval)
.withRestartStreamOnRebalancing(restartStreamOnRebalancing)
.withRebalanceSafeCommits(rebalanceSafeCommits)
.withProperties(properties)

val withClientInstanceId = clientInstanceId.fold(settings)(settings.withGroupInstanceId)
@@ -139,6 +141,7 @@
allowAutoCreateTopics: Boolean = true,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty
): URIO[Kafka, ConsumerSettings] =
consumerSettings(
@@ -148,6 +151,7 @@
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties
)
.map(
@@ -187,6 +191,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty
): ZLayer[Kafka, Throwable, Consumer] =
(ZLayer(
@@ -197,6 +202,7 @@
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties
)
) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live
@@ -212,6 +218,7 @@
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty,
rebalanceListener: RebalanceListener = RebalanceListener.noop
): ZLayer[Kafka, Throwable, Consumer] =
@@ -223,6 +230,7 @@
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties
).map(_.withRebalanceListener(rebalanceListener))
) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live
1 change: 1 addition & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Consumer.scala
@@ -369,6 +369,7 @@ object Consumer {
offsetRetrieval = settings.offsetRetrieval,
userRebalanceListener = settings.rebalanceListener,
restartStreamsOnRebalancing = settings.restartStreamOnRebalancing,
rebalanceSafeCommits = settings.rebalanceSafeCommits,
runloopTimeout = settings.runloopTimeout
)
subscriptions <- Ref.Synchronized.make(Set.empty[Subscription])
27 changes: 27 additions & 0 deletions zio-kafka/src/main/scala/zio/kafka/consumer/ConsumerSettings.scala
@@ -15,6 +15,29 @@ import zio.kafka.security.KafkaCredentialStore
* @param restartStreamOnRebalancing
* When `true` _all_ streams are restarted during a rebalance, including those streams that are not revoked. The
* default is `false`.
*
* @param rebalanceSafeCommits
* Whether to hold up a rebalance until all offsets of consumed messages have been committed. The default is `false`,
* but the recommended value is `true` as it prevents duplicate processing of messages.
*
* Use `false` _only_ when your streams do not do commits, or when it is okay to have messages processed twice
* concurrently and you cannot afford the performance hit during a rebalance.
*
* When `true`, messages consumed from revoked partitions must be committed before we allow the rebalance to continue.
*
* When a partition is revoked, consuming the messages will be taken over by another consumer. The other consumer will
* continue from the committed offset. It is therefore important that this consumer commits the offsets of all consumed
* messages. By holding up the rebalance until these commits are done, we ensure that the new consumer will start from
* the correct offset.
*
* During a rebalance no new messages can be received _for any stream_. Therefore, _all_ streams are deprived of new
* messages until the revoked streams are done committing.
*
* When `false`, the rebalance is not held up and streams for revoked partitions may continue to run in the background. Any offset
* commits from these streams have a high chance of being delayed (commits are not possible during some phases of a
* rebalance). The consumer that takes over the partition will likely not see these delayed commits and will start from
* an earlier offset. The result is that some messages are processed twice and concurrently.
*
* @param runloopTimeout
* Internal timeout for each iteration of the command processing and polling loop, use to detect stalling. This should
* be much larger than the pollTimeout and the time it takes to process chunks of records. If your consumer is not
@@ -29,6 +52,7 @@ case class ConsumerSettings(
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
rebalanceListener: RebalanceListener = RebalanceListener.noop,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
runloopTimeout: Duration = ConsumerSettings.defaultRunloopTimeout
) {
private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match {
@@ -81,6 +105,9 @@ case class ConsumerSettings(
def withRestartStreamOnRebalancing(value: Boolean): ConsumerSettings =
copy(restartStreamOnRebalancing = value)

def withRebalanceSafeCommits(value: Boolean): ConsumerSettings =
copy(rebalanceSafeCommits = value)

def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings =
withProperties(credentialsStore.properties)

6 changes: 3 additions & 3 deletions zio-kafka/src/main/scala/zio/kafka/consumer/Offset.scala
@@ -9,7 +9,7 @@ sealed trait Offset {
def partition: Int
def offset: Long
def commit: Task[Unit]
def batch: OffsetBatch
def asOffsetBatch: OffsetBatch
def consumerGroupMetadata: Option[ConsumerGroupMetadata]

/**
@@ -42,6 +42,6 @@ private final case class OffsetImpl(
commitHandle: Map[TopicPartition, Long] => Task[Unit],
consumerGroupMetadata: Option[ConsumerGroupMetadata]
) extends Offset {
def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset))
def batch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata)
def commit: Task[Unit] = commitHandle(Map(topicPartition -> offset))
def asOffsetBatch: OffsetBatch = OffsetBatchImpl(Map(topicPartition -> offset), commitHandle, consumerGroupMetadata)
}
@@ -58,7 +58,7 @@ private final case class OffsetBatchImpl(
case object EmptyOffsetBatch extends OffsetBatch {
override val offsets: Map[TopicPartition, Long] = Map.empty
override val commit: Task[Unit] = ZIO.unit
override def add(offset: Offset): OffsetBatch = offset.batch
override def add(offset: Offset): OffsetBatch = offset.asOffsetBatch
override def merge(offset: Offset): OffsetBatch = add(offset)
override def merge(offsets: OffsetBatch): OffsetBatch = offsets
override def consumerGroupMetadata: Option[ConsumerGroupMetadata] = None
@@ -7,6 +7,9 @@ import scala.jdk.CollectionConverters._

/**
* ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects.
*
* Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor
* when this is not desired.
*/
final case class RebalanceListener(
onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
@@ -19,7 +19,7 @@ private[consumer] final class ConsumerAccess(
def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
access.withPermit(withConsumerNoPermit(f))

private[consumer] def withConsumerNoPermit[R, A](
private def withConsumerNoPermit[R, A](
f: ByteArrayKafkaConsumer => RIO[R, A]
): RIO[R, A] =
ZIO
@@ -31,10 +31,17 @@
.flatMap(fib => fib.join.onInterrupt(ZIO.succeed(consumer.wakeup()) *> fib.interrupt))

/**
* Do not use this method outside of the Runloop
* Use this method only from Runloop.
*/
private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] =
access.withPermit(f(consumer))

/**
* Use this method ONLY from the rebalance listener.
*/
private[internal] def rebalanceListenerAccess[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
withConsumerNoPermit(f)

}

private[consumer] object ConsumerAccess {
@@ -1,18 +1,22 @@
package zio.kafka.consumer.internal

import org.apache.kafka.common.TopicPartition
import zio.kafka.consumer.Offset
import zio.kafka.consumer.diagnostics.{ DiagnosticEvent, Diagnostics }
import zio.kafka.consumer.internal.Runloop.Command.Request
import zio.kafka.consumer.internal.Runloop.{ ByteArrayCommittableRecord, Command }
import zio.stream.{ Take, ZStream }
import zio.{ Chunk, LogAnnotation, Promise, Queue, UIO, ZIO }
import zio.{ Chunk, LogAnnotation, Promise, Queue, Ref, UIO, ZIO }

private[internal] final class PartitionStreamControl private (
val tp: TopicPartition,
stream: ZStream[Any, Throwable, ByteArrayCommittableRecord],
val lastOffset: Ref[Option[Offset]],
dataQueue: Queue[Take[Throwable, ByteArrayCommittableRecord]],
interruptPromise: Promise[Throwable, Unit],
completedPromise: Promise[Nothing, Unit]
startedPromise: Promise[Nothing, Unit],
endedPromise: Promise[Nothing, Unit],
completedPromise: Promise[Nothing, Unit],
interruptPromise: Promise[Throwable, Unit]
) {

private var pollResumedHistory: PollHistory = PollHistory.Empty
@@ -24,7 +28,8 @@ private[internal] final class PartitionStreamControl private (

/** Offer new data for the stream to process. */
def offerRecords(data: Chunk[ByteArrayCommittableRecord]): ZIO[Any, Nothing, Unit] =
dataQueue.offer(Take.chunk(data)).unit
data.lastOption.fold(ZIO.unit)(last => lastOffset.set(Some(last.offset))) *>
dataQueue.offer(Take.chunk(data)).unit

/** To be invoked when the partition was lost. */
def lost(): UIO[Boolean] =
@@ -34,16 +39,40 @@ private[internal] final class PartitionStreamControl private (
def end(): ZIO[Any, Nothing, Unit] =
logAnnotate {
ZIO.logTrace(s"Partition ${tp.toString} ending") *>
dataQueue.offer(Take.end).unit
ZIO
.whenZIO(endedPromise.succeed(())) {
dataQueue.offer(Take.end)
}
.unit
}

/** Returns true when the stream is done. */
def isCompleted: ZIO[Any, Nothing, Boolean] =
completedPromise.isDone
/** Returns true when the stream accepts new data. */
def acceptsData: ZIO[Any, Nothing, Boolean] =
for {
ended <- endedPromise.isDone
completed <- completedPromise.isDone
interrupted <- interruptPromise.isDone
} yield !(ended || completed || interrupted)

/** Returns true when the stream is running. */
def isRunning: ZIO[Any, Nothing, Boolean] =
isCompleted.negate
/** Returns true when the stream is done (or when it didn't even start). */
def isCompletedAfterStart: ZIO[Any, Nothing, Boolean] =
for {
started <- startedPromise.isDone
completed <- completedPromise.isDone
} yield !started || completed

def lasOffsetIsIn(committedOffsets: Map[TopicPartition, Long]): ZIO[Any, Nothing, Boolean] =
lastOffset.get.map(_.forall(offset => committedOffsets.get(offset.topicPartition).exists(_ >= offset.offset))).tap {
result =>
for {
lo <- lastOffset.get
_ <- ZIO.logDebug(
s"${tp.partition()} lastOffset: ${lo.map(_.offset.toString).getOrElse("-")} " +
s"in committedOffsets: ${committedOffsets.get(tp).map(_.toString).getOrElse("-")} " +
s"==> $result"
)
} yield ()
}

val tpStream: (TopicPartition, ZStream[Any, Throwable, ByteArrayCommittableRecord]) =
(tp, stream)
@@ -72,8 +101,11 @@ private[internal] object PartitionStreamControl {
): ZIO[Any, Nothing, PartitionStreamControl] =
for {
_ <- ZIO.logTrace(s"Creating partition stream ${tp.toString}")
interruptionPromise <- Promise.make[Throwable, Unit]
startedPromise <- Promise.make[Nothing, Unit]
endedPromise <- Promise.make[Nothing, Unit]
completedPromise <- Promise.make[Nothing, Unit]
interruptionPromise <- Promise.make[Throwable, Unit]
lastOffset <- Ref.make[Option[Offset]](None)
dataQueue <- Queue.unbounded[Take[Throwable, ByteArrayCommittableRecord]]
requestAndAwaitData =
for {
@@ -90,12 +122,22 @@
completedPromise.succeed(()) <*
ZIO.logDebug(s"Partition stream ${tp.toString} has ended")
) *>
ZStream.fromZIO(startedPromise.succeed(())) *>
ZStream.repeatZIOChunk {
// First try to take all records that are available right now.
// When no data is available, request more data and await its arrival.
dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
}.flattenTake
.interruptWhen(interruptionPromise)
} yield new PartitionStreamControl(tp, stream, dataQueue, interruptionPromise, completedPromise)
} yield new PartitionStreamControl(
tp,
stream,
lastOffset,
dataQueue,
startedPromise,
endedPromise,
completedPromise,
interruptionPromise
)

}
