diff --git a/package/MDAnalysis/analysis/base.py b/package/MDAnalysis/analysis/base.py index f3a722b985d..c2aaf6ae667 100644 --- a/package/MDAnalysis/analysis/base.py +++ b/package/MDAnalysis/analysis/base.py @@ -1,5 +1,5 @@ # -*- Mode: python; tab-width: 4; indent-tabs-mode:nil; coding:utf-8 -*- -# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 +# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4 # # MDAnalysis --- http://www.MDAnalysis.org # Copyright (c) 2006-2015 Naveen Michaud-Agrawal, Elizabeth J. Denning, Oliver Beckstein @@ -17,14 +17,17 @@ """ Analysis building blocks --- :mod:`MDAnalysis.analysis.base` ============================================================ - A collection of useful building blocks for creating Analysis classes. - - """ -import numpy as np +from __future__ import division + import logging +import multiprocessing as mp +import copy +from operator import itemgetter + +import MDAnalysis as mda logger = logging.getLogger(__name__) @@ -32,49 +35,46 @@ class AnalysisBase(object): """Base class for defining multi frame analysis - The analysis base class is designed as a template for creating - multiframe analysis. - + multiframe analysis. The class implements the following methods: - _setup_frames(trajectory, start=None, stop=None, step=None) Pass a Reader object and define the desired iteration pattern through the trajectory - run The user facing run method. Calls the analysis methods defined below - Your analysis can implement the following methods, which are called from run: - _prepare Called before iteration on the trajectory has begun. Data structures can be set up at this time, however most error checking should be done in the __init__ - _single_frame Called after the trajectory is moved onto each new frame. - _conclude Called once iteration on the trajectory is finished. Apply normalisation and averaging to results here. - """ - def _setup_frames(self, trajectory, start=None, - stop=None, step=None): - self._trajectory = trajectory - start, stop, step = trajectory.check_slice_indices( - start, stop, step) - self.start = start - self.stop = stop - self.step = step - self.nframes = len(xrange(start, stop, step)) - - def _single_frame(self): + def _setup_frames(self, universe, start=None, stop=None, step=None): + """ + Add method docstring + """ + if universe is None: + pass + else: + self._universe = universe + self._trajectory = self._universe.trajectory + + start, stop, step = self._trajectory.check_slice_indices( + start, stop, step) + self.start = start + self.stop = stop + self.step = step + self.nframes = len(xrange(start, stop, step)) + + def _single_frame(self, timestep): """Calculate data from a single frame of trajectory - Don't worry about normalising, just deal with a single frame. """ pass @@ -85,21 +85,175 @@ def _prepare(self): def _conclude(self): """Finalise the results you've gathered. - Called at the end of the run() method to finish everything up. 
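For readers of this patch, a minimal sketch of how a user analysis might plug into these hooks after the change. The class name, selection keyword and results layout are illustrative, not part of the diff; the _ags attribute follows the convention used by __getstate__/__setstate__ further down.

    from __future__ import division
    from MDAnalysis.analysis.base import AnalysisBase

    class MeanZ(AnalysisBase):
        """Toy analysis: average z coordinate of a selection over the trajectory."""

        def __init__(self, universe, selection='all',
                     start=None, stop=None, step=None):
            self._universe = universe
            # _ags is what __getstate__/__setstate__ use to rebuild the
            # AtomGroups inside each worker process
            self._ags = [universe.select_atoms(selection)]
            self._setup_frames(universe, start, stop, step)
            # accumulators live here so the parent object has them even in the
            # parallel path, where only the per-process copies run _prepare()
            self.results = {'z_sum': 0.0}

        def _single_frame(self, timestep):
            # one frame at a time; no normalisation yet
            self.results['z_sum'] += self._ags[0].positions[:, 2].mean()

        def _add_other_results(self, other):
            # fold in the partial results handed back by another process
            self.results['z_sum'] += other['z_sum']

        def _conclude(self):
            # normalisation happens once, after serial or parallel iteration
            self.mean_z = self.results['z_sum'] / self.nframes

Usage would then be MeanZ(u, selection='name CA').run() for the serial path, or .run(parallel=True, nthreads=4) for the parallel one.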
""" pass - def run(self): + def run(self, parallel=None, nthreads=None): + """ Chooses whether to run analysis in serial or parallel + mode depending on user input""" + if not parallel: + self._serial_run() + else: + self._parallel_run(nthreads) + + def _serial_run(self): """Perform the calculation""" logger.info("Starting preparation") + prog = mda.lib.log.Progressbar([self.nframes]) + prog.start() self._prepare() for i, ts in enumerate( self._trajectory[self.start:self.stop:self.step]): - self._ts = ts #logger.info("--> Doing frame {} of {}".format(i+1, self.nframes)) - self._single_frame() - logger.info("Finishing up") + self._single_frame(ts) + prog.update(0) + #logger.info("Finishing up") self._conclude() + def _parallel_run(self, nthreads=None): + """ + Create copies of the original object to be + dispatched to multiprocess + """ + if nthreads is None: + self.nthreads = mp.cpu_count() - 1 + else: + # Cap number of threads + self.nthreads = min([mp.cpu_count(), nthreads, self.nframes]) + + self.slices = self._compute_slices() + + # Queues for the communication between parent and child processes + out_queue = mp.Manager().Queue() + progress = mp.Manager().Queue() + + # Prepare multiprocess objects + processes = [mp.Process(target=self._compute, + args=(out_queue, order, progress)) + for order in range(self.nthreads)] + + # Run processes + for process in processes: + process.start() + + thread_configs = [1+(elem[1]-elem[0]-1) // self.step + for elem in self.slices] + + prog = mda.lib.log.Progressbar( + thread_configs, bar_length=50, name=self.__class__.__name__) + prog.start() + + while any(process.is_alive() for process in processes): + while not progress.empty(): + core = progress.get() + prog.update(core) + + # Exit the completed processes + for process in processes: + process.join() + + results = [] + # Collects results from the queue + while not out_queue.empty(): + results.append(out_queue.get()) + + # Sort results, then collate them + for other_results in sorted(results, key=itemgetter(1)): + self._add_other_results(other_results[0]) + + # Averaging here + self._conclude() + + def _compute_slices(self): + """ + This function returns a list containing the start and + last configuration to be analyzed from each thread + """ + step = self.step + configs = 1 + (self.stop - self.start - 1) // step + + print "Total configurations: {}".format(configs) + print "Analysis running on {} threads.\n".format(self.nthreads) + + # Number of cfgs for each thread, and remainder to be added + thread_cfg = configs // self.nthreads + reminder = configs % self.nthreads + + slices = [] + beg = self.start + + # Compute the start and last configurations + for thread in range(0, self.nthreads): + if thread < reminder: + end = beg + step * thread_cfg + else: + end = beg + step * (thread_cfg-1) + + slices.append([beg, end+1]) + beg = end + step + + # Print on screen the configurations assigned to each thread + for thread in range(self.nthreads): + confs = 1+(slices[thread][1]-1-slices[thread][0])/step + digits = len(str(self.stop)) + line = "Thread "+str(thread+1).rjust(len(str(self.nthreads)))+": " \ + +str(slices[thread][0]).rjust(digits)+"/" \ + +str(slices[thread][1]-1).rjust(digits) \ + +" | Configurations: "\ + +str(confs).rjust(1+len(str(thread_cfg))) + print line + + return slices + + def _compute(self, out_queue, order, progress): + """ + Run the single_frame method for each analysis object for all + the trajectories in the batch + + order - my id among all the parallel versions + out_queue - where 
to put my results + progress - the progressbar to update + """ + start = self.slices[order][0] + stop = self.slices[order][1] + step = self.step + + # Create a local version of the analysis object + analysis_object = copy.deepcopy(self) + + analysis_object.nframes = len(xrange(start, stop, step)) + traj = analysis_object._universe.trajectory + + analysis_object._prepare() + + progress.put(order) + for timestep in traj[start:stop:step]: + analysis_object._single_frame(timestep) + progress.put(order) # Updates the progress bar + + # Returns the results along with our order index + out_queue.put((analysis_object.results, order)) + + def __getstate__(self): + state = dict(self.__dict__) + # Replace the _ags entry with indices + # pop removes the _ag key, or returns [] (empty list) if the Key didn't exist + ag_indices = [ag.indices for ag in state.pop('_ags', [])] + universe_filenames = (self._universe.filename, self._universe.trajectory.filename) + state.pop('_ags', None) + state.pop('_universe', None) + state.pop('_trajectory', None) + + return state, universe_filenames, ag_indices + + def __setstate__(self, state): + statedict, universe_filenames, ag_indices = state + self.__dict__ = statedict + # Create my local Universe + self._universe = mda.Universe(*universe_filenames) + self._ags = [self._universe.atoms[idx] + for idx in ag_indices] + + diff --git a/package/MDAnalysis/analysis/electromagnetism.py b/package/MDAnalysis/analysis/electromagnetism.py new file mode 100644 index 00000000000..05a3a445cda --- /dev/null +++ b/package/MDAnalysis/analysis/electromagnetism.py @@ -0,0 +1,59 @@ +from base import AnalysisBase +import numpy as np + +class TotalDipole(AnalysisBase): + + def __init__(self, universe=None, filename='order.dat', selection=None, start=None, stop=None, step=None): + + if selection is None: + raise RuntimeError('In class TotalDipole: constroctur requires a selection') + else: + self.selection_string = selection + + self._universe = universe + #self._trajectory = self._universe.trajectory + self.filename = filename + self.time = [] + + self._setup_frames(self._universe, start, stop, step) + self.dipoles = [] + + def _single_frame(self,timestep): + selection = self._universe.select_atoms(self.selection_string) + + dipole = np.zeros(3) + + for residue in selection.residues: + dipole += MolecularDipole(residue) + + self.dipoles.append(dipole) + + def _conclude(self): + total_dipole = np.sum(self.dipoles, axis=0) + print "Average dipole:", total_dipole/self.nframes + return total_dipole/self.nframes + + def __iadd__(self,other): + self.dipoles += other.dipoles + self.nframes += other.nframes + return self + + +#--- Functions ---# +def MolecularDipole(residue): + charges = residue.charges + abscharges = np.absolute(charges) + charge_sum = np.sum(abscharges) + positions = residue.positions + + charge_center = [] + + for coord in [0,1,2]: + charge_center.append(np.sum(np.multiply(abscharges,positions[:,coord]))/charge_sum) + + dipole = [] + # 4.803 converts to debyes + for coord in [0,1,2]: + dipole.append(np.sum(np.multiply(charges,positions[:,coord]-charge_center[coord]))*4.803) + + return dipole diff --git a/package/MDAnalysis/analysis/parallel_jobs.py b/package/MDAnalysis/analysis/parallel_jobs.py new file mode 100644 index 00000000000..b704d58e621 --- /dev/null +++ b/package/MDAnalysis/analysis/parallel_jobs.py @@ -0,0 +1,184 @@ +""" Add a docstring later + +""" +import multiprocessing as mp +import copy +import time +from operator import itemgetter +import MDAnalysis as mda + 
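The 4.803 factor in MolecularDipole() converts e*Angstrom to debye. A small hand-checked example of that convention; the two-site charges and positions are made up.

    import numpy as np

    # fake two-site "molecule": charges +/-0.5 e placed 1 Angstrom apart on x
    charges = np.array([0.5, -0.5])
    positions = np.array([[1.0, 0.0, 0.0],
                          [0.0, 0.0, 0.0]])

    abscharges = np.absolute(charges)
    center = (abscharges[:, None] * positions).sum(axis=0) / abscharges.sum()
    # center is the midpoint [0.5, 0, 0]

    dipole = (charges[:, None] * (positions - center)).sum(axis=0) * 4.803
    print(dipole)   # ~[2.4, 0, 0] debye: 0.5 e * 1 Angstrom = 0.5 e*A = 2.4 D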
+class ParallelProcessor(object): + """ Add class docstring later + """ + def __init__(self, jobs_list, universe, start=None, stop=None, + step=None, threads=None): + self._universe = universe + self.topology = universe.filename + self.trajname = universe._trajectory.filename + + start, stop, step = universe.trajectory.check_slice_indices(start, + stop, step) + + self.start = start + self.stop = stop + self.step = step + self.nframes = len(xrange(start, stop, step)) + + self.jobs_list = jobs_list + + if threads is None: + self.threads = mp.cpu_count() + elif threads > mp.cpu_count(): + self.threads = mp.cpu_count() + else: + self.threads = threads + + self.slices = self.compute_slices() + + def compute_slices(self): + """ + This function returns a list containing the start and + last configuration to be analyzed from each thread + """ + threads = self.threads # Get number of threads from initialization + step = self.step + configs = 1+(self.stop-self.start-1)/step + + self.nframes = configs + + # Check whether the number of threads is higher than + # the number of trajectories to be analyzed + while configs/threads == 0: + threads -= 1 + + self.threads = threads # Update the number of threads + + print "Total configurations: "+str(configs) + print "Analysis running on ", threads, " threads.\n" + + # Number of cfgs for each thread, and remainder to be added + thread_cfg = configs/threads + reminder = configs%threads + + slices = [] + beg = self.start + + # Compute the start and last configurations + for thread in range(0, threads): + if thread < reminder: + end = beg+step*thread_cfg + else: + end = beg+step*(thread_cfg-1) + + slices.append([beg, end+1]) + beg = end+step + + # Print on screen the configurations assigned to each thread + for thread in range(threads): + confs = 1+(slices[thread][1]-1-slices[thread][0])/step + digits = len(str(self.stop)) + line = "Thread "+str(thread+1).rjust(len(str(threads)))+": " \ + +str(slices[thread][0]).rjust(digits)+"/" \ + +str(slices[thread][1]-1).rjust(digits) \ + +" | Configurations: "\ + +str(confs).rjust(1+len(str(thread_cfg))) + print line + + return slices + + + def compute(self, out_queue, order, progress): + """ + Run the single_frame method for each analysis object for all + the trajectories in the batch + """ + start = self.slices[order][0] + stop = self.slices[order][1] + step = self.step + + jobs_list = [] + universe = mda.Universe(self.topology, self.trajname) + traj = universe.trajectory + + for job in self.jobs_list: + jobs_list.append(copy.deepcopy(job)) + + for job in jobs_list: + # Initialize job objects. 
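Based on the test added at the end of this patch, the intended use of ParallelProcessor looks roughly like the sketch below. The second job, the step and the threads values are illustrative; each job's _conclude() currently just prints its average dipole.

    import MDAnalysis as mda
    import MDAnalysis.analysis.parallel_jobs as pj
    import MDAnalysis.analysis.electromagnetism as em
    from MDAnalysisTests.datafiles import PSF, DCD

    u = mda.Universe(PSF, DCD)

    # several independent analyses can share one pass over the trajectory;
    # jobs are built without a universe, ParallelProcessor supplies one
    # per worker via _setup_frames()
    jobs = [em.TotalDipole(selection='all'),
            em.TotalDipole(selection='protein')]

    processor = pj.ParallelProcessor(jobs, u, step=2, threads=4)
    processor.parallel_run()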
start, stop and step are + # given so that self.nframes is computed correctly + job._setup_frames(universe=universe, start=start, + stop=stop, step=self.step) + job._prepare() + + progress.put(order) + for timestep in traj[start:stop:step]: + for job in jobs_list: + job._single_frame(timestep) + progress.put(order) # Updates the progress bar + + out_queue.put((jobs_list, order)) # Returns the results + + def conclude(self, jobs_list): + """ + Run conclude for each job object + """ + for job in jobs_list: + job._conclude() + + def parallel_run(self): + """ + Create copies of the original object to be + dispatched to multiprocess + """ + threads = self.threads + + # Queues for the communication between parent and child processes + out_queue = mp.Manager().Queue() + progress = mp.Manager().Queue() + + # Prepare multiprocess objects + processes = [mp.Process(target=self.compute, + args=(out_queue, order, progress)) + for order in range(threads)] + + + # Run processes + for process in processes: + process.start() + + thread_configs = [1+(elem[1]-elem[0]-1)/self.step + for elem in self.slices] + + prog = mda.lib.log.Progressbar(thread_configs, bar_length=50, + name="ParallelProcessor") + prog.start() + + while any([process.is_alive() for process in processes]): + while not progress.empty(): + core = progress.get() + prog.update(core) + + # Exit the completed processes + for process in processes: + process.join() + + results = [] + + # Collects results from the queue + while not out_queue.empty(): + results.append(out_queue.get()) + + jobs_num = len(self.jobs_list) + + result_list = [] + + # Sum the job objects from each thread + for job in range(jobs_num): + for order, thread in enumerate(sorted(results, key=itemgetter(1))): + if order == 0: + result_list.append(thread[0][job]) + else: + result_list[job] += thread[0][job] + + # Run the conclude function for each job + self.conclude(result_list) diff --git a/package/MDAnalysis/analysis/polymer.py b/package/MDAnalysis/analysis/polymer.py index 77a4c05d60e..10708cf5a99 100644 --- a/package/MDAnalysis/analysis/polymer.py +++ b/package/MDAnalysis/analysis/polymer.py @@ -73,12 +73,12 @@ def __init__(self, atomgroups, if not all( l == chainlength for l in lens): raise ValueError("Not all AtomGroups were the same size") - self._setup_frames(atomgroups[0].universe.trajectory, + self._setup_frames(atomgroups[0].universe, start, stop, step) self._results = np.zeros(chainlength - 1, dtype=np.float32) - def _single_frame(self): + def _single_frame(self,timestep): # could optimise this by writing a "self dot array" # we're only using the upper triangle of np.inner # function would accept a bunch of coordinates and spit out the diff --git a/package/MDAnalysis/analysis/rdf.py b/package/MDAnalysis/analysis/rdf.py index 946e2ae8e9e..e0a6c677805 100644 --- a/package/MDAnalysis/analysis/rdf.py +++ b/package/MDAnalysis/analysis/rdf.py @@ -80,11 +80,10 @@ class InterRDF(AnalysisBase): def __init__(self, g1, g2, nbins=75, range=(0.0, 15.0), exclusion_block=None, start=None, stop=None, step=None): - self.g1 = g1 - self.g2 = g2 - self.u = g1.universe + self._ags = [g1, g2] + self._universe = g1.universe - self._setup_frames(self.u.trajectory, + self._setup_frames(self._universe, start=start, stop=stop, step=step) @@ -92,19 +91,22 @@ def __init__(self, g1, g2, self.rdf_settings = {'bins':nbins, 'range':range} + self.results = {} + # Empty histogram to store the RDF count, edges = np.histogram([-1], **self.rdf_settings) count = count.astype(np.float64) count *= 0.0 - 
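Both parallel paths need a way to merge partial results: AnalysisBase._parallel_run() calls _add_other_results() with the dict returned by each worker, while ParallelProcessor.parallel_run() sums the job objects with +=. A hypothetical analysis showing both hooks; the class name and the quantity it accumulates are illustrative.

    from MDAnalysis.analysis.base import AnalysisBase

    class VolumeAverage(AnalysisBase):
        """Hypothetical analysis used only to illustrate result merging."""

        def __init__(self, universe, start=None, stop=None, step=None):
            self._universe = universe
            self._setup_frames(universe, start, stop, step)
            self.results = {'volume': 0.0}

        def _single_frame(self, timestep):
            self.results['volume'] += timestep.volume

        # hook used by AnalysisBase._parallel_run(): fold in another
        # process's results dict
        def _add_other_results(self, other):
            self.results['volume'] += other['volume']

        # hook used by ParallelProcessor.parallel_run(), which sums the job
        # *objects* coming back from each thread before calling _conclude()
        def __iadd__(self, other):
            self.results['volume'] += other.results['volume']
            self.nframes += other.nframes
            return self

        def _conclude(self):
            self.mean_volume = self.results['volume'] / self.nframes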
self.count = count + self.results['count'] = count self.edges = edges self.bins = 0.5 * (edges[:-1] + edges[1:]) # Need to know average volume - self.volume = 0.0 + self.results['volume'] = 0.0 # Allocate a results array which we will reuse - self._result = np.zeros((len(self.g1), len(self.g2)), dtype=np.float64) + self._result = np.zeros((len(self._ags[0]), len(self._ags[1])), + dtype=np.float64) # If provided exclusions, create a mask of _result which # lets us take these out if not exclusion_block is None: @@ -115,22 +117,28 @@ def __init__(self, g1, g2, self._exclusion_block = None self._exclusion_mask = None - def _single_frame(self): - distances.distance_array(self.g1.positions, self.g2.positions, - box=self.u.dimensions, result=self._result) + def _add_other_results(self, other): + for k in ['count', 'volume']: + self.results[k] += other[k] + + def _single_frame(self,timestep): + distances.distance_array( + self._ags[0].positions, + self._ags[1].positions, + box=self._ags[0].dimensions, + result=self._result) # Maybe exclude same molecule distances if not self._exclusion_mask is None: self._exclusion_mask[:] = self._maxrange count = np.histogram(self._result, **self.rdf_settings)[0] - self.count += count - - self.volume += self._ts.volume + self.results['count'] += count + self.results['volume'] += timestep.volume def _conclude(self): # Number of each selection - nA = len(self.g1) - nB = len(self.g2) + nA = len(self._ags[0]) + nB = len(self._ags[1]) N = nA * nB # If we had exclusions, take these into account @@ -144,10 +152,10 @@ def _conclude(self): vol *= 4/3.0 * np.pi # Average number density - box_vol = self.volume / self.nframes + box_vol = self.results['volume'] / self.nframes density = N / box_vol - rdf = self.count / (density * vol * self.nframes) + rdf = self.results['count'] / (density * vol * self.nframes) self.rdf = rdf diff --git a/package/MDAnalysis/lib/log.py b/package/MDAnalysis/lib/log.py index 3f8d6eba708..2bef24b05cd 100644 --- a/package/MDAnalysis/lib/log.py +++ b/package/MDAnalysis/lib/log.py @@ -75,6 +75,10 @@ import sys import logging +import time +import datetime +import threading +import numpy as np from .. 
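With these changes InterRDF can be driven through either code path. A sketch of the parallel call, assuming the patched run(parallel=..., nthreads=...) signature; the selections and thread count are illustrative.

    import MDAnalysis as mda
    from MDAnalysis.analysis.rdf import InterRDF
    from MDAnalysisTests.datafiles import PSF, DCD

    u = mda.Universe(PSF, DCD)
    g1 = u.select_atoms('name CA')
    g2 = u.select_atoms('name O')

    rdf = InterRDF(g1, g2, nbins=75, range=(0.0, 15.0))
    # parallel=None (or False) keeps the old serial behaviour
    rdf.run(parallel=True, nthreads=4)

    # counts and volume are accumulated per process and summed by
    # _add_other_results() before _conclude() normalises them
    print(rdf.bins, rdf.rdf)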
import version @@ -303,3 +307,131 @@ def echo(self, step, **kwargs): else: return echo(format % vars(self)) + +class Progressbar(threading.Thread): + """ Add class docstring later + """ + def __init__(self, list_of_totals, name="Analysis", + bar_length=40, refresh=1): + threading.Thread.__init__(self) + # Number of units per job, total of units and number of jobs + self.jobs_compute_units = np.array(list_of_totals) + self.total_units = np.sum(self.jobs_compute_units) + self.jobs = len(self.jobs_compute_units) + + self.has_started = [False] * self.jobs + self.counter = 0 # number of processed units + self.last_update = np.zeros(self.jobs) # times of last update + self.cumulative_time = 0 + + self.daemon = True # kills bar if main thread died + self.eta = 0 # estimated time of accomplishment + self.elaps = 0 # elapsed time + self.speed = 0 # seconds per unit + self.freq = refresh # frequency of bar refreshing + + self.remaining_units = np.amax(self.jobs_compute_units) + self.remaining_changed = False + + # Bar-related variables + self.name = name + self.bar_length = bar_length + self.dots = 0 + self.eta_started = False + self.cfg_len = len(str(self.total_units)) + + def _update_timings(self, job_id): + istant = time.time() + + # Update statistics each time a new unit has been completed + if self.has_started[job_id]: + self.counter += 1 + self.jobs_compute_units[job_id] -= 1 + self.cumulative_time += istant-self.last_update[job_id] + self.last_update[job_id] = istant + else: + self.has_started[job_id] = True + self.last_update[job_id] = istant + + # Update eta only if the highest number of units left has + # decreased (prevents eta from changind all the time) + remainings = np.amax(self.jobs_compute_units) + + if remainings != self.remaining_units: + self.remaining_changed = True + self.remaining_units = remainings + + def _compute_eta(self): + # Only update eta if the highest number of units left has changed + if self.remaining_changed: + self.speed = (self.cumulative_time/self.counter) + self.eta = self.speed*self.remaining_units + self.remaining_changed = False + + def update(self, job_id): + """Update progressbar by sending the list index corresponding to the + job you're willing to update. + + Example: two jobs, job1 has 10 work units and job2 15. 
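The ETA rule in _compute_eta() amounts to: mean wall time per finished unit, times the number of units still outstanding on the slowest job. A worked example with made-up numbers:

    cumulative_time = 6.0          # seconds spent on completed units so far
    counter = 3                    # units completed across all jobs
    jobs_compute_units = [7, 4]    # units still left per job

    speed = cumulative_time / counter        # 2.0 s per unit
    eta = speed * max(jobs_compute_units)    # 14.0 s until the slowest job ends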
To + initialize Progressbar you created the object as: + progressbar = MDAnalysis.lib.log.Progressbar([10, 15]) + + If job1 finished one unit, you should run: + progressbar.update(0) + + while if job2 finished one unit, you should run: + progressbar.update(1) + + """ + self._update_timings(job_id) + self._compute_eta() + + def _print_bar(self): + percentage = self.counter*100./self.total_units + bars = int(percentage/100.*self.bar_length) + empty = self.bar_length-bars + + eta = "" + + left_cfgs = " "+str(self.total_units-self.counter).rjust(self.cfg_len)+"/" \ + +str(self.total_units).rjust(self.cfg_len) + + # Only start timing if at least one unit has arrived + if self.eta < 1 and self.eta_started is False: + eta = str(self.dots*'.')+str((3-self.dots)*' ') + self.dots += 1 + if self.dots > 3: + self.dots = 0 + else: + self.eta_started = True + eta = str(datetime.timedelta(seconds=int(self.eta))) + + # Output bar to stderr + print "\033[2A" # move cursor one line up + sys.stderr.write(self.name+" ["+str(bars*"=")+str(empty*" ")+"] " \ + +str(round(percentage, 1)).rjust(4)+"% Elapsed: " \ + +str(datetime.timedelta(seconds=self.elaps)) \ + +" ETA: "+eta+left_cfgs+"\n") + sys.stdout.flush() + + def run(self): + # Avoids negative ETA + print "\n" # avoid overwriting previous line in terminal + while self.remaining_units > 0: + if self.eta > self.freq: + self.eta -= self.freq + self._print_bar() + else: + self._print_bar() + # Update elaps time only if at least one unit has arrived + if any(self.has_started): + self.elaps += self.freq + + time.sleep(self.freq) + # Print a summary at the end + self._summary() + + def _summary(self): + sys.stderr.write("\n") + print self.name+": computed "+str(self.total_units) \ + +" units in "+str(datetime.timedelta(seconds=self.elaps))+"\n" diff --git a/testsuite/MDAnalysisTests/analysis/test_base.py b/testsuite/MDAnalysisTests/analysis/test_base.py index 5e9c1bbd725..e02b542f05d 100644 --- a/testsuite/MDAnalysisTests/analysis/test_base.py +++ b/testsuite/MDAnalysisTests/analysis/test_base.py @@ -18,6 +18,7 @@ from numpy.testing import ( assert_, ) +from multiprocessing import cpu_count import MDAnalysis as mda from MDAnalysis.analysis.base import AnalysisBase @@ -27,51 +28,77 @@ class FrameAnalysis(AnalysisBase): """Just grabs frame numbers of frames it goes over""" - def __init__(self, reader, start=None, stop=None, step=None): - self.traj = reader - self._setup_frames(reader, + def __init__(self, universe, start=None, stop=None, step=None): + self._setup_frames(universe, start=start, stop=stop, step=step) - self.frames = [] + self.results = {} + self.results['frames'] = [] - def _single_frame(self): - self.frames.append(self._ts.frame) + def _single_frame(self, timestep): + self.results['frames'].append(timestep.frame) + def _add_other_results(self, other_result): + self.results['frames'] += other_result['frames'] class TestAnalysisBase(object): def setUp(self): # has 98 frames self.u = mda.Universe(PSF, DCD) + self.frames = len(self.u.trajectory) def tearDown(self): del self.u def test_default(self): - an = FrameAnalysis(self.u.trajectory) - assert_(an.nframes == len(self.u.trajectory)) - + an = FrameAnalysis(self.u) + assert_(an.nframes == self.frames) an.run() - assert_(an.frames == range(len(self.u.trajectory))) + assert_(an.results['frames'] == range(self.frames)) + + for cores in range(1, cpu_count() + 1): + an_par = FrameAnalysis(self.u) + assert_(an_par.nframes == self.frames) + an_par.run(parallel=True, nthreads=cores) + 
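Expanding the docstring example into a runnable toy driver; the job sizes, sleep times and bar name are made up. Real use pushes job ids through a queue from worker processes, as in _parallel_run(); here everything happens in the main thread.

    import time
    import MDAnalysis as mda

    bar = mda.lib.log.Progressbar([10, 15], name="demo", bar_length=40)
    bar.start()               # the bar refreshes itself in a daemon thread

    bar.update(0)             # the first update per job only marks it started
    bar.update(1)

    for unit in range(15):
        time.sleep(0.2)       # pretend to do some work
        if unit < 10:
            bar.update(0)     # one unit finished for job 0
        bar.update(1)         # one unit finished for job 1

    time.sleep(2)             # give the bar thread time to print its summary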
assert_(an_par.results['frames'] == range(self.frames)) def test_start(self): - an = FrameAnalysis(self.u.trajectory, start=20) - assert_(an.nframes == len(self.u.trajectory) - 20) + an = FrameAnalysis(self.u, start=20) + assert_(an.nframes == self.frames - 20) an.run() - assert_(an.frames == range(20, len(self.u.trajectory))) + assert_(an.results['frames'] == range(20, self.frames)) + + for cores in range(1, cpu_count() + 1): + an_par = FrameAnalysis(self.u, start=20) + assert_(an_par.nframes == self.frames - 20) + an_par.run(parallel=True, nthreads=cores) + assert_(an_par.results['frames'] == range(20, self.frames)) def test_stop(self): - an = FrameAnalysis(self.u.trajectory, stop=20) + an = FrameAnalysis(self.u, stop=20) assert_(an.nframes == 20) an.run() - assert_(an.frames == range(20)) + assert_(an.results['frames'] == range(20)) + + for cores in range(1, cpu_count() + 1): + an_par = FrameAnalysis(self.u, stop=20) + assert_(an_par.nframes == 20) + an_par.run(parallel=True, nthreads=cores) + assert_(an_par.results['frames'] == range(20)) def test_step(self): - an = FrameAnalysis(self.u.trajectory, step=20) + an = FrameAnalysis(self.u, step=20) assert_(an.nframes == 5) an.run() - assert_(an.frames == range(98)[::20]) + assert_(an.results['frames'] == range(98)[::20]) + + for cores in range(1, cpu_count() + 1): + an_par = FrameAnalysis(self.u, step=20) + assert_(an_par.nframes == 5) + an_par.run(parallel=True, nthreads=cores) + assert_(an_par.results['frames'] == range(98)[::20]) diff --git a/testsuite/MDAnalysisTests/test_parallel_jobs.py b/testsuite/MDAnalysisTests/test_parallel_jobs.py new file mode 100644 index 00000000000..6b22784b8d4 --- /dev/null +++ b/testsuite/MDAnalysisTests/test_parallel_jobs.py @@ -0,0 +1,29 @@ +import MDAnalysis as mda +import numpy as np +import MDAnalysis.analysis.parallel_jobs as pj +import MDAnalysis.analysis.electromagnetism as em + +from numpy.testing import * + +from MDAnalysisTests.datafiles import (DCD, PSF) + +class TestParallel(TestCase): + def setUp(self): + self.universe = mda.Universe(PSF, DCD) + self.selection_string = 'all' + + # Single thread analysis + single_analysis = em.TotalDipole(universe=self.universe, selection=self.selection_string) + self.single_result = single_analysis.run() + + def test_parallel(self): + jobs = [em.TotalDipole(selection=self.selection_string)] + process = pj.ParallelProcessor(jobs,self.universe) + assert_equal(self.single_result, process.parallel_run()) + + def test_parallel_base(self): + single_analysis = em.TotalDipole(universe=self.universe, selection=self.selection_string) + assert_equal(self.single_result, single_analysis.run(parallel=True)) + + def tearDown(self): + del self.universe
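The frame-order assertions in these tests rely on the collation step that both _parallel_run() and parallel_run() perform: each worker returns a (partial_results, order) tuple and the parent folds them back sorted by order. A minimal sketch of that step, with made-up partial results:

    from operator import itemgetter

    # what three workers might hand back, in arbitrary completion order
    results = [({'frames': [50, 51]}, 2),
               ({'frames': [48, 49]}, 1),
               ({'frames': [46, 47]}, 0)]

    frames = []
    for partial, order in sorted(results, key=itemgetter(1)):
        frames += partial['frames']   # mirrors FrameAnalysis._add_other_results

    print(frames)   # [46, 47, 48, 49, 50, 51]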