Skip to content

Commit

Permalink
Revert "#41 Make StreamSrv cluster ready"
Browse files Browse the repository at this point in the history
This reverts commit 5355f33.
  • Loading branch information
To-om committed Jan 16, 2018
1 parent 5355f33 commit 9fff5dd
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 54 deletions.
57 changes: 6 additions & 51 deletions app/org/elastic4play/services/EventSrv.scala
Original file line number Diff line number Diff line change
@@ -1,18 +1,16 @@
package org.elastic4play.services

import java.util.Date
import javax.inject.{ Inject, Singleton }
import javax.inject.Singleton

import scala.util.Try

import play.api.Logger
import play.api.libs.json.JsObject
import play.api.mvc.{ RequestHeader, Result }

import akka.actor.{ Actor, ActorRef, ActorSystem, Props, actorRef2Scala }
import akka.cluster.pubsub.DistributedPubSubMediator.{ Publish, Subscribe }
import akka.cluster.pubsub.DistributedPubSub
import akka.event.{ ActorEventBus, EventBus, SubchannelClassification }
import akka.actor.{ ActorRef, actorRef2Scala }
import akka.event.{ ActorEventBus, SubchannelClassification }
import akka.util.Subclassification

import org.elastic4play.models.{ BaseEntity, HiveEnumeration }
Expand All @@ -25,7 +23,6 @@ object AuditableAction extends Enumeration with HiveEnumeration {
}

case class RequestProcessStart(request: RequestHeader) extends EventMessage

case class RequestProcessEnd(request: RequestHeader, result: Try[Result]) extends EventMessage

case class AuditOperation(
Expand All @@ -35,52 +32,9 @@ case class AuditOperation(
authContext: AuthContext,
date: Date = new Date()) extends EventMessage

trait EventSrv {
def publish(event: EventMessage): Unit
def subscribe(subscriber: ActorRef, classifier: Class[_ <: EventMessage]): Boolean
def unsubscribe(subscriber: ActorRef, from: Class[_ <: EventMessage]): Boolean
def unsubscribe(subscriber: ActorRef): Unit
}

class DistributedEventActor(localEventSrv: LocalEventSrv) extends Actor {
override def receive: Receive = {
case message: EventMessage localEventSrv.publish(message)
}
}

@Singleton
class DistributedEventSrv @Inject() (
system: ActorSystem,
localEventSrv: LocalEventSrv) extends EventBus with EventSrv {
type Event = EventMessage
type Classifier = Class[_ <: EventMessage]
type Subscriber = ActorRef

private val mediator = DistributedPubSub(system).mediator

private val eventActor = system.actorOf(Props(classOf[DistributedEventActor], localEventSrv), "DistributedEventActor")
mediator ! Subscribe("stream", eventActor)

def publish(event: Event): Unit = {
mediator ! Publish("stream", event)
}

def subscribe(subscriber: Subscriber, classifier: Classifier): Boolean = {
localEventSrv.subscribe(subscriber, classifier)
}

def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = {
localEventSrv.unsubscribe(subscriber, from)
}

def unsubscribe(subscriber: Subscriber): Unit = {
localEventSrv.unsubscribe(subscriber)
}
}

@Singleton
class LocalEventSrv extends ActorEventBus with SubchannelClassification with EventSrv {
private[LocalEventSrv] lazy val logger = Logger(getClass)
class EventSrv extends ActorEventBus with SubchannelClassification {
private[EventSrv] lazy val logger = Logger(getClass)
override type Classifier = Class[_ <: EventMessage]
override type Event = EventMessage

Expand All @@ -92,3 +46,4 @@ class LocalEventSrv extends ActorEventBus with SubchannelClassification with Eve
def isSubclass(x: Classifier, y: Classifier): Boolean = y.isAssignableFrom(x)
}
}

2 changes: 0 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-stream-testkit" % "2.5.6" % Test,
"org.scalactic" %% "scalactic" % "3.0.4",
"org.bouncycastle" % "bcprov-jdk15on" % "1.58",
"com.typesafe.akka" %% "akka-cluster" % "2.5.6",
"com.typesafe.akka" %% "akka-cluster-tools" % "2.5.6",
specs2 % Test
)

Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version := "1.5.0-SNAPSHOT"
version := "1.4.2-SNAPSHOT"

0 comments on commit 9fff5dd

Please sign in to comment.