diff --git a/.github/workflows/checks.yml b/.github/workflows/checks.yml
index 4ab3c8cba..c62a70060 100644
--- a/.github/workflows/checks.yml
+++ b/.github/workflows/checks.yml
@@ -190,3 +190,8 @@ jobs:
run: |-
cd samples/grpc/local-drone-control-java
mvn compile -nsu -Dakka-projection.version=`cat ~/.version`
+
+ - name: Compile Java Projection gRPC Restaurant Drone Deliveries sample
+ run: |-
+ cd samples/grpc/restaurant-drone-deliveries-service-java
+ mvn compile -nsu -Dakka-projection.version=`cat ~/.version`
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/LICENSE b/samples/grpc/restaurant-drone-deliveries-service-java/LICENSE
new file mode 100644
index 000000000..4239f09e0
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/LICENSE
@@ -0,0 +1,10 @@
+Akka sample by Lightbend
+
+Licensed under Public Domain (CC0)
+
+To the extent possible under law, the person who associated CC0 with
+this Template has waived all copyright and related or neighboring
+rights to this Template.
+
+You should have received a copy of the CC0 legalcode along with this
+work. If not, see .
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/README.md b/samples/grpc/restaurant-drone-deliveries-service-java/README.md
new file mode 100644
index 000000000..20fb90788
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/README.md
@@ -0,0 +1,81 @@
+# Restaurant Drone Deliveries Service
+
+The sample show-cases a service for drones doing restaurant deliveries.
+
+It is intended to be used together with the local-drone-control sample.
+
+* Keeps track of a coarse grained location of each drone to the cloud
+* FIXME Accepts restaurant delivery requests which are then fed to the right local drone control
+
+## Running the sample code
+
+1. Start a local PostgresSQL server on default port 5432. The included `docker-compose.yml` starts everything required for running locally.
+
+ ```shell
+ docker compose up --wait
+
+ # creates the tables needed for Akka Persistence
+ # as well as the offset store table for Akka Projection
+ docker exec -i postgres_db psql -U postgres -t < ddl-scripts/create_tables.sql
+ ```
+
+2. Start a first node:
+
+```shell
+mvn compile exec:exec -DAPP_CONFIG=local1.conf
+```
+
+3. (Optional) Start another node with different ports:
+
+```shell
+mvn compile exec:exec -DAPP_CONFIG=local2.conf
+```
+
+4. (Optional) More can be started:
+
+```shell
+mvn compile exec:exec -DAPP_CONFIG=local3.conf
+```
+
+5. Check for service readiness
+
+ ```shell
+curl http://localhost:9101/ready
+ ```
+
+
+Query a location for drone coordinates
+
+```shell
+grpcurl -d '{"location":"sweden/stockholm/kungsholmen"}' -plaintext localhost:8101 central.drones.DroneOverviewService/GetCoarseDroneLocations
+```
+
+Query the state for a specific drone
+
+```shell
+grpcurl -d '{"drone_id":"drone1"}' -plaintext localhost:8101 central.drones.DroneOverviewService.GetDroneOverview
+```
+
+Set up a restaurant
+
+```shell
+grpcurl -d '{"restaurant_id":"restaurant1","coordinates":{"latitude": 59.330324, "longitude": 18.039568}, "local_control_location_id": "sweden/stockholm/kungsholmen" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant
+```
+
+Set up another restaurant, closest to a different local drone control
+
+```shell
+grpcurl -d '{"restaurant_id":"restaurant2","coordinates":{"latitude": 59.342046, "longitude": 18.059095}, "local_control_location_id": "sweden/stockholm/norrmalm" }' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.SetUpRestaurant
+```
+
+Register a delivery for the first restaurant
+
+```shell
+grpcurl -d '{"restaurant_id":"restaurant1","delivery_id": "order1","coordinates":{"latitude": 59.330841, "longitude": 18.038885}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery
+```
+
+Register a delivery for the second restaurant
+
+```shell
+grpcurl -d '{"restaurant_id":"restaurant2","delivery_id": "order2","coordinates":{"latitude": 59.340128, "longitude": 18.056303}}' -plaintext localhost:8101 central.deliveries.RestaurantDeliveriesService.RegisterDelivery
+```
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql
new file mode 100644
index 000000000..94c2e71cc
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/create_tables.sql
@@ -0,0 +1,85 @@
+CREATE TABLE IF NOT EXISTS event_journal(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ db_timestamp timestamp with time zone NOT NULL,
+
+ event_ser_id INTEGER NOT NULL,
+ event_ser_manifest VARCHAR(255) NOT NULL,
+ event_payload BYTEA NOT NULL,
+
+ deleted BOOLEAN DEFAULT FALSE NOT NULL,
+ writer VARCHAR(255) NOT NULL,
+ adapter_manifest VARCHAR(255),
+ tags TEXT ARRAY,
+
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BYTEA,
+
+ PRIMARY KEY(persistence_id, seq_nr)
+);
+
+CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr);
+
+CREATE TABLE IF NOT EXISTS snapshot(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ write_timestamp BIGINT NOT NULL,
+ ser_id INTEGER NOT NULL,
+ ser_manifest VARCHAR(255) NOT NULL,
+ snapshot BYTEA NOT NULL,
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BYTEA,
+
+ PRIMARY KEY(persistence_id)
+);
+
+CREATE TABLE IF NOT EXISTS durable_state (
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ revision BIGINT NOT NULL,
+ db_timestamp timestamp with time zone NOT NULL,
+
+ state_ser_id INTEGER NOT NULL,
+ state_ser_manifest VARCHAR(255),
+ state_payload BYTEA NOT NULL,
+ tags TEXT ARRAY,
+
+ -- additional column
+ location VARCHAR(255),
+
+ PRIMARY KEY(persistence_id, revision)
+);
+
+-- to query drones by location
+CREATE INDEX IF NOT EXISTS durable_state_drone_location_idx ON durable_state(location);
+
+
+-- Timestamp based offsets are stored in this table.
+CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ slice INT NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ -- timestamp_offset is the db_timestamp of the original event
+ timestamp_offset timestamp with time zone NOT NULL,
+ -- timestamp_consumed is when the offset was stored
+ -- the consumer lag is timestamp_consumed - timestamp_offset
+ timestamp_consumed timestamp with time zone NOT NULL,
+ PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+CREATE TABLE IF NOT EXISTS akka_projection_management (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ paused BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
\ No newline at end of file
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/drop_tables.sql b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/drop_tables.sql
new file mode 100644
index 000000000..c71bc7ae2
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/ddl-scripts/drop_tables.sql
@@ -0,0 +1,5 @@
+DROP TABLE event_journal;
+DROP TABLE snapshot;
+DROP TABLE durable_state;
+DROP TABLE akka_projection_timestamp_offset_store;
+DROP TABLE akka_projection_management;
\ No newline at end of file
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/docker-compose.yml b/samples/grpc/restaurant-drone-deliveries-service-java/docker-compose.yml
new file mode 100644
index 000000000..366c5a10a
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/docker-compose.yml
@@ -0,0 +1,16 @@
+version: '2.2'
+services:
+ postgres-db:
+ image: postgres:latest
+ container_name: postgres_db
+ ports:
+ - 5432:5432
+ environment:
+ POSTGRES_USER: postgres
+ POSTGRES_PASSWORD: postgres
+ healthcheck:
+ test: ['CMD', 'pg_isready', "-q", "-d", "postgres", "-U", "postgres"]
+ interval: 5s
+ retries: 5
+ start_period: 5s
+ timeout: 5s
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml
new file mode 100644
index 000000000..3b964ebeb
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/pom.xml
@@ -0,0 +1,279 @@
+
+
+
+ 4.0.0
+ shopping-cart-service
+ com.lightbend.akka.samples
+ 1.0
+
+
+
+ Public Domain (CC0)
+ http://creativecommons.org/publicdomain/zero/1.0/
+
+
+
+
+ UTF-8
+ 2.8.4
+ 1.5.0-M3
+ 1.2.0-M3
+ 1.4.0
+ 2.0.1
+ 2.3.3
+ 2.3.3
+ 1.3.6
+ 4.13.1
+ 2.13
+
+ application.conf
+
+ ${git.commit.time}-${git.commit.id.abbrev}
+
+
+
+
+ com.typesafe.akka
+ akka-cluster-typed_${scala.binary.version}
+ ${akka.version}
+
+
+ com.typesafe.akka
+ akka-cluster-sharding-typed_${scala.binary.version}
+ ${akka.version}
+
+
+ com.typesafe.akka
+ akka-persistence-typed_${scala.binary.version}
+ ${akka.version}
+
+
+ com.typesafe.akka
+ akka-persistence-query_${scala.binary.version}
+ ${akka.version}
+
+
+ com.typesafe.akka
+ akka-serialization-jackson_${scala.binary.version}
+ ${akka.version}
+
+
+ com.typesafe.akka
+ akka-cluster-tools_${scala.binary.version}
+ ${akka.version}
+
+
+ com.typesafe.akka
+ akka-discovery_${scala.binary.version}
+ ${akka.version}
+
+
+ com.lightbend.akka
+ akka-persistence-r2dbc_${scala.binary.version}
+ ${akka-persistence-r2dbc.version}
+
+
+ com.lightbend.akka
+ akka-projection-r2dbc_${scala.binary.version}
+ ${akka-projection.version}
+
+
+ com.lightbend.akka
+ akka-projection-grpc_${scala.binary.version}
+ ${akka-projection.version}
+
+
+ com.lightbend.akka
+ akka-projection-eventsourced_${scala.binary.version}
+ ${akka-projection.version}
+
+
+ com.lightbend.akka.management
+ akka-management-cluster-http_${scala.binary.version}
+ ${akka-management.version}
+
+
+ com.lightbend.akka.management
+ akka-management-cluster-bootstrap_${scala.binary.version}
+ ${akka-management.version}
+
+
+ com.lightbend.akka.discovery
+ akka-discovery-kubernetes-api_${scala.binary.version}
+ ${akka-management.version}
+
+
+ com.lightbend.akka.grpc
+ akka-grpc-runtime_${scala.binary.version}
+ ${akka-grpc.version}
+
+
+ com.lightbend.akka
+ akka-diagnostics_${scala.binary.version}
+ ${akka-diagnostics.version}
+
+
+
+ ch.qos.logback
+ logback-classic
+ ${logback.version}
+
+
+
+ com.typesafe.akka
+ akka-actor-testkit-typed_${scala.binary.version}
+ ${akka.version}
+ test
+
+
+ com.typesafe.akka
+ akka-persistence-testkit_${scala.binary.version}
+ ${akka.version}
+ test
+
+
+ com.lightbend.akka
+ akka-projection-testkit_${scala.binary.version}
+ ${akka-projection.version}
+ test
+
+
+ com.typesafe.akka
+ akka-stream-testkit_${scala.binary.version}
+ ${akka.version}
+ test
+
+
+
+ junit
+ junit
+ ${junit.version}
+ test
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.8.1
+
+
+ 11
+
+ -Xlint:unchecked
+ -Xlint:deprecation
+ -parameters
+
+
+
+
+ org.codehaus.mojo
+ exec-maven-plugin
+ 3.0.0
+
+ java
+
+ -Djava.library.path=target/lib
+ -Dconfig.resource=${APP_CONFIG}
+ -classpath
+
+ central.Main
+
+
+
+
+
+ com.lightbend.akka.grpc
+ akka-grpc-maven-plugin
+ ${akka-grpc-maven-plugin.version}
+
+ Java
+
+
+
+
+ generate
+
+
+
+
+
+ com.diffplug.spotless
+ spotless-maven-plugin
+ 2.35.0
+
+
+
+ 1.11.0
+
+
+
+
+
+
+ pl.project13.maven
+ git-commit-id-plugin
+ 4.0.0
+
+
+ validate
+
+ revision
+
+
+
+
+ yyyyMMdd-HHmmss
+
+ ${project.basedir}/.git
+ false
+ false
+
+
+
+ io.fabric8
+ docker-maven-plugin
+ 0.35.0
+
+
+
+ %a
+
+ docker.io/library/eclipse-temurin:17.0.3_7-jre-jammy
+
+ ${version.number}
+
+
+
+ java
+ -cp
+ /maven/*
+ central.Main
+
+
+
+ artifact-with-dependencies
+
+
+
+
+
+
+
+ build-docker-image
+ package
+
+ build
+
+
+
+
+
+
+
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CborSerializable.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CborSerializable.java
new file mode 100644
index 000000000..e3c7ac8b8
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CborSerializable.java
@@ -0,0 +1,7 @@
+package central;
+
+/**
+ * Marker trait for serialization with Jackson CBOR. Enabled in serialization.conf
+ * `akka.actor.serialization-bindings` (via application.conf).
+ */
+public interface CborSerializable {}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CoarseGrainedCoordinates.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CoarseGrainedCoordinates.java
new file mode 100644
index 000000000..15c90b5e8
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/CoarseGrainedCoordinates.java
@@ -0,0 +1,60 @@
+package central;
+
+// common location representations, could be a shared library between local control and restaurant
+// but to keep project structure simple we duplicate
+
+public final class CoarseGrainedCoordinates {
+ public final double latitude;
+ public final double longitude;
+
+ public CoarseGrainedCoordinates(double latitude, double longitude) {
+ this.latitude = latitude;
+ this.longitude = longitude;
+ }
+
+ public static CoarseGrainedCoordinates fromCoordinates(Coordinates location) {
+ // not entirely correct, but good enough for a sample/demo
+ // 435-1020m precision depending on place on earth
+ return new CoarseGrainedCoordinates(
+ Math.floor(location.latitude * 100 + 0.5d) / 100,
+ Math.floor(location.longitude * 100 + 0.5d) / 100);
+ }
+
+ public common.proto.Coordinates toProto() {
+ return common.proto.Coordinates.newBuilder()
+ .setLatitude(latitude)
+ .setLongitude(longitude)
+ .build();
+ }
+
+ public static CoarseGrainedCoordinates fromProto(common.proto.Coordinates pc) {
+ return new CoarseGrainedCoordinates(pc.getLatitude(), pc.getLongitude());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ CoarseGrainedCoordinates that = (CoarseGrainedCoordinates) o;
+
+ if (Double.compare(latitude, that.latitude) != 0) return false;
+ return Double.compare(longitude, that.longitude) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ int result;
+ long temp;
+ temp = Double.doubleToLongBits(latitude);
+ result = (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(longitude);
+ result = 31 * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "CoarseGrainedCoordinates{" + "latitude=" + latitude + ", longitude=" + longitude + '}';
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Coordinates.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Coordinates.java
new file mode 100644
index 000000000..cc1c2e52c
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Coordinates.java
@@ -0,0 +1,71 @@
+package central;
+
+// common location representations, could be a shared library between local control and restaurant
+// but to keep project structure simple we duplicate
+
+/** Decimal degree coordinates */
+public final class Coordinates {
+
+ private static final int ROUGHLY_RADIUS_OF_EARTH_IN_M = 6371000;
+ public final double latitude;
+ public final double longitude;
+
+ public Coordinates(double latitude, double longitude) {
+ this.latitude = latitude;
+ this.longitude = longitude;
+ }
+
+ public long distanceTo(Coordinates other) {
+ // using the haversine formula https://en.wikipedia.org/wiki/Versine#hav
+ var latitudeDistance = Math.toRadians(latitude - other.latitude);
+ var longitudeDistance = Math.toRadians(longitude - other.longitude);
+ var sinLatitude = Math.sin(latitudeDistance / 2);
+ var sinLongitude = Math.sin(longitudeDistance / 2);
+ var a =
+ sinLatitude * sinLatitude
+ + (Math.cos(Math.toRadians(latitude))
+ * Math.cos(Math.toRadians(other.latitude))
+ * sinLongitude
+ * sinLongitude);
+ var c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1 - a));
+ return (long) (ROUGHLY_RADIUS_OF_EARTH_IN_M * c);
+ }
+
+ public common.proto.Coordinates toProto() {
+ return common.proto.Coordinates.newBuilder()
+ .setLatitude(latitude)
+ .setLongitude(longitude)
+ .build();
+ }
+
+ public static Coordinates fromProto(common.proto.Coordinates pc) {
+ return new Coordinates(pc.getLatitude(), pc.getLongitude());
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ Coordinates that = (Coordinates) o;
+
+ if (Double.compare(latitude, that.latitude) != 0) return false;
+ return Double.compare(longitude, that.longitude) == 0;
+ }
+
+ @Override
+ public int hashCode() {
+ int result;
+ long temp;
+ temp = Double.doubleToLongBits(latitude);
+ result = (int) (temp ^ (temp >>> 32));
+ temp = Double.doubleToLongBits(longitude);
+ result = 31 * result + (int) (temp ^ (temp >>> 32));
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "Coordinates{" + "latitude=" + latitude + ", longitude=" + longitude + '}';
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DeliveriesSettings.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DeliveriesSettings.java
new file mode 100644
index 000000000..31bb24fc6
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DeliveriesSettings.java
@@ -0,0 +1,21 @@
+package central;
+
+import akka.actor.typed.ActorSystem;
+import java.util.HashSet;
+import java.util.Set;
+
+public final class DeliveriesSettings {
+ public final Set locationIds;
+
+ public DeliveriesSettings(Set locationIds) {
+ this.locationIds = locationIds;
+ }
+
+ public static DeliveriesSettings create(ActorSystem> system) {
+ var config = system.settings().config().getConfig("restaurant-drone-deliveries-service");
+ var locationIds = new HashSet<>(config.getStringList("local-drone-control.locations"));
+
+ // FIXME move timeouts here as well
+ return new DeliveriesSettings(locationIds);
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java
new file mode 100644
index 000000000..9eca73d93
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/DroneDeliveriesServer.java
@@ -0,0 +1,61 @@
+package central;
+
+import akka.actor.typed.ActorSystem;
+import akka.grpc.javadsl.ServerReflection;
+import akka.grpc.javadsl.ServiceHandler;
+import akka.http.javadsl.Http;
+import akka.http.javadsl.model.HttpRequest;
+import akka.http.javadsl.model.HttpResponse;
+import akka.japi.function.Function;
+import central.deliveries.proto.RestaurantDeliveriesService;
+import central.deliveries.proto.RestaurantDeliveriesServiceHandlerFactory;
+import central.drones.proto.DroneOverviewService;
+import central.drones.proto.DroneOverviewServiceHandlerFactory;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletionStage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class DroneDeliveriesServer {
+
+ private static final Logger logger = LoggerFactory.getLogger(DroneDeliveriesServer.class);
+
+ public static void start(
+ ActorSystem> system,
+ String host,
+ int port,
+ central.drones.proto.DroneOverviewService droneOverviewService,
+ central.deliveries.proto.RestaurantDeliveriesService restaurantDeliveriesService,
+ Function> deliveryEventsProducerService,
+ Function> pushedDroneEventsHandler) {
+
+ @SuppressWarnings("unchecked")
+ var service =
+ ServiceHandler.concatOrNotFound(
+ DroneOverviewServiceHandlerFactory.create(droneOverviewService, system),
+ RestaurantDeliveriesServiceHandlerFactory.create(restaurantDeliveriesService, system),
+ ServerReflection.create(
+ Arrays.asList(
+ DroneOverviewService.description, RestaurantDeliveriesService.description),
+ system),
+ deliveryEventsProducerService,
+ // FIXME not last once actually partial
+ pushedDroneEventsHandler);
+
+ var bound = Http.get(system).newServerAt(host, port).bind(service);
+ bound.whenComplete(
+ (binding, error) -> {
+ if (error == null) {
+ logger.info(
+ "Drone event consumer listening at: {}:{}",
+ binding.localAddress().getHostString(),
+ binding.localAddress().getPort());
+ binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system);
+ } else {
+ logger.error("Failed to bind gRPC endpoint, terminating system", error);
+ system.terminate();
+ }
+ });
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java
new file mode 100644
index 000000000..af7cd086a
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/Main.java
@@ -0,0 +1,57 @@
+package central;
+
+import akka.actor.typed.ActorSystem;
+import akka.actor.typed.SpawnProtocol;
+import akka.management.cluster.bootstrap.ClusterBootstrap;
+import akka.management.javadsl.AkkaManagement;
+import central.deliveries.DeliveryEvents;
+import central.deliveries.RestaurantDeliveries;
+import central.deliveries.RestaurantDeliveriesServiceImpl;
+import central.drones.Drone;
+import central.drones.DroneOverviewServiceImpl;
+import central.drones.LocalDroneEvents;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Main {
+
+ private static final Logger logger = LoggerFactory.getLogger(Main.class);
+
+ public static void main(String[] args) {
+ var system = ActorSystem.create(SpawnProtocol.create(), "deliveries");
+ try {
+ init(system);
+ } catch (Throwable e) {
+ logger.error("Terminating due to initialization failure.", e);
+ system.terminate();
+ }
+ }
+
+ private static void init(ActorSystem system) {
+ var settings = DeliveriesSettings.create(system);
+ AkkaManagement.get(system).start();
+ ClusterBootstrap.get(system).start();
+
+ Drone.init(system);
+ LocalDroneEvents.initPushedEventsConsumer(system);
+ RestaurantDeliveries.init(system);
+
+ var host =
+ system.settings().config().getString("restaurant-drone-deliveries-service.grpc.interface");
+ var port = system.settings().config().getInt("restaurant-drone-deliveries-service.grpc.port");
+
+ var pushedDroneEventsHandler = LocalDroneEvents.pushedEventsGrpcHandler(system);
+ var deliveryEventsProducerService = DeliveryEvents.eventProducerService(system);
+ var droneOverviewService = new DroneOverviewServiceImpl(system);
+ var restaurantDeliveriesService = new RestaurantDeliveriesServiceImpl(system, settings);
+
+ DroneDeliveriesServer.start(
+ system,
+ host,
+ port,
+ droneOverviewService,
+ restaurantDeliveriesService,
+ deliveryEventsProducerService,
+ pushedDroneEventsHandler);
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/DeliveryEvents.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/DeliveryEvents.java
new file mode 100644
index 000000000..b32ad4135
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/DeliveryEvents.java
@@ -0,0 +1,53 @@
+package central.deliveries;
+
+import akka.actor.typed.ActorSystem;
+import akka.http.javadsl.model.HttpRequest;
+import akka.http.javadsl.model.HttpResponse;
+import akka.japi.function.Function;
+import akka.persistence.query.typed.EventEnvelope;
+import akka.projection.grpc.producer.EventProducerSettings;
+import akka.projection.grpc.producer.javadsl.EventProducer;
+import akka.projection.grpc.producer.javadsl.EventProducerSource;
+import akka.projection.grpc.producer.javadsl.Transformation;
+import central.deliveries.proto.DeliveryRegistered;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+public final class DeliveryEvents {
+
+ // Note: stream id used in consumer to consume this specific stream
+ public static final String STREAM_ID = "delivery-events";
+
+ public static Function> eventProducerService(
+ ActorSystem> system) {
+ var transformation =
+ Transformation.empty()
+ .registerAsyncEnvelopeMapper(
+ RestaurantDeliveries.DeliveryRegistered.class,
+ DeliveryEvents::transformDeliveryRegistration)
+ // exclude all other types of events for the RestaurantDeliveries
+ .registerOrElseMapper(envelope -> Optional.empty());
+
+ var eventProducerSource =
+ new EventProducerSource(
+ RestaurantDeliveries.ENTITY_KEY.name(),
+ STREAM_ID,
+ transformation,
+ EventProducerSettings.create(system));
+
+ return EventProducer.grpcServiceHandler(system, eventProducerSource);
+ }
+
+ private static CompletionStage> transformDeliveryRegistration(
+ EventEnvelope envelope) {
+ var delivery = envelope.event().delivery;
+ return CompletableFuture.completedFuture(
+ Optional.of(
+ central.deliveries.proto.DeliveryRegistered.newBuilder()
+ .setDeliveryId(delivery.deliveryId)
+ .setOrigin(delivery.origin.toProto())
+ .setDestination(delivery.destination.toProto())
+ .build()));
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java
new file mode 100644
index 000000000..b81aac8a8
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveries.java
@@ -0,0 +1,243 @@
+package central.deliveries;
+
+import akka.Done;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.ActorSystem;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.Entity;
+import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
+import akka.pattern.StatusReply;
+import akka.persistence.typed.PersistenceId;
+import akka.persistence.typed.javadsl.CommandHandler;
+import akka.persistence.typed.javadsl.Effect;
+import akka.persistence.typed.javadsl.EventHandler;
+import akka.persistence.typed.javadsl.EventSourcedBehavior;
+import central.CborSerializable;
+import central.Coordinates;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+public final class RestaurantDeliveries
+ extends EventSourcedBehavior<
+ RestaurantDeliveries.Command, RestaurantDeliveries.Event, RestaurantDeliveries.State> {
+ public interface Command extends CborSerializable {}
+
+ public static final class SetUpRestaurant implements Command {
+ public final String localControlLocationId;
+ public final Coordinates restaurantLocation;
+ public final ActorRef> replyTo;
+
+ public SetUpRestaurant(
+ String localControlLocationId,
+ Coordinates restaurantLocation,
+ ActorRef> replyTo) {
+ this.localControlLocationId = localControlLocationId;
+ this.restaurantLocation = restaurantLocation;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class RegisterDelivery implements Command {
+ public final String deliveryId;
+ public final Coordinates destination;
+ public final ActorRef> replyTo;
+
+ public RegisterDelivery(
+ String deliveryId, Coordinates destination, ActorRef> replyTo) {
+ this.deliveryId = deliveryId;
+ this.destination = destination;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class ListCurrentDeliveries implements Command {
+ public final ActorRef> replyTo;
+
+ @JsonCreator
+ public ListCurrentDeliveries(ActorRef> replyTo) {
+ this.replyTo = replyTo;
+ }
+ }
+
+ public interface Event extends CborSerializable {}
+
+ public static final class RestaurantLocationSet implements Event {
+ public final String localControlLocationId;
+ public final Coordinates coordinates;
+
+ public RestaurantLocationSet(String localControlLocationId, Coordinates coordinates) {
+ this.localControlLocationId = localControlLocationId;
+ this.coordinates = coordinates;
+ }
+ }
+
+ public static final class DeliveryRegistered implements Event {
+ public final Delivery delivery;
+
+ @JsonCreator
+ public DeliveryRegistered(Delivery delivery) {
+ this.delivery = delivery;
+ }
+ }
+
+ public static final class State implements CborSerializable {
+ public final String localControlLocationId;
+ public final Coordinates restaurantLocation;
+ public final List currentDeliveries;
+
+ public State(
+ String localControlLocationId,
+ Coordinates restaurantLocation,
+ List currentDeliveries) {
+ this.localControlLocationId = localControlLocationId;
+ this.restaurantLocation = restaurantLocation;
+ this.currentDeliveries = currentDeliveries;
+ }
+ }
+
+ public static final class Delivery {
+ public final String deliveryId;
+ // The following two fields always the same for the same restaurant, so that they can be seen in
+ // the downstream projection.
+ public final String localControlLocationId;
+ public final Coordinates origin;
+ public final Coordinates destination;
+ public final Instant timestamp;
+
+ public Delivery(
+ String deliveryId,
+ String localControlLocationId,
+ Coordinates origin,
+ Coordinates destination,
+ Instant timestamp) {
+ this.deliveryId = deliveryId;
+ this.localControlLocationId = localControlLocationId;
+ this.origin = origin;
+ this.destination = destination;
+ this.timestamp = timestamp;
+ }
+ }
+
+ public static final EntityTypeKey ENTITY_KEY =
+ EntityTypeKey.create(Command.class, "RestaurantDeliveries");
+
+ public static void init(ActorSystem> system) {
+ ClusterSharding.get(system)
+ .init(
+ Entity.of(
+ ENTITY_KEY,
+ entityContext -> new RestaurantDeliveries(entityContext.getEntityId())));
+ }
+
+ @Override
+ public Set tagsFor(State state, Event event) {
+ if (state == null) {
+ return Collections.emptySet();
+ } else {
+ // tag events with location id as topic, grpc projection filters makes sure only that location
+ // picks them up for drone delivery
+ return Collections.singleton("t:" + state.localControlLocationId);
+ }
+ }
+
+ public RestaurantDeliveries(String restaurantId) {
+ super(PersistenceId.of(ENTITY_KEY.name(), restaurantId));
+ }
+
+ @Override
+ public State emptyState() {
+ return null;
+ }
+
+ @Override
+ public CommandHandler commandHandler() {
+ var noStateHandler =
+ newCommandHandlerBuilder()
+ .forNullState()
+ .onCommand(SetUpRestaurant.class, this::onSetUpRestaurant)
+ .onCommand(
+ RegisterDelivery.class,
+ command ->
+ Effect()
+ .reply(
+ command.replyTo,
+ StatusReply.error(
+ "Restaurant not yet initialized, cannot accept registrations")))
+ .onCommand(
+ ListCurrentDeliveries.class,
+ command -> Effect().reply(command.replyTo, Collections.emptyList()));
+
+ var stateHandler =
+ newCommandHandlerBuilder()
+ .forNonNullState()
+ .onCommand(
+ SetUpRestaurant.class,
+ command ->
+ Effect()
+ .reply(
+ command.replyTo,
+ StatusReply.error("Changing restaurant location not supported")))
+ .onCommand(RegisterDelivery.class, this::onRegisterDelivery)
+ .onCommand(
+ ListCurrentDeliveries.class,
+ (state, command) ->
+ // reply with defensive copy of internal mutable state
+ Effect().reply(command.replyTo, new ArrayList<>(state.currentDeliveries)));
+
+ return noStateHandler.orElse(stateHandler).build();
+ }
+
+ private Effect onRegisterDelivery(State state, RegisterDelivery command) {
+ var existing =
+ state.currentDeliveries.stream()
+ .filter(delivery -> delivery.deliveryId.equals(command.deliveryId))
+ .findFirst();
+
+ if (existing.isPresent()) {
+ if (existing.get().destination.equals(command.destination)) {
+ // already registered
+ return Effect().reply(command.replyTo, StatusReply.ack());
+ } else {
+ return Effect()
+ .reply(
+ command.replyTo, StatusReply.error("Delivery id exists but for other destination"));
+ }
+ } else {
+ return Effect()
+ .persist(
+ new DeliveryRegistered(
+ new Delivery(
+ command.deliveryId,
+ state.localControlLocationId,
+ state.restaurantLocation,
+ command.destination,
+ Instant.now())))
+ .thenReply(command.replyTo, updatedState -> StatusReply.ack());
+ }
+ }
+
+ private Effect onSetUpRestaurant(SetUpRestaurant command) {
+ return Effect()
+ .persist(
+ new RestaurantLocationSet(command.localControlLocationId, command.restaurantLocation))
+ .thenReply(command.replyTo, updatedState -> StatusReply.ack());
+ }
+
+ @Override
+ public EventHandler eventHandler() {
+ return newEventHandlerBuilder()
+ .forAnyState()
+ .onEvent(RestaurantLocationSet.class, (event) ->
+ new State(event.localControlLocationId, event.coordinates, new ArrayList<>())
+ )
+ .onEvent(DeliveryRegistered.class, (state, event) -> {
+ state.currentDeliveries.add(event.delivery);
+ return state;
+ })
+ .build();
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveriesServiceImpl.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveriesServiceImpl.java
new file mode 100644
index 000000000..fe5f7cbd5
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/deliveries/RestaurantDeliveriesServiceImpl.java
@@ -0,0 +1,103 @@
+package central.deliveries;
+
+import akka.Done;
+import akka.actor.typed.ActorSystem;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.grpc.GrpcServiceException;
+import central.Coordinates;
+import central.DeliveriesSettings;
+import central.deliveries.proto.*;
+import io.grpc.Status;
+import java.time.Duration;
+import java.util.concurrent.CompletionStage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class RestaurantDeliveriesServiceImpl implements RestaurantDeliveriesService {
+
+ private static final Logger logger = LoggerFactory.getLogger(RestaurantDeliveries.class);
+
+ private final ActorSystem> system;
+ private final DeliveriesSettings settings;
+ private final ClusterSharding sharding;
+
+ private final Duration askTimeout;
+
+ public RestaurantDeliveriesServiceImpl(ActorSystem> system, DeliveriesSettings settings) {
+ this.system = system;
+ this.settings = settings;
+ this.sharding = ClusterSharding.get(system);
+ this.askTimeout =
+ system
+ .settings()
+ .config()
+ .getDuration("restaurant-drone-deliveries-service.restaurant-deliveries-ask-timeout");
+ }
+
+ @Override
+ public CompletionStage setUpRestaurant(SetUpRestaurantRequest in) {
+ logger.info(
+ "Set up restaurant {}, coordinates {}-{}, location [{}]",
+ in.getRestaurantId(),
+ in.getCoordinates().getLatitude(),
+ in.getCoordinates().getLongitude(),
+ in.getLocalControlLocationId());
+
+ if (!settings.locationIds.contains(in.getLocalControlLocationId())) {
+ throw new GrpcServiceException(
+ Status.INVALID_ARGUMENT.withDescription(
+ "The local control location id "
+ + in.getLocalControlLocationId()
+ + " is not known to the service"));
+ }
+
+ var entityRef = sharding.entityRefFor(RestaurantDeliveries.ENTITY_KEY, in.getRestaurantId());
+
+ var coordinates = Coordinates.fromProto(in.getCoordinates());
+ CompletionStage reply =
+ entityRef.askWithStatus(
+ replyTo ->
+ new RestaurantDeliveries.SetUpRestaurant(
+ in.getLocalControlLocationId(), coordinates, replyTo),
+ askTimeout);
+
+ return reply.handle(
+ (done, error) -> {
+ if (error != null) {
+ throw new GrpcServiceException(Status.INTERNAL.withDescription(error.getMessage()));
+ } else {
+
+ return RegisterRestaurantResponse.getDefaultInstance();
+ }
+ });
+ }
+
+ @Override
+ public CompletionStage registerDelivery(RegisterDeliveryRequest in) {
+ logger.info(
+ "Register delivery for restaurant {}, delivery id {}, destination {},{}",
+ in.getRestaurantId(),
+ in.getDeliveryId(),
+ in.getCoordinates().getLatitude(),
+ in.getCoordinates().getLongitude());
+
+ var entityRef = sharding.entityRefFor(RestaurantDeliveries.ENTITY_KEY, in.getRestaurantId());
+
+ var destination = Coordinates.fromProto(in.getCoordinates());
+
+ CompletionStage reply =
+ entityRef.askWithStatus(
+ replyTo ->
+ new RestaurantDeliveries.RegisterDelivery(in.getDeliveryId(), destination, replyTo),
+ askTimeout);
+
+ return reply.handle(
+ (done, error) -> {
+ if (error != null) {
+ throw new GrpcServiceException(Status.INTERNAL.withDescription(error.getMessage()));
+ } else {
+ return RegisterDeliveryResponse.getDefaultInstance();
+ }
+ });
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java
new file mode 100644
index 000000000..17c554edf
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/Drone.java
@@ -0,0 +1,117 @@
+package central.drones;
+
+import akka.Done;
+import akka.actor.typed.ActorRef;
+import akka.actor.typed.ActorSystem;
+import akka.actor.typed.javadsl.ActorContext;
+import akka.actor.typed.javadsl.Behaviors;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.Entity;
+import akka.cluster.sharding.typed.javadsl.EntityTypeKey;
+import akka.pattern.StatusReply;
+import akka.persistence.typed.PersistenceId;
+import akka.persistence.typed.state.javadsl.CommandHandler;
+import akka.persistence.typed.state.javadsl.DurableStateBehavior;
+import akka.persistence.typed.state.javadsl.Effect;
+import central.CborSerializable;
+import central.CoarseGrainedCoordinates;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import java.time.Instant;
+import java.util.Optional;
+
+public final class Drone extends DurableStateBehavior {
+
+ interface Command extends CborSerializable {}
+
+ public static final class UpdateLocation implements Command {
+ public final String locationName;
+ public final CoarseGrainedCoordinates coarseGrainedCoordinates;
+ public final ActorRef> replyTo;
+
+ public UpdateLocation(
+ String locationName,
+ CoarseGrainedCoordinates coarseGrainedCoordinates,
+ ActorRef> replyTo) {
+ this.locationName = locationName;
+ this.coarseGrainedCoordinates = coarseGrainedCoordinates;
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class GetState implements Command {
+ public final ActorRef replyTo;
+
+ @JsonCreator
+ public GetState(ActorRef replyTo) {
+ this.replyTo = replyTo;
+ }
+ }
+
+ public static final class State implements CborSerializable {
+ public String locationName;
+ public Optional currentLocation;
+ public Instant lastChange;
+
+ public State(
+ String locationName,
+ Optional currentLocation,
+ Instant lastChange) {
+ this.locationName = locationName;
+ this.currentLocation = currentLocation;
+ this.lastChange = lastChange;
+ }
+ }
+
+ public static final EntityTypeKey ENTITY_KEY =
+ EntityTypeKey.create(Command.class, "CentralDrone");
+
+ public static void init(ActorSystem> system) {
+ ClusterSharding.get(system)
+ .init(
+ Entity.of(
+ ENTITY_KEY,
+ entityContext ->
+ Behaviors.setup(context -> new Drone(context, entityContext.getEntityId()))));
+ }
+
+ private final ActorContext context;
+
+ private Drone(ActorContext context, String entityId) {
+ super(PersistenceId.of(ENTITY_KEY.name(), entityId));
+ this.context = context;
+ }
+
+ @Override
+ public State emptyState() {
+ return new State("unknown", Optional.empty(), Instant.EPOCH);
+ }
+
+ @Override
+ public CommandHandler commandHandler() {
+ return newCommandHandlerBuilder()
+ .forAnyState()
+ .onCommand(UpdateLocation.class, this::onUpdateLocation)
+ .onCommand(
+ GetState.class,
+ (state, command) ->
+ // reply with defensive copy since state is mutable
+ Effect()
+ .reply(
+ command.replyTo,
+ new State(state.locationName, state.currentLocation, state.lastChange)))
+ .build();
+ }
+
+ private Effect onUpdateLocation(State state, UpdateLocation command) {
+ context
+ .getLog()
+ .info(
+ "Updating location to [{}], [{}]",
+ command.locationName,
+ command.coarseGrainedCoordinates);
+ state.locationName = command.locationName;
+ state.currentLocation = Optional.of(command.coarseGrainedCoordinates);
+ state.lastChange = Instant.now();
+ return Effect().persist(state).thenReply(command.replyTo, updatedState -> StatusReply.ack());
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java
new file mode 100644
index 000000000..e03d34bb2
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/DroneOverviewServiceImpl.java
@@ -0,0 +1,111 @@
+package central.drones;
+
+import akka.actor.typed.ActorSystem;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.grpc.GrpcServiceException;
+import akka.japi.Pair;
+import akka.persistence.r2dbc.session.javadsl.R2dbcSession;
+import akka.persistence.typed.PersistenceId;
+import akka.serialization.Serialization;
+import akka.serialization.SerializationExtension;
+import central.CoarseGrainedCoordinates;
+import central.drones.proto.*;
+import io.grpc.Status;
+import java.time.Duration;
+import java.util.*;
+import java.util.concurrent.CompletionStage;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class DroneOverviewServiceImpl implements DroneOverviewService {
+
+ private Logger logger = LoggerFactory.getLogger(DroneOverviewServiceImpl.class);
+
+ private final ActorSystem> system;
+ private final Duration timeout;
+ private final Serialization serialization;
+ private final ClusterSharding sharding;
+
+ private static final String FIND_BY_LOCATION_SQL =
+ "SELECT persistence_id, state_ser_id, state_ser_manifest, state_payload "
+ + "FROM durable_state "
+ + "WHERE location = $1";
+
+ public DroneOverviewServiceImpl(ActorSystem> system) {
+ this.system = system;
+ this.timeout =
+ system
+ .settings()
+ .config()
+ .getDuration("restaurant-drone-deliveries-service.drone-ask-timeout");
+ this.serialization = SerializationExtension.get(system);
+ this.sharding = ClusterSharding.get(system);
+ }
+
+ @Override
+ public CompletionStage getDroneOverview(GetDroneOverviewRequest in) {
+ return null;
+ }
+
+ @Override
+ public CompletionStage getCoarseDroneLocations(
+ CoarseDroneLocationsRequest in) {
+ // query against additional columns for drone
+ logger.info("List drones for location {}", in.getLocation());
+ CompletionStage>> queryResult =
+ R2dbcSession.withSession(
+ system,
+ session ->
+ session.select(
+ session.createStatement(FIND_BY_LOCATION_SQL).bind(0, in.getLocation()),
+ row -> {
+ var serializerId = row.get("state_ser_id", Integer.class);
+ var serializerManifest = row.get("state_ser_manifest", String.class);
+ var payload = row.get("state_payload", byte[].class);
+ var state =
+ (Drone.State)
+ serialization
+ .deserialize(payload, serializerId, serializerManifest)
+ .get();
+ var droneId =
+ PersistenceId.extractEntityId(row.get("persistence_id", String.class));
+
+ // we expect it to always be present
+ var coordinates = state.currentLocation.get();
+ return Pair.create(coordinates, droneId);
+ }));
+
+ return queryResult.thenApply(
+ (List> droneIdAndLocations) -> {
+ if (droneIdAndLocations.isEmpty()) throw new GrpcServiceException(Status.NOT_FOUND);
+ else {
+ // group drones by coarse location
+ Map> byLocation =
+ droneIdAndLocations.stream()
+ .collect(
+ Collectors.toMap(
+ Pair::first,
+ pair -> new HashSet<>(Collections.singletonList(pair.second())),
+ (existingSet, newSet) -> {
+ existingSet.addAll(newSet);
+ return existingSet;
+ }));
+
+ // turn into response protobuf message
+ var protoEntries =
+ byLocation.entrySet().stream()
+ .map(
+ entry ->
+ CoarseDroneLocations.newBuilder()
+ .setCoordinates(entry.getKey().toProto())
+ .addAllDrones(entry.getValue())
+ .build())
+ .collect(Collectors.toList());
+ return CoarseDroneLocationsResponse.newBuilder()
+ .addAllCoarseLocations(protoEntries)
+ .build();
+ }
+ });
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java
new file mode 100644
index 000000000..554e0496e
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocalDroneEvents.java
@@ -0,0 +1,146 @@
+package central.drones;
+
+import akka.Done;
+import akka.actor.typed.ActorSystem;
+import akka.cluster.sharding.typed.javadsl.ClusterSharding;
+import akka.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
+import akka.http.javadsl.model.HttpRequest;
+import akka.http.javadsl.model.HttpResponse;
+import akka.japi.Pair;
+import akka.japi.function.Function;
+import akka.persistence.query.Offset;
+import akka.persistence.query.typed.EventEnvelope;
+import akka.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
+import akka.persistence.typed.PersistenceId;
+import akka.projection.Projection;
+import akka.projection.ProjectionBehavior;
+import akka.projection.ProjectionId;
+import akka.projection.eventsourced.javadsl.EventSourcedProvider;
+import akka.projection.grpc.consumer.javadsl.EventProducerPushDestination;
+import akka.projection.javadsl.Handler;
+import akka.projection.javadsl.SourceProvider;
+import akka.projection.r2dbc.javadsl.R2dbcProjection;
+import central.CoarseGrainedCoordinates;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletionStage;
+import local.drones.proto.CoarseDroneLocation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class LocalDroneEvents {
+
+ private static final Logger logger = LoggerFactory.getLogger(LocalDroneEvents.class);
+
+ // Note: stream id used in producer for the drone events
+ public static final String DRONE_EVENT_STREAM_ID = "drone-events";
+
+ // FIXME The type key on the producer side. Make sure we have documented it.
+ private static final String PRODUCER_ENTITY_TYPE = "Drone";
+
+ public static Function> pushedEventsGrpcHandler(
+ ActorSystem> system) {
+ var destination =
+ EventProducerPushDestination.create(
+ DRONE_EVENT_STREAM_ID,
+ Collections.singletonList(local.drones.proto.DroneEvents.getDescriptor()),
+ system)
+ .withTransformationForOrigin(
+ (origin, metadataa) ->
+ akka.projection.grpc.consumer.javadsl.Transformation.empty()
+ // tag all events with the location name of the local control it came from
+ .registerTagMapper(
+ local.drones.proto.CoarseDroneLocation.class,
+ envelope -> Collections.singleton("location:" + origin)));
+
+ return EventProducerPushDestination.grpcServiceHandler(destination, system);
+ }
+
+ private static class LocationHandler extends Handler> {
+
+ private final ClusterSharding sharding;
+ private final Duration askTimeout;
+
+ public LocationHandler(ActorSystem> system) {
+ this.sharding = ClusterSharding.get(system);
+ this.askTimeout =
+ system
+ .settings()
+ .config()
+ .getDuration("restaurant-drone-deliveries-service.drone-ask-timeout");
+ }
+
+ @Override
+ public CompletionStage process(EventEnvelope envelope) {
+ logger.info(
+ "Saw projected event: {}-{}: {}",
+ envelope.persistenceId(),
+ envelope.sequenceNr(),
+ envelope.eventOption());
+
+ // Drone id without producer entity key
+ var droneId = PersistenceId.extractEntityId(envelope.persistenceId());
+
+ // same drone id as local but different entity type (our Drone overview representation)
+ var entityRef = sharding.entityRefFor(Drone.ENTITY_KEY, droneId);
+
+ // we have encoded origin in a tag, extract it
+ // FIXME could there be a more automatic place where origin is available (envelope.source?)
+ var originName =
+ envelope.getTags().stream()
+ .filter(tag -> tag.startsWith("location:"))
+ .findFirst()
+ .get()
+ .substring("location:".length());
+
+ return entityRef.askWithStatus(
+ replyTo ->
+ new Drone.UpdateLocation(
+ originName,
+ CoarseGrainedCoordinates.fromProto(envelope.event().getCoordinates()),
+ replyTo),
+ askTimeout);
+ }
+ }
+ ;
+
+ public static void initPushedEventsConsumer(ActorSystem> system) {
+ // Split the slices into N ranges
+ var numberOfSliceRanges =
+ system
+ .settings()
+ .config()
+ .getInt("restaurant-drone-deliveries-service.drones.projections-slice-count");
+
+ var sliceRanges =
+ EventSourcedProvider.sliceRanges(
+ system, R2dbcReadJournal.Identifier(), numberOfSliceRanges);
+
+ ShardedDaemonProcess.get(system)
+ .init(
+ ProjectionBehavior.Command.class,
+ "LocalDronesProjection",
+ sliceRanges.size(),
+ i -> ProjectionBehavior.create(projection(system, sliceRanges.get(i))),
+ ProjectionBehavior.stopMessage());
+ }
+
+ private static Projection> projection(
+ ActorSystem> system, Pair sliceRange) {
+ var minSlice = sliceRange.first();
+ var maxSlice = sliceRange.second();
+ var projectionId = ProjectionId.of("DroneEvents", "drone-" + minSlice + "-" + maxSlice);
+
+ SourceProvider> sourceProvider =
+ EventSourcedProvider.eventsBySlices(
+ system,
+ R2dbcReadJournal.Identifier(),
+ PRODUCER_ENTITY_TYPE,
+ sliceRange.first(),
+ sliceRange.second());
+
+ return R2dbcProjection.atLeastOnceAsync(
+ projectionId, Optional.empty(), sourceProvider, () -> new LocationHandler(system), system);
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocationColumn.java b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocationColumn.java
new file mode 100644
index 000000000..6ea18617d
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/java/central/drones/LocationColumn.java
@@ -0,0 +1,24 @@
+package central.drones;
+
+import akka.persistence.r2dbc.state.javadsl.AdditionalColumn;
+
+/**
+ * Write local drone control location name column for querying drone locations per control location
+ */
+public final class LocationColumn extends AdditionalColumn {
+
+ @Override
+ public Class fieldClass() {
+ return String.class;
+ }
+
+ @Override
+ public String columnName() {
+ return "location";
+ }
+
+ @Override
+ public Binding bind(Upsert upsert) {
+ return AdditionalColumn.bindValue(upsert.value().locationName);
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/delivery_events.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/delivery_events.proto
new file mode 100644
index 000000000..0a98ec3db
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/delivery_events.proto
@@ -0,0 +1,14 @@
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "central.deliveries.proto";
+
+package central.deliveries;
+
+import "common/coordinates.proto";
+
+message DeliveryRegistered {
+ string delivery_id = 1;
+ common.Coordinates origin = 2;
+ common.Coordinates destination = 3;
+}
\ No newline at end of file
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto
new file mode 100644
index 000000000..b832f6043
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/deliveries/restaurant_deliveries_api.proto
@@ -0,0 +1,31 @@
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "central.deliveries.proto";
+
+package central.deliveries;
+
+import "common/coordinates.proto";
+
+service RestaurantDeliveriesService {
+ rpc SetUpRestaurant(SetUpRestaurantRequest) returns (RegisterRestaurantResponse) {};
+ rpc RegisterDelivery(RegisterDeliveryRequest) returns (RegisterDeliveryResponse) {};
+}
+
+message SetUpRestaurantRequest {
+ string restaurant_id = 1;
+ common.Coordinates coordinates = 2;
+ string local_control_location_id = 3;
+}
+
+message RegisterRestaurantResponse {}
+
+message RegisterDeliveryRequest {
+ string delivery_id = 1;
+ string restaurant_id = 2;
+ common.Coordinates coordinates = 3;
+}
+
+message RegisterDeliveryResponse {
+
+}
\ No newline at end of file
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/drones/drone_overview_api.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/drones/drone_overview_api.proto
new file mode 100644
index 000000000..ea649a32b
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/central/drones/drone_overview_api.proto
@@ -0,0 +1,37 @@
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "central.drones.proto";
+
+package central.drones;
+
+import "common/coordinates.proto";
+
+service DroneOverviewService {
+ rpc GetDroneOverview(GetDroneOverviewRequest) returns (GetDroneOverviewResponse) {}
+ rpc GetCoarseDroneLocations(CoarseDroneLocationsRequest) returns (CoarseDroneLocationsResponse) {}
+}
+
+message CoarseDroneLocationsRequest {
+ // name of the location
+ string location = 1;
+}
+
+message CoarseDroneLocationsResponse {
+ repeated CoarseDroneLocations coarse_locations = 1;
+}
+
+message CoarseDroneLocations {
+ common.Coordinates coordinates = 1;
+ repeated string drones = 2;
+}
+
+message GetDroneOverviewRequest {
+ string drone_id = 1;
+}
+
+message GetDroneOverviewResponse {
+ string location_name = 1;
+ double coarse_latitude = 2;
+ double coarse_longitude = 3;
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/common/coordinates.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/common/coordinates.proto
new file mode 100644
index 000000000..99f385d72
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/common/coordinates.proto
@@ -0,0 +1,15 @@
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "common.proto";
+
+package common;
+
+// generic messages, shared between local-drone-control and restaurant-drone-deliveries
+
+message Coordinates {
+ // latitude (north-south) in decimal degree coordinates
+ double latitude = 1;
+ // longitude (east west) in decimal degree coordinates
+ double longitude = 2;
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/local/drones/drone_events.proto b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/local/drones/drone_events.proto
new file mode 100644
index 000000000..94535d78f
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/protobuf/local/drones/drone_events.proto
@@ -0,0 +1,16 @@
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "local.drones.proto";
+
+package local.drones;
+
+import "common/coordinates.proto";
+
+// copy of the descriptor from local-drone-control who owns/publishes it
+
+// events published by the drones
+message CoarseDroneLocation {
+ common.Coordinates coordinates = 1;
+}
+
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/application.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/application.conf
new file mode 100644
index 000000000..382b8ae7f
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/application.conf
@@ -0,0 +1,28 @@
+include "cluster"
+include "grpc"
+include "serialization"
+include "persistence"
+
+akka {
+ loglevel = DEBUG
+}
+
+restaurant-drone-deliveries-service {
+ drone-ask-timeout = 3s
+ restaurant-deliveries-ask-timeout = 3s
+
+ drones {
+ # Run this many local projections to consume the pushed drone updates from the local drone control services
+ # to the central drone durable state entities
+ projections-slice-count = 4
+ }
+
+ local-drone-control {
+ # local drone control locations needs to be known up front,
+ # restaurants are tied to the closest location
+ locations = [
+ "sweden/stockholm/kungsholmen",
+ "sweden/stockholm/norrmalm"
+ ]
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/cluster.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/cluster.conf
new file mode 100644
index 000000000..dc3b0bc8c
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/cluster.conf
@@ -0,0 +1,33 @@
+akka {
+ actor.provider = cluster
+
+ remote.artery {
+ canonical.port = 2551
+ }
+
+ cluster {
+ downing-provider-class = "akka.cluster.sbr.SplitBrainResolverProvider"
+
+ shutdown-after-unsuccessful-join-seed-nodes = 120s
+
+ sharding {
+ least-shard-allocation-strategy.rebalance-absolute-limit = 20
+ passivation.strategy = default-strategy
+ }
+ }
+}
+
+akka.management {
+ http {
+ port = 8558
+ port = ${?HTTP_MGMT_PORT}
+ }
+ cluster.bootstrap {
+ contact-point-discovery {
+ service-name = "shopping-cart-service"
+ discovery-method = kubernetes-api
+ required-contact-point-nr = 1
+ required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
+ }
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/grpc.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/grpc.conf
new file mode 100644
index 000000000..1d77467fa
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/grpc.conf
@@ -0,0 +1,19 @@
+// #http2
+akka.http.server.enable-http2 = on
+// #http2
+
+restaurant-drone-deliveries-service {
+
+ grpc {
+ # consider setting this to a specific interface for your environment
+ interface = "127.0.0.1"
+ port = 8101
+ port = ${?GRPC_PORT}
+ }
+}
+
+akka.projection.grpc {
+ producer {
+ query-plugin-id = "akka.persistence.r2dbc.query"
+ }
+}
\ No newline at end of file
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local-shared.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local-shared.conf
new file mode 100644
index 000000000..0814f2570
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local-shared.conf
@@ -0,0 +1,29 @@
+shopping-cart-service.grpc.interface = "127.0.0.1"
+akka.remote.artery.canonical.hostname = "127.0.0.1"
+akka.management.http.hostname = "127.0.0.1"
+
+akka.management.cluster.bootstrap.contact-point-discovery {
+ service-name = "shopping-cart-service"
+ discovery-method = config
+ # boostrap filters ports with the same IP assuming they are previous instances running on the same node
+ # unless a port is specified
+ port-name = "management"
+ required-contact-point-nr = 1
+ # config service discovery never changes
+ stable-margin = 1 ms
+ # bootstrap without all the nodes being up
+ contact-with-all-contact-points = false
+}
+
+akka.discovery.config.services {
+ "shopping-cart-service" {
+ endpoints = [
+ {host = "127.0.0.1", port = 9101}
+ {host = "127.0.0.1", port = 9102}
+ {host = "127.0.0.1", port = 9103}
+ ]
+ }
+}
+
+shopping-order-service.host = "localhost"
+shopping-order-service.port = 8301
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local1.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local1.conf
new file mode 100644
index 000000000..20e3b819d
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local1.conf
@@ -0,0 +1,8 @@
+include "application"
+include "local-shared"
+
+shopping-cart-service.grpc.port = 8101
+
+akka.remote.artery.canonical.port = 2551
+akka.management.http.port = 9101
+
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local2.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local2.conf
new file mode 100644
index 000000000..fe3d6a5de
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local2.conf
@@ -0,0 +1,7 @@
+include "application"
+include "local-shared"
+
+shopping-cart-service.grpc.port = 8102
+
+akka.management.http.port = 9102
+akka.remote.artery.canonical.port = 2552
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local3.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local3.conf
new file mode 100644
index 000000000..f33c855fb
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/local3.conf
@@ -0,0 +1,7 @@
+include "application"
+include "local-shared"
+
+shopping-cart-service.grpc.port = 8103
+
+akka.remote.artery.canonical.port = 2553
+akka.management.http.port = 9103
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/logback.xml b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/logback.xml
new file mode 100644
index 000000000..f4e889fcd
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/logback.xml
@@ -0,0 +1,19 @@
+
+
+
+
+ [%date{ISO8601}] [%level] [%logger] [%X{akkaAddress}] [%marker] [%thread] - %msg%n
+
+
+
+
+ 8192
+ true
+
+
+
+
+
+
+
+
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf
new file mode 100644
index 000000000..163bd253a
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/persistence.conf
@@ -0,0 +1,37 @@
+akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.postgres}
+akka {
+ persistence {
+ journal.plugin = "akka.persistence.r2dbc.journal"
+ snapshot-store.plugin = "akka.persistence.r2dbc.snapshot"
+ state.plugin = "akka.persistence.r2dbc.state"
+
+ r2dbc {
+ connection-factory {
+ host = "localhost"
+ host = ${?DB_HOST}
+ port = 5432
+ database = "postgres"
+ user = "postgres"
+ user = ${?DB_USER}
+ password = "postgres"
+ password = ${?DB_PASSWORD}
+ }
+
+ state = {
+ additional-columns {
+ "CentralDrone" = ["central.drones.LocationColumn"]
+ }
+ }
+
+ # FIXME disabled until we have fixed handling of FilteredEvents for publishing in r2dbc
+ journal.publish-events = off
+ }
+ }
+
+ projection.r2dbc {
+ offset-store {
+ # only timestamp based offsets
+ offset-table = ""
+ }
+ }
+}
diff --git a/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/serialization.conf b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/serialization.conf
new file mode 100644
index 000000000..aca0ab40b
--- /dev/null
+++ b/samples/grpc/restaurant-drone-deliveries-service-java/src/main/resources/serialization.conf
@@ -0,0 +1,6 @@
+akka.actor.serialization-bindings {
+ "central.CborSerializable" = jackson-cbor
+ # FIXME allows us to persist arbitrary protobuf messages pushed from the
+ # event producer, questionable
+ "scalapb.GeneratedMessage" = proto
+}