Skip to content

Commit

Permalink
Instantiate attributes in __init__()
Browse files Browse the repository at this point in the history
  • Loading branch information
dnaeon committed Sep 3, 2014
1 parent e59443b commit 2ea64dc
Showing 1 changed file with 14 additions and 20 deletions.
34 changes: 14 additions & 20 deletions src/vpoller/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def __init__(self, config_file, num_workers=0):
Args:
config_file (str): Path to the vPoller configuration file
num_workers (str): Number of vPoller Worker processes to create
num_workers (int): Number of vPoller Worker processes to create
"""
self.node = node()
Expand All @@ -59,9 +59,9 @@ def __init__(self, config_file, num_workers=0):
self.time_to_die = multiprocessing.Event()
self.config = {}
self.workers = []
self.zcontext = None
self.zpoller = None
self.mgmt_socket = None
self.zcontext = zmq.Context()
self.zpoller = zmq.Poller()
self.mgmt = self.zcontext.socket(zmq.REP)
self.mgmt_methods = {
'status': self.status,
'shutdown': self.signal_stop,
Expand Down Expand Up @@ -156,11 +156,8 @@ def create_sockets(self):
"""
logger.debug('Creating vPoller Worker Manager sockets')

self.zcontext = zmq.Context()
self.mgmt_socket = self.zcontext.socket(zmq.REP)
self.mgmt_socket.bind(self.config.get('mgmt'))
self.zpoller = zmq.Poller()
self.zpoller.register(self.mgmt_socket, zmq.POLLIN)
self.mgmt.bind(self.config.get('mgmt'))
self.zpoller.register(self.mgmt, zmq.POLLIN)

def close_sockets(self):
"""
Expand All @@ -169,8 +166,8 @@ def close_sockets(self):
"""
logger.debug('Closing vPoller Worker Manager sockets')

self.zpoller.unregister(self.mgmt_socket)
self.mgmt_socket.close()
self.zpoller.unregister(self.mgmt)
self.mgmt.close()
self.zcontext.term()

def wait_for_mgmt_task(self):
Expand All @@ -179,15 +176,15 @@ def wait_for_mgmt_task(self):
"""
socks = dict(self.zpoller.poll())
if socks.get(self.mgmt_socket) == zmq.POLLIN:
if socks.get(self.mgmt) == zmq.POLLIN:
try:
msg = self.mgmt_socket.recv_json()
msg = self.mgmt.recv_json()
except TypeError as e:
logger.warning('Invalid message received on management interface: %s', msg)
return

result = self.process_mgmt_task(msg)
self.mgmt_socket.send_json(result)
self.mgmt.send_json(result)

def process_mgmt_task(self, msg):
"""
Expand Down Expand Up @@ -270,9 +267,9 @@ def __init__(self, db, proxy):
}
self.time_to_die = multiprocessing.Event()
self.agents = {}
self.zcontext = None
self.zpoller = None
self.worker_socket = None
self.zcontext = zmq.Context()
self.zpoller = zmq.Poller()
self.worker_socket = self.zcontext.socket(zmq.DEALER)

def run(self):
"""
Expand Down Expand Up @@ -348,10 +345,7 @@ def create_sockets(self):
"""
logger.debug('Creating vPoller Worker sockets')

self.zcontext = zmq.Context()
self.worker_socket = self.zcontext.socket(zmq.DEALER)
self.worker_socket.connect(self.config.get('proxy'))
self.zpoller = zmq.Poller()
self.zpoller.register(self.worker_socket, zmq.POLLIN)

def close_sockets(self):
Expand Down

0 comments on commit 2ea64dc

Please sign in to comment.