Await commit during a rebalance (#1098)
erikvanoosten authored Nov 22, 2023
1 parent 0638ebd commit 6ee5d0b
Showing 11 changed files with 408 additions and 35 deletions.
123 changes: 123 additions & 0 deletions zio-kafka-test/src/test/scala/zio/kafka/consumer/ConsumerSpec.scala
@@ -659,6 +659,129 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
consumedMessages <- messagesReceived.get
} yield assert(consumedMessages)(contains(newMessage).negate)
},
suite("rebalanceSafeCommits prevents processing messages twice when rebalancing")({

/**
* Outline of this test:
* - A producer generates some messages on every partition of a topic (2 partitions),
* - A consumer starts reading from the topic. It is the only consumer so it handles all partitions.
* - After a few messages a second consumer is started. One partition will be re-assigned.
*
* Since the first consumer is slow, we expect it to not have committed the offsets yet when the rebalance
* happens. As a consequence, the second consumer would see some messages the first consumer already consumed.
*
* '''However,''' since we enable `rebalanceSafeCommits` on the first consumer, no message should be consumed
* by both consumers.
*/
def testForPartitionAssignmentStrategy[T <: ConsumerPartitionAssignor: ClassTag] =
test(implicitly[ClassTag[T]].runtimeClass.getName) {
val partitionCount = 2

def makeConsumer(
clientId: String,
groupId: String,
rebalanceSafeCommits: Boolean
): ZIO[Scope with Kafka, Throwable, Consumer] =
for {
settings <- consumerSettings(
clientId = clientId,
groupId = Some(groupId),
`max.poll.records` = 1,
rebalanceSafeCommits = rebalanceSafeCommits
)
consumer <- Consumer.make(settings)
} yield consumer

for {
topic <- randomTopic
subscription = Subscription.topics(topic)
clientId1 <- randomClient
clientId2 <- randomClient
groupId <- randomGroup
_ <- ZIO.attempt(EmbeddedKafka.createCustomTopic(topic, partitions = partitionCount))
// Produce one message to all partitions, every 500 ms
pFib <- ZStream
.fromSchedule(Schedule.fixed(500.millis))
.mapZIO { i =>
ZIO.foreachDiscard(0 until partitionCount) { p =>
produceMany(topic, p, Seq((s"key-$p-$i", s"msg-$p-$i")))
}
}
.runDrain
.fork
_ <- ZIO.logDebug("Starting consumer 1")
c1 <- makeConsumer(clientId1, groupId, rebalanceSafeCommits = true)
c1Sleep <- Ref.make[Int](3)
c1Started <- Promise.make[Nothing, Unit]
c1Keys <- Ref.make(Chunk.empty[String])
fib1 <-
ZIO
.logAnnotate("consumer", "1") {
// When the stream ends, the topic subscription ends as well. Because of that the consumer
// shuts down and commits are no longer possible. Therefore, we signal the second consumer to
// start in a way that does not close this stream.
c1
.plainStream(subscription, Serde.string, Serde.string)
.tap(record =>
ZIO.logDebug(
s"Received record with offset ${record.partition}:${record.offset.offset} and key ${record.key}"
)
)
.tap { record =>
// Signal consumer 2 can start when a record is seen for every partition.
for {
keys <- c1Keys.updateAndGet(_ :+ record.key)
_ <- c1Started.succeed(()).when(keys.map(_.split('-')(1)).toSet.size == partitionCount)
} yield ()
}
// Buffer so that the above can run ahead of the below. This is important:
// we want consumer 2 to start before consumer 1 commits.
.buffer(partitionCount)
.mapZIO { record =>
for {
s <- c1Sleep.get
_ <- ZIO.sleep(s.seconds)
_ <- ZIO.logDebug(
s"Committing offset ${record.partition}:${record.offset.offset} for key ${record.key}"
)
_ <- record.offset.commit
} yield record.key
}
.runCollect
.map(_.toSet)
}
.fork
_ <- c1Started.await
_ <- ZIO.logDebug("Starting consumer 2")
c2 <- makeConsumer(clientId2, groupId, rebalanceSafeCommits = false)
fib2 <- ZIO
.logAnnotate("consumer", "2") {
c2
.plainStream(subscription, Serde.string, Serde.string)
.tap(msg => ZIO.logDebug(s"Received ${msg.key}"))
.mapZIO(msg => msg.offset.commit.as(msg.key))
.take(5)
.runCollect
.map(_.toSet)
}
.fork
_ <- ZIO.logDebug("Waiting for consumers to end")
c2Keys: Set[String] <- fib2.join
_ <- ZIO.logDebug("Consumer 2 ready")
_ <- c1.stopConsumption
_ <- c1Sleep.set(0)
c1Keys: Set[String] <- fib1.join
_ <- ZIO.logDebug("Consumer 1 ready")
_ <- pFib.interrupt
} yield assertTrue((c1Keys intersect c2Keys).isEmpty)
}

// Test for both default partition assignment strategies
Seq(
testForPartitionAssignmentStrategy[RangeAssignor],
testForPartitionAssignmentStrategy[CooperativeStickyAssignor]
)
}: _*),
test("partitions for topic doesn't fail if doesn't exist") {
for {
topic <- randomTopic
@@ -54,12 +54,32 @@ object RunloopCommitOffsetsSpec extends ZIOSpecDefault {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val s2 = s1.keepPartitions(Set(tp10))
assertTrue(s2.offsets == Map(tp10 -> 10L))
},
test("does not 'contain' offset when tp is not present") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L))
val result = s1.contains(tp20, 10)
assertTrue(!result)
},
test("does not 'contain' a higher offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp10, 11)
assertTrue(!result)
},
test("does 'contain' equal offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp10, 10)
assertTrue(result)
},
test("does 'contain' lower offset") {
val s1 = Runloop.CommitOffsets(Map(tp10 -> 10L, tp20 -> 20L))
val result = s1.contains(tp20, 19)
assertTrue(result)
}
)

private def makeCommit(offsets: Map[TopicPartition, Long]): RunloopCommand.Commit = {
private def makeCommit(offsets: Map[TopicPartition, Long]): Runloop.Commit = {
val o = offsets.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }
val p = Unsafe.unsafe(implicit unsafe => Promise.unsafe.make[Throwable, Unit](FiberId.None))
RunloopCommand.Commit(o, p)
Runloop.Commit(o, p)
}
}
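
The new `contains` tests above pin down the intended semantics: an offset is considered covered when the map already holds an equal or higher offset for that partition. A minimal, illustrative sketch of that behaviour, assuming `CommitOffsets` simply tracks the highest committed offset per partition (this mirrors the tests, it is not the actual `Runloop` code):

import org.apache.kafka.common.TopicPartition

// Illustrative sketch only; names mirror the tests above, not the real
// zio.kafka.consumer.internal.Runloop implementation.
final case class CommitOffsets(offsets: Map[TopicPartition, Long]) {

  // True when `offset` is at or below the highest offset already recorded for `tp`.
  def contains(tp: TopicPartition, offset: Long): Boolean =
    offsets.get(tp).exists(highest => offset <= highest)

  // Keep only the partitions in `tps` (the behaviour exercised by the `keepPartitions` test).
  def keepPartitions(tps: Set[TopicPartition]): CommitOffsets =
    CommitOffsets(offsets.filter { case (tp, _) => tps.contains(tp) })
}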
@@ -116,6 +116,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
maxPollInterval: Duration = 5.minutes,
`max.poll.records`: Int = 100, // setting this higher can cause concurrency bugs to go unnoticed
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
@@ -138,6 +139,7 @@
)
.withOffsetRetrieval(offsetRetrieval)
.withRestartStreamOnRebalancing(restartStreamOnRebalancing)
.withRebalanceSafeCommits(rebalanceSafeCommits)
.withProperties(properties)

val withClientInstanceId = clientInstanceId.fold(settings)(settings.withGroupInstanceId)
@@ -154,6 +156,7 @@
allowAutoCreateTopics: Boolean = true,
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty
): URIO[Kafka, ConsumerSettings] =
consumerSettings(
@@ -163,6 +166,7 @@
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties
)
.map(
@@ -202,6 +206,7 @@ object KafkaTestUtils {
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
commitTimeout: Duration = ConsumerSettings.defaultCommitTimeout,
properties: Map[String, String] = Map.empty
): ZLayer[Kafka, Throwable, Consumer] =
@@ -213,6 +218,7 @@
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties,
commitTimeout = commitTimeout
)
@@ -229,6 +235,7 @@
allowAutoCreateTopics: Boolean = true,
diagnostics: Diagnostics = Diagnostics.NoOp,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
properties: Map[String, String] = Map.empty,
rebalanceListener: RebalanceListener = RebalanceListener.noop
): ZLayer[Kafka, Throwable, Consumer] =
@@ -240,6 +247,7 @@
allowAutoCreateTopics = allowAutoCreateTopics,
offsetRetrieval = offsetRetrieval,
restartStreamOnRebalancing = restartStreamOnRebalancing,
rebalanceSafeCommits = rebalanceSafeCommits,
properties = properties
).map(_.withRebalanceListener(rebalanceListener))
) ++ ZLayer.succeed(diagnostics)) >>> Consumer.live
@@ -28,6 +28,7 @@ final case class ConsumerSettings(
offsetRetrieval: OffsetRetrieval = OffsetRetrieval.Auto(),
rebalanceListener: RebalanceListener = RebalanceListener.noop,
restartStreamOnRebalancing: Boolean = false,
rebalanceSafeCommits: Boolean = false,
fetchStrategy: FetchStrategy = QueueSizeBasedFetchStrategy()
) {
private[this] def autoOffsetResetConfig: Map[String, String] = offsetRetrieval match {
@@ -154,6 +155,40 @@ final case class ConsumerSettings(
def withRestartStreamOnRebalancing(value: Boolean): ConsumerSettings =
copy(restartStreamOnRebalancing = value)

/**
* WARNING: 'rebalanceSafeCommits' is an EXPERIMENTAL feature. It is not recommended for production use yet.
*
* @param value
* Whether to hold up a rebalance until all offsets of consumed messages have been committed. The default is
* `false`, but the recommended value is `true` as it prevents duplicate messages.
*
* Use `false` when:
* - your streams do not commit, or
* - your streams require access to the consumer (the consumer is not available until the rebalance is done), or
* - when it is okay to process records twice (possibly concurrently), for example, because processing is
* idempotent.
*
* When `true`, messages consumed from revoked partitions must be committed before we allow the rebalance to continue.
*
* When a partition is revoked, another consumer takes over consuming its messages and continues from the committed
* offset. It is therefore important that this consumer commits the offsets of all messages it has consumed. By
* holding up the rebalance until these commits are done, we ensure that the new consumer starts from the correct
* offset.
*
* During a rebalance no new messages can be received _for any stream_. Therefore, _all_ streams are deprived of new
* messages until the revoked streams are done committing.
*
* Rebalances are held up for at most 3/5 of `maxPollInterval` (see [[withMaxPollInterval]]), which by default
* works out to 3 minutes.
*
* When `false`, streams for revoked partitions may continue to run even though the rebalance is not held up. Any
* offset commits from these streams have a high chance of being delayed (commits are not possible during some phases
* of a rebalance). The consumer that takes over the partition will likely not see these delayed commits and will
* start from an earlier offset. The result is that some messages are processed twice and concurrently.
*/
def withRebalanceSafeCommits(value: Boolean): ConsumerSettings =
copy(rebalanceSafeCommits = value)

def withCredentials(credentialsStore: KafkaCredentialStore): ConsumerSettings =
withProperties(credentialsStore.properties)

@@ -200,6 +235,6 @@ final case class ConsumerSettings(
object ConsumerSettings {
val defaultCommitTimeout: Duration = 15.seconds

def apply(bootstrapServers: List[String]) =
def apply(bootstrapServers: List[String]): ConsumerSettings =
new ConsumerSettings().withBootstrapServers(bootstrapServers)
}
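
For reference, enabling the new setting from application code is a single call on `ConsumerSettings`. A minimal sketch; the broker address, group id and poll interval below are placeholder values, only `withRebalanceSafeCommits` is introduced by this commit:

import zio.durationInt
import zio.kafka.consumer.ConsumerSettings

// Hypothetical example values.
val settings: ConsumerSettings =
  ConsumerSettings(List("localhost:9092"))
    .withGroupId("my-group")
    .withRebalanceSafeCommits(true) // hold up rebalances until pending commits complete
    .withMaxPollInterval(5.minutes) // rebalances are held up for at most 3/5 of this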
@@ -7,6 +7,9 @@ import scala.jdk.CollectionConverters._

/**
* ZIO wrapper around Kafka's `ConsumerRebalanceListener` to work with Scala collection types and ZIO effects.
*
* Note that the given ZIO effects are executed directly on the Kafka poll thread. Fork and shift to another executor
* when this is not desired.
*/
final case class RebalanceListener(
onAssigned: (Set[TopicPartition], RebalanceConsumer) => Task[Unit],
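
As a rough illustration of the note above (hypothetical user code, not part of this diff), a listener can fork its effect so the work runs off the Kafka poll thread:

import zio._
import zio.kafka.consumer.RebalanceListener

// Hypothetical sketch: RebalanceListener.noop and the onAssigned field come from this file;
// forkDaemon moves the logging off the calling (poll) thread.
val nonBlockingListener: RebalanceListener =
  RebalanceListener.noop.copy(
    onAssigned = (assigned, _) =>
      ZIO.logInfo(s"Assigned partitions: ${assigned.mkString(", ")}").forkDaemon.unit
  )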
@@ -21,7 +21,7 @@ private[consumer] final class ConsumerAccess(
def withConsumerZIO[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
access.lock.zipRight(withConsumerNoPermit(f)).ensuring(access.unlock)

private[consumer] def withConsumerNoPermit[R, A](
private def withConsumerNoPermit[R, A](
f: ByteArrayKafkaConsumer => RIO[R, A]
): RIO[R, A] =
ZIO
@@ -33,10 +33,17 @@
.flatMap(fib => fib.join.onInterrupt(ZIO.succeed(consumer.wakeup()) *> fib.interrupt))

/**
* Do not use this method outside of the Runloop
* Use this method only from Runloop.
*/
private[internal] def runloopAccess[R, E, A](f: ByteArrayKafkaConsumer => ZIO[R, E, A]): ZIO[R, E, A] =
access.lock.zipRight(f(consumer)).ensuring(access.unlock)

/**
* Use this method ONLY from the rebalance listener.
*/
private[internal] def rebalanceListenerAccess[R, A](f: ByteArrayKafkaConsumer => RIO[R, A]): RIO[R, A] =
withConsumerNoPermit(f)

}

private[consumer] object ConsumerAccess {