From b61facf9181c2b04766a11ef67c44ddb7a6c7b47 Mon Sep 17 00:00:00 2001 From: Marin Atanasov Nikolov Date: Fri, 29 Aug 2014 18:17:26 +0300 Subject: [PATCH] VPollerWorker class refactoring after the 'multiprocessing' changes --- src/vpoller/worker.py | 271 +++++++++++------------------------------- 1 file changed, 67 insertions(+), 204 deletions(-) diff --git a/src/vpoller/worker.py b/src/vpoller/worker.py index 7b3a88b..8e6100d 100644 --- a/src/vpoller/worker.py +++ b/src/vpoller/worker.py @@ -285,167 +285,115 @@ def run(self): config (str): Path to the confuguration file for vPoller Worker """ - logging.debug('Preparing vPoller Worker for start up') + logging.info('vPoller Worker process is starting') - # Note the time we start up - self.running_since = asctime() - - # A flag to signal that our daemon should be terminated - self.time_to_die = False - - # Load the configuration file of the vPoller Worker - self.load_worker_config(config) - - # Create the worker sockets - self.create_worker_sockets() - - # Spawn the vSphere Agents of the Worker - self.spawn_vsphere_agents() - - # Enter the main daemon loop from here - logging.debug('Entering main daemon loop') - while not self.time_to_die: - socks = dict(self.zpoller.poll(1000)) - - # Worker socket, receives client messages for processing - # - # The routing envelope of the message looks like this: - # - # Frame 1: [ N ][...] <- Identity of connection - # Frame 2: [ 0 ][] <- Empty delimiter frame - # Frame 3: [ N ][...] <- Data frame - if socks.get(self.worker_socket) == zmq.POLLIN: - _id = self.worker_socket.recv() - _empty = self.worker_socket.recv() - - try: - msg = self.worker_socket.recv_json() - except Exception as e: - logging.warning('Received bad client message, request will be ignored: %s', e) - continue - - result = self.process_client_msg(msg) - - # Return result back to client - self.worker_socket.send(_id, zmq.SNDMORE) - self.worker_socket.send("", zmq.SNDMORE) - try: - self.worker_socket.send_json(result) - except TypeError as e: - logging.warning('Cannot serialize result: %s', e) - self.worker_socket.send_json({ 'success': 1, 'msg': 'Cannot serialize result: %s' % e}) - - # Management socket - if socks.get(self.mgmt_socket) == zmq.POLLIN: - msg = self.mgmt_socket.recv_json() - result = self.process_mgmt_msg(msg) - self.mgmt_socket.send_json(result) + self.create_sockets() + self.create_agents() + + while not self.time_to_die.is_set(): + self.wait_for_tasks() - # Shutdown time has arrived, let's clean up a bit - logging.debug('Shutdown time has arrived, vPoller Worker is going down') - self.close_worker_sockets() - self.shutdown_vsphere_agents() self.stop() - def load_worker_config(self, config): + def stop(self): """ - Loads the vPoller Worker configuration file + Stop vPoller Worker process - Args: - config (str): Path to the config file of vPoller Worker + """ + self.close_sockets() + self.stop_agents() - Raises: - VPollerException + def signal_stop(self): + """ + Signal the vPoller Worker process that shutdown time has arrived """ - logging.debug('Loading vPoller Worker config file %s', config) + self.time_to_die.set() + + def wait_for_tasks(self): + """ + Poll the worker socket for new tasks - if not os.path.exists(config): - logging.error('Configuration file does not exists: %s', config) - raise VPollerException, 'Configuration file does not exists: %s' % config + """ + socks = dict(self.zpoller.poll(1000)) - parser = ConfigParser.ConfigParser() - parser.read(config) + # The routing envelope of the message on the worker socket is this: + # + # Frame 1: [ N ][...] <- Identity of connection + # Frame 2: [ 0 ][] <- Empty delimiter frame + # Frame 3: [ N ][...] <- Data frame + if socks.get(self.worker_socket) == zmq.POLLIN: + # TODO: Use recv_multipart() + _id = self.worker_socket.recv() + _empty = self.worker_socket.recv() - try: - self.connector_db = parser.get('worker', 'db') - self.proxy_endpoint = parser.get('worker', 'proxy') - self.mgmt_endpoint = parser.get('worker', 'mgmt') - except ConfigParser.NoOptionError as e: - logging.error('Configuration issues detected in %s: %s' , config, e) - raise - - def create_worker_sockets(self): + try: + msg = self.worker_socket.recv_json() + except Exception as e: + logging.warning('Invalid client message received, will be ignored: %s', msg) + + # Process task and return result to client + result = self.process_client_msg(msg) + self.worker_socket.send(_id, zmq.SNDMORE) + self.worker_socket.send("", zmq.SNDMORE) + try: + self.worker_socket.send_json(result) + except TypeError as e: + logging.warning('Cannot serialize result: %s', e) + self.worker_socket.send_json({ 'success': 1, 'msg': 'Cannot serialize result: %s' % e}) + + def create_sockets(self): """ - Creates the ZeroMQ sockets used by the Worker + Creates the ZeroMQ sockets used by the vPoller Worker Creates two sockets: - * REP socket (mgmt_socket) used for management - * DEALER socket (worker_socket) connected to the VPoller Proxy - - Raises: - VPollerException - """ logging.debug('Creating vPoller Worker sockets') - - self.zcontext = zmq.Context() - self.mgmt_socket = self.zcontext.socket(zmq.REP) - self.mgmt_socket.bind(self.mgmt_endpoint) - - logging.info('Connecting to the vPoller Proxy server') + self.zcontext = zmq.Context() self.worker_socket = self.zcontext.socket(zmq.DEALER) - self.worker_socket.connect(self.proxy_endpoint) - - # Create a poll set for our sockets - logging.debug('Creating poll set for vPoller Worker sockets') + self.worker_socket.connect(self.config.get('proxy')) self.zpoller = zmq.Poller() - self.zpoller.register(self.mgmt_socket, zmq.POLLIN) self.zpoller.register(self.worker_socket, zmq.POLLIN) - def close_worker_sockets(self): + def close_sockets(self): """ - Closes the ZeroMQ sockets used by the Worker + Closes the ZeroMQ sockets used by the vPoller Worker """ - logging.debug('Closing vPoller Worker sockets') - - self.zpoller.unregister(self.mgmt_socket) self.zpoller.unregister(self.worker_socket) - - self.mgmt_socket.close() self.worker_socket.close() self.zcontext.term() - def spawn_vsphere_agents(self): + def create_agents(self): """ Prepares the vSphere Agents used by the vPoller Worker - """ - logging.debug('Spawning vSphere Agents') + Raises: + VPollerException - self.agents = dict() + """ + logging.debug('Creating vSphere Agents') - db = VConnectorDatabase(self.connector_db) - db_agents = db.get_agents(only_enabled=True) + db = VConnectorDatabase(self.config.get('db')) + agents = db.get_agents(only_enabled=True) - if not db_agents: + if not agents: logging.warning('No registered or enabled vSphere Agents found') raise VPollerException, 'No registered or enabled vSphere Agents found' - for each_agent in db_agents: - agent = VSphereAgent( - user=each_agent['user'], - pwd=each_agent['pwd'], - host=each_agent['host'] + for agent in agents: + a = VSphereAgent( + user=agent['user'], + pwd=agent['pwd'], + host=agent['host'] ) - self.agents[agent.host] = agent + self.agents[a.host] = a - def shutdown_vsphere_agents(self): + def stop_agents(self): """ - Disconnects all vPoller Agents from their respective VMware vSphere hosts + Disconnects all vPoller Agents from the VMware vSphere hosts they are connected to """ logging.debug('Shutting down vSphere Agents') @@ -657,88 +605,3 @@ def process_client_msg(self, msg): result = agent_method['method'](msg) return result - - def process_mgmt_msg(self, msg): - """ - Processes a message for the management interface - - Example client message to shutdown the vPoller Worker would be: - - { - "method": "worker.shutdown" - } - - Getting status information from the vPoller worker: - - { - "method": "worker.status" - } - - Args: - msg (dict): The client message for processing - - """ - logging.debug('Processing management message: %s', msg) - - if 'method' not in msg: - return { 'success': 1, 'msg': 'Missing method name' } - - # The vPoller Worker management methods we support and process - methods = { - 'worker.status': self.get_worker_status, - 'worker.shutdown': self.worker_shutdown, - } - - if msg['method'] not in methods: - return { 'success': 1, 'msg': 'Unknown method received' } - - # Process management request and return result to client - result = methods[msg['method']](msg) - - return result - - def get_worker_status(self, msg): - """ - Get status information about the vPoller Worker - - Args: - msg (dict): The client message for processing (ignored) - - Returns: - Status information about the vPoller Worker - - """ - logging.debug('Getting vPoller Worker status') - - result = { - 'success': 0, - 'msg': 'vPoller Worker Status', - 'result': { - 'status': 'running', - 'hostname': os.uname()[1], - 'proxy_endpoint': self.proxy_endpoint, - 'mgmt_endpoint': self.mgmt_endpoint, - 'connector_db': self.connector_db, - 'vsphere_agents': self.agents.keys(), - 'running_since': self.running_since, - 'uname': ' '.join(os.uname()), - } - } - - logging.debug('Returning result to client: %s', result) - - return result - - def worker_shutdown(self, msg): - """ - Shutdown the vPoller Worker - - Args: - msg (dict): The client message for processing (ignored) - - """ - logging.info('vPoller Worker is shutting down') - - self.time_to_die = True - - return { 'success': 0, 'msg': 'vPoller Worker is shutting down' }