diff --git a/.pylintrc b/.pylintrc index ca66376e..25a05430 100644 --- a/.pylintrc +++ b/.pylintrc @@ -254,7 +254,6 @@ enable=abstract-class-instantiated, using-cmp-argument, using-constant-test, wildcard-import, - wrong-import-order, xrange-builtin, yield-inside-async-function, yield-outside-function, @@ -268,8 +267,9 @@ enable=abstract-class-instantiated, # 4. Remove from this list. # attribute-defined-outside-init, # import-error, - # invalid-name, - # pointless-statement, + # invalid-name, # too many false positives + # pointless-statement, # u.trajectory[2] is a false positive here + # wrong-import-order, # doesn't accept six imports at the file start [REPORTS] diff --git a/pmda/parallel.py b/pmda/parallel.py index f595740c..9fa0a063 100644 --- a/pmda/parallel.py +++ b/pmda/parallel.py @@ -15,17 +15,19 @@ """ from __future__ import absolute_import, division -from contextlib import contextmanager -import warnings - from six.moves import range +from six.moves import cPickle as pickle import MDAnalysis as mda +from collections import namedtuple +from contextlib import contextmanager +from dask import distributed, multiprocessing from dask.delayed import delayed import dask import dask.distributed from joblib import cpu_count import numpy as np +import warnings from .util import timeit, make_balanced_slices @@ -84,6 +86,10 @@ def conclude(self): return self._conclude +HelperResult = namedtuple("HelperResult", + "result, timing_io, timing_compute, timing_universe") + + class ParallelAnalysisBase(object): """Base class for defining parallel multi frame analysis @@ -190,7 +196,8 @@ def __init__(self, universe, atomgroups): self._trajectory = universe.trajectory self._top = universe.filename self._traj = universe.trajectory.filename - self._indices = [ag.indices for ag in atomgroups] + self._anchor = universe.anchor_name + self._pickles = [pickle.dumps(ag) for ag in atomgroups] @contextmanager def readonly_attributes(self): @@ -351,9 +358,10 @@ def run(self, task = 
delayed( self._dask_helper, pure=False)( bslice, - self._indices, + self._pickles, self._top, - self._traj, ) + self._traj, + self._anchor) blocks.append(task) blocks = delayed(blocks) res = blocks.compute(**scheduler_kwargs) @@ -371,11 +379,11 @@ def run(self, np.array([el[3] for el in res]), time_prepare, conclude.elapsed) return self - def _dask_helper(self, bslice, indices, top, traj): + def _dask_helper(self, bslice, pickles, top, traj, anchor): """helper function to actually setup dask graph""" with timeit() as b_universe: - u = mda.Universe(top, traj) - agroups = [u.atoms[idx] for idx in indices] + u = mda.Universe(top, traj, anchor_name=anchor) + agroups = [pickle.loads(idx) for idx in pickles] res = [] times_io = [] @@ -392,8 +400,8 @@ def _dask_helper(self, bslice, indices, top, traj): times_io.append(b_io.elapsed) times_compute.append(b_compute.elapsed) - return np.asarray(res), np.asarray(times_io), np.asarray( - times_compute), b_universe.elapsed + return HelperResult(np.asarray(res), np.asarray(times_io), + np.asarray(times_compute), b_universe.elapsed) @staticmethod def _reduce(res, result_single_frame):