allow deriving and remapping keys (#616)
Z1kkurat authored Jul 23, 2024
1 parent 3401aa6 commit 5090f6f
Showing 7 changed files with 385 additions and 15 deletions.
@@ -12,7 +12,7 @@ import com.evolutiongaming.catshelper.{Log, LogOf}
import com.evolutiongaming.kafka.flow.PartitionFlowConfig.ParallelismMode._
import com.evolutiongaming.kafka.flow.kafka.{OffsetToCommit, ScheduleCommit}
import com.evolutiongaming.kafka.flow.timer.{TimerContext, Timestamp}
import com.evolutiongaming.skafka.consumer.ConsumerRecord
import com.evolutiongaming.skafka.consumer.{ConsumerRecord, WithSize}
import com.evolutiongaming.skafka.{Offset, TopicPartition}
import scodec.bits.ByteVector

@@ -57,11 +57,12 @@ object PartitionFlow {
keyStateOf: KeyStateOf[F],
config: PartitionFlowConfig,
filter: Option[FilterRecord[F]] = None,
remapKey: Option[RemapKey[F]] = None,
scheduleCommit: ScheduleCommit[F]
): Resource[F, PartitionFlow[F]] =
LogResource[F](getClass, topicPartition.toString) flatMap { implicit log =>
Cache.loading[F, String, PartitionKey[F]] flatMap { cache =>
of(topicPartition, assignedAt, keyStateOf, cache, config, filter, scheduleCommit)
of(topicPartition, assignedAt, keyStateOf, cache, config, filter, remapKey, scheduleCommit)
}
}

@@ -72,6 +73,7 @@ object PartitionFlow {
cache: Cache[F, String, PartitionKey[F]],
config: PartitionFlowConfig,
filter: Option[FilterRecord[F]] = None,
remapKey: Option[RemapKey[F]] = None,
scheduleCommit: ScheduleCommit[F]
)(implicit log: Log[F]): Resource[F, PartitionFlow[F]] = for {
clock <- Resource.eval(Clock[F].instant)
@@ -89,6 +91,7 @@
cache = cache,
config = config,
filter = filter,
remapKey = remapKey,
scheduleCommit = scheduleCommit
)
} yield flow
@@ -104,6 +107,7 @@
cache: Cache[F, String, PartitionKey[F]],
config: PartitionFlowConfig,
filter: Option[FilterRecord[F]],
remapKey: Option[RemapKey[F]],
scheduleCommit: ScheduleCommit[F]
)(implicit log: Log[F]): Resource[F, PartitionFlow[F]] = {

@@ -144,7 +148,16 @@ object PartitionFlow {
_ <- log.debug(s"processing ${records.size} records")

clock <- Clock[F].instant
keys = records groupBy (_.key map (_.value)) collect {
remappedRecords <- remapKey.fold(records.pure[F])(remapKey =>
records.traverse { record =>
record.key match {
case Some(WithSize(key, _)) =>
remapKey.remap(key, record).map(newKey => record.copy(key = WithSize(newKey).some))
case None => record.pure[F]
}
}
)
keys = remappedRecords.groupBy(_.key map (_.value)).collect {
// we deliberately ignore records without a key to simplify the code
// we might add support for such records in the future if it is required
case (Some(key), records) => (key, records)
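
For illustration only (not part of the diff), a minimal sketch of a RemapKey that could be plugged into this step; the topic-prefix scheme is an assumption, not something the commit prescribes. Records without a key skip remapping, as the case None branch above shows.

import cats.effect.IO
import com.evolutiongaming.kafka.flow.RemapKey

// Hypothetical remapping: prefix every key with the record's topic so that
// identical keys coming from different topics map to distinct states.
val topicPrefixed: RemapKey[IO] =
  RemapKey.of[IO] { (key, record) => IO.pure(s"${record.topicPartition.topic}-$key") }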
@@ -1,7 +1,7 @@
package com.evolutiongaming.kafka.flow

import cats.effect.kernel.Async
import cats.effect.Resource
import cats.effect.kernel.Async
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.PartitionFlow.FilterRecord
import com.evolutiongaming.kafka.flow.kafka.ScheduleCommit
@@ -26,12 +26,17 @@ object PartitionFlowOf {
* no state will be restored for that key; (2) no fold will be executed for that event. It doesn't affect
* committing consumer offsets, thus, even if all records in a batch are skipped, new offsets will still be
* committed if necessary
* @param remapKey
* allows remapping the key of a record before it is processed by the flow, so the subsequent steps (such as
* [[FilterRecord]] and [[FoldOption]]) will see the remapped key
*/
def apply[F[_]: Async: LogOf](
keyStateOf: KeyStateOf[F],
config: PartitionFlowConfig = PartitionFlowConfig(),
filter: Option[FilterRecord[F]] = None
filter: Option[FilterRecord[F]] = None,
remapKey: Option[RemapKey[F]] = None,
): PartitionFlowOf[F] = { (topicPartition, assignedAt, scheduleCommit) =>
PartitionFlow.resource(topicPartition, assignedAt, keyStateOf, config, filter, scheduleCommit)
PartitionFlow.resource(topicPartition, assignedAt, keyStateOf, config, filter, remapKey, scheduleCommit)
}
}
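
A hedged usage sketch of the new parameter, assuming an already-built KeyStateOf[IO] (named keyStateOf here) and a no-op logger; the lower-casing remap is purely illustrative.

import cats.effect.IO
import com.evolutiongaming.catshelper.LogOf
import com.evolutiongaming.kafka.flow.{KeyStateOf, PartitionFlowOf, RemapKey}

def remappingPartitionFlowOf(keyStateOf: KeyStateOf[IO]): PartitionFlowOf[IO] = {
  implicit val logOf: LogOf[IO] = LogOf.empty[IO]
  PartitionFlowOf[IO](
    keyStateOf = keyStateOf,
    // hypothetical remapping: normalize keys so that "User-1" and "user-1" share one state
    remapKey = Some(RemapKey.of[IO]((key, _) => IO.pure(key.toLowerCase)))
  )
}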
22 changes: 22 additions & 0 deletions core/src/main/scala/com/evolutiongaming/kafka/flow/RemapKey.scala
@@ -0,0 +1,22 @@
package com.evolutiongaming.kafka.flow

import cats.Applicative
import cats.syntax.applicative._
import com.evolutiongaming.skafka.consumer.ConsumerRecord
import scodec.bits.ByteVector

trait RemapKey[F[_]] {

/** Derive a new key for the consumer record based on its current key and the record itself. Records without a key
* are not remapped. Deriving is done before the record is processed by the flow, so the subsequent steps (such as
* [[FilterRecord]] and [[FoldOption]]) will see the remapped key in the consumer record.
*/
def remap(key: String, record: ConsumerRecord[String, ByteVector]): F[String]
}

object RemapKey {
def of[F[_]](f: (String, ConsumerRecord[String, ByteVector]) => F[String]): RemapKey[F] = (key, record) =>
f(key, record)

def empty[F[_]: Applicative]: RemapKey[F] = (key, _) => key.pure[F]
}
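
Two hedged examples of building instances with the constructors above; the per-session key format ("user1:session42") is an assumption made up for illustration.

import cats.effect.IO
import com.evolutiongaming.kafka.flow.RemapKey

// Hypothetical key derivation: collapse per-session keys such as "user1:session42"
// into a per-user key ("user1") so that folds aggregate state across sessions.
val perUser: RemapKey[IO] =
  RemapKey.of[IO] { (key, _) => IO.pure(key.takeWhile(_ != ':')) }

// RemapKey.empty leaves keys untouched and can act as a no-op default.
val identityRemap: RemapKey[IO] = RemapKey.empty[IO]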
@@ -3,26 +3,28 @@ package com.evolutiongaming.kafka.flow
import cats.effect.unsafe.IORuntime
import cats.effect.{IO, Ref, Resource}
import cats.syntax.all._
import com.evolution.scache.Cache
import com.evolutiongaming.catshelper.{Log, LogOf}
import com.evolutiongaming.kafka.flow.PartitionFlow.FilterRecord
import com.evolutiongaming.kafka.flow.PartitionFlow.{FilterRecord, PartitionKey}
import com.evolutiongaming.kafka.flow.PartitionFlowSpec._
import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._
import com.evolutiongaming.kafka.flow.journal.JournalsOf
import com.evolutiongaming.kafka.flow.kafka.{ScheduleCommit, ToOffset}
import com.evolutiongaming.kafka.flow.key.KeysOf
import com.evolutiongaming.kafka.flow.key.{KeyDatabase, KeysOf}
import com.evolutiongaming.kafka.flow.persistence.PersistenceOf
import com.evolutiongaming.kafka.flow.registry.EntityRegistry
import com.evolutiongaming.kafka.flow.snapshot.{SnapshotDatabase, SnapshotsOf}
import com.evolutiongaming.kafka.flow.timer.{TimerContext, TimerFlowOf, Timestamp}
import com.evolutiongaming.kafka.flow.timer.{TimerContext, TimerFlowOf, TimersOf, Timestamp}
import com.evolutiongaming.skafka.consumer.{ConsumerRecord, WithSize}
import com.evolutiongaming.skafka.{Offset, TopicPartition}
import com.evolutiongaming.skafka.{Offset, Partition, TopicPartition}
import com.evolutiongaming.sstream.Stream
import munit.FunSuite
import scodec.bits.ByteVector

import scala.concurrent.duration._

class PartitionFlowSpec extends FunSuite {
import PartitionFlowSpec.RemapKeyState

implicit val ioRuntime: IORuntime = IORuntime.global

@@ -266,9 +268,121 @@ class PartitionFlowSpec extends FunSuite {
flow.unsafeRunSync()
}

test("RemapKeys derives keys correctly and updates them before applying filters and folds") {
val remap = RemapKey.of[IO] { (key, _) => IO.pure(s"$key-derived") }

// Set up some data to be eagerly read on PartitionFlow creation
val initialKey = KafkaKey("appId", "groupId", TopicPartition("topic", Partition.min), "key1-derived")
val initialData = Map(initialKey -> "initial")

val newKey = KafkaKey("appId", "groupId", TopicPartition("topic", Partition.min), "key2-derived")

val test: IO[Unit] = setupRemapKeyTest(remap, initialData).use {
case RemapKeyState(cache, keys, snapshots, committedOffset, partitionFlow) =>
val key1Record = ConsumerRecord(
TopicPartition("topic", Partition.min),
Offset.unsafe(1L),
None,
WithSize("key1").some,
WithSize(ByteVector("value1".getBytes)).some
)

val key2Record = key1Record.copy(
key = WithSize("key2").some,
value = WithSize(ByteVector("value2".getBytes)).some,
offset = Offset.unsafe(2L)
)

for {
// Ensure pre-existing data is loaded correctly from the storage
_ <- cache.keys.map(keys => assertEquals(keys.size, 1))
_ <- keys.get.map(keys => assertEquals(keys, Set(initialKey)))
_ <- snapshots.get.map(snapshots => assertEquals(snapshots, initialData))
_ <- committedOffset.get.map(offset => assertEquals(offset, Offset.min))

// Handle a record for the existing key and check that the key was correctly derived and fold applied
_ <- partitionFlow.apply(List(key1Record))
_ <- cache.keys.map(keys => assertEquals(keys, Set(initialKey.key)))
_ <- keys.get.map(keys => assertEquals(keys, Set(initialKey)))
_ <- snapshots.get.map(snapshots => assertEquals(snapshots, Map(initialKey -> "initial+value1")))
_ <- committedOffset.get.map(offset => assertEquals(offset, Offset.unsafe(2L)))

// Handle a record for a new key and check that the key was correctly derived and fold applied
_ <- partitionFlow.apply(List(key2Record))
_ <- cache.keys.map(keys => assertEquals(keys, Set(initialKey.key, newKey.key)))
_ <- keys.get.map(keys => assertEquals(keys, Set(initialKey, newKey)))
_ <- snapshots
.get
.map(snapshots => assertEquals(snapshots, Map(initialKey -> "initial+value1", newKey -> "value2")))
_ <- committedOffset.get.map(offset => assertEquals(offset, Offset.unsafe(3L)))
} yield ()

}

test.unsafeRunSync()
}

private def setupRemapKeyTest(remapKey: RemapKey[IO], initialData: Map[KafkaKey, String]) = {
import com.evolutiongaming.kafka.flow.effect.CatsEffectMtlInstances._
implicit val logOf: LogOf[IO] = LogOf.empty[IO]
logOf.apply(classOf[PartitionFlowSpec]).toResource.flatMap { implicit log =>
val committedOffset = Ref.unsafe[IO, Offset](Offset.min)
val keyStorage = Ref.unsafe[IO, Set[KafkaKey]](initialData.keySet)
val keysOf = KeysOf.apply[IO, KafkaKey](KeyDatabase.memory[IO, KafkaKey](keyStorage.stateInstance))
val snapshotsStorage = Ref.unsafe[IO, Map[KafkaKey, String]](initialData)
val persistenceOf =
PersistenceOf
.snapshotsOnly[IO, KafkaKey, String, ConsumerRecord[String, ByteVector]](
keysOf,
SnapshotsOf.backedBy(SnapshotDatabase.memory(snapshotsStorage.stateInstance))
)
val fold = FoldOption.of[IO, String, ConsumerRecord[String, ByteVector]] { (state, record) =>
IO {
val event = new String(record.value.get.value.toArray)
state.fold(event)(_ + "+" + event).some
}
}
val timerFlowOf = TimerFlowOf.persistPeriodically[IO](fireEvery = 0.seconds, persistEvery = 0.seconds)
for {
timersOf <- TimersOf.memory[IO, KafkaKey].toResource
keyFlowOf = KeyFlowOf.apply(timerFlowOf, fold, TickOption.id[IO, String])
keyStateOf = KeyStateOf.eagerRecovery[IO, String](
applicationId = "appId",
groupId = "groupId",
keysOf = keysOf,
timersOf = timersOf,
persistenceOf = persistenceOf,
keyFlowOf = keyFlowOf,
registry = EntityRegistry.empty[IO, KafkaKey, String]
)
cache <- Cache.loading[IO, String, PartitionKey[IO]]
partitionFlow <- PartitionFlow.of(
topicPartition = TopicPartition("topic", Partition.min),
assignedAt = Offset.min,
keyStateOf = keyStateOf,
cache = cache,
config = PartitionFlowConfig(triggerTimersInterval = 0.seconds, commitOffsetsInterval = 0.seconds),
filter = none,
remapKey = remapKey.some,
scheduleCommit = new ScheduleCommit[IO] {
def schedule(offset: Offset) = committedOffset.set(offset)
}
)
} yield RemapKeyState(cache, keyStorage, snapshotsStorage, committedOffset, partitionFlow)
}
}

}
object PartitionFlowSpec {

case class RemapKeyState(
cache: Cache[IO, String, PartitionKey[IO]],
keys: Ref[IO, Set[KafkaKey]],
snapshots: Ref[IO, Map[KafkaKey, String]],
committedOffset: Ref[IO, Offset],
partitionFlow: PartitionFlow[IO],
)

class ConstFixture(waitForN: Int) {
implicit val logOf: LogOf[IO] = LogOf.empty
implicit val log: Log[IO] = Log.empty
@@ -315,7 +429,8 @@ object PartitionFlowSpec {
def makeFlow(
timerFlowOf: TimerFlowOf[IO],
persistenceOf: PersistenceOf[IO, String, State, ConsumerRecord[String, ByteVector]],
filter: Option[FilterRecord[IO]] = none
filter: Option[FilterRecord[IO]] = none,
remapKey: Option[RemapKey[IO]] = none,
): Resource[IO, PartitionFlow[IO]] = {
val keyStateOf: KeyStateOf[IO] = new KeyStateOf[IO] {
def apply(
@@ -352,7 +467,8 @@
commitOffsetsInterval = 0.seconds
),
filter = filter,
scheduleCommit = scheduleCommit
scheduleCommit = scheduleCommit,
remapKey = remapKey,
)
}

@@ -14,4 +14,4 @@
<appender-ref ref="CONSOLE"/>
</root>

</configuration>
</configuration>