Skip to content

Commit

Permalink
chore: EventProducerSource.withStartingFromSnapshots should not be us…
Browse files Browse the repository at this point in the history
…ed together with EventProducerPush (#1049)

* unfortunate, but EventProducerSource is shared between the two cases, and not easy to change that
  • Loading branch information
patriknw authored Oct 23, 2023
1 parent b49916e commit 416e5bd
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ final class EventProducerPush[Event] private (
val connectionMetadata: Optional[Metadata],
val grpcClientSettings: GrpcClientSettings) {

if (eventProducerSource.transformSnapshot.isPresent)
throw new IllegalArgumentException(
"`EventProducerSource.withStartingFromSnapshots` should not be used together with `EventProducerPush`. " +
"In that case `SourceProvider` with `eventsBySlicesStartingFromSnapshots` should be used instead.")

def handler(system: ActorSystem[_])
: FlowWithContext[EventEnvelope[Event], ProjectionContext, Done, ProjectionContext, NotUsed] = {
val eventConsumerClient = EventConsumerServiceClient(grpcClientSettings)(system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ final class EventProducerSource(
withProducerFilter[Any](topicMatcher.matches(_, settings.topicTagPrefix))
}

/**
* Use snapshots as starting points and thereby reducing number of events that have to be loaded.
* This can be useful if the consumer start from zero without any previously processed
* offset or if it has been disconnected for a long while and its offset is far behind.
*
* First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one
* snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function.
*
* After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted.
*
* Important note: This should not be used together with [[EventProducerPush]]. In that case `SourceProvider` with
* `eventsBySlicesStartingFromSnapshots` should be used instead.
*/
def withStartingFromSnapshots[Snapshot, Event](transformSnapshot: java.util.function.Function[Snapshot, Event]) =
new EventProducerSource(
entityType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ object EventProducer {
withProducerFilter[Any](topicMatcher.matches(_, settings.topicTagPrefix))
}

/**
* Use snapshots as starting points and thereby reducing number of events that have to be loaded.
* This can be useful if the consumer start from zero without any previously processed
* offset or if it has been disconnected for a long while and its offset is far behind.
*
* First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one
* snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function.
*
* After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted.
*
* Important note: This should not be used together with [[EventProducerPush]]. In that case `SourceProvider` with
* `eventsBySlicesStartingFromSnapshots` should be used instead.
*/
def withStartingFromSnapshots[Snapshot, Event](transformSnapshot: Snapshot => Event): EventProducerSource =
copy(transformSnapshot = Some(transformSnapshot.asInstanceOf[Any => Any]))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ final class EventProducerPush[Event](
val connectionMetadata: Option[Metadata],
val grpcClientSettings: GrpcClientSettings) {

if (eventProducerSource.transformSnapshot.isDefined)
throw new IllegalArgumentException(
"`EventProducerSource.withStartingFromSnapshots` should not be used together with `EventProducerPush`. " +
"In that case `SourceProvider` with `eventsBySlicesStartingFromSnapshots` should be used instead.")

def handler()(implicit system: ActorSystem[_])
: FlowWithContext[EventEnvelope[Event], ProjectionContext, Done, ProjectionContext, NotUsed] = {
val eventConsumerClient = EventConsumerServiceClient(grpcClientSettings)
Expand Down

0 comments on commit 416e5bd

Please sign in to comment.