From 5090f6f3267347c7b4caef1916621e183b2b6a2c Mon Sep 17 00:00:00 2001 From: Kirill Shelopugin Date: Tue, 23 Jul 2024 11:26:10 +0200 Subject: [PATCH] allow deriving and remapping keys (#616) --- .../kafka/flow/PartitionFlow.scala | 19 +- .../kafka/flow/PartitionFlowOf.scala | 11 +- .../evolutiongaming/kafka/flow/RemapKey.scala | 22 ++ .../kafka/flow/PartitionFlowSpec.scala | 128 ++++++++++- .../src/test/resources/logback-test.xml | 2 +- .../kafka/flow/RemapKeySpec.scala | 208 ++++++++++++++++++ .../kafka/flow/kafkapersistence/package.scala | 10 +- 7 files changed, 385 insertions(+), 15 deletions(-) create mode 100644 core/src/main/scala/com/evolutiongaming/kafka/flow/RemapKey.scala create mode 100644 persistence-kafka-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/RemapKeySpec.scala diff --git a/core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlow.scala b/core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlow.scala index 52cc106b..a3bcfb18 100644 --- a/core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlow.scala +++ b/core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlow.scala @@ -12,7 +12,7 @@ import com.evolutiongaming.catshelper.{Log, LogOf} import com.evolutiongaming.kafka.flow.PartitionFlowConfig.ParallelismMode._ import com.evolutiongaming.kafka.flow.kafka.{OffsetToCommit, ScheduleCommit} import com.evolutiongaming.kafka.flow.timer.{TimerContext, Timestamp} -import com.evolutiongaming.skafka.consumer.ConsumerRecord +import com.evolutiongaming.skafka.consumer.{ConsumerRecord, WithSize} import com.evolutiongaming.skafka.{Offset, TopicPartition} import scodec.bits.ByteVector @@ -57,11 +57,12 @@ object PartitionFlow { keyStateOf: KeyStateOf[F], config: PartitionFlowConfig, filter: Option[FilterRecord[F]] = None, + remapKey: Option[RemapKey[F]] = None, scheduleCommit: ScheduleCommit[F] ): Resource[F, PartitionFlow[F]] = LogResource[F](getClass, topicPartition.toString) flatMap { implicit log => Cache.loading[F, 
String, PartitionKey[F]] flatMap { cache => - of(topicPartition, assignedAt, keyStateOf, cache, config, filter, scheduleCommit) + of(topicPartition, assignedAt, keyStateOf, cache, config, filter, remapKey, scheduleCommit) } } @@ -72,6 +73,7 @@ object PartitionFlow { cache: Cache[F, String, PartitionKey[F]], config: PartitionFlowConfig, filter: Option[FilterRecord[F]] = None, + remapKey: Option[RemapKey[F]] = None, scheduleCommit: ScheduleCommit[F] )(implicit log: Log[F]): Resource[F, PartitionFlow[F]] = for { clock <- Resource.eval(Clock[F].instant) @@ -89,6 +91,7 @@ object PartitionFlow { cache = cache, config = config, filter = filter, + remapKey = remapKey, scheduleCommit = scheduleCommit ) } yield flow @@ -104,6 +107,7 @@ object PartitionFlow { cache: Cache[F, String, PartitionKey[F]], config: PartitionFlowConfig, filter: Option[FilterRecord[F]], + remapKey: Option[RemapKey[F]], scheduleCommit: ScheduleCommit[F] )(implicit log: Log[F]): Resource[F, PartitionFlow[F]] = { @@ -144,7 +148,16 @@ object PartitionFlow { _ <- log.debug(s"processing ${records.size} records") clock <- Clock[F].instant - keys = records groupBy (_.key map (_.value)) collect { + remappedRecords <- remapKey.fold(records.pure[F])(remapKey => + records.traverse { record => + record.key match { + case Some(WithSize(key, _)) => + remapKey.remap(key, record).map(newKey => record.copy(key = WithSize(newKey).some)) + case None => record.pure[F] + } + } + ) + keys = remappedRecords.groupBy(_.key map (_.value)).collect { // we deliberately ignore records without a key to simplify the code // we might return the support in future if such will be required case (Some(key), records) => (key, records) diff --git a/core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlowOf.scala b/core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlowOf.scala index 2cad6ae7..08e3768e 100644 --- a/core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlowOf.scala +++ 
b/core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlowOf.scala @@ -1,7 +1,7 @@ package com.evolutiongaming.kafka.flow -import cats.effect.kernel.Async import cats.effect.Resource +import cats.effect.kernel.Async import com.evolutiongaming.catshelper.LogOf import com.evolutiongaming.kafka.flow.PartitionFlow.FilterRecord import com.evolutiongaming.kafka.flow.kafka.ScheduleCommit @@ -26,12 +26,17 @@ object PartitionFlowOf { * no state will be restored for that key; (2) no fold will be executed for that event. It doesn't affect * committing consumer offsets, thus, even if all records in a batch are skipped, new offsets will still be * committed if necessary + * @param remapKey + * optional function that remaps the key of a record. Remapping is done before the record is + * processed by the flow. Thus, the next steps in the flow (such as [[FilterRecord]] and [[FoldOption]]) will see + * the remapped key */ def apply[F[_]: Async: LogOf]( keyStateOf: KeyStateOf[F], config: PartitionFlowConfig = PartitionFlowConfig(), - filter: Option[FilterRecord[F]] = None + filter: Option[FilterRecord[F]] = None, + remapKey: Option[RemapKey[F]] = None, ): PartitionFlowOf[F] = { (topicPartition, assignedAt, scheduleCommit) => - PartitionFlow.resource(topicPartition, assignedAt, keyStateOf, config, filter, scheduleCommit) + PartitionFlow.resource(topicPartition, assignedAt, keyStateOf, config, filter, remapKey, scheduleCommit) } } diff --git a/core/src/main/scala/com/evolutiongaming/kafka/flow/RemapKey.scala b/core/src/main/scala/com/evolutiongaming/kafka/flow/RemapKey.scala new file mode 100644 index 00000000..04199c78 --- /dev/null +++ b/core/src/main/scala/com/evolutiongaming/kafka/flow/RemapKey.scala @@ -0,0 +1,22 @@ +package com.evolutiongaming.kafka.flow + +import cats.Applicative +import cats.syntax.applicative._ +import com.evolutiongaming.skafka.consumer.ConsumerRecord +import scodec.bits.ByteVector + +trait RemapKey[F[_]] { + + /** Derive a new 
key for the consumer record based on the current key and the record itself (records without a key are left as is). + * Deriving is done before the record is processed by the flow. Thus, the next steps in the flow (such as + * [[FilterRecord]] and [[FoldOption]]) will see the remapped key in the consumer record. + */ + def remap(key: String, record: ConsumerRecord[String, ByteVector]): F[String] +} + +object RemapKey { + def of[F[_]](f: (String, ConsumerRecord[String, ByteVector]) => F[String]): RemapKey[F] = (key, record) => + f(key, record) + + def empty[F[_]: Applicative]: RemapKey[F] = (key, _) => key.pure[F] +} diff --git a/core/src/test/scala/com/evolutiongaming/kafka/flow/PartitionFlowSpec.scala b/core/src/test/scala/com/evolutiongaming/kafka/flow/PartitionFlowSpec.scala index eadaf50e..b90a47ae 100644 --- a/core/src/test/scala/com/evolutiongaming/kafka/flow/PartitionFlowSpec.scala +++ b/core/src/test/scala/com/evolutiongaming/kafka/flow/PartitionFlowSpec.scala @@ -3,19 +3,20 @@ package com.evolutiongaming.kafka.flow import cats.effect.unsafe.IORuntime import cats.effect.{IO, Ref, Resource} import cats.syntax.all._ +import com.evolution.scache.Cache import com.evolutiongaming.catshelper.{Log, LogOf} -import com.evolutiongaming.kafka.flow.PartitionFlow.FilterRecord +import com.evolutiongaming.kafka.flow.PartitionFlow.{FilterRecord, PartitionKey} import com.evolutiongaming.kafka.flow.PartitionFlowSpec._ import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._ import com.evolutiongaming.kafka.flow.journal.JournalsOf import com.evolutiongaming.kafka.flow.kafka.{ScheduleCommit, ToOffset} -import com.evolutiongaming.kafka.flow.key.KeysOf +import com.evolutiongaming.kafka.flow.key.{KeyDatabase, KeysOf} import com.evolutiongaming.kafka.flow.persistence.PersistenceOf import com.evolutiongaming.kafka.flow.registry.EntityRegistry import com.evolutiongaming.kafka.flow.snapshot.{SnapshotDatabase, SnapshotsOf} -import com.evolutiongaming.kafka.flow.timer.{TimerContext, 
TimerFlowOf, Timestamp} +import com.evolutiongaming.kafka.flow.timer.{TimerContext, TimerFlowOf, TimersOf, Timestamp} import com.evolutiongaming.skafka.consumer.{ConsumerRecord, WithSize} -import com.evolutiongaming.skafka.{Offset, TopicPartition} +import com.evolutiongaming.skafka.{Offset, Partition, TopicPartition} import com.evolutiongaming.sstream.Stream import munit.FunSuite import scodec.bits.ByteVector @@ -23,6 +24,7 @@ import scodec.bits.ByteVector import scala.concurrent.duration._ class PartitionFlowSpec extends FunSuite { + import PartitionFlowSpec.RemapKeyState implicit val ioRuntime: IORuntime = IORuntime.global @@ -266,9 +268,121 @@ class PartitionFlowSpec extends FunSuite { flow.unsafeRunSync() } + test("RemapKeys derives keys correctly and updates them before applying filters and folds") { + val remap = RemapKey.of[IO] { (key, _) => IO.pure(s"$key-derived") } + + // Set up some data to be eagerly read on PartitionFlow creation + val initialKey = KafkaKey("appId", "groupId", TopicPartition("topic", Partition.min), "key1-derived") + val initialData = Map(initialKey -> "initial") + + val newKey = KafkaKey("appId", "groupId", TopicPartition("topic", Partition.min), "key2-derived") + + val test: IO[Unit] = setupRemapKeyTest(remap, initialData).use { + case RemapKeyState(cache, keys, snapshots, committedOffset, partitionFlow) => + val key1Record = ConsumerRecord( + TopicPartition("topic", Partition.min), + Offset.unsafe(1L), + None, + WithSize("key1").some, + WithSize(ByteVector("value1".getBytes)).some + ) + + val key2Record = key1Record.copy( + key = WithSize("key2").some, + value = WithSize(ByteVector("value2".getBytes)).some, + offset = Offset.unsafe(2L) + ) + + for { + // Ensure pre-existing data is loaded correctly from the storage + _ <- cache.keys.map(keys => assertEquals(keys.size, 1)) + _ <- keys.get.map(keys => assertEquals(keys, Set(initialKey))) + _ <- snapshots.get.map(snapshots => assertEquals(snapshots, initialData)) + _ <- 
committedOffset.get.map(offset => assertEquals(offset, Offset.min)) + + // Handle a record for the existing key and check that the key was correctly derived and fold applied + _ <- partitionFlow.apply(List(key1Record)) + _ <- cache.keys.map(keys => assertEquals(keys, Set(initialKey.key))) + _ <- keys.get.map(keys => assertEquals(keys, Set(initialKey))) + _ <- snapshots.get.map(snapshots => assertEquals(snapshots, Map(initialKey -> "initial+value1"))) + _ <- committedOffset.get.map(offset => assertEquals(offset, Offset.unsafe(2L))) + + // Handle a record for a new key and check that the key was correctly derived and fold applied + _ <- partitionFlow.apply(List(key2Record)) + _ <- cache.keys.map(keys => assertEquals(keys, Set(initialKey.key, newKey.key))) + _ <- keys.get.map(keys => assertEquals(keys, Set(initialKey, newKey))) + _ <- snapshots + .get + .map(snapshots => assertEquals(snapshots, Map(initialKey -> "initial+value1", newKey -> "value2"))) + _ <- committedOffset.get.map(offset => assertEquals(offset, Offset.unsafe(3L))) + } yield () + + } + + test.unsafeRunSync() + } + + private def setupRemapKeyTest(remapKey: RemapKey[IO], initialData: Map[KafkaKey, String]) = { + import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._ + implicit val logOf = LogOf.empty[IO] + logOf.apply(classOf[PartitionFlowSpec]).toResource.flatMap { implicit log => + val committedOffset = Ref.unsafe[IO, Offset](Offset.min) + val keyStorage = Ref.unsafe[IO, Set[KafkaKey]](initialData.keySet) + val keysOf = KeysOf.apply[IO, KafkaKey](KeyDatabase.memory[IO, KafkaKey](keyStorage.stateInstance)) + val snapshotsStorage = Ref.unsafe[IO, Map[KafkaKey, String]](initialData) + val persistenceOf = + PersistenceOf + .snapshotsOnly[IO, KafkaKey, String, ConsumerRecord[String, ByteVector]]( + keysOf, + SnapshotsOf.backedBy(SnapshotDatabase.memory(snapshotsStorage.stateInstance)) + ) + val fold = FoldOption.of[IO, String, ConsumerRecord[String, ByteVector]] { (state, record) => + IO { + 
val event = new String(record.value.get.value.toArray) + state.fold(event)(_ + "+" + event).some + } + } + val timerFlowOf = TimerFlowOf.persistPeriodically[IO](fireEvery = 0.seconds, persistEvery = 0.seconds) + for { + timersOf <- TimersOf.memory[IO, KafkaKey].toResource + keyFlowOf = KeyFlowOf.apply(timerFlowOf, fold, TickOption.id[IO, String]) + keyStateOf = KeyStateOf.eagerRecovery[IO, String]( + applicationId = "appId", + groupId = "groupId", + keysOf = keysOf, + timersOf = timersOf, + persistenceOf = persistenceOf, + keyFlowOf = keyFlowOf, + registry = EntityRegistry.empty[IO, KafkaKey, String] + ) + cache <- Cache.loading[IO, String, PartitionKey[IO]] + partitionFlow <- PartitionFlow.of( + topicPartition = TopicPartition("topic", Partition.min), + assignedAt = Offset.min, + keyStateOf = keyStateOf, + cache = cache, + config = PartitionFlowConfig(triggerTimersInterval = 0.seconds, commitOffsetsInterval = 0.seconds), + filter = none, + remapKey = remapKey.some, + scheduleCommit = new ScheduleCommit[IO] { + def schedule(offset: Offset) = committedOffset.set(offset) + } + ) + } yield RemapKeyState(cache, keyStorage, snapshotsStorage, committedOffset, partitionFlow) + } + } + } object PartitionFlowSpec { + case class RemapKeyState( + cache: Cache[IO, String, PartitionKey[IO]], + keys: Ref[IO, Set[KafkaKey]], + snapshots: Ref[IO, Map[KafkaKey, String]], + committedOffset: Ref[IO, Offset], + partitionFlow: PartitionFlow[IO], + ) + class ConstFixture(waitForN: Int) { implicit val logOf: LogOf[IO] = LogOf.empty implicit val log: Log[IO] = Log.empty @@ -315,7 +429,8 @@ object PartitionFlowSpec { def makeFlow( timerFlowOf: TimerFlowOf[IO], persistenceOf: PersistenceOf[IO, String, State, ConsumerRecord[String, ByteVector]], - filter: Option[FilterRecord[IO]] = none + filter: Option[FilterRecord[IO]] = none, + remapKey: Option[RemapKey[IO]] = none, ): Resource[IO, PartitionFlow[IO]] = { val keyStateOf: KeyStateOf[IO] = new KeyStateOf[IO] { def apply( @@ -352,7 +467,8 @@ 
object PartitionFlowSpec { commitOffsetsInterval = 0.seconds ), filter = filter, - scheduleCommit = scheduleCommit + scheduleCommit = scheduleCommit, + remapKey = remapKey, ) } diff --git a/persistence-kafka-it-tests/src/test/resources/logback-test.xml b/persistence-kafka-it-tests/src/test/resources/logback-test.xml index 21ce9d4b..e62b323e 100644 --- a/persistence-kafka-it-tests/src/test/resources/logback-test.xml +++ b/persistence-kafka-it-tests/src/test/resources/logback-test.xml @@ -14,4 +14,4 @@ - \ No newline at end of file + diff --git a/persistence-kafka-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/RemapKeySpec.scala b/persistence-kafka-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/RemapKeySpec.scala new file mode 100644 index 00000000..c658c135 --- /dev/null +++ b/persistence-kafka-it-tests/src/test/scala/com/evolutiongaming/kafka/flow/RemapKeySpec.scala @@ -0,0 +1,208 @@ +package com.evolutiongaming.kafka.flow + +import cats.data.{NonEmptyList, NonEmptySet} +import cats.effect.unsafe.IORuntime +import cats.effect.{IO, Resource} +import cats.syntax.all._ +import com.evolutiongaming.catshelper.{Log, LogOf} +import com.evolutiongaming.kafka.flow.kafka.KafkaModule +import com.evolutiongaming.kafka.flow.kafkapersistence.{KafkaPersistenceModuleOf, kafkaEagerRecovery} +import com.evolutiongaming.kafka.flow.registry.EntityRegistry +import com.evolutiongaming.kafka.flow.timer.{TimerFlowOf, TimersOf} +import com.evolutiongaming.retry.Retry +import com.evolutiongaming.skafka.CommonConfig +import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig, ConsumerOf, ConsumerRecord} +import com.evolutiongaming.skafka.producer.{ProducerConfig, ProducerOf, ProducerRecord, RecordMetadata} +import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic} +import scodec.bits.ByteVector + +import java.util.Properties +import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ +import 
scala.jdk.CollectionConverters._ +import com.evolutiongaming.skafka.Partition +import scala.util.Random + +class RemapKeySpec extends ForAllKafkaSuite { + implicit val ioRuntime: IORuntime = IORuntime.global + implicit val logOf: LogOf[IO] = LogOf.slf4j[IO].unsafeRunSync() + implicit val log: Log[IO] = logOf(this.getClass).unsafeRunSync() + + private def producerConfig = + ProducerConfig(common = CommonConfig(bootstrapServers = NonEmptyList.one(kafka.container.bootstrapServers))) + + private val appId = "app-id" + private val testGroupId = "group-id" + + private val stateTopic = "state-topic-remap-key" + + private def kafkaPersistenceModuleOf: Resource[IO, KafkaPersistenceModuleOf[IO, String]] = { + ProducerOf + .apply1[IO]() + .apply(producerConfig) + .map { producer => + KafkaPersistenceModuleOf.caching[IO, String]( + consumerOf = ConsumerOf.apply1[IO](), + producer = producer, + consumerConfig = ConsumerConfig( + common = producerConfig.common, + autoCommit = false, + autoOffsetReset = AutoOffsetReset.Earliest, + groupId = testGroupId.some, + ), + snapshotTopic = stateTopic + ) + } + .evalTap(_ => createTopic(stateTopic, 4)) + } + + test("remap keys but store snapshots in the same partition as input records") { + // using unique input topic name per test as weaver is running tests in parallel + val inputTopic = "kafka-persistence-test-remap-key-input" + kafkaPersistenceModuleOf + .use { persistenceModuleOf => + val kafka = kafkaModule() + val remapKey = RemapKey.of((key, _) => IO.pure(key + "-remapped")) + def findRecord(key: String, records: List[ConsumerRecord[String, String]]) = + records + .find(_.key.get.value == key) + .fold(IO.raiseError[ConsumerRecord[String, String]](new RuntimeException(s"$key not found")))(IO.pure) + + for { + _ <- createTopic(inputTopic, 4) + _ <- runFlow(kafka, persistenceModuleOf, inputTopic, remapKey).use { _ => + for { + // send input records to specific partitions deliberately + input1 <- sendInput(kafka, inputTopic, key = 
"key0", value = "1", partition = 1) + input2 <- sendInput(kafka, inputTopic, key = "key1", value = "2", partition = 3) + + // consume snapshots created by the flow + snapshots <- consume(2, stateTopic) + + // find snapshots by remapped keys and check that they are in the same partition as input records + // despite the keys were remapped + snapshot1 <- findRecord("key0-remapped", snapshots) + snapshot2 <- findRecord("key1-remapped", snapshots) + + _ = assertEquals(snapshot1.partition, input1.partition) + _ = assertEquals(snapshot2.partition, input2.partition) + } yield () + } + } yield () + } + .unsafeRunSync() + } + + private def createTopic(topic: String, partitions: Int) = { + val props = new Properties + props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.container.bootstrapServers) + + Resource.make(IO.delay(AdminClient.create(props)))(cl => IO(cl.close())).use { client => + IO(client.createTopics(List(new NewTopic(topic, partitions, 1.toShort)).asJava)).map(res => + res.all().get(10, TimeUnit.SECONDS) + ) + } + } + + private def consume(n: Int, inputTopic: String): IO[List[ConsumerRecord[String, String]]] = { + val config = + ConsumerConfig( + common = producerConfig.common, + autoCommit = false, + autoOffsetReset = AutoOffsetReset.Earliest, + groupId = Random.alphanumeric.take(10).mkString.some + ) + ConsumerOf.apply1[IO]().apply[String, String](config).use { consumer => + consumer.subscribe(NonEmptySet.one(inputTopic)).flatMap { _ => + def poll(acc: List[ConsumerRecord[String, String]]): IO[List[ConsumerRecord[String, String]]] = + consumer.poll(1.second).flatMap { consumerRecords => + val newRecords = acc ++ consumerRecords.values.values.map(_.toList).flatten.toList + if (newRecords.size < n) { + poll(newRecords) + } else IO.pure(newRecords) + } + + poll(List.empty).timeout(30.seconds) + } + } + } + + private def sendInput( + kafka: KafkaModule[IO], + inputTopic: String, + partition: Int, + key: String, + value: String + ): IO[RecordMetadata] = { + 
val config = producerConfig.copy(common = producerConfig.common.copy(clientId = Some("RemapKeySpec-producer"))) + kafka.producerOf(config).use { producer => + val record = ProducerRecord[String, String](inputTopic, value.some, key.some, Partition.unsafe(partition).some) + producer.send(record).flatten + } + } + + private def runFlow( + kafka: KafkaModule[IO], + persistenceModuleOf: KafkaPersistenceModuleOf[IO, String], + inputTopic: String, + remapKey: RemapKey[IO], + ): Resource[IO, IO[Unit]] = { + implicit val retry = Retry.empty[IO] + for { + flowOf <- topicFlowOf(persistenceModuleOf, remapKey).toResource + completion <- KafkaFlow + .resource( + consumer = kafka.consumerOf("groupId-RemapKeySpec"), + flowOf = ConsumerFlowOf[IO]( + topic = inputTopic, + flowOf = flowOf + ) + ) + } yield completion + } + + private def topicFlowOf( + persistenceModuleOf: KafkaPersistenceModuleOf[IO, String], + remapKey: RemapKey[IO], + ): IO[TopicFlowOf[IO]] = { + for { + timersOf <- TimersOf.memory[IO, KafkaKey] + partitionFlowOf = kafkaEagerRecovery[IO, String]( + kafkaPersistenceModuleOf = persistenceModuleOf, + applicationId = appId, + groupId = testGroupId, + timersOf = timersOf, + timerFlowOf = TimerFlowOf + .persistPeriodically[IO]( + // 0 seconds intervals are used to persist state after every consumer.poll + // to simplify test scenarios + fireEvery = 0.seconds, + persistEvery = 0.seconds, + // flush on revoke is set to false, as it has no impact on test outcomes + // because we persist the state after every consumer.poll + flushOnRevoke = false + ), + fold = foldLogic, + partitionFlowConfig = PartitionFlowConfig( + // 0 seconds intervals are used to commit offsets after every consumer.poll + // to simplify test scenarios + triggerTimersInterval = 0.seconds, + commitOffsetsInterval = 0.seconds + ), + tick = TickOption.id[IO, String], + filter = none, + remapKey = remapKey.some, + registry = EntityRegistry.empty[IO, KafkaKey, String] + ) + } yield TopicFlowOf(partitionFlowOf) + 
} + + private def foldLogic: FoldOption[IO, String, ConsumerRecord[String, ByteVector]] = + FoldOption.of { (state, record) => + for { + input <- IO(record.value.get.value.decodeUtf8.toOption.get) + key = record.key.get.value + newState = state.fold(input)(_ + input) + } yield newState.some + } +} diff --git a/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/package.scala b/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/package.scala index 58bdf0c9..f3d03d03 100644 --- a/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/package.scala +++ b/persistence-kafka/src/main/scala/com/evolutiongaming/kafka/flow/kafkapersistence/package.scala @@ -53,6 +53,8 @@ package object kafkapersistence { * enhances framework with metrics * @param filter * optional function to pre-filter incoming events before they are processed by `fold` + * @param remapKey + * optional function to remap keys before they are processed by `fold` */ def kafkaEagerRecovery[F[_]: Async: LogOf, S]( kafkaPersistenceModuleOf: KafkaPersistenceModuleOf[F, S], @@ -65,6 +67,7 @@ package object kafkapersistence { partitionFlowConfig: PartitionFlowConfig, metrics: FlowMetrics[F] = FlowMetrics.empty[F], filter: Option[FilterRecord[F]] = None, + remapKey: Option[RemapKey[F]] = None, registry: EntityRegistry[F, KafkaKey, S] ): PartitionFlowOf[F] = kafkaEagerRecovery( @@ -78,6 +81,7 @@ package object kafkapersistence { partitionFlowConfig = partitionFlowConfig, metrics = metrics, filter = filter, + remapKey = remapKey, additionalPersistOf = AdditionalStatePersistOf.empty[F, S], registry = registry ) @@ -131,6 +135,7 @@ package object kafkapersistence { partitionFlowConfig: PartitionFlowConfig, metrics: FlowMetrics[F], filter: Option[FilterRecord[F]], + remapKey: Option[RemapKey[F]], additionalPersistOf: AdditionalStatePersistOf[F, S], registry: EntityRegistry[F, KafkaKey, S] ): PartitionFlowOf[F] = @@ -159,8 +164,9 @@ 
package object kafkapersistence { additionalPersistOf = additionalPersistOf, registry = registry ) withMetrics metrics.keyStateOfMetrics, - config = partitionFlowConfig, - filter = filter + config = partitionFlowConfig, + filter = filter, + remapKey = remapKey, ) partitionFlow <- partitionFlowOf(topicPartition, assignedAt, scheduleCommit) } yield partitionFlow