From 5355f33ed4cd9fb39abcde2caa6a5235e71ac2ff Mon Sep 17 00:00:00 2001 From: To-om Date: Wed, 10 Jan 2018 13:23:08 +0100 Subject: [PATCH] #41 Make StreamSrv cluster ready --- app/org/elastic4play/services/EventSrv.scala | 57 +++++++++++++++++--- build.sbt | 2 + version.sbt | 2 +- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/app/org/elastic4play/services/EventSrv.scala b/app/org/elastic4play/services/EventSrv.scala index 953048e..d7ce602 100644 --- a/app/org/elastic4play/services/EventSrv.scala +++ b/app/org/elastic4play/services/EventSrv.scala @@ -1,7 +1,7 @@ package org.elastic4play.services import java.util.Date -import javax.inject.Singleton +import javax.inject.{ Inject, Singleton } import scala.util.Try @@ -9,8 +9,10 @@ import play.api.Logger import play.api.libs.json.JsObject import play.api.mvc.{ RequestHeader, Result } -import akka.actor.{ ActorRef, actorRef2Scala } -import akka.event.{ ActorEventBus, SubchannelClassification } +import akka.actor.{ Actor, ActorRef, ActorSystem, Props, actorRef2Scala } +import akka.cluster.pubsub.DistributedPubSubMediator.{ Publish, Subscribe } +import akka.cluster.pubsub.DistributedPubSub +import akka.event.{ ActorEventBus, EventBus, SubchannelClassification } import akka.util.Subclassification import org.elastic4play.models.{ BaseEntity, HiveEnumeration } @@ -23,6 +25,7 @@ object AuditableAction extends Enumeration with HiveEnumeration { } case class RequestProcessStart(request: RequestHeader) extends EventMessage + case class RequestProcessEnd(request: RequestHeader, result: Try[Result]) extends EventMessage case class AuditOperation( @@ -32,9 +35,52 @@ case class AuditOperation( authContext: AuthContext, date: Date = new Date()) extends EventMessage +trait EventSrv { + def publish(event: EventMessage): Unit + def subscribe(subscriber: ActorRef, classifier: Class[_ <: EventMessage]): Boolean + def unsubscribe(subscriber: ActorRef, from: Class[_ <: EventMessage]): Boolean + def unsubscribe(subscriber: ActorRef): Unit +} + +class DistributedEventActor(localEventSrv: LocalEventSrv) extends Actor { + override def receive: Receive = { + case message: EventMessage ⇒ localEventSrv.publish(message) + } +} + +@Singleton +class DistributedEventSrv @Inject() ( + system: ActorSystem, + localEventSrv: LocalEventSrv) extends EventBus with EventSrv { + type Event = EventMessage + type Classifier = Class[_ <: EventMessage] + type Subscriber = ActorRef + + private val mediator = DistributedPubSub(system).mediator + + private val eventActor = system.actorOf(Props(classOf[DistributedEventActor], localEventSrv), "DistributedEventActor") + mediator ! Subscribe("stream", eventActor) + + def publish(event: Event): Unit = { + mediator ! Publish("stream", event) + } + + def subscribe(subscriber: Subscriber, classifier: Classifier): Boolean = { + localEventSrv.subscribe(subscriber, classifier) + } + + def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = { + localEventSrv.unsubscribe(subscriber, from) + } + + def unsubscribe(subscriber: Subscriber): Unit = { + localEventSrv.unsubscribe(subscriber) + } +} + @Singleton -class EventSrv extends ActorEventBus with SubchannelClassification { - private[EventSrv] lazy val logger = Logger(getClass) +class LocalEventSrv extends ActorEventBus with SubchannelClassification with EventSrv { + private[LocalEventSrv] lazy val logger = Logger(getClass) override type Classifier = Class[_ <: EventMessage] override type Event = EventMessage @@ -46,4 +92,3 @@ class EventSrv extends ActorEventBus with SubchannelClassification { def isSubclass(x: Classifier, y: Classifier): Boolean = y.isAssignableFrom(x) } } - diff --git a/build.sbt b/build.sbt index 7942930..5063e2a 100644 --- a/build.sbt +++ b/build.sbt @@ -24,6 +24,8 @@ libraryDependencies ++= Seq( "com.typesafe.akka" %% "akka-stream-testkit" % "2.5.6" % Test, "org.scalactic" %% "scalactic" % "3.0.4", "org.bouncycastle" % "bcprov-jdk15on" % "1.58", + "com.typesafe.akka" %% "akka-cluster" % "2.5.6", + "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.6", specs2 % Test ) diff --git a/version.sbt b/version.sbt index 275e341..3db635c 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version := "1.4.2-SNAPSHOT" +version := "1.5.0-SNAPSHOT"