Skip to content

Commit

Permalink
chore: Consumer side filter in grpc samples
Browse files Browse the repository at this point in the history
  • Loading branch information
patriknw committed Mar 30, 2023
1 parent b09f852 commit 5ef35f9
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,9 @@ message ItemRemoved {
message CheckedOut {
string cartId = 1;
}

message CustomerDefined {
string cartId = 1;
string customerId = 2;
string category = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import akka.persistence.query.typed.EventEnvelope
import akka.projection.ProjectionBehavior
import akka.projection.ProjectionId
import akka.projection.eventsourced.scaladsl.EventSourcedProvider
import akka.projection.grpc.consumer.ConsumerFilter
import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal
import akka.projection.r2dbc.scaladsl.R2dbcProjection
import akka.projection.scaladsl.Handler
import org.slf4j.LoggerFactory
import shoppingcart.CheckedOut
import shoppingcart.CustomerDefined
import shoppingcart.ItemAdded
import shoppingcart.ItemQuantityAdjusted
import shoppingcart.ItemRemoved
Expand All @@ -28,7 +30,10 @@ object ShoppingCartEventConsumer {
LoggerFactory.getLogger("shopping.analytics.ShoppingCartEventConsumer")

//#eventHandler
private class EventHandler(projectionId: ProjectionId)
private class EventHandler(
projectionId: ProjectionId,
streamId: String,
system: ActorSystem[_])
extends Handler[EventEnvelope[AnyRef]] {
private var totalCount = 0
private var throughputStartTime = System.nanoTime()
Expand Down Expand Up @@ -80,6 +85,20 @@ object ShoppingCartEventConsumer {
projectionId.id,
checkedOut.cartId,
totalCount)
case customerDefined: CustomerDefined =>
log.info(
"Projection [{}] consumed CustomerDefined for cart {}. Total [{}] events.",
projectionId.id,
customerDefined.cartId,
totalCount)
// Only interested in gold customers, update the filter to exclude.
// In a real application it would have to remove such entityId filter when it's not relevant any more.
if (customerDefined.category != "gold")
ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter(
streamId,
List(
ConsumerFilter.ExcludeEntityIds(Set(customerDefined.cartId))))

case unknown =>
throw new IllegalArgumentException(s"Unknown event $unknown")
}
Expand Down Expand Up @@ -133,7 +152,11 @@ object ShoppingCartEventConsumer {
projectionId,
None,
sourceProvider,
() => new EventHandler(projectionId)))
() =>
new EventHandler(
projectionId,
eventsBySlicesQuery.streamId,
sys)))
})
}

Expand Down
6 changes: 3 additions & 3 deletions samples/grpc/shopping-cart-service-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
# check out cart
grpcurl -d '{"cartId":"cart1"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.Checkout
# get item popularity
grpcurl -d '{"itemId":"socks"}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.GetItemPopularity
# set customer
grpcurl -d '{"cartId":"cart2", "customer": {"customerId":"customer2", "category":"gold"}}' -plaintext 127.0.0.1:8101 shoppingcart.ShoppingCartService.SetCustomer
```

or same `grpcurl` commands to port 8102 to reach node 2.
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,9 @@ message ItemRemoved {
message CheckedOut {
string cartId = 1;
}

message CustomerDefined {
string cartId = 1;
string customerId = 2;
string category = 3;
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ service ShoppingCartService {
rpc UpdateItem (UpdateItemRequest) returns (Cart) {}
rpc Checkout (CheckoutRequest) returns (Cart) {}
rpc GetCart (GetCartRequest) returns (Cart) {}
rpc SetCustomer (SetCustomerRequest) returns (Cart) {}
}


Expand All @@ -35,9 +36,20 @@ message GetCartRequest {
string cartId = 1;
}

message SetCustomerRequest {
string cartId = 1;
Customer customer = 2;
}

message Customer {
string customerId = 1;
string category = 2;
}

message Cart {
repeated Item items = 1;
bool checkedOut = 2;
Customer customer = 3;
}

message Item {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ object PublishEvents {
Some(transformItemRemoved(event)))
.registerMapper[ShoppingCart.CheckedOut, proto.CheckedOut](event =>
Some(transformCheckedOut(event)))
.registerMapper[ShoppingCart.CustomerDefined, proto.CustomerDefined](event =>
Some(transformCustomerDefined(event)))

val eventProducerSource = EventProducer
.EventProducerSource(
Expand Down Expand Up @@ -67,4 +69,7 @@ object PublishEvents {
def transformCheckedOut(event: ShoppingCart.CheckedOut): proto.CheckedOut =
proto.CheckedOut(event.cartId)

def transformCustomerDefined(event: ShoppingCart.CustomerDefined): proto.CustomerDefined =
proto.CustomerDefined(event.cartId, event.customerId, event.category)

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ object ShoppingCart {
/**
* The current state held by the `EventSourcedBehavior`.
*/
final case class State(items: Map[String, Int], checkoutDate: Option[Instant])
final case class State(
items: Map[String, Int],
checkoutDate: Option[Instant],
customerId: String,
customerCategory: String)
extends CborSerializable {

def isCheckedOut: Boolean =
Expand All @@ -63,8 +67,11 @@ object ShoppingCart {
def checkout(now: Instant): State =
copy(checkoutDate = Some(now))

def setCustomer(customerId: String, category: String): State =
copy(customerId = customerId, customerCategory = category)

def toSummary: Summary =
Summary(items, isCheckedOut)
Summary(items, isCheckedOut, customerId, customerCategory)

def totalQuantity: Int =
items.valuesIterator.sum
Expand All @@ -80,7 +87,11 @@ object ShoppingCart {
}
object State {
val empty: State =
State(items = Map.empty, checkoutDate = None)
State(
items = Map.empty,
checkoutDate = None,
customerId = "",
customerCategory = "")
}

/**
Expand Down Expand Up @@ -131,9 +142,19 @@ object ShoppingCart {
/**
* Summary of the shopping cart state, used in reply messages.
*/
final case class Summary(items: Map[String, Int], checkedOut: Boolean)
final case class Summary(
items: Map[String, Int],
checkedOut: Boolean,
customerId: String,
customerCategory: String)
extends CborSerializable

final case class SetCustomer(
customerId: String,
category: String,
replyTo: ActorRef[StatusReply[Summary]])
extends Command

/**
* This interface defines all the events that the ShoppingCart supports.
*/
Expand All @@ -156,6 +177,12 @@ object ShoppingCart {

final case class CheckedOut(cartId: String, eventTime: Instant) extends Event

final case class CustomerDefined(
cartId: String,
customerId: String,
category: String)
extends Event

val EntityKey: EntityTypeKey[Command] =
EntityTypeKey[Command]("ShoppingCart")

Expand Down Expand Up @@ -258,6 +285,12 @@ object ShoppingCart {

case Get(replyTo) =>
Effect.reply(replyTo)(state.toSummary)

case SetCustomer(customerId, category, replyTo) =>
Effect
.persist(CustomerDefined(cartId, customerId, category))
.thenReply(replyTo)(updatedCart =>
StatusReply.Success(updatedCart.toSummary))
}
}

Expand All @@ -268,6 +301,11 @@ object ShoppingCart {
command match {
case Get(replyTo) =>
Effect.reply(replyTo)(state.toSummary)
case SetCustomer(customerId, category, replyTo) =>
Effect
.persist(CustomerDefined(cartId, customerId, category))
.thenReply(replyTo)(updatedCart =>
StatusReply.Success(updatedCart.toSummary))
case cmd: AddItem =>
Effect.reply(cmd.replyTo)(
StatusReply.Error(
Expand Down Expand Up @@ -296,6 +334,8 @@ object ShoppingCart {
state.updateItem(itemId, quantity)
case CheckedOut(_, eventTime) =>
state.checkout(eventTime)
case CustomerDefined(_, customerId, category) =>
state.setCustomer(customerId, category)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package shopping.cart

import java.util.concurrent.TimeoutException

import scala.concurrent.Future

import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.grpc.GrpcServiceException
import akka.util.Timeout
import io.grpc.Status
import org.slf4j.LoggerFactory
import shopping.cart.proto.Cart
import shopping.cart.proto.SetCustomerRequest

// tag::moreOperations[]
import akka.actor.typed.ActorRef
Expand Down Expand Up @@ -74,12 +78,35 @@ class ShoppingCartServiceImpl(system: ActorSystem[_])
convertError(response)
}

override def setCustomer(in: SetCustomerRequest): Future[Cart] = {
in.customer match {
case Some(c) =>
logger.info("setCustomer {} to cart {}", c.customerId, in.cartId)
val entityRef = sharding.entityRefFor(ShoppingCart.EntityKey, in.cartId)
val reply: Future[ShoppingCart.Summary] =
entityRef.askWithStatus(
ShoppingCart.SetCustomer(c.customerId, c.category, _))
val response = reply.map(cart => toProtoCart(cart))
convertError(response)
case None =>
Future.failed(new GrpcServiceException(
Status.INVALID_ARGUMENT.withDescription("customer must be defined")))
}
}

private def toProtoCart(cart: ShoppingCart.Summary): proto.Cart = {
val customer =
if (cart.customerId != "")
Some(proto.Customer(cart.customerId, cart.customerCategory))
else
None

proto.Cart(
cart.items.iterator.map { case (itemId, quantity) =>
proto.Item(itemId, quantity)
}.toSeq,
cart.checkedOut)
cart.checkedOut,
customer)
}

private def convertError[T](response: Future[T]): Future[T] = {
Expand Down

0 comments on commit 5ef35f9

Please sign in to comment.