Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

doc: Replicated event sourcing gRPC doc improvements #785

Merged
merged 2 commits into from
Jan 31, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -21,6 +21,8 @@
import akka.projection.ProjectionContext;
import akka.projection.ProjectionId;
import akka.projection.grpc.producer.EventProducerSettings;
import akka.projection.grpc.producer.javadsl.EventProducer;
import akka.projection.grpc.producer.javadsl.EventProducerSource;
import akka.projection.grpc.replication.javadsl.Replica;
import akka.projection.grpc.replication.javadsl.ReplicatedBehaviors;
import akka.projection.grpc.replication.javadsl.Replication;
@@ -33,21 +35,26 @@
import akka.stream.javadsl.FlowWithContext;

import java.time.Duration;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionStage;

public class ReplicationCompileTest {
interface MyCommand {
}

static class MyReplicatedBehavior {

interface MyCommand {}

static Behavior<MyCommand> create(
ReplicatedBehaviors<MyCommand, Void, Void> replicatedBehaviors) {
return replicatedBehaviors.setup(
replicationContext -> {
throw new UnsupportedOperationException("just a dummy factory method");
});
static Behavior<MyCommand> create(
ReplicatedBehaviors<MyCommand, Void, Void> replicatedBehaviors) {
return replicatedBehaviors.setup(
replicationContext -> {
throw new UnsupportedOperationException("just a dummy factory method");
});
}
}

public static void start(ActorSystem<?> system) {
@@ -86,7 +93,7 @@ public AtLeastOnceFlowProjection<Offset, EventEnvelope<Object>> create(Projectio
8,
projectionProvider).configureEntity(entity -> entity.withRole("entities"));

Replication<MyCommand> replication = Replication.grpcReplication(settings, ReplicationCompileTest::create, system);
Replication<MyCommand> replication = Replication.grpcReplication(settings, MyReplicatedBehavior::create, system);

// bind a single handler endpoint
Function<HttpRequest, CompletionStage<HttpResponse>> handler = replication.createSingleServiceHandler();
@@ -99,4 +106,37 @@ public AtLeastOnceFlowProjection<Offset, EventEnvelope<Object>> create(Projectio
Http.get(system).newServerAt("127.0.0.1", 8080).bind(service);

}

static class ShoppingCart {
static Replication<MyCommand> init(ActorSystem<?> system) {
throw new UnsupportedOperationException("Just a sample");
}
}

public static void multiEventProducers(ActorSystem<?> system, ReplicationSettings<MyCommand> settings, String host, int port) {

Replication<Void> otherReplication = null;

// #multi-service
Set<EventProducerSource> allSources = new HashSet<>();

Replication<MyCommand> replication = ShoppingCart.init(system);
allSources.add(replication.eventProducerService());

// add additional EventProducerSource from other entities or
// Akka Projection gRPC
allSources.add(otherReplication.eventProducerService());

Function<HttpRequest, CompletionStage<HttpResponse>> route =
EventProducer.grpcServiceHandler(system, allSources);

@SuppressWarnings("unchecked")
Function<HttpRequest, CompletionStage<HttpResponse>> handler =
ServiceHandler.concatOrNotFound(route);
// #multi-service

CompletionStage<ServerBinding> bound =
Http.get(system).newServerAt(host, port).bind(handler);

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.grpc.replication.scaladsl

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.grpc.scaladsl.ServiceHandler
import akka.http.scaladsl.Http
import akka.projection.grpc.producer.scaladsl.EventProducer
import akka.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource

object ProducerApiSample {

trait MyCommand
object MyReplicatedBehavior {
def apply(r: ReplicatedBehaviors[MyCommand, String, String]): Behavior[MyCommand] = ???
}

def otherReplication: Replication[Unit] = ???
def multiEventProducers(settings: ReplicationSettings[MyCommand], host: String, port: Int)(
implicit system: ActorSystem[_]): Unit = {
// #multi-service
val replication: Replication[MyCommand] =
Replication.grpcReplication(settings)(MyReplicatedBehavior.apply)

val allSources: Set[EventProducerSource] = {
Set(
replication.eventProducerService,
// producers from other replicated entities or gRPC projections
otherReplication.eventProducerService)
}
val route = EventProducer.grpcServiceHandler(allSources)

val handler = ServiceHandler.concatOrNotFound(route)
// #multi-service

val _ = Http(system).newServerAt(host, port).bind(handler)
}

}
Original file line number Diff line number Diff line change
@@ -149,7 +149,12 @@ When multiple producers exist, all instances of @apidoc[akka.projection.grpc.pro
be passed at once to `EventProducer.grpcServiceHandler` to create a single producer service handling each of the event
streams.

FIXME sample snippet
Scala
: @@snip [ProducerApiSample.scala](/akka-projection-grpc/src/test/scala/akka/projection/grpc/replication/scaladsl/ProducerApiSample.scala) { #multi-service }

Java
: @@snip [ReplicationCompileTest.java](/akka-projection-grpc/src/test/java/akka/projection/grpc/replication/javdsl/ReplicationCompileTest.java) { #multi-service }


The Akka HTTP server must be running with HTTP/2, this is done through config:

@@ -168,8 +173,9 @@ Note that having separate replicas increases the risk that two different seriali
are running at the same time, so extra care must be taken when changing the events and their serialization and deploying
new versions of the application to the replicas.

FIXME something more here - serialization can fail and stop the replication, but it could also silently lose data in new fields
before the consuming side has a new version.
For some scenarios it may be necessary to do a two-step deploy of format changes to not lose data, first deploy support
for a new serialization format so that all replicas can deserialize it, then a second deploy where the new field is actually
populated with data.

## Sample projects