diff --git a/samples/grpc/local-drone-control-scala/README.md b/samples/grpc/local-drone-control-scala/README.md index caa9e2344..cbb194efe 100644 --- a/samples/grpc/local-drone-control-scala/README.md +++ b/samples/grpc/local-drone-control-scala/README.md @@ -6,25 +6,43 @@ located geographically close to the actual drones for short latencies and resili Drones interact with the closest control service in the following ways: * Report their precise location, at a high frequency - * FIXME Ask for the next delivery to perform + * Ask for the next delivery to perform + * Mark a delivery as completed The control service interacts with the global cloud service, represented by the separate restaurant-drone-deliveries-service sample, in the following ways: * Replicates a coarse grained location of each drone to the cloud, at a lower frequency, only when they change location at a coarse grained grid - * FIXME get restaurant to home delivery orders in the geographical area of the local drone control + * get restaurant to home delivery orders in the geographical area of the local drone control ## Running the sample Start one instance with: -``` +```shell sbt run ``` Posting updated location for a drone: +```shell +grpcurl -d '{"drone_id":"drone1", "coordinates": {"longitude": 18.07125, "latitude": 59.31834}, "altitude": 5}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation +``` + +Request assignment of a delivery (it needs to have reported location at least once first) + +```shell +grpcurl -d '{"drone_id":"drone1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.RequestNextDelivery ``` -grpcurl -d '{"drone_id":"drone1", "longitude": 18.07125, "latitude": 59.31834}' -plaintext 127.0.0.1:8080 local.drones.DroneService.ReportLocation + +Mark the delivery as completed +```shell +grpcurl -d '{"delivery_id":"order1"}' -plaintext 127.0.0.1:8080 local.drones.DroneService.CompleteDelivery +``` + +Inspect the current state of the local delivery queue + +```shell +grpcurl -plaintext 127.0.0.1:8080 local.drones.DeliveriesQueueService.GetCurrentQueue ``` \ No newline at end of file diff --git a/samples/grpc/local-drone-control-scala/build.sbt b/samples/grpc/local-drone-control-scala/build.sbt index 5a00b10c9..843643e5e 100644 --- a/samples/grpc/local-drone-control-scala/build.sbt +++ b/samples/grpc/local-drone-control-scala/build.sbt @@ -8,6 +8,7 @@ licenses := Seq( scalaVersion := "2.13.11" Compile / scalacOptions ++= Seq( + "-release:11", "-deprecation", "-feature", "-unchecked", diff --git a/samples/grpc/local-drone-control-scala/src/main/protobuf/central/deliveries/delivery_events.proto b/samples/grpc/local-drone-control-scala/src/main/protobuf/central/deliveries/delivery_events.proto new file mode 100644 index 000000000..760b7d1de --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/protobuf/central/deliveries/delivery_events.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "central.deliveries.proto"; + +package central.deliveries; + +// copy of the descriptor from restaurant-drone-deliveries who owns/publishes the events + +import "common/coordinates.proto"; + +message DeliveryRegistered { + string delivery_id = 1; + common.Coordinates origin = 2; + common.Coordinates destination = 3; +} \ No newline at end of file diff --git a/samples/grpc/local-drone-control-scala/src/main/protobuf/common/coordinates.proto b/samples/grpc/local-drone-control-scala/src/main/protobuf/common/coordinates.proto new file mode 100644 index 000000000..99f385d72 --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/protobuf/common/coordinates.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "common.proto"; + +package common; + +// generic messages, shared between local-drone-control and restaurant-drone-deliveries + +message Coordinates { + // latitude (north-south) in decimal degree coordinates + double latitude = 1; + // longitude (east west) in decimal degree coordinates + double longitude = 2; +} diff --git a/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/deliveries_queue_api.proto b/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/deliveries_queue_api.proto new file mode 100644 index 000000000..19bff6e72 --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/deliveries_queue_api.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "local.drones.proto"; + +import "google/protobuf/empty.proto"; +import "common/coordinates.proto"; + +package local.drones; + +// gRPC definition for DroneService, for drones to interact with + +service DeliveriesQueueService { + rpc GetCurrentQueue (google.protobuf.Empty) returns (GetCurrentQueueResponse) {} +} + +message GetCurrentQueueResponse { + repeated WaitingDelivery waitingDeliveries = 1; + repeated DeliveryInProgress deliveriesInProgress = 2; +} + +message WaitingDelivery { + string delivery_id = 1; + common.Coordinates from = 2; + common.Coordinates to = 3; +} + +message DeliveryInProgress { + string delivery_id = 1; + string drone_id = 2; +} \ No newline at end of file diff --git a/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto b/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto index 722887008..08e81a261 100644 --- a/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto +++ b/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_api.proto @@ -4,22 +4,38 @@ option java_multiple_files = true; option java_package = "local.drones.proto"; import "google/protobuf/empty.proto"; +import "common/coordinates.proto"; package local.drones; -// gRPC definition for DroneService +// gRPC definition for DroneService, for drones to interact with service DroneService { rpc ReportLocation (ReportLocationRequest) returns (google.protobuf.Empty) {} + + // deliveries + rpc RequestNextDelivery (RequestNextDeliveryRequest) returns (RequestNextDeliveryResponse) {} + rpc CompleteDelivery (CompleteDeliveryRequest) returns (google.protobuf.Empty) {} } message ReportLocationRequest { string drone_id = 1; - // longitude in decimal degree coordinates - double longitude = 2; - // latitude in decimal degree coordinates - double latitude = 3; + common.Coordinates coordinates = 2; // altitude in meters double altitude = 4; +} + +message RequestNextDeliveryRequest { + string drone_id = 1; +} + +message RequestNextDeliveryResponse { + string delivery_id = 1; + common.Coordinates from = 2; + common.Coordinates to = 3; +} + +message CompleteDeliveryRequest { + string delivery_id = 1; } \ No newline at end of file diff --git a/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_events.proto b/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_events.proto index aacd2419b..326d7e64b 100644 --- a/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_events.proto +++ b/samples/grpc/local-drone-control-scala/src/main/protobuf/local/drones/drone_events.proto @@ -5,10 +5,12 @@ option java_package = "local.drones.proto"; package local.drones; -// events published by the drones +import "common/coordinates.proto"; + +// events published by the drone entity, for consumption/push to the cloud + message CoarseDroneLocation { - string drone_id = 1; - double latitude = 2; - double longitude = 3; + // coordinates but truncated to a coarse grained location + common.Coordinates coordinates = 1; } diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/application.conf b/samples/grpc/local-drone-control-scala/src/main/resources/application.conf index 9289186e9..9722e8b47 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/application.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/application.conf @@ -9,11 +9,9 @@ akka { } local-drone-control { - # unique identifier for the instance of local control, for example geographic location of the - # node, or ip - service-id = "kungsholmen" - service-id = ${?SERVICE_ID} - drone-service { - ask-timeout = 3s - } + # unique identifier for the instance of local control, must be known up front by the cloud service + location-id = "sweden/stockholm/kungsholmen" + location-id = ${?LOCATION_ID} + + ask-timeout = 3s } \ No newline at end of file diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf b/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf index 26ea6fb2d..ca5a3bcd5 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/grpc.conf @@ -15,4 +15,14 @@ akka.grpc.client.central-drone-control = { host = "127.0.0.1" port = 8101 use-tls = false +} + +akka.projection.grpc.consumer { + client { + # same as for producer above, so re-use config from there + host = ${akka.grpc.client.central-drone-control.host} + port = ${akka.grpc.client.central-drone-control.port} + use-tls = ${akka.grpc.client.central-drone-control.use-tls} + } + stream-id = "delivery-events" } \ No newline at end of file diff --git a/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf b/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf index 1cbdedb56..2604a56a5 100644 --- a/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf +++ b/samples/grpc/local-drone-control-scala/src/main/resources/persistence.conf @@ -6,6 +6,9 @@ akka { snapshot-store { plugin = "akka.persistence.r2dbc.snapshot" } + state { + plugin = "akka.persistence.r2dbc.state" + } r2dbc { # Single projection instance, no need for many topics diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Coordinates.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Coordinates.scala index 0f1f48609..680b20736 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Coordinates.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Coordinates.scala @@ -3,7 +3,37 @@ package local.drones /** * Decimal degree coordinates */ -final case class Coordinates(latitude: Double, longitude: Double) +final case class Coordinates(latitude: Double, longitude: Double) { + + import Coordinates._ + + def distanceTo(other: Coordinates): Long = { + // using the haversine formula https://en.wikipedia.org/wiki/Versine#hav + val latitudeDistance = Math.toRadians(latitude - other.latitude) + val longitudeDistance = Math.toRadians(longitude - other.longitude) + val sinLatitude = Math.sin(latitudeDistance / 2) + val sinLongitude = Math.sin(longitudeDistance / 2) + val a = sinLatitude * sinLatitude + + (Math.cos(Math.toRadians(latitude)) * + Math.cos(Math.toRadians(other.latitude)) * + sinLongitude * sinLongitude) + val c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)) + (roughlyRadiusOfEarthInM * c).toLong + } + + def toProto: common.proto.Coordinates = + common.proto.Coordinates(latitude, longitude) + +} + +object Coordinates { + + private val roughlyRadiusOfEarthInM = 6371000 + + def fromProto(pc: common.proto.Coordinates): Coordinates = + Coordinates(pc.latitude, pc.longitude) + +} final case class Position(coordinates: Coordinates, altitudeMeters: Double) @@ -19,4 +49,7 @@ object CoarseGrainedCoordinates { } -final case class CoarseGrainedCoordinates(latitude: Double, longitude: Double) +final case class CoarseGrainedCoordinates(latitude: Double, longitude: Double) { + def toProto: common.proto.Coordinates = + common.proto.Coordinates(latitude, longitude) +} diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala new file mode 100644 index 000000000..d4d4cd987 --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueue.scala @@ -0,0 +1,116 @@ +package local.drones + +import akka.Done +import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } +import akka.actor.typed.{ ActorRef, Behavior } +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.pattern.StatusReply +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.state.scaladsl.{ DurableStateBehavior, Effect } + +import java.time.Instant + +object DeliveriesQueue { + + sealed trait Command extends CborSerializable + + final case class AddDelivery( + waitingDelivery: WaitingDelivery, + replyTo: ActorRef[Done]) + extends Command + + final case class RequestDelivery( + droneId: String, + droneCoordinates: Coordinates, + replyTo: ActorRef[StatusReply[WaitingDelivery]]) + extends Command + + final case class CompleteDelivery( + deliveryId: String, + replyTo: ActorRef[StatusReply[Done]]) + extends Command + + final case class GetCurrentState(replyTo: ActorRef[State]) extends Command + + final case class WaitingDelivery( + deliveryId: String, + from: Coordinates, + to: Coordinates) + + final case class DeliveryInProgress( + deliveryId: String, + droneId: String, + pickupTime: Instant) + final case class State( + waitingDeliveries: Vector[WaitingDelivery], + deliveriesInProgress: Vector[DeliveryInProgress]) + extends CborSerializable + + // Not really an entity, we just have one + val EntityKey = EntityTypeKey("RestaurantDeliveries") + + def apply(): Behavior[Command] = { + Behaviors.setup { context => + DurableStateBehavior[Command, State]( + PersistenceId(EntityKey.name, "DeliveriesQueue"), + State(Vector.empty, Vector.empty), + onCommand(context)) + } + } + + private def onCommand(context: ActorContext[Command])( + state: State, + command: Command): Effect[State] = + command match { + case AddDelivery(delivery, replyTo) => + context.log.info("Adding delivery [{}] to queue", delivery.deliveryId) + if (state.waitingDeliveries.contains( + delivery) || state.deliveriesInProgress.exists( + _.deliveryId == delivery.deliveryId)) + Effect.reply(replyTo)(Done) + else + Effect + .persist( + state.copy(waitingDeliveries = + state.waitingDeliveries :+ delivery)) + .thenReply(replyTo)(_ => Done) + + case RequestDelivery(droneId, droneCoordinates, replyTo) => + if (state.waitingDeliveries.isEmpty) + Effect.reply(replyTo)(StatusReply.Error("No waiting orders")) + else { + val closestPickupForDrone = state.waitingDeliveries.minBy(delivery => + droneCoordinates.distanceTo(delivery.from)) + context.log.info( + "Selected next delivery [{}] for drone [{}]", + closestPickupForDrone.deliveryId, + droneId) + // Note: A real application would have to care more about retries/lost data here + Effect + .persist( + state.copy( + waitingDeliveries = + state.waitingDeliveries.filterNot(_ == closestPickupForDrone), + state.deliveriesInProgress :+ DeliveryInProgress( + closestPickupForDrone.deliveryId, + droneId, + Instant.now()))) + .thenReply(replyTo)(_ => StatusReply.Success(closestPickupForDrone)) + } + + case CompleteDelivery(deliveryId, replyTo) => + if (!state.deliveriesInProgress.exists(_.deliveryId == deliveryId)) { + Effect.reply(replyTo)( + StatusReply.Error(s"Unknown delivery id: ${deliveryId}")) + } else { + Effect + .persist(state.copy(deliveriesInProgress = + state.deliveriesInProgress.filterNot(_.deliveryId == deliveryId))) + .thenReply(replyTo)(_ => StatusReply.Success(Done)) + } + + case GetCurrentState(replyTo) => + Effect.reply(replyTo)(state) + } + +} diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueueServiceImpl.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueueServiceImpl.scala new file mode 100644 index 000000000..8af5b6914 --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveriesQueueServiceImpl.scala @@ -0,0 +1,38 @@ +package local.drones + +import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.{ ActorRef, ActorSystem } +import akka.util.Timeout +import com.google.protobuf.empty.Empty +import local.drones.proto.DeliveriesQueueService + +import scala.concurrent.Future + +class DeliveriesQueueServiceImpl( + settings: Settings, + deliveriesQueue: ActorRef[DeliveriesQueue.Command])( + implicit system: ActorSystem[_]) + extends DeliveriesQueueService { + + import system.executionContext + private implicit val timeout: Timeout = settings.askTimeout + + override def getCurrentQueue( + in: Empty): Future[proto.GetCurrentQueueResponse] = { + val reply = deliveriesQueue.ask(DeliveriesQueue.GetCurrentState(_)) + + reply.map { state => + proto.GetCurrentQueueResponse( + waitingDeliveries = state.waitingDeliveries.map(waiting => + proto.WaitingDelivery( + deliveryId = waiting.deliveryId, + from = Some(waiting.from.toProto), + to = Some(waiting.to.toProto))), + deliveriesInProgress = state.deliveriesInProgress.map(inProgress => + proto.DeliveryInProgress( + deliveryId = inProgress.deliveryId, + droneId = inProgress.droneId))) + } + + } +} diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveryEvents.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveryEvents.scala new file mode 100644 index 000000000..27128165e --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DeliveryEvents.scala @@ -0,0 +1,77 @@ +package local.drones + +import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } +import akka.persistence.Persistence +import akka.persistence.query.typed.EventEnvelope +import akka.projection.eventsourced.scaladsl.EventSourcedProvider +import akka.projection.grpc.consumer.ConsumerFilter +import akka.projection.grpc.consumer.scaladsl.GrpcReadJournal +import akka.projection.r2dbc.scaladsl.R2dbcProjection +import akka.projection.scaladsl.Handler +import akka.projection.{ ProjectionBehavior, ProjectionId } +import akka.util.Timeout + +/** + * Consume delivery events from the cloud and pass to the delivery queue actor + */ +object DeliveryEvents { + + def projectionBehavior( + queueActor: ActorRef[DeliveriesQueue.Command], + settings: Settings)( + implicit system: ActorSystem[_]): Behavior[ProjectionBehavior.Command] = { + val projectionName: String = "delivery-events" + + implicit val timeout: Timeout = settings.askTimeout + + // initial consumer topic filter for location id + // FIXME no docs of setting up initial consumer filter, am I missing some API? + // Async setup is a race condition but maybe ok? Does not seem to quite work, all or the wrong events are delivered + ConsumerFilter(system).ref ! ConsumerFilter.UpdateFilter( + // FIXME stream-id duplicated in config + "delivery-events", + // location id already is in the format of a topic filter expression + Vector( + ConsumerFilter.ExcludeRegexEntityIds(Set(".*")), + ConsumerFilter.IncludeTopics(Set(settings.locationId)))) + + val eventsBySlicesQuery = + GrpcReadJournal( + List(central.deliveries.proto.DeliveryEventsProto.javaDescriptor)) + + // single projection handling all slices + val sliceRanges = + Persistence(system).sliceRanges(1) + val sliceRange = sliceRanges(0) + val projectionKey = + s"${eventsBySlicesQuery.streamId}-${sliceRange.min}-${sliceRange.max}" + val projectionId = ProjectionId.of(projectionName, projectionKey) + + val sourceProvider = EventSourcedProvider + .eventsBySlices[central.deliveries.proto.DeliveryRegistered]( + system, + eventsBySlicesQuery, + eventsBySlicesQuery.streamId, + sliceRange.min, + sliceRange.max) + + import akka.actor.typed.scaladsl.AskPattern._ + val handler: Handler[ + EventEnvelope[central.deliveries.proto.DeliveryRegistered]] = { + envelope => + queueActor.ask( + DeliveriesQueue.AddDelivery( + DeliveriesQueue.WaitingDelivery( + deliveryId = envelope.event.deliveryId, + from = Coordinates.fromProto(envelope.event.origin.get), + to = Coordinates.fromProto(envelope.event.destination.get)), + _)) + } + + ProjectionBehavior( + R2dbcProjection + .atLeastOnceAsync(projectionId, None, sourceProvider, () => handler)) + + } + +} diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala index 7c836a56b..ac2afbf47 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Drone.scala @@ -7,6 +7,7 @@ import akka.actor.typed.Behavior import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.cluster.sharding.typed.scaladsl.Entity import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.pattern.StatusReply import akka.persistence.typed.PersistenceId import akka.persistence.typed.scaladsl.Effect import akka.persistence.typed.scaladsl.EventSourcedBehavior @@ -18,6 +19,9 @@ object Drone { final case class ReportPosition(position: Position, replyTo: ActorRef[Done]) extends Command + final case class GetCurrentPosition(replyTo: ActorRef[StatusReply[Position]]) + extends Command + sealed trait Event extends CborSerializable final case class PositionUpdated(position: Position) extends Event final case class CoarseGrainedLocationChanged( @@ -68,7 +72,7 @@ object Drone { .persist(PositionUpdated(position)) .thenReply(replyTo)(_ => Done) } else { - // new grid location + // no previous location known or new grid location Effect .persist( PositionUpdated(position), @@ -76,6 +80,16 @@ object Drone { .thenReply(replyTo)(_ => Done) } } + + case GetCurrentPosition(replyTo) => + state.currentPosition match { + case Some(position) => + Effect.reply(replyTo)(StatusReply.Success(position)) + case None => + Effect.reply(replyTo)( + StatusReply.Error("Position of drone is unknown")) + } + } private def handleEvent(state: State, event: Event): State = event match { diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala index b9acd9fd9..321d7b0b9 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneEvents.scala @@ -24,14 +24,11 @@ object DroneEvents { val StreamId = "drone-events" - def eventToCloudPushBehavior( + def eventToCloudPushBehavior(settings: Settings)( implicit system: ActorSystem[_]): Behavior[ProjectionBehavior.Command] = { - val producerOriginId = - system.settings.config.getString("local-drone-control.service-id") - logger.info( "Pushing events to central cloud, origin id [{}]", - producerOriginId) + settings.locationId) // turn events into a public protocol (protobuf) type before publishing val eventTransformation = @@ -40,15 +37,12 @@ object DroneEvents { proto.CoarseDroneLocation] { envelope => val event = envelope.event Future.successful( - Some( - proto.CoarseDroneLocation( - envelope.persistenceId, - event.coordinates.latitude, - event.coordinates.longitude))) + Some(proto.CoarseDroneLocation(Some(event.coordinates.toProto)))) } val eventProducer = EventProducerPush[Drone.Event]( - originId = producerOriginId, + // location id is unique and informative, so use it as producer origin id as well + originId = settings.locationId, eventProducerSource = EventProducerSource[Drone.Event]( Drone.EntityKey.name, StreamId, diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala index 543e7cd3e..1d5942653 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/DroneServiceImpl.scala @@ -1,7 +1,8 @@ package local.drones import akka.Done -import akka.actor.typed.ActorSystem +import akka.actor.typed.scaladsl.AskPattern._ +import akka.actor.typed.{ ActorRef, ActorSystem } import akka.cluster.sharding.typed.scaladsl.ClusterSharding import akka.grpc.GrpcServiceException import akka.util.Timeout @@ -13,36 +14,73 @@ import local.drones.proto import scala.concurrent.Future import scala.concurrent.TimeoutException -class DroneServiceImpl(system: ActorSystem[_]) extends proto.DroneService { +class DroneServiceImpl( + deliveriesQueue: ActorRef[DeliveriesQueue.Command], + settings: Settings)(implicit system: ActorSystem[_]) + extends proto.DroneService { import system.executionContext private val logger = LoggerFactory.getLogger(getClass) - implicit private val timeout: Timeout = - Timeout.create( - system.settings.config - .getDuration("local-drone-control.drone-service.ask-timeout")) + implicit private val timeout: Timeout = settings.askTimeout private val sharding = ClusterSharding(system) override def reportLocation( in: proto.ReportLocationRequest): Future[Empty] = { + val coordinates = in.coordinates.getOrElse { + throw new GrpcServiceException( + Status.INVALID_ARGUMENT.withDescription( + "coordinates are required but missing")) + } logger.info( "Report location ({},{},{}) for drone {}", - in.latitude, - in.longitude, + coordinates.latitude, + coordinates.longitude, in.altitude, in.droneId) val entityRef = sharding.entityRefFor(Drone.EntityKey, in.droneId) val reply: Future[Done] = entityRef.ask( Drone.ReportPosition( - Position(Coordinates(in.latitude, in.longitude), in.altitude), + Position(Coordinates.fromProto(coordinates), in.altitude), _)) val response = reply.map(_ => Empty.defaultInstance) convertError(response) } + override def requestNextDelivery(in: proto.RequestNextDeliveryRequest) + : Future[proto.RequestNextDeliveryResponse] = { + logger.info("Drone {} requesting next delivery", in.droneId) + + // get location for drone + val entityRef = sharding.entityRefFor(Drone.EntityKey, in.droneId) + + // ask for closest delivery + val response = for { + position <- entityRef.askWithStatus[Position](Drone.GetCurrentPosition(_)) + chosenDelivery <- deliveriesQueue + .askWithStatus[DeliveriesQueue.WaitingDelivery]( + DeliveriesQueue.RequestDelivery(in.droneId, position.coordinates, _)) + } yield { + proto.RequestNextDeliveryResponse( + deliveryId = chosenDelivery.deliveryId, + from = Some(chosenDelivery.from.toProto), + to = Some(chosenDelivery.to.toProto)) + } + + convertError(response) + } + + override def completeDelivery( + in: proto.CompleteDeliveryRequest): Future[Empty] = { + logger.info("Delivery {} completed", in.deliveryId) + + deliveriesQueue + .askWithStatus[Done](DeliveriesQueue.CompleteDelivery(in.deliveryId, _)) + .map(_ => Empty.defaultInstance) + } + private def convertError[T](response: Future[T]): Future[T] = { response.recoverWith { case _: TimeoutException => @@ -52,7 +90,7 @@ class DroneServiceImpl(system: ActorSystem[_]) extends proto.DroneService { case exc => Future.failed( new GrpcServiceException( - Status.INVALID_ARGUMENT.withDescription(exc.getMessage))) + Status.INTERNAL.withDescription(exc.getMessage))) } } } diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala index 5bef8ea03..1ffb68660 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/LocalDroneControlServer.scala @@ -19,16 +19,19 @@ object LocalDroneControlServer { interface: String, port: Int, system: ActorSystem[_], - droneGrpcService: proto.DroneService): Unit = { + droneService: proto.DroneService, + deliveriesQueueService: proto.DeliveriesQueueService): Unit = { implicit val sys: ActorSystem[_] = system implicit val ec: ExecutionContext = system.executionContext val service: HttpRequest => Future[HttpResponse] = ServiceHandler.concatOrNotFound( - proto.DroneServiceHandler.partial(droneGrpcService), + proto.DroneServiceHandler.partial(droneService), + proto.DeliveriesQueueServiceHandler.partial(deliveriesQueueService), // ServerReflection enabled to support grpcurl without import-path and proto parameters - ServerReflection.partial(List(proto.DroneService))) + ServerReflection.partial( + List(proto.DroneService, proto.DeliveriesQueueService))) val bound = Http() diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala index 3014f5fdb..7ae7da5da 100644 --- a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Main.scala @@ -1,48 +1,58 @@ package local.drones -import akka.actor.typed.{ ActorSystem, Props, SpawnProtocol } +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.{ ActorSystem, Behavior } import akka.cluster.typed.Cluster import akka.cluster.typed.Join import org.slf4j.LoggerFactory -import scala.util.control.NonFatal - object Main { val logger = LoggerFactory.getLogger("drones.Main") def main(args: Array[String]): Unit = { - val system = - ActorSystem[SpawnProtocol.Command](SpawnProtocol(), "local-drone-control") - try { - init(system) - } catch { - case NonFatal(e) => - logger.error("Terminating due to initialization failure.", e) - system.terminate() - } - + ActorSystem[Nothing](rootBehavior(), "local-drone-control") } - def init(system: ActorSystem[SpawnProtocol.Command]): Unit = { - // A single node, but still a cluster, to be able to run sharding for the entities - val cluster = Cluster(system) - cluster.manager ! Join(cluster.selfMember.address) - - Drone.init(system) - system ! SpawnProtocol.Spawn( - DroneEvents.eventToCloudPushBehavior(system), - "DroneEventsProjection", - Props.empty, - system.ignoreRef) - - val grpcInterface = - system.settings.config.getString("local-drone-control.grpc.interface") - val grpcPort = - system.settings.config.getInt("local-drone-control.grpc.port") - val grpcService = - new DroneServiceImpl(system) - LocalDroneControlServer.start(grpcInterface, grpcPort, system, grpcService) + private def rootBehavior(): Behavior[Nothing] = Behaviors.setup[Nothing] { + context => + val settings = Settings(context.system) + + context.log + .info("Local Drone Control [{}] starting up", settings.locationId) + + // A single node, but still a cluster, to be able to run sharding for the entities + val cluster = Cluster(context.system) + cluster.manager ! Join(cluster.selfMember.address) + + // keep track of local drones, project aggregate info to the cloud + Drone.init(context.system) + context.spawn( + DroneEvents.eventToCloudPushBehavior(settings)(context.system), + "DroneEventsProjection") + + // consume delivery events from the cloud service + val deliveriesQueue = context.spawn(DeliveriesQueue(), "DeliveriesQueue") + context.spawn( + DeliveryEvents.projectionBehavior(deliveriesQueue, settings)( + context.system), + "DeliveriesProjection") + val deliveriesQueueService = new DeliveriesQueueServiceImpl(settings, deliveriesQueue)(context.system) + + val grpcInterface = + context.system.settings.config + .getString("local-drone-control.grpc.interface") + val grpcPort = + context.system.settings.config.getInt("local-drone-control.grpc.port") + val droneService = + new DroneServiceImpl(deliveriesQueue, settings)(context.system) + LocalDroneControlServer.start( + grpcInterface, + grpcPort, + context.system, + droneService, + deliveriesQueueService) + + Behaviors.empty } - } diff --git a/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Settings.scala b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Settings.scala new file mode 100644 index 000000000..ba6562a7e --- /dev/null +++ b/samples/grpc/local-drone-control-scala/src/main/scala/local/drones/Settings.scala @@ -0,0 +1,19 @@ +package local.drones + +import akka.actor.typed.ActorSystem +import akka.util.Timeout + +import scala.jdk.DurationConverters.JavaDurationOps + +object Settings { + def apply(system: ActorSystem[_]): Settings = { + val config = system.settings.config.getConfig("local-drone-control") + + val locationId = config.getString("location-id") + val askTimeout = config.getDuration("ask-timeout").toScala + + Settings(locationId, askTimeout) + } +} + +final case class Settings(locationId: String, askTimeout: Timeout) diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/README.md b/samples/grpc/restaurant-drone-deliveries-service-scala/README.md index 0e14eff3e..d00da5e86 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/README.md +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/README.md @@ -47,7 +47,7 @@ curl http://localhost:9101/ready Query a location for drone coordinates ```shell -grpcurl -d '{"location":"kungsholmen"}' -plaintext localhost:8101 central.drones.DroneOverviewService/GetCoarseDroneLocations +grpcurl -d '{"location":"sweden/stockholm/kungsholmen"}' -plaintext localhost:8101 central.drones.DroneOverviewService/GetCoarseDroneLocations ``` Query the state for a specific drone @@ -56,4 +56,26 @@ Query the state for a specific drone grpcurl -d '{"drone_id":"drone1"}' -plaintext localhost:8101 central.drones.DroneOverviewService.GetDroneOverview ``` -FIXME more stuff \ No newline at end of file +Set up a restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant1","coordinates":{"latitude": 59.330324, "longitude": 18.039568}, "local_control_location_id": "sweden/stockholm/kungsholmen" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant +``` + +Set up another restaurant, closest to a different local drone control + +```shell +grpcurl -d '{"restaurant_id":"restaurant2","coordinates":{"latitude": 59.342046, "longitude": 18.059095}, "local_control_location_id": "sweden/stockholm/norrmalm" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant +``` + +Register a delivery for the first restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant1","delivery_id": "order1","coordinates":{"latitude": 59.330841, "longitude": 18.038885}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery +``` + +Register a delivery for the second restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery +``` diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt index 46616c7eb..e92ae50cf 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/build.sbt @@ -8,7 +8,7 @@ licenses := Seq( scalaVersion := "2.13.11" Compile / scalacOptions ++= Seq( - "-target:11", + "-release:11", "-deprecation", "-feature", "-unchecked", diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/deliveries/delivery_events.proto b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/deliveries/delivery_events.proto new file mode 100644 index 000000000..0a98ec3db --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/deliveries/delivery_events.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "central.deliveries.proto"; + +package central.deliveries; + +import "common/coordinates.proto"; + +message DeliveryRegistered { + string delivery_id = 1; + common.Coordinates origin = 2; + common.Coordinates destination = 3; +} \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto new file mode 100644 index 000000000..b832f6043 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "central.deliveries.proto"; + +package central.deliveries; + +import "common/coordinates.proto"; + +service RestaurantDeliveriesService { + rpc SetUpRestaurant(SetUpRestaurantRequest) returns (RegisterRestaurantResponse) {}; + rpc RegisterDelivery(RegisterDeliveryRequest) returns (RegisterDeliveryResponse) {}; +} + +message SetUpRestaurantRequest { + string restaurant_id = 1; + common.Coordinates coordinates = 2; + string local_control_location_id = 3; +} + +message RegisterRestaurantResponse {} + +message RegisterDeliveryRequest { + string delivery_id = 1; + string restaurant_id = 2; + common.Coordinates coordinates = 3; +} + +message RegisterDeliveryResponse { + +} \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/drones/drone_overview_api.proto b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/drones/drone_overview_api.proto index 6ace97b15..ea649a32b 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/drones/drone_overview_api.proto +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/central/drones/drone_overview_api.proto @@ -5,6 +5,8 @@ option java_package = "central.drones.proto"; package central.drones; +import "common/coordinates.proto"; + service DroneOverviewService { rpc GetDroneOverview(GetDroneOverviewRequest) returns (GetDroneOverviewResponse) {} rpc GetCoarseDroneLocations(CoarseDroneLocationsRequest) returns (CoarseDroneLocationsResponse) {} @@ -20,9 +22,8 @@ message CoarseDroneLocationsResponse { } message CoarseDroneLocations { - double coarse_latitude = 1; - double coarse_longitude = 2; - repeated string drones = 3; + common.Coordinates coordinates = 1; + repeated string drones = 2; } message GetDroneOverviewRequest { diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/common/coordinates.proto b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/common/coordinates.proto new file mode 100644 index 000000000..99f385d72 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/common/coordinates.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "common.proto"; + +package common; + +// generic messages, shared between local-drone-control and restaurant-drone-deliveries + +message Coordinates { + // latitude (north-south) in decimal degree coordinates + double latitude = 1; + // longitude (east west) in decimal degree coordinates + double longitude = 2; +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/local/drones/drone_events.proto b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/local/drones/drone_events.proto index aacd2419b..94535d78f 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/local/drones/drone_events.proto +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/protobuf/local/drones/drone_events.proto @@ -5,10 +5,12 @@ option java_package = "local.drones.proto"; package local.drones; +import "common/coordinates.proto"; + +// copy of the descriptor from local-drone-control who owns/publishes it + // events published by the drones message CoarseDroneLocation { - string drone_id = 1; - double latitude = 2; - double longitude = 3; + common.Coordinates coordinates = 1; } diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/application.conf b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/application.conf index 3ef709a36..0d28e0eb8 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/application.conf +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/resources/application.conf @@ -15,9 +15,20 @@ akka.projection.grpc { restaurant-drone-deliveries-service { drone-ask-timeout = 3s + restaurant-deliveries-ask-timeout = 3s + drones { # Run this many local projections to consume the pushed drone updates from the local drone control services # to the central drone durable state entities projections-slice-count = 4 } + + local-drone-control { + # local drone control locations needs to be known up front, + # restaurants are tied to the closest location + locations = [ + "sweden/stockholm/kungsholmen", + "sweden/stockholm/norrmalm" + ] + } } diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Coordinates.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Coordinates.scala similarity index 65% rename from samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Coordinates.scala rename to samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Coordinates.scala index 72c3f23ec..be354cfa7 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Coordinates.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Coordinates.scala @@ -1,4 +1,4 @@ -package central.drones +package central // common location representations, could be a shared library between local control and restaurant // but to keep project structure simple we duplicate @@ -6,7 +6,16 @@ package central.drones /** * Decimal degree coordinates */ -final case class Coordinates(latitude: Double, longitude: Double) +final case class Coordinates(latitude: Double, longitude: Double) { + def toProto: common.proto.Coordinates = + common.proto.Coordinates(latitude, longitude) +} + +object Coordinates { + def fromProto(pc: common.proto.Coordinates): Coordinates = + Coordinates(pc.latitude, pc.longitude) + +} object CoarseGrainedCoordinates { @@ -18,6 +27,9 @@ object CoarseGrainedCoordinates { Math.floor(location.longitude * 100 + 0.5d) / 100) } + def fromProto(pc: common.proto.Coordinates): CoarseGrainedCoordinates = + CoarseGrainedCoordinates(pc.latitude, pc.longitude) + } final case class CoarseGrainedCoordinates(latitude: Double, longitude: Double) diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DeliveriesSettings.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DeliveriesSettings.scala new file mode 100644 index 000000000..54cb07591 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DeliveriesSettings.scala @@ -0,0 +1,19 @@ +package central + +import akka.actor.typed.ActorSystem + +import scala.jdk.CollectionConverters._ + +object DeliveriesSettings { + def apply(system: ActorSystem[_]): DeliveriesSettings = { + val config = + system.settings.config.getConfig("restaurant-drone-deliveries-service") + val locationIds = + config.getStringList("local-drone-control.locations").asScala.toSet + + // FIXME move timeouts here as well + DeliveriesSettings(locationIds) + } +} + +case class DeliveriesSettings(locationIds: Set[String]) diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala index 001ef0318..d40796381 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/DroneDeliveriesServer.scala @@ -7,6 +7,8 @@ import akka.http.scaladsl.Http import akka.http.scaladsl.model.HttpRequest import akka.http.scaladsl.model.HttpResponse import central.Main.logger +import central.deliveries.proto.RestaurantDeliveriesService +import central.deliveries.proto.RestaurantDeliveriesServiceHandler import central.drones.proto.DroneOverviewService import central.drones.proto.DroneOverviewServiceHandler @@ -20,6 +22,10 @@ object DroneDeliveriesServer { interface: String, port: Int, droneOverviewService: central.drones.proto.DroneOverviewService, + restaurantDeliveriesService: central.deliveries.proto.RestaurantDeliveriesService, + deliveryEventsProducerService: PartialFunction[ + HttpRequest, + Future[HttpResponse]], pushedDroneEventsHandler: PartialFunction[ HttpRequest, Future[HttpResponse]])(implicit system: ActorSystem[_]): Unit = { @@ -27,7 +33,10 @@ object DroneDeliveriesServer { val service = ServiceHandler.concatOrNotFound( DroneOverviewServiceHandler.partial(droneOverviewService), - ServerReflection.partial(List(DroneOverviewService)), + RestaurantDeliveriesServiceHandler.partial(restaurantDeliveriesService), + ServerReflection.partial( + List(DroneOverviewService, RestaurantDeliveriesService)), + deliveryEventsProducerService, // FIXME not last once actually partial pushedDroneEventsHandler) diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala index b11d2339e..4266be108 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/Main.scala @@ -4,8 +4,10 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.SpawnProtocol import akka.management.cluster.bootstrap.ClusterBootstrap import akka.management.scaladsl.AkkaManagement -import central.drones.Drone -import central.drones.DroneOverviewServiceImpl +import central.deliveries.DeliveryEvents +import central.deliveries.RestaurantDeliveries +import central.deliveries.RestaurantDeliveriesServiceImpl +import central.drones.{Drone, DroneOverviewServiceImpl, LocalDroneEvents} import org.slf4j.LoggerFactory import scala.util.control.NonFatal @@ -27,11 +29,13 @@ object Main { } def init(implicit system: ActorSystem[SpawnProtocol.Command]): Unit = { + val settings = DeliveriesSettings(system) AkkaManagement(system).start() ClusterBootstrap(system).start() Drone.init(system) LocalDroneEvents.initPushedEventsConsumer(system) + RestaurantDeliveries.init(system) val interface = system.settings.config .getString("restaurant-drone-deliveries-service.grpc.interface") @@ -40,12 +44,18 @@ object Main { val pushedDroneEventsHandler = LocalDroneEvents.pushedEventsGrpcHandler(system) + val deliveryEventsProducerService = + DeliveryEvents.eventProducerService(system) val droneOverviewService = new DroneOverviewServiceImpl(system) + val restaurantDeliveriesService = + new RestaurantDeliveriesServiceImpl(system, settings) DroneDeliveriesServer.start( interface, port, droneOverviewService, + restaurantDeliveriesService, + deliveryEventsProducerService, pushedDroneEventsHandler) } diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala new file mode 100644 index 000000000..72da01e90 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/DeliveryEvents.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2009-2023 Lightbend Inc. + */ +package central.deliveries + +import akka.actor.typed.ActorSystem +import akka.http.scaladsl.model.HttpRequest +import akka.http.scaladsl.model.HttpResponse +import akka.persistence.query.typed.EventEnvelope +import akka.projection.grpc.producer.EventProducerSettings +import akka.projection.grpc.producer.scaladsl.EventProducer +import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation + +import scala.concurrent.Future + +object DeliveryEvents { + + def eventProducerService(system: ActorSystem[_]) + : PartialFunction[HttpRequest, Future[HttpResponse]] = { + val transformation = Transformation.empty + .registerAsyncEnvelopeMapper[ + RestaurantDeliveries.DeliveryRegistered, + proto.DeliveryRegistered](envelope => + Future.successful(Some(transformDeliveryRegistration(envelope)))) + // filter all other types of events for the RestaurantDeliveries + .registerOrElseMapper(_ => None) + + val eventProducerSource = EventProducer.EventProducerSource( + RestaurantDeliveries.EntityKey.name, + // Note: stream id used in consumer to consume this specific stream + "delivery-events", + transformation, + EventProducerSettings(system)) + + EventProducer.grpcServiceHandler(eventProducerSource)(system) + } + + private def transformDeliveryRegistration( + envelope: EventEnvelope[RestaurantDeliveries.DeliveryRegistered]) + : proto.DeliveryRegistered = { + val delivery = envelope.event.delivery + proto.DeliveryRegistered( + deliveryId = delivery.deliveryId, + origin = Some(delivery.origin.toProto), + destination = Some(delivery.destination.toProto)) + } + +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala new file mode 100644 index 000000000..bad4fc93a --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveries.scala @@ -0,0 +1,152 @@ +package central.deliveries + +import akka.Done +import akka.actor.typed.ActorRef +import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.cluster.sharding.typed.scaladsl.Entity +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.pattern.StatusReply +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.Effect +import akka.persistence.typed.scaladsl.EventSourcedBehavior +import central.CborSerializable +import central.Coordinates + +import java.time.Instant + +/** + * Keeps track of registered deliveries for per restaurant + */ +object RestaurantDeliveries { + + sealed trait Command extends CborSerializable + + final case class RegisterDelivery( + deliveryId: String, + destination: Coordinates, + replyTo: ActorRef[StatusReply[Done]]) + extends Command + + final case class SetUpRestaurant( + localControlLocationId: String, + restaurantLocation: Coordinates, + replyTo: ActorRef[StatusReply[Done]]) + extends Command + + final case class ListCurrentDeliveries(replyTo: ActorRef[Seq[Delivery]]) + extends Command + + sealed trait Event extends CborSerializable + + final case class DeliveryRegistered(delivery: Delivery) extends Event + final case class RestaurantLocationSet( + localControlLocationId: String, + coordinates: Coordinates) + extends Event + + private final case class State( + localControlLocationId: String, + restaurantLocation: Coordinates, + currentDeliveries: Vector[Delivery]) + final case class Delivery( + deliveryId: String, + // FIXME next two fields always the same for the same restaurant, annoying, + // but how else would we see them in downstream projection? + localControlLocationId: String, + origin: Coordinates, + destination: Coordinates, + timestamp: Instant) + + val EntityKey = EntityTypeKey[Command]("RestaurantDeliveries") + + def init(system: ActorSystem[_]): Unit = { + ClusterSharding(system).init(Entity(EntityKey)(entityContext => + RestaurantDeliveries(entityContext.entityId))) + } + + def apply(restaurantId: String): Behavior[Command] = + EventSourcedBehavior[Command, Event, Option[State]]( + PersistenceId(EntityKey.name, restaurantId), + None, + onCommand, + onEvent).withTaggerForState { + case (Some(state), _) => + // tag events with location id as topic, grpc projection filters makes sure only that location + // picks them up for drone delivery + Set("t:" + state.localControlLocationId) + case _ => Set.empty + } + + private def onCommand( + state: Option[State], + command: Command): Effect[Event, Option[State]] = + state match { + case None => onCommandNoState(command) + case Some(state) => onCommandInitialized(state, command) + } + + private def onCommandNoState(command: Command): Effect[Event, Option[State]] = + command match { + case RegisterDelivery(_, _, replyTo) => + Effect.reply(replyTo)( + StatusReply.Error( + "Restaurant not yet initialized, cannot accept registrations")) + case ListCurrentDeliveries(replyTo) => + Effect.reply(replyTo)(Vector.empty) + case SetUpRestaurant(locationId, coordinates, replyTo) => + Effect + .persist(RestaurantLocationSet(locationId, coordinates)) + .thenReply(replyTo)(_ => StatusReply.Ack) + } + + private def onCommandInitialized( + state: State, + command: Command): Effect[Event, Option[State]] = { + command match { + case RegisterDelivery(deliveryId, destination, replyTo) => + state.currentDeliveries.find(_.deliveryId == deliveryId) match { + case Some(existing) if existing.destination == destination => + // already registered + Effect.reply(replyTo)(StatusReply.Ack) + case Some(_) => + Effect.reply(replyTo)( + StatusReply.Error("Delivery id exists but for other destination")) + case None => + Effect + .persist( + DeliveryRegistered( + Delivery( + deliveryId, + state.localControlLocationId, + state.restaurantLocation, + destination, + Instant.now()))) + .thenReply(replyTo)(_ => StatusReply.Ack) + } + case ListCurrentDeliveries(replyTo) => + Effect.reply(replyTo)(state.currentDeliveries) + case setup: SetUpRestaurant => + Effect.reply(setup.replyTo)( + StatusReply.Error("Changing restaurant location not supported")) + } + } + + private def onEvent(state: Option[State], event: Event): Option[State] = + (state, event) match { + case (Some(state), DeliveryRegistered(delivery)) => + Some( + state.copy(currentDeliveries = state.currentDeliveries :+ delivery)) + case (None, RestaurantLocationSet(localControlLocationId, location)) => + // initial setup of location + Some( + State( + restaurantLocation = location, + localControlLocationId = localControlLocationId, + currentDeliveries = Vector.empty)) + case _ => + throw new RuntimeException("Unexpected event/state combination") + } + +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveriesServiceImpl.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveriesServiceImpl.scala new file mode 100644 index 000000000..6ca4eca76 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/deliveries/RestaurantDeliveriesServiceImpl.scala @@ -0,0 +1,95 @@ +package central.deliveries + +import akka.actor.typed.ActorSystem +import akka.cluster.sharding.typed.scaladsl.ClusterSharding +import akka.grpc.GrpcServiceException +import akka.pattern.StatusReply +import akka.util.Timeout +import central.{ Coordinates, DeliveriesSettings } +import io.grpc.Status +import org.slf4j.LoggerFactory + +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.jdk.DurationConverters.JavaDurationOps + +class RestaurantDeliveriesServiceImpl( + system: ActorSystem[_], + settings: DeliveriesSettings) + extends proto.RestaurantDeliveriesService { + + private val logger = LoggerFactory.getLogger(getClass) + private val sharding = ClusterSharding(system) + + private implicit val ec: ExecutionContext = system.executionContext + private implicit val timeout: Timeout = system.settings.config + .getDuration( + "restaurant-drone-deliveries-service.restaurant-deliveries-ask-timeout") + .toScala + + override def setUpRestaurant(in: proto.SetUpRestaurantRequest) + : Future[proto.RegisterRestaurantResponse] = { + logger.info( + "Set up restaurant {}, coordinates {}, location [{}]", + in.restaurantId, + in.coordinates, + in.localControlLocationId) + + if (!settings.locationIds.contains(in.localControlLocationId)) { + throw new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription( + s"The local control location id ${in.localControlLocationId} is not known to the service")) + } + + val entityRef = + sharding.entityRefFor(RestaurantDeliveries.EntityKey, in.restaurantId) + + val coordinates = toCoordinates(in.coordinates) + val reply = + entityRef.ask( + RestaurantDeliveries + .SetUpRestaurant(in.localControlLocationId, coordinates, _)) + + reply.map { + case StatusReply.Error(error) => + throw new GrpcServiceException( + Status.INTERNAL.withDescription(error.getMessage)) + case _ => + proto.RegisterRestaurantResponse() + } + } + + override def registerDelivery(in: proto.RegisterDeliveryRequest) + : Future[proto.RegisterDeliveryResponse] = { + logger.info( + "Register delivery for restaurant {}, delivery id {}, destination {}", + in.restaurantId, + in.deliveryId, + in.coordinates.get) + + val entityRef = + sharding.entityRefFor(RestaurantDeliveries.EntityKey, in.restaurantId) + + val destination = toCoordinates(in.coordinates) + + val reply = entityRef.ask( + RestaurantDeliveries.RegisterDelivery(in.deliveryId, destination, _)) + + reply.map { + case StatusReply.Error(error) => + throw new GrpcServiceException( + Status.INTERNAL.withDescription(error.getMessage)) + case _ => proto.RegisterDeliveryResponse() + } + + } + + private def toCoordinates( + protoCoordinates: Option[common.proto.Coordinates]): Coordinates = + protoCoordinates match { + case Some(pc) => Coordinates.fromProto(pc) + case None => + throw new GrpcServiceException( + Status.INVALID_ARGUMENT.withDescription("Missing coordinates")) + + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala index 3ad129c9b..2d143dcb4 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/Drone.scala @@ -3,17 +3,14 @@ package central.drones import akka.Done import akka.actor.typed.scaladsl.ActorContext import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.{ ActorRef, ActorSystem, Behavior } -import akka.cluster.sharding.typed.scaladsl.{ - ClusterSharding, - Entity, - EntityTypeKey -} +import akka.actor.typed.{ActorRef, ActorSystem, Behavior} +import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey} import akka.pattern.StatusReply import akka.persistence.r2dbc.state.scaladsl.AdditionalColumn import akka.persistence.typed.PersistenceId -import akka.persistence.typed.state.scaladsl.{ DurableStateBehavior, Effect } +import akka.persistence.typed.state.scaladsl.{DurableStateBehavior, Effect} import central.CborSerializable +import central.CoarseGrainedCoordinates import java.time.Instant diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/DroneOverviewServiceImpl.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/DroneOverviewServiceImpl.scala index 054cd6014..dba6abf67 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/DroneOverviewServiceImpl.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/DroneOverviewServiceImpl.scala @@ -7,7 +7,11 @@ import akka.persistence.r2dbc.session.scaladsl.R2dbcSession import akka.persistence.typed.PersistenceId import akka.serialization.SerializationExtension import akka.util.Timeout -import central.drones.proto.{DroneOverviewService, GetDroneOverviewRequest, GetDroneOverviewResponse} +import central.drones.proto.{ + DroneOverviewService, + GetDroneOverviewRequest, + GetDroneOverviewResponse +} import io.grpc.Status import org.slf4j.LoggerFactory @@ -21,7 +25,9 @@ class DroneOverviewServiceImpl(system: ActorSystem[_]) private val logger = LoggerFactory.getLogger(getClass) private implicit val ec: ExecutionContext = system.executionContext - private implicit val timeout: Timeout = system.settings.config.getDuration("restaurant-drone-deliveries-service.drone-ask-timeout").toScala + private implicit val timeout: Timeout = system.settings.config + .getDuration("restaurant-drone-deliveries-service.drone-ask-timeout") + .toScala private val serialization = SerializationExtension(system) private val sharding = ClusterSharding(system) @@ -39,15 +45,18 @@ class DroneOverviewServiceImpl(system: ActorSystem[_]) .select( session.createStatement(findByLocationSql).bind(0, in.location)) { row => - val serializerId = row.get("state_ser_id", classOf[java.lang.Integer]) - val serializerManifest = row.get("state_ser_manifest", classOf[String]) + val serializerId = + row.get("state_ser_id", classOf[java.lang.Integer]) + val serializerManifest = + row.get("state_ser_manifest", classOf[String]) val payload = row.get("state_payload", classOf[Array[Byte]]) val state = serialization .deserialize(payload, serializerId, serializerManifest) .get .asInstanceOf[Drone.State] - val droneId = PersistenceId.extractEntityId(row.get("persistence_id", classOf[String])) + val droneId = PersistenceId.extractEntityId( + row.get("persistence_id", classOf[String])) state.currentLocation.map(coordinates => (droneId, coordinates)) } .map { maybeLocations => @@ -56,19 +65,24 @@ class DroneOverviewServiceImpl(system: ActorSystem[_]) if (locations.isEmpty) throw new GrpcServiceException(Status.NOT_FOUND) else { - val byLocation = locations.groupMap { case (_, coarse) => coarse } { case (droneId, _) => droneId } + val byLocation = locations.groupMap { case (_, coarse) => coarse } { + case (droneId, _) => droneId + } - proto.CoarseDroneLocationsResponse( - byLocation.map { case (location, entries) => - proto - .CoarseDroneLocations(location.latitude, location.longitude, entries) - }.toVector) + proto.CoarseDroneLocationsResponse(byLocation.map { + case (location, entries) => + proto.CoarseDroneLocations( + Some(common.proto + .Coordinates(location.latitude, location.longitude)), + entries) + }.toVector) } } } } - override def getDroneOverview(in: GetDroneOverviewRequest): Future[GetDroneOverviewResponse] = { + override def getDroneOverview( + in: GetDroneOverviewRequest): Future[GetDroneOverviewResponse] = { // query against additional columns for drone logger.info("Get drone overview for drone {}", in.droneId) @@ -80,7 +94,7 @@ class DroneOverviewServiceImpl(system: ActorSystem[_]) GetDroneOverviewResponse( locationName = state.locationName, coarseLatitude = state.currentLocation.map(_.latitude).getOrElse(0.0), - coarseLongitude = state.currentLocation.map(_.longitude).getOrElse(0.0), - )) + coarseLongitude = + state.currentLocation.map(_.longitude).getOrElse(0.0))) } } diff --git a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/LocalDroneEvents.scala b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala similarity index 88% rename from samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/LocalDroneEvents.scala rename to samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala index 5c870c0d4..1649a8e52 100644 --- a/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/LocalDroneEvents.scala +++ b/samples/grpc/restaurant-drone-deliveries-service-scala/src/main/scala/central/drones/LocalDroneEvents.scala @@ -1,4 +1,4 @@ -package central +package central.drones import akka.actor.typed.ActorSystem import akka.cluster.sharding.typed.scaladsl.{ @@ -10,20 +10,20 @@ import akka.persistence.query.Offset import akka.persistence.query.typed.EventEnvelope import akka.persistence.r2dbc.query.scaladsl.R2dbcReadJournal import akka.persistence.typed.PersistenceId -import akka.projection.{ Projection, ProjectionBehavior, ProjectionId } import akka.projection.eventsourced.scaladsl.EventSourcedProvider import akka.projection.grpc.consumer.scaladsl.EventProducerPushDestination import akka.projection.r2dbc.scaladsl.R2dbcProjection import akka.projection.scaladsl.{ Handler, SourceProvider } +import akka.projection.{ Projection, ProjectionBehavior, ProjectionId } import akka.util.Timeout +import central.CoarseGrainedCoordinates import central.Main.logger -import central.drones.{ CoarseGrainedCoordinates, Drone } import scala.concurrent.Future import scala.jdk.DurationConverters.JavaDurationOps /** - * Handle drone events pushed by the local drone control systems. + * Handle aggregate drone events pushed by the local drone control systems. */ object LocalDroneEvents { @@ -87,13 +87,14 @@ object LocalDroneEvents { // Drone id without producer entity key val droneId = PersistenceId.extractEntityId(envelope.persistenceId) + + // same drone but different entity type (our Drone representation) val entityRef = sharding.entityRefFor(Drone.EntityKey, droneId) - // FIXME we are getting - // java.lang.ClassCastException: class akka.persistence.FilteredPayload$ cannot be cast to class local.drones.Drone$Event (akka.persistence.FilteredPayload$ and local.drones.Drone$Event are in unnamed module of loader 'app') - // here + envelope.event match { - case local.drones.proto - .CoarseDroneLocation(droneId, latitude, longitude, _) => + case local.drones.proto.CoarseDroneLocation(coordinates, _) => + // we have encoded origin in a tag, extract it + // FIXME could there be a more automatic place where origin is available (envelope.source?) val originName = envelope.tags .find(_.startsWith("location:")) .get @@ -101,7 +102,7 @@ object LocalDroneEvents { entityRef.askWithStatus( Drone.UpdateLocation( originName, - CoarseGrainedCoordinates(latitude, longitude), + CoarseGrainedCoordinates.fromProto(coordinates.get), _)) case unknown => throw new RuntimeException(