Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: EventProducerSource.withStartingFromSnapshots and EventProducerPush #1049

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ final class EventProducerPush[Event] private (
val connectionMetadata: Optional[Metadata],
val grpcClientSettings: GrpcClientSettings) {

if (eventProducerSource.transformSnapshot.isPresent)
throw new IllegalArgumentException(
"`EventProducerSource.withStartingFromSnapshots` should not be used together with `EventProducerPush`. " +
"In that case `SourceProvider` with `eventsBySlicesStartingFromSnapshots` should be used instead.")

def handler(system: ActorSystem[_])
: FlowWithContext[EventEnvelope[Event], ProjectionContext, Done, ProjectionContext, NotUsed] = {
val eventConsumerClient = EventConsumerServiceClient(grpcClientSettings)(system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ final class EventProducerSource(
withProducerFilter[Any](topicMatcher.matches(_, settings.topicTagPrefix))
}

/**
* Use snapshots as starting points and thereby reducing number of events that have to be loaded.
* This can be useful if the consumer start from zero without any previously processed
* offset or if it has been disconnected for a long while and its offset is far behind.
*
* First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one
* snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function.
*
* After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted.
*
* Important note: This should not be used together with [[EventProducerPush]]. In that case `SourceProvider` with
* `eventsBySlicesStartingFromSnapshots` should be used instead.
*/
def withStartingFromSnapshots[Snapshot, Event](transformSnapshot: java.util.function.Function[Snapshot, Event]) =
new EventProducerSource(
entityType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ object EventProducer {
withProducerFilter[Any](topicMatcher.matches(_, settings.topicTagPrefix))
}

/**
* Use snapshots as starting points and thereby reducing number of events that have to be loaded.
* This can be useful if the consumer start from zero without any previously processed
* offset or if it has been disconnected for a long while and its offset is far behind.
*
* First it loads all snapshots with timestamps greater than or equal to the offset timestamp. There is at most one
* snapshot per persistenceId. The snapshots are transformed to events with the given `transformSnapshot` function.
*
* After emitting the snapshot events the ordinary events with sequence numbers after the snapshots are emitted.
*
* Important note: This should not be used together with [[EventProducerPush]]. In that case `SourceProvider` with
* `eventsBySlicesStartingFromSnapshots` should be used instead.
*/
def withStartingFromSnapshots[Snapshot, Event](transformSnapshot: Snapshot => Event): EventProducerSource =
copy(transformSnapshot = Some(transformSnapshot.asInstanceOf[Any => Any]))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ final class EventProducerPush[Event](
val connectionMetadata: Option[Metadata],
val grpcClientSettings: GrpcClientSettings) {

if (eventProducerSource.transformSnapshot.isDefined)
throw new IllegalArgumentException(
"`EventProducerSource.withStartingFromSnapshots` should not be used together with `EventProducerPush`. " +
"In that case `SourceProvider` with `eventsBySlicesStartingFromSnapshots` should be used instead.")

def handler()(implicit system: ActorSystem[_])
: FlowWithContext[EventEnvelope[Event], ProjectionContext, Done, ProjectionContext, NotUsed] = {
val eventConsumerClient = EventConsumerServiceClient(grpcClientSettings)
Expand Down