Skip to content

Commit

Permalink
fix: Trigger replay only for a specific projection instance (#1201)
Browse files Browse the repository at this point in the history
* when using same streamId from several projections a replay
  request from one will send the replay request to all eventsBySlices
  queries with that same streamId
* this adds a correlation id between the triggerReplay and the eventsBySlices query
  via the GrpcReadJournal instance
* projection instances with different slice ranges of the same projection is
  not a problem, slice ranges are checked separately
* un-case class Replay and ReplayWithFilter
* correlationId in javadsl
  • Loading branch information
patriknw authored Sep 30, 2024
1 parent 0b7faea commit 0841b3c
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import scala.concurrent.Future
import scala.concurrent.duration._

import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.internal.CanTriggerReplay

object IntegrationSpec {

Expand Down Expand Up @@ -109,7 +110,7 @@ class IntegrationSpec(testContainerConf: TestContainerConf)

override def typedSystem: ActorSystem[_] = system
private implicit val ec: ExecutionContext = system.executionContext
private val numberOfTests = 6
private val numberOfTests = 7

// needs to be unique per test case and known up front for setting up the producer
case class TestSource(entityType: String, streamId: String, pid: PersistenceId)
Expand Down Expand Up @@ -450,4 +451,67 @@ class IntegrationSpec(testContainerConf: TestContainerConf)
}
}

"trigger replay for a specific projection instance" in new TestFixture {
entity ! TestEntity.Persist("a")
entity ! TestEntity.Persist("b")
entity ! TestEntity.Ping(replyProbe.ref)
replyProbe.receiveMessage()

// start the projection
val projection1 = spawnAtLeastOnceProjection()

// start another projection with the same streamId
val projectionId2 = randomProjectionId()
val processedProbe2 = createTestProbe[Processed]()
val sourceProvider2 = EventSourcedProvider.eventsBySlices[String](
system,
GrpcReadJournal(
GrpcQuerySettings(streamId) // same streamId
.withAdditionalRequestMetadata(new MetadataBuilder().addText("x-secret", "top_secret").build()),
GrpcClientSettings
.connectToServiceAt("127.0.0.1", grpcPort)
.withTls(false),
protobufDescriptors = Nil),
streamId, // same streamId
sliceRange.min,
sliceRange.max)
val projection2 = spawn(
ProjectionBehavior(
R2dbcProjection.atLeastOnceAsync(
projectionId2,
settings = None,
sourceProvider = sourceProvider2,
handler = () => new TestHandler(projectionId2, processedProbe2.ref))))

val processedA = processedProbe.receiveMessage()
processedA.envelope.persistenceId shouldBe pid.id
processedA.envelope.sequenceNr shouldBe 1L
processedA.envelope.event shouldBe "A"
processedProbe2.receiveMessage().envelope.event shouldBe "A"

val processedB = processedProbe.receiveMessage()
processedB.envelope.persistenceId shouldBe pid.id
processedB.envelope.sequenceNr shouldBe 2L
processedB.envelope.event shouldBe "B"
processedProbe2.receiveMessage().envelope.event shouldBe "B"

// look for log message to ensure that replay is triggered only once
LoggingTestKit.debug(s"Stream [$streamId (0-1023)]: Replay requested").withCheckExcess(true).expect {
sourceProvider2
.asInstanceOf[CanTriggerReplay]
.triggerReplay(pid.id, fromSeqNr = 2L, triggeredBySeqNr = 3L)
}

// no duplicates
processedProbe.expectNoMessage()
processedProbe2.expectNoMessage()

projection1 ! ProjectionBehavior.Stop
projection2 ! ProjectionBehavior.Stop
entity ! TestEntity.Stop(replyProbe.ref)
processedProbe.expectTerminated(projection1)
processedProbe.expectTerminated(projection2)
processedProbe.expectTerminated(entity)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

package akka.projection.grpc.consumer

import java.util.UUID
import java.util.{ List => JList }
import java.util.{ Set => JSet }

import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.runtime.AbstractFunction2

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
Expand All @@ -20,6 +22,7 @@ import akka.annotation.InternalApi
import akka.persistence.typed.ReplicaId
import akka.projection.grpc.internal.ConsumerFilterRegistry
import akka.projection.grpc.internal.TopicMatcher
import akka.util.HashCode
import akka.util.JavaDurationConverters._
import akka.util.ccompat.JavaConverters._
import com.typesafe.config.Config
Expand Down Expand Up @@ -75,25 +78,158 @@ object ConsumerFilter extends ExtensionId[ConsumerFilter] {
criteria.asJava
}

object Replay extends AbstractFunction2[String, Set[PersistenceIdOffset], Replay] {
def apply(streamId: String, persistenceIdOffsets: Set[PersistenceIdOffset]): Replay =
new Replay(streamId, persistenceIdOffsets, correlationId = None)

/**
* Use the `replayCorrelationId` from the `GrpcReadJournal`.
*/
def apply(streamId: String, persistenceIdOffsets: Set[PersistenceIdOffset], correlationId: UUID): Replay =
new Replay(streamId, persistenceIdOffsets, Some(correlationId))

def unapply(arg: Replay): Option[(String, Set[PersistenceIdOffset])] =
Some((arg.streamId, arg.persistenceIdOffsets))

}

/**
* Explicit request to replay events for given entities.
*
* Use the `replayCorrelationId` from the `GrpcReadJournal`.
*/
final case class Replay(streamId: String, persistenceIdOffsets: Set[PersistenceIdOffset]) extends SubscriberCommand {
final class Replay(
val streamId: String,
val persistenceIdOffsets: Set[PersistenceIdOffset],
val correlationId: Option[UUID])
extends Product2[String, Set[PersistenceIdOffset]] // for binary compatibility (used to be a case class)
with SubscriberCommand
with Serializable {

// for binary compatibility (used to be a case class)
def this(streamId: String, persistenceIdOffsets: Set[PersistenceIdOffset]) =
this(streamId, persistenceIdOffsets, None)

/** Java API */
def this(streamId: String, persistenceIdOffsets: JSet[PersistenceIdOffset]) =
this(streamId, persistenceIdOffsets.asScala.toSet)
this(streamId, persistenceIdOffsets.asScala.toSet, None)

/**
* Java API
*
* Use the `replayCorrelationId` from the `GrpcReadJournal`.
*/
def this(streamId: String, persistenceIdOffsets: JSet[PersistenceIdOffset], correlationId: UUID) =
this(streamId, persistenceIdOffsets.asScala.toSet, Some(correlationId))

def toReplayWithFilter: ReplayWithFilter =
new ReplayWithFilter(
streamId,
persistenceIdOffsets.map(p => ReplayPersistenceId(p, filterAfterSeqNr = Long.MaxValue)),
correlationId)

override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, streamId)
result = HashCode.hash(result, persistenceIdOffsets)
result = HashCode.hash(result, correlationId)
result
}

override def equals(obj: Any): Boolean = obj match {
case other: Replay =>
streamId == other.streamId && persistenceIdOffsets == other.persistenceIdOffsets && correlationId == other.correlationId
case _ => false
}

override def toString: String =
s"Replay($streamId,$persistenceIdOffsets,$correlationId)"

// for binary compatibility (used to be a case class)
def copy(
streamId: String = streamId,
persistenceIdOffsets: Set[PersistenceIdOffset] = persistenceIdOffsets): Replay =
new Replay(streamId, persistenceIdOffsets, correlationId)

// Product2, for binary compatibility (used to be a case class)
override def productPrefix = "Replay"
override def _1: String = streamId
override def _2: Set[PersistenceIdOffset] = persistenceIdOffsets
override def canEqual(that: Any): Boolean = that.isInstanceOf[Replay]
}

object ReplayWithFilter extends AbstractFunction2[String, Set[ReplayPersistenceId], ReplayWithFilter] {
def apply(streamId: String, replayPersistenceIds: Set[ReplayPersistenceId]): ReplayWithFilter =
new ReplayWithFilter(streamId, replayPersistenceIds, correlationId = None)

/**
* Use the `replayCorrelationId` from the `GrpcReadJournal`.
*/
def apply(streamId: String, replayPersistenceIds: Set[ReplayPersistenceId], correlationId: UUID): ReplayWithFilter =
new ReplayWithFilter(streamId, replayPersistenceIds, Some(correlationId))

def unapply(arg: ReplayWithFilter): Option[(String, Set[ReplayPersistenceId])] =
Some((arg.streamId, arg.replayPersistenceIds))

}

/**
* Explicit request to replay events for given entities.
*
* Use the `replayCorrelationId` from the `GrpcReadJournal`.
*/
final case class ReplayWithFilter(streamId: String, replayPersistenceIds: Set[ReplayPersistenceId])
extends SubscriberCommand {
final class ReplayWithFilter(
val streamId: String,
val replayPersistenceIds: Set[ReplayPersistenceId],
val correlationId: Option[UUID])
extends Product2[String, Set[ReplayPersistenceId]] // for binary compatibility (used to be a case class)
with SubscriberCommand
with Serializable {

// for binary compatibility (used to be a case class)
def this(streamId: String, replayPersistenceIds: Set[ReplayPersistenceId]) =
this(streamId, replayPersistenceIds, None)

/** Java API */
def this(streamId: String, persistenceIdOffsets: JSet[ReplayPersistenceId]) =
this(streamId, persistenceIdOffsets.asScala.toSet)
this(streamId, persistenceIdOffsets.asScala.toSet, None)

/**
* Java API
*
* Use the `replayCorrelationId` from the `GrpcReadJournal`.
*/
def this(streamId: String, persistenceIdOffsets: JSet[ReplayPersistenceId], correlationId: UUID) =
this(streamId, persistenceIdOffsets.asScala.toSet, Some(correlationId))

override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, streamId)
result = HashCode.hash(result, replayPersistenceIds)
result = HashCode.hash(result, correlationId)
result
}

override def equals(obj: Any): Boolean = obj match {
case other: ReplayWithFilter =>
streamId == other.streamId && replayPersistenceIds == other.replayPersistenceIds && correlationId == other.correlationId
case _ => false
}

override def toString: String =
s"ReplayWithFilter($streamId,$replayPersistenceIds,$correlationId)"

// for binary compatibility (used to be a case class)
def copy(
streamId: String = streamId,
replayPersistenceIds: Set[ReplayPersistenceId] = replayPersistenceIds): ReplayWithFilter =
new ReplayWithFilter(streamId, replayPersistenceIds, correlationId)

// Product2, for binary compatibility (used to be a case class)
override def productPrefix = "ReplayWithFilter"
override def _1: String = streamId
override def _2: Set[ReplayPersistenceId] = replayPersistenceIds
override def canEqual(that: Any): Boolean = that.isInstanceOf[ReplayWithFilter]
}

sealed trait FilterCriteria
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ package akka.projection.grpc.consumer.javadsl
import java.time.Instant
import java.util
import java.util.Optional
import java.util.UUID
import java.util.concurrent.CompletionStage

import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._

import akka.Done
import akka.NotUsed
import akka.actor.ClassicActorSystemProvider
Expand Down Expand Up @@ -95,6 +98,16 @@ final class GrpcReadJournal(delegate: scaladsl.GrpcReadJournal)
def streamId(): String =
delegate.streamId

/**
* Correlation id to be used with [[ConsumerFilter.ReplayWithFilter]].
* Such replay request will trigger replay in all `eventsBySlices` queries
* with the same `streamId` running from this instance of the `GrpcReadJournal`.
* Create separate instances of the `GrpcReadJournal` to have separation between
* replay requests for the same `streamId`.
*/
val replayCorrelationId: UUID =
delegate.replayCorrelationId

@InternalApi
private[akka] override def triggerReplay(persistenceId: String, fromSeqNr: Long, triggeredBySeqNr: Long): Unit =
delegate.triggerReplay(persistenceId, fromSeqNr, triggeredBySeqNr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Instant
import java.util.UUID
import java.util.concurrent.TimeUnit

import scala.collection.immutable
Expand Down Expand Up @@ -186,10 +187,19 @@ final class GrpcReadJournal private (
def this(system: ExtendedActorSystem, config: Config, cfgPath: String) =
this(system, config, cfgPath, ProtoAnySerialization.Prefer.Scala)

/**
* Correlation id to be used with [[ConsumerFilter.ReplayWithFilter]].
* Such replay request will trigger replay in all `eventsBySlices` queries
* with the same `streamId` running from this instance of the `GrpcReadJournal`.
* Create separate instances of the `GrpcReadJournal` to have separation between
* replay requests for the same `streamId`.
*/
val replayCorrelationId: UUID = UUID.randomUUID()

private implicit val typedSystem: akka.actor.typed.ActorSystem[_] = system.toTyped
private val persistenceExt = Persistence(system)

lazy val consumerFilter = ConsumerFilter(typedSystem)
lazy val consumerFilter: ConsumerFilter = ConsumerFilter(typedSystem)

private val client = EventProducerServiceClient(clientSettings)
private val additionalRequestHeaders = settings.additionalRequestMetadata match {
Expand All @@ -206,7 +216,8 @@ final class GrpcReadJournal private (
streamId,
Set(
ConsumerFilter
.ReplayPersistenceId(ConsumerFilter.PersistenceIdOffset(persistenceId, fromSeqNr), triggeredBySeqNr + 1)))
.ReplayPersistenceId(ConsumerFilter.PersistenceIdOffset(persistenceId, fromSeqNr), triggeredBySeqNr + 1)),
replayCorrelationId)
}

private def addRequestHeaders[Req, Res](
Expand Down Expand Up @@ -303,14 +314,11 @@ final class GrpcReadJournal private (
log.debug2("{}: Filter updated [{}]", streamId, criteria.mkString(", "))
StreamIn(StreamIn.Message.Filter(FilterReq(protoCriteria)))

case r @ ConsumerFilter.ReplayWithFilter(`streamId`, _) =>
case r @ ConsumerFilter.ReplayWithFilter(`streamId`, _) if r.correlationId.forall(_ == replayCorrelationId) =>
streamInReplay(r)

case ConsumerFilter.Replay(`streamId`, persistenceIdOffsets) =>
val replayWithFilter = ConsumerFilter.ReplayWithFilter(
streamId,
persistenceIdOffsets.map(p => ConsumerFilter.ReplayPersistenceId(p, filterAfterSeqNr = Long.MaxValue)))
streamInReplay(replayWithFilter)
case r @ ConsumerFilter.Replay(`streamId`, _) if r.correlationId.forall(_ == replayCorrelationId) =>
streamInReplay(r.toReplayWithFilter)
}
.mapMaterializedValue { ref =>
consumerFilter.ref ! ConsumerFilter.Subscribe(streamId, initCriteria, ref)
Expand Down

0 comments on commit 0841b3c

Please sign in to comment.