EventStoreDB is an open-source state-transition database, designed for businesses that are ready to harness the true power of event-driven architecture. It is a purpose-built database for event-driven applications, with a focus on high performance, scalability, and reliability.
Add the following to your `build.sbt` file:
libraryDependencies ++= Seq(
  "io.github.lapsushq" %% "dolphin-core"  % "0.0-`Latest Commit Hash`-SNAPSHOT",
  "io.github.lapsushq" %% "dolphin-circe" % "0.0-`Latest Commit Hash`-SNAPSHOT"
)
EventStoreDB distinguishes between a normal session and a persistent session. A normal session is volatile: its reads operate on the disk with no way to acknowledge them. A persistent session also reads from the disk, but it provides a mechanism to acknowledge each read; in exchange, you cannot write with this type of session. As a result, a persistent session may be slower than a normal session.
import dolphin.*
import cats.effect.{IO, IOApp}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import fs2.Stream

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

  override def run: IO[Unit] =
    (for {
      session <- VolatileSession.stream[IO](Config.Default)
      _ <- Stream.eval(
        session.appendToStream(
          "ShoppingCart",
          """{"id": "9b188885-04a8-4ae0-b8a4-74a82c17d2ec", "value": 1}""".getBytes,
          Array.emptyByteArray,
          "Counter"
        )
      )
    } yield ())
      .compile
      .drain
}
import dolphin.*
import dolphin.setting.ReadFromStreamSettings
import cats.effect.{IO, IOApp}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import fs2.Stream

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

  override def run: IO[Unit] =
    (for {
      session <- VolatileSession.stream[IO](Config.Default)
      read <- Stream.eval(session.readStream("ShoppingCart", ReadFromStreamSettings.Default))
      data <- read.getEventData
      _ <- Stream.eval(IO.println(new String(data))) // {"id": "9b188885-04a8-4ae0-b8a4-74a82c17d2ec", "value": 1}
    } yield ())
      .compile
      .drain
}
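Both examples above move event data around as raw bytes, using `.getBytes` and `new String(...)` with the platform default charset. A minimal sketch of a helper that pins the charset to UTF-8 is shown below. The `CounterPayload` object and its `encode`/`decode` methods are hypothetical names introduced for illustration and are not part of Dolphin; only the Java standard library is used.

```scala
import java.nio.charset.StandardCharsets
import java.util.UUID

// Hypothetical helper, not part of Dolphin: builds and reads the JSON
// payload used in the examples above with an explicit UTF-8 charset.
object CounterPayload {

  // Renders the same JSON shape as the append example and encodes it as UTF-8 bytes.
  def encode(id: UUID, value: Int): Array[Byte] =
    s"""{"id": "$id", "value": $value}""".getBytes(StandardCharsets.UTF_8)

  // Decodes event bytes back into the JSON text they carry.
  def decode(bytes: Array[Byte]): String =
    new String(bytes, StandardCharsets.UTF_8)
}
```

Assuming the call shape shown in the append example, the result of `CounterPayload.encode(id, 1)` could be passed as the second argument of `session.appendToStream`, and `CounterPayload.decode(data)` could replace `new String(data)` in the read example.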
EventStoreDB provides a mechanism to subscribe to a stream. This means that the client can subscribe to a stream and receive all the events that are appended to the stream. The client can also acknowledge the events that are received (if created with a persistent session).
There are two ways to subscribe to a stream. The first is to call the `subscribeToStream` method on the session, which returns a `Stream` of `Message` objects. The second is to call the `subscribeToStream` method on the session and provide a `MessageHandler`, which returns a `Resource` of `Unit`.
- With `subscribeToStream` of `Stream`:
import dolphin.*
import cats.effect.{IO, IOApp}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import fs2.Stream
import java.util.UUID
import scala.concurrent.duration.*

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

  private def program: Stream[IO, Unit] =
    for {
      session <- VolatileSession.stream[IO](Config.Default)
      _ <- Stream
        .iterateEval(UUID.randomUUID())(_ => IO(UUID.randomUUID()))
        .evalMap { uuid =>
          session
            .appendToStream(
              "ShoppingCart",
              s"""{"id": "${uuid}", "value": 1}""".getBytes,
              Array.emptyByteArray,
              "Counter"
            )
        }
        .metered(10.seconds)
        .concurrently {
          session.subscribeToStream("ShoppingCart").evalMap {
            case Message.Event(_, event, _) => logger.info(new String(event.getEventData))
            case Message.Cancelled(_, error) => logger.info(s"Received cancellation error: ${error}")
            case Message.Confirmation(_) => logger.info("Received confirmation")
          }
        }
    } yield ()

  override def run: IO[Unit] = program.compile.drain
}
- With `subscribeToStream` of `MessageHandler` (i.e. `Message[F, VolatileConsumer[F]] => F[Unit]`):
import dolphin.*
import cats.effect.{IO, IOApp}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

  private val handlers: Message[IO, VolatileConsumer[IO]] => IO[Unit] = {
    case Message.Event(consumer, event, retryCount) =>
      logger.info(s"Received event: $event")
    case Message.Cancelled(consumer, error) =>
      logger.info(s"Received cancellation")
    case Message.Confirmation(consumer) =>
      logger.info(s"Received confirmation")
  }

  override def run: IO[Unit] =
    (for {
      session <- VolatileSession.resource[IO](Config.Default)
      _ <- session.subscribeToStream("ShoppingCart", handlers)
    } yield ()).useForever
}
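Since a handler is just a function over the three message cases, its branching can be exercised without a running EventStoreDB instance. The sketch below is one way to do that, under loud assumptions: `Msg` is a hypothetical stand-in ADT written for this example, not Dolphin's `Message` type, and its field types (`Array[Byte]`, `Int`, `Throwable`) are guesses made for illustration. It returns the log line as a plain `String` instead of running an effect.

```scala
import java.nio.charset.StandardCharsets

// Hypothetical stand-in for the three message cases shown above.
// This is NOT Dolphin's Message type; the field types are assumptions.
sealed trait Msg
object Msg {
  final case class Event(data: Array[Byte], retryCount: Int) extends Msg
  final case class Cancelled(error: Throwable) extends Msg
  case object Confirmation extends Msg
}

object HandlerSketch {

  // Pure counterpart of the handler: maps each message case to the line it would log.
  def describe(msg: Msg): String = msg match {
    case Msg.Event(data, retryCount) =>
      s"Received event (retry $retryCount): ${new String(data, StandardCharsets.UTF_8)}"
    case Msg.Cancelled(error) =>
      s"Received cancellation: ${error.getMessage}"
    case Msg.Confirmation =>
      "Received confirmation"
  }
}
```

Because `Msg` is a sealed trait, the compiler can warn when a case is left unhandled, which is the main reason for modelling the cases as an ADT in this sketch.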
Projections are a way to transform the data in a stream. EventStoreDB provides a mechanism to create and manage projections.
import dolphin.*
import cats.effect.{IO, IOApp, Resource}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import com.fasterxml.jackson.annotation.JsonProperty

object Main extends IOApp.Simple {

  implicit val logger: SelfAwareStructuredLogger[IO] = Slf4jLogger.getLogger[IO]

  private val GET_TOTAL_ON_A_SHOPPING_BASKET = """fromStream('Something').
    when({
      "$init": function() {
        return {
          state: {
            id: "",
            value: 0
          }
        }
      },
      Counter: function(s, e) {
        if (e.data.id)
          s.state.id = e.data.id;
        if (e.data.value)
          s.state.value = s.state.value + e.data.value;
      }
    }).outputState();"""

  // JsonProperty is super important as it is how EventStore will deserialize internally
  final case class ShoppingBasket(@JsonProperty("id") id: String, @JsonProperty("value") value: Int)

  // You need to provide an empty instance of the state
  object ShoppingBasket {
    val Empty = ShoppingBasket("", 0)
  }

  /* EventStoreDB will use `getState` and `setState` to manipulate the state. There is no need to implement them
   * yourself. But if you need to do so, you can do it by implementing the `Stateful` trait and overriding the `getState`
   * method. To override the `setState` method, you need to implement the `WithSetter` trait and override the `setState`.
   *
   * You can distinguish between two `states`:
   *
   * - `ServerState` is the state that EventStoreDB will provide to you, in this case, the one given by the projection.
   * - `ClientState` is the state that you will receive which can be manipulated by you by overriding the `setState` method.
   */
  final case class Counter() extends Stateful[ShoppingBasket] {
    def init = ShoppingBasket.Empty
  }

  def program: Resource[IO, Unit] =
    for {
      session <- ProjectionManager.resource[IO](Config.Default)
      _ <- Resource.eval(session.create("Cart", GET_TOTAL_ON_A_SHOPPING_BASKET)).attempt
      state <- Resource.eval(session.getState("Cart", classOf[Counter]))
      _ <- Resource.eval(logger.info(state.getState.toString()))
    } yield ()

  override def run: IO[Unit] = program.use(_ => IO.never)
}
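The JavaScript projection above can be read as a left fold over `Counter` events: it starts from the `$init` state, keeps the most recent non-empty `id`, and sums every `value`. The sketch below models that logic in plain Scala so it can be checked locally. `CounterData`, `State`, `step`, and `run` are hypothetical names that mirror the JSON shapes in the projection; they are not part of Dolphin or EventStoreDB, and the sketch is an approximation of the handler's logic, not of how the server executes it.

```scala
// A local model of the projection's logic; all names here are hypothetical.
object ProjectionSketch {

  // Mirrors `e.data`: either field may be missing from an event.
  final case class CounterData(id: Option[String], value: Option[Int])

  // Mirrors the `state` object returned by `$init`.
  final case class State(id: String, value: Int)

  val init: State = State("", 0)

  // Mirrors the `Counter` handler. JavaScript treats "" and 0 as falsy,
  // so an empty id or a zero value leaves the state unchanged.
  def step(s: State, e: CounterData): State = {
    val withId = e.id.filter(_.nonEmpty).fold(s)(id => s.copy(id = id))
    e.value.filter(_ != 0).fold(withId)(v => withId.copy(value = withId.value + v))
  }

  // Folds all events into the final state, starting from `init`.
  def run(events: List[CounterData]): State = events.foldLeft(init)(step)
}
```

For example, folding three events with values `1`, `2`, and no value, where the first and last carry ids `"a"` and `"b"`, yields `State("b", 3)`.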
Go to Roadmap for further information.
- This project is not affiliated with EventStoreDB. For further information about EventStoreDB, please visit EventStoreDB.
- For further information about the Java client, please visit EventStoreDB Java Client.
- There's a lot to change and improve. Please feel free to open an issue if you have any questions or suggestions, or if you find any bugs.
- For further information on usage and examples, please visit Dolphin Integration Tests.