-
Notifications
You must be signed in to change notification settings - Fork 667
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Parallel trajectory analysis (revisited) #618
Changes from 3 commits
f974a80
015a6ea
5fddd1e
ba9568f
447f306
dc29f26
59ce29e
fc94606
7a54fad
4fccebd
d83a821
747b486
a67c120
e9535f6
d6a77f9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*- | ||
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 | ||
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 | ||
# | ||
# MDAnalysis --- http://www.MDAnalysis.org | ||
# Copyright (c) 2006-2015 Naveen Michaud-Agrawal, Elizabeth J. Denning, Oliver Beckstein | ||
|
@@ -17,13 +17,10 @@ | |
""" | ||
Analysis building blocks --- :mod:`MDAnalysis.analysis.base` | ||
============================================================ | ||
|
||
A collection of useful building blocks for creating Analysis | ||
classes. | ||
|
||
|
||
""" | ||
import numpy as np | ||
|
||
import logging | ||
|
||
|
||
|
@@ -32,49 +29,49 @@ | |
|
||
class AnalysisBase(object): | ||
"""Base class for defining multi frame analysis | ||
|
||
The analysis base class is designed as a template for creating | ||
multiframe analysis. | ||
|
||
multiframe analysis. | ||
The class implements the following methods: | ||
|
||
_setup_frames(trajectory, start=None, stop=None, step=None) | ||
Pass a Reader object and define the desired iteration pattern | ||
through the trajectory | ||
|
||
run | ||
The user facing run method. Calls the analysis methods | ||
defined below | ||
|
||
Your analysis can implement the following methods, which are | ||
called from run: | ||
|
||
_prepare | ||
Called before iteration on the trajectory has begun. | ||
Data structures can be set up at this time, however most | ||
error checking should be done in the __init__ | ||
|
||
_single_frame | ||
Called after the trajectory is moved onto each new frame. | ||
|
||
_conclude | ||
Called once iteration on the trajectory is finished. | ||
Apply normalisation and averaging to results here. | ||
|
||
""" | ||
def _setup_frames(self, trajectory, start=None, | ||
stop=None, step=None): | ||
self._trajectory = trajectory | ||
start, stop, step = trajectory.check_slice_indices( | ||
start, stop, step) | ||
self.start = start | ||
self.stop = stop | ||
self.step = step | ||
self.nframes = len(xrange(start, stop, step)) | ||
def _setup_frames(self, universe, start=None, stop=None, step=None): | ||
""" | ||
Add method docstring | ||
""" | ||
if universe is None: | ||
pass | ||
else: | ||
self._universe = universe | ||
self._trajectory = self._universe.trajectory | ||
|
||
start, stop, step = self._trajectory.check_slice_indices(start, | ||
stop, | ||
step) | ||
self.start = start | ||
self.stop = stop | ||
self.step = step | ||
self.nframes = len(xrange(start, stop, step)) | ||
|
||
def _single_frame(self): | ||
"""Calculate data from a single frame of trajectory | ||
|
||
Don't worry about normalising, just deal with a single frame. | ||
""" | ||
pass | ||
|
@@ -85,7 +82,6 @@ def _prepare(self): | |
|
||
def _conclude(self): | ||
"""Finalise the results you've gathered. | ||
|
||
Called at the end of the run() method to finish everything up. | ||
""" | ||
pass | ||
|
@@ -96,10 +92,14 @@ def run(self): | |
self._prepare() | ||
for i, ts in enumerate( | ||
self._trajectory[self.start:self.stop:self.step]): | ||
self._ts = ts | ||
#logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) | ||
self._single_frame() | ||
logger.info("Finishing up") | ||
#logger.info("Finishing up") | ||
self._conclude() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a reason why this line has to go? Could we call for i, ts in enumerate(self._traj[etc]):
self._single_frame(ts) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree, calling _single_frame with ts as the argument looks much cleaner. |
||
|
||
def __getstate__(self): | ||
state = dict(self.__dict__) | ||
for key in ['_universe', '_trajectory']: | ||
if key in state: | ||
del state[key] | ||
return state |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
from base import AnalysisBase | ||
import numpy as np | ||
|
||
class TotalDipole(AnalysisBase): | ||
|
||
def __init__(self, universe=None, filename='order.dat', selection=None, start=None, stop=None, step=None): | ||
|
||
if selection is None: | ||
raise RuntimeError('In class TotalDipole: constroctur requires a selection') | ||
else: | ||
self.selection_string = selection | ||
|
||
self._universe = universe | ||
#self._trajectory = self._universe.trajectory | ||
self.filename = filename | ||
self.time = [] | ||
|
||
self._setup_frames(self._universe, start, stop, step) | ||
self.dipoles = [] | ||
|
||
def _single_frame(self): | ||
selection = self._universe.select_atoms(self.selection_string) | ||
|
||
dipole = np.zeros(3) | ||
|
||
for residue in selection.residues: | ||
dipole += MolecularDipole(residue) | ||
|
||
self.dipoles.append(dipole) | ||
|
||
def _conclude(self): | ||
total_dipole = np.sum(self.dipoles, axis=0) | ||
print "Average dipole:", total_dipole/self.nframes | ||
return total_dipole/self.nframes | ||
|
||
def __iadd__(self,other): | ||
self.dipoles += other.dipoles | ||
self.nframes += other.nframes | ||
return self | ||
|
||
|
||
#--- Functions ---# | ||
def MolecularDipole(residue): | ||
charges = residue.charges | ||
abscharges = np.absolute(charges) | ||
charge_sum = np.sum(abscharges) | ||
positions = residue.positions | ||
|
||
charge_center = [] | ||
|
||
for coord in [0,1,2]: | ||
charge_center.append(np.sum(np.multiply(abscharges,positions[:,coord]))/charge_sum) | ||
|
||
dipole = [] | ||
# 4.803 converts to debyes | ||
for coord in [0,1,2]: | ||
dipole.append(np.sum(np.multiply(charges,positions[:,coord]-charge_center[coord]))*4.803) | ||
|
||
return dipole |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,198 @@ | ||
""" Add a docstring later | ||
|
||
""" | ||
import MDAnalysis as mda | ||
import multiprocessing as mp | ||
import copy | ||
from operator import itemgetter | ||
|
||
class ParallelProcessor(object): | ||
""" Add class docstring later | ||
""" | ||
def __init__(self, jobs_list, universe, start=None, stop=None, | ||
step=None, threads=None): | ||
self._universe = universe | ||
self.topology = universe.filename | ||
self.trajname = universe._trajectory.filename | ||
|
||
start, stop, step = universe.trajectory.check_slice_indices(start, | ||
stop, step) | ||
|
||
self.start = start | ||
self.stop = stop | ||
self.step = step | ||
self.nframes = len(xrange(start, stop, step)) | ||
|
||
self.jobs_list = jobs_list | ||
|
||
if threads is None: | ||
self.threads = mp.cpu_count() | ||
elif threads > mp.cpu_count(): | ||
self.threads = mp.cpu_count() | ||
else: | ||
self.threads = threads | ||
|
||
self.slices = self.compute_slices() | ||
|
||
def compute_slices(self): | ||
""" | ||
This function returns a list containing the start and | ||
last configuration to be analyzed from each thread | ||
""" | ||
threads = self.threads # Get number of threads from initialization | ||
step = self.step | ||
configs = (self.stop-self.start)/step | ||
|
||
self.nframes = configs | ||
|
||
# Check whether the number of threads is higher than | ||
# the number of trajectories to be analyzed | ||
while configs/threads == 0: | ||
threads -= 1 | ||
|
||
self.threads = threads # Update the number of threads | ||
|
||
print "Total configurations: "+str(configs) | ||
print "Analysis running on ", threads, " threads.\n" | ||
|
||
# Number of cfgs for each thread, and remainder to be added | ||
thread_cfg = configs/threads | ||
reminder = configs%threads | ||
|
||
slices = [] | ||
beg = self.start | ||
|
||
# Compute the start and last configurations | ||
for thread in range(0, threads): | ||
if thread < reminder: | ||
end = beg+step*thread_cfg | ||
else: | ||
end = beg+step*(thread_cfg-1) | ||
|
||
slices.append([beg, end+1]) | ||
beg = end+step | ||
|
||
# Print on screen the configurations assigned to each thread | ||
for thread in range(threads): | ||
confs = 1+(slices[thread][1]-1-slices[thread][0])/step | ||
digits = len(str(self.stop)) | ||
line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \ | ||
+str(slices[thread][0]).rjust(digits)+"/" \ | ||
+str(slices[thread][1]-1).rjust(digits) \ | ||
+" | Configurations: "\ | ||
+str(confs).rjust(1+len(str(thread_cfg))) | ||
print line | ||
|
||
return slices | ||
|
||
|
||
def compute(self, out_queue, order, progress): | ||
""" | ||
Run the single_frame method for each analysis object for all | ||
the trajectories in the batch | ||
""" | ||
start = self.slices[order][0] | ||
stop = self.slices[order][1] | ||
step = self.step | ||
|
||
jobs_list = [] | ||
universe = mda.Universe(self.topology, self.trajname) | ||
traj = universe.trajectory | ||
|
||
for job in self.jobs_list: | ||
jobs_list.append(copy.deepcopy(job)) | ||
|
||
for job in jobs_list: | ||
# Initialize job objects. start, stop and step are | ||
# given so that self.nframes is computed correctly | ||
job._setup_frames(universe=universe, start=start, | ||
stop=stop, step=self.step) | ||
job._prepare() | ||
|
||
for timestep in traj[start:stop:step]: | ||
for job in jobs_list: | ||
job._single_frame() | ||
progress.put(order) # Updates the progress bar | ||
|
||
out_queue.put((jobs_list, order)) # Returns the results | ||
|
||
def conclude(self, jobs_list): | ||
""" | ||
Run conclude for each job object | ||
""" | ||
for job in jobs_list: | ||
job._conclude() | ||
|
||
def parallel_run(self): | ||
""" | ||
Create copies of the original object to be | ||
dispatched to multiprocess | ||
""" | ||
threads = self.threads | ||
|
||
# Queues for the communication between parent and child processes | ||
out_queue = mp.Manager().Queue() | ||
progress = mp.Manager().Queue() | ||
|
||
# Prepare multiprocess objects | ||
processes = [mp.Process(target=self.compute, | ||
args=(out_queue, order, progress)) | ||
for order in range(threads)] | ||
|
||
|
||
# Run processes | ||
for process in processes: | ||
process.start() | ||
|
||
thread_configs = [1+(elem[1]-elem[0]-1)/self.step | ||
for elem in self.slices] | ||
|
||
data = {} | ||
|
||
for thread in range(threads): | ||
data[thread] = 0 | ||
|
||
total = 0 | ||
while any([process.is_alive() for process in processes]): | ||
while not progress.empty(): | ||
core = progress.get() | ||
data[core] += 1 | ||
total += 1 | ||
|
||
for thread in range(threads): | ||
print "Core "+str(thread)+": " \ | ||
+str(data[thread])+"/" \ | ||
+str(thread_configs[thread]) | ||
|
||
print "Total: "+str(total)+"/"\ | ||
+str(self.nframes) | ||
|
||
print "\033["+str(threads+2)+"A" | ||
|
||
print (threads+1)*"\n" | ||
|
||
# Exit the completed processes | ||
for process in processes: | ||
process.join() | ||
|
||
results = [] | ||
|
||
# Collects results from the queue | ||
while not out_queue.empty(): | ||
results.append(out_queue.get()) | ||
|
||
jobs_num = len(self.jobs_list) | ||
|
||
result_list = [] | ||
|
||
# Sum the job objects from each thread | ||
for job in range(jobs_num): | ||
for order, thread in enumerate(sorted(results, key=itemgetter(1))): | ||
if order == 0: | ||
result_list.append(thread[0][job]) | ||
else: | ||
result_list[job] += thread[0][job] | ||
|
||
# Run the conclude function for each job | ||
self.conclude(result_list) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need this hack any more right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mean the
if universe is None:
? Yep I think it should go. I'll commit a modified version right away.