Skip to content

Commit

Permalink
VPollerWorker class refactoring after the 'multiprocessing' changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dnaeon committed Sep 3, 2014
1 parent f06262f commit b61facf
Showing 1 changed file with 67 additions and 204 deletions.
271 changes: 67 additions & 204 deletions src/vpoller/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,167 +285,115 @@ def run(self):
config (str): Path to the confuguration file for vPoller Worker
"""
logging.debug('Preparing vPoller Worker for start up')
logging.info('vPoller Worker process is starting')

# Note the time we start up
self.running_since = asctime()

# A flag to signal that our daemon should be terminated
self.time_to_die = False

# Load the configuration file of the vPoller Worker
self.load_worker_config(config)

# Create the worker sockets
self.create_worker_sockets()

# Spawn the vSphere Agents of the Worker
self.spawn_vsphere_agents()

# Enter the main daemon loop from here
logging.debug('Entering main daemon loop')
while not self.time_to_die:
socks = dict(self.zpoller.poll(1000))

# Worker socket, receives client messages for processing
#
# The routing envelope of the message looks like this:
#
# Frame 1: [ N ][...] <- Identity of connection
# Frame 2: [ 0 ][] <- Empty delimiter frame
# Frame 3: [ N ][...] <- Data frame
if socks.get(self.worker_socket) == zmq.POLLIN:
_id = self.worker_socket.recv()
_empty = self.worker_socket.recv()

try:
msg = self.worker_socket.recv_json()
except Exception as e:
logging.warning('Received bad client message, request will be ignored: %s', e)
continue

result = self.process_client_msg(msg)

# Return result back to client
self.worker_socket.send(_id, zmq.SNDMORE)
self.worker_socket.send("", zmq.SNDMORE)
try:
self.worker_socket.send_json(result)
except TypeError as e:
logging.warning('Cannot serialize result: %s', e)
self.worker_socket.send_json({ 'success': 1, 'msg': 'Cannot serialize result: %s' % e})

# Management socket
if socks.get(self.mgmt_socket) == zmq.POLLIN:
msg = self.mgmt_socket.recv_json()
result = self.process_mgmt_msg(msg)
self.mgmt_socket.send_json(result)
self.create_sockets()
self.create_agents()

while not self.time_to_die.is_set():
self.wait_for_tasks()

# Shutdown time has arrived, let's clean up a bit
logging.debug('Shutdown time has arrived, vPoller Worker is going down')
self.close_worker_sockets()
self.shutdown_vsphere_agents()
self.stop()

def load_worker_config(self, config):
def stop(self):
"""
Loads the vPoller Worker configuration file
Stop vPoller Worker process
Args:
config (str): Path to the config file of vPoller Worker
"""
self.close_sockets()
self.stop_agents()

Raises:
VPollerException
def signal_stop(self):
"""
Signal the vPoller Worker process that shutdown time has arrived
"""
logging.debug('Loading vPoller Worker config file %s', config)
self.time_to_die.set()

def wait_for_tasks(self):
"""
Poll the worker socket for new tasks
if not os.path.exists(config):
logging.error('Configuration file does not exists: %s', config)
raise VPollerException, 'Configuration file does not exists: %s' % config
"""
socks = dict(self.zpoller.poll(1000))

parser = ConfigParser.ConfigParser()
parser.read(config)
# The routing envelope of the message on the worker socket is this:
#
# Frame 1: [ N ][...] <- Identity of connection
# Frame 2: [ 0 ][] <- Empty delimiter frame
# Frame 3: [ N ][...] <- Data frame
if socks.get(self.worker_socket) == zmq.POLLIN:
# TODO: Use recv_multipart()
_id = self.worker_socket.recv()
_empty = self.worker_socket.recv()

try:
self.connector_db = parser.get('worker', 'db')
self.proxy_endpoint = parser.get('worker', 'proxy')
self.mgmt_endpoint = parser.get('worker', 'mgmt')
except ConfigParser.NoOptionError as e:
logging.error('Configuration issues detected in %s: %s' , config, e)
raise

def create_worker_sockets(self):
try:
msg = self.worker_socket.recv_json()
except Exception as e:
logging.warning('Invalid client message received, will be ignored: %s', msg)

# Process task and return result to client
result = self.process_client_msg(msg)
self.worker_socket.send(_id, zmq.SNDMORE)
self.worker_socket.send("", zmq.SNDMORE)
try:
self.worker_socket.send_json(result)
except TypeError as e:
logging.warning('Cannot serialize result: %s', e)
self.worker_socket.send_json({ 'success': 1, 'msg': 'Cannot serialize result: %s' % e})

def create_sockets(self):
"""
Creates the ZeroMQ sockets used by the Worker
Creates the ZeroMQ sockets used by the vPoller Worker
Creates two sockets:
* REP socket (mgmt_socket) used for management
* DEALER socket (worker_socket) connected to the VPoller Proxy
Raises:
VPollerException
"""
logging.debug('Creating vPoller Worker sockets')

self.zcontext = zmq.Context()

self.mgmt_socket = self.zcontext.socket(zmq.REP)
self.mgmt_socket.bind(self.mgmt_endpoint)

logging.info('Connecting to the vPoller Proxy server')
self.zcontext = zmq.Context()
self.worker_socket = self.zcontext.socket(zmq.DEALER)
self.worker_socket.connect(self.proxy_endpoint)

# Create a poll set for our sockets
logging.debug('Creating poll set for vPoller Worker sockets')
self.worker_socket.connect(self.config.get('proxy'))
self.zpoller = zmq.Poller()
self.zpoller.register(self.mgmt_socket, zmq.POLLIN)
self.zpoller.register(self.worker_socket, zmq.POLLIN)

def close_worker_sockets(self):
def close_sockets(self):
"""
Closes the ZeroMQ sockets used by the Worker
Closes the ZeroMQ sockets used by the vPoller Worker
"""
logging.debug('Closing vPoller Worker sockets')

self.zpoller.unregister(self.mgmt_socket)
self.zpoller.unregister(self.worker_socket)

self.mgmt_socket.close()
self.worker_socket.close()
self.zcontext.term()

def spawn_vsphere_agents(self):
def create_agents(self):
"""
Prepares the vSphere Agents used by the vPoller Worker
"""
logging.debug('Spawning vSphere Agents')
Raises:
VPollerException
self.agents = dict()
"""
logging.debug('Creating vSphere Agents')

db = VConnectorDatabase(self.connector_db)
db_agents = db.get_agents(only_enabled=True)
db = VConnectorDatabase(self.config.get('db'))
agents = db.get_agents(only_enabled=True)

if not db_agents:
if not agents:
logging.warning('No registered or enabled vSphere Agents found')
raise VPollerException, 'No registered or enabled vSphere Agents found'

for each_agent in db_agents:
agent = VSphereAgent(
user=each_agent['user'],
pwd=each_agent['pwd'],
host=each_agent['host']
for agent in agents:
a = VSphereAgent(
user=agent['user'],
pwd=agent['pwd'],
host=agent['host']
)
self.agents[agent.host] = agent
self.agents[a.host] = a

def shutdown_vsphere_agents(self):
def stop_agents(self):
"""
Disconnects all vPoller Agents from their respective VMware vSphere hosts
Disconnects all vPoller Agents from the VMware vSphere hosts they are connected to
"""
logging.debug('Shutting down vSphere Agents')
Expand Down Expand Up @@ -657,88 +605,3 @@ def process_client_msg(self, msg):
result = agent_method['method'](msg)

return result

def process_mgmt_msg(self, msg):
"""
Processes a message for the management interface
Example client message to shutdown the vPoller Worker would be:
{
"method": "worker.shutdown"
}
Getting status information from the vPoller worker:
{
"method": "worker.status"
}
Args:
msg (dict): The client message for processing
"""
logging.debug('Processing management message: %s', msg)

if 'method' not in msg:
return { 'success': 1, 'msg': 'Missing method name' }

# The vPoller Worker management methods we support and process
methods = {
'worker.status': self.get_worker_status,
'worker.shutdown': self.worker_shutdown,
}

if msg['method'] not in methods:
return { 'success': 1, 'msg': 'Unknown method received' }

# Process management request and return result to client
result = methods[msg['method']](msg)

return result

def get_worker_status(self, msg):
"""
Get status information about the vPoller Worker
Args:
msg (dict): The client message for processing (ignored)
Returns:
Status information about the vPoller Worker
"""
logging.debug('Getting vPoller Worker status')

result = {
'success': 0,
'msg': 'vPoller Worker Status',
'result': {
'status': 'running',
'hostname': os.uname()[1],
'proxy_endpoint': self.proxy_endpoint,
'mgmt_endpoint': self.mgmt_endpoint,
'connector_db': self.connector_db,
'vsphere_agents': self.agents.keys(),
'running_since': self.running_since,
'uname': ' '.join(os.uname()),
}
}

logging.debug('Returning result to client: %s', result)

return result

def worker_shutdown(self, msg):
"""
Shutdown the vPoller Worker
Args:
msg (dict): The client message for processing (ignored)
"""
logging.info('vPoller Worker is shutting down')

self.time_to_die = True

return { 'success': 0, 'msg': 'vPoller Worker is shutting down' }

0 comments on commit b61facf

Please sign in to comment.