Possibility of blocking/greedy rebalance handlers #127
For example, this would be a failing test:

```scala
package fs2.kafka

import scala.concurrent.duration._
import cats.effect.IO
import cats.implicits._
import fs2.{Chunk, Pipe, Stream}

final class RebalanceSpec extends BaseKafkaSpec {
  type Consumer       = KafkaConsumer[IO, String, String]
  type ConsumerStream = Stream[IO, CommittableMessage[IO, String, String]]

  describe("rebalance process") {
    it("should consume only once") {
      withKafka { (config, topic) =>
        createCustomTopic(topic, partitions = 6)

        // Produce 16384 records in chunks of 32, one chunk every 50ms.
        Stream
          .unfoldEval[IO, Int, Int](0)(i => IO.pure(Some(i -> (i + 1))))
          .take(1024 * 16)
          .chunkN(32)
          .zipLeft(Stream.repeatEval(timer.sleep(50.millis)))
          .map(xs => publishToKafka(topic, xs.map(i => s"$i" -> s"$i").toList))
          .compile
          .drain
          .map(x => println(s"producer completed $x"))
          .unsafeRunAsyncAndForget()

        val commit: Pipe[IO, Chunk[CommittableOffset[IO]], Unit] =
          commitBatchChunk[IO]

        // Consume, batch into 5-second windows, commit each batch,
        // then emit the record values.
        val cstream: Stream[IO, Int] = consumerStream[IO]
          .using(consumerSettings(config).withGroupId("g1"))
          .evalTap(_.subscribeTo(topic))
          .evalTap(consumer => IO(consumer.toString should startWith("KafkaConsumer$")).void)
          .flatMap(_.stream)
          .groupWithin(1000, 5.second)
          .flatMap { c =>
            commit(Stream(c.map(_.committableOffset)))
              .map(_ => println(s"committed chunk of ${c.size}"))
              .flatMap(_ => Stream.chunk(c))
          }
          .map(_.record.value.toInt)
          .interruptAfter(120.seconds)

        val s1: Stream[IO, Int] = cstream
        // the second consumer joining the group late will trigger a rebalance
        val s2: Stream[IO, Int] = cstream.delayBy(7.seconds)

        val (c1, c2) = (s1.compile.toVector, s2.compile.toVector).parTupled.unsafeRunSync()
        assert(c1.intersect(c2).isEmpty)
      }
    }
  }
}
```

If there was a way to do blocking commits on rebalance, it would pass.
Thanks for opening this issue @tkroman! We can definitely add blocking rebalance listeners and support for synchronous commits, but the tricky part is knowing when you've completed "processing of all currently consumed records" in the user-defined stream.
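To make that difficulty concrete, here is a minimal, hypothetical sketch (cats-effect 2, contemporary with this issue; nothing here is fs2-kafka API and all names are illustrative) of the kind of signal the library would need from user code: a `Deferred` that the user completes once processing of the current records is done, and that a blocking revocation handler would await.

```scala
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Deferred

object ProcessingGate {
  // Required for Concurrent[IO] under cats-effect 2.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // The library cannot observe when user-defined processing finishes,
  // so the user would have to signal it explicitly by completing a
  // Deferred; a blocking revocation handler would then await it.
  def create: IO[(IO[Unit], IO[Unit])] =
    Deferred[IO, Unit].map { done =>
      val markDone  = done.complete(()) // run by user code once the batch is fully processed
      val awaitDone = done.get          // awaited by the revocation handler
      (markDone, awaitDone)
    }
}
```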
I spent the whole day yesterday trying to come up with an out-of-the-box (from fs2-kafka's POV) solution, and I think that under the current constraints there isn't one. I think providing full-fledged support for rebalance listeners is the right choice here. Here's what I found out yesterday:

To answer your question: I have an implementation that guards against this issue, but it's based on the "native" Kafka consumer without any abstractions on top, so the logic there is roughly this (pseudocode):

```
// `inflight` is shared state: a set of consumer records.
// Records are added to `inflight` when they are polled,
// and removed either manually or after a commit happens.
inflight = Set.empty

newRecords = poll()
inflight ++= newRecords

// `process` is the business logic.
// It returns a set of committable messages.
// It can also access `inflight` to remove certain "discarded" messages.
commits = process(newRecords)
commit(commits)
inflight --= commits
```

and the rebalance listener is defined as:

```
def onRevoked(tps) = {
  // block while there are still inflight messages
  while (!inflight.isEmpty) busyLoop()
}
```

I can roughly see how this would translate to a streaming analogy, but I'm too new to fs2 to confidently start implementing a production-grade solution, so any help or further discussion would be greatly appreciated.
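One rough translation of that shared state into cats-effect terms might look like the sketch below. This is not fs2-kafka API; `Inflight` and its methods are hypothetical names, it assumes cats-effect 2, and it replaces the busy loop with a polling sleep:

```scala
import scala.concurrent.duration._
import cats.effect.{IO, Timer}
import cats.effect.concurrent.Ref
import cats.implicits._

// Hypothetical translation of the `inflight` pseudocode above:
// a Ref holds the set of in-flight records (or their keys).
final class Inflight[A](ref: Ref[IO, Set[A]])(implicit timer: Timer[IO]) {
  def add(as: Iterable[A]): IO[Unit]    = ref.update(_ ++ as)
  def remove(as: Iterable[A]): IO[Unit] = ref.update(_ -- as)

  // Semantically blocks the calling fiber until nothing is in flight;
  // a revocation handler would run this before letting the rebalance proceed.
  def awaitEmpty: IO[Unit] =
    ref.get.flatMap { s =>
      if (s.isEmpty) IO.unit
      else IO.sleep(50.millis) >> awaitEmpty
    }
}

object Inflight {
  def create[A](implicit timer: Timer[IO]): IO[Inflight[A]] =
    Ref.of[IO, Set[A]](Set.empty).map(new Inflight(_))
}
```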
It sounds like we want a rebalance listener which provides access to the Java Kafka consumer (although wrapped so that operations are suspended in `F`).
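As a rough illustration of "wrapped so that operations are suspended" (a hypothetical sketch, not fs2-kafka API; only a couple of operations are shown):

```scala
import scala.jdk.CollectionConverters._ // Scala 2.13; scala.collection.JavaConverters on 2.12
import cats.effect.Sync
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

// Hypothetical wrapper: each Java consumer operation is suspended in F
// instead of executing eagerly, so a rebalance listener receiving this
// can sequence commits inside the effect type.
final class SuspendedConsumer[F[_], K, V](consumer: Consumer[K, V])(implicit F: Sync[F]) {
  def commitSync(offsets: Map[TopicPartition, OffsetAndMetadata]): F[Unit] =
    F.delay(consumer.commitSync(offsets.asJava))

  def position(partition: TopicPartition): F[Long] =
    F.delay(consumer.position(partition))
}
```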
Yeah, that, plus optionally standardizing on some sort of inner state? I still can't formulate it precisely, but from my example above, the `inflight` set is the kind of state I mean.
With the changes in #129 we should be able to just use [...].

Edit: you might also be interested in #128 for true exactly-once delivery, rather than just minimizing the risk of duplicates.
FWIW, the same issue that I raised in Akka's Kafka library repository (akka/alpakka-kafka#539 (comment)) was fixed in akka/alpakka-kafka#949 (by wrapping more Kafka classes and exposing them in the API).
#117 was a nice improvement over the previous behavior in the sense that commit commands aren't lost anymore, but there is another issue that can't be controlled in any way right now: for consumers that want exactly-once-ish semantics with batching (e.g. `groupWithin`), it's possible to consume the same record more than once due to rebalances.

Consider a 2-consumer group with members A and B, where only A is active right now. Say there is a record R, which gets consumed by A at the beginning of a 5-second window, and then within this 5-second window a rebalance happens. Now, if R belonged to a partition that was reassigned to B, and if A hadn't committed R yet, A's commit will be deferred until after the rebalance completes. That means A will commit R, and B will consume R and then commit it too: double consumption with no way to control it.

One solution I can see is having an option to wait for the completion of processing of all currently consumed records in the revocation handler, i.e. don't revoke until we have completely handled everything we've read up to "now", and then proceed normally; a sketch against the plain Java client follows.
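Sketched concretely against the plain Java client (names here are illustrative, not fs2-kafka API), the proposal amounts to something like:

```scala
import java.util.{Collection => JCollection}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

// A listener whose onPartitionsRevoked blocks until an application-maintained
// count of records still being processed drops to zero, and only then lets
// the rebalance proceed.
final class DrainingRebalanceListener(inFlight: AtomicInteger)
    extends ConsumerRebalanceListener {

  override def onPartitionsRevoked(partitions: JCollection[TopicPartition]): Unit = {
    // This callback runs on the consumer's polling thread during a rebalance,
    // so blocking here delays partition revocation until processing drains.
    while (inFlight.get() > 0) Thread.sleep(50)
    // At this point it would be safe to commitSync the processed offsets.
  }

  override def onPartitionsAssigned(partitions: JCollection[TopicPartition]): Unit = ()
}
```

One caveat of this approach: because `onPartitionsRevoked` runs on the polling thread, blocking for too long risks exceeding `max.poll.interval.ms` and getting the consumer removed from the group, so the drain would need a bounded timeout in practice.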
WDYT?