Skip to content

Commit

Permalink
add pubsub endpoint to get show events.
Browse files Browse the repository at this point in the history
Events are pubsub topics:

`show.<showname>.reap`: when a fly is reaped
`show.<showname>.spawn`: when a fly is spawned
`show.<showname>.kill`: when a fly is killed
`show.<showname>.updated` : when show configuration is updated
`show.<showname>.stop`: when a show is stopped
`show.<showname>.start`: when a show is started

All events messages are in a json.

The client has been updated to provide a simple way to listen on the
events:

    $ circusctl listen tcp://127.0.0.1:5556
       show.refuge.spawn: {u'fly_id': 6, u'fly_pid': 72976, u'time': 1331681080.985104}
       show.refuge.spawn: {u'fly_id': 7, u'fly_pid': 72995, u'time': 1331681086.208542}
       show.refuge.spawn: {u'fly_id': 8, u'fly_pid': 73014, u'time': 1331681091.427005}
  • Loading branch information
benoitc committed Mar 13, 2012
1 parent 2af96d0 commit 374e5e0
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 19 deletions.
3 changes: 2 additions & 1 deletion circus/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

def get_trainer(cmd, num_workers=1., timeout=1.0, check=1.,
warmup_delay=0., controller='tcp://127.0.0.1:5555',
pubsub_endpoint='tcp://127.0.0.1:5556',
shell=False, working_dir=None, uid=None, gid=None,
env=None, name=None):
from circus.show import Show
Expand All @@ -18,4 +19,4 @@ def get_trainer(cmd, num_workers=1., timeout=1.0, check=1.,
show = Show(name, cmd, num_workers, working_dir=working_dir,
warmup_delay=warmup_delay, shell=shell, uid=uid, gid=gid,
env=env)
return Trainer([show], controller)
return Trainer([show], controller, pubsub_endpoint)
74 changes: 61 additions & 13 deletions circus/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import errno
import json
import zmq
import sys
import signal
Expand Down Expand Up @@ -61,26 +62,73 @@ def call(self, cmd):
msg = socket.recv()
return msg

class CircusConsumer(object):
def __init__(self, topics, endpoint='tcp://127.0.0.1:5556', timeout=5.0):
self.context = zmq.Context()
self.socket = self.context.socket(zmq.SUB)
self.socket.connect(endpoint)
for topic in topics:
self.socket.setsockopt(zmq.SUBSCRIBE, topic)


def start(self):
try:
while True:
try:
topic, msg = self.socket.recv_multipart()
print ' %s: %s' % (topic, json.loads(msg))
except zmq.ZMQError as e:
raise CallError(str(e))

except KeyboardInterrupt:
pass

def stop(self):
try:
self.context.destroy(0)
except zmq.ZMQError as e:
if e.errno == errno.EINTR:
pass
else:
raise

def main():
client = CircusClient(sys.argv[1])

cmd_parts = sys.argv[2:]
if sys.argv[1] == "listen":
if len(sys.argv) < 3:
sys.stderr.write("incorrect usage")
sys.exit(1)
else:
if len(sys.argv) > 3:
topics = sys.argv[2:]
else:
topics = ['']

if len(cmd_parts) >= 2:
cmd = " ".join(cmd_parts[:2]).lower() + " " + " ".join(cmd_parts[2:])
client = CircusConsumer(topics, sys.argv[2])
try:
client.start()
except CallError as e:
sys.stderr.write(str(e))
sys.exit(1)
sys.exit(0)
else:
cmd = cmd_parts[0].lower()
client = CircusClient(sys.argv[1])
cmd_parts = sys.argv[2:]

try:
print client.call(cmd.strip())
sys.exit(0)
except CallError as e:
print str(e)
sys.exit(1)
if len(cmd_parts) >= 2:
cmd = " ".join(cmd_parts[:2]).lower() + " " + " ".join(cmd_parts[2:])
else:
cmd = cmd_parts[0].lower()

finally:
client.stop()
try:
print client.call(cmd.strip())
sys.exit(0)
except CallError as e:
print str(e)
sys.exit(1)

finally:
client.stop()


if __name__ == '__main__':
Expand Down
7 changes: 4 additions & 3 deletions circus/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from circus.show import Show
from circus.pidfile import Pidfile
from circus import util
from circus.channel import StatChannel

MAXFD = 1024
if hasattr(os, "devnull"):
Expand Down Expand Up @@ -151,7 +150,7 @@ def main():
max_retry = cfg.dget(section, "max_retry", 5, int)
graceful_timeout = cfg.dget(section, "graceful_timeout", 30, int)

show = Show(name, cmd, num_flies=num_flies,
show = Show(name, cmd, numflies=num_flies,
warmup_delay=warmup_delay, working_dir=working_dir,
shell=shell, uid=uid, gid=gid, send_hup=send_hup,
times=times, within=within, retry_in=retry_in,
Expand All @@ -162,8 +161,10 @@ def main():
# main circus options
check = cfg.dget('circus', 'check_delay', 5, int)
endpoint = cfg.dget('circus', 'endpoint', 'tcp://127.0.0.1:5555')
pubsub_endpoint = cfg.dget('circus', 'pubsub_endpoint',
'tcp://127.0.0.1:5556')

trainer = Trainer(shows, endpoint, check)
trainer = Trainer(shows, endpoint, pubsub_endpoint, check)
try:
trainer.start()
finally:
Expand Down
21 changes: 20 additions & 1 deletion circus/show.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import errno
import json
import signal
import time

Expand All @@ -16,6 +17,8 @@ def __init__(self, name, cmd, numflies=1, warmup_delay=0.,
times=2, within=1., retry_in=7., max_retry=5,
graceful_timeout=30., prereload_fn=None):
self.name = name

self.res_name = name.lower().replace(" ", "_")
self.numflies = int(numflies)
self.warmup_delay = warmup_delay
self.cmd = cmd
Expand Down Expand Up @@ -45,21 +48,29 @@ def __init__(self, name, cmd, numflies=1, warmup_delay=0.,
self.gid = gid
self.env = env
self.send_hup = send_hup
self.pubsub_io = None

# define flapping object
self.flapping = Flapping(self, times, within, retry_in, max_retry)

def __len__(self):
return len(self.flies)

def send_msg(self, topic, msg):
multipart_msg = ["show.%s.%s" % (self.res_name, topic),
json.dumps(msg)]
self.pubsub_io.send_multipart(multipart_msg)

@util.debuglog
def reap_flies(self):
if self.stopped:
return

for wid, fly in self.flies.items():
if fly.poll() is not None:
self.flapping.notify()
self.send_msg("reap", {"fly_id": wid,
"fly_pid": fly.pid,
"time": time.time()})
if self.stopped:
break
self.flies.pop(wid)
Expand Down Expand Up @@ -113,11 +124,15 @@ def spawn_fly(self):
nb_tries += 1
continue
else:
self.send_msg("spawn", {"fly_id": fly.wid,
"fly_pid": fly.pid,
"time": time.time()})
return

self.stop()

def kill_fly(self, fly, sig=signal.SIGTERM):
self.send_msg("kill", {"fly_id": fly.wid, "time": time.time()})
logger.info("%s: kill fly %s" % (self.name, fly.pid))
fly.send_signal(sig)

Expand Down Expand Up @@ -179,9 +194,11 @@ def start(self):
self.reap_flies()
self.manage_flies()
logger.info('%s started' % self.name)
self.send_msg("start", {"time": time.time()})

@util.debuglog
def restart(self):
self.send_msg("restart", {"time": time.time()})
self.stop()
self.start()
logger.info('%s restarted' % self.name)
Expand All @@ -199,6 +216,7 @@ def reload(self):
for i in range(self.numflies):
self.spawn_fly()
self.manage_flies()
self.send_msg("reload", {"time": time.time()})

def set_opt(self, key, val):
""" set a show option
Expand Down Expand Up @@ -247,6 +265,7 @@ def set_opt(self, key, val):
elif key == "graceful_timeout":
self.graceful_timeout = float(val)
action = -1
self.send_msg("updated", {"time": time.time()})
return action

def do_action(self, num):
Expand Down
13 changes: 12 additions & 1 deletion circus/trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import sys
from threading import Lock
import time
from functools import wraps

import zmq
Expand All @@ -16,12 +17,16 @@

class Trainer(object):

def __init__(self, shows, endpoint, check_delay=1., prereload_fn=None):
def __init__(self, shows, endpoint, pubsub_endpoint, check_delay=1.,
prereload_fn=None):

self.shows = shows
self.endpoint = endpoint
self.check_delay = check_delay
self.prereload_fn = prereload_fn
self.pubsub_endpoint = pubsub_endpoint
self.context = zmq.Context()

self.ctrl = Controller(self.context, endpoint, self, self.check_delay)
self.pid = os.getpid()
self._shows_names = {}
Expand All @@ -31,8 +36,13 @@ def __init__(self, shows, endpoint, check_delay=1., prereload_fn=None):
logger.info("Starting master on pid %s" % self.pid)

def setup(self):
# set pubsub endpoint
self.pubsub_io = self.context.socket(zmq.PUB)
self.pubsub_io.bind(self.pubsub_endpoint)

for show in self.shows:
self._shows_names[show.name.lower()] = show
show.pubsub_io = self.pubsub_io

def start(self):
logger.debug('Starting the controller')
Expand All @@ -57,6 +67,7 @@ def stop(self, graceful=True):
for show in self.shows:
show.stop(graceful=graceful)

time.sleep(0.5)
try:
self.context.destroy(0)
except zmq.ZMQError as e:
Expand Down

0 comments on commit 374e5e0

Please sign in to comment.