From 941f2a648a9adc39fdec2c3d66edd6b6aa95e141 Mon Sep 17 00:00:00 2001 From: Brett Slatkin Date: Sun, 4 Oct 2015 16:28:32 -0700 Subject: [PATCH] Split ResultList from Barrier so the barrier isn't modified by the get_item() call --- dpxdt/client/workers.py | 59 +++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/dpxdt/client/workers.py b/dpxdt/client/workers.py index 8b7cc3b..1c6ab91 100644 --- a/dpxdt/client/workers.py +++ b/dpxdt/client/workers.py @@ -112,7 +112,7 @@ def __init__(self, input_queue, output_queue): input_queue: Queue this worker consumes work from. output_queue: Queue where this worker puts new work items, if any. """ - threading.Thread.__init__(self) + super(WorkerThread, self).__init__() self.daemon = True self.input_queue = input_queue self.output_queue = output_queue @@ -189,7 +189,7 @@ class WorkflowItem(WorkItem): root = False def __init__(self, *args, **kwargs): - WorkItem.__init__(self) + super(WorkflowItem, self).__init__() self.args = args self.kwargs = kwargs self.interrupted = False @@ -220,7 +220,23 @@ def __init__(self, items): self.items = items -class Barrier(list): +class ResultList(list): + """A list of results.""" + + @property + def error(self): + """Returns the error for this barrier and all work items, if any.""" + # Copy the error from any failed item to be the error for the whole + # barrier. The first error seen "wins". Also handles the case where + # the WorkItems passed into the barrier have already completed and + # been marked with errors. + for item in self: + if isinstance(item, WorkItem) and item.error: + return item.error + return None + + +class Barrier(ResultList): """Barrier for running multiple WorkItems in parallel.""" def __init__(self, workflow, generator, work): @@ -233,7 +249,7 @@ def __init__(self, workflow, generator, work): a list or tuple that contains a set of WorkItems to run in parallel. """ - list.__init__(self) + super(Barrier, self).__init__() self.workflow = workflow self.generator = generator @@ -278,31 +294,22 @@ def outstanding(self): return True - @property - def error(self): - """Returns the error for this barrier and all work items, if any.""" - # Copy the error from any failed item to be the error for the whole - # barrier. The first error seen "wins". Also handles the case where - # the WorkItems passed into the barrier have already completed and - # been marked with errors. - for item in self: - if isinstance(item, WorkItem) and item.error: - return item.error - return None - def get_item(self): """Returns the item to send back into the workflow generator.""" if self.was_list: - blocking_items = self[:] - self[:] = [] - for item in blocking_items: - if (isinstance(item, WorkflowItem) and - item.done and - not item.error): - self.append(item.result) + result = ResultList() + for item in self: + if isinstance(item, WorkflowItem): + if item.done and not item.error: + result.append(item.result) + else: + # When there's an error or the workflow isn't done yet, + # just return the original WorkflowItem so the caller + # can inspect its entire state. + result.append(item) else: - self.append(item) - return self + result.append(item) + return result else: return self[0] @@ -409,7 +416,7 @@ class using the register() method. output_queue: Queue where this worker puts finished work items, if any. """ - WorkerThread.__init__(self, input_queue, output_queue) + super(WorkflowThread, self).__init__(input_queue, output_queue) self.pending = PendingBarriers() self.worker_threads = [] self.register(WorkflowItem, input_queue)