From 0a9f1964b53080ced76ba2f13dba41e83382ec39 Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 16:13:13 +0200 Subject: [PATCH 1/8] use named tuple as return type for dask_helper This makes functions unpacking this tuple easier to read. --- pmda/parallel.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index f595740c..77a4eb2a 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -15,12 +15,12 @@ """ from __future__ import absolute_import, division -from contextlib import contextmanager import warnings -from six.moves import range - import MDAnalysis as mda +from collections import namedtuple +from contextlib import contextmanager +from dask import distributed, multiprocessing from dask.delayed import delayed import dask import dask.distributed @@ -84,6 +84,9 @@ def conclude(self): return self._conclude +HelperResult = namedtuple("HelperResult", "result, timing_io, timing_compute, timing_universe") + + class ParallelAnalysisBase(object): """Base class for defining parallel multi frame analysis @@ -366,9 +369,9 @@ def run(self, self._conclude() self.timing = Timing( - np.hstack([el[1] for el in res]), - np.hstack([el[2] for el in res]), total.elapsed, - np.array([el[3] for el in res]), time_prepare, conclude.elapsed) + np.hstack([el.timing_io for el in res]), + np.hstack([el.timing_compute for el in res]), total.elapsed, + np.array([el.timing_universe for el in res]), time_prepare, conclude.elapsed) return self def _dask_helper(self, bslice, indices, top, traj): @@ -392,8 +395,8 @@ def _dask_helper(self, bslice, indices, top, traj): times_io.append(b_io.elapsed) times_compute.append(b_compute.elapsed) - return np.asarray(res), np.asarray(times_io), np.asarray( - times_compute), b_universe.elapsed + return HelperResult(np.asarray(res), np.asarray(times_io), + np.asarray(times_compute), b_universe.elapsed) @staticmethod def _reduce(res, result_single_frame): From 6c09ded09993380c5f79a46d3a5adbab88c427cc Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 12:00:19 +0200 Subject: [PATCH 2/8] use pickle of ag to transport information --- pmda/parallel.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 77a4eb2a..73f7df7f 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -15,7 +15,8 @@ """ from __future__ import absolute_import, division -import warnings +from six.moves import range +from six.moves import cPickle as pickle import MDAnalysis as mda from collections import namedtuple @@ -26,6 +27,7 @@ import dask.distributed from joblib import cpu_count import numpy as np +import warnings from .util import timeit, make_balanced_slices @@ -193,7 +195,7 @@ def __init__(self, universe, atomgroups): self._trajectory = universe.trajectory self._top = universe.filename self._traj = universe.trajectory.filename - self._indices = [ag.indices for ag in atomgroups] + self._pickles = [pickle.dump(ag) for ag in atomgroups] @contextmanager def readonly_attributes(self): @@ -354,7 +356,7 @@ def run(self, task = delayed( self._dask_helper, pure=False)( bslice, - self._indices, + self._pickles, self._top, self._traj, ) blocks.append(task) @@ -374,11 +376,11 @@ def run(self, np.array([el.timing_universe for el in res]), time_prepare, conclude.elapsed) return self - def _dask_helper(self, bslice, indices, top, traj): + def _dask_helper(self, bslice, pickles, top, traj): """helper function to actually setup dask graph""" with timeit() as b_universe: u = mda.Universe(top, traj) - agroups = [u.atoms[idx] for idx in indices] + agroups = [pickle.loads(idx) for idx in pickles] res = [] times_io = [] From d7e65b7646711d475ca7980d2b755ccfc58f2a60 Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 13:09:54 +0200 Subject: [PATCH 3/8] fix typo --- pmda/parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 73f7df7f..de198d39 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -195,7 +195,7 @@ def __init__(self, universe, atomgroups): self._trajectory = universe.trajectory self._top = universe.filename self._traj = universe.trajectory.filename - self._pickles = [pickle.dump(ag) for ag in atomgroups] + self._pickles = [pickle.dumps(ag) for ag in atomgroups] @contextmanager def readonly_attributes(self): From 750630cb2e0d9c3685e4dea2af16c1fd68e75feb Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 16:22:37 +0200 Subject: [PATCH 4/8] try anchor thingy --- pmda/parallel.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index de198d39..77a71ed7 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -195,6 +195,7 @@ def __init__(self, universe, atomgroups): self._trajectory = universe.trajectory self._top = universe.filename self._traj = universe.trajectory.filename + self._anchor = universe.anchor_name self._pickles = [pickle.dumps(ag) for ag in atomgroups] @contextmanager @@ -358,7 +359,8 @@ def run(self, bslice, self._pickles, self._top, - self._traj, ) + self._traj, + self._anchor) blocks.append(task) blocks = delayed(blocks) res = blocks.compute(**scheduler_kwargs) @@ -376,10 +378,11 @@ def run(self, np.array([el.timing_universe for el in res]), time_prepare, conclude.elapsed) return self - def _dask_helper(self, bslice, pickles, top, traj): + def _dask_helper(self, bslice, pickles, top, traj, anchor): """helper function to actually setup dask graph""" with timeit() as b_universe: u = mda.Universe(top, traj) + u.anchor_name = anchor agroups = [pickle.loads(idx) for idx in pickles] res = [] From 6197285ebbc373b5d79fc08ad1d33206ab3cd34a Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 16:35:27 +0200 Subject: [PATCH 5/8] add import --- pmda/parallel.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 77a71ed7..76fee6fb 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -86,7 +86,8 @@ def conclude(self): return self._conclude -HelperResult = namedtuple("HelperResult", "result, timing_io, timing_compute, timing_universe") +HelperResult = namedtuple("HelperResult", + "result, timing_io, timing_compute, timing_universe") class ParallelAnalysisBase(object): From 80ae146a473023bf554d69741bff01ffb53730bd Mon Sep 17 00:00:00 2001 From: Max Linke Date: Thu, 20 Sep 2018 16:36:46 +0200 Subject: [PATCH 6/8] adjust pylint --- .pylintrc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.pylintrc b/.pylintrc index ca66376e..25a05430 100644 --- a/.pylintrc +++ b/.pylintrc @@ -254,7 +254,6 @@ enable=abstract-class-instantiated, using-cmp-argument, using-constant-test, wildcard-import, - wrong-import-order, xrange-builtin, yield-inside-async-function, yield-outside-function, @@ -268,8 +267,9 @@ enable=abstract-class-instantiated, # 4. Remove from this list. # attribute-defined-outside-init, # import-error, - # invalid-name, - # pointless-statement, + # invalid-name, # to many false positives + # pointless-statement, # u.trajectory[2] is a false positive here + # wrong-import-order, # doesn't except six imports at the file start [REPORTS] From 9b9db567bb46eeb3552b9bbda8af9b5bc57d0ac4 Mon Sep 17 00:00:00 2001 From: Richard Gowers Date: Mon, 5 Nov 2018 13:21:02 -0600 Subject: [PATCH 7/8] Update parallel.py --- pmda/parallel.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 76fee6fb..4a97592a 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -382,8 +382,7 @@ def run(self, def _dask_helper(self, bslice, pickles, top, traj, anchor): """helper function to actually setup dask graph""" with timeit() as b_universe: - u = mda.Universe(top, traj) - u.anchor_name = anchor + u = mda.Universe(top, traj, anchor_name=anchor) agroups = [pickle.loads(idx) for idx in pickles] res = [] From 3ac70806038d10a53475ced9321478fbb0445e8d Mon Sep 17 00:00:00 2001 From: Max Linke Date: Tue, 6 Nov 2018 19:31:16 +0100 Subject: [PATCH 8/8] maybe that helps for a reason? --- pmda/parallel.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pmda/parallel.py b/pmda/parallel.py index 4a97592a..9fa0a063 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -374,9 +374,9 @@ def run(self, self._conclude() self.timing = Timing( - np.hstack([el.timing_io for el in res]), - np.hstack([el.timing_compute for el in res]), total.elapsed, - np.array([el.timing_universe for el in res]), time_prepare, conclude.elapsed) + np.hstack([el[1] for el in res]), + np.hstack([el[2] for el in res]), total.elapsed, + np.array([el[3] for el in res]), time_prepare, conclude.elapsed) return self def _dask_helper(self, bslice, pickles, top, traj, anchor):