diff --git a/akka-projection-core/src/main/scala/akka/projection/internal/CanTriggerReplay.scala b/akka-projection-core/src/main/scala/akka/projection/internal/CanTriggerReplay.scala index e9c2f1607..33d7baac7 100644 --- a/akka-projection-core/src/main/scala/akka/projection/internal/CanTriggerReplay.scala +++ b/akka-projection-core/src/main/scala/akka/projection/internal/CanTriggerReplay.scala @@ -11,5 +11,5 @@ import akka.annotation.InternalApi */ @InternalApi private[akka] trait CanTriggerReplay { - private[akka] def triggerReplay(entityId: String, fromSeqNr: Long): Unit + private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit } diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala index 87ef559c3..56553bf28 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/javadsl/EventSourcedProvider.scala @@ -96,8 +96,8 @@ object EventSourcedProvider { new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system) with CanTriggerReplay { - private[akka] override def triggerReplay(entityId: String, fromSeqNr: Long): Unit = - query.triggerReplay(entityId, fromSeqNr) + private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = + query.triggerReplay(persistenceId, fromSeqNr) } case _ => diff --git a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala index afda9b0ff..9d68aa7b0 100644 --- a/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala +++ b/akka-projection-eventsourced/src/main/scala/akka/projection/eventsourced/scaladsl/EventSourcedProvider.scala @@ -82,8 +82,8 @@ object EventSourcedProvider { case query: EventsBySliceQuery with CanTriggerReplay => new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system) with CanTriggerReplay { - override private[akka] def triggerReplay(entityId: String, fromSeqNr: Long): Unit = - query.triggerReplay(entityId, fromSeqNr) + override private[akka] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = + query.triggerReplay(persistenceId, fromSeqNr) } case _ => new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system) diff --git a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/IntegrationSpec.scala b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/IntegrationSpec.scala index 68e367630..098cee771 100644 --- a/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/IntegrationSpec.scala +++ b/akka-projection-grpc-tests/src/it/scala/akka/projection/grpc/IntegrationSpec.scala @@ -419,7 +419,7 @@ class IntegrationSpec(testContainerConf: TestContainerConf) processedB.envelope.event shouldBe "B" val consumerFilter = ConsumerFilter(system).ref - consumerFilter ! ConsumerFilter.Replay(streamId, Set(ConsumerFilter.EntityIdOffset(pid.entityId, 2L))) + consumerFilter ! ConsumerFilter.Replay(streamId, Set(ConsumerFilter.PersistenceIdOffset(pid.id, 2L))) // FIXME hack sleep to let it propagate to producer side Thread.sleep(3000) diff --git a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala index a1a4fdec5..b57e524a6 100644 --- a/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala +++ b/akka-projection-grpc-tests/src/test/scala/akka/projection/grpc/internal/FilterStageSpec.scala @@ -4,6 +4,12 @@ package akka.projection.grpc.internal +import java.time.Instant +import java.util.concurrent.atomic.AtomicInteger + +import scala.concurrent.Future +import scala.concurrent.Promise + import akka.NotUsed import akka.actor.testkit.typed.scaladsl.LogCapturing import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit @@ -19,6 +25,7 @@ import akka.projection.grpc.internal.proto.ExcludeRegexEntityIds import akka.projection.grpc.internal.proto.FilterCriteria import akka.projection.grpc.internal.proto.FilterReq import akka.projection.grpc.internal.proto.IncludeEntityIds +import akka.projection.grpc.internal.proto.PersistenceIdSeqNr import akka.projection.grpc.internal.proto.ReplayReq import akka.projection.grpc.internal.proto.StreamIn import akka.stream.scaladsl.BidiFlow @@ -31,11 +38,6 @@ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSource import org.scalatest.wordspec.AnyWordSpecLike -import java.time.Instant -import java.util.concurrent.atomic.AtomicInteger -import scala.concurrent.Future -import scala.concurrent.Promise - class FilterStageSpec extends ScalaTestWithActorTestKit(""" akka.loglevel = DEBUG """) with AnyWordSpecLike with LogCapturing { @@ -92,9 +94,9 @@ class FilterStageSpec extends ScalaTestWithActorTestKit(""" .sortBy(_.sequenceNr) .map(_.asInstanceOf[EventEnvelope[Event]]) // simulate initial delay for more realistic testing, and concurrency check - import akka.pattern.{ after => futureAfter } - import scala.concurrent.duration._ + + import akka.pattern.{ after => futureAfter } if (eventsByPersistenceIdConcurrency.incrementAndGet() > FilterStage.ReplayParallelism) throw new IllegalStateException("Unexpected, too many concurrent calls to currentEventsByPersistenceId") Source @@ -221,14 +223,18 @@ class FilterStageSpec extends ScalaTestWithActorTestKit(""" createEnvelope(PersistenceId(entityType, "d"), 2, "d2")) inPublisher.sendNext( - StreamIn(StreamIn.Message.Replay(ReplayReq(List(EntityIdOffset("b", 1L), EntityIdOffset("c", 1L)))))) + StreamIn( + StreamIn.Message.Replay(ReplayReq(List( + PersistenceIdSeqNr(PersistenceId(entityType, "b").id, 1L), + PersistenceIdSeqNr(PersistenceId(entityType, "c").id, 1L)))))) outProbe.request(10) // no guarantee of order between b and c outProbe.expectNextN(2).map(_.event).toSet shouldBe Set("b1", "c1") outProbe.expectNoMessage() - inPublisher.sendNext(StreamIn(StreamIn.Message.Replay(ReplayReq(List(EntityIdOffset("d", 1L)))))) + inPublisher.sendNext( + StreamIn(StreamIn.Message.Replay(ReplayReq(List(PersistenceIdSeqNr(PersistenceId(entityType, "d").id, 1L)))))) // it will not emit replayed event until there is some progress from the ordinary envSource, probably ok outProbe.expectNoMessage() envPublisher.sendNext(createEnvelope(PersistenceId(entityType, "e"), 1, "e1")) @@ -243,11 +249,14 @@ class FilterStageSpec extends ScalaTestWithActorTestKit(""" entityIds.map(id => createEnvelope(PersistenceId(entityType, id), 1, id)) inPublisher.sendNext( - StreamIn(StreamIn.Message.Replay(ReplayReq(entityIds.take(7).map(id => EntityIdOffset(id, 1L)))))) + StreamIn(StreamIn.Message.Replay( + ReplayReq(entityIds.take(7).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L)))))) inPublisher.sendNext( - StreamIn(StreamIn.Message.Replay(ReplayReq(entityIds.slice(7, 10).map(id => EntityIdOffset(id, 1L)))))) + StreamIn(StreamIn.Message.Replay( + ReplayReq(entityIds.slice(7, 10).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L)))))) inPublisher.sendNext( - StreamIn(StreamIn.Message.Replay(ReplayReq(entityIds.drop(10).map(id => EntityIdOffset(id, 1L)))))) + StreamIn(StreamIn.Message.Replay( + ReplayReq(entityIds.drop(10).map(id => PersistenceIdSeqNr(PersistenceId(entityType, id).id, 1L)))))) outProbe.request(100) // no guarantee of order between different entityIds diff --git a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto index e3cc6d19a..03372b0e9 100644 --- a/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto +++ b/akka-projection-grpc/src/main/protobuf/akka/projection/grpc/event_producer.proto @@ -47,7 +47,7 @@ message FilterReq { // Replay events for given entities. message ReplayReq { - repeated EntityIdOffset entity_id_offset = 1; + repeated PersistenceIdSeqNr persistence_id_offset = 1; } message FilterCriteria { diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/ConsumerFilter.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/ConsumerFilter.scala index 3f4cff2d1..15730601b 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/ConsumerFilter.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/ConsumerFilter.scala @@ -17,6 +17,7 @@ import akka.actor.typed.Extension import akka.actor.typed.ExtensionId import akka.actor.typed.Props import akka.annotation.InternalApi +import akka.persistence.typed.ReplicaId import akka.projection.grpc.internal.ConsumerFilterRegistry // FIXME add ApiMayChange in all places @@ -25,6 +26,9 @@ import akka.projection.grpc.internal.ConsumerFilterRegistry * Extension to dynamically control the filters for the `GrpcReadJournal`. */ object ConsumerFilter extends ExtensionId[ConsumerFilter] { + + private val ReplicationIdSeparator = '|' + trait Command sealed trait SubscriberCommand extends Command { def streamId: String @@ -71,11 +75,11 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] { /** * Explicit request to replay events for given entities. */ - final case class Replay(streamId: String, entityOffsets: Set[EntityIdOffset]) extends SubscriberCommand { + final case class Replay(streamId: String, persistenceIdOffsets: Set[PersistenceIdOffset]) extends SubscriberCommand { /** Java API */ - def this(streamId: String, entityOffsets: JSet[EntityIdOffset]) = - this(streamId, entityOffsets.asScala.toSet) + def this(streamId: String, persistenceIdOffsets: JSet[PersistenceIdOffset]) = + this(streamId, persistenceIdOffsets.asScala.toSet) } sealed trait FilterCriteria @@ -103,6 +107,11 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] { this(matching.asScala.toSet) } + object ExcludeEntityIds { + def apply(replicaId: ReplicaId, entityIds: Set[String]): ExcludeEntityIds = + ExcludeEntityIds(entityIds.map(addReplicaIdToEntityId(replicaId, _))) + } + /** * Exclude events for entities with the given entity ids, * unless there is a matching include filter that overrides the exclude. @@ -114,6 +123,11 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] { this(entityIds.asScala.toSet) } + object RemoveExcludeEntityIds { + def apply(replicaId: ReplicaId, entityIds: Set[String]): RemoveExcludeEntityIds = + RemoveExcludeEntityIds(entityIds.map(addReplicaIdToEntityId(replicaId, _))) + } + /** * Remove a previously added [[ExcludeEntityIds]]. */ @@ -124,6 +138,12 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] { this(entityIds.asScala.toSet) } + object IncludeEntityIds { + def apply(replicaId: ReplicaId, entityOffsets: Set[EntityIdOffset]): IncludeEntityIds = + IncludeEntityIds( + entityOffsets.map(offset => EntityIdOffset(addReplicaIdToEntityId(replicaId, offset.entityId), offset.seqNr))) + } + /** * Include events for entities with the given entity ids. A matching include overrides * a matching exclude. @@ -138,6 +158,11 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] { this(entityOffsets.asScala.toSet) } + object RemoveIncludeEntityIds { + def apply(replicaId: ReplicaId, entityIds: Set[String]): RemoveIncludeEntityIds = + RemoveIncludeEntityIds(entityIds.map(addReplicaIdToEntityId(replicaId, _))) + } + /** * Remove a previously added [[IncludeEntityIds]]. */ @@ -148,8 +173,13 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] { this(entityIds.asScala.toSet) } + private def addReplicaIdToEntityId(replicaId: ReplicaId, entityId: String): String = + s"$entityId$ReplicationIdSeparator${replicaId.id}" + final case class EntityIdOffset(entityId: String, seqNr: Long) + final case class PersistenceIdOffset(persistenceIdId: String, seqNr: Long) + override def createExtension(system: ActorSystem[_]): ConsumerFilter = new ConsumerFilter(system) /** diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala index 0df93988e..f4858ec45 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/javadsl/GrpcReadJournal.scala @@ -81,8 +81,8 @@ class GrpcReadJournal(delegate: scaladsl.GrpcReadJournal) delegate.streamId @InternalApi - private[akka] override def triggerReplay(entityId: String, fromSeqNr: Long): Unit = - delegate.triggerReplay(entityId, fromSeqNr) + private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = + delegate.triggerReplay(persistenceId, fromSeqNr) override def eventsBySlices[Event]( entityType: String, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala index 45539bf56..2bdb7a99e 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/consumer/scaladsl/GrpcReadJournal.scala @@ -179,8 +179,10 @@ final class GrpcReadJournal private ( } @InternalApi - private[akka] override def triggerReplay(entityId: String, fromSeqNr: Long): Unit = { - consumerFilter.ref ! ConsumerFilter.Replay(streamId, Set(ConsumerFilter.EntityIdOffset(entityId, fromSeqNr))) + private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit = { + consumerFilter.ref ! ConsumerFilter.Replay( + streamId, + Set(ConsumerFilter.PersistenceIdOffset(persistenceId, fromSeqNr))) } private def addRequestHeaders[Req, Res]( @@ -256,6 +258,11 @@ final class GrpcReadJournal private ( case _ => offset }) + def sliceHandledByThisStream(pid: String): Boolean = { + val slice = persistenceExt.sliceForPersistenceId(pid) + minSlice <= slice && slice <= maxSlice + } + val protoOffset = offset match { case o: TimestampOffset => @@ -285,14 +292,21 @@ final class GrpcReadJournal private ( log.debug2("{}: Filter updated [{}]", streamId, criteria.mkString(", ")) StreamIn(StreamIn.Message.Filter(FilterReq(protoCriteria))) - case ConsumerFilter.Replay(`streamId`, entityOffsets) => - if (log.isDebugEnabled()) - log.debug2("{}: Replay triggered for [{}]", streamId, entityOffsets.mkString(", ")) + case ConsumerFilter.Replay(`streamId`, persistenceIdOffsets) => + // FIXME for RES, would it be possible to skip replicaId not handled by this stream here? - val protoEntityOffsets = entityOffsets.map { - case ConsumerFilter.EntityIdOffset(entityId, seqNr) => EntityIdOffset(entityId, seqNr) + val protoPersistenceIdOffsets = persistenceIdOffsets.collect { + case ConsumerFilter.PersistenceIdOffset(pid, seqNr) if sliceHandledByThisStream(pid) => + PersistenceIdSeqNr(pid, seqNr) }.toVector - StreamIn(StreamIn.Message.Replay(ReplayReq(protoEntityOffsets))) + + if (log.isDebugEnabled() && protoPersistenceIdOffsets.nonEmpty) + log.debug2( + "{}: Replay triggered for [{}]", + streamId, + protoPersistenceIdOffsets.map(offset => offset.persistenceId -> offset.seqNr).mkString(", ")) + + StreamIn(StreamIn.Message.Replay(ReplayReq(protoPersistenceIdOffsets))) } .mapMaterializedValue { ref => consumerFilter.ref ! ConsumerFilter.Subscribe(streamId, initCriteria, ref) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala index 8a2c35c45..c5189d62d 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/internal/FilterStage.scala @@ -4,6 +4,11 @@ package akka.projection.grpc.internal +import scala.util.Failure +import scala.util.Success +import scala.util.Try +import scala.util.matching.Regex + import akka.NotUsed import akka.actor.typed.scaladsl.LoggerOps import akka.annotation.InternalApi @@ -12,8 +17,10 @@ import akka.persistence.Persistence import akka.persistence.query.typed.EventEnvelope import akka.persistence.query.typed.scaladsl.CurrentEventsByPersistenceIdTypedQuery import akka.persistence.typed.PersistenceId +import akka.persistence.typed.ReplicationId import akka.projection.grpc.internal.proto.EntityIdOffset import akka.projection.grpc.internal.proto.FilterCriteria +import akka.projection.grpc.internal.proto.PersistenceIdSeqNr import akka.projection.grpc.internal.proto.StreamIn import akka.stream.Attributes import akka.stream.BidiShape @@ -27,16 +34,12 @@ import akka.stream.stage.InHandler import akka.stream.stage.OutHandler import org.slf4j.LoggerFactory -import scala.util.Failure -import scala.util.Success -import scala.util.Try -import scala.util.matching.Regex - /** * INTERNAL API */ @InternalApi private[akka] object FilterStage { val ReplayParallelism = 3 // FIXME config + private val ReplicationIdSeparator = '|' object Filter { val empty: Filter = Filter(Set.empty, Set.empty, Map.empty) @@ -85,7 +88,7 @@ import scala.util.matching.Regex } } - private case class ReplayEnvelope(entityId: String, env: Option[EventEnvelope[Any]]) + private case class ReplayEnvelope(persistenceId: String, env: Option[EventEnvelope[Any]]) private case class ReplaySession(fromSeqNr: Long, queue: SinkQueueWithCancel[EventEnvelope[Any]]) @@ -120,7 +123,7 @@ import scala.util.matching.Regex // only one pull replay stream -> async callback at a time private var replayHasBeenPulled = false - private var pendingReplayRequests: Vector[EntityIdOffset] = Vector.empty + private var pendingReplayRequests: Vector[PersistenceIdSeqNr] = Vector.empty // several replay streams may be in progress at the same time private var replayInProgress: Map[String, ReplaySession] = Map.empty private val replayCallback = getAsyncCallback[Try[ReplayEnvelope]] { @@ -139,14 +142,14 @@ import scala.util.matching.Regex private def onReplay(replayEnv: ReplayEnvelope): Unit = { def replayCompleted(): Unit = { - replayInProgress -= replayEnv.entityId + replayInProgress -= replayEnv.persistenceId pullInEnvOrReplay() } replayHasBeenPulled = false replayEnv match { - case ReplayEnvelope(entityId, Some(env)) => + case ReplayEnvelope(persistenceId, Some(env)) => // the predicate to replay events from start for a given pid // Note: we do not apply the producer filter here as that may be what triggered the replay if (filter.matches(env.persistenceId)) { @@ -162,22 +165,22 @@ import scala.util.matching.Regex logPrefix, env.persistenceId, env.sequenceNr) - val queue = replayInProgress(entityId).queue + val queue = replayInProgress(persistenceId).queue queue.cancel() replayCompleted() } - case ReplayEnvelope(entityId, None) => - log.debug2("Stream [{}]: Completed replay of entityId [{}]", logPrefix, entityId) + case ReplayEnvelope(persistenceId, None) => + log.debug2("Stream [{}]: Completed replay of persistenceId [{}]", logPrefix, persistenceId) replayCompleted() } } - private def tryPullReplay(entityId: String): Unit = { + private def tryPullReplay(pid: String): Unit = { if (!replayHasBeenPulled && isAvailable(outEnv) && !hasBeenPulled(inEnv)) { - log.trace2("Stream [{}]: tryPullReplay entityId [{}}]", logPrefix, entityId) + log.trace2("Stream [{}]: tryPullReplay persistenceId [{}}]", logPrefix, pid) val next = - replayInProgress(entityId).queue.pull().map(ReplayEnvelope(entityId, _))(ExecutionContexts.parasitic) + replayInProgress(pid).queue.pull().map(ReplayEnvelope(pid, _))(ExecutionContexts.parasitic) next.value match { case None => replayHasBeenPulled = true @@ -219,31 +222,49 @@ import scala.util.matching.Regex log.trace2("Stream [{}]: updated filter to [{}}]", logPrefix, filter) } - private def handledByThisStream(pid: PersistenceId): Boolean = { - // note that it's not possible to decide this on the consumer side because there we don't know the - // mapping between streamId and entityType - val slice = persistence.sliceForPersistenceId(pid.id) + private def sliceHandledByThisStream(pid: String): Boolean = { + val slice = persistence.sliceForPersistenceId(pid) sliceRange.contains(slice) } + // Translate the streamId to the entityType and thereby constructing the full persistenceId. + private def mapEntityIdToPid(entityId: String): String = { + if (entityId.indexOf(ReplicationIdSeparator) < 0) + PersistenceId(entityType, entityId).id + else + ReplicationId.fromString(s"$streamId$ReplicationIdSeparator$entityId").persistenceId.id + } + // Translate the streamId to the entityType and thereby constructing the full persistenceId. private def mapEntityIdToPidHandledByThisStream(entityIds: Seq[String]): Seq[String] = - entityIds.map(PersistenceId(entityType, _)).filter(handledByThisStream).map(_.id) + entityIds + .map(mapEntityIdToPid) + .filter(sliceHandledByThisStream) + + // Translate the streamId to the entityType and thereby constructing the full persistenceId. + private def mapEntityIdOffsetToPidHandledByThisStream( + entityIdOffsets: Seq[EntityIdOffset]): Seq[PersistenceIdSeqNr] = + entityIdOffsets.flatMap { offset => + val pid = mapEntityIdToPid(offset.entityId) + if (sliceHandledByThisStream(pid)) Some(PersistenceIdSeqNr(pid, offset.seqNr)) + else None + } private def replayFromFilterCriteria(criteria: Iterable[FilterCriteria]): Unit = { criteria.foreach { _.message match { - case FilterCriteria.Message.IncludeEntityIds(include) => replayAll(include.entityIdOffset) - case _ => + case FilterCriteria.Message.IncludeEntityIds(include) => + replayAll(mapEntityIdOffsetToPidHandledByThisStream(include.entityIdOffset)) + case _ => } } } - private def replayAll(entityOffsets: Iterable[EntityIdOffset]): Unit = { + private def replayAll(persistenceIdOffsets: Iterable[PersistenceIdSeqNr]): Unit = { // FIXME limit number of concurrent replay requests, place additional in a pending queue - entityOffsets.foreach { entityOffset => - if (entityOffset.seqNr >= 1) - replay(entityOffset) + persistenceIdOffsets.foreach { offset => + if (offset.seqNr >= 1) + replay(offset) // FIXME seqNr 0 would be to support a mode where we only deliver events after the include filter // change. In that case we must have a way to signal to the R2dbcOffsetStore that the // first seqNr of that new pid is ok to pass through even though it isn't 1 @@ -251,20 +272,20 @@ import scala.util.matching.Regex } } - private def replay(entityOffset: EntityIdOffset): Unit = { - val entityId = entityOffset.entityId - val fromSeqNr = entityOffset.seqNr - val pid = PersistenceId(entityType, entityId) - if (handledByThisStream(pid)) { + private def replay(persistenceIdOffset: PersistenceIdSeqNr): Unit = { + val fromSeqNr = persistenceIdOffset.seqNr + // FIXME what about the replicaId when using RES? + val pid = persistenceIdOffset.persistenceId + if (sliceHandledByThisStream(pid)) { val sameInProgress = - replayInProgress.get(entityId) match { + replayInProgress.get(pid) match { case Some(replay) if replay.fromSeqNr == fromSeqNr => // no point in cancel and starting new from same seqNr true case Some(replay) => - log.debug2("Stream [{}]: Cancel replay of entityId [{}], replaced by new replay", logPrefix, entityId) + log.debug2("Stream [{}]: Cancel replay of persistenceId [{}], replaced by new replay", logPrefix, pid) replay.queue.cancel() - replayInProgress -= entityId + replayInProgress -= pid false case None => false @@ -272,21 +293,21 @@ import scala.util.matching.Regex if (sameInProgress) { log.debugN( - "Stream [{}]: Replay of entityId [{}] already in progress from seqNr [{}], replaced by new replay", + "Stream [{}]: Replay of persistenceId [{}] already in progress from seqNr [{}], replaced by new replay", logPrefix, - entityId, + pid, fromSeqNr) } else if (replayInProgress.size < ReplayParallelism) { - log.debugN("Stream [{}]: Starting replay of entityId [{}], from seqNr [{}]", logPrefix, entityId, fromSeqNr) + log.debugN("Stream [{}]: Starting replay of persistenceId [{}], from seqNr [{}]", logPrefix, pid, fromSeqNr) val queue = currentEventsByPersistenceIdQuery - .currentEventsByPersistenceIdTyped[Any](pid.id, fromSeqNr, Long.MaxValue) + .currentEventsByPersistenceIdTyped[Any](pid, fromSeqNr, Long.MaxValue) .runWith(Sink.queue())(materializer) - replayInProgress = replayInProgress.updated(entityId, ReplaySession(fromSeqNr, queue)) - tryPullReplay(entityId) + replayInProgress = replayInProgress.updated(pid, ReplaySession(fromSeqNr, queue)) + tryPullReplay(pid) } else { - log.debugN("Stream [{}]: Queueing replay of entityId [{}], from seqNr [{}]", logPrefix, entityId, fromSeqNr) - pendingReplayRequests = pendingReplayRequests.filterNot(_.entityId == entityId) :+ entityOffset + log.debugN("Stream [{}]: Queueing replay of persistenceId [{}], from seqNr [{}]", logPrefix, pid, fromSeqNr) + pendingReplayRequests = pendingReplayRequests.filterNot(_.persistenceId == pid) :+ persistenceIdOffset } } } @@ -317,8 +338,11 @@ import scala.util.matching.Regex replayFromFilterCriteria(filterReq.criteria) case StreamIn(StreamIn.Message.Replay(replayReq), _) => - log.debug2("Stream [{}]: Replay requested for [{}]", logPrefix, replayReq.entityIdOffset) - replayAll(replayReq.entityIdOffset) + if (replayReq.persistenceIdOffset.nonEmpty) { + log.debug2("Stream [{}]: Replay requested for [{}]", logPrefix, replayReq.persistenceIdOffset) + // FIXME for RES, can we check if the replicaId is handled by this stream, to avoid unnecessary replay queries + replayAll(replayReq.persistenceIdOffset) + } case StreamIn(StreamIn.Message.Init(_), _) => log.warn("Stream [{}]: Init request can only be used as the first message", logPrefix) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala index 4a0092cbe..d137ed12d 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/internal/ReplicationImpl.scala @@ -76,6 +76,7 @@ private[akka] object ReplicationImpl { */ def grpcReplication[Command, Event, State]( settings: ReplicationSettings[Command], + producerFilter: EventEnvelope[Event] => Boolean, replicatedEntity: ReplicatedEntity[Command])(implicit system: ActorSystem[_]): ReplicationImpl[Command] = { require( system.classicSystem.asInstanceOf[ExtendedActorSystem].provider.isInstanceOf[ClusterActorRefProvider], @@ -95,7 +96,8 @@ private[akka] object ReplicationImpl { settings.entityTypeKey.name, settings.streamId, onlyLocalOriginTransformer, - settings.eventProducerSettings) + settings.eventProducerSettings, + producerFilter.asInstanceOf[EventEnvelope[Any] => Boolean]) val sharding = ClusterSharding(system) sharding.init(replicatedEntity.entity) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala index 2177a2721..465a2942d 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/javadsl/Replication.scala @@ -23,8 +23,10 @@ import akka.persistence.typed.scaladsl.ReplicatedEventSourcing import akka.projection.grpc.producer.javadsl.EventProducer import akka.projection.grpc.producer.javadsl.EventProducerSource import akka.projection.grpc.replication.internal.ReplicationImpl - import java.util.concurrent.CompletionStage +import java.util.function.Predicate + +import akka.persistence.query.typed.EventEnvelope /** * Created using [[Replication.grpcReplication]], which starts sharding with the entity and @@ -76,6 +78,22 @@ object Replication { settings: ReplicationSettings[Command], replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], system: ActorSystem[_]): Replication[Command] = { + val trueProducerFilter = new Predicate[EventEnvelope[Event]] { + override def test(env: EventEnvelope[Event]): Boolean = true + } + grpcReplication[Command, Event, State](settings, trueProducerFilter, replicatedBehaviorFactory, system) + } + + /** + * Called to bootstrap the entity on each cluster node in each of the replicas. + * + * Important: Note that this does not publish the endpoint, additional steps are needed! + */ + def grpcReplication[Command, Event, State]( + settings: ReplicationSettings[Command], + producerFilter: Predicate[EventEnvelope[Event]], + replicatedBehaviorFactory: JFunction[ReplicatedBehaviors[Command, Event, State], Behavior[Command]], + system: ActorSystem[_]): Replication[Command] = { val scalaReplicationSettings = settings.toScala @@ -102,8 +120,13 @@ object Replication { })) .toScala) + val scalaProducerFilter: EventEnvelope[Event] => Boolean = producerFilter.test + val scalaRESOG = - ReplicationImpl.grpcReplication[Command, Event, State](scalaReplicationSettings, replicatedEntity)(system) + ReplicationImpl.grpcReplication[Command, Event, State]( + scalaReplicationSettings, + scalaProducerFilter, + replicatedEntity)(system) val jEventProducerSource = new EventProducerSource( scalaRESOG.eventProducerService.entityType, scalaRESOG.eventProducerService.streamId, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala index 71f50533c..221c86678 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/replication/scaladsl/Replication.scala @@ -18,9 +18,10 @@ import akka.persistence.typed.ReplicationId import akka.persistence.typed.scaladsl.ReplicatedEventSourcing import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource import akka.projection.grpc.replication.internal.ReplicationImpl - import scala.concurrent.Future +import akka.persistence.query.typed.EventEnvelope + /** * Created using [[Replication.grpcReplication]], which starts sharding with the entity and * replication stream consumers but not the replication endpoint needed to publish events to other replication places. @@ -68,6 +69,18 @@ object Replication { * Important: Note that this does not publish the endpoint, additional steps are needed! */ def grpcReplication[Command, Event, State](settings: ReplicationSettings[Command])( + replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( + implicit system: ActorSystem[_]): Replication[Command] = + grpcReplication[Command, Event, State](settings, (_: EventEnvelope[Event]) => true)(replicatedBehaviorFactory) + + /** + * Called to bootstrap the entity on each cluster node in each of the replicas. + * + * Important: Note that this does not publish the endpoint, additional steps are needed! + */ + def grpcReplication[Command, Event, State]( + settings: ReplicationSettings[Command], + producerFilter: EventEnvelope[Event] => Boolean)( replicatedBehaviorFactory: ReplicatedBehaviors[Command, Event, State] => Behavior[Command])( implicit system: ActorSystem[_]): Replication[Command] = { @@ -84,7 +97,7 @@ object Replication { } })) - ReplicationImpl.grpcReplication[Command, Event, State](settings, replicatedEntity) + ReplicationImpl.grpcReplication[Command, Event, State](settings, producerFilter, replicatedEntity) } } diff --git a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala index 420ba79e6..d8eda5d89 100644 --- a/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala +++ b/akka-projection-r2dbc/src/main/scala/akka/projection/r2dbc/internal/R2dbcProjectionImpl.scala @@ -27,7 +27,6 @@ import akka.persistence.query.typed.scaladsl.LoadEventQuery import akka.persistence.r2dbc.internal.R2dbcExecutor import akka.persistence.state.scaladsl.DurableStateStore import akka.persistence.state.scaladsl.GetObjectResult -import akka.persistence.typed.PersistenceId import akka.projection.BySlicesSourceProvider import akka.projection.HandlerRecoveryStrategy import akka.projection.HandlerRecoveryStrategy.Internal.RetryAndSkip @@ -458,9 +457,8 @@ private[projection] object R2dbcProjectionImpl { case env: EventEnvelope[Any @unchecked] if env.sequenceNr > 1 => sourceProvider match { case provider: CanTriggerReplay => - val entityId = PersistenceId.extractEntityId(env.persistenceId) val fromSeqNr = offsetStore.storedSeqNr(env.persistenceId) + 1 - provider.triggerReplay(entityId, fromSeqNr) + provider.triggerReplay(env.persistenceId, fromSeqNr) true case _ => false // no replay support for other source providers diff --git a/samples/grpc/shopping-analytics-service-scala/README.md b/samples/grpc/shopping-analytics-service-scala/README.md index 0edb5e117..5d1f551a5 100644 --- a/samples/grpc/shopping-analytics-service-scala/README.md +++ b/samples/grpc/shopping-analytics-service-scala/README.md @@ -18,4 +18,6 @@ 3. Start `shopping-cart-service` and add item to cart -4. Notice the log output in the terminal of the `shopping-analytics-service` \ No newline at end of file +4. Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter + +5. Notice the log output in the terminal of the `shopping-analytics-service` diff --git a/samples/grpc/shopping-cart-service-scala/README.md b/samples/grpc/shopping-cart-service-scala/README.md index 52305f0ea..ed4c7a450 100644 --- a/samples/grpc/shopping-cart-service-scala/README.md +++ b/samples/grpc/shopping-cart-service-scala/README.md @@ -1,6 +1,6 @@ ## Running the sample code -1. Start a local PostgresSQL server on default port 5432 and a Kafka broker on port 9092. The included `docker-compose.yml` starts everything required for running locally. +1. Start a local PostgresSQL server on default port 5432. The included `docker-compose.yml` starts everything required for running locally. ```shell docker-compose up -d diff --git a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala index a90c06802..f63ba45dc 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala +++ b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala @@ -205,7 +205,6 @@ object ShoppingCart { eventHandler = (state, event) => handleEvent(state, event)) .withTaggerForState { case (state, _) => state.tags - } .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100)) .onPersistFailure( diff --git a/samples/replicated/shopping-cart-service-scala/README.md b/samples/replicated/shopping-cart-service-scala/README.md index ac4ef7cf7..045ec18a7 100644 --- a/samples/replicated/shopping-cart-service-scala/README.md +++ b/samples/replicated/shopping-cart-service-scala/README.md @@ -51,7 +51,7 @@ curl http://localhost:9201/ready ``` -6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl): +6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl). Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter. ```shell # add item to cart on the first replica @@ -68,6 +68,12 @@ # check out cart grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout + + # exclude cart from first replica + grpcurl -d '{"cartId":"cart1", "exclude":true}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.ExcludeCart + + # get cart from second replica + grpcurl -d '{"cartId":"cart1", "exclude":true}' -plaintext 127.0.0.1:8201 shoppingcart.ShoppingCartService.ExcludeCart ``` or same `grpcurl` commands to port 8102/8202 to reach node 2. diff --git a/samples/replicated/shopping-cart-service-scala/build.sbt b/samples/replicated/shopping-cart-service-scala/build.sbt index 0855447b3..9bde300bf 100644 --- a/samples/replicated/shopping-cart-service-scala/build.sbt +++ b/samples/replicated/shopping-cart-service-scala/build.sbt @@ -27,12 +27,15 @@ run / javaOptions ++= sys.props .fold(Seq.empty[String])(res => Seq(s"-Dconfig.resource=$res")) Global / cancelable := false // ctrl-c -val AkkaVersion = sys.props.getOrElse("akka.version", "2.8.0") +// FIXME for r2dbc snapshot, remove when release is out +ThisBuild / resolvers ++= Resolver.sonatypeOssRepos("snapshots") + +val AkkaVersion = sys.props.getOrElse("akka.version", "2.8.1-M1") val AkkaHttpVersion = "10.5.0" -val AkkaManagementVersion = "1.2.0" -val AkkaPersistenceR2dbcVersion = "1.1.0-M5" +val AkkaManagementVersion = "1.3.0" +val AkkaPersistenceR2dbcVersion = "1.1.0-M7+2-e241ee57-SNAPSHOT" val AkkaProjectionVersion = - sys.props.getOrElse("akka-projection.version", "1.4.0-M3") + sys.props.getOrElse("akka-projection.version", "1.4.0-M3-124-96b9d6c2-SNAPSHOT") // FIXME val AkkaDiagnosticsVersion = "2.0.0" enablePlugins(AkkaGrpcPlugin) @@ -71,7 +74,7 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-persistence-testkit" % AkkaVersion % Test, // 4. Querying and publishing data from Akka Persistence "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion, - "com.lightbend.akka" %% "akka-projection-r2dbc" % AkkaPersistenceR2dbcVersion, + "com.lightbend.akka" %% "akka-projection-r2dbc" % AkkaProjectionVersion, "com.lightbend.akka" %% "akka-projection-grpc" % AkkaProjectionVersion, "com.lightbend.akka" %% "akka-projection-eventsourced" % AkkaProjectionVersion, "com.lightbend.akka" %% "akka-projection-testkit" % AkkaProjectionVersion % Test) diff --git a/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto b/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto index 65eed87bd..ee3759269 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto +++ b/samples/replicated/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto @@ -12,6 +12,7 @@ service ShoppingCartService { rpc RemoveItem (RemoveItemRequest) returns (Cart) {} rpc Checkout (CheckoutRequest) returns (Cart) {} rpc GetCart (GetCartRequest) returns (Cart) {} + rpc ExcludeCart (ExcludeCartRequest) returns (Empty) {} } message AddItemRequest { @@ -43,3 +44,10 @@ message Item { string itemId = 1; int32 quantity = 2; } + +message ExcludeCartRequest { + string cartId = 1; + bool exclude = 2; +} + +message Empty {} diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala index 7461a1c5f..1ff8a08af 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala @@ -1,7 +1,9 @@ package shopping.cart import java.time.Instant + import scala.concurrent.duration._ + import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior @@ -9,6 +11,7 @@ import akka.actor.typed.SupervisorStrategy import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors import akka.pattern.StatusReply +import akka.persistence.query.typed.EventEnvelope import akka.persistence.typed.ReplicaId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -64,6 +67,17 @@ object ShoppingCart { } Summary(cartItems, isClosed) } + + def totalQuantity: Int = + items.valuesIterator.sum + + def tags: Set[String] = { + val total = totalQuantity + if (total == 0) Set.empty + else if (total >= 100) Set(LargeQuantityTag) + else if (total >= 10) Set(MediumQuantityTag) + else Set(SmallQuantityTag) + } } object State { @@ -124,12 +138,20 @@ object ShoppingCart { final case class CheckedOut(eventTime: Instant) extends Event + val SmallQuantityTag = "small" + val MediumQuantityTag = "medium" + val LargeQuantityTag = "large" + + val EntityType = "replicated-shopping-cart" + // #init def init(implicit system: ActorSystem[_]): Replication[Command] = { - val replicationSettings = ReplicationSettings[Command]( - "replicated-shopping-cart", - R2dbcReplication()) - Replication.grpcReplication(replicationSettings)(ShoppingCart.apply) + val replicationSettings = ReplicationSettings[Command](EntityType, R2dbcReplication()) + val producerFilter: EventEnvelope[Event] => Boolean = { envelope => + val tags = envelope.tags + tags.contains(ShoppingCart.MediumQuantityTag) || tags.contains(ShoppingCart.LargeQuantityTag) + } + Replication.grpcReplication(replicationSettings, producerFilter)(ShoppingCart.apply) } def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]): Behavior[Command] = { @@ -159,6 +181,9 @@ class ShoppingCart(context: ActorContext[ShoppingCart.Command], replicationConte emptyState = State.empty, commandHandler = handleCommand, eventHandler = handleEvent) + .withTaggerForState { case (state, _) => + state.tags + } .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100)) .onPersistFailure(SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1)) } diff --git a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala index 9ad40ae39..58a96b489 100644 --- a/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala +++ b/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala @@ -1,14 +1,20 @@ package shopping.cart import java.util.concurrent.TimeoutException + import scala.concurrent.Future + import akka.actor.typed.ActorSystem import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.grpc.GrpcServiceException +import akka.persistence.typed.ReplicaId +import akka.projection.grpc.consumer.ConsumerFilter import akka.util.Timeout import io.grpc.Status import org.slf4j.LoggerFactory +import shopping.cart.proto.Empty +import shopping.cart.proto.ExcludeCartRequest class ShoppingCartServiceImpl(system: ActorSystem[_], entityKey: EntityTypeKey[ShoppingCart.Command]) extends proto.ShoppingCartService { @@ -62,6 +68,20 @@ class ShoppingCartServiceImpl(system: ActorSystem[_], entityKey: EntityTypeKey[S convertError(response) } + override def excludeCart(in: ExcludeCartRequest): Future[Empty] = { + val config = system.settings.config.getConfig(ShoppingCart.EntityType) + val selfReplicaId = config.getString("self-replica-id") + val otherReplicaId = if (selfReplicaId == "replica1") "replica2" else "replica1" + + val filterCriteria = + if (in.exclude) + ConsumerFilter.ExcludeEntityIds(ReplicaId(otherReplicaId), Set(in.cartId)) + else + ConsumerFilter.RemoveExcludeEntityIds(ReplicaId(otherReplicaId), Set(in.cartId)) + ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(ShoppingCart.EntityType, Vector(filterCriteria)) + Future.successful(Empty.defaultInstance) + } + private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = { proto.Cart( cart.items.iterator.map { case (itemId, quantity) =>