Skip to content

Commit

Permalink
fix: unexpected events when controlling replication with consumer fil…
Browse files Browse the repository at this point in the history
…ter (#1133)

Filters are not used in replay requests, because the change of consumer filter is typically what triggers a replay request (missing preceding events) but that means that a replica may receive unexpected events, so  apply the filters after the sequence number that triggered the replay
  • Loading branch information
patriknw authored Mar 4, 2024
1 parent 1e46416 commit aa6f961
Show file tree
Hide file tree
Showing 16 changed files with 276 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ import akka.annotation.InternalApi
*/
@InternalApi
private[akka] trait CanTriggerReplay {
private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit
private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long, triggeredBySeqNr: Long): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,6 @@ private[projection] final class JavaToScalaBySliceSourceProviderAdapterWithCanTr
delegate: javadsl.SourceProvider[Offset, Envelope] with CanTriggerReplay)
extends JavaToScalaBySliceSourceProviderAdapter[Offset, Envelope](delegate)
with CanTriggerReplay {
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long, triggeredBySeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ private[projection] final class ScalaToJavaBySlicesSourceProviderAdapterWithCanT
extends ScalaToJavaBySlicesSourceProviderAdapter[Offset, Envelope](delegate)
with CanTriggerReplay {

override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long, triggeredBySeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,11 @@ object EventSourcedProvider {
maxSlice,
adjustStartOffset) with CanTriggerReplay {

private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr)
private[akka] override def triggerReplay(
persistenceId: String,
fromSeqNr: Long,
triggeredBySeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)

}
case _ =>
Expand Down Expand Up @@ -227,8 +230,11 @@ object EventSourcedProvider {
transformSnapshot,
adjustStartOffset) with CanTriggerReplay {

private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr)
private[akka] override def triggerReplay(
persistenceId: String,
fromSeqNr: Long,
triggeredBySeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)

}
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,11 @@ object EventSourcedProvider {
minSlice,
maxSlice,
adjustStartOffset) with CanTriggerReplay {
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr)
override private[akka] def triggerReplay(
persistenceId: String,
fromSeqNr: Long,
triggeredBySeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)
}
case _ =>
new EventsBySlicesSourceProvider(system, eventsBySlicesQuery, entityType, minSlice, maxSlice, adjustStartOffset)
Expand Down Expand Up @@ -212,8 +215,11 @@ object EventSourcedProvider {
maxSlice,
transformSnapshot,
adjustStartOffset) with CanTriggerReplay {
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr)
override private[akka] def triggerReplay(
persistenceId: String,
fromSeqNr: Long,
triggeredBySeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)
}
case _ =>
new EventsBySlicesStartingFromSnapshotsSourceProvider(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import akka.cluster.typed.Cluster
import akka.cluster.typed.Join
import akka.grpc.GrpcClientSettings
import akka.http.scaladsl.Http
import akka.persistence.r2dbc.R2dbcSettings
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.scaladsl.Effect
import akka.persistence.typed.scaladsl.EventSourcedBehavior
Expand Down Expand Up @@ -157,6 +158,8 @@ class ControlledReplicationIntegrationSpec(testContainerConf: TestContainerConf)
private val logger = LoggerFactory.getLogger(classOf[ControlledReplicationIntegrationSpec])
override def typedSystem: ActorSystem[_] = testKit.system

private lazy val r2dbcSettings = R2dbcSettings(system.settings.config.getConfig("akka.persistence.r2dbc"))

private val systems = Seq[ActorSystem[_]](
typedSystem,
akka.actor
Expand Down Expand Up @@ -237,7 +240,7 @@ class ControlledReplicationIntegrationSpec(testContainerConf: TestContainerConf)
logger.info("All three replication/producer services bound")
}

"replicate writes from one dc to others by tagging" in {
"control replication by ConsumerFilter tagging" in {
val entityId = "one"
val entityRefA = ClusterSharding(systemPerDc(DCA)).entityRefFor(TestEntity.EntityType, entityId)
val entityRefB = ClusterSharding(systemPerDc(DCB)).entityRefFor(TestEntity.EntityType, entityId)
Expand Down Expand Up @@ -282,6 +285,69 @@ class ControlledReplicationIntegrationSpec(testContainerConf: TestContainerConf)

}

"not replicate filtered events after replay trigger" in {
val entityId = "two"
val entityRefA = ClusterSharding(systemPerDc(DCA)).entityRefFor(TestEntity.EntityType, entityId)
val entityRefB = ClusterSharding(systemPerDc(DCB)).entityRefFor(TestEntity.EntityType, entityId)
val entityRefC = ClusterSharding(systemPerDc(DCC)).entityRefFor(TestEntity.EntityType, entityId)

// update in A
entityRefA.ask(TestEntity.UpdateItem("A", 0, _)).futureValue
entityRefA.ask(TestEntity.UpdateItem("A", 1, _)).futureValue

// replicate to B
logger.info("Replicate from A to B")
entityRefA.ask(TestEntity.SetScope(Set(DCB.id), _)).futureValue
eventually {
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1), Set(DCB.id))
}

// update in B
entityRefB.ask(TestEntity.UpdateItem("B", 2, _)).futureValue
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCB.id))
logger.info("Replicate from B to A")
// replicate to A
entityRefB.ask(TestEntity.SetScope(Set(DCA.id), _)).futureValue
eventually {
entityRefA.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
}

// replicate to C
// This will emit event from C to B for event that was scoped for B originally (those between A and B).
// That will trigger replay request from B to C. Filters are normally not used in replay because
// the changed filter is typically what is triggering the replay, but filters should still be applied
// after the event that triggered the replay.
logger.info("Replicate from A to C")
entityRefA.ask(TestEntity.SetScope(Set(DCC.id), _)).futureValue
eventually {
entityRefC.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCC.id))
}

// update in C
entityRefC.ask(TestEntity.UpdateItem("C", 3, _)).futureValue
// latest should not be replicated to B
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
Thread.sleep(r2dbcSettings.querySettings.backtrackingBehindCurrentTime.toMillis + 200)
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))

// another update in C
entityRefC.ask(TestEntity.UpdateItem("C", 4, _)).futureValue
// latest should still not be replicated to B
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))
Thread.sleep(r2dbcSettings.querySettings.backtrackingBehindCurrentTime.toMillis + 200)
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(Map("A" -> 1, "B" -> 2), Set(DCA.id))

// replicate to B
logger.info("Replicate from C to B")
entityRefC.ask(TestEntity.SetScope(Set(DCB.id), _)).futureValue
eventually {
entityRefB.ask(TestEntity.Get).futureValue shouldBe TestEntity.State(
Map("A" -> 1, "B" -> 2, "C" -> 4),
Set(DCB.id))
}

}

}

protected override def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import akka.projection.grpc.internal.proto.EventTimestampRequest
import akka.projection.grpc.internal.proto.InitReq
import akka.projection.grpc.internal.proto.LoadEventRequest
import akka.projection.grpc.internal.proto.PersistenceIdSeqNr
import akka.projection.grpc.internal.proto.ReplayPersistenceId
import akka.projection.grpc.internal.proto.ReplayReq
import akka.projection.grpc.internal.proto.ReplicaInfo
import akka.projection.grpc.internal.proto.StreamIn
Expand Down Expand Up @@ -427,7 +428,8 @@ class EventProducerServiceSpec
"replay events" in {
val persistenceId = nextPid(entityType3)
val initReq = InitReq(streamId3, 0, 1023, offset = None)
val replayReq = ReplayReq(List(PersistenceIdSeqNr(persistenceId.id, 2L)))
val replayReq = ReplayReq(replayPersistenceIds =
List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 2L)), filterAfterSeqNr = Long.MaxValue)))
val streamIn =
Source(List(StreamIn(StreamIn.Message.Init(initReq)), StreamIn(StreamIn.Message.Replay(replayReq))))
.concat(Source.maybe)
Expand Down Expand Up @@ -488,7 +490,8 @@ class EventProducerServiceSpec
"replay events StartingFromSnapshots" in {
val persistenceId = nextPid(entityType5)
val initReq = InitReq(streamId5, 0, 1023, offset = None)
val replayReq = ReplayReq(List(PersistenceIdSeqNr(persistenceId.id, 1L)))
val replayReq = ReplayReq(replayPersistenceIds =
List(ReplayPersistenceId(Some(PersistenceIdSeqNr(persistenceId.id, 1L)), filterAfterSeqNr = Long.MaxValue)))
val streamIn =
Source(List(StreamIn(StreamIn.Message.Init(initReq)), StreamIn(StreamIn.Message.Replay(replayReq))))
.concat(Source.maybe)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import akka.projection.grpc.internal.proto.IncludeEntityIds
import akka.projection.grpc.internal.proto.IncludeTags
import akka.projection.grpc.internal.proto.PersistenceIdSeqNr
import akka.projection.grpc.internal.proto.ReplayReq
import akka.projection.grpc.internal.proto.ReplayPersistenceId
import akka.projection.grpc.internal.proto.StreamIn
import akka.projection.grpc.producer.EventProducerSettings
import akka.stream.scaladsl.BidiFlow
Expand Down Expand Up @@ -132,6 +133,14 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
val envPublisher = envPublisherPromise.future.futureValue
}

private def streamInReplayReq(pid: String, fromSeqNr: Long, filterAfterSeqNr: Long = Long.MaxValue): StreamIn =
StreamIn(
StreamIn.Message.Replay(
ReplayReq(replayPersistenceIds = List(replayPersistenceId(pid, fromSeqNr, filterAfterSeqNr)))))

private def replayPersistenceId(pid: String, fromSeqNr: Long, filterAfterSeqNr: Long = Long.MaxValue) =
ReplayPersistenceId(Some(PersistenceIdSeqNr(pid, fromSeqNr)), filterAfterSeqNr)

"FilterStage" must {
"emit EventEnvelope" in new Setup {
allEnvelopes.foreach(envPublisher.sendNext)
Expand Down Expand Up @@ -242,17 +251,16 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""

inPublisher.sendNext(
StreamIn(
StreamIn.Message.Replay(ReplayReq(List(
PersistenceIdSeqNr(PersistenceId(entityType, "b").id, 1L),
PersistenceIdSeqNr(PersistenceId(entityType, "c").id, 1L))))))
StreamIn.Message.Replay(ReplayReq(replayPersistenceIds = List(
replayPersistenceId(PersistenceId(entityType, "b").id, 1L),
replayPersistenceId(PersistenceId(entityType, "c").id, 1L))))))

outProbe.request(10)
// no guarantee of order between b and c
outProbe.expectNextN(2).map(_.event).toSet shouldBe Set("b1", "c1")
outProbe.expectNoMessage()

inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(ReplayReq(List(PersistenceIdSeqNr(PersistenceId(entityType, "d").id, 1L))))))
inPublisher.sendNext(streamInReplayReq(PersistenceId(entityType, "d").id, 1L))
// it will not emit replayed event until there is some progress from the ordinary envSource, probably ok
outProbe.expectNoMessage()
envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1", tags = Set("WIP")))
Expand All @@ -272,19 +280,15 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
outProbe.request(10)
outProbe.expectNext().event shouldBe "a2"

inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(
ReplayReq(List(PersistenceIdSeqNr(ReplicationId(entityType, "a", ReplicaId("A")).persistenceId.id, 1L))))))
inPublisher.sendNext(streamInReplayReq(ReplicationId(entityType, "a", ReplicaId("A")).persistenceId.id, 1L))
// it will not emit replayed event until there is some progress from the ordinary envSource, probably ok
envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1"))
outProbe.expectNext().event shouldBe "e1"
outProbe.expectNext().event shouldBe "a1"
outProbe.expectNext().event shouldBe "a2"

// but ignored if it's a request for another replicaId
inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(
ReplayReq(List(PersistenceIdSeqNr(ReplicationId(entityType, "a", ReplicaId("B")).persistenceId.id, 1L))))))
inPublisher.sendNext(streamInReplayReq(ReplicationId(entityType, "a", ReplicaId("B")).persistenceId.id, 1L))
outProbe.expectNoMessage()
}

Expand All @@ -294,14 +298,14 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
entityIds.map(id => createEnvelope(PersistenceId(entityType, id), 1, id))

inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(
ReplayReq(entityIds.take(7).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L))))))
StreamIn(StreamIn.Message.Replay(ReplayReq(replayPersistenceIds =
entityIds.take(7).map(id => replayPersistenceId(PersistenceId(entityType, id).id, 1L))))))
inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(
ReplayReq(entityIds.slice(7, 10).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L))))))
StreamIn(StreamIn.Message.Replay(ReplayReq(replayPersistenceIds =
entityIds.slice(7, 10).map(id => replayPersistenceId(PersistenceId(entityType, id).id, 1L))))))
inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(
ReplayReq(entityIds.drop(10).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L))))))
StreamIn(StreamIn.Message.Replay(ReplayReq(replayPersistenceIds =
entityIds.drop(10).map(id => replayPersistenceId(PersistenceId(entityType, id).id, 1L))))))

outProbe.request(100)
// no guarantee of order between different entityIds
Expand All @@ -313,6 +317,34 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
envPublisher.sendComplete()
}

"replay from ReplayReq and filter after seqNr" in new Setup {
override lazy val allEnvelopes =
Vector(
createEnvelope(PersistenceId(entityType, "d"), 1, "d1"),
createEnvelope(PersistenceId(entityType, "d"), 2, "d2"),
createEnvelope(PersistenceId(entityType, "d"), 3, "d3", tags = Set("WIP")),
createEnvelope(PersistenceId(entityType, "d"), 4, "d4"),
createEnvelope(PersistenceId(entityType, "d"), 5, "d5"))

// filter should not exclude events from replay, e.g. d1 without the WIP tag, but
// filters are applied for events with seqNr >= filterAfterSeqNr
val filterCriteria = List(
FilterCriteria(FilterCriteria.Message.ExcludeMatchingEntityIds(ExcludeRegexEntityIds(List(".*")))),
FilterCriteria(FilterCriteria.Message.IncludeTags(IncludeTags(List("WIP")))))
inPublisher.sendNext(StreamIn(StreamIn.Message.Filter(FilterReq(filterCriteria))))

outProbe.request(10)

inPublisher.sendNext(streamInReplayReq(PersistenceId(entityType, "d").id, 1L, filterAfterSeqNr = 4))
// it will not emit replayed event until there is some progress from the ordinary envSource, probably ok
outProbe.expectNoMessage()
envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1"))
outProbe.expectNext().event shouldBe "d1"
outProbe.expectNext().event shouldBe "d2"
outProbe.expectNext().event shouldBe "d3"
outProbe.expectNoMessage() // d4 and d5 filtered out
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# adding filterAfterSeqNr
ProblemFilters.exclude[Problem]("akka.projection.grpc.internal.FilterStage*")
ProblemFilters.exclude[Problem]("akka.projection.grpc.internal.proto.ReplayReq*")
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,16 @@ message FilterReq {

// Replay events for given entities.
message ReplayReq {
// deprecated in 1.5.3, use replay_persistence_ids
repeated PersistenceIdSeqNr persistence_id_offset = 1;
repeated ReplayPersistenceId replay_persistence_ids = 2;
}

message ReplayPersistenceId {
PersistenceIdSeqNr from_persistence_id_offset = 1;
// apply filters for replayed events after this sequence number
int64 filter_after_seq_nr = 2;

}

message FilterCriteria {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
this(streamId, persistenceIdOffsets.asScala.toSet)
}

/**
* Explicit request to replay events for given entities.
*/
final case class ReplayWithFilter(streamId: String, replayPersistenceIds: Set[ReplayPersistenceId])
extends SubscriberCommand {

/** Java API */
def this(streamId: String, persistenceIdOffsets: JSet[ReplayPersistenceId]) =
this(streamId, persistenceIdOffsets.asScala.toSet)
}

sealed trait FilterCriteria
sealed trait RemoveCriteria extends FilterCriteria

Expand Down Expand Up @@ -306,6 +317,8 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {

final case class PersistenceIdOffset(persistenceIdId: String, seqNr: Long)

final case class ReplayPersistenceId(persistenceIdOffset: PersistenceIdOffset, filterAfterSeqNr: Long)

override def createExtension(system: ActorSystem[_]): ConsumerFilter = new ConsumerFilter(system)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ class GrpcReadJournal(delegate: scaladsl.GrpcReadJournal)
delegate.streamId

@InternalApi
private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)
private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long, triggeredBySeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)

override def eventsBySlices[Event](
entityType: String,
Expand Down
Loading

0 comments on commit aa6f961

Please sign in to comment.