use pickle of ag to transport information #65

Closed · wants to merge 8 commits
6 changes: 3 additions & 3 deletions .pylintrc
@@ -254,7 +254,6 @@ enable=abstract-class-instantiated,
        using-cmp-argument,
        using-constant-test,
        wildcard-import,
-       wrong-import-order,
        xrange-builtin,
        yield-inside-async-function,
        yield-outside-function,
@@ -268,8 +267,9 @@ enable=abstract-class-instantiated,
 # 4. Remove from this list.
 # attribute-defined-outside-init,
 # import-error,
-# invalid-name,
-# pointless-statement,
+# invalid-name,  # too many false positives
+# pointless-statement,  # u.trajectory[2] is a false positive here
+# wrong-import-order,  # doesn't accept six imports at the file start


 [REPORTS]
30 changes: 19 additions & 11 deletions pmda/parallel.py
@@ -15,17 +15,19 @@

 """
 from __future__ import absolute_import, division
+from contextlib import contextmanager
+import warnings

 from six.moves import range
+from six.moves import cPickle as pickle

 import MDAnalysis as mda
-from contextlib import contextmanager
+from collections import namedtuple
 from dask import distributed, multiprocessing
 from dask.delayed import delayed
 import dask
 import dask.distributed
 from joblib import cpu_count
 import numpy as np
-import warnings

 from .util import timeit, make_balanced_slices

@@ -84,6 +86,10 @@ def conclude(self):
         return self._conclude


+HelperResult = namedtuple("HelperResult",
+                          "result, timing_io, timing_compute, timing_universe")
+
+
 class ParallelAnalysisBase(object):
     """Base class for defining parallel multi frame analysis

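Because HelperResult is a namedtuple, each per-block return value can be read by field name instead of by position (the el[0]..el[3] indexing still used in run() below), while remaining an ordinary tuple. A minimal sketch of the access pattern, independent of pmda:

    from collections import namedtuple

    import numpy as np

    HelperResult = namedtuple(
        "HelperResult",
        "result, timing_io, timing_compute, timing_universe")

    # One HelperResult is produced per dask block; fields are readable
    # by name as well as by position.
    block = HelperResult(result=np.zeros(3),
                         timing_io=np.array([0.1, 0.2]),
                         timing_compute=np.array([0.5, 0.25]),
                         timing_universe=0.8)

    print(block.timing_compute.sum())        # 0.75
    print(block[2] is block.timing_compute)  # True: still a plain tuple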
@@ -190,7 +196,8 @@ def __init__(self, universe, atomgroups):
         self._trajectory = universe.trajectory
         self._top = universe.filename
         self._traj = universe.trajectory.filename
-        self._indices = [ag.indices for ag in atomgroups]
+        self._anchor = universe.anchor_name
+        self._pickles = [pickle.dumps(ag) for ag in atomgroups]

     @contextmanager
     def readonly_attributes(self):
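The pickled AtomGroups replace the plain index arrays. In the MDAnalysis 0.x series this PR targets, an AtomGroup pickle records the atom indices plus a reference to the anchoring Universe, and it can only be restored while a Universe with a matching anchor is alive. A rough round-trip sketch (the file names are placeholders, not part of this PR):

    from six.moves import cPickle as pickle

    import MDAnalysis as mda

    # Placeholder topology/trajectory pair; any valid files would do.
    u = mda.Universe("topol.tpr", "traj.xtc", anchor_name="pmda-demo")
    ca = u.select_atoms("name CA")

    data = pickle.dumps(ca)   # indices plus the anchor reference

    # Unpickling searches the live Universes for a matching anchor;
    # here that is `u` itself, on a worker it is a Universe rebuilt
    # from the same files with the same anchor_name.
    ca2 = pickle.loads(data)
    assert (ca2.indices == ca.indices).all()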
@@ -351,9 +358,10 @@ def run(self,
             task = delayed(
                 self._dask_helper, pure=False)(
                     bslice,
-                    self._indices,
+                    self._pickles,
                     self._top,
-                    self._traj, )
+                    self._traj,
+                    self._anchor)
             blocks.append(task)
         blocks = delayed(blocks)
         res = blocks.compute(**scheduler_kwargs)
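run() builds one delayed _dask_helper call per balanced slice and evaluates the whole list in a single compute() pass. A stripped-down sketch of that dask pattern, with a stand-in work function in place of _dask_helper (illustrative, not pmda code):

    from dask.delayed import delayed

    def work(frames):
        # stand-in for _dask_helper: process one slice of frame indices
        return sum(frames)

    blocks = []
    for bslice in (range(0, 5), range(5, 10)):
        # pure=False forces re-execution even for identical arguments
        blocks.append(delayed(work, pure=False)(list(bslice)))

    # delayed() over the list gives one graph; compute() runs all blocks
    results = delayed(blocks).compute()
    print(results)  # [10, 35]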
@@ -371,11 +379,11 @@ def run(self,
             np.array([el[3] for el in res]), time_prepare, conclude.elapsed)
         return self

-    def _dask_helper(self, bslice, indices, top, traj):
+    def _dask_helper(self, bslice, pickles, top, traj, anchor):
         """helper function to actually setup dask graph"""
         with timeit() as b_universe:
-            u = mda.Universe(top, traj)
-            agroups = [u.atoms[idx] for idx in indices]
+            u = mda.Universe(top, traj, anchor_name=anchor)
+            agroups = [pickle.loads(idx) for idx in pickles]
kain88-de (Member, Author) commented on these lines:

    E   RuntimeError: Couldn't find a suitable Universe to unpickle AtomGroup onto with Universe hash '63ae64d5-94f2-42e5-ad84-cb7f89970d36'.  Available hashes: 303e8959-6b17-49ee-9910-9528cf5ee0d5

@richardjgowers the simple pickle solution doesn't work. The Universe hash values are different for some reason.

richardjgowers (Member) replied:

@kain88-de it's not actually hashing the Universe, it's just creating a uuid when the Universe is created. So when you create a new Universe from the same files it "hashes" differently.

kain88-de (Member, Author) replied:

So I also need to pickle the universe?
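Sketching the failure mode under MDAnalysis 0.x semantics (placeholder file names again): every Universe receives a fresh uuid as its anchor unless anchor_name is pinned, which is why this PR stores universe.anchor_name and hands it to the workers:

    from six.moves import cPickle as pickle

    import MDAnalysis as mda

    # Two Universes built from the same files get two different uuids:
    u1 = mda.Universe("topol.tpr", "traj.xtc")
    u2 = mda.Universe("topol.tpr", "traj.xtc")
    data = pickle.dumps(u1.atoms[:10])
    del u1
    # pickle.loads(data) would now raise the RuntimeError quoted above:
    # only u2's (different) anchor is still alive.

    # Pinning the anchor makes a rebuilt Universe a valid target:
    u3 = mda.Universe("topol.tpr", "traj.xtc", anchor_name="shared")
    data = pickle.dumps(u3.atoms[:10])
    u4 = mda.Universe("topol.tpr", "traj.xtc", anchor_name="shared")
    ag = pickle.loads(data)   # restored onto a "shared"-anchored Universe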


         res = []
         times_io = []

@@ -392,8 +400,8 @@ def _dask_helper(self, bslice, indices, top, traj):
             times_io.append(b_io.elapsed)
             times_compute.append(b_compute.elapsed)

-        return np.asarray(res), np.asarray(times_io), np.asarray(
-            times_compute), b_universe.elapsed
+        return HelperResult(np.asarray(res), np.asarray(times_io),
+                            np.asarray(times_compute), b_universe.elapsed)

     @staticmethod
     def _reduce(res, result_single_frame):