Parallel trajectory analysis (revisited) #618

Closed
Changes from 3 commits (15 commits total)
f974a80 - Modified base.py to work with parallel_job.py, added total dipole mom… (mattiafelice-palermo, Jan 12, 2016)
015a6ea - Minor modifications (mattiafelice-palermo, Jan 12, 2016)
5fddd1e - New AnalysisBase and ParallelProcessor classes. Now each child proces… (mattiafelice-palermo, Jan 13, 2016)
ba9568f - _single_frame in AnalysisBase now is called with timestep as argument (mattiafelice-palermo, Jan 13, 2016)
447f306 - Added parallel feature to AnalysisBase class. Added a small progress … (mattiafelice-palermo, Jan 17, 2016)
dc29f26 - Number of read configurations with AnalysisBase run(parallel=True) an… (mattiafelice-palermo, Jan 17, 2016)
59ce29e - Progressbar now outputs to stderr (mattiafelice-palermo, Jan 17, 2016)
fc94606 - Now progressbar works in threading mode (thus also for serial jobs). … (mattiafelice-palermo, Jan 18, 2016)
7a54fad - Code check with pylint and more meaningful naming of variables (mattiafelice-palermo, Jan 18, 2016)
4fccebd - Modified method run() of AnalysisBase class so that it redirects to a… (mattiafelice-palermo, Jan 18, 2016)
d83a821 - Modified tests and analysis modules to work with new AnalysisBase (wh… (mattiafelice-palermo, Jan 20, 2016)
747b486 - Modified AnalysisBase, selections should now be passed within a list. (mattiafelice-palermo, Jan 20, 2016)
a67c120 - Proposed improvements to parallelbase (richardjgowers, Jan 20, 2016)
e9535f6 - Merge pull request #1 from MDAnalysis/feature-ParallelAnalysis (mattiafelice-palermo, Jan 20, 2016)
d6a77f9 - Added tests for parallel usage of AnalysisBase (mattiafelice-palermo, Jan 22, 2016)
58 changes: 29 additions & 29 deletions package/MDAnalysis/analysis/base.py
@@ -1,5 +1,5 @@
# -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*-
# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# MDAnalysis --- http://www.MDAnalysis.org
# Copyright (c) 2006-2015 Naveen Michaud-Agrawal, Elizabeth J. Denning, Oliver Beckstein
@@ -17,13 +17,10 @@
"""
Analysis building blocks --- :mod:`MDAnalysis.analysis.base`
============================================================

A collection of useful building blocks for creating Analysis
classes.


"""
import numpy as np

import logging


@@ -32,49 +29,49 @@

class AnalysisBase(object):
    """Base class for defining multi frame analysis

    The analysis base class is designed as a template for creating
    multiframe analysis.
    The class implements the following methods:

    _setup_frames(trajectory, start=None, stop=None, step=None)
        Pass a Reader object and define the desired iteration pattern
        through the trajectory

    run
        The user facing run method. Calls the analysis methods
        defined below

    Your analysis can implement the following methods, which are
    called from run:

    _prepare
        Called before iteration on the trajectory has begun.
        Data structures can be set up at this time, however most
        error checking should be done in the __init__

    _single_frame
        Called after the trajectory is moved onto each new frame.

    _conclude
        Called once iteration on the trajectory is finished.
        Apply normalisation and averaging to results here.

    """
    def _setup_frames(self, trajectory, start=None,
                      stop=None, step=None):
        self._trajectory = trajectory
        start, stop, step = trajectory.check_slice_indices(
            start, stop, step)
        self.start = start
        self.stop = stop
        self.step = step
        self.nframes = len(xrange(start, stop, step))
    def _setup_frames(self, universe, start=None, stop=None, step=None):
        """
        Add method docstring
        """
        if universe is None:
Member: We don't need this hack any more right?

Contributor Author: You mean the if universe is None:? Yep, I think it should go. I'll commit a modified version right away.

            pass
        else:
            self._universe = universe
            self._trajectory = self._universe.trajectory

            start, stop, step = self._trajectory.check_slice_indices(start,
                                                                     stop,
                                                                     step)
            self.start = start
            self.stop = stop
            self.step = step
            self.nframes = len(xrange(start, stop, step))

    def _single_frame(self):
        """Calculate data from a single frame of trajectory

        Don't worry about normalising, just deal with a single frame.
        """
        pass
@@ -85,7 +82,6 @@ def _prepare(self):

    def _conclude(self):
        """Finalise the results you've gathered.

        Called at the end of the run() method to finish everything up.
        """
        pass
@@ -96,10 +92,14 @@ def run(self):
        self._prepare()
        for i, ts in enumerate(
                self._trajectory[self.start:self.stop:self.step]):
            self._ts = ts
            #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes))
            self._single_frame()
        logger.info("Finishing up")
        #logger.info("Finishing up")
        self._conclude()

Member: Is there a reason why this line has to go? Could we call _single_frame with ts as the argument? i.e.

for i, ts in enumerate(self._traj[etc]):
    self._single_frame(ts)

Contributor Author: I agree, calling _single_frame with ts as the argument looks much cleaner.


    def __getstate__(self):
        state = dict(self.__dict__)
        for key in ['_universe', '_trajectory']:
            if key in state:
                del state[key]
        return state
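
To make the template concrete, here is a minimal sketch of a user analysis built on the revised class. The RadiusOfGyration name and the 'protein' selection string are hypothetical, not part of this PR; the sketch assumes the _prepare/_single_frame/_conclude protocol and the no-argument _single_frame signature shown in this diff. Note that the __getstate__ hook above is what lets job objects be pickled and shipped to worker processes without dragging the unpicklable universe and trajectory along.

import numpy as np
from MDAnalysis.analysis.base import AnalysisBase

class RadiusOfGyration(AnalysisBase):
    """Hypothetical example: average radius of gyration of a selection."""
    def __init__(self, universe, selection='protein',
                 start=None, stop=None, step=None):
        self._universe = universe
        self.selection_string = selection
        self._setup_frames(universe, start, stop, step)

    def _prepare(self):
        # Set up result storage before trajectory iteration begins
        self.rgyr = []

    def _single_frame(self):
        atoms = self._universe.select_atoms(self.selection_string)
        self.rgyr.append(atoms.radius_of_gyration())

    def _conclude(self):
        # Average over all frames once iteration is finished
        self.mean_rgyr = np.mean(self.rgyr)

Driving it serially is then just job = RadiusOfGyration(universe); job.run().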
59 changes: 59 additions & 0 deletions package/MDAnalysis/analysis/electromagnetism.py
@@ -0,0 +1,59 @@
from base import AnalysisBase
import numpy as np

class TotalDipole(AnalysisBase):

    def __init__(self, universe=None, filename='order.dat', selection=None, start=None, stop=None, step=None):

        if selection is None:
            raise RuntimeError('In class TotalDipole: constructor requires a selection')
        else:
            self.selection_string = selection

        self._universe = universe
        #self._trajectory = self._universe.trajectory
        self.filename = filename
        self.time = []

        self._setup_frames(self._universe, start, stop, step)
        self.dipoles = []

    def _single_frame(self):
        selection = self._universe.select_atoms(self.selection_string)

        dipole = np.zeros(3)

        for residue in selection.residues:
            dipole += MolecularDipole(residue)

        self.dipoles.append(dipole)

    def _conclude(self):
        total_dipole = np.sum(self.dipoles, axis=0)
        print "Average dipole:", total_dipole/self.nframes
        return total_dipole/self.nframes

    def __iadd__(self, other):
        self.dipoles += other.dipoles
        self.nframes += other.nframes
        return self


#--- Functions ---#
def MolecularDipole(residue):
    charges = residue.charges
    abscharges = np.absolute(charges)
    charge_sum = np.sum(abscharges)
    positions = residue.positions

    charge_center = []

    for coord in [0, 1, 2]:
        charge_center.append(np.sum(np.multiply(abscharges, positions[:, coord]))/charge_sum)

    dipole = []
    # 4.803 converts e*Angstrom to debye
    for coord in [0, 1, 2]:
        dipole.append(np.sum(np.multiply(charges, positions[:, coord]-charge_center[coord]))*4.803)

    return dipole
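
A brief usage sketch for the new module, assuming hypothetical input files (topol.tpr, traj.xtc) and a hypothetical water selection; the __iadd__ overload is what allows per-thread copies of a job to be summed back together after a parallel run:

import MDAnalysis as mda
from MDAnalysis.analysis.electromagnetism import TotalDipole

universe = mda.Universe('topol.tpr', 'traj.xtc')  # hypothetical files
job = TotalDipole(universe=universe, selection='resname SOL')
job.run()  # iterates the trajectory and prints the average dipole

# After a parallel run, partial results are merged through __iadd__:
# job_a += job_b  # concatenates the dipole lists and sums nframes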
198 changes: 198 additions & 0 deletions package/MDAnalysis/analysis/parallel_jobs.py
@@ -0,0 +1,198 @@
""" Add a docstring later

"""
import MDAnalysis as mda
import multiprocessing as mp
import copy
from operator import itemgetter

class ParallelProcessor(object):
""" Add class docstring later
"""
def __init__(self, jobs_list, universe, start=None, stop=None,
step=None, threads=None):
self._universe = universe
self.topology = universe.filename
self.trajname = universe._trajectory.filename

start, stop, step = universe.trajectory.check_slice_indices(start,
stop, step)

self.start = start
self.stop = stop
self.step = step
self.nframes = len(xrange(start, stop, step))

self.jobs_list = jobs_list

if threads is None:
self.threads = mp.cpu_count()
elif threads > mp.cpu_count():
self.threads = mp.cpu_count()
else:
self.threads = threads

self.slices = self.compute_slices()

def compute_slices(self):
"""
This function returns a list containing the start and
last configuration to be analyzed from each thread
"""
threads = self.threads # Get number of threads from initialization
step = self.step
configs = (self.stop-self.start)/step

self.nframes = configs

# Check whether the number of threads is higher than
# the number of trajectories to be analyzed
while configs/threads == 0:
threads -= 1

self.threads = threads # Update the number of threads

print "Total configurations: "+str(configs)
print "Analysis running on ", threads, " threads.\n"

# Number of cfgs for each thread, and remainder to be added
thread_cfg = configs/threads
reminder = configs%threads

slices = []
beg = self.start

# Compute the start and last configurations
for thread in range(0, threads):
if thread < reminder:
end = beg+step*thread_cfg
else:
end = beg+step*(thread_cfg-1)

slices.append([beg, end+1])
beg = end+step

# Print on screen the configurations assigned to each thread
for thread in range(threads):
confs = 1+(slices[thread][1]-1-slices[thread][0])/step
digits = len(str(self.stop))
line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \
+str(slices[thread][0]).rjust(digits)+"/" \
+str(slices[thread][1]-1).rjust(digits) \
+" | Configurations: "\
+str(confs).rjust(1+len(str(thread_cfg)))
print line

return slices


def compute(self, out_queue, order, progress):
"""
Run the single_frame method for each analysis object for all
the trajectories in the batch
"""
start = self.slices[order][0]
stop = self.slices[order][1]
step = self.step

jobs_list = []
universe = mda.Universe(self.topology, self.trajname)
traj = universe.trajectory

for job in self.jobs_list:
jobs_list.append(copy.deepcopy(job))

for job in jobs_list:
# Initialize job objects. start, stop and step are
# given so that self.nframes is computed correctly
job._setup_frames(universe=universe, start=start,
stop=stop, step=self.step)
job._prepare()

for timestep in traj[start:stop:step]:
for job in jobs_list:
job._single_frame()
progress.put(order) # Updates the progress bar

out_queue.put((jobs_list, order)) # Returns the results

def conclude(self, jobs_list):
"""
Run conclude for each job object
"""
for job in jobs_list:
job._conclude()

def parallel_run(self):
"""
Create copies of the original object to be
dispatched to multiprocess
"""
threads = self.threads

# Queues for the communication between parent and child processes
out_queue = mp.Manager().Queue()
progress = mp.Manager().Queue()

# Prepare multiprocess objects
processes = [mp.Process(target=self.compute,
args=(out_queue, order, progress))
for order in range(threads)]


# Run processes
for process in processes:
process.start()

thread_configs = [1+(elem[1]-elem[0]-1)/self.step
for elem in self.slices]

data = {}

for thread in range(threads):
data[thread] = 0

total = 0
print
while any([process.is_alive() for process in processes]):
while not progress.empty():
core = progress.get()
data[core] += 1
total += 1

for thread in range(threads):
print "Core "+str(thread)+": " \
+str(data[thread])+"/" \
+str(thread_configs[thread])

print "Total: "+str(total)+"/"\
+str(self.nframes)

print "\033["+str(threads+2)+"A"

print (threads+1)*"\n"

# Exit the completed processes
for process in processes:
process.join()

results = []

# Collects results from the queue
while not out_queue.empty():
results.append(out_queue.get())

jobs_num = len(self.jobs_list)

result_list = []

# Sum the job objects from each thread
for job in range(jobs_num):
for order, thread in enumerate(sorted(results, key=itemgetter(1))):
if order == 0:
result_list.append(thread[0][job])
else:
result_list[job] += thread[0][job]

# Run the conclude function for each job
self.conclude(result_list)
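
Finally, a sketch of how the whole machinery might be driven end to end, again with hypothetical file names; ParallelProcessor deep-copies each job for every worker, splits the frame range with compute_slices(), runs compute() in separate processes, and merges the per-worker results with __iadd__ before calling _conclude():

import MDAnalysis as mda
from MDAnalysis.analysis.electromagnetism import TotalDipole
from MDAnalysis.analysis.parallel_jobs import ParallelProcessor

universe = mda.Universe('topol.tpr', 'traj.xtc')  # hypothetical files

# Any number of analysis jobs can share a single pass over the trajectory
jobs = [TotalDipole(universe=universe, selection='resname SOL')]

processor = ParallelProcessor(jobs, universe, threads=4)
processor.parallel_run()  # spawns 4 workers and reports progress per core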