Skip to content

Commit

Permalink
docs: Ability to run the local-drone-control-scala as multi node (#1005)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Sep 14, 2023
1 parent 6cd9e47 commit 975f41c
Show file tree
Hide file tree
Showing 12 changed files with 398 additions and 2 deletions.
22 changes: 21 additions & 1 deletion samples/grpc/local-drone-control-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,24 @@ Inspect the current state of the local delivery queue

```shell
grpcurl -plaintext 127.0.0.1:8080 local.drones.DeliveriesQueueService.GetCurrentQueue
```
```

## Running the sample as a multi-node service

It is also possible to run this sample service as a multi-node Akka Cluster. For that you need to start a PostgreSQL
instance for all nodes to use for storage, and create the schema for it:

```shell
docker compose up --wait
docker exec -i local_drone_control_db psql -U postgres -t < ddl-scripts/create_tables.sql
```

Start 3 nodes, in separate terminals:

```shell
sbt -Dconfig.resource=local1.conf "runMain local.drones.ClusteredMain"
sbt -Dconfig.resource=local2.conf "runMain local.drones.ClusteredMain"
sbt -Dconfig.resource=local3.conf "runMain local.drones.ClusteredMain"
```

The nodes now accept plaintext gRPC requests on ports 8080, 8081, 8082
13 changes: 12 additions & 1 deletion samples/grpc/local-drone-control-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name := "local-drone-control"

organization := "com.lightbend.akka.samples"
organizationHomepage := Some(url("https://akka.io"))
licenses := Seq(("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))
licenses := Seq(
("CC0", url("https://creativecommons.org/publicdomain/zero/1.0")))

scalaVersion := "2.13.11"

Expand All @@ -20,6 +21,8 @@ Test / testOptions += Tests.Argument("-oDF")
Test / logBuffered := false

run / fork := true
// use the single node main by default
Compile / run / mainClass := Some("local.drones.Main")
// pass along config selection to forked jvm
run / javaOptions ++= sys.props
.get("config.resource")
Expand Down Expand Up @@ -53,6 +56,14 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-discovery" % AkkaVersion,
"com.lightbend.akka" %% "akka-diagnostics" % AkkaDiagnosticsVersion,
// Note: management/bootstrap is only needed when running the multi-node version of the service
// Akka Management powers Health Checks and Akka Cluster Bootstrapping
"com.lightbend.akka.management" %% "akka-management" % AkkaManagementVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion,
"com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion,
"com.lightbend.akka.management" %% "akka-management-cluster-http" % AkkaManagementVersion,
"com.lightbend.akka.management" %% "akka-management-cluster-bootstrap" % AkkaManagementVersion,
"com.lightbend.akka.discovery" %% "akka-discovery-kubernetes-api" % AkkaManagementVersion,
// Common dependencies for logging and testing
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion,
"ch.qos.logback" % "logback-classic" % "1.3.6",
Expand Down
102 changes: 102 additions & 0 deletions samples/grpc/local-drone-control-scala/ddl-scripts/create_tables.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
-- Only needed if running multi-node clustered version of the app with a separate standalone PostgreSQL server
--
-- Tables for the Akka Persistence R2DBC plugins and Akka Projection R2DBC offset store
-- (the plugin names are configured in the accompanying HOCON files).

-- Event sourcing journal: one row per persisted event.
CREATE TABLE IF NOT EXISTS event_journal(
  -- slice partitions the event stream so projections can consume it in parallel
  slice INT NOT NULL,
  entity_type VARCHAR(255) NOT NULL,
  persistence_id VARCHAR(255) NOT NULL,
  seq_nr BIGINT NOT NULL,
  db_timestamp timestamp with time zone NOT NULL,

  -- serialized event payload plus the serializer id/manifest needed to deserialize it
  event_ser_id INTEGER NOT NULL,
  event_ser_manifest VARCHAR(255) NOT NULL,
  event_payload BYTEA NOT NULL,

  deleted BOOLEAN DEFAULT FALSE NOT NULL,
  writer VARCHAR(255) NOT NULL,
  adapter_manifest VARCHAR(255),
  tags TEXT ARRAY,

  -- optional event metadata (serialized separately from the payload)
  meta_ser_id INTEGER,
  meta_ser_manifest VARCHAR(255),
  meta_payload BYTEA,

  PRIMARY KEY(persistence_id, seq_nr)
);

-- `event_journal_slice_idx` is only needed if the slice based queries are used
CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, entity_type, db_timestamp, seq_nr);

-- Latest snapshot per persistent entity (one row per persistence_id).
CREATE TABLE IF NOT EXISTS snapshot(
  slice INT NOT NULL,
  entity_type VARCHAR(255) NOT NULL,
  persistence_id VARCHAR(255) NOT NULL,
  seq_nr BIGINT NOT NULL,
  db_timestamp timestamp with time zone,
  write_timestamp BIGINT NOT NULL,
  ser_id INTEGER NOT NULL,
  ser_manifest VARCHAR(255) NOT NULL,
  snapshot BYTEA NOT NULL,
  tags TEXT ARRAY,
  -- optional snapshot metadata
  meta_ser_id INTEGER,
  meta_ser_manifest VARCHAR(255),
  meta_payload BYTEA,

  PRIMARY KEY(persistence_id)
);

-- `snapshot_slice_idx` is only needed if the slice based queries are used together with snapshot as starting point
CREATE INDEX IF NOT EXISTS snapshot_slice_idx ON snapshot(slice, entity_type, db_timestamp);

-- Durable state store: latest revision of each durable-state entity.
CREATE TABLE IF NOT EXISTS durable_state (
  slice INT NOT NULL,
  entity_type VARCHAR(255) NOT NULL,
  persistence_id VARCHAR(255) NOT NULL,
  revision BIGINT NOT NULL,
  db_timestamp timestamp with time zone NOT NULL,

  -- serialized state payload plus serializer id/manifest
  state_ser_id INTEGER NOT NULL,
  state_ser_manifest VARCHAR(255),
  state_payload BYTEA NOT NULL,
  tags TEXT ARRAY,

  PRIMARY KEY(persistence_id, revision)
);

-- `durable_state_slice_idx` is only needed if the slice based queries are used
CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, entity_type, db_timestamp, revision);

-- Primitive offset types are stored in this table.
-- If only timestamp based offsets are used this table is optional.
-- Configure akka.projection.r2dbc.offset-store.offset-table="" if the table is not created.
CREATE TABLE IF NOT EXISTS akka_projection_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  current_offset VARCHAR(255) NOT NULL,
  manifest VARCHAR(32) NOT NULL,
  mergeable BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);

-- Timestamp based offsets are stored in this table.
CREATE TABLE IF NOT EXISTS akka_projection_timestamp_offset_store (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  slice INT NOT NULL,
  persistence_id VARCHAR(255) NOT NULL,
  seq_nr BIGINT NOT NULL,
  -- timestamp_offset is the db_timestamp of the original event
  timestamp_offset timestamp with time zone NOT NULL,
  -- timestamp_consumed is when the offset was stored
  -- the consumer lag is timestamp_consumed - timestamp_offset
  timestamp_consumed timestamp with time zone NOT NULL,
  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
);

-- Pause/resume state for projections, managed via the projection management API.
CREATE TABLE IF NOT EXISTS akka_projection_management (
  projection_name VARCHAR(255) NOT NULL,
  projection_key VARCHAR(255) NOT NULL,
  paused BOOLEAN NOT NULL,
  last_updated BIGINT NOT NULL,
  PRIMARY KEY(projection_name, projection_key)
);
17 changes: 17 additions & 0 deletions samples/grpc/local-drone-control-scala/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Only needed if running multi-node clustered version of the app with a separate standalone PostgreSQL server locally
version: '2.2'
services:
  postgres-db:
    image: postgres:latest
    # fixed container name so the README's `docker exec -i local_drone_control_db ...` command works
    container_name: local_drone_control_db
    ports:
      # non-default host port (5433) so this db can run in parallel with the db
      # for the restaurant-drone-deliveries-service sample on 5432
      - 5433:5432
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    # let `docker compose up --wait` block until the server actually accepts connections
    healthcheck:
      test: ['CMD', 'pg_isready', "-q", "-d", "postgres", "-U", "postgres"]
      interval: 5s
      retries: 5
      start_period: 5s
      timeout: 5s
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ include "grpc"
include "persistence"
include "cluster"

# Default config, used for running a single-node cluster that cannot scale out to many nodes, using H2 for
# persistence, started through local.drones.Main

akka {
loglevel = DEBUG
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Used for running the local-drone-control as a cluster with several nodes
# through the local.drones.ClusteredMain and a separate PostgreSQL database

# Point the R2DBC plugin at its PostgreSQL connection-factory defaults
akka.persistence.r2dbc.connection-factory = ${akka.persistence.r2dbc.postgres}
akka {
  persistence {
    # journal, snapshots and durable state all go through the R2DBC plugins
    journal {
      plugin = "akka.persistence.r2dbc.journal"
    }
    snapshot-store {
      plugin = "akka.persistence.r2dbc.snapshot"
    }
    state {
      plugin = "akka.persistence.r2dbc.state"
    }

    r2dbc {
      connection-factory {
        # each repeated key with ${?ENV_VAR} lets an environment variable
        # override the hardcoded default above it
        host = "localhost"
        host = ${?DB_HOST}
        # note: different port for running in parallel with db for restaurant-drone-deliveries-service
        port = 5433
        database = "postgres"
        user = "postgres"
        user = ${?DB_USER}
        password = "postgres"
        password = ${?DB_PASSWORD}
      }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Used for running the local-drone-control as a cluster with several nodes
# through the local.drones.ClusteredMain and a separate PostgreSQL database
include "cluster"
include "grpc"
include "local-persistence.conf"

# bind everything to loopback for local multi-node runs
local-drone-control.grpc.interface = "127.0.0.1"
akka.remote.artery.canonical.hostname = "127.0.0.1"
akka.management.http.hostname = "127.0.0.1"

local-drone-control {
  nr-of-event-producers = 4
  # unique identifier for the instance of local control, must be known up front by the cloud service
  location-id = "sweden/stockholm/kungsholmen"
  location-id = ${?LOCATION_ID}

  ask-timeout = 3s
}

akka.management {
  http {
    port = 9201
    port = ${?HTTP_MGMT_PORT}
  }
  cluster.bootstrap {
    contact-point-discovery {
      service-name = "local-drone-control"
      discovery-method = kubernetes-api
      required-contact-point-nr = 1
      required-contact-point-nr = ${?REQUIRED_CONTACT_POINT_NR}
    }
  }
}

# NOTE: this block is defined later in the file, so in HOCON it overrides the
# kubernetes-api settings above — local runs use config-based service discovery
akka.management.cluster.bootstrap.contact-point-discovery {
  service-name = "local-drone-control"
  discovery-method = config
  # bootstrap filters ports with the same IP assuming they are previous instances running on the same node
  # unless a port is specified
  port-name = "management"
  required-contact-point-nr = 1
  # config service discovery never changes
  stable-margin = 1 ms
  # bootstrap without all the nodes being up
  contact-with-all-contact-points = false
}

# static contact points for config-based discovery: the management endpoints
# of the three local nodes (see local1.conf, local2.conf, local3.conf)
akka.discovery.config.services {
  "local-drone-control" {
    endpoints = [
      {host = "127.0.0.1", port = 9201}
      {host = "127.0.0.1", port = 9202}
      {host = "127.0.0.1", port = 9203}
    ]
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Used for running the local-drone-control as a cluster with several nodes
# through the local.drones.ClusteredMain and a separate PostgreSQL database
include "local-shared"

# ports unique to node 1 (gRPC, Artery remoting, Akka Management HTTP)
local-drone-control.grpc.port = 8080

akka.remote.artery.canonical.port = 2651
akka.management.http.port = 9201

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Used for running the local-drone-control as a cluster with several nodes
# through the local.drones.ClusteredMain and a separate PostgreSQL database
include "local-shared"

# ports unique to node 2 (gRPC, Akka Management HTTP, Artery remoting)
local-drone-control.grpc.port = 8081

akka.management.http.port = 9202
akka.remote.artery.canonical.port = 2652
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Used for running the local-drone-control as a cluster with several nodes
# through the local.drones.ClusteredMain and a separate PostgreSQL database
include "local-shared"

# ports unique to node 3 (gRPC, Artery remoting, Akka Management HTTP)
local-drone-control.grpc.port = 8082

akka.remote.artery.canonical.port = 2653
akka.management.http.port = 9203
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package local.drones

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorSystem, Behavior }
import akka.cluster.typed.{ ClusterSingleton, SingletonActor }
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement

/**
 * Entry point for running local-drone-control as an Akka Cluster of several nodes, rather than as
 * a single self-contained node. Requires a separate database; start with config from the
 * local{1,2,3}.conf files to run a cluster locally.
 */
object ClusteredMain {

  def main(args: Array[String]): Unit =
    ActorSystem[Nothing](rootBehavior(), "local-drone-control")

  private def rootBehavior(): Behavior[Nothing] =
    Behaviors.setup[Nothing] { context =>
      val settings = Settings(context.system)

      context.log
        .info("Local Drone Control [{}] starting up", settings.locationId)

      // Form/join the cluster: Akka Management serves the HTTP endpoints that
      // Cluster Bootstrap uses to discover the other nodes
      AkkaManagement(context.system).start()
      ClusterBootstrap(context.system).start()

      // keep track of local drones, project aggregate info to the cloud
      Drone.init(context.system)
      DroneEvents.initEventToCloudDaemonProcess(settings)(context.system)

      // one delivery queue for the whole cluster, kept in a cluster singleton,
      // receiving delivery events consumed from the cloud service
      val queueRef =
        ClusterSingleton(context.system).init(
          SingletonActor[DeliveriesQueue.Command](
            DeliveriesQueue(),
            "DeliveriesQueue"))

      // a single queue implies a single gRPC projection consumer as well
      ClusterSingleton(context.system).init(
        SingletonActor(
          DeliveryEvents.projectionBehavior(queueRef, settings)(
            context.system),
          "DeliveriesProjection"))

      val deliveriesApi =
        new DeliveriesQueueServiceImpl(settings, queueRef)(context.system)

      // gRPC server endpoint, bound according to local-drone-control.grpc.* config
      val config = context.system.settings.config
      val bindHost = config.getString("local-drone-control.grpc.interface")
      val bindPort = config.getInt("local-drone-control.grpc.port")
      val droneApi = new DroneServiceImpl(queueRef, settings)(context.system)
      LocalDroneControlServer.start(
        bindHost,
        bindPort,
        context.system,
        droneApi,
        deliveriesApi)

      Behaviors.empty
    }

}
Loading

0 comments on commit 975f41c

Please sign in to comment.