-
Notifications
You must be signed in to change notification settings - Fork 23
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PMDA with refactored _single_frame
#128
base: master
Are you sure you want to change the base?
Conversation
Hello @yuxuanzhuang! Thanks for updating this PR. We checked the lines you've touched for PEP 8 issues, and found:
Comment last updated at 2020-07-16 10:59:39 UTC |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to unify the two _single_frame()
methods. In my opinion, returning values as in PMDA is the better design compared to storing state in MDAnalysis 1.x, though. I am almost more tempted to break AnalysisBase in 2.0 but this would be a wider discussion and require broad consensus.
Some initial comments/questions below.
pmda/parallel.py
Outdated
for i, ts in enumerate(self._universe._trajectory[bslice]): | ||
self._frame_index = i | ||
# record io time per frame | ||
with timeit() as b_io: | ||
# explicit instead of 'for ts in u.trajectory[bslice]' | ||
# so that we can get accurate timing. | ||
ts = u.trajectory[i] | ||
# record compute time per frame |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where did the per-frame timing information go?
|
||
@staticmethod | ||
def _reduce(res, result_single_frame): | ||
""" 'append' action for a time series""" | ||
res.append(result_single_frame) | ||
return res |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How are you going to deal with aggregations as in DensityAnalysis or complicated reductions as in RMSF?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's still doable...with some workaround. i.e. we can do aggregations and further calculations inside _conclude
, but it also means we have to transfer more data back to the main process. (new examples updated in the gist jupyter notebook)
pmda/parallel.py
Outdated
n_frames = len(range(start, stop, step)) | ||
|
||
self.start, self.stop, self.step = start, stop, step | ||
|
||
self.n_frames = n_frames | ||
|
||
# in case _prepare has not set an array. | ||
self._results = np.zeros(n_frames) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure this is always something we want to do. Check the other analysis classes in mda to see if such a default would make sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should normally be overridden by the definition inside _prepare
to suit what _single_frame
saves.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not like this reasoning. This means I always have to know about the default and that I can overwrite it in one place. This just looks weird to me. I would rather have to just set this up in once single place.
This is pretty cool. I'm do not expect this to work for everything. But it could make a neat little wrapper for some analysis functions. It could definitely be working on 'AnalysisFromFunction'. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sold on how to unify the two _single_frame()
methods.
I'd rather have a simple PR that just upgrades PMDA to use serializable universes but leaves everything else intact. Then we can worry about massive API changes. First make it work in a simple manner where you immediately reap the benefits of the serialization.
# self._results[self._frame_index][0] = self._ts.frame | ||
# the actual trajectory is at self._trajectory | ||
# self._results[self._frame_index][1] = self._trajectory.time | ||
self._results[self._frame_index] = h |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Storing a full histogram for each frame is bad – you can easily run out of memory. I think it is important that the aggregation is done every step and not just in _conclude
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But isn't that what also happens with _reduce
? It won't pass the full histogram back to the main process, but only the calculated frames in _dask_helper
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the density reduce
Lines 326 to 332 in 13fa3b5
def _reduce(res, result_single_frame): | |
""" 'accumulate' action for a time series""" | |
if isinstance(res, list) and len(res) == 0: | |
res = result_single_frame | |
else: | |
res += result_single_frame | |
return res |
Lines 305 to 306 in 13fa3b5
self._grid = self._results[:].sum(axis=0) | |
self._grid /= float(self.n_frames) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, the PMDA paper has a discussion on that topic.
@staticmethod | ||
def _reduce(res, result_single_frame): | ||
""" 'accumulate' action for a time series""" | ||
if isinstance(res, list) and len(res) == 0: | ||
res = result_single_frame | ||
else: | ||
res += result_single_frame | ||
return res |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like the design that gets rid of reduce.
subgraphs = nx.connected_components(graph) | ||
comp = [g for g in subgraphs] | ||
return comp | ||
#raise TypeError(data) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I think I packed too much inside this PR. I was intended to discover the possibility to parallel LeafletFinder both among frames and inside single frame. Because for now, it only starts to work on the next frame after all the jobs are done in the current one.
So I changed this more coarsed instead of passing hundreds of jobs per frame * hundreds of frames to dask graph.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Problem is twofold (after AtomGroup
and everything are implemented):
- XDR and DCD format files fail to pickle the last frame:
u = mda.Universe(GRO_MEMPROT, XTC_MEMPROT)
u.trajectory[4] # last frame
pickle.loads(pickle.dumps(u.trajectory))
EOFError: Trying to seek over max number of frames
The major problem is trajectory._xdr.current_frame == 5 (1-based). I might need to add extra fix (and test?) to https://github.com/MDAnalysis/mdanalysis/pull/2723/files, or maybe in an individual PR? since the pickling is handled on their own
- The algorithm itself gets different results (for the same ts) with different
n_jobs
(maybe because I made some mistakes).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The "last frame" thing is a real issue. Oops!
Don't worry about LeafletFinder at the moment, it's not really your job to fix it, and it has lots of issues. (If you need it for your own research and you have an interest in getting it working then that's a bit different but I'd still say, focus on the serialization core problem for now.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pushed a fix for the "last frame" issue.
Not LeafletFinder
per se, but maybe a general framework to suit all conditions. e.g.
- in each frame deserves parallelism.
- run multiple analysis on one single universe.
- run one analysis on multiple universe.
- complex dependencies between each job
A solution I can think of is to let ParallelAnalysisBase
inherents basic methods DaskMethodsMixin
as a dask custom collection, so that we can build complex dask graph. But I am not sure how well we can build a legit API that users do not need to care too much about the underlying implementation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good analysis of use cases and it would be useful to write this down somewhere. With PMDA so far (except LeafletFinder) we have been focusing on the simple split-apply-combine because that can be put in a simple "framework". Beyond that, it becomes difficult to do a "one size fits all" and it becomes a serious research project in CS.
I would be happy if we had a library that allows users to easily write their own split/apply/combine type analysis and where we provide a few additional parallelized analysis that might not fit into this scheme (such as LeafletFinder).
An interesting idea that has been coming up repeatedly is to "stack" multiple analysis, i.e., run multiple _single_frame()
sequentially so that the expensive data loading into memory time is amortized.
Finally, run one analysis on multiple universes seems to be a standard pleasingly parallel job that can make use of existing workflow management tools – I don't see what we can do directly to support it.
@@ -200,12 +204,13 @@ def _single_frame(self, ts, atomgroups, scheduler_kwargs, n_jobs, | |||
# Distribute the data over the available cores, apply the map function | |||
# and execute. | |||
parAtoms = db.from_sequence(arranged_coord, | |||
npartitions=len(arranged_coord)) | |||
npartitions=n_jobs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LeafletFinder is not parallelized over frames... I am not sure that choosing n_jobs is the correct choice here. Need to look at the original paper/algorithm.
# record time to generate universe and atom groups | ||
with timeit() as b_universe: | ||
u = mda.Universe(top, traj) | ||
agroups = [u.atoms[idx] for idx in indices] | ||
|
||
res = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Getting rid of this cludge is great.
yeah, makes sense. |
_single_frame
It makes sense to have this PR as a test case and to try out what might be possible. To make immediate progress, I'll continue with your PR #132 for now. |
Fix #131
Still on-going, showing some possible simplifications after
Universe
can be serialized.PR Checklist