Skip to content

Commit

Permalink
Add ReplicatedOffsetNotifier callback
Browse files Browse the repository at this point in the history
Current recovery logic doesn't scale when the volume of events is large,
because clients have to consume all the events on all the app nodes.
Additionally, current logic creates Kafka consumers on-demand,
which is a heavy operation. This consumer creation happens in spikes,
when replicator is running late, and affects client applications stability.

Introducing a side channel for notifications about replication progress
will allow us in the future to have an alternative implementation for recovery
without these flaws. I.e. notifications go to a separate lean Kafka topic
which is read by all client nodes. On recovery:
- a mark is written
- wait until mark offset confirmed replicated in the notification topic
  (with potential fallback to polling of the pointer table in Cassandra)
- recover from Cassandra only
  • Loading branch information
migesok committed Feb 7, 2025
1 parent 90e8bec commit 3467a96
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.evolutiongaming.kafka.journal.replicator

import cats.Applicative
import com.evolutiongaming.skafka.{Offset, TopicPartition}

/**
* Interface for receiving timely notifications about [[Replicator]] topic replication progress.
*
* @tparam F effect type
*/
trait ReplicatedOffsetNotifier[F[_]] {

/**
* This method's effect is evaluated when a topic-partition offset has been replicated. It is guaranteed that upon
* evaluating the effect, all the changes before this offset are visible in the `EventualJournal`.
*
* It is advised not to block semantically in the effect here, because it would slow down the replication process.
*
* On subsequent calls to `onReplicatedOffset` for a topic-partition, you might observe offsets smaller than the ones
* you saw before. This is possible when a topic-partition replication process is restarted from a last committed
* offset and replays events. The implementations are required to handle this situation gracefully, i.e. ignoring
* the offsets smaller than the previously seen ones.
*
* @param topicPartition topic partition
* @param offset offset, until which (including the changes at the offset itself)
* all the changes in the topic partition have been replicated
*/
def onReplicatedOffset(topicPartition: TopicPartition, offset: Offset): F[Unit]
}

object ReplicatedOffsetNotifier {

/**
* [[ReplicatedOffsetNotifier]] implementation which does nothing
* @tparam F effect type
*/
def empty[F[_]: Applicative]: ReplicatedOffsetNotifier[F] = (_: TopicPartition, _: Offset) => Applicative[F].unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,24 @@ object Replicator {
journal: ReplicatedJournal[F],
hostName: Option[HostName],
): Resource[F, F[Unit]] = {
make(
config = config,
metrics = metrics,
journal = journal,
hostName = hostName,
replicatedOffsetNotifier = ReplicatedOffsetNotifier.empty,
)
}

def make[
F[_]: Temporal: Parallel: Runtime: FromTry: ToTry: Fail: LogOf: KafkaConsumerOf: MeasureDuration: JsonCodec,
](
config: ReplicatorConfig,
metrics: Option[Metrics[F]],
journal: ReplicatedJournal[F],
hostName: Option[HostName],
replicatedOffsetNotifier: ReplicatedOffsetNotifier[F],
): Resource[F, F[Unit]] = {

val topicReplicator: Topic => Resource[F, F[Outcome[F, Throwable, Unit]]] =
(topic: Topic) => {
Expand All @@ -78,7 +96,7 @@ object Replicator {
.fold { TopicReplicatorMetrics.empty[F] } { metrics => metrics(topic) }

val cacheOf = CacheOf[F](config.cacheExpireAfter, metrics.flatMap(_.cache))
TopicReplicator.make(topic, journal, consumer, metrics1, cacheOf)
TopicReplicator.make(topic, journal, consumer, metrics1, cacheOf, replicatedOffsetNotifier)
}

val consumer = Consumer.make[F](config.kafka.consumer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,17 @@ import cats.data.{NonEmptyList as Nel, NonEmptyMap as Nem, NonEmptySet as Nes}
import cats.effect.*
import cats.effect.implicits.*
import cats.implicits.*
import com.evolutiongaming.catshelper.*
import com.evolutiongaming.catshelper.ClockHelper.*
import com.evolutiongaming.catshelper.ParallelHelper.*
import com.evolutiongaming.catshelper.{FromTry, Log, LogOf, MeasureDuration, ToTry}
import com.evolutiongaming.kafka.journal.*
import com.evolutiongaming.kafka.journal.conversions.{ConsRecordToActionRecord, KafkaRead}
import com.evolutiongaming.kafka.journal.eventual.*
import com.evolutiongaming.kafka.journal.util.Fail
import com.evolutiongaming.kafka.journal.util.SkafkaHelper.*
import com.evolutiongaming.retry.Sleep
import com.evolutiongaming.skafka.*
import com.evolutiongaming.skafka.consumer.{AutoOffsetReset, ConsumerConfig}
import com.evolutiongaming.skafka.{Metadata, Offset, Partition, Topic}
import scodec.bits.ByteVector

import java.time.Instant
Expand All @@ -36,6 +36,7 @@ private[journal] object TopicReplicator {
consumer: Resource[F, TopicConsumer[F]],
metrics: TopicReplicatorMetrics[F],
cacheOf: CacheOf[F],
replicatedOffsetNotifier: ReplicatedOffsetNotifier[F],
): Resource[F, F[Outcome[F, Throwable, Unit]]] = {

implicit val fromAttempt: FromAttempt[F] = FromAttempt.lift[F]
Expand All @@ -61,6 +62,7 @@ private[journal] object TopicReplicator {
metrics = metrics,
log = log,
cacheOf = cacheOf,
replicatedOffsetNotifier = replicatedOffsetNotifier,
)
}

Expand All @@ -86,6 +88,7 @@ private[journal] object TopicReplicator {
metrics: TopicReplicatorMetrics[F],
log: Log[F],
cacheOf: CacheOf[F],
replicatedOffsetNotifier: ReplicatedOffsetNotifier[F],
): F[Unit] = {

trait PartitionFlow {
Expand Down Expand Up @@ -173,18 +176,19 @@ private[journal] object TopicReplicator {
result <- {
val offset1 = records.maximumBy { _.offset }.offset

def set = offsetRef.set(offset1.some)
def setAndNotify = offsetRef.set(offset1.some) >>
replicatedOffsetNotifier.onReplicatedOffset(TopicPartition(topic, partition), offset1)

offset.fold {
for {
a <- offsets.create(offset1, timestamp)
_ <- set
_ <- setAndNotify
} yield a
} { offset =>
if (offset1 > offset) {
for {
a <- offsets.update(offset1, timestamp)
_ <- set
_ <- setAndNotify
} yield a
} else {
().pure[F]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
topicReplicator.run(data).unsafeToFuture().map {
case (result, _) =>
result shouldEqual State(
topics = List(topic),
commits = List(Nem.of((0, 5), (1, 5))),
pointers = Map((topic, Map((0, 4L), (1, 4L)))),
topics = List(topic),
commits = List(Nem.of((0, 5), (1, 5))),
pointers = Map((topic, Map((0, 4L), (1, 4L)))),
replicatedOffsetNotifications = Map((topic, Map((0, Vector(4L)), (1, Vector(4L))))),
journal = Map(
("0-0", List(record(seqNr = 1, partition = 0, offset = 1), record(seqNr = 2, partition = 0, offset = 3))),
(
Expand Down Expand Up @@ -113,9 +114,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
topicReplicator.run(state).unsafeToFuture().map {
case (result, _) =>
result shouldEqual State(
topics = List(topic),
commits = List(Nem.of((0, 2))),
pointers = Map((topic, Map((0, 1L)))),
topics = List(topic),
commits = List(Nem.of((0, 2))),
pointers = Map((topic, Map((0, 1L)))),
replicatedOffsetNotifications = Map((topic, Map((0, Vector(1L))))),
journal = Map(
(
"id",
Expand Down Expand Up @@ -173,7 +175,8 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
Nem.of((0, 3)),
Nem.of((0, 2)),
),
pointers = Map((topic, Map((0, 4L), (1, 4L)))),
pointers = Map((topic, Map((0, 4L), (1, 4L)))),
replicatedOffsetNotifications = Map((topic, Map((0, Vector(1L, 2L, 3L, 4L)), (1, Vector(1L, 2L, 3L, 4L))))),
journal = Map(
("0-0", List(record(seqNr = 2, partition = 0, offset = 3), record(seqNr = 1, partition = 0, offset = 1))),
(
Expand Down Expand Up @@ -270,9 +273,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
topicReplicator.run(data).unsafeToFuture().map {
case (result, _) =>
result shouldEqual State(
topics = List(topic),
commits = List(Nem.of((0, 10), (1, 10), (2, 10))),
pointers = Map((topic, Map((0, 9L), (1, 9L), (2, 9L)))),
topics = List(topic),
commits = List(Nem.of((0, 10), (1, 10), (2, 10))),
pointers = Map((topic, Map((0, 9L), (1, 9L), (2, 9L)))),
replicatedOffsetNotifications = Map((topic, Map((0, Vector(9L)), (1, Vector(9L)), (2, Vector(9L))))),
journal = Map(
("0-0", List(record(seqNr = 1, partition = 0, offset = 1), record(seqNr = 2, partition = 0, offset = 5))),
(
Expand Down Expand Up @@ -353,6 +357,39 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
}
}

"replicate when mark only" in {
val tp0 = topicPartitionOf(0)
val tp1 = topicPartitionOf(1)

def keyOf(tp: TopicPartition, id: String) = Key(id = s"${tp.partition}-$id", topic = tp.topic)

val records = ConsumerRecords(
Map(
tp0 -> Nel.of(
consumerRecordOf(markOf(keyOf(tp0, "1")), tp0, offset = 1L),
),
tp1 -> Nel.of(
consumerRecordOf(markOf(keyOf(tp1, "1")), tp1, offset = 10L),
consumerRecordOf(markOf(keyOf(tp1, "2")), tp1, offset = 11L),
),
),
)

val data = State(records = List(records))
topicReplicator.run(data).unsafeToFuture().map {
case (result, _) =>
result shouldEqual State(
topics = List(topic),
commits = List(Nem.of(0 -> 2L, 1 -> 12L)),
pointers = Map(topic -> Map(0 -> 1L, 1 -> 11L)),
replicatedOffsetNotifications = Map(topic -> Map(0 -> Vector(1L), 1 -> Vector(11L))),
journal = Map.empty,
metaJournal = Map.empty,
metrics = List(Metrics.Round(records = 3)),
)
}
}

"replicate appends and deletes" in {
val records = {
val records = for {
Expand Down Expand Up @@ -406,9 +443,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
topicReplicator.run(data).unsafeToFuture().map {
case (result, _) =>
result shouldEqual State(
topics = List(topic),
commits = List(Nem.of((0, 11), (1, 11))),
pointers = Map((topic, Map((0, 10L), (1, 10L)))),
topics = List(topic),
commits = List(Nem.of((0, 11), (1, 11))),
pointers = Map((topic, Map((0, 10L), (1, 10L)))),
replicatedOffsetNotifications = Map((topic, Map((0, Vector(10L)), (1, Vector(10L))))),
journal = Map(
("0-0", List(record(seqNr = 1, partition = 0, offset = 1), record(seqNr = 2, partition = 0, offset = 5))),
("0-1", List(record(seqNr = 3, partition = 0, offset = 8))),
Expand Down Expand Up @@ -522,7 +560,8 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
Nem.of((0, 3)),
Nem.of((0, 2)),
),
pointers = Map((topic, Map((0, 12L)))),
pointers = Map((topic, Map((0, 12L)))),
replicatedOffsetNotifications = Map((topic, Map((0, Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12))))),
journal = Map(
("0-0", Nil),
(
Expand Down Expand Up @@ -614,9 +653,10 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
topicReplicator.run(data).unsafeToFuture().map {
case (result, _) =>
result shouldEqual State(
topics = List(topic),
commits = List(Nem.of((0, 4), (1, 4), (2, 4))),
pointers = Map((topic, Map((0, 3L), (1, 3L), (2, 3L)))),
topics = List(topic),
commits = List(Nem.of((0, 4), (1, 4), (2, 4))),
pointers = Map((topic, Map((0, 3L), (1, 3L), (2, 3L)))),
replicatedOffsetNotifications = Map((topic, Map((0, Vector(3L)), (1, Vector(3L)), (2, Vector(3L))))),
journal = Map(
("0-0", List(record(seqNr = 2, partition = 0, offset = 2))),
(
Expand Down Expand Up @@ -685,10 +725,11 @@ class TopicReplicatorSpec extends AsyncWordSpec with Matchers {
.map {
case (result, _) =>
result shouldEqual State(
topics = List(topic),
commits = List(Nem.of((0, 2))),
pointers = Map((topic, Map((0, 1L)))),
metrics = List(Metrics.Round(records = 2), Metrics.Purge(actions = 1)),
topics = List(topic),
commits = List(Nem.of((0, 2))),
pointers = Map((topic, Map((0, 1L)))),
replicatedOffsetNotifications = Map((topic, Map((0, Vector(1L))))),
metrics = List(Metrics.Round(records = 2), Metrics.Purge(actions = 1)),
)
}
.unsafeToFuture()
Expand Down Expand Up @@ -919,6 +960,13 @@ object TopicReplicatorSpec {
def topics = SortedSet.empty[Topic].pure[StateT]
}

val replicatedOffsetNotifier: ReplicatedOffsetNotifier[StateT] =
(topicPartition: TopicPartition, offset: Offset) => {
StateT.unit { state =>
state.addReplicatedOffsetNotification(topicPartition, offset.value)
}
}

implicit val consumer: TopicConsumer[StateT] = new TopicConsumer[StateT] {

def subscribe(listener: RebalanceListener1[StateT]) = {
Expand Down Expand Up @@ -1012,17 +1060,19 @@ object TopicReplicatorSpec {
metrics = metrics,
log = Log.empty[StateT],
cacheOf = CacheOf.empty[StateT],
replicatedOffsetNotifier = replicatedOffsetNotifier,
)
}

final case class State(
topics: List[Topic] = Nil,
commits: List[Nem[Int, Long]] = Nil,
records: List[ConsRecords] = Nil,
pointers: Map[Topic, Map[Int, Long]] = Map.empty,
journal: Map[String, List[EventRecord[EventualPayloadAndType]]] = Map.empty,
metaJournal: Map[String, MetaJournal] = Map.empty,
metrics: List[Metrics] = Nil,
topics: List[Topic] = Nil,
commits: List[Nem[Int, Long]] = Nil,
records: List[ConsRecords] = Nil,
pointers: Map[Topic, Map[Int, Long]] = Map.empty,
replicatedOffsetNotifications: Map[Topic, Map[Int, Vector[Long]]] = Map.empty,
journal: Map[String, List[EventRecord[EventualPayloadAndType]]] = Map.empty,
metaJournal: Map[String, MetaJournal] = Map.empty,
metrics: List[Metrics] = Nil,
) { self =>

def +(metrics: Metrics): (State, Unit) = {
Expand Down Expand Up @@ -1057,6 +1107,22 @@ object TopicReplicatorSpec {
copy(journal = self.journal.updated(id, records), metaJournal = self.metaJournal.updated(id, metaJournal))
}
}

def addReplicatedOffsetNotification(topicPartition: TopicPartition, offset: Long): State = {
val topic = topicPartition.topic
val partition = topicPartition.partition.value
val prevPartitionToOffsets = replicatedOffsetNotifications.getOrElse(topic, Map.empty)
val prevOffsets = prevPartitionToOffsets.getOrElse(partition, Vector.empty)
copy(
replicatedOffsetNotifications = replicatedOffsetNotifications.updated(
topic,
prevPartitionToOffsets.updated(
partition,
prevOffsets :+ offset,
),
),
)
}
}

type StateT[A] = cats.data.StateT[IO, State, A]
Expand Down

0 comments on commit 3467a96

Please sign in to comment.