Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: producer and consumer filters for Replicated Event Sourcing #840

Merged
merged 3 commits into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ import akka.annotation.InternalApi
*/
@InternalApi
private[akka] trait CanTriggerReplay {
private[akka] def triggerReplay(entityId: String, fromSeqNr: Long): Unit
private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ object EventSourcedProvider {
new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
with CanTriggerReplay {

private[akka] override def triggerReplay(entityId: String, fromSeqNr: Long): Unit =
query.triggerReplay(entityId, fromSeqNr)
private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr)

}
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ object EventSourcedProvider {
case query: EventsBySliceQuery with CanTriggerReplay =>
new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
with CanTriggerReplay {
override private[akka] def triggerReplay(entityId: String, fromSeqNr: Long): Unit =
query.triggerReplay(entityId, fromSeqNr)
override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
query.triggerReplay(persistenceId, fromSeqNr)
}
case _ =>
new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
processedB.envelope.event shouldBe "B"

val consumerFilter = ConsumerFilter(system).ref
consumerFilter ! ConsumerFilter.Replay(streamId, Set(ConsumerFilter.EntityIdOffset(pid.entityId, 2L)))
consumerFilter ! ConsumerFilter.Replay(streamId, Set(ConsumerFilter.PersistenceIdOffset(pid.id, 2L)))
// FIXME hack sleep to let it propagate to producer side
Thread.sleep(3000)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@

package akka.projection.grpc.internal

import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger

import scala.concurrent.Future
import scala.concurrent.Promise

import akka.NotUsed
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
Expand All @@ -19,6 +25,7 @@ import akka.projection.grpc.internal.proto.ExcludeRegexEntityIds
import akka.projection.grpc.internal.proto.FilterCriteria
import akka.projection.grpc.internal.proto.FilterReq
import akka.projection.grpc.internal.proto.IncludeEntityIds
import akka.projection.grpc.internal.proto.PersistenceIdSeqNr
import akka.projection.grpc.internal.proto.ReplayReq
import akka.projection.grpc.internal.proto.StreamIn
import akka.stream.scaladsl.BidiFlow
Expand All @@ -31,11 +38,6 @@ import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import org.scalatest.wordspec.AnyWordSpecLike

import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.Future
import scala.concurrent.Promise

class FilterStageSpec extends ScalaTestWithActorTestKit("""
akka.loglevel = DEBUG
""") with AnyWordSpecLike with LogCapturing {
Expand Down Expand Up @@ -92,9 +94,9 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
.sortBy(_.sequenceNr)
.map(_.asInstanceOf[EventEnvelope[Event]])
// simulate initial delay for more realistic testing, and concurrency check
import akka.pattern.{ after => futureAfter }

import scala.concurrent.duration._

import akka.pattern.{ after => futureAfter }
if (eventsByPersistenceIdConcurrency.incrementAndGet() > FilterStage.ReplayParallelism)
throw new IllegalStateException("Unexpected, too many concurrent calls to currentEventsByPersistenceId")
Source
Expand Down Expand Up @@ -221,14 +223,18 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
createEnvelope(PersistenceId(entityType, "d"), 2, "d2"))

inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(ReplayReq(List(EntityIdOffset("b", 1L), EntityIdOffset("c", 1L))))))
StreamIn(
StreamIn.Message.Replay(ReplayReq(List(
PersistenceIdSeqNr(PersistenceId(entityType, "b").id, 1L),
PersistenceIdSeqNr(PersistenceId(entityType, "c").id, 1L))))))

outProbe.request(10)
// no guarantee of order between b and c
outProbe.expectNextN(2).map(_.event).toSet shouldBe Set("b1", "c1")
outProbe.expectNoMessage()

inPublisher.sendNext(StreamIn(StreamIn.Message.Replay(ReplayReq(List(EntityIdOffset("d", 1L))))))
inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(ReplayReq(List(PersistenceIdSeqNr(PersistenceId(entityType, "d").id, 1L))))))
// it will not emit replayed event until there is some progress from the ordinary envSource, probably ok
outProbe.expectNoMessage()
envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1"))
Expand All @@ -243,11 +249,14 @@ class FilterStageSpec extends ScalaTestWithActorTestKit("""
entityIds.map(id => createEnvelope(PersistenceId(entityType, id), 1, id))

inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(ReplayReq(entityIds.take(7).map(id => EntityIdOffset(id, 1L))))))
StreamIn(StreamIn.Message.Replay(
ReplayReq(entityIds.take(7).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L))))))
inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(ReplayReq(entityIds.slice(7, 10).map(id => EntityIdOffset(id, 1L))))))
StreamIn(StreamIn.Message.Replay(
ReplayReq(entityIds.slice(7, 10).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L))))))
inPublisher.sendNext(
StreamIn(StreamIn.Message.Replay(ReplayReq(entityIds.drop(10).map(id => EntityIdOffset(id, 1L))))))
StreamIn(StreamIn.Message.Replay(
ReplayReq(entityIds.drop(10).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L))))))

outProbe.request(100)
// no guarantee of order between different entityIds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ message FilterReq {

// Replay events for given entities.
message ReplayReq {
repeated EntityIdOffset entity_id_offset = 1;
repeated PersistenceIdSeqNr persistence_id_offset = 1;
}

message FilterCriteria {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.actor.typed.Props
import akka.annotation.InternalApi
import akka.persistence.typed.ReplicaId
import akka.projection.grpc.internal.ConsumerFilterRegistry

// FIXME add ApiMayChange in all places
Expand All @@ -25,6 +26,9 @@ import akka.projection.grpc.internal.ConsumerFilterRegistry
* Extension to dynamically control the filters for the `GrpcReadJournal`.
*/
object ConsumerFilter extends ExtensionId[ConsumerFilter] {

private val ReplicationIdSeparator = '|'

trait Command
sealed trait SubscriberCommand extends Command {
def streamId: String
Expand Down Expand Up @@ -71,11 +75,11 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
/**
* Explicit request to replay events for given entities.
*/
final case class Replay(streamId: String, entityOffsets: Set[EntityIdOffset]) extends SubscriberCommand {
final case class Replay(streamId: String, persistenceIdOffsets: Set[PersistenceIdOffset]) extends SubscriberCommand {

/** Java API */
def this(streamId: String, entityOffsets: JSet[EntityIdOffset]) =
this(streamId, entityOffsets.asScala.toSet)
def this(streamId: String, persistenceIdOffsets: JSet[PersistenceIdOffset]) =
this(streamId, persistenceIdOffsets.asScala.toSet)
}

sealed trait FilterCriteria
Expand Down Expand Up @@ -103,6 +107,11 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
this(matching.asScala.toSet)
}

object ExcludeEntityIds {
def apply(replicaId: ReplicaId, entityIds: Set[String]): ExcludeEntityIds =
ExcludeEntityIds(entityIds.map(addReplicaIdToEntityId(replicaId, _)))
}

/**
* Exclude events for entities with the given entity ids,
* unless there is a matching include filter that overrides the exclude.
Expand All @@ -114,6 +123,11 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
this(entityIds.asScala.toSet)
}

object RemoveExcludeEntityIds {
def apply(replicaId: ReplicaId, entityIds: Set[String]): RemoveExcludeEntityIds =
RemoveExcludeEntityIds(entityIds.map(addReplicaIdToEntityId(replicaId, _)))
}

/**
* Remove a previously added [[ExcludeEntityIds]].
*/
Expand All @@ -124,6 +138,12 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
this(entityIds.asScala.toSet)
}

object IncludeEntityIds {
def apply(replicaId: ReplicaId, entityOffsets: Set[EntityIdOffset]): IncludeEntityIds =
IncludeEntityIds(
entityOffsets.map(offset => EntityIdOffset(addReplicaIdToEntityId(replicaId, offset.entityId), offset.seqNr)))
}

/**
* Include events for entities with the given entity ids. A matching include overrides
* a matching exclude.
Expand All @@ -138,6 +158,11 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
this(entityOffsets.asScala.toSet)
}

object RemoveIncludeEntityIds {
def apply(replicaId: ReplicaId, entityIds: Set[String]): RemoveIncludeEntityIds =
RemoveIncludeEntityIds(entityIds.map(addReplicaIdToEntityId(replicaId, _)))
}

/**
* Remove a previously added [[IncludeEntityIds]].
*/
Expand All @@ -148,8 +173,13 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
this(entityIds.asScala.toSet)
}

private def addReplicaIdToEntityId(replicaId: ReplicaId, entityId: String): String =
s"$entityId$ReplicationIdSeparator${replicaId.id}"

final case class EntityIdOffset(entityId: String, seqNr: Long)

final case class PersistenceIdOffset(persistenceIdId: String, seqNr: Long)

override def createExtension(system: ActorSystem[_]): ConsumerFilter = new ConsumerFilter(system)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ class GrpcReadJournal(delegate: scaladsl.GrpcReadJournal)
delegate.streamId

@InternalApi
private[akka] override def triggerReplay(entityId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(entityId, fromSeqNr)
private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr)

override def eventsBySlices[Event](
entityType: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,10 @@ final class GrpcReadJournal private (
}

@InternalApi
private[akka] override def triggerReplay(entityId: String, fromSeqNr: Long): Unit = {
consumerFilter.ref ! ConsumerFilter.Replay(streamId, Set(ConsumerFilter.EntityIdOffset(entityId, fromSeqNr)))
private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = {
consumerFilter.ref ! ConsumerFilter.Replay(
streamId,
Set(ConsumerFilter.PersistenceIdOffset(persistenceId, fromSeqNr)))
}

private def addRequestHeaders[Req, Res](
Expand Down Expand Up @@ -256,6 +258,11 @@ final class GrpcReadJournal private (
case _ => offset
})

def sliceHandledByThisStream(pid: String): Boolean = {
val slice = persistenceExt.sliceForPersistenceId(pid)
minSlice <= slice && slice <= maxSlice
}

val protoOffset =
offset match {
case o: TimestampOffset =>
Expand Down Expand Up @@ -285,14 +292,21 @@ final class GrpcReadJournal private (
log.debug2("{}: Filter updated [{}]", streamId, criteria.mkString(", "))
StreamIn(StreamIn.Message.Filter(FilterReq(protoCriteria)))

case ConsumerFilter.Replay(`streamId`, entityOffsets) =>
if (log.isDebugEnabled())
log.debug2("{}: Replay triggered for [{}]", streamId, entityOffsets.mkString(", "))
case ConsumerFilter.Replay(`streamId`, persistenceIdOffsets) =>
// FIXME for RES, would it be possible to skip replicaId not handled by this stream here?

val protoEntityOffsets = entityOffsets.map {
case ConsumerFilter.EntityIdOffset(entityId, seqNr) => EntityIdOffset(entityId, seqNr)
val protoPersistenceIdOffsets = persistenceIdOffsets.collect {
case ConsumerFilter.PersistenceIdOffset(pid, seqNr) if sliceHandledByThisStream(pid) =>
PersistenceIdSeqNr(pid, seqNr)
}.toVector
StreamIn(StreamIn.Message.Replay(ReplayReq(protoEntityOffsets)))

if (log.isDebugEnabled() && protoPersistenceIdOffsets.nonEmpty)
log.debug2(
"{}: Replay triggered for [{}]",
streamId,
protoPersistenceIdOffsets.map(offset => offset.persistenceId -> offset.seqNr).mkString(", "))

StreamIn(StreamIn.Message.Replay(ReplayReq(protoPersistenceIdOffsets)))
}
.mapMaterializedValue { ref =>
consumerFilter.ref ! ConsumerFilter.Subscribe(streamId, initCriteria, ref)
Expand Down
Loading