diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml index 4ab3c8cba..c62a70060 100644 --- a/.github/workflows/checks.yml +++ b/.github/workflows/checks.yml @@ -190,3 +190,8 @@ jobs: run: |- cd samples/grpc/local-drone-control-java mvn compile -nsu -Dakka-projection.version=`cat ~/.version` + + - name: Compile Java Projection gRPC Restaurant Drone Deliveries sample + run: |- + cd samples/grpc/restaurant-drone-deliveries-service-java + mvn compile -nsu -Dakka-projection.version=`cat ~/.version` diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/LICENSE b/samples/grpc/restaurant-drone-deliveries-service-java/LICENSE new file mode 100644 index 000000000..4239f09e0 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/LICENSE @@ -0,0 +1,10 @@ +Akka sample by Lightbend + +Licensed under Public Domain (CC0) + +To the extent possible under law, the person who associated CC0 with +this Template has waived all copyright and related or neighboring +rights to this Template. + +You should have received a copy of the CC0 legalcode along with this +work. If not, see . diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/README.md b/samples/grpc/restaurant-drone-deliveries-service-java/README.md new file mode 100644 index 000000000..20fb90788 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/README.md @@ -0,0 +1,81 @@ +# Restaurant Drone Deliveries Service + +The sample show-cases a service for drones doing restaurant deliveries. + +It is intended to be used together with the local-drone-control sample. + +* Keeps track of a coarse grained location of each drone to the cloud +* FIXME Accepts restaurant delivery requests which are then fed to the right local drone control + +## Running the sample code + +1. Start a local PostgresSQL server on default port 5432. The included `docker-compose.yml` starts everything required for running locally. + + ```shell + docker compose up --wait + + # creates the tables needed for Akka Persistence + # as well as the offset store table for Akka Projection + docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql + ``` + +2. Start a first node: + +```shell +mvn compile exec:exec -DAPP_CONFIG=local1.conf +``` + +3. (Optional) Start another node with different ports: + +```shell +mvn compile exec:exec -DAPP_CONFIG=local2.conf +``` + +4. (Optional) More can be started: + +```shell +mvn compile exec:exec -DAPP_CONFIG=local3.conf +``` + +5. Check for service readiness + + ```shell +curl http://localhost:9101/ready + ``` + + +Query a location for drone coordinates + +```shell +grpcurl -d '{"location":"sweden/stockholm/kungsholmen"}' -plaintext localhost:8101 central.drones.DroneOverviewService/GetCoarseDroneLocations +``` + +Query the state for a specific drone + +```shell +grpcurl -d '{"drone_id":"drone1"}' -plaintext localhost:8101 central.drones.DroneOverviewService.GetDroneOverview +``` + +Set up a restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant1","coordinates":{"latitude": 59.330324, "longitude": 18.039568}, "local_control_location_id": "sweden/stockholm/kungsholmen" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant +``` + +Set up another restaurant, closest to a different local drone control + +```shell +grpcurl -d '{"restaurant_id":"restaurant2","coordinates":{"latitude": 59.342046, "longitude": 18.059095}, "local_control_location_id": "sweden/stockholm/norrmalm" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant +``` + +Register a delivery for the first restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant1","delivery_id": "order1","coordinates":{"latitude": 59.330841, "longitude": 18.038885}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery +``` + +Register a delivery for the second restaurant + +```shell +grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery +``` diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql new file mode 100644 index 000000000..94c2e71cc --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql @@ -0,0 +1,85 @@ +CREATE TABLE IF NOT EXISTS event_journal( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + event_ser_id INTEGER NOT NULL, + event_ser_manifest VARCHAR(255) NOT NULL, + event_payload BYTEA NOT NULL, + + deleted BOOLEAN DEFAULT FALSE NOT NULL, + writer VARCHAR(255) NOT NULL, + adapter_manifest VARCHAR(255), + tags TEXT ARRAY, + + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id, seq_nr) +); + +CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr); + +CREATE TABLE IF NOT EXISTS snapshot( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + write_timestamp BIGINT NOT NULL, + ser_id INTEGER NOT NULL, + ser_manifest VARCHAR(255) NOT NULL, + snapshot BYTEA NOT NULL, + meta_ser_id INTEGER, + meta_ser_manifest VARCHAR(255), + meta_payload BYTEA, + + PRIMARY KEY(persistence_id) +); + +CREATE TABLE IF NOT EXISTS durable_state ( + slice INT NOT NULL, + entity_type VARCHAR(255) NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + revision BIGINT NOT NULL, + db_timestamp timestamp with time zone NOT NULL, + + state_ser_id INTEGER NOT NULL, + state_ser_manifest VARCHAR(255), + state_payload BYTEA NOT NULL, + tags TEXT ARRAY, + + -- additional column + location VARCHAR(255), + + PRIMARY KEY(persistence_id, revision) +); + +-- to query drones by location +CREATE INDEX IF NOT EXISTS durable_state_drone_location_idx ON durable_state(location); + + +-- Timestamp based offsets are stored in this table. +CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + slice INT NOT NULL, + persistence_id VARCHAR(255) NOT NULL, + seq_nr BIGINT NOT NULL, + -- timestamp_offset is the db_timestamp of the original event + timestamp_offset timestamp with time zone NOT NULL, + -- timestamp_consumed is when the offset was stored + -- the consumer lag is timestamp_consumed - timestamp_offset + timestamp_consumed timestamp with time zone NOT NULL, + PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr) +); + +CREATE TABLE IF NOT EXISTS akka_projection_management ( + projection_name VARCHAR(255) NOT NULL, + projection_key VARCHAR(255) NOT NULL, + paused BOOLEAN NOT NULL, + last_updated BIGINT NOT NULL, + PRIMARY KEY(projection_name, projection_key) +); \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/drop_tables.sql b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/drop_tables.sql new file mode 100644 index 000000000..c71bc7ae2 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/drop_tables.sql @@ -0,0 +1,5 @@ +DROP TABLE event_journal; +DROP TABLE snapshot; +DROP TABLE durable_state; +DROP TABLE akka_projection_timestamp_offset_store; +DROP TABLE akka_projection_management; \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/docker-compose.yml b/samples/grpc/restaurant-drone-deliveries-service-java/docker-compose.yml new file mode 100644 index 000000000..366c5a10a --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/docker-compose.yml @@ -0,0 +1,16 @@ +version: '2.2' +services: + postgres-db: + image: postgres:latest + container_name: postgres_db + ports: + - 5432:5432 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + healthcheck: + test: ['CMD', 'pg_isready', "-q", "-d", "postgres", "-U", "postgres"] + interval: 5s + retries: 5 + start_period: 5s + timeout: 5s diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml new file mode 100644 index 000000000..3b964ebeb --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml @@ -0,0 +1,279 @@ + + + + 4.0.0 + shopping-cart-service + com.lightbend.akka.samples + 1.0 + + + + Public Domain (CC0) + http://creativecommons.org/publicdomain/zero/1.0/ + + + + + UTF-8 + 2.8.4 + 1.5.0-M3 + 1.2.0-M3 + 1.4.0 + 2.0.1 + 2.3.3 + 2.3.3 + 1.3.6 + 4.13.1 + 2.13 + + application.conf + + ${git.commit.time}-${git.commit.id.abbrev} + + + + + com.typesafe.akka + akka-cluster-typed_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-cluster-sharding-typed_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-persistence-typed_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-persistence-query_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-serialization-jackson_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-cluster-tools_${scala.binary.version} + ${akka.version} + + + com.typesafe.akka + akka-discovery_${scala.binary.version} + ${akka.version} + + + com.lightbend.akka + akka-persistence-r2dbc_${scala.binary.version} + ${akka-persistence-r2dbc.version} + + + com.lightbend.akka + akka-projection-r2dbc_${scala.binary.version} + ${akka-projection.version} + + + com.lightbend.akka + akka-projection-grpc_${scala.binary.version} + ${akka-projection.version} + + + com.lightbend.akka + akka-projection-eventsourced_${scala.binary.version} + ${akka-projection.version} + + + com.lightbend.akka.management + akka-management-cluster-http_${scala.binary.version} + ${akka-management.version} + + + com.lightbend.akka.management + akka-management-cluster-bootstrap_${scala.binary.version} + ${akka-management.version} + + + com.lightbend.akka.discovery + akka-discovery-kubernetes-api_${scala.binary.version} + ${akka-management.version} + + + com.lightbend.akka.grpc + akka-grpc-runtime_${scala.binary.version} + ${akka-grpc.version} + + + com.lightbend.akka + akka-diagnostics_${scala.binary.version} + ${akka-diagnostics.version} + + + + ch.qos.logback + logback-classic + ${logback.version} + + + + com.typesafe.akka + akka-actor-testkit-typed_${scala.binary.version} + ${akka.version} + test + + + com.typesafe.akka + akka-persistence-testkit_${scala.binary.version} + ${akka.version} + test + + + com.lightbend.akka + akka-projection-testkit_${scala.binary.version} + ${akka-projection.version} + test + + + com.typesafe.akka + akka-stream-testkit_${scala.binary.version} + ${akka.version} + test + + + + junit + junit + ${junit.version} + test + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 11 + 11 + + -Xlint:unchecked + -Xlint:deprecation + -parameters + + + + + org.codehaus.mojo + exec-maven-plugin + 3.0.0 + + java + + -Djava.library.path=target/lib + -Dconfig.resource=${APP_CONFIG} + -classpath + + central.Main + + + + + + com.lightbend.akka.grpc + akka-grpc-maven-plugin + ${akka-grpc-maven-plugin.version} + + Java + + + + + generate + + + + + + com.diffplug.spotless + spotless-maven-plugin + 2.35.0 + + + + 1.11.0 + + + + + + + pl.project13.maven + git-commit-id-plugin + 4.0.0 + + + validate + + revision + + + + + yyyyMMdd-HHmmss + + ${project.basedir}/.git + false + false + + + + io.fabric8 + docker-maven-plugin + 0.35.0 + + + + %a + + docker.io/library/eclipse-temurin:17.0.3_7-jre-jammy + + ${version.number} + + + + java + -cp + /maven/* + central.Main + + + + artifact-with-dependencies + + + + + + + + build-docker-image + package + + build + + + + + + + diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CborSerializable.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CborSerializable.java new file mode 100644 index 000000000..e3c7ac8b8 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CborSerializable.java @@ -0,0 +1,7 @@ +package central; + +/** + * Marker trait for serialization with Jackson CBOR. Enabled in serialization.conf + * `akka.actor.serialization-bindings` (via application.conf). + */ +public interface CborSerializable {} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CoarseGrainedCoordinates.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CoarseGrainedCoordinates.java new file mode 100644 index 000000000..15c90b5e8 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CoarseGrainedCoordinates.java @@ -0,0 +1,60 @@ +package central; + +// common location representations, could be a shared library between local control and restaurant +// but to keep project structure simple we duplicate + +public final class CoarseGrainedCoordinates { + public final double latitude; + public final double longitude; + + public CoarseGrainedCoordinates(double latitude, double longitude) { + this.latitude = latitude; + this.longitude = longitude; + } + + public static CoarseGrainedCoordinates fromCoordinates(Coordinates location) { + // not entirely correct, but good enough for a sample/demo + // 435-1020m precision depending on place on earth + return new CoarseGrainedCoordinates( + Math.floor(location.latitude * 100 + 0.5d) / 100, + Math.floor(location.longitude * 100 + 0.5d) / 100); + } + + public common.proto.Coordinates toProto() { + return common.proto.Coordinates.newBuilder() + .setLatitude(latitude) + .setLongitude(longitude) + .build(); + } + + public static CoarseGrainedCoordinates fromProto(common.proto.Coordinates pc) { + return new CoarseGrainedCoordinates(pc.getLatitude(), pc.getLongitude()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CoarseGrainedCoordinates that = (CoarseGrainedCoordinates) o; + + if (Double.compare(latitude, that.latitude) != 0) return false; + return Double.compare(longitude, that.longitude) == 0; + } + + @Override + public int hashCode() { + int result; + long temp; + temp = Double.doubleToLongBits(latitude); + result = (int) (temp ^ (temp >>> 32)); + temp = Double.doubleToLongBits(longitude); + result = 31 * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public String toString() { + return "CoarseGrainedCoordinates{" + "latitude=" + latitude + ", longitude=" + longitude + '}'; + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Coordinates.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Coordinates.java new file mode 100644 index 000000000..cc1c2e52c --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Coordinates.java @@ -0,0 +1,71 @@ +package central; + +// common location representations, could be a shared library between local control and restaurant +// but to keep project structure simple we duplicate + +/** Decimal degree coordinates */ +public final class Coordinates { + + private static final int ROUGHLY_RADIUS_OF_EARTH_IN_M = 6371000; + public final double latitude; + public final double longitude; + + public Coordinates(double latitude, double longitude) { + this.latitude = latitude; + this.longitude = longitude; + } + + public long distanceTo(Coordinates other) { + // using the haversine formula https://en.wikipedia.org/wiki/Versine#hav + var latitudeDistance = Math.toRadians(latitude - other.latitude); + var longitudeDistance = Math.toRadians(longitude - other.longitude); + var sinLatitude = Math.sin(latitudeDistance / 2); + var sinLongitude = Math.sin(longitudeDistance / 2); + var a = + sinLatitude * sinLatitude + + (Math.cos(Math.toRadians(latitude)) + * Math.cos(Math.toRadians(other.latitude)) + * sinLongitude + * sinLongitude); + var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a)); + return (long) (ROUGHLY_RADIUS_OF_EARTH_IN_M * c); + } + + public common.proto.Coordinates toProto() { + return common.proto.Coordinates.newBuilder() + .setLatitude(latitude) + .setLongitude(longitude) + .build(); + } + + public static Coordinates fromProto(common.proto.Coordinates pc) { + return new Coordinates(pc.getLatitude(), pc.getLongitude()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Coordinates that = (Coordinates) o; + + if (Double.compare(latitude, that.latitude) != 0) return false; + return Double.compare(longitude, that.longitude) == 0; + } + + @Override + public int hashCode() { + int result; + long temp; + temp = Double.doubleToLongBits(latitude); + result = (int) (temp ^ (temp >>> 32)); + temp = Double.doubleToLongBits(longitude); + result = 31 * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + @Override + public String toString() { + return "Coordinates{" + "latitude=" + latitude + ", longitude=" + longitude + '}'; + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DeliveriesSettings.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DeliveriesSettings.java new file mode 100644 index 000000000..31bb24fc6 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DeliveriesSettings.java @@ -0,0 +1,21 @@ +package central; + +import akka.actor.typed.ActorSystem; +import java.util.HashSet; +import java.util.Set; + +public final class DeliveriesSettings { + public final Set locationIds; + + public DeliveriesSettings(Set locationIds) { + this.locationIds = locationIds; + } + + public static DeliveriesSettings create(ActorSystem system) { + var config = system.settings().config().getConfig("restaurant-drone-deliveries-service"); + var locationIds = new HashSet<>(config.getStringList("local-drone-control.locations")); + + // FIXME move timeouts here as well + return new DeliveriesSettings(locationIds); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java new file mode 100644 index 000000000..9eca73d93 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java @@ -0,0 +1,61 @@ +package central; + +import akka.actor.typed.ActorSystem; +import akka.grpc.javadsl.ServerReflection; +import akka.grpc.javadsl.ServiceHandler; +import akka.http.javadsl.Http; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.japi.function.Function; +import central.deliveries.proto.RestaurantDeliveriesService; +import central.deliveries.proto.RestaurantDeliveriesServiceHandlerFactory; +import central.drones.proto.DroneOverviewService; +import central.drones.proto.DroneOverviewServiceHandlerFactory; +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletionStage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class DroneDeliveriesServer { + + private static final Logger logger = LoggerFactory.getLogger(DroneDeliveriesServer.class); + + public static void start( + ActorSystem system, + String host, + int port, + central.drones.proto.DroneOverviewService droneOverviewService, + central.deliveries.proto.RestaurantDeliveriesService restaurantDeliveriesService, + Function> deliveryEventsProducerService, + Function> pushedDroneEventsHandler) { + + @SuppressWarnings("unchecked") + var service = + ServiceHandler.concatOrNotFound( + DroneOverviewServiceHandlerFactory.create(droneOverviewService, system), + RestaurantDeliveriesServiceHandlerFactory.create(restaurantDeliveriesService, system), + ServerReflection.create( + Arrays.asList( + DroneOverviewService.description, RestaurantDeliveriesService.description), + system), + deliveryEventsProducerService, + // FIXME not last once actually partial + pushedDroneEventsHandler); + + var bound = Http.get(system).newServerAt(host, port).bind(service); + bound.whenComplete( + (binding, error) -> { + if (error == null) { + logger.info( + "Drone event consumer listening at: {}:{}", + binding.localAddress().getHostString(), + binding.localAddress().getPort()); + binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system); + } else { + logger.error("Failed to bind gRPC endpoint, terminating system", error); + system.terminate(); + } + }); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java new file mode 100644 index 000000000..af7cd086a --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java @@ -0,0 +1,57 @@ +package central; + +import akka.actor.typed.ActorSystem; +import akka.actor.typed.SpawnProtocol; +import akka.management.cluster.bootstrap.ClusterBootstrap; +import akka.management.javadsl.AkkaManagement; +import central.deliveries.DeliveryEvents; +import central.deliveries.RestaurantDeliveries; +import central.deliveries.RestaurantDeliveriesServiceImpl; +import central.drones.Drone; +import central.drones.DroneOverviewServiceImpl; +import central.drones.LocalDroneEvents; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class Main { + + private static final Logger logger = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) { + var system = ActorSystem.create(SpawnProtocol.create(), "deliveries"); + try { + init(system); + } catch (Throwable e) { + logger.error("Terminating due to initialization failure.", e); + system.terminate(); + } + } + + private static void init(ActorSystem system) { + var settings = DeliveriesSettings.create(system); + AkkaManagement.get(system).start(); + ClusterBootstrap.get(system).start(); + + Drone.init(system); + LocalDroneEvents.initPushedEventsConsumer(system); + RestaurantDeliveries.init(system); + + var host = + system.settings().config().getString("restaurant-drone-deliveries-service.grpc.interface"); + var port = system.settings().config().getInt("restaurant-drone-deliveries-service.grpc.port"); + + var pushedDroneEventsHandler = LocalDroneEvents.pushedEventsGrpcHandler(system); + var deliveryEventsProducerService = DeliveryEvents.eventProducerService(system); + var droneOverviewService = new DroneOverviewServiceImpl(system); + var restaurantDeliveriesService = new RestaurantDeliveriesServiceImpl(system, settings); + + DroneDeliveriesServer.start( + system, + host, + port, + droneOverviewService, + restaurantDeliveriesService, + deliveryEventsProducerService, + pushedDroneEventsHandler); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/DeliveryEvents.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/DeliveryEvents.java new file mode 100644 index 000000000..b32ad4135 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/DeliveryEvents.java @@ -0,0 +1,53 @@ +package central.deliveries; + +import akka.actor.typed.ActorSystem; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.japi.function.Function; +import akka.persistence.query.typed.EventEnvelope; +import akka.projection.grpc.producer.EventProducerSettings; +import akka.projection.grpc.producer.javadsl.EventProducer; +import akka.projection.grpc.producer.javadsl.EventProducerSource; +import akka.projection.grpc.producer.javadsl.Transformation; +import central.deliveries.proto.DeliveryRegistered; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; + +public final class DeliveryEvents { + + // Note: stream id used in consumer to consume this specific stream + public static final String STREAM_ID = "delivery-events"; + + public static Function> eventProducerService( + ActorSystem system) { + var transformation = + Transformation.empty() + .registerAsyncEnvelopeMapper( + RestaurantDeliveries.DeliveryRegistered.class, + DeliveryEvents::transformDeliveryRegistration) + // exclude all other types of events for the RestaurantDeliveries + .registerOrElseMapper(envelope -> Optional.empty()); + + var eventProducerSource = + new EventProducerSource( + RestaurantDeliveries.ENTITY_KEY.name(), + STREAM_ID, + transformation, + EventProducerSettings.create(system)); + + return EventProducer.grpcServiceHandler(system, eventProducerSource); + } + + private static CompletionStage> transformDeliveryRegistration( + EventEnvelope envelope) { + var delivery = envelope.event().delivery; + return CompletableFuture.completedFuture( + Optional.of( + central.deliveries.proto.DeliveryRegistered.newBuilder() + .setDeliveryId(delivery.deliveryId) + .setOrigin(delivery.origin.toProto()) + .setDestination(delivery.destination.toProto()) + .build())); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java new file mode 100644 index 000000000..b81aac8a8 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java @@ -0,0 +1,243 @@ +package central.deliveries; + +import akka.Done; +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.Entity; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.pattern.StatusReply; +import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.javadsl.CommandHandler; +import akka.persistence.typed.javadsl.Effect; +import akka.persistence.typed.javadsl.EventHandler; +import akka.persistence.typed.javadsl.EventSourcedBehavior; +import central.CborSerializable; +import central.Coordinates; +import com.fasterxml.jackson.annotation.JsonCreator; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; + +public final class RestaurantDeliveries + extends EventSourcedBehavior< + RestaurantDeliveries.Command, RestaurantDeliveries.Event, RestaurantDeliveries.State> { + public interface Command extends CborSerializable {} + + public static final class SetUpRestaurant implements Command { + public final String localControlLocationId; + public final Coordinates restaurantLocation; + public final ActorRef> replyTo; + + public SetUpRestaurant( + String localControlLocationId, + Coordinates restaurantLocation, + ActorRef> replyTo) { + this.localControlLocationId = localControlLocationId; + this.restaurantLocation = restaurantLocation; + this.replyTo = replyTo; + } + } + + public static final class RegisterDelivery implements Command { + public final String deliveryId; + public final Coordinates destination; + public final ActorRef> replyTo; + + public RegisterDelivery( + String deliveryId, Coordinates destination, ActorRef> replyTo) { + this.deliveryId = deliveryId; + this.destination = destination; + this.replyTo = replyTo; + } + } + + public static final class ListCurrentDeliveries implements Command { + public final ActorRef> replyTo; + + @JsonCreator + public ListCurrentDeliveries(ActorRef> replyTo) { + this.replyTo = replyTo; + } + } + + public interface Event extends CborSerializable {} + + public static final class RestaurantLocationSet implements Event { + public final String localControlLocationId; + public final Coordinates coordinates; + + public RestaurantLocationSet(String localControlLocationId, Coordinates coordinates) { + this.localControlLocationId = localControlLocationId; + this.coordinates = coordinates; + } + } + + public static final class DeliveryRegistered implements Event { + public final Delivery delivery; + + @JsonCreator + public DeliveryRegistered(Delivery delivery) { + this.delivery = delivery; + } + } + + public static final class State implements CborSerializable { + public final String localControlLocationId; + public final Coordinates restaurantLocation; + public final List currentDeliveries; + + public State( + String localControlLocationId, + Coordinates restaurantLocation, + List currentDeliveries) { + this.localControlLocationId = localControlLocationId; + this.restaurantLocation = restaurantLocation; + this.currentDeliveries = currentDeliveries; + } + } + + public static final class Delivery { + public final String deliveryId; + // The following two fields always the same for the same restaurant, so that they can be seen in + // the downstream projection. + public final String localControlLocationId; + public final Coordinates origin; + public final Coordinates destination; + public final Instant timestamp; + + public Delivery( + String deliveryId, + String localControlLocationId, + Coordinates origin, + Coordinates destination, + Instant timestamp) { + this.deliveryId = deliveryId; + this.localControlLocationId = localControlLocationId; + this.origin = origin; + this.destination = destination; + this.timestamp = timestamp; + } + } + + public static final EntityTypeKey ENTITY_KEY = + EntityTypeKey.create(Command.class, "RestaurantDeliveries"); + + public static void init(ActorSystem system) { + ClusterSharding.get(system) + .init( + Entity.of( + ENTITY_KEY, + entityContext -> new RestaurantDeliveries(entityContext.getEntityId()))); + } + + @Override + public Set tagsFor(State state, Event event) { + if (state == null) { + return Collections.emptySet(); + } else { + // tag events with location id as topic, grpc projection filters makes sure only that location + // picks them up for drone delivery + return Collections.singleton("t:" + state.localControlLocationId); + } + } + + public RestaurantDeliveries(String restaurantId) { + super(PersistenceId.of(ENTITY_KEY.name(), restaurantId)); + } + + @Override + public State emptyState() { + return null; + } + + @Override + public CommandHandler commandHandler() { + var noStateHandler = + newCommandHandlerBuilder() + .forNullState() + .onCommand(SetUpRestaurant.class, this::onSetUpRestaurant) + .onCommand( + RegisterDelivery.class, + command -> + Effect() + .reply( + command.replyTo, + StatusReply.error( + "Restaurant not yet initialized, cannot accept registrations"))) + .onCommand( + ListCurrentDeliveries.class, + command -> Effect().reply(command.replyTo, Collections.emptyList())); + + var stateHandler = + newCommandHandlerBuilder() + .forNonNullState() + .onCommand( + SetUpRestaurant.class, + command -> + Effect() + .reply( + command.replyTo, + StatusReply.error("Changing restaurant location not supported"))) + .onCommand(RegisterDelivery.class, this::onRegisterDelivery) + .onCommand( + ListCurrentDeliveries.class, + (state, command) -> + // reply with defensive copy of internal mutable state + Effect().reply(command.replyTo, new ArrayList<>(state.currentDeliveries))); + + return noStateHandler.orElse(stateHandler).build(); + } + + private Effect onRegisterDelivery(State state, RegisterDelivery command) { + var existing = + state.currentDeliveries.stream() + .filter(delivery -> delivery.deliveryId.equals(command.deliveryId)) + .findFirst(); + + if (existing.isPresent()) { + if (existing.get().destination.equals(command.destination)) { + // already registered + return Effect().reply(command.replyTo, StatusReply.ack()); + } else { + return Effect() + .reply( + command.replyTo, StatusReply.error("Delivery id exists but for other destination")); + } + } else { + return Effect() + .persist( + new DeliveryRegistered( + new Delivery( + command.deliveryId, + state.localControlLocationId, + state.restaurantLocation, + command.destination, + Instant.now()))) + .thenReply(command.replyTo, updatedState -> StatusReply.ack()); + } + } + + private Effect onSetUpRestaurant(SetUpRestaurant command) { + return Effect() + .persist( + new RestaurantLocationSet(command.localControlLocationId, command.restaurantLocation)) + .thenReply(command.replyTo, updatedState -> StatusReply.ack()); + } + + @Override + public EventHandler eventHandler() { + return newEventHandlerBuilder() + .forAnyState() + .onEvent(RestaurantLocationSet.class, (event) -> + new State(event.localControlLocationId, event.coordinates, new ArrayList<>()) + ) + .onEvent(DeliveryRegistered.class, (state, event) -> { + state.currentDeliveries.add(event.delivery); + return state; + }) + .build(); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveriesServiceImpl.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveriesServiceImpl.java new file mode 100644 index 000000000..fe5f7cbd5 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveriesServiceImpl.java @@ -0,0 +1,103 @@ +package central.deliveries; + +import akka.Done; +import akka.actor.typed.ActorSystem; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.grpc.GrpcServiceException; +import central.Coordinates; +import central.DeliveriesSettings; +import central.deliveries.proto.*; +import io.grpc.Status; +import java.time.Duration; +import java.util.concurrent.CompletionStage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class RestaurantDeliveriesServiceImpl implements RestaurantDeliveriesService { + + private static final Logger logger = LoggerFactory.getLogger(RestaurantDeliveries.class); + + private final ActorSystem system; + private final DeliveriesSettings settings; + private final ClusterSharding sharding; + + private final Duration askTimeout; + + public RestaurantDeliveriesServiceImpl(ActorSystem system, DeliveriesSettings settings) { + this.system = system; + this.settings = settings; + this.sharding = ClusterSharding.get(system); + this.askTimeout = + system + .settings() + .config() + .getDuration("restaurant-drone-deliveries-service.restaurant-deliveries-ask-timeout"); + } + + @Override + public CompletionStage setUpRestaurant(SetUpRestaurantRequest in) { + logger.info( + "Set up restaurant {}, coordinates {}-{}, location [{}]", + in.getRestaurantId(), + in.getCoordinates().getLatitude(), + in.getCoordinates().getLongitude(), + in.getLocalControlLocationId()); + + if (!settings.locationIds.contains(in.getLocalControlLocationId())) { + throw new GrpcServiceException( + Status.INVALID_ARGUMENT.withDescription( + "The local control location id " + + in.getLocalControlLocationId() + + " is not known to the service")); + } + + var entityRef = sharding.entityRefFor(RestaurantDeliveries.ENTITY_KEY, in.getRestaurantId()); + + var coordinates = Coordinates.fromProto(in.getCoordinates()); + CompletionStage reply = + entityRef.askWithStatus( + replyTo -> + new RestaurantDeliveries.SetUpRestaurant( + in.getLocalControlLocationId(), coordinates, replyTo), + askTimeout); + + return reply.handle( + (done, error) -> { + if (error != null) { + throw new GrpcServiceException(Status.INTERNAL.withDescription(error.getMessage())); + } else { + + return RegisterRestaurantResponse.getDefaultInstance(); + } + }); + } + + @Override + public CompletionStage registerDelivery(RegisterDeliveryRequest in) { + logger.info( + "Register delivery for restaurant {}, delivery id {}, destination {},{}", + in.getRestaurantId(), + in.getDeliveryId(), + in.getCoordinates().getLatitude(), + in.getCoordinates().getLongitude()); + + var entityRef = sharding.entityRefFor(RestaurantDeliveries.ENTITY_KEY, in.getRestaurantId()); + + var destination = Coordinates.fromProto(in.getCoordinates()); + + CompletionStage reply = + entityRef.askWithStatus( + replyTo -> + new RestaurantDeliveries.RegisterDelivery(in.getDeliveryId(), destination, replyTo), + askTimeout); + + return reply.handle( + (done, error) -> { + if (error != null) { + throw new GrpcServiceException(Status.INTERNAL.withDescription(error.getMessage())); + } else { + return RegisterDeliveryResponse.getDefaultInstance(); + } + }); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java new file mode 100644 index 000000000..17c554edf --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java @@ -0,0 +1,117 @@ +package central.drones; + +import akka.Done; +import akka.actor.typed.ActorRef; +import akka.actor.typed.ActorSystem; +import akka.actor.typed.javadsl.ActorContext; +import akka.actor.typed.javadsl.Behaviors; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.Entity; +import akka.cluster.sharding.typed.javadsl.EntityTypeKey; +import akka.pattern.StatusReply; +import akka.persistence.typed.PersistenceId; +import akka.persistence.typed.state.javadsl.CommandHandler; +import akka.persistence.typed.state.javadsl.DurableStateBehavior; +import akka.persistence.typed.state.javadsl.Effect; +import central.CborSerializable; +import central.CoarseGrainedCoordinates; +import com.fasterxml.jackson.annotation.JsonCreator; +import java.time.Instant; +import java.util.Optional; + +public final class Drone extends DurableStateBehavior { + + interface Command extends CborSerializable {} + + public static final class UpdateLocation implements Command { + public final String locationName; + public final CoarseGrainedCoordinates coarseGrainedCoordinates; + public final ActorRef> replyTo; + + public UpdateLocation( + String locationName, + CoarseGrainedCoordinates coarseGrainedCoordinates, + ActorRef> replyTo) { + this.locationName = locationName; + this.coarseGrainedCoordinates = coarseGrainedCoordinates; + this.replyTo = replyTo; + } + } + + public static final class GetState implements Command { + public final ActorRef replyTo; + + @JsonCreator + public GetState(ActorRef replyTo) { + this.replyTo = replyTo; + } + } + + public static final class State implements CborSerializable { + public String locationName; + public Optional currentLocation; + public Instant lastChange; + + public State( + String locationName, + Optional currentLocation, + Instant lastChange) { + this.locationName = locationName; + this.currentLocation = currentLocation; + this.lastChange = lastChange; + } + } + + public static final EntityTypeKey ENTITY_KEY = + EntityTypeKey.create(Command.class, "CentralDrone"); + + public static void init(ActorSystem system) { + ClusterSharding.get(system) + .init( + Entity.of( + ENTITY_KEY, + entityContext -> + Behaviors.setup(context -> new Drone(context, entityContext.getEntityId())))); + } + + private final ActorContext context; + + private Drone(ActorContext context, String entityId) { + super(PersistenceId.of(ENTITY_KEY.name(), entityId)); + this.context = context; + } + + @Override + public State emptyState() { + return new State("unknown", Optional.empty(), Instant.EPOCH); + } + + @Override + public CommandHandler commandHandler() { + return newCommandHandlerBuilder() + .forAnyState() + .onCommand(UpdateLocation.class, this::onUpdateLocation) + .onCommand( + GetState.class, + (state, command) -> + // reply with defensive copy since state is mutable + Effect() + .reply( + command.replyTo, + new State(state.locationName, state.currentLocation, state.lastChange))) + .build(); + } + + private Effect onUpdateLocation(State state, UpdateLocation command) { + context + .getLog() + .info( + "Updating location to [{}], [{}]", + command.locationName, + command.coarseGrainedCoordinates); + state.locationName = command.locationName; + state.currentLocation = Optional.of(command.coarseGrainedCoordinates); + state.lastChange = Instant.now(); + return Effect().persist(state).thenReply(command.replyTo, updatedState -> StatusReply.ack()); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java new file mode 100644 index 000000000..e03d34bb2 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java @@ -0,0 +1,111 @@ +package central.drones; + +import akka.actor.typed.ActorSystem; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.grpc.GrpcServiceException; +import akka.japi.Pair; +import akka.persistence.r2dbc.session.javadsl.R2dbcSession; +import akka.persistence.typed.PersistenceId; +import akka.serialization.Serialization; +import akka.serialization.SerializationExtension; +import central.CoarseGrainedCoordinates; +import central.drones.proto.*; +import io.grpc.Status; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class DroneOverviewServiceImpl implements DroneOverviewService { + + private Logger logger = LoggerFactory.getLogger(DroneOverviewServiceImpl.class); + + private final ActorSystem system; + private final Duration timeout; + private final Serialization serialization; + private final ClusterSharding sharding; + + private static final String FIND_BY_LOCATION_SQL = + "SELECT persistence_id, state_ser_id, state_ser_manifest, state_payload " + + "FROM durable_state " + + "WHERE location = $1"; + + public DroneOverviewServiceImpl(ActorSystem system) { + this.system = system; + this.timeout = + system + .settings() + .config() + .getDuration("restaurant-drone-deliveries-service.drone-ask-timeout"); + this.serialization = SerializationExtension.get(system); + this.sharding = ClusterSharding.get(system); + } + + @Override + public CompletionStage getDroneOverview(GetDroneOverviewRequest in) { + return null; + } + + @Override + public CompletionStage getCoarseDroneLocations( + CoarseDroneLocationsRequest in) { + // query against additional columns for drone + logger.info("List drones for location {}", in.getLocation()); + CompletionStage>> queryResult = + R2dbcSession.withSession( + system, + session -> + session.select( + session.createStatement(FIND_BY_LOCATION_SQL).bind(0, in.getLocation()), + row -> { + var serializerId = row.get("state_ser_id", Integer.class); + var serializerManifest = row.get("state_ser_manifest", String.class); + var payload = row.get("state_payload", byte[].class); + var state = + (Drone.State) + serialization + .deserialize(payload, serializerId, serializerManifest) + .get(); + var droneId = + PersistenceId.extractEntityId(row.get("persistence_id", String.class)); + + // we expect it to always be present + var coordinates = state.currentLocation.get(); + return Pair.create(coordinates, droneId); + })); + + return queryResult.thenApply( + (List> droneIdAndLocations) -> { + if (droneIdAndLocations.isEmpty()) throw new GrpcServiceException(Status.NOT_FOUND); + else { + // group drones by coarse location + Map> byLocation = + droneIdAndLocations.stream() + .collect( + Collectors.toMap( + Pair::first, + pair -> new HashSet<>(Collections.singletonList(pair.second())), + (existingSet, newSet) -> { + existingSet.addAll(newSet); + return existingSet; + })); + + // turn into response protobuf message + var protoEntries = + byLocation.entrySet().stream() + .map( + entry -> + CoarseDroneLocations.newBuilder() + .setCoordinates(entry.getKey().toProto()) + .addAllDrones(entry.getValue()) + .build()) + .collect(Collectors.toList()); + return CoarseDroneLocationsResponse.newBuilder() + .addAllCoarseLocations(protoEntries) + .build(); + } + }); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java new file mode 100644 index 000000000..554e0496e --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java @@ -0,0 +1,146 @@ +package central.drones; + +import akka.Done; +import akka.actor.typed.ActorSystem; +import akka.cluster.sharding.typed.javadsl.ClusterSharding; +import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess; +import akka.http.javadsl.model.HttpRequest; +import akka.http.javadsl.model.HttpResponse; +import akka.japi.Pair; +import akka.japi.function.Function; +import akka.persistence.query.Offset; +import akka.persistence.query.typed.EventEnvelope; +import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal; +import akka.persistence.typed.PersistenceId; +import akka.projection.Projection; +import akka.projection.ProjectionBehavior; +import akka.projection.ProjectionId; +import akka.projection.eventsourced.javadsl.EventSourcedProvider; +import akka.projection.grpc.consumer.javadsl.EventProducerPushDestination; +import akka.projection.javadsl.Handler; +import akka.projection.javadsl.SourceProvider; +import akka.projection.r2dbc.javadsl.R2dbcProjection; +import central.CoarseGrainedCoordinates; +import java.time.Duration; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CompletionStage; +import local.drones.proto.CoarseDroneLocation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class LocalDroneEvents { + + private static final Logger logger = LoggerFactory.getLogger(LocalDroneEvents.class); + + // Note: stream id used in producer for the drone events + public static final String DRONE_EVENT_STREAM_ID = "drone-events"; + + // FIXME The type key on the producer side. Make sure we have documented it. + private static final String PRODUCER_ENTITY_TYPE = "Drone"; + + public static Function> pushedEventsGrpcHandler( + ActorSystem system) { + var destination = + EventProducerPushDestination.create( + DRONE_EVENT_STREAM_ID, + Collections.singletonList(local.drones.proto.DroneEvents.getDescriptor()), + system) + .withTransformationForOrigin( + (origin, metadataa) -> + akka.projection.grpc.consumer.javadsl.Transformation.empty() + // tag all events with the location name of the local control it came from + .registerTagMapper( + local.drones.proto.CoarseDroneLocation.class, + envelope -> Collections.singleton("location:" + origin))); + + return EventProducerPushDestination.grpcServiceHandler(destination, system); + } + + private static class LocationHandler extends Handler> { + + private final ClusterSharding sharding; + private final Duration askTimeout; + + public LocationHandler(ActorSystem system) { + this.sharding = ClusterSharding.get(system); + this.askTimeout = + system + .settings() + .config() + .getDuration("restaurant-drone-deliveries-service.drone-ask-timeout"); + } + + @Override + public CompletionStage process(EventEnvelope envelope) { + logger.info( + "Saw projected event: {}-{}: {}", + envelope.persistenceId(), + envelope.sequenceNr(), + envelope.eventOption()); + + // Drone id without producer entity key + var droneId = PersistenceId.extractEntityId(envelope.persistenceId()); + + // same drone id as local but different entity type (our Drone overview representation) + var entityRef = sharding.entityRefFor(Drone.ENTITY_KEY, droneId); + + // we have encoded origin in a tag, extract it + // FIXME could there be a more automatic place where origin is available (envelope.source?) + var originName = + envelope.getTags().stream() + .filter(tag -> tag.startsWith("location:")) + .findFirst() + .get() + .substring("location:".length()); + + return entityRef.askWithStatus( + replyTo -> + new Drone.UpdateLocation( + originName, + CoarseGrainedCoordinates.fromProto(envelope.event().getCoordinates()), + replyTo), + askTimeout); + } + } + ; + + public static void initPushedEventsConsumer(ActorSystem system) { + // Split the slices into N ranges + var numberOfSliceRanges = + system + .settings() + .config() + .getInt("restaurant-drone-deliveries-service.drones.projections-slice-count"); + + var sliceRanges = + EventSourcedProvider.sliceRanges( + system, R2dbcReadJournal.Identifier(), numberOfSliceRanges); + + ShardedDaemonProcess.get(system) + .init( + ProjectionBehavior.Command.class, + "LocalDronesProjection", + sliceRanges.size(), + i -> ProjectionBehavior.create(projection(system, sliceRanges.get(i))), + ProjectionBehavior.stopMessage()); + } + + private static Projection> projection( + ActorSystem system, Pair sliceRange) { + var minSlice = sliceRange.first(); + var maxSlice = sliceRange.second(); + var projectionId = ProjectionId.of("DroneEvents", "drone-" + minSlice + "-" + maxSlice); + + SourceProvider> sourceProvider = + EventSourcedProvider.eventsBySlices( + system, + R2dbcReadJournal.Identifier(), + PRODUCER_ENTITY_TYPE, + sliceRange.first(), + sliceRange.second()); + + return R2dbcProjection.atLeastOnceAsync( + projectionId, Optional.empty(), sourceProvider, () -> new LocationHandler(system), system); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocationColumn.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocationColumn.java new file mode 100644 index 000000000..6ea18617d --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocationColumn.java @@ -0,0 +1,24 @@ +package central.drones; + +import akka.persistence.r2dbc.state.javadsl.AdditionalColumn; + +/** + * Write local drone control location name column for querying drone locations per control location + */ +public final class LocationColumn extends AdditionalColumn { + + @Override + public Class fieldClass() { + return String.class; + } + + @Override + public String columnName() { + return "location"; + } + + @Override + public Binding bind(Upsert upsert) { + return AdditionalColumn.bindValue(upsert.value().locationName); + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/delivery_events.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/delivery_events.proto new file mode 100644 index 000000000..0a98ec3db --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/delivery_events.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "central.deliveries.proto"; + +package central.deliveries; + +import "common/coordinates.proto"; + +message DeliveryRegistered { + string delivery_id = 1; + common.Coordinates origin = 2; + common.Coordinates destination = 3; +} \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto new file mode 100644 index 000000000..b832f6043 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto @@ -0,0 +1,31 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "central.deliveries.proto"; + +package central.deliveries; + +import "common/coordinates.proto"; + +service RestaurantDeliveriesService { + rpc SetUpRestaurant(SetUpRestaurantRequest) returns (RegisterRestaurantResponse) {}; + rpc RegisterDelivery(RegisterDeliveryRequest) returns (RegisterDeliveryResponse) {}; +} + +message SetUpRestaurantRequest { + string restaurant_id = 1; + common.Coordinates coordinates = 2; + string local_control_location_id = 3; +} + +message RegisterRestaurantResponse {} + +message RegisterDeliveryRequest { + string delivery_id = 1; + string restaurant_id = 2; + common.Coordinates coordinates = 3; +} + +message RegisterDeliveryResponse { + +} \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/drones/drone_overview_api.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/drones/drone_overview_api.proto new file mode 100644 index 000000000..ea649a32b --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/drones/drone_overview_api.proto @@ -0,0 +1,37 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "central.drones.proto"; + +package central.drones; + +import "common/coordinates.proto"; + +service DroneOverviewService { + rpc GetDroneOverview(GetDroneOverviewRequest) returns (GetDroneOverviewResponse) {} + rpc GetCoarseDroneLocations(CoarseDroneLocationsRequest) returns (CoarseDroneLocationsResponse) {} +} + +message CoarseDroneLocationsRequest { + // name of the location + string location = 1; +} + +message CoarseDroneLocationsResponse { + repeated CoarseDroneLocations coarse_locations = 1; +} + +message CoarseDroneLocations { + common.Coordinates coordinates = 1; + repeated string drones = 2; +} + +message GetDroneOverviewRequest { + string drone_id = 1; +} + +message GetDroneOverviewResponse { + string location_name = 1; + double coarse_latitude = 2; + double coarse_longitude = 3; +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/common/coordinates.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/common/coordinates.proto new file mode 100644 index 000000000..99f385d72 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/common/coordinates.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "common.proto"; + +package common; + +// generic messages, shared between local-drone-control and restaurant-drone-deliveries + +message Coordinates { + // latitude (north-south) in decimal degree coordinates + double latitude = 1; + // longitude (east west) in decimal degree coordinates + double longitude = 2; +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/local/drones/drone_events.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/local/drones/drone_events.proto new file mode 100644 index 000000000..94535d78f --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/local/drones/drone_events.proto @@ -0,0 +1,16 @@ +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "local.drones.proto"; + +package local.drones; + +import "common/coordinates.proto"; + +// copy of the descriptor from local-drone-control who owns/publishes it + +// events published by the drones +message CoarseDroneLocation { + common.Coordinates coordinates = 1; +} + diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/application.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/application.conf new file mode 100644 index 000000000..382b8ae7f --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/application.conf @@ -0,0 +1,28 @@ +include "cluster" +include "grpc" +include "serialization" +include "persistence" + +akka { + loglevel = DEBUG +} + +restaurant-drone-deliveries-service { + drone-ask-timeout = 3s + restaurant-deliveries-ask-timeout = 3s + + drones { + # Run this many local projections to consume the pushed drone updates from the local drone control services + # to the central drone durable state entities + projections-slice-count = 4 + } + + local-drone-control { + # local drone control locations needs to be known up front, + # restaurants are tied to the closest location + locations = [ + "sweden/stockholm/kungsholmen", + "sweden/stockholm/norrmalm" + ] + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/cluster.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/cluster.conf new file mode 100644 index 000000000..dc3b0bc8c --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/cluster.conf @@ -0,0 +1,33 @@ +akka { + actor.provider = cluster + + remote.artery { + canonical.port = 2551 + } + + cluster { + downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider" + + shutdown-after-unsuccessful-join-seed-nodes = 120s + + sharding { + least-shard-allocation-strategy.rebalance-absolute-limit = 20 + passivation.strategy = default-strategy + } + } +} + +akka.management { + http { + port = 8558 + port = ${?HTTP_MGMT_PORT} + } + cluster.bootstrap { + contact-point-discovery { + service-name = "shopping-cart-service" + discovery-method = kubernetes-api + required-contact-point-nr = 1 + required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR} + } + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/grpc.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/grpc.conf new file mode 100644 index 000000000..1d77467fa --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/grpc.conf @@ -0,0 +1,19 @@ +// #http2 +akka.http.server.enable-http2 = on +// #http2 + +restaurant-drone-deliveries-service { + + grpc { + # consider setting this to a specific interface for your environment + interface = "127.0.0.1" + port = 8101 + port = ${?GRPC_PORT} + } +} + +akka.projection.grpc { + producer { + query-plugin-id = "akka.persistence.r2dbc.query" + } +} \ No newline at end of file diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local-shared.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local-shared.conf new file mode 100644 index 000000000..0814f2570 --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local-shared.conf @@ -0,0 +1,29 @@ +shopping-cart-service.grpc.interface = "127.0.0.1" +akka.remote.artery.canonical.hostname = "127.0.0.1" +akka.management.http.hostname = "127.0.0.1" + +akka.management.cluster.bootstrap.contact-point-discovery { + service-name = "shopping-cart-service" + discovery-method = config + # boostrap filters ports with the same IP assuming they are previous instances running on the same node + # unless a port is specified + port-name = "management" + required-contact-point-nr = 1 + # config service discovery never changes + stable-margin = 1 ms + # bootstrap without all the nodes being up + contact-with-all-contact-points = false +} + +akka.discovery.config.services { + "shopping-cart-service" { + endpoints = [ + {host = "127.0.0.1", port = 9101} + {host = "127.0.0.1", port = 9102} + {host = "127.0.0.1", port = 9103} + ] + } +} + +shopping-order-service.host = "localhost" +shopping-order-service.port = 8301 diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local1.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local1.conf new file mode 100644 index 000000000..20e3b819d --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local1.conf @@ -0,0 +1,8 @@ +include "application" +include "local-shared" + +shopping-cart-service.grpc.port = 8101 + +akka.remote.artery.canonical.port = 2551 +akka.management.http.port = 9101 + diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local2.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local2.conf new file mode 100644 index 000000000..fe3d6a5de --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local2.conf @@ -0,0 +1,7 @@ +include "application" +include "local-shared" + +shopping-cart-service.grpc.port = 8102 + +akka.management.http.port = 9102 +akka.remote.artery.canonical.port = 2552 diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local3.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local3.conf new file mode 100644 index 000000000..f33c855fb --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local3.conf @@ -0,0 +1,7 @@ +include "application" +include "local-shared" + +shopping-cart-service.grpc.port = 8103 + +akka.remote.artery.canonical.port = 2553 +akka.management.http.port = 9103 diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/logback.xml b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/logback.xml new file mode 100644 index 000000000..f4e889fcd --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/logback.xml @@ -0,0 +1,19 @@ + + + + + [%date{ISO8601}] [%level] [%logger] [%X{akkaAddress}] [%marker] [%thread] - %msg%n + + + + + 8192 + true + + + + + + + + diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf new file mode 100644 index 000000000..163bd253a --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf @@ -0,0 +1,37 @@ +akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.postgres} +akka { + persistence { + journal.plugin = "akka.persistence.r2dbc.journal" + snapshot-store.plugin = "akka.persistence.r2dbc.snapshot" + state.plugin = "akka.persistence.r2dbc.state" + + r2dbc { + connection-factory { + host = "localhost" + host = ${?DB_HOST} + port = 5432 + database = "postgres" + user = "postgres" + user = ${?DB_USER} + password = "postgres" + password = ${?DB_PASSWORD} + } + + state = { + additional-columns { + "CentralDrone" = ["central.drones.LocationColumn"] + } + } + + # FIXME disabled until we have fixed handling of FilteredEvents for publishing in r2dbc + journal.publish-events = off + } + } + + projection.r2dbc { + offset-store { + # only timestamp based offsets + offset-table = "" + } + } +} diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/serialization.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/serialization.conf new file mode 100644 index 000000000..aca0ab40b --- /dev/null +++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/serialization.conf @@ -0,0 +1,6 @@ +akka.actor.serialization-bindings { + "central.CborSerializable" = jackson-cbor + # FIXME allows us to persist arbitrary protobuf messages pushed from the + # event producer, questionable + "scalapb.GeneratedMessage" = proto +}