Scala | 2.12.6/2.11.12 |
Akka | 2.5.13 |
Event Store | v4.0.0 and higher is supported |
We have two APIs available:
- Calling methods on
EsConnection
We use scala.concurrent.Future
for asynchronous calls; however, it is not convenient enough for Java users.
To keep Java developers happy without reinventing the wheel, we propose using the tools provided by the Akka team.
Check it out
// Java API: create an EsConnection from the actor system and read a single event as a Future.
final EsConnection connection = EsConnectionFactory.create(system);
// Requests event number 0 of "my-stream". NOTE(review): the trailing `false, null` are presumably
// resolveLinkTos and per-call credentials - confirm against eventstore.j.EsConnection#readEvent.
final Future<Event> future = connection.readEvent("my-stream", new EventNumber.Exact(0), false, null);
// Scala API: the actor system's dispatcher is imported as the implicit ExecutionContext.
import system.dispatcher

val connection = EsConnection(system)
// Direct call syntax; equivalent to invoking `apply` on the connection.
val future = connection(ReadEvent(EventStream.Id("my-stream"), EventNumber.First))
- Sending messages to
eventstore.ConnectionActor
// Actor API (Java): create the connection actor and send it a ReadEvent message.
final ActorRef connection = system.actorOf(ConnectionActor.getProps());
final ReadEvent readEvent = new ReadEventBuilder("my-stream").first().build();
// No sender is supplied, so nothing in this snippet receives the reply.
connection.tell(readEvent, ActorRef.noSender());
// Actor API (Scala): create the connection actor and send it a ReadEvent for "my-stream".
val connection = system.actorOf(ConnectionActor.props())
// `!` sends without waiting; the reply is addressed to whichever sender is implicitly in scope here.
connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)
// sbt: `%%` appends the build's Scala binary version to the artifact name.
libraryDependencies += "com.geteventstore" %% "eventstore-client" % "5.0.2"
<!-- Maven coordinate for the client; version matches the sbt coordinate (5.0.2). -->
<!-- NOTE(review): `scala.version` must resolve to the Scala binary suffix of the artifact (e.g. 2.12) - confirm the property value. -->
<dependency>
    <groupId>com.geteventstore</groupId>
    <artifactId>eventstore-client_${scala.version}</artifactId>
    <version>5.0.2</version>
</dependency>
import akka.actor.*;
import akka.actor.Status.Failure;
import akka.event.*;
import eventstore.*;
import eventstore.j.*;
import eventstore.tcp.ConnectionActor;
import java.net.InetSocketAddress;
public class ReadEventExample {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create();
final Settings settings = new SettingsBuilder()
.address(new InetSocketAddress("127.0.0.1", 1113))
.defaultCredentials("admin", "changeit")
.build();
final ActorRef connection = system.actorOf(ConnectionActor.getProps(settings));
final ActorRef readResult = system.actorOf(Props.create(ReadResult.class));
final ReadEvent readEvent = new ReadEventBuilder("my-stream")
.first()
.resolveLinkTos(false)
.requireMaster(true)
.build();
connection.tell(readEvent, readResult);
}
public static class ReadResult extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message instanceof ReadEventCompleted) {
final ReadEventCompleted completed = (ReadEventCompleted) message;
final Event event = completed.event();
log.info("event: {}", event);
} else if (message instanceof Failure) {
final Failure failure = ((Failure) message);
final EsException exception = (EsException) failure.cause();
log.error(exception, exception.toString());
} else
unhandled(message);
context().system().shutdown();
}
}
}
import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import eventstore.*;
import eventstore.j.EventDataBuilder;
import eventstore.j.WriteEventsBuilder;
import eventstore.tcp.ConnectionActor;
import java.util.UUID;
public class WriteEventExample {
public static void main(String[] args) {
final ActorSystem system = ActorSystem.create();
final ActorRef connection = system.actorOf(ConnectionActor.getProps());
final ActorRef writeResult = system.actorOf(Props.create(WriteResult.class));
final EventData event = new EventDataBuilder("my-event")
.eventId(UUID.randomUUID())
.data("my event data")
.metadata("my first event")
.build();
final WriteEvents writeEvents = new WriteEventsBuilder("my-stream")
.addEvent(event)
.expectAnyVersion()
.build();
connection.tell(writeEvents, writeResult);
}
public static class WriteResult extends UntypedActor {
final LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public void onReceive(Object message) throws Exception {
if (message instanceof WriteEventsCompleted) {
final WriteEventsCompleted completed = (WriteEventsCompleted) message;
log.info("range: {}, position: {}", completed.numbersRange(), completed.position());
} else if (message instanceof Status.Failure) {
final Status.Failure failure = ((Status.Failure) message);
final EsException exception = (EsException) failure.cause();
log.error(exception, exception.toString());
} else
unhandled(message);
context().system().shutdown();
}
}
}
import akka.actor.ActorSystem;
import eventstore.IndexedEvent;
import eventstore.SubscriptionObserver;
import eventstore.j.EsConnection;
import eventstore.j.EsConnectionFactory;
import java.io.Closeable;
/**
 * Subscribes to all streams through the Java {@code EsConnection} API and logs
 * every callback of the subscription observer via the actor system's logger.
 */
public class SubscribeToAllExample {

    public static void main(String[] args) {
        final ActorSystem system = ActorSystem.create();
        final EsConnection connection = EsConnectionFactory.create(system);

        // Observer that only logs; each callback writes one line to the system log.
        final SubscriptionObserver<IndexedEvent> loggingObserver = new SubscriptionObserver<IndexedEvent>() {
            @Override
            public void onLiveProcessingStart(Closeable subscription) {
                system.log().info("live processing started");
            }

            @Override
            public void onEvent(IndexedEvent event, Closeable subscription) {
                system.log().info(event.toString());
            }

            @Override
            public void onError(Throwable e) {
                system.log().error(e.toString());
            }

            @Override
            public void onClose() {
                system.log().error("subscription closed");
            }
        };

        // NOTE(review): the trailing `false, null` are presumably resolveLinkTos and
        // per-call credentials - confirm against eventstore.j.EsConnection#subscribeToAll.
        final Closeable closeable = connection.subscribeToAll(loggingObserver, false, null);
    }
}
// Event with only an event type; no id, data or metadata is set explicitly.
final EventData empty = new EventDataBuilder("empty").build();
// Event whose data and metadata are given as raw byte arrays.
final EventData binary = new EventDataBuilder("binary")
.eventId(UUID.randomUUID())
.data(new byte[]{1, 2, 3, 4})
.metadata(new byte[]{5, 6, 7, 8})
.build();
// Event whose data and metadata are given as plain strings.
final EventData string = new EventDataBuilder("string")
.eventId(UUID.randomUUID())
.data("data")
.metadata("metadata")
.build();
// Event whose data and metadata are given as JSON strings via the json-specific builder methods.
final EventData json = new EventDataBuilder("json")
.eventId(UUID.randomUUID())
.jsonData("{\"data\":\"data\"}")
.jsonMetadata("{\"metadata\":\"metadata\"}")
.build();
import akka.actor.Status.Failure
import akka.actor._
import eventstore._
import eventstore.tcp.ConnectionActor
import java.net.InetSocketAddress
/** Sends a `ReadEvent` for the first event of "my-stream" and logs the reply in a `ReadResult` actor. */
object ReadEventExample extends App {
  val system = ActorSystem()

  val settings = Settings(
    address = new InetSocketAddress("127.0.0.1", 1113),
    defaultCredentials = Some(UserCredentials("admin", "changeit")))

  val connection = system.actorOf(ConnectionActor.props(settings))

  // Implicit so that `!` below uses this actor as the sender, i.e. the recipient of the reply.
  implicit val readResult: ActorRef = system.actorOf(Props[ReadResult])

  connection ! ReadEvent(EventStream.Id("my-stream"), EventNumber.First)

  /** Logs either the event that was read or the failure, then terminates the actor system. */
  class ReadResult extends Actor with ActorLogging {
    def receive: Receive = {
      case ReadEventCompleted(event) =>
        log.info("event: {}", event)
        terminateSystem()

      case Failure(e: EsException) =>
        log.error(e.toString)
        terminateSystem()
    }

    private def terminateSystem(): Unit = {
      context.system.terminate()
      ()
    }
  }
}
import akka.actor.Status.Failure
import akka.actor.{ ActorLogging, Actor, Props, ActorSystem }
import eventstore._
import eventstore.tcp.ConnectionActor
/** Writes one event to "my-stream" and logs the reply in a `WriteResult` actor. */
object WriteEventExample extends App {
  val system = ActorSystem()
  val connection = system.actorOf(ConnectionActor.props())

  // Implicit so that `!` below uses this actor as the sender, i.e. the recipient of the reply.
  implicit val writeResult = system.actorOf(Props[WriteResult])

  val event = EventData(
    "my-event",
    data = Content("my event data"),
    metadata = Content("my first event"))

  connection ! WriteEvents(EventStream.Id("my-stream"), List(event))

  /** Logs either the written range and position or the failure, then terminates the actor system. */
  class WriteResult extends Actor with ActorLogging {
    def receive: Receive = {
      case WriteEventsCompleted(range, position) =>
        log.info("range: {}, position: {}", range, position)
        terminateSystem()

      case Failure(e: EsException) =>
        log.error(e.toString)
        terminateSystem()
    }

    private def terminateSystem(): Unit = {
      context.system.terminate()
      ()
    }
  }
}
import akka.actor.ActorSystem
import eventstore.TransactionActor._
import eventstore.tcp.ConnectionActor
import eventstore.{ EventData, TransactionActor, EventStream, TransactionStart }
/** Starts a transaction on "my-stream" through a `TransactionActor`, writes three events and commits. */
object StartTransactionExample extends App {
  val system = ActorSystem()
  val connection = system.actorOf(ConnectionActor.props())

  val kickoff = Start(TransactionStart(EventStream.Id("my-stream")))
  val transaction = system.actorOf(TransactionActor.props(connection, kickoff))

  // replies with `TransactionId(transactionId)`
  transaction ! GetTransactionId

  // Three writes, each constructing its own EventData; every write replies with `WriteCompleted`
  (1 to 3).foreach { _ =>
    transaction ! Write(EventData("transaction-event"))
  }

  // replies with `CommitCompleted`
  transaction ! Commit
}
import akka.actor._
import eventstore.LiveProcessingStarted
import eventstore.tcp.ConnectionActor
import eventstore.{ IndexedEvent, Settings, SubscriptionActor }
import scala.concurrent.duration._
/** Entry point: creates a connection actor, a counting actor and a subscription actor that links the two. */
object CountAll extends App {
  val system = ActorSystem()
  val connection = system.actorOf(ConnectionActor.props(), "connection")
  val countAll = system.actorOf(Props[CountAll], "count-all")

  // The subscription actor is given the connection and the counting actor.
  system.actorOf(
    SubscriptionActor.props(connection, countAll, None, None, Settings.Default),
    "subscription")
}
/**
 * Counts `IndexedEvent` messages and logs the running total once no message
 * has arrived for the receive timeout of one second.
 */
class CountAll extends Actor with ActorLogging {
  context.setReceiveTimeout(1.second)

  def receive: Receive = count(0)

  /**
   * Behaviour for a given running total.
   *
   * @param n       number of events counted so far
   * @param printed whether the current total has already been logged
   */
  def count(n: Long, printed: Boolean = false): Receive = {
    // A new event resets `printed` (default argument), so the next timeout logs again.
    case _: IndexedEvent =>
      context.become(count(n + 1))

    case LiveProcessingStarted =>
      log.info("live processing started")

    case ReceiveTimeout if !printed =>
      log.info("count {}", n)
      context.become(count(n, printed = true))
  }
}
import akka.actor.ActorSystem
import scala.concurrent.Future
import eventstore._
/** Issues several read and write requests through `EsConnection` and logs each successful result. */
object EsConnectionExample extends App {
  val system = ActorSystem()
  // The actor system's dispatcher is the implicit ExecutionContext for the Future callbacks below.
  import system.dispatcher

  val connection = EsConnection(system)
  val log = system.log
  val stream = EventStream.Id("my-stream")

  val readEvent: Future[ReadEventCompleted] = connection(ReadEvent(stream))
  readEvent.foreach(completed => log.info(completed.event.toString))

  val readStreamEvents: Future[ReadStreamEventsCompleted] = connection(ReadStreamEvents(stream))
  readStreamEvents.foreach(completed => log.info(completed.events.toString()))

  val readAllEvents: Future[ReadAllEventsCompleted] = connection(ReadAllEvents(maxCount = 5))
  readAllEvents.foreach(completed => log.info(completed.events.toString()))

  val writeEvents: Future[WriteEventsCompleted] = connection(WriteEvents(stream, List(EventData("my-event"))))
  writeEvents.foreach(completed => log.info(completed.numbersRange.toString))
}
The most common use case is to have a single Event Store connection per application. For that you can use our Akka extension, which makes sure there is only a single instance of the connection actor.
// Send a ReadEvent message to the connection actor exposed by the extension.
EventStoreExtension(system).actor ! ReadEvent(EventStream.Id("stream"))
// Issue the same request through the extension's `connection`.
EventStoreExtension(system).connection(ReadEvent(EventStream.Id("stream")))
The client provides an Akka Streams interface for EventStore subscriptions.
There are two methods, allStreamsSource
and streamSource,
available in both the Java and Scala APIs.
Here is a short example of how to use it:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import eventstore.{ EventStoreExtension, EventStream }
/** Prints the stream id of every event in the system `$streams` stream, then terminates the actor system. */
object ListAllStreamsExample extends App {
  implicit val system = ActorSystem()
  // The actor system's dispatcher is the implicit ExecutionContext for `onComplete` below.
  import system.dispatcher
  implicit val materializer = ActorMaterializer()

  val connection = EventStoreExtension(system).connection
  val source = connection.streamSource(
    EventStream.System.`$streams`,
    infinite = false,
    resolveLinkTos = true)

  source
    .runForeach(event => println(event.streamId.streamId))
    .onComplete(_ => system.terminate())
}
You can use the generic Reactive Streams Publisher
interface for EventStore subscriptions
by converting an Akka Stream to a Publisher. See: Integrating Akka Streams with Reactive Streams
Here is a short example of how to accomplish that:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import eventstore.EventStoreExtension
import org.reactivestreams.{Publisher, Subscriber}
import scala.concurrent.duration._
/**
 * Exposes the all-streams subscription as a Reactive Streams `Publisher` of
 * per-second throughput strings and prints them through a `Subscriber`.
 */
object MessagesPerSecondReactiveStreams extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()

  val connection = EventStoreExtension(system).connection

  val publisher: Publisher[String] = {
    // Group everything received within one second; the group size cap is Int.MaxValue.
    val perSecond = connection.allStreamsSource().groupedWithin(Int.MaxValue, 1.second)
    perSecond
      .map(batch => f"${batch.size.toDouble / 1000}%2.1fk m/s")
      .runWith(Sink.asPublisher(fanout = false))
  }

  val subscriber: Subscriber[String] =
    Source.asSubscriber[String].to(Sink.foreach(println)).run()

  publisher.subscribe(subscriber)
}
Default client settings are defined in reference.conf
.
You can override them in your own application.conf
placed in src/main/resources
, the same way you might already do for Akka.
We use the same configuration approach - config.
It is possible to use the client against an Event Store cluster.
To do so, configure the client via the eventstore.cluster
section in reference.conf
or via ClusterSettings
.
Using application.conf
for configuration is the preferred option.