From 1ef5dd4432595f45746b2c43c22e8851be6827db Mon Sep 17 00:00:00 2001 From: William Silversmith Date: Sat, 23 Feb 2019 21:57:14 -0500 Subject: [PATCH] fix: support dynamic classes in multiprocessing (#16) Need to set __module__ to __main__. https://github.com/uqfoundation/dill/issues/56 --- taskqueue/taskqueue.py | 21 +++++++++++++++++++-- test/pathos_issue.py | 21 +++++++++++++++++++++ test/test_taskqueue.py | 13 ++++++++++++- 3 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 test/pathos_issue.py diff --git a/taskqueue/taskqueue.py b/taskqueue/taskqueue.py index 4e6a356..f80b699 100644 --- a/taskqueue/taskqueue.py +++ b/taskqueue/taskqueue.py @@ -663,8 +663,25 @@ def capturing_soloprocess_upload(*args, **kwargs): ) tasks = _scatter(tasks, parallel) - pool = pathos.pools.ProcessPool(parallel) - pool.map(uploadfn, tasks) + # This is a hack to get dill to pickle dynamically + # generated classes. This is an important use case + # for when we create iterators with generator __iter__ + # functions on demand. + + # https://github.com/uqfoundation/dill/issues/56 + + try: + task = next(item for item in tasks if item is not None) + except StopIteration: + return + + cls_module = task.__class__.__module__ + task.__class__.__module__ = '__main__' + + with pathos.pools.ProcessPool(parallel) as pool: + pool.map(uploadfn, tasks) + + task.__class__.__module__ = cls_module if not error_queue.empty(): errors = [] diff --git a/test/pathos_issue.py b/test/pathos_issue.py new file mode 100644 index 0000000..88794cf --- /dev/null +++ b/test/pathos_issue.py @@ -0,0 +1,21 @@ +from taskqueue import PrintTask + +import copy + +def crt_tasks(a,b): + bounds = 5 + + class TaskIterator(): + def __init__(self, x): + self.x = x + def __len__(self): + return b-a + def __getitem__(self, slc): + itr = copy.deepcopy(self) + itr.x = 666 + return itr + def __iter__(self): + for i in range(a,b): + yield PrintTask(str(i) + str(self.x)) + + return TaskIterator(bounds) diff --git a/test/test_taskqueue.py b/test/test_taskqueue.py index 304b742..251d671 100644 --- a/test/test_taskqueue.py +++ b/test/test_taskqueue.py @@ -145,4 +145,15 @@ def test_local_taskqueue(): with MockTaskQueue(parallel=True, progress=False) as tq: for i in range(200): - tq.insert(ExecutePrintTask(), [i], { 'wow2': 4 }) \ No newline at end of file + tq.insert(ExecutePrintTask(), [i], { 'wow2': 4 }) + +def test_parallel_insert_all(): + import pathos_issue + + global QURL + tq = GreenTaskQueue(QURL) + + tasks = pathos_issue.crt_tasks(5, 20) + tq.insert_all(tasks, parallel=2) + + tq.purge() \ No newline at end of file