Skip to content

Commit

Permalink
feat: Low level transform with metadata for Projection gRPC
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Dec 20, 2022
1 parent 8583f12 commit 991cb41
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory

import scala.annotation.nowarn
import scala.util.Success

/**
* INTERNAL API
Expand Down Expand Up @@ -201,14 +202,15 @@ import scala.annotation.nowarn
env.eventOption match {
case Some(event) =>
import system.executionContext
val f = transformation.mappers
.getOrElse(event.getClass, transformation.orElse)

f(event).map {
_.map { transformedEvent =>
val protoEvent = protoAnySerialization.serialize(transformedEvent)
Event(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)), Some(protoEvent))
}
val mappedFuture: Future[Option[Any]] = transformation(event, env.eventMetadata)
def toEvent(transformedEvent: Any): Event = {
val protoEvent = protoAnySerialization.serialize(transformedEvent)
Event(env.persistenceId, env.sequenceNr, env.slice, Some(protoOffset(env)), Some(protoEvent))
}
mappedFuture.value match {
case Some(Success(Some(transformedEvent))) => Future.successful(Some(toEvent(transformedEvent)))
case Some(Success(None)) => Future.successful(None)
case _ => mappedFuture.map(_.map(toEvent))
}

case None =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ import scala.reflect.ClassTag
import akka.dispatch.ExecutionContexts
import akka.projection.grpc.producer.scaladsl

@ApiMayChange
@FunctionalInterface
trait Mapper[A, B] {
def apply(event: A, metadata: Optional[Any]): CompletionStage[Optional[B]]

}

@ApiMayChange
object Transformation {
val empty: Transformation = new Transformation(scaladsl.EventProducer.Transformation.empty)
Expand Down Expand Up @@ -45,6 +52,12 @@ final class Transformation private (private[akka] val delegate: scaladsl.EventPr
new Transformation(delegate.registerMapper[A, B](event => f.apply(event).asScala))
}

def registerLowLevelMapper[A, B](inputEventClass: Class[A], mapper: Mapper[A, B]): Transformation = {
implicit val ct: ClassTag[A] = ClassTag(inputEventClass)
new Transformation(delegate.registerLowLevelMapper[A, B]((event: Any, meta: Option[Any]) =>
mapper.apply(event.asInstanceOf[A], meta.asJava).toScala.map(_.asScala)(ExecutionContexts.parasitic)))
}

def registerAsyncOrElseMapper(f: AnyRef => CompletionStage[Optional[AnyRef]]): Transformation = {
new Transformation(
delegate.registerAsyncOrElseMapper(
Expand All @@ -57,4 +70,9 @@ final class Transformation private (private[akka] val delegate: scaladsl.EventPr
def registerOrElseMapper(f: AnyRef => Optional[AnyRef]): Transformation = {
new Transformation(delegate.registerOrElseMapper(event => f.apply(event.asInstanceOf[AnyRef]).asScala))
}

def registerLowLevelOrElseMapper(mapper: Mapper[Any, Any]): Transformation = {
new Transformation(delegate.registerLowLevelOrElseMapper((event: Any, meta: Option[Any]) =>
mapper.apply(event, meta.asJava).toScala.map(_.asScala)(ExecutionContexts.parasitic)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,22 @@ object EventProducer {

@ApiMayChange
object Transformation {

@ApiMayChange
trait Mapper[A, B] {
def apply(event: A, metadata: Option[Any]): Future[Option[B]]
}

val empty: Transformation = new Transformation(
mappers = Map.empty,
orElse = event =>
orElse = (event, _) =>
Future.failed(new IllegalArgumentException(s"Missing transformation for event [${event.getClass}]")))

/**
* No transformation. Pass through each event as is.
*/
val identity: Transformation =
new Transformation(mappers = Map.empty, orElse = event => Future.successful(Option(event)))
new Transformation(mappers = Map.empty, orElse = (event, _) => Future.successful(Option(event)))
}

/**
Expand All @@ -61,25 +67,44 @@ object EventProducer {
*/
@ApiMayChange
final class Transformation private (
val mappers: Map[Class[_], Any => Future[Option[Any]]],
val orElse: Any => Future[Option[Any]]) {
private[akka] val mappers: Map[Class[_], Transformation.Mapper[Any, Any]],
private[akka] val orElse: Transformation.Mapper[Any, Any]) {
import Transformation._

/**
* @param f A function that is fed each event, and the possible additional metadata
*/
def registerLowLevelMapper[A: ClassTag, B](m: Mapper[A, B]): Transformation = {
val clazz = implicitly[ClassTag[A]].runtimeClass
new Transformation(mappers.updated(clazz, m.asInstanceOf[Mapper[Any, Any]]), orElse)
}

def registerAsyncMapper[A: ClassTag, B](f: A => Future[Option[B]]): Transformation = {
val clazz = implicitly[ClassTag[A]].runtimeClass
new Transformation(mappers.updated(clazz, f.asInstanceOf[Any => Future[Option[Any]]]), orElse)
new Transformation(mappers.updated(clazz, (event: Any, _) => f(event.asInstanceOf[A])), orElse)
}

def registerMapper[A: ClassTag, B](f: A => Option[B]): Transformation = {
registerAsyncMapper[A, B](event => Future.successful(f(event)))
}

def registerAsyncOrElseMapper(f: Any => Future[Option[Any]]): Transformation = {
new Transformation(mappers, f)
new Transformation(mappers, (event: Any, _) => f(event))
}

def registerOrElseMapper(f: Any => Option[Any]): Transformation = {
registerAsyncOrElseMapper(event => Future.successful(f(event)))
}

def registerLowLevelOrElseMapper(m: Mapper[Any, Any]): Transformation = {
new Transformation(mappers, m)
}

private[akka] def apply(event: Any, metadata: Option[Any]): Future[Option[Any]] = {
val mapper: Transformation.Mapper[Any, Any] = mappers.getOrElse(event.getClass, orElse)
mapper.apply(event, metadata)
}

}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.projection.grpc.producer.scaladsl

import akka.projection.grpc.producer.scaladsl.EventProducer.Transformation
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec

import scala.concurrent.Future

class TransformationSpec extends AnyWordSpec with Matchers with ScalaFutures {

"The gRPC event Transformation" should {

"transform simple" in {
val transformer = Transformation.empty.registerMapper((_: String) => Some("mapped"))
transformer("whatever", None).futureValue should ===(Some("mapped"))
}

"transform filter out simple" in {
val transformer = Transformation.empty.registerMapper((_: String) => None)
transformer("whatever", None).futureValue should ===(None)
}

"transform simple async" in {
val transformer = Transformation.empty.registerAsyncMapper((_: String) => Future.successful(Some("mapped")))
transformer("whatever", None).futureValue should ===(Some("mapped"))
}

"transform low level with metadata" in {
val transformer =
Transformation.empty.registerLowLevelMapper((_: String, meta) => Future.successful(meta))
transformer("whatever", Some("meta")).futureValue should ===(Some("meta"))
}

"fail by default if no transformer exist for event" in {
val transformer = Transformation.empty
transformer("whatever", Some("meta")).failed.futureValue
}

"fallback if no transformer exist for event" in {
val transformer = Transformation.empty.registerOrElseMapper(_ => Some("fallback"))
transformer("whatever", Some("meta")).futureValue should ===(Some("fallback"))
}

"fallback low level with metadata if no transformer exist for event" in {
val transformer = Transformation.empty.registerLowLevelOrElseMapper((_, meta) => Future.successful(meta))
transformer("whatever", Some("meta")).futureValue should ===(Some("meta"))
}
}

}

0 comments on commit 991cb41

Please sign in to comment.