From 1c37af59021afe539991afb097904141fac246e3 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 5 Apr 2023 15:08:21 +0200 Subject: [PATCH] doc: Reference docs of filters * simplified the consumer side filter sample to also use tags --- .../javadsl/EventProducerSource.scala | 1 + .../producer/scaladsl/EventProducer.scala | 1 + ...rpc-replicated-event-sourcing-transport.md | 24 ++++ docs/src/main/paradox/grpc.md | 126 ++++++++++++++++++ .../analytics/ShoppingCartEventConsumer.java | 6 + .../README.md | 2 +- .../build.sbt | 2 +- .../main/protobuf/ShoppingCartEvents.proto | 6 - .../main/scala/shopping/analytics/Main.scala | 5 + .../analytics/ShoppingCartEventConsumer.scala | 30 +++-- .../java/shopping/cart/PublishEvents.java | 8 +- .../main/java/shopping/cart/ShoppingCart.java | 39 ++++++ .../shopping-cart-service-scala/README.md | 4 +- .../main/protobuf/ShoppingCartEvents.proto | 6 - .../main/protobuf/ShoppingCartService.proto | 12 -- .../scala/shopping/cart/PublishEvents.scala | 9 +- .../scala/shopping/cart/ShoppingCart.scala | 16 +++ .../cart/ShoppingCartServiceImpl.scala | 27 +--- 18 files changed, 249 insertions(+), 75 deletions(-) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala index 908e1d145..9617adef7 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/javadsl/EventProducerSource.scala @@ -25,6 +25,7 @@ final class EventProducerSource( def this(entityType: String, streamId: String, transformation: Transformation, settings: EventProducerSettings) = this(entityType, streamId, transformation, settings, producerFilter = _ => true) + // FIXME, can't we have a type parameter for the event here? def withProducerFilter(producerFilter: java.util.function.Predicate[EventEnvelope[Any]]): EventProducerSource = new EventProducerSource(entityType, streamId, transformation, settings, producerFilter) diff --git a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala index cbf2395f8..f8f30bb82 100644 --- a/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala +++ b/akka-projection-grpc/src/main/scala/akka/projection/grpc/producer/scaladsl/EventProducer.scala @@ -62,6 +62,7 @@ object EventProducer { require(entityType.nonEmpty, "Stream id must not be empty") require(streamId.nonEmpty, "Stream id must not be empty") + // FIXME, can't we have a type parameter for the event here? def withProducerFilter(producerFilter: EventEnvelope[Any] => Boolean): EventProducerSource = new EventProducerSource(entityType, streamId, transformation, settings, producerFilter) diff --git a/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md b/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md index 91ecefb60..60ecd51e0 100644 --- a/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md +++ b/docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md @@ -177,6 +177,30 @@ For some scenarios it may be necessary to do a two-step deploy of format changes for a new serialization format so that all replicas can deserialize it, then a second deploy where the new field is actually populated with data. +## Filters + +By default, events from all Replicated Event Sourced entities are replicated. + +The same kind of filters as described in @ref:[Akka Projection gRPC Filters](grpc.md#filters) can be used for +Replicated Event Sourcing. + +The producer defined filter: + +Scala +: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init } + +Java +: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init } + +Consumer defined filters are updated as described in @ref:[Akka Projection gRPC Consumer defined filter](grpc.md#consumer-defined-filter) + +One thing to note is that `streamId` is always the same as the `entityType` when using Replicated Event Sourcing. + +The entity id based filter criteria must include the replica id as suffix to the entity id, with `|` separator. + +Replicated Event Sourcing is bidirectional replication, and therefore you would typically have to define the same +filters on both sides. That is not handled automatically. + ## Sample projects Source code and build files for complete sample projects can be found in the `akka/akka-projection` GitHub repository. diff --git a/docs/src/main/paradox/grpc.md b/docs/src/main/paradox/grpc.md index 17d4bd0e6..87ae8f898 100644 --- a/docs/src/main/paradox/grpc.md +++ b/docs/src/main/paradox/grpc.md @@ -135,6 +135,132 @@ Java This example includes an application specific `ShoppingCartService`, which is unrelated to Akka Projections gRPC, but it illustrates how to combine the `EventProducer` service with other gRPC services. +## Filters + +By default, events from all entities of the given entity type and slice range are emitted from the producer to the +consumer. The transformation function on the producer side can omit certain events, but the offsets for these +events are still transferred to the consumer, to ensure sequence number validations and offset storage. + +Filters can be used when a consumer is only interested in a subset of the entities. The filters can be defined +on both the producer side and on the consumer side, and they can be changed at runtime. + +### Tags + +Tags are typically used for the filters, so first an example of how to tag events in the entity. Here, the tag is +based on total quantity of the shopping cart, i.e. the state of the cart. The tags are included in the +@apidoc[akka.persistence.query.typed.EventEnvelope]. + +Scala +: @@snip [ShoppingCart.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #tags } + +Java +: @@snip [ShoppingCart.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #tags } + + +### Producer defined filter + +The producer may define a filter function on the `EventProducerSource`. + +Scala +: @@snip [PublishEvents.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala) { #withProducerFilter } + +Java +: @@snip [PublishEvents.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java) { #withProducerFilter } + +In this example the decision is based on tags, but the filter function can use anything in the +@apidoc[akka.persistence.query.typed.EventEnvelope] parameter or the event itself. Here, the entity sets the tag based +on the total quantity of the shopping cart, which requires the full state of the shopping cart and is not known from +an individual event. + +Note that the purpose of the `withProducerFilter` is to toggle if all events for the entity is to be emitted or not. +If the purpose is to filter out certain events you should instead use the `Transformation`. + +The producer filter is evaluated before the transformation function, i.e. the event is the original event and not +the transformed event. + +A producer filter that excludes an event wins over any consumer defined filter, i.e. if the producer filter function +returns `false` the event will not be emitted. + +### Consumer defined filter + +The consumer may define declarative filters that are sent to the producer and evaluated on the producer side +before emitting the events. + +Consumer filters consists of exclude and include criteria. In short, the exclude criteria are evaluated first and +may be overridden by an include criteria. More precisely, they are evaluated according to the following rules: + +* Exclude criteria are evaluated first. +* If no matching exclude the event is emitted. +* If an exclude is matching the include criteria are evaluated. +* If no matching include the event is discarded. +* If matching include the event is emitted. + +The exclude criteria can be a combination of: + +* `ExcludeTags` - exclude events with any of the given tags +* `ExcludeRegexEntityIds` - exclude events for entities with entity ids matching the given regular expressions +* `ExcludeEntityIds` - exclude events for entities with the given entity ids + +To exclude all events you can use `ExcludeRegexEntityIds` with `.*`. + +The exclude criteria can be a combination of: + +* `IncludeTags` - include events with any of the given tags +* `IncludeEntityIds` - include events for entities with the given entity ids + +The filter is updated with the @apidoc[ConsumerFilter] extension. + +Scala +: @@snip [ShoppingCartEventConsumer.scala](/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala) { #update-filter } + +Java +: @@snip [ShoppingCartEventConsumer.java](/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java) { #update-filter } + +Note that the `streamId` must match what is used when initializing the `GrpcReadJournal`, which by default is from +the config property `akka.projection.grpc.consumer.stream-id`. + +The filters can be dynamically changed in runtime without restarting the Projections or the `GrpcReadJournal`. The +updates are incremental. For example if you first add an `IncludeTags` of tag `"medium"` and then update the filter +with another `IncludeTags` of tag `"large"`, the full filter consists of both `"medium"` and `"large"`. + +To remove a filter criteria you would use the corresponding @apidoc[ConsumerFilter.RemoveCriteria], for example +`RemoveIncludeTags`. + +The updated filter is kept and remains after restarts of the Projection instances. If the consumer side is +running with Akka Cluster the filter is propagated to other nodes in the cluster automatically with +Akka Distributed Data. You only have to update at one place and it will be applied to all running Projections +with the given `streamId`. The filters will be cleared in case of a full Cluster stop, which means that you +need to take care of populating the initial filters at startup. + +See @apidoc[ConsumerFilter] for full API documentation. + +### Event replay + +When the consumer receives an event that is not the first event for the entity, and it hasn't processed and stored +the offset for the preceding event (previous sequence number) a replay of previous events will be triggered. +The reason is that the consumer is typically interested in all events for an entity and must process them in +the original order. Even though this is completely automatic it can be good to be aware of since it may have +a substantial performance impact to replay many events for many entities. + +The event replay is triggered "lazily" when a new event with unexpected sequence number is received, but with +the `ConsumerFilter.IncludeEntityIds` it is possible to explicitly define a sequence number from which the +replay will start immediately. You have the following choices for the sequence number in the `IncludeEntityIds` +criteria: + +* if the previously processed sequence number is known, the next (+1) sequence number can be defined +* `1` can be used to for replaying all events of the entity +* `0` can be used to not replay events immediately, but they will be replayed lazily as described previously + +Any duplicate events are filtered out by the Projection on the consumer side. This deduplication mechanism depends +on how long the Projection will keep old offsets. You may have to increase the configuration for this, but that has +the drawback of more memory usage. + +``` +akka.projection.r2dbc.offset-store.time-window = 15 minutes +``` + +Application level deduplication of idempotency may be needed if the Projection can't keep enough offsets in memory. + ## Sample projects Source code and build files for complete sample projects can be found in `akka/akka-projection` GitHub repository. diff --git a/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java b/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java index ce7fabc09..513498f99 100644 --- a/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java +++ b/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java @@ -138,5 +138,11 @@ public static void init(ActorSystem system) { ProjectionBehavior.stopMessage()); } + //#initProjections + //#update-filter + // FIXME + //#update-filter + //#initProjections + } //#initProjections diff --git a/samples/grpc/shopping-analytics-service-scala/README.md b/samples/grpc/shopping-analytics-service-scala/README.md index 5d1f551a5..316e1f174 100644 --- a/samples/grpc/shopping-analytics-service-scala/README.md +++ b/samples/grpc/shopping-analytics-service-scala/README.md @@ -18,6 +18,6 @@ 3. Start `shopping-cart-service` and add item to cart -4. Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter +4. Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter. 5. Notice the log output in the terminal of the `shopping-analytics-service` diff --git a/samples/grpc/shopping-analytics-service-scala/build.sbt b/samples/grpc/shopping-analytics-service-scala/build.sbt index cfd783773..ab3a3f9cb 100644 --- a/samples/grpc/shopping-analytics-service-scala/build.sbt +++ b/samples/grpc/shopping-analytics-service-scala/build.sbt @@ -35,7 +35,7 @@ val AkkaHttpVersion = "10.5.0" val AkkaManagementVersion = "1.3.0" val AkkaPersistenceR2dbcVersion = "1.1.0-M7+2-e241ee57-SNAPSHOT" val AkkaProjectionVersion = - sys.props.getOrElse("akka-projection.version", "1.4.0-M3-124-96b9d6c2-SNAPSHOT") // FIXME + sys.props.getOrElse("akka-projection.version", "1.4.0-M3-126-b8edf16f-20230404-1623-SNAPSHOT") // FIXME val AkkaDiagnosticsVersion = "2.0.0" enablePlugins(AkkaGrpcPlugin) diff --git a/samples/grpc/shopping-analytics-service-scala/src/main/protobuf/ShoppingCartEvents.proto b/samples/grpc/shopping-analytics-service-scala/src/main/protobuf/ShoppingCartEvents.proto index bf9d54d5d..9d6912a80 100644 --- a/samples/grpc/shopping-analytics-service-scala/src/main/protobuf/ShoppingCartEvents.proto +++ b/samples/grpc/shopping-analytics-service-scala/src/main/protobuf/ShoppingCartEvents.proto @@ -24,9 +24,3 @@ message ItemRemoved { message CheckedOut { string cartId = 1; } - -message CustomerDefined { - string cartId = 1; - string customerId = 2; - string category = 3; -} diff --git a/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/Main.scala b/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/Main.scala index 569175ea2..57a97df66 100644 --- a/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/Main.scala +++ b/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/Main.scala @@ -27,6 +27,11 @@ object Main { AkkaManagement(system).start() ClusterBootstrap(system).start() + ShoppingCartEventConsumer.updateConsumerFilter( + system, + excludeTags = Set("small"), + includeTags = Set("medium", "large")) + ShoppingCartEventConsumer.init(system) } diff --git a/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala b/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala index 3253ca6b7..77a453ad7 100644 --- a/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala +++ b/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala @@ -17,7 +17,6 @@ import akka.projection.r2dbc.scaladsl.R2dbcProjection import akka.projection.scaladsl.Handler import org.slf4j.LoggerFactory import shoppingcart.CheckedOut -import shoppingcart.CustomerDefined import shoppingcart.ItemAdded import shoppingcart.ItemQuantityAdjusted import shoppingcart.ItemRemoved @@ -85,19 +84,6 @@ object ShoppingCartEventConsumer { projectionId.id, checkedOut.cartId, totalCount) - case customerDefined: CustomerDefined => - log.info( - "Projection [{}] consumed CustomerDefined for cart {}. Total [{}] events.", - projectionId.id, - customerDefined.cartId, - totalCount) - // Only interested in gold customers, update the filter to exclude. - // In a real application it would have to remove such entityId filter when it's not relevant any more. - if (customerDefined.category != "gold") - ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter( - streamId, - List( - ConsumerFilter.ExcludeEntityIds(Set(customerDefined.cartId)))) case unknown => throw new IllegalArgumentException(s"Unknown event $unknown") @@ -160,5 +146,21 @@ object ShoppingCartEventConsumer { }) } + //#initProjections + //#update-filter + def updateConsumerFilter( + system: ActorSystem[_], + excludeTags: Set[String], + includeTags: Set[String]): Unit = { + val streamId = system.settings.config + .getString("akka.projection.grpc.consumer.stream-id") + val criteria = Vector( + ConsumerFilter.ExcludeTags(excludeTags), + ConsumerFilter.IncludeTags(includeTags)) + ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(streamId, criteria) + } + //#update-filter + //#initProjections + } //#initProjections diff --git a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java index 103e83dbd..c78c49dd7 100644 --- a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java +++ b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java @@ -26,12 +26,18 @@ public static Function> eventProducer .registerMapper(ShoppingCart.ItemRemoved.class, event -> Optional.of(transformItemRemoved(event))) .registerMapper(ShoppingCart.CheckedOut.class, event -> Optional.of(transformCheckedOut(event))); + //#withProducerFilter EventProducerSource eventProducerSource = new EventProducerSource( "ShoppingCart", "cart", transformation, EventProducerSettings.apply(system) - ); + ) + //#eventProducerService + //#eventProducerService + // FIXME withProducerFilter + //#withProducerFilter + ; return EventProducer.grpcServiceHandler(system, eventProducerSource); } diff --git a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java index 0fca6c2d8..5b28d59c7 100644 --- a/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java +++ b/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java @@ -31,13 +31,23 @@ * created, and when the entity is loaded from the database - each event will be replayed to * recreate the state of the entity. */ +//#tags public final class ShoppingCart extends EventSourcedBehaviorWithEnforcedReplies< ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> { + static final String SMALL_QUANTITY_TAG = "small"; + static final String MEDIUM_QUANTITY_TAG = "medium"; + static final String LARGE_QUANTITY_TAG = "large"; + + //#tags + /** The current state held by the `EventSourcedBehavior`. */ + //#tags static final class State implements CborSerializable { final Map items; + + //#tags private Optional checkoutDate; public State() { @@ -87,8 +97,27 @@ public State removeItem(String itemId) { public int itemCount(String itemId) { return items.get(itemId); } + + //#tags + public int totalQuantity() { + return items.values().stream().reduce(0, Integer::sum); + } + + public Set tags() { + int total = totalQuantity(); + if (total == 0) + return Collections.emptySet(); + else if (total >= 100) + return Collections.singleton(LARGE_QUANTITY_TAG); + else if (total >= 10) + return Collections.singleton(MEDIUM_QUANTITY_TAG); + else + return return Collections.singleton(SMALL_QUANTITY_TAG); + } } + //#tags + /** This interface defines all the commands (messages) that the ShoppingCart actor supports. */ interface Command extends CborSerializable {} @@ -320,6 +349,14 @@ private ShoppingCart(String cartId) { this.cartId = cartId; } + //#tags + @Override + public Set tagsFor(Event event) { + // FIXME state.tags + return Collections.emptySet(); + } + //#tags + @Override public RetentionCriteria retentionCriteria() { return RetentionCriteria.snapshotEvery(100, 3); @@ -459,4 +496,6 @@ public EventHandler eventHandler() { .onEvent(CheckedOut.class, (state, evt) -> state.checkout(evt.eventTime)) .build(); } + //#tags } +//#tags diff --git a/samples/grpc/shopping-cart-service-scala/README.md b/samples/grpc/shopping-cart-service-scala/README.md index ed4c7a450..e749ef4b1 100644 --- a/samples/grpc/shopping-cart-service-scala/README.md +++ b/samples/grpc/shopping-cart-service-scala/README.md @@ -34,7 +34,7 @@ curl http://localhost:9101/ready ``` -6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl): +6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl). Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter. ```shell # add item to cart @@ -49,8 +49,6 @@ # check out cart grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout - # set customer - grpcurl -d '{"cartId":"cart2", "customer": {"customerId":"customer2", "category":"gold"}}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.SetCustomer ``` or same `grpcurl` commands to port 8102 to reach node 2. diff --git a/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartEvents.proto b/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartEvents.proto index eb54b906c..fc721c46f 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartEvents.proto +++ b/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartEvents.proto @@ -27,9 +27,3 @@ message ItemRemoved { message CheckedOut { string cartId = 1; } - -message CustomerDefined { - string cartId = 1; - string customerId = 2; - string category = 3; -} diff --git a/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto b/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto index 2ab2f123e..fbf4858ec 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto +++ b/samples/grpc/shopping-cart-service-scala/src/main/protobuf/ShoppingCartService.proto @@ -12,7 +12,6 @@ service ShoppingCartService { rpc UpdateItem (UpdateItemRequest) returns (Cart) {} rpc Checkout (CheckoutRequest) returns (Cart) {} rpc GetCart (GetCartRequest) returns (Cart) {} - rpc SetCustomer (SetCustomerRequest) returns (Cart) {} } @@ -36,20 +35,9 @@ message GetCartRequest { string cartId = 1; } -message SetCustomerRequest { - string cartId = 1; - Customer customer = 2; -} - -message Customer { - string customerId = 1; - string category = 2; -} - message Cart { repeated Item items = 1; bool checkedOut = 2; - Customer customer = 3; } message Item { diff --git a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala index 41dc6cdd7..779c8b187 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala +++ b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala @@ -28,20 +28,22 @@ object PublishEvents { Some(transformItemRemoved(event))) .registerMapper[ShoppingCart.CheckedOut, proto.CheckedOut](event => Some(transformCheckedOut(event))) - .registerMapper[ShoppingCart.CustomerDefined, proto.CustomerDefined](event => - Some(transformCustomerDefined(event))) + //#withProducerFilter val eventProducerSource = EventProducer .EventProducerSource( "ShoppingCart", "cart", transformation, EventProducerSettings(system)) + //#eventProducerService .withProducerFilter { envelope => val tags = envelope.tags tags.contains(ShoppingCart.MediumQuantityTag) || tags.contains( ShoppingCart.LargeQuantityTag) } + //#eventProducerService + //#withProducerFilter EventProducer.grpcServiceHandler(eventProducerSource)(system) } @@ -69,7 +71,4 @@ object PublishEvents { def transformCheckedOut(event: ShoppingCart.CheckedOut): proto.CheckedOut = proto.CheckedOut(event.cartId) - def transformCustomerDefined(event: ShoppingCart.CustomerDefined): proto.CustomerDefined = - proto.CustomerDefined(event.cartId, event.customerId, event.category) - } diff --git a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala index f63ba45dc..0bdd84246 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala +++ b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala @@ -33,11 +33,14 @@ import akka.persistence.typed.scaladsl.RetentionCriteria * loaded from the database - each event will be replayed to recreate the state * of the entity. */ +//#tags object ShoppingCart { + //#tags /** * The current state held by the `EventSourcedBehavior`. */ + //#tags final case class State( items: Map[String, Int], checkoutDate: Option[Instant], @@ -45,6 +48,8 @@ object ShoppingCart { customerCategory: String) extends CborSerializable { + //#tags + def isCheckedOut: Boolean = checkoutDate.isDefined @@ -73,6 +78,7 @@ object ShoppingCart { def toSummary: Summary = Summary(items, isCheckedOut, customerId, customerCategory) + //#tags def totalQuantity: Int = items.valuesIterator.sum @@ -85,6 +91,9 @@ object ShoppingCart { } } + + //#tags + object State { val empty: State = State( @@ -186,15 +195,19 @@ object ShoppingCart { val EntityKey: EntityTypeKey[Command] = EntityTypeKey[Command]("ShoppingCart") + //#tags val SmallQuantityTag = "small" val MediumQuantityTag = "medium" val LargeQuantityTag = "large" + //#tags + def init(system: ActorSystem[_]): Unit = { ClusterSharding(system).init(Entity(EntityKey)(entityContext => ShoppingCart(entityContext.entityId))) } + //#tags def apply(cartId: String): Behavior[Command] = { EventSourcedBehavior .withEnforcedReplies[Command, Event, State]( @@ -210,6 +223,7 @@ object ShoppingCart { .onPersistFailure( SupervisorStrategy.restartWithBackoff(200.millis, 5.seconds, 0.1)) } + //#tags private def handleCommand( cartId: String, @@ -337,4 +351,6 @@ object ShoppingCart { state.setCustomer(customerId, category) } } + //#tags } +//#tags diff --git a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala index 70970bb99..bc26b5362 100644 --- a/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala +++ b/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCartServiceImpl.scala @@ -10,8 +10,6 @@ import akka.grpc.GrpcServiceException import akka.util.Timeout import io.grpc.Status import org.slf4j.LoggerFactory -import shopping.cart.proto.Cart -import shopping.cart.proto.SetCustomerRequest // tag::moreOperations[] import akka.actor.typed.ActorRef @@ -78,35 +76,12 @@ class ShoppingCartServiceImpl(system: ActorSystem[_]) convertError(response) } - override def setCustomer(in: SetCustomerRequest): Future[Cart] = { - in.customer match { - case Some(c) => - logger.info("setCustomer {} to cart {}", c.customerId, in.cartId) - val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId) - val reply: Future[ShoppingCart.Summary] = - entityRef.askWithStatus( - ShoppingCart.SetCustomer(c.customerId, c.category, _)) - val response = reply.map(cart => toProtoCart(cart)) - convertError(response) - case None => - Future.failed(new GrpcServiceException( - Status.INVALID_ARGUMENT.withDescription("customer must be defined"))) - } - } - private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = { - val customer = - if (cart.customerId != "") - Some(proto.Customer(cart.customerId, cart.customerCategory)) - else - None - proto.Cart( cart.items.iterator.map { case (itemId, quantity) => proto.Item(itemId, quantity) }.toSeq, - cart.checkedOut, - customer) + cart.checkedOut) } private def convertError[T](response: Future[T]): Future[T] = {