Pass through context from getOffsetsOnAssign #1066
Comments
If I understand correctly, you want to be able to maintain state for a particular partition when using partitioned sources. Presently, at a rebalance the inner partition streams complete and the new partition streams for that group member get re-emitted. If the same partition is re-emitted to the same group member, you want it to keep a reference to that context. A partial implementation or some pseudo code might be helpful for my own understanding.
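A minimal sketch of that behaviour, assuming Alpakka Kafka's `Consumer.committablePartitionedSource`; the `State` type, `rebuildState`, and the connection settings are hypothetical stand-ins for the expensive per-partition setup:

```scala
import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer

implicit val system: ActorSystem = ActorSystem("example")

// Hypothetical per-partition state; building it is expensive in practice.
final case class State(processedEvents: Long)
def rebuildState(tp: TopicPartition): State = State(0L)

val settings =
  ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("group")

Consumer
  .committablePartitionedSource(settings, Subscriptions.topics("events"))
  .mapAsyncUnordered(parallelism = 8) { case (tp, partitionSource) =>
    // A new inner source is emitted every time this partition is (re-)assigned,
    // so the per-partition state has to be rebuilt from scratch each time.
    partitionSource
      .scan(rebuildState(tp))((state, _) => state.copy(processedEvents = state.processedEvents + 1))
      .runWith(Sink.ignore)
  }
  .runWith(Sink.ignore)
```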
I just want to pass some info from the offset lookup code to the start of the partition stream. I will try to come up with an example. Indeed, this would also be useful in the complete re-emit case, but that would involve even more API changes.
Here is some code that shows the problem (we have to do an expensive operation twice):

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import akka.stream.scaladsl.Source
import org.apache.kafka.common.TopicPartition

case class Snapshot(offset: Long, state: State)

def loadSnapshot(partition: TopicPartition): Future[Snapshot] = {
  // expensive operation here
  ???
}

def getOffsetsOnAssign(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
  Future
    .traverse(partitions) { partition =>
      // we drop s.state here
      loadSnapshot(partition).map(s => (partition, s.offset))
    }
    .map(_.toMap)

def getLastCommittedOffset(partition: TopicPartition): Future[Long] =
  // use metaDataClient.getCommittedOffset()
  ???

committablePartitionedManualOffsetSource(settings, subscription, getOffsetsOnAssign).flatMapConcat {
  case (partition, source) =>
    val f = for {
      lastCommittedOffset <- getLastCommittedOffset(partition)
      snapshot            <- loadSnapshot(partition) // the expensive operation runs a second time
    } yield {
      // use snapshot.state to set up the partition
      // start the business logic that consumes events from the source
      // ignore side effects and commits until lastCommittedOffset is reached
      // write out snapshots from time to time
      source
    }
    Source.futureSource(f)
}
```

I would like to pass the `Snapshot` loaded in `getOffsetsOnAssign` through to the partition stream so that `loadSnapshot` does not have to run twice.
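For illustration, a sketch of how the consuming side could look if a context-carrying variant existed; the factory name `committablePartitionedManualOffsetSourceWithContext`, the `getOffsetsAndSnapshotsOnAssign` lookup, and the `(TopicPartition, Snapshot, Source[...])` element shape are all hypothetical and build on the code above:

```scala
// Hypothetical variant that also emits the Snapshot produced during the offset lookup.
committablePartitionedManualOffsetSourceWithContext(settings, subscription, getOffsetsAndSnapshotsOnAssign)
  .flatMapConcat { case (partition, snapshot, source) =>
    // snapshot.state is already available here, so no second loadSnapshot call is needed
    val f = getLastCommittedOffset(partition).map { lastCommittedOffset =>
      // set up the partition from snapshot.state, then consume `source`,
      // ignoring side effects and commits until lastCommittedOffset is reached
      source
    }
    Source.futureSource(f)
  }
```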
Got it. I agree, it would be useful to propagate additional user context in this scenario. Since your proposed solution involves breaking API changes, maybe we can add a new factory method that emits the context alongside the `TopicPartition` and the partition source. I shudder to think what this factory method would be called.
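A rough sketch of the shape such a factory method could have; the name and signature are invented for illustration and are not part of the existing API:

```scala
import scala.concurrent.Future
import akka.NotUsed
import akka.kafka.{AutoSubscription, ConsumerSettings}
import akka.kafka.ConsumerMessage.CommittableMessage
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.Source
import org.apache.kafka.common.TopicPartition

// Hypothetical factory: the offset lookup also produces a user context C, and
// the outer source emits it next to the TopicPartition and the inner source.
def committablePartitionedManualOffsetSourceWithContext[K, V, C](
    settings: ConsumerSettings[K, V],
    subscription: AutoSubscription,
    getOffsetsAndContextOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, (Long, C)]]
  ): Source[(TopicPartition, C, Source[CommittableMessage[K, V], NotUsed]), Consumer.Control] = ???
```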
Had a look at implementing this today but got a bit stuck on the internal bookkeeping; I think it will require a few iterations.
We went through a few iterations but have not found a satisfying solution, mainly because of the multi-case logic in the driver. We might have another go at this later on by dropping everything we don't need, but our next try will be using the Java driver.
Why not use Akka Kafka sharding? It provides state management that is robust to reassignment. https://akka.io/blog/news/2020/03/18/akka-sharding-kafka-video
Short description
The use case is a Kafka event-sourced setup that periodically stores per-partition snapshots.
In this case, when a partition is assigned you have the following steps:

1. `getOffsetsOnAssign` is called with the newly assigned partitions.
2. A snapshot (offset plus state) is loaded for each partition to determine the starting offset, which is expensive.
3. The source emits the `TopicPartition` together with its `Source[CommittableMessage]`.
4. The per-partition stream is set up, which needs the snapshot state from step 2 again.
The problem is that the work done in step 2 cannot be accessed in step 4 without resorting to temporarily storing it in some thread-safe reference, which might get out of sync if repartitions happen.
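A sketch of that workaround, reusing `Snapshot` and `loadSnapshot` from the code above; the cache is the part that can go stale across rebalances:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.kafka.common.TopicPartition

// Workaround sketch: stash the snapshot loaded during the offset lookup so the
// partition stream can pick it up later. If the partition is revoked and
// re-assigned in between, the cached entry may no longer match what the broker
// assigned, which is the "out of sync" risk mentioned above.
val snapshotCache = new ConcurrentHashMap[TopicPartition, Snapshot]()

def getOffsetsOnAssign(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
  Future
    .traverse(partitions) { partition =>
      loadSnapshot(partition).map { s =>
        snapshotCache.put(partition, s) // stored for the partition stream (step 4)
        partition -> s.offset
      }
    }
    .map(_.toMap)

// Later, in the per-partition stream:
// Option(snapshotCache.get(partition)) returns the snapshot, if it is still current.
```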
Details
The current function:
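Its shape, matching how `getOffsetsOnAssign` is defined in the example above:

```scala
import scala.concurrent.Future
import org.apache.kafka.common.TopicPartition

// The lookup passed to Consumer.committablePartitionedManualOffsetSource today:
// it can only return offsets, so anything else loaded while computing them
// (here, the snapshot state) is dropped.
def getOffsetsOnAssign(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] = ???
```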
A simple solution would be to have some context produced in `getOffsetsOnAssign` be passed along next to the `TopicPartition` and the `Source[CommittableMessage]`.
An improved function would introduce a `C` for Context (a `Snapshot` in my case):
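A sketch of the shape I have in mind; the name is invented and nothing like it exists today:

```scala
import scala.concurrent.Future
import org.apache.kafka.common.TopicPartition

// Proposed shape (sketch): the lookup also yields a context C next to the
// offset (a Snapshot in this use case), and the partitioned source then emits
// that context alongside the TopicPartition and the inner Source[CommittableMessage].
def getOffsetsAndContextOnAssign[C](
    partitions: Set[TopicPartition]
  ): Future[Map[TopicPartition, (Long, C)]] = ???
```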
Any workarounds would also be welcome.