Skip to content

Commit

Permalink
Split ResultList from Barrier so the barrier isn't modified by the ge…
Browse files Browse the repository at this point in the history
…t_item() call
  • Loading branch information
bslatkin committed Oct 4, 2015
1 parent 5500acc commit 941f2a6
Showing 1 changed file with 33 additions and 26 deletions.
59 changes: 33 additions & 26 deletions dpxdt/client/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ def __init__(self, input_queue, output_queue):
input_queue: Queue this worker consumes work from.
output_queue: Queue where this worker puts new work items, if any.
"""
threading.Thread.__init__(self)
super(WorkerThread, self).__init__()
self.daemon = True
self.input_queue = input_queue
self.output_queue = output_queue
Expand Down Expand Up @@ -189,7 +189,7 @@ class WorkflowItem(WorkItem):
root = False

def __init__(self, *args, **kwargs):
WorkItem.__init__(self)
super(WorkflowItem, self).__init__()
self.args = args
self.kwargs = kwargs
self.interrupted = False
Expand Down Expand Up @@ -220,7 +220,23 @@ def __init__(self, items):
self.items = items


class Barrier(list):
class ResultList(list):
"""A list of results."""

@property
def error(self):
"""Returns the error for this barrier and all work items, if any."""
# Copy the error from any failed item to be the error for the whole
# barrier. The first error seen "wins". Also handles the case where
# the WorkItems passed into the barrier have already completed and
# been marked with errors.
for item in self:
if isinstance(item, WorkItem) and item.error:
return item.error
return None


class Barrier(ResultList):
"""Barrier for running multiple WorkItems in parallel."""

def __init__(self, workflow, generator, work):
Expand All @@ -233,7 +249,7 @@ def __init__(self, workflow, generator, work):
a list or tuple that contains a set of WorkItems to run in
parallel.
"""
list.__init__(self)
super(Barrier, self).__init__()
self.workflow = workflow
self.generator = generator

Expand Down Expand Up @@ -278,31 +294,22 @@ def outstanding(self):

return True

@property
def error(self):
"""Returns the error for this barrier and all work items, if any."""
# Copy the error from any failed item to be the error for the whole
# barrier. The first error seen "wins". Also handles the case where
# the WorkItems passed into the barrier have already completed and
# been marked with errors.
for item in self:
if isinstance(item, WorkItem) and item.error:
return item.error
return None

def get_item(self):
"""Returns the item to send back into the workflow generator."""
if self.was_list:
blocking_items = self[:]
self[:] = []
for item in blocking_items:
if (isinstance(item, WorkflowItem) and
item.done and
not item.error):
self.append(item.result)
result = ResultList()
for item in self:
if isinstance(item, WorkflowItem):
if item.done and not item.error:
result.append(item.result)
else:
# When there's an error or the workflow isn't done yet,
# just return the original WorkflowItem so the caller
# can inspect its entire state.
result.append(item)
else:
self.append(item)
return self
result.append(item)
return result
else:
return self[0]

Expand Down Expand Up @@ -409,7 +416,7 @@ class using the register() method.
output_queue: Queue where this worker puts finished work items,
if any.
"""
WorkerThread.__init__(self, input_queue, output_queue)
super(WorkflowThread, self).__init__(input_queue, output_queue)
self.pending = PendingBarriers()
self.worker_threads = []
self.register(WorkflowItem, input_queue)
Expand Down

0 comments on commit 941f2a6

Please sign in to comment.