Pass through context from getOffsetsOnAssign #1066

Open
francisdb opened this issue Feb 26, 2020 · 7 comments

Comments


francisdb commented Feb 26, 2020

Short description

The use case is a Kafka event-sourced setup that periodically stores per-partition snapshots.

In this case, when a partition is assigned, you have the following steps:

  1. load snapshot for partition
  2. start reading topic at the offset of that snapshot
  3. (now you get your alpakka stream of streams partition stream)
  4. rebuild the state from the snapshot (need a way to access the snapshot here)
  5. fetch the actual latest committed offset on that partition
  6. start reading the stream; while you have not reached the latest committed offset, apply the events without performing any side effects

The problem is that the work done in step 2 cannot be accessed in step 4 without resorting to temporarily storing it in some thread-safe reference, which might get out of sync if repartitioning happens (see the workaround sketch at the end of this comment).

Details

Current function:

def committablePartitionedManualOffsetSource[K, V](
      settings: ConsumerSettings[K, V],
      subscription: AutoSubscription,
      getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, Long]],
      onRevoke: Set[TopicPartition] => Unit = _ => ()
  ): Source[(TopicPartition, Source[CommittableMessage[K, V], NotUsed]), Control]

A simple solution would be to have some context produced in getOffsetsOnAssign passed along next to the TopicPartition and the Source[CommittableMessage].

Improved function that introduces a type parameter C for the context (a Snapshot in my case):

def committablePartitionedManualOffsetSource[K, V, C](
      settings: ConsumerSettings[K, V],
      subscription: AutoSubscription,
      getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, (Long, C)]],
      onRevoke: Set[TopicPartition] => Unit = _ => ()
  ): Source[(TopicPartition, C, Source[CommittableMessage[K, V], NotUsed]), Control]
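
For illustration, a call site against this proposed signature might look roughly like this (a sketch only: the overload does not exist, and loadSnapshotsOnAssign, of type Set[TopicPartition] => Future[Map[TopicPartition, (Long, Snapshot)]], and runBusinessLogic are hypothetical helpers, with Snapshot standing in for C):

committablePartitionedManualOffsetSource(settings, subscription, loadSnapshotsOnAssign)
  .flatMapConcat {
    // the snapshot produced during the offset lookup arrives together with the partition stream
    case (partition, snapshot, source) =>
      // seed the partition state from snapshot.state and run the business logic, no second lookup needed
      runBusinessLogic(partition, snapshot.state, source)
  }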

Any workarounds would also be welcome.
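
For concreteness, the thread-safe-reference workaround mentioned above would look roughly like this (a sketch only, assuming a Snapshot type and an expensive loadSnapshot helper; note that nothing ties a cached snapshot to a particular assignment, which is exactly how it can get out of sync across rebalances):

import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.kafka.common.TopicPartition

// shared mutable state between getOffsetsOnAssign and the per-partition stream setup
val snapshotCache = new ConcurrentHashMap[TopicPartition, Snapshot]()

def getOffsetsOnAssign(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
  Future
    .traverse(partitions) { partition =>
      loadSnapshot(partition).map { s =>
        snapshotCache.put(partition, s) // stash the snapshot for the partition-stream setup (step 4)
        (partition, s.offset)
      }
    }
    .map(_.toMap)

def onRevoke(partitions: Set[TopicPartition]): Unit =
  partitions.foreach(p => snapshotCache.remove(p)) // drop stale entries on rebalance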


seglo commented Feb 26, 2020

If I understand correctly, you want to be able to maintain state for a particular partition when using partitioned sources. Presently, at a rebalance the inner partition streams complete and the new partition streams for that group member are re-emitted. If the same partition is re-emitted to the same group member, you want it to keep a reference to that context C so it can be re-emitted with the new stream?

A partial implementation or some pseudo code might be helpful for my own understanding.


francisdb commented Feb 26, 2020

I just want to pass some info from the offset lookup code to the start of the partition stream. Will try to come up with an example.

But indeed, this would also be useful in the complete re-emit case, although that would involve even more API changes.


francisdb commented Feb 26, 2020

Here is some code that shows the problem (we have to do an expensive operation twice):

  case class Snapshot(offset: Long, state: State)

  def loadSnapshot(partition: TopicPartition): Future[Snapshot] = {
    // expensive operation here
    ???
  }

  def getOffsetsOnAssign(partitions: Set[TopicPartition]): Future[Map[TopicPartition, Long]] =
    Future
      .traverse(partitions) { partition =>
        // we drop s.state here
        loadSnapshot(partition).map(s => (partition, s.offset))
      }
      .map(_.toMap)

  def getLastCommittedOffset(partition: TopicPartition): Future[Long] =
    // use metaDataClient.getCommittedOffset()
    ???

  committablePartitionedManualOffsetSource(settings, subscription, getOffsetsOnAssign).flatMapConcat {
    case (partition, source) =>
      val f = for {
        lastCommittedOffset <- getLastCommittedOffset(partition)
        snapshot            <- loadSnapshot(partition) // the expensive operation, a second time
      } yield {
        // use snapshot.state to set up the partition state
        // start business logic that consumes events from the source
        // while we have not reached lastCommittedOffset, ignore side effects and commits
        // from time to time write out snapshots
        source
      }
      Source.futureSource(f)
  }

I would like to pass snapshot.state through so we don't have to do this expensive operation twice. (Also, this code might get out of sync and load a newer snapshot the second time, since snapshot writing is async.)


seglo commented Feb 27, 2020

Got it. I agree, it would be useful to propagate additional user context in this scenario.

Since your proposed solution involves breaking API changes, maybe we can add a new factory method that emits a SourceWithContext (where C in your use case would be the context). We can use a new getOffsetsOnAssign signature that returns a C to populate it. To access the context you could mapContext, or drop down to a normal Source with SourceWithContext.asSource, where the context would be available in a tuple of ((TopicPartition, Source[CommittableMessage]), Ctx). Or users could just access the context for that partitioned source downstream if they want, though options are limited for merging streams together, etc. Perhaps this is overthinking it, though. Maybe you could experiment with it?
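
A rough sketch of what such a factory might look like (hypothetical name and shape, not an existing Alpakka API):

def committablePartitionedManualOffsetSourceWithContext[K, V, C](
      settings: ConsumerSettings[K, V],
      subscription: AutoSubscription,
      getOffsetsOnAssign: Set[TopicPartition] => Future[Map[TopicPartition, (Long, C)]],
      onRevoke: Set[TopicPartition] => Unit = _ => ()
  ): SourceWithContext[(TopicPartition, Source[CommittableMessage[K, V], NotUsed]), C, Control]

Calling .asSource on the result would yield a Source of ((TopicPartition, Source[CommittableMessage[K, V], NotUsed]), C) tuples, while .mapContext would let you transform C without touching the elements.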

I shudder to think what this factory method would be called. sourceWithContextCommittablePartitionedManualOffsetSource just doesn't roll off the tongue!

francisdb commented

I had a look at implementing this today but got a bit stuck on the internal bookkeeping; I think it will require a few iterations.

francisdb commented

We went through a few iterations but have not found a satisfying solution, mainly because of the multi-case logic in the driver. We might have another go at this later by dropping everything we don't need, but our next attempt will use the Java driver.


SemanticBeeng commented Jul 21, 2022

The problem is that the work done in step 2 cannot be accessed in step 4 without resorting to temporarily storing it in some thread-safe reference, which might get out of sync if repartitioning happens

Why not use Akka Kafka sharding? It provides state management that is robust to reassignment.

https://akka.io/blog/news/2020/03/18/akka-sharding-kafka-video
https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html#external-shard-allocation
