doc: Reference docs of filters
* simplified the consumer side filter sample to also use tags
patriknw committed Apr 5, 2023
1 parent c90ca11 commit 1c37af5
Showing 18 changed files with 249 additions and 75 deletions.
@@ -25,6 +25,7 @@ final class EventProducerSource(
def this(entityType: String, streamId: String, transformation: Transformation, settings: EventProducerSettings) =
this(entityType, streamId, transformation, settings, producerFilter = _ => true)

// FIXME, can't we have a type parameter for the event here?
def withProducerFilter(producerFilter: java.util.function.Predicate[EventEnvelope[Any]]): EventProducerSource =
new EventProducerSource(entityType, streamId, transformation, settings, producerFilter)

@@ -62,6 +62,7 @@ object EventProducer {
require(entityType.nonEmpty, "Entity type must not be empty")
require(streamId.nonEmpty, "Stream id must not be empty")

// FIXME, can't we have a type parameter for the event here?
def withProducerFilter(producerFilter: EventEnvelope[Any] => Boolean): EventProducerSource =
new EventProducerSource(entityType, streamId, transformation, settings, producerFilter)

24 changes: 24 additions & 0 deletions docs/src/main/paradox/grpc-replicated-event-sourcing-transport.md
@@ -177,6 +177,30 @@ For some scenarios it may be necessary to do a two-step deploy of format changes
for a new serialization format so that all replicas can deserialize it, then a second deploy where the new field is actually
populated with data.

## Filters

By default, events from all Replicated Event Sourced entities are replicated.

The same kind of filters as described in @ref:[Akka Projection gRPC Filters](grpc.md#filters) can be used for
Replicated Event Sourcing.

The producer defined filter:

Scala
: @@snip [ShoppingCart.scala](/samples/replicated/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #init }

Java
: @@snip [ShoppingCart.java](/samples/replicated/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #init }

Consumer defined filters are updated as described in @ref:[Akka Projection gRPC Consumer defined filter](grpc.md#consumer-defined-filter).

Note that the `streamId` is always the same as the `entityType` when using Replicated Event Sourcing.

Entity id based filter criteria must include the replica id as a suffix to the entity id, with `|` as separator.
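
For example, excluding the events from one entity in one replica (a sketch with hypothetical ids, entity id `cart1` and replica id `replica2`):

```scala
// hypothetical ids for illustration: entity "cart1" in replica "replica2"
ConsumerFilter.ExcludeEntityIds(Set("cart1|replica2"))
```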

Replicated Event Sourcing uses bidirectional replication, and therefore you would typically have to define the same
filters on both sides. That is not handled automatically.

## Sample projects

Source code and build files for complete sample projects can be found in the `akka/akka-projection` GitHub repository.
126 changes: 126 additions & 0 deletions docs/src/main/paradox/grpc.md
@@ -135,6 +135,132 @@ Java
This example includes an application specific `ShoppingCartService`, which is unrelated to Akka Projection gRPC,
but it illustrates how to combine the `EventProducer` service with other gRPC services.

## Filters

By default, events from all entities of the given entity type and slice range are emitted from the producer to the
consumer. The transformation function on the producer side can omit certain events, but the offsets for these
events are still transferred to the consumer, to support sequence number validation and offset storage.

Filters can be used when a consumer is only interested in a subset of the entities. The filters can be defined
on both the producer side and on the consumer side, and they can be changed at runtime.

### Tags

Tags are typically used for the filters, so let's first look at an example of how to tag events in the entity. Here,
the tag is based on the total quantity of the shopping cart, i.e. the state of the cart. The tags are included in the
@apidoc[akka.persistence.query.typed.EventEnvelope].

Scala
: @@snip [ShoppingCart.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/ShoppingCart.scala) { #tags }

Java
: @@snip [ShoppingCart.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/ShoppingCart.java) { #tags }


### Producer defined filter

The producer may define a filter function on the `EventProducerSource`.

Scala
: @@snip [PublishEvents.scala](/samples/grpc/shopping-cart-service-scala/src/main/scala/shopping/cart/PublishEvents.scala) { #withProducerFilter }

Java
: @@snip [PublishEvents.java](/samples/grpc/shopping-cart-service-java/src/main/java/shopping/cart/PublishEvents.java) { #withProducerFilter }

In this example the decision is based on tags, but the filter function can use anything in the
@apidoc[akka.persistence.query.typed.EventEnvelope] parameter or the event itself. Here, the entity sets the tag based
on the total quantity of the shopping cart, which requires the full state of the shopping cart and is not known from
an individual event.
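
A minimal sketch of such a tag based filter, assuming the tag names from the shopping cart sample and that the
envelope's `tags` field holds the tags:

```scala
// a sketch: only emit events for carts that are not tagged as "small"
eventProducerSource.withProducerFilter(env => !env.tags.contains("small"))
```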

Note that the purpose of `withProducerFilter` is to decide whether all events for an entity are to be emitted or not.
If the purpose is to filter out certain events you should instead use the `Transformation`.

The producer filter is evaluated before the transformation function, i.e. the event is the original event and not
the transformed event.

A producer filter that excludes an event wins over any consumer defined filter, i.e. if the producer filter function
returns `false` the event will not be emitted.

### Consumer defined filter

The consumer may define declarative filters that are sent to the producer and evaluated on the producer side
before emitting the events.

Consumer filters consist of exclude and include criteria. In short, the exclude criteria are evaluated first and
may be overridden by include criteria. More precisely, they are evaluated according to the following rules:

* Exclude criteria are evaluated first.
* If no exclude matches, the event is emitted.
* If an exclude matches, the include criteria are evaluated.
* If no include matches, the event is discarded.
* If an include matches, the event is emitted.

The exclude criteria can be a combination of:

* `ExcludeTags` - exclude events with any of the given tags
* `ExcludeRegexEntityIds` - exclude events for entities with entity ids matching the given regular expressions
* `ExcludeEntityIds` - exclude events for entities with the given entity ids

To exclude all events you can use `ExcludeRegexEntityIds` with `.*`.

The include criteria can be a combination of:

* `IncludeTags` - include events with any of the given tags
* `IncludeEntityIds` - include events for entities with the given entity ids
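
For example, to consume only events with a certain tag, exclude everything else and add the tag as include
criteria (a sketch using the criteria described above):

```scala
// exclude all entities by regex, then include only events tagged "large"
val criteria = Vector(
  ConsumerFilter.ExcludeRegexEntityIds(Set(".*")),
  ConsumerFilter.IncludeTags(Set("large")))
```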

The filter is updated with the @apidoc[ConsumerFilter] extension.

Scala
: @@snip [ShoppingCartEventConsumer.scala](/samples/grpc/shopping-analytics-service-scala/src/main/scala/shopping/analytics/ShoppingCartEventConsumer.scala) { #update-filter }

Java
: @@snip [ShoppingCartEventConsumer.java](/samples/grpc/shopping-analytics-service-java/src/main/java/shopping/analytics/ShoppingCartEventConsumer.java) { #update-filter }

Note that the `streamId` must match what is used when initializing the `GrpcReadJournal`, which by default is from
the config property `akka.projection.grpc.consumer.stream-id`.
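
For example, in the consumer's `application.conf`, assuming the stream id `cart` from the shopping cart sample:

```
akka.projection.grpc.consumer.stream-id = "cart"
```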

The filters can be changed dynamically at runtime without restarting the Projections or the `GrpcReadJournal`. The
updates are incremental. For example, if you first add an `IncludeTags` of tag `"medium"` and then update the filter
with another `IncludeTags` of tag `"large"`, the full filter consists of both `"medium"` and `"large"`.

To remove a filter criterion you would use the corresponding @apidoc[ConsumerFilter.RemoveCriteria], for example
`RemoveIncludeTags`.
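
For example, removing a previously added include tag (a sketch, assuming `RemoveIncludeTags` takes the set of tags
to remove and the same `streamId` as above):

```scala
// remove the earlier include of tag "medium"
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(
  streamId,
  Vector(ConsumerFilter.RemoveIncludeTags(Set("medium"))))
```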

The updated filter is kept and remains after restarts of the Projection instances. If the consumer side is
running with Akka Cluster, the filter is propagated to other nodes in the cluster automatically with
Akka Distributed Data. You only have to update it in one place and it will be applied to all running Projections
with the given `streamId`. The filters will be cleared in case of a full Cluster stop, which means that you
need to take care of populating the initial filters at startup.

See @apidoc[ConsumerFilter] for full API documentation.

### Event replay

When the consumer receives an event that is not the first event for the entity, and it hasn't processed and stored
the offset for the preceding event (previous sequence number), a replay of previous events will be triggered.
The reason is that the consumer is typically interested in all events for an entity and must process them in
the original order. Even though this is completely automatic, it is good to be aware of, since replaying many
events for many entities may have a substantial performance impact.

The event replay is triggered "lazily" when a new event with an unexpected sequence number is received, but with
`ConsumerFilter.IncludeEntityIds` it is possible to explicitly define a sequence number from which the
replay will start immediately. You have the following choices for the sequence number in the `IncludeEntityIds`
criteria (see the sketch after this list):

* if the previously processed sequence number is known, the next (+1) sequence number can be defined
* `1` can be used to replay all events of the entity
* `0` can be used to not replay events immediately; they will instead be replayed lazily as described previously
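
A sketch of such include criteria, assuming entity id and sequence number are given with the
`ConsumerFilter.EntityIdOffset` representation:

```scala
// replay "cart1" from the first event, include "cart2" without immediate replay
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(
  streamId,
  Vector(ConsumerFilter.IncludeEntityIds(Set(
    ConsumerFilter.EntityIdOffset("cart1", 1L),
    ConsumerFilter.EntityIdOffset("cart2", 0L)))))
```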

Any duplicate events are filtered out by the Projection on the consumer side. This deduplication depends
on how long the Projection keeps old offsets. You may have to increase the time window in the configuration,
which has the drawback of more memory usage.

```
akka.projection.r2dbc.offset-store.time-window = 15 minutes
```

Application level deduplication or idempotency may be needed if the Projection can't keep enough offsets in memory.

## Sample projects

Source code and build files for complete sample projects can be found in the `akka/akka-projection` GitHub repository.
@@ -138,5 +138,11 @@ public static void init(ActorSystem<?> system) {
ProjectionBehavior.stopMessage());
}

//#initProjections
//#update-filter
// FIXME
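// A possible implementation, mirroring the Scala updateConsumerFilter sample
// in this commit (a sketch; the javadsl names below are assumptions):
// String streamId =
//     system.settings().config().getString("akka.projection.grpc.consumer.stream-id");
// ConsumerFilter.get(system)
//     .ref()
//     .tell(new ConsumerFilter.UpdateFilter(streamId, criteria));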
//#update-filter
//#initProjections

}
//#initProjections
2 changes: 1 addition & 1 deletion samples/grpc/shopping-analytics-service-scala/README.md
@@ -18,6 +18,6 @@

3. Start `shopping-cart-service` and add item to cart

4. Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter
4. Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter.

5. Notice the log output in the terminal of the `shopping-analytics-service`
2 changes: 1 addition & 1 deletion samples/grpc/shopping-analytics-service-scala/build.sbt
@@ -35,7 +35,7 @@ val AkkaHttpVersion = "10.5.0"
val AkkaManagementVersion = "1.3.0"
val AkkaPersistenceR2dbcVersion = "1.1.0-M7+2-e241ee57-SNAPSHOT"
val AkkaProjectionVersion =
sys.props.getOrElse("akka-projection.version", "1.4.0-M3-124-96b9d6c2-SNAPSHOT") // FIXME
sys.props.getOrElse("akka-projection.version", "1.4.0-M3-126-b8edf16f-20230404-1623-SNAPSHOT") // FIXME
val AkkaDiagnosticsVersion = "2.0.0"

enablePlugins(AkkaGrpcPlugin)
@@ -24,9 +24,3 @@ message ItemRemoved {
message CheckedOut {
string cartId = 1;
}

message CustomerDefined {
string cartId = 1;
string customerId = 2;
string category = 3;
}
@@ -27,6 +27,11 @@ object Main {
AkkaManagement(system).start()
ClusterBootstrap(system).start()

ShoppingCartEventConsumer.updateConsumerFilter(
system,
excludeTags = Set("small"),
includeTags = Set("medium", "large"))

ShoppingCartEventConsumer.init(system)
}

@@ -17,7 +17,6 @@ import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.scaladsl.Handler
import org.slf4j.LoggerFactory
import shoppingcart.CheckedOut
import shoppingcart.CustomerDefined
import shoppingcart.ItemAdded
import shoppingcart.ItemQuantityAdjusted
import shoppingcart.ItemRemoved
@@ -85,19 +84,6 @@ object ShoppingCartEventConsumer {
projectionId.id,
checkedOut.cartId,
totalCount)
case customerDefined: CustomerDefined =>
log.info(
"Projection [{}] consumed CustomerDefined for cart {}. Total [{}] events.",
projectionId.id,
customerDefined.cartId,
totalCount)
// Only interested in gold customers, update the filter to exclude.
// In a real application it would have to remove such entityId filter when it's not relevant any more.
if (customerDefined.category != "gold")
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(
streamId,
List(
ConsumerFilter.ExcludeEntityIds(Set(customerDefined.cartId))))

case unknown =>
throw new IllegalArgumentException(s"Unknown event $unknown")
@@ -160,5 +146,21 @@
})
}

//#initProjections
//#update-filter
def updateConsumerFilter(
system: ActorSystem[_],
excludeTags: Set[String],
includeTags: Set[String]): Unit = {
val streamId = system.settings.config
.getString("akka.projection.grpc.consumer.stream-id")
val criteria = Vector(
ConsumerFilter.ExcludeTags(excludeTags),
ConsumerFilter.IncludeTags(includeTags))
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(streamId, criteria)
}
//#update-filter
//#initProjections

}
//#initProjections
@@ -26,12 +26,18 @@ public static Function<HttpRequest, CompletionStage<HttpResponse>> eventProducer
.registerMapper(ShoppingCart.ItemRemoved.class, event -> Optional.of(transformItemRemoved(event)))
.registerMapper(ShoppingCart.CheckedOut.class, event -> Optional.of(transformCheckedOut(event)));

//#withProducerFilter
EventProducerSource eventProducerSource = new EventProducerSource(
"ShoppingCart",
"cart",
transformation,
EventProducerSettings.apply(system)
);
)
//#eventProducerService
//#eventProducerService
// FIXME withProducerFilter
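// A possible tag based filter, mirroring the Scala sample (a sketch; the
// javadsl accessor names are assumptions, not part of this commit):
// .withProducerFilter(env -> !env.getTags().contains(ShoppingCart.SMALL_QUANTITY_TAG))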
//#withProducerFilter
;

return EventProducer.grpcServiceHandler(system, eventProducerSource);
}
@@ -31,13 +31,23 @@
* created, and when the entity is loaded from the database - each event will be replayed to
* recreate the state of the entity.
*/
//#tags
public final class ShoppingCart
extends EventSourcedBehaviorWithEnforcedReplies<
ShoppingCart.Command, ShoppingCart.Event, ShoppingCart.State> {

static final String SMALL_QUANTITY_TAG = "small";
static final String MEDIUM_QUANTITY_TAG = "medium";
static final String LARGE_QUANTITY_TAG = "large";

//#tags

/** The current state held by the `EventSourcedBehavior`. */
//#tags
static final class State implements CborSerializable {
final Map<String, Integer> items;

//#tags
private Optional<Instant> checkoutDate;

public State() {
@@ -87,8 +97,27 @@ public State removeItem(String itemId) {
public int itemCount(String itemId) {
return items.get(itemId);
}

//#tags
public int totalQuantity() {
return items.values().stream().reduce(0, Integer::sum);
}

public Set<String> tags() {
int total = totalQuantity();
if (total == 0)
return Collections.emptySet();
else if (total >= 100)
return Collections.singleton(LARGE_QUANTITY_TAG);
else if (total >= 10)
return Collections.singleton(MEDIUM_QUANTITY_TAG);
else
return Collections.singleton(SMALL_QUANTITY_TAG);
}
}

//#tags

/** This interface defines all the commands (messages) that the ShoppingCart actor supports. */
interface Command extends CborSerializable {}

@@ -320,6 +349,14 @@ private ShoppingCart(String cartId) {
this.cartId = cartId;
}

//#tags
@Override
public Set<String> tagsFor(Event event) {
// FIXME state.tags
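// A possible implementation would return the tags from the state, e.g. via the
// state-aware tagsFor(State, Event) overload (a sketch/assumption, not part of
// this commit): return state.tags();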
return Collections.emptySet();
}
//#tags

@Override
public RetentionCriteria retentionCriteria() {
return RetentionCriteria.snapshotEvery(100, 3);
@@ -459,4 +496,6 @@ public EventHandler<State, Event> eventHandler() {
.onEvent(CheckedOut.class, (state, evt) -> state.checkout(evt.eventTime))
.build();
}
//#tags
}
//#tags
4 changes: 1 addition & 3 deletions samples/grpc/shopping-cart-service-scala/README.md
@@ -34,7 +34,7 @@
curl http://localhost:9101/ready
```

6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl):
6. Try it with [grpcurl](https://github.com/fullstorydev/grpcurl). Add at least a total quantity of 10 to the cart, smaller carts are excluded by the event filter.

```shell
# add item to cart
@@ -49,8 +49,6 @@
# check out cart
grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout
# set customer
grpcurl -d '{"cartId":"cart2", "customer": {"customerId":"customer2", "category":"gold"}}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.SetCustomer
```

or use the same `grpcurl` commands on port 8102 to reach node 2.