diff --git a/src/vpoller/worker.py b/src/vpoller/worker.py index a1c625e..2eecf8a 100644 --- a/src/vpoller/worker.py +++ b/src/vpoller/worker.py @@ -42,6 +42,7 @@ from vpoller import __version__ from vpoller.log import logger +from vpoller.client import validate_message from vpoller.exceptions import VPollerException from vpoller.task.registry import registry from vconnector.core import VConnector @@ -83,6 +84,7 @@ def __init__(self, config_file, num_workers=0): 'mgmt': 'tcp://*:10000', 'proxy': 'tcp://localhost:10123', 'helpers': None, + 'tasks': None, } def start(self): @@ -138,9 +140,13 @@ def load_config(self): self.config['db'] = parser.get('worker', 'db') self.config['proxy'] = parser.get('worker', 'proxy') self.config['helpers'] = parser.get('worker', 'helpers') + self.config['tasks'] = parser.get('worker', 'tasks') if self.config['helpers']: self.config['helpers'] = self.config['helpers'].split(',') + + if self.config['tasks']: + self.config['tasks'] = self.config['tasks'].split(',') logger.debug( 'Worker Manager configuration: %s', @@ -166,7 +172,8 @@ def start_workers(self): worker = VPollerWorker( db=self.config.get('db'), proxy=self.config.get('proxy'), - helpers=self.config.get('helpers') + helpers=self.config.get('helpers'), + tasks=self.config.get('tasks') ) worker.daemon = True self.workers.append(worker) @@ -271,6 +278,7 @@ def status(self): 'db': self.config.get('db'), 'concurrency': self.num_workers, 'helpers': self.config.get('helpers'), + 'tasks': self.config.get('tasks'), } } @@ -293,7 +301,7 @@ class VPollerWorker(multiprocessing.Process): run() method """ - def __init__(self, db, proxy, helpers): + def __init__(self, db, proxy, helpers, tasks): """ Initialize a new VPollerWorker object @@ -302,6 +310,7 @@ def __init__(self, db, proxy, helpers): proxy (str): Endpoint to which vPoller Workers connect and receive new tasks for processing helpers (list): A list of helper modules to be loaded + task (list): A list of task modules to be loaded """ super(VPollerWorker, self).__init__() @@ -309,8 +318,10 @@ def __init__(self, db, proxy, helpers): 'db': db, 'proxy': proxy, 'helpers': helpers, + 'tasks': tasks, } - self.helpers = {} + self.task_modules = {} + self.helper_modules = {} self.time_to_die = multiprocessing.Event() self.agents = {} self.zcontext = None @@ -327,7 +338,8 @@ def run(self): """ logger.info('Worker process is starting') - self.load_helpers() + self.load_task_modules() + self.load_helper_modules() self.create_sockets() self.create_agents() @@ -356,7 +368,31 @@ def signal_stop(self): """ self.time_to_die.set() - def load_helpers(self): + def load_task_modules(self): + """ + Loads the task modules + + """ + if not self.config.get('tasks'): + raise VPollerException('No task modules provided') + + for task in self.config.get('tasks'): + task = task.strip() + logger.info('Loading task module %s', task) + try: + module = importlib.import_module(task) + except ImportError as e: + logger.warning( + 'Cannot import task module: %s', + e.message + ) + continue + self.task_modules[task] = module + + if not self.task_modules: + raise VPollerException('No task modules loaded') + + def load_helper_modules(self): """ Loads helper modules for post-processing of results @@ -394,7 +430,7 @@ def load_helpers(self): 'Helper module %s successfully loaded', helper ) - self.helpers[helper] = module + self.helper_modules[helper] = module def run_helper(self, helper, msg, data): """ @@ -411,7 +447,7 @@ def run_helper(self, helper, msg, data): helper ) - module = self.helpers[helper] + module = self.helper_modules[helper] h = module.HelperAgent(msg=msg, data=data) try: @@ -421,7 +457,7 @@ def run_helper(self, helper, msg, data): return data return result - + def wait_for_tasks(self): """ Poll the worker socket for new tasks @@ -456,7 +492,7 @@ def wait_for_tasks(self): result = self.process_client_msg(msg) # Process data using a helper before sending it to client? - if 'helper' in msg and msg['helper'] in self.helpers: + if 'helper' in msg and msg['helper'] in self.helper_modules: data = self.run_helper( helper=msg['helper'], msg=msg, @@ -592,11 +628,9 @@ def process_client_msg(self, msg): if not agent: return {'success': 1, 'msg': 'Unknown or missing agent name'} - # - #if not _validate_client_msg(msg, task.required): - # return {'success': 1, 'msg': 'Incorrect task request received'} - # + if not validate_message(msg=msg, required=task.required): + return {'success': 1, 'msg': 'Invalid task request'} result = task.function(agent, msg) - + return result