diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerPush.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerPush.scala index d5e8304b0..d9917c418 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerPush.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerPush.scala @@ -52,6 +52,11 @@ final class EventProducerPush[Event] private ( val connectionMetadata: Optional[Metadata], val grpcClientSettings: GrpcClientSettings) { + if (eventProducerSource.transformSnapshot.isPresent) + throw new IllegalArgumentException( + "`EventProducerSource.withStartingFromSnapshots` should not be used together with `EventProducerPush`. " + + "In that case `SourceProvider` with `eventsBySlicesStartingFromSnapshots` should be used instead.") + def handler(system: ActorSystem[_]) : FlowWithContext[EventEnvelope[Event], ProjectionContext, Done, ProjectionContext, NotUsed] = { val eventConsumerClient = EventConsumerServiceClient(grpcClientSettings)(system) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala index 0d7b14d57..a74615ab7 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala @@ -65,6 +65,19 @@ final class EventProducerSource( withProducerFilter[Any](topicMatcher.matches(_, settings.topicTagPrefix)) } + /** + * Use snapshots as starting points and thereby reducing number of events that have to be loaded. + * This can be useful if the consumer start from zero without any previously processed + * offset or if it has been disconnected for a long while and its offset is far behind. + * + * First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one + * snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function. + * + * After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted. + * + * Important note: This should not be used together with [[EventProducerPush]]. In that case `SourceProvider` with + * `eventsBySlicesStartingFromSnapshots` should be used instead. + */ def withStartingFromSnapshots[Snapshot, Event](transformSnapshot: java.util.function.Function[Snapshot, Event]) = new EventProducerSource( entityType, diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala index f136c002f..d6d29342e 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala @@ -88,6 +88,19 @@ object EventProducer { withProducerFilter[Any](topicMatcher.matches(_, settings.topicTagPrefix)) } + /** + * Use snapshots as starting points and thereby reducing number of events that have to be loaded. + * This can be useful if the consumer start from zero without any previously processed + * offset or if it has been disconnected for a long while and its offset is far behind. + * + * First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one + * snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function. + * + * After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted. + * + * Important note: This should not be used together with [[EventProducerPush]]. In that case `SourceProvider` with + * `eventsBySlicesStartingFromSnapshots` should be used instead. + */ def withStartingFromSnapshots[Snapshot, Event](transformSnapshot: Snapshot => Event): EventProducerSource = copy(transformSnapshot = Some(transformSnapshot.asInstanceOf[Any => Any])) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducerPush.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducerPush.scala index 92cf6c1f3..bd3d5b6bd 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducerPush.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducerPush.scala @@ -50,6 +50,11 @@ final class EventProducerPush[Event]( val connectionMetadata: Option[Metadata], val grpcClientSettings: GrpcClientSettings) { + if (eventProducerSource.transformSnapshot.isDefined) + throw new IllegalArgumentException( + "`EventProducerSource.withStartingFromSnapshots` should not be used together with `EventProducerPush`. " + + "In that case `SourceProvider` with `eventsBySlicesStartingFromSnapshots` should be used instead.") + def handler()(implicit system: ActorSystem[_]) : FlowWithContext[EventEnvelope[Event], ProjectionContext, Done, ProjectionContext, NotUsed] = { val eventConsumerClient = EventConsumerServiceClient(grpcClientSettings)