-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathDataBot.scala
94 lines (75 loc) · 2.67 KB
/
DataBot.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package akka.contrib.datareplication.sample
import scala.concurrent.duration._
import akka.actor.ActorLogging
import akka.contrib.datareplication.protobuf.msg.ReplicatorMessages.GetSuccess
import akka.contrib.datareplication.DataReplication
import akka.cluster.Cluster
import akka.contrib.datareplication.Replicator
import akka.actor.Actor
import akka.contrib.datareplication.ORSet
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory
import akka.actor.Props
object DataBot {

  /**
   * Application entry point.
   *
   * Each command-line argument is treated as a remoting port and handed to
   * [[startup]]. When no arguments are supplied, the ports "2551", "2552"
   * and "0" are used instead.
   *
   * @param args remoting ports, one per node to start
   */
  def main(args: Array[String]): Unit = {
    val ports: Seq[String] =
      if (args.nonEmpty) args
      else Seq("2551", "2552", "0")
    startup(ports)
  }

  /**
   * Starts one "ClusterSystem" actor system per given port and creates a
   * `DataBot` actor named "dataBot" in each of them.
   *
   * @param ports values assigned to `akka.remote.netty.tcp.port`, one per system
   */
  def startup(ports: Seq[String]): Unit =
    for (port <- ports) {
      // Node-specific setting; it is layered on top of the shared settings below.
      val portOverride = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port)

      // Shared cluster settings (kept verbatim, hence the unindented literal).
      val clusterDefaults = ConfigFactory.parseString("""
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
}
akka.cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
""")

      val config = portOverride.withFallback(ConfigFactory.load(clusterDefaults))

      // Boot the actor system for this port and start the bot inside it.
      val system = ActorSystem("ClusterSystem", config)
      system.actorOf(Props[DataBot], name = "dataBot")
    }

  /** Message the `DataBot` actor schedules to itself to trigger the next update. */
  private case object Tick

}
// This sample is used in the README.md (remember to copy when it is changed)
/**
 * Actor that periodically mutates an `ORSet` stored under the key "key".
 *
 * On construction it sends `Subscribe("key", self)` to the replicator and
 * schedules a `Tick` to itself every 5 seconds (first one after 5 seconds).
 * On each `Tick` it picks a random lowercase ASCII letter and, with equal
 * probability, sends an `Update` that adds it to or removes it from the set.
 * Every `Changed("key", ...)` message it receives is logged.
 */
class DataBot extends Actor with ActorLogging {
  import DataBot._
  import Replicator._
  import context.dispatcher

  val replicator = DataReplication(context.system).replicator
  implicit val cluster: Cluster = Cluster(context.system)

  // Repeating timer; cancelled in postStop.
  val tickTask = context.system.scheduler.schedule(5.seconds, 5.seconds, self, Tick)

  replicator ! Subscribe("key", self)

  def receive: Receive = {
    case Tick =>
      val random = ThreadLocalRandom.current()
      // 97 (inclusive) to 123 (exclusive) covers the characters 'a' to 'z'.
      val element = random.nextInt(97, 123).toChar.toString
      if (random.nextBoolean()) addElement(element)
      else removeElement(element)

    case _: UpdateResponse =>
      // Replies to our own Update messages are deliberately ignored.

    case Changed("key", data: ORSet) =>
      log.info("Current elements: {}", data.value)
  }

  /** Logs the element and sends an `Update` that adds it to the set. */
  private def addElement(element: String): Unit = {
    log.info("Adding: {}", element)
    replicator ! Update("key", ORSet())(_ + element)
  }

  /** Logs the element and sends an `Update` that removes it from the set. */
  private def removeElement(element: String): Unit = {
    log.info("Removing: {}", element)
    replicator ! Update("key", ORSet())(_ - element)
  }

  /** Cancels the repeating tick when the actor stops. */
  override def postStop(): Unit = {
    tickTask.cancel()
  }
}