diff --git a/asv/benchmarks.py b/asv/benchmarks.py index e02f0261f..e58d992a8 100644 --- a/asv/benchmarks.py +++ b/asv/benchmarks.py @@ -73,6 +73,41 @@ def __init__(self, conf, benchmarks, regex=None): if not regex or any(re.search(reg, benchmark['name']) for reg in regex): self[benchmark['name']] = benchmark + @property + def benchmark_selection(self): + """ + Active sets of parameterized benchmarks. + """ + return self._benchmark_selection + + @property + def benchmark_dir(self): + """ + Benchmark directory. + """ + return self._benchmark_dir + + def filter_out(self, skip): + """ + Return a new Benchmarks object, with some benchmarks filtered out. + """ + benchmarks = super(Benchmarks, self).__new__(self.__class__) + benchmarks._conf = self._conf + benchmarks._benchmark_dir = self._benchmark_dir + benchmarks._all_benchmarks = self._all_benchmarks + + selected_idx = {} + + for name, benchmark in six.iteritems(self): + if name not in skip: + benchmarks[name] = benchmark + if name in self._benchmark_selection: + selected_idx[name] = self._benchmark_selection[name] + + benchmarks._benchmark_selection = selected_idx + + return benchmarks + @classmethod def discover(cls, conf, repo, environments, commit_hash, regex=None): """ @@ -248,7 +283,7 @@ def load(cls, conf): "regenerate benchmarks.json".format(str(err))) def run_benchmarks(self, env, show_stderr=False, quick=False, profile=False, - skip=None, extra_params=None): + extra_params=None): """ Run all of the benchmarks in the given `Environment`. @@ -270,9 +305,6 @@ def run_benchmarks(self, env, show_stderr=False, quick=False, profile=False, When `True`, run the benchmark through the `cProfile` profiler. - skip : set, optional - Benchmark names to skip. - extra_params : dict, optional Override values for benchmark attributes. @@ -298,29 +330,12 @@ def run_benchmarks(self, env, show_stderr=False, quick=False, profile=False, - `profile`: If `profile` is `True`, this key will exist, and be a byte string containing the cProfile data. 
""" - log.info("Benchmarking {0}".format(env.name)) - - benchmarks = sorted(list(six.iteritems(self))) - - # Remove skipped benchmarks - if skip: - benchmarks = [ - (name, benchmark) for (name, benchmark) in - benchmarks if name not in skip] - # Setup runner and run benchmarks - times = {} - benchmark_runner = runner.BenchmarkRunner(benchmarks, - self._benchmark_dir, - show_stderr=show_stderr, - quick=quick, - extra_params=extra_params, - profile=profile, - selected_idx=self._benchmark_selection) - jobs = benchmark_runner.plan() - times = benchmark_runner.run(jobs, env) - - return times + return runner.BenchmarkRunner(self, + show_stderr=show_stderr, + quick=quick, + extra_params=extra_params, + profile=profile).run(env) def skip_benchmarks(self, env): """ @@ -329,14 +344,10 @@ def skip_benchmarks(self, env): log.warn("Skipping {0}".format(env.name)) with log.indent(): times = {} - for name in self: + for name, benchmark in six.iteritems(self): log.step() log.warn('Benchmark {0} skipped'.format(name)) - timestamp = datetime.datetime.utcnow() - times[name] = {'result': None, - 'samples': None, - 'stats': None, - 'params': [], - 'started_at': timestamp, - 'ended_at': timestamp} + times[name] = runner.get_failed_benchmark_result( + name, benchmark, self._benchmark_selection.get(name)) + return times diff --git a/asv/commands/find.py b/asv/commands/find.py index 4dadfbc3e..eb6b682a3 100644 --- a/asv/commands/find.py +++ b/asv/commands/find.py @@ -123,7 +123,7 @@ def do_benchmark(i): env.install_project(conf, repo, commit_hash) x = benchmarks.run_benchmarks( env, show_stderr=show_stderr) - result = list(x.values())[0]['result'] + result = list(x.values())[0].result results[i] = result diff --git a/asv/commands/profiling.py b/asv/commands/profiling.py index bdba84674..337f2e94f 100644 --- a/asv/commands/profiling.py +++ b/asv/commands/profiling.py @@ -185,7 +185,7 @@ def run(cls, conf, benchmark, revision=None, gui=None, output=None, results = benchmarks.run_benchmarks( env, show_stderr=True, quick=False, profile=True) - profile_data = results[benchmark]['profile'] + profile_data = results[benchmark].profile if gui is not None: log.debug("Opening gui {0}".format(gui)) diff --git a/asv/commands/run.py b/asv/commands/run.py index 65d42dd01..8dde0a3fc 100644 --- a/asv/commands/run.py +++ b/asv/commands/run.py @@ -287,13 +287,14 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para params['python'] = env.python params.update(env.requirements) + benchmark_set = benchmarks.filter_out(skipped_benchmarks[env.name]) + if success: - results = benchmarks.run_benchmarks( + results = benchmark_set.run_benchmarks( env, show_stderr=show_stderr, quick=quick, - profile=profile, skip=skipped_benchmarks[env.name], - extra_params=attribute) + profile=profile, extra_params=attribute) else: - results = benchmarks.skip_benchmarks(env) + results = benchmark_set.skip_benchmarks(env) if dry_run or isinstance(env, environment.ExistingEnvironment): continue @@ -307,10 +308,8 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para env.name) for benchmark_name, d in six.iteritems(results): - if not record_samples: - d['samples'] = None - benchmark_version = benchmarks[benchmark_name]['version'] - result.add_result(benchmark_name, d, benchmark_version) + result.add_result(benchmark_name, d, benchmark_version, + record_samples=record_samples) result.update_save(conf.results_dir) diff --git a/asv/console.py b/asv/console.py index e8cf11e74..b573bd03f 100644 --- 
a/asv/console.py +++ b/asv/console.py @@ -21,6 +21,9 @@ import six from six.moves import xrange, input +from . import util + + WIN = (os.name == "nt") @@ -278,7 +281,9 @@ def _stream_formatter(self, record): color_print('ยท' * self._indent, end='') color_print(' ', end='') - if record.levelno < logging.DEBUG: + if hasattr(record, 'color'): + color = record.color + elif record.levelno < logging.DEBUG: color = 'default' elif record.levelno < logging.INFO: color = 'default' @@ -325,17 +330,30 @@ def dot(self): def set_nitems(self, n): """ - Set the number of items in a lengthy process. Each of these + Set the number of remaining items to process. Each of these steps should be incremented through using `step`. + + Can be called multiple times. The progress percentage is ensured + to be non-decreasing, except if 100% was already reached in which + case it is restarted from 0%. """ - self._total = n + try: + # Ensure count/total is nondecreasing + self._total = util.ceildiv(n * self._total, self._total - self._count) + self._count = self._total - n + except ZeroDivisionError: + # Reset counting from start + self._total = n + self._count = 0 def step(self): """ Write that a step has been completed. A percentage is displayed along with it. + + If we are stepping beyond the number of items, stop counting. """ - self._count += 1 + self._count = min(self._total, self._count + 1) def enable(self, verbose=False): sh = logging.StreamHandler() @@ -358,21 +376,54 @@ def set_level(self, level): def is_debug_enabled(self): return self._logger.getEffectiveLevel() <= logging.DEBUG + def _message(self, routine, message, reserve_space=False, color=None): + kwargs = {} + if color is not None: + kwargs['extra'] = dict(color=color) + + if reserve_space: + max_width = max(16, util.get_terminal_width() - 33) + message = truncate_left(message, max_width) + self._prev_message = message + + routine(message, **kwargs) + def info(self, *args, **kwargs): - self._logger.info(*args, **kwargs) + self._message(self._logger.info, *args, **kwargs) def warn(self, *args, **kwargs): - self._logger.warn(*args, **kwargs) + self._message(self._logger.warn, *args, **kwargs) def debug(self, *args, **kwargs): - self._logger.debug(*args, **kwargs) + self._message(self._logger.debug, *args, **kwargs) def error(self, *args, **kwargs): - self._logger.error(*args, **kwargs) + self._message(self._logger.error, *args, **kwargs) def add(self, msg): - _write_with_fallback(msg, sys.stdout.write, sys.stdout) - sys.stdout.flush() + if self._needs_newline: + _write_with_fallback(msg, sys.stdout.write, sys.stdout) + sys.stdout.flush() + else: + self.info(msg) + + def add_padded(self, msg): + """ + Final part of two-part info message. 
+ Should be preceded by a call to info/warn/...(msg, reserve_space=True) + """ + if self._prev_message is None: + # No previous part: print as an info message + self.info(msg) + return + + padding_length = util.get_terminal_width() - len(self._prev_message) - 14 - 1 - len(msg) + if WIN: + padding_length -= 1 + padding = " "*padding_length + + self._prev_message = None + self.add(" {0}{1}".format(padding, msg)) def flush(self): """ @@ -384,4 +435,5 @@ def flush(self): self._needs_newline = False sys.stdout.flush() + log = Log() diff --git a/asv/results.py b/asv/results.py index ae626d158..396eedced 100644 --- a/asv/results.py +++ b/asv/results.py @@ -381,7 +381,8 @@ def remove_result(self, key): # Remove version (may be missing) self._benchmark_version.pop(key, None) - def add_result(self, benchmark_name, result, benchmark_version): + def add_result(self, benchmark_name, result, benchmark_version, + record_samples=False): """ Add benchmark result. @@ -390,21 +391,23 @@ def add_result(self, benchmark_name, result, benchmark_version): benchmark_name : str Name of benchmark - result : dict - Result of the benchmark, as returned by `benchmarks.run_benchmark`. + result : runner.BenchmarkResult + Result of the benchmark. """ - self._results[benchmark_name] = result['result'] - self._samples[benchmark_name] = result['samples'] - self._stats[benchmark_name] = result['stats'] - self._benchmark_params[benchmark_name] = result['params'] - self._started_at[benchmark_name] = util.datetime_to_js_timestamp(result['started_at']) - self._ended_at[benchmark_name] = util.datetime_to_js_timestamp(result['ended_at']) + self._results[benchmark_name] = result.result + if record_samples: + self._samples[benchmark_name] = result.samples + else: + self._samples[benchmark_name] = None + self._stats[benchmark_name] = result.stats + self._benchmark_params[benchmark_name] = result.params + self._started_at[benchmark_name] = util.datetime_to_js_timestamp(result.started_at) + self._ended_at[benchmark_name] = util.datetime_to_js_timestamp(result.ended_at) self._benchmark_version[benchmark_name] = benchmark_version - if 'profile' in result and result['profile']: - profile_data = base64.b64encode( - zlib.compress(result['profile'])) + if result.profile: + profile_data = base64.b64encode(zlib.compress(result.profile)) if sys.version_info[0] >= 3: profile_data = profile_data.decode('ascii') self._profiles[benchmark_name] = profile_data diff --git a/asv/runner.py b/asv/runner.py index 64e100492..2aa29033b 100644 --- a/asv/runner.py +++ b/asv/runner.py @@ -10,6 +10,7 @@ import re import sys import shutil +import time import tempfile import itertools import datetime @@ -31,6 +32,71 @@ os.path.dirname(os.path.abspath(__file__)), "benchmark.py") +JSON_ERROR_RETCODE = -257 + + +BenchmarkResult = util.namedtuple_with_doc( + 'BenchmarkResult', + ['result', 'samples', 'stats', 'params', 'errcode', 'stderr', 'profile', + 'started_at', 'ended_at'], + """ + Postprocessed benchmark result + + Attributes + ---------- + result : {list of object, None} + List of numeric values of the benchmarks (one for each parameter + combination), either returned directly or obtained from `samples` via + statistical analysis. + Values are `None` if benchmark failed or NaN if it was skipped. + The whole value can be None if benchmark could not be run at all. + samples : {list of float, None} + List of lists of sampled raw data points, if benchmark produces + those and was successful. None if no data. 
+ stats : {list of dict, None} + List of results of statistical analysis of data. + params : list + Same as `benchmark['params']`. Empty list if non-parameterized. + errcode : int + Process exit code + stderr : str + Process stdout/stderr output + profile : bytes + If `profile` is `True` and run was at least partially successful, + this key will be a byte string containing the cProfile data. + Otherwise, None. + started_at : datetime.datetime + Benchmark start time + ended_at : datetime.datetime + Benchmark end time + """) + + +RawBenchmarkResult = util.namedtuple_with_doc( + 'RawBenchmarkResult', + ['result', 'samples', 'number', 'errcode', 'stderr', 'profile'], + """ + Unprocessed benchmark result for a single run + + Attributes + ---------- + result : object + Benchmark result (None indicates failure) + samples : {list of float, None} + Benchmark measurement samples (or None, + if not applicable) + number : {int, None} + Compute benchmark 'number' attribute (if + applicable) + errcode : int + Process exit code + stderr : str + Process stdout/stderr output + profile : bytes + Profile data + """) + + class BenchmarkRunner(object): """ Control and plan running of a set of benchmarks. @@ -46,25 +112,26 @@ class BenchmarkRunner(object): which the `run` method then runs. """ - def __init__(self, benchmarks, benchmark_dir, show_stderr=False, quick=False, - profile=False, extra_params=None, selected_idx=None): + def __init__(self, benchmarks, show_stderr=False, quick=False, + profile=False, extra_params=None): """ Initialize BenchmarkRunner. Parameters ---------- - benchmarks : dict, {benchmark_name: Benchmark} + benchmarks : Benchmarks Set of benchmarks to run. - benchmark_dir : str - Root directory for the benchmark suite. - show_stderr : bool + show_stderr : bool, optional Whether to dump output stream from benchmark program. - quick : bool + quick : bool, optional Whether to force a 'quick' run. + profile : bool, optional + Whether to run with profiling data collection + extra_params : dict, optional + Attribute overrides for benchmarks. """ self.benchmarks = benchmarks - self.benchmark_dir = benchmark_dir self.show_stderr = show_stderr self.quick = quick self.profile = profile @@ -72,10 +139,6 @@ def __init__(self, benchmarks, benchmark_dir, show_stderr=False, quick=False, self.extra_params = {} else: self.extra_params = dict(extra_params) - if selected_idx is None: - selected_idx = {} - else: - self.selected_idx = selected_idx if quick: self.extra_params['number'] = 1 @@ -83,106 +146,126 @@ def __init__(self, benchmarks, benchmark_dir, show_stderr=False, quick=False, self.extra_params['warmup_time'] = 0 self.extra_params['processes'] = 1 + def _get_processes(self, benchmark): + """Get number of processes to use for a job""" + if 'processes' in self.extra_params: + return int(self.extra_params['processes']) + else: + return int(benchmark.get('processes', 1)) + def plan(self): + """ + Compute required Job objects + + Yields + ------ + job : *Job + A job object to run. 
+ """ # Find all setup_cache routines needed - setup_caches = {} setup_cache_timeout = {} + benchmark_order = {} + cache_users = {} + max_processes = 0 - for name, benchmark in self.benchmarks: + for name, benchmark in sorted(six.iteritems(self.benchmarks)): key = benchmark.get('setup_cache_key') setup_cache_timeout[key] = max(benchmark.get('setup_cache_timeout', benchmark['timeout']), setup_cache_timeout.get(key, 0)) + benchmark_order.setdefault(key, []).append((name, benchmark)) + max_processes = max(max_processes, self._get_processes(benchmark)) + cache_users.setdefault(key, set()).add(name) # Interleave benchmark runs, in setup_cache order - jobs = [] - - insert_stack = [] - benchmark_order = {} - cache_users = {} + def iter_run_items(): + for run_round in range(max_processes): + for setup_cache_key, benchmark_set in six.iteritems(benchmark_order): + for name, benchmark in benchmark_set: + processes = self._get_processes(benchmark) + if run_round >= processes: + continue + is_final = (run_round + 1 >= processes) + yield name, benchmark, setup_cache_key, is_final + + # Produce job objects setup_cache_jobs = {None: None} prev_runs = {} - for name, benchmark in self.benchmarks: - key = benchmark.get('setup_cache_key') - benchmark_order.setdefault(key, []).append((name, benchmark)) - - for setup_cache_key, benchmark_set in six.iteritems(benchmark_order): - for name, benchmark in benchmark_set: - if 'processes' in self.extra_params: - processes = int(self.extra_params['processes']) - else: - processes = int(benchmark.get('processes', 1)) - insert_stack.append((name, benchmark, processes, setup_cache_key)) - cache_users.setdefault(setup_cache_key, []).append(name) - - while insert_stack: - name, benchmark, processes, setup_cache_key = insert_stack.pop(0) - + for name, benchmark, setup_cache_key, is_final in iter_run_items(): # Setup cache first, if needed if setup_cache_key is None: setup_cache_job = None elif setup_cache_key in setup_cache_jobs: setup_cache_job = setup_cache_jobs[setup_cache_key] else: - setup_cache_job = SetupCacheJob(self.benchmark_dir, + setup_cache_job = SetupCacheJob(self.benchmarks.benchmark_dir, name, setup_cache_key, setup_cache_timeout[setup_cache_key]) setup_cache_jobs[setup_cache_key] = setup_cache_job - jobs.append(setup_cache_job) + yield setup_cache_job # Run benchmark prev_job = prev_runs.get(name, None) - job = LaunchBenchmarkJob(name, benchmark, self.benchmark_dir, + job = LaunchBenchmarkJob(name, benchmark, self.benchmarks.benchmark_dir, self.profile, self.extra_params, cache_job=setup_cache_job, prev_job=prev_job, - partial=(processes > 1), - selected_idx=self.selected_idx.get(name)) - prev_runs[name] = job - jobs.append(job) - - # Interleave remaining runs - if processes > 1: - insert_stack.append((name, benchmark, processes - 1, setup_cache_key)) + partial=not is_final, + selected_idx=self.benchmarks.benchmark_selection.get(name)) + if self._get_processes(benchmark) > 1: + prev_runs[name] = job + yield job # Cleanup setup cache, if no users left - if setup_cache_job is not None and processes == 1: + if setup_cache_job is not None and is_final: cache_users[setup_cache_key].remove(name) if not cache_users[setup_cache_key]: # No users of this cache left, perform cleanup - job = SetupCacheCleanupJob(setup_cache_job) - jobs.append(job) + yield SetupCacheCleanupJob(setup_cache_job) + del setup_cache_jobs[setup_cache_key] del cache_users[setup_cache_key] - return jobs + # Cleanup any dangling caches + for job in setup_cache_jobs.values(): + if job is not None: 
+ yield SetupCacheCleanupJob(job) - def run(self, jobs, env): + def run(self, env): times = {} + jobs = self.plan() + name_max_width = max(16, util.get_terminal_width() - 33) partial_info_printed = False + log.info("Benchmarking {0}".format(env.name)) + try: with log.indent(): + prev_run_info_time = time.time() + for job in jobs: short_name = truncate_left(job.name, name_max_width) if isinstance(job, SetupCacheJob): partial_info_printed = False - self._log_initial("Setting up {0}".format(short_name)) + log.info("Setting up {0}".format(short_name)) job.run(env) - self._log_cache_result(job) elif isinstance(job, LaunchBenchmarkJob): if job.partial: + if time.time() > prev_run_info_time + 30: + partial_info_printed = False + if partial_info_printed: log.add(".") else: - self._log_initial('Running benchmarks...') + log.info('Running ({0}--)'.format(short_name)) + prev_run_info_time = time.time() partial_info_printed = True else: log.step() - self._log_initial('{0}'.format(short_name)) + log.info(short_name, reserve_space=True) partial_info_printed = False job.run(env) self._log_benchmark_result(job) @@ -198,68 +281,55 @@ def run(self, jobs, env): return times - def _log_initial(self, msg): - self._initial_message = msg - log.info(msg) - - def _log_result(self, msg): - assert self._initial_message is not None - padding_length = util.get_terminal_width() - len(self._initial_message) - 14 - 1 - len(msg) - self._initial_message = None - if WIN: - padding_length -= 1 - padding = " "*padding_length - log.add(" {0}{1}".format(padding, msg)) - - def _log_cache_result(self, item): - pass - def _log_benchmark_result(self, job): if job.partial: return + if job.result.result is None: + total_count = 1 + failure_count = 1 + else: + total_count = len(job.result.result) + failure_count = sum(r is None for r in job.result.result) + # Display status - if job.failure_count > 0: - if job.bad_output is None: - if job.failure_count == job.total_count: - self._log_result("failed") - else: - self._log_result("{0}/{1} failed".format(job.failure_count, - job.total_count)) + if failure_count > 0: + if failure_count == total_count: + log.add_padded("failed") else: - self._log_result("invalid output") - with log.indent(): - log.debug(job.bad_output) + log.add_padded("{0}/{1} failed".format(failure_count, + total_count)) # Display results if job.benchmark['params'] and self.show_stderr: # Long format display - if job.failure_count == 0: - self._log_result("ok") + if failure_count == 0: + log.add_padded("ok") display_result = [(v, statistics.get_err(v, s) if s is not None else None) - for v, s in zip(job.result['result'], job.result['stats'])] + for v, s in zip(job.result.result, job.result.stats)] display = _format_benchmark_result(display_result, job.benchmark) - log.info("\n" + "\n".join(display)) + display = "\n".join(display).strip() + log.info(display, color='default') else: - if job.failure_count == 0: + if failure_count == 0: # Failure already shown above - if not job.result['result']: + if not job.result.result: display = "[]" else: - if job.result['stats'][0]: - err = statistics.get_err(job.result['result'][0], job.result['stats'][0]) + if job.result.stats[0]: + err = statistics.get_err(job.result.result[0], job.result.stats[0]) else: err = None - display = util.human_value(job.result['result'][0], job.benchmark['unit'], err=err) - if len(job.result['result']) > 1: + display = util.human_value(job.result.result[0], job.benchmark['unit'], err=err) + if len(job.result.result) > 1: display += ";..." 
- self._log_result(display) + log.add_padded(display) # Dump program output - if self.show_stderr and job.result.get('stderr'): + if self.show_stderr and job.result.stderr: with log.indent(): - log.error(job.result['stderr']) + log.error(job.result.stderr) class LaunchBenchmarkJob(object): @@ -268,31 +338,9 @@ class LaunchBenchmarkJob(object): Attributes ---------- - result : dict - Present after completing the job (successfully or unsuccessfully). - A dictionary with the following keys: - - - `result`: List of numeric values of the benchmarks (one for each parameter - combination), either returned directly or obtained from `samples` via - statistical analysis. - - Values are `None` if benchmark failed or NaN if it was skipped. - - If benchmark is not parameterized, the list contains a single number. - - - `params`: Same as `benchmark['params']`. Empty list if non-parameterized. + result : {BenchmarkResult, None} + Job result (None if job was not yet run). - - `samples`: List of lists of sampled raw data points, if benchmark produces - those and was successful. - - - `stats`: List of results of statistical analysis of data. - - - `profile`: If `profile` is `True` and run was at least partially successful, - this key will be a byte string containing the cProfile data. Otherwise, None. - - - `stderr`: Output produced. - - - `errcode`: Error return code. """ def __init__(self, name, benchmark, benchmark_dir, profile=False, extra_params=None, @@ -339,9 +387,6 @@ def __init__(self, name, benchmark, benchmark_dir, profile=False, extra_params=N self.selected_idx = selected_idx self.result = None - self.bad_output = None - self.failure_count = 0 - self.total_count = 0 def __repr__(self): return "".format(self.name, id(self)) @@ -349,140 +394,96 @@ def __repr__(self): def run(self, env): if self.cache_job is not None and self.cache_job.cache_dir is None: # Our setup_cache failed, so skip this job - timestamp = datetime.datetime.utcnow() - self.result = {'result': None, - 'samples': None, - 'stats': None, - 'params': [], - 'stderr': self.cache_job.stderr, - 'started_at': timestamp, - 'ended_at': timestamp} + self.result = get_failed_benchmark_result( + self.name, self.benchmark, self.selected_idx, + stderr=self.cache_job.stderr, + errcode=self.cache_job.errcode) return - if self.prev_job is not None and self.prev_job.result['result'] is None: + if self.prev_job is not None and self.prev_job.result.result is None: # Previous job in a multi-process benchmark failed, so skip this job self.result = self.prev_job.result return - result = {"stderr": "", "errcode": 0} + if self.cache_job: + cache_dir = self.cache_job.cache_dir + else: + cache_dir = None + + started_at = datetime.datetime.utcnow() - extra_params = dict(self.extra_params) + result = [] + samples = [] + stats = [] + profiles = [] + stderr = '' + errcode = 0 if self.benchmark['params']: param_iter = enumerate(itertools.product(*self.benchmark['params'])) else: - param_iter = [(None, None)] - - self.bad_output = None - - result['started_at'] = datetime.datetime.utcnow() - - bench_results = [] - bench_profiles = [] + param_iter = [(0, None)] for param_idx, params in param_iter: if (self.selected_idx is not None and self.benchmark['params'] and param_idx not in self.selected_idx): # Use NaN to mark the result as skipped - bench_results.append(dict(samples=None, result=float('nan'), - stats=None)) - bench_profiles.append(None) + result.append(util.nan) + samples.append(None) + stats.append(None) + profiles.append(None) continue + cur_extra_params 
= dict(self.extra_params) + if self.prev_job: - idx = param_idx - if idx is None: - idx = 0 - prev_stats = self.prev_job.result['stats'][idx] + prev_stats = self.prev_job.result.stats[param_idx] if prev_stats is not None: - extra_params['number'] = prev_stats['number'] - prev_samples = self.prev_job.result['samples'][idx] + cur_extra_params['number'] = prev_stats['number'] + prev_samples = self.prev_job.result.samples[param_idx] else: prev_samples = None - if self.cache_job is None: - cwd = tempfile.mkdtemp() - else: - cwd = self.cache_job.cache_dir + res = run_benchmark_single( + self.benchmark, self.benchmark_dir, env, param_idx, + extra_params=cur_extra_params, profile=self.profile, + cwd=cache_dir) - try: - success, data, profile_data, err, out, errcode = \ - run_benchmark_single( - self.benchmark, self.benchmark_dir, env, param_idx, - extra_params=extra_params, profile=self.profile, - cwd=cwd) - finally: - if self.cache_job is None: - shutil.rmtree(cwd, True) - - self.total_count += 1 - if success: - if isinstance(data, dict) and 'samples' in data: - if prev_samples is not None: - # Combine samples - data['samples'] = prev_samples + data['samples'] - - value, stats = statistics.compute_stats(data['samples'], - data['number']) - result_data = dict(samples=data['samples'], - result=value, - stats=stats) + if res.samples is not None: + # Compute statistics + if prev_samples is not None: + cur_samples = prev_samples + res.samples else: - result_data = dict(samples=None, - result=data, - stats=None) - - bench_results.append(result_data) - if self.profile: - bench_profiles.append(profile_data) + cur_samples = res.samples + r, s = statistics.compute_stats(cur_samples, res.number) + result.append(r) + stats.append(s) else: - self.failure_count += 1 - bench_results.append(dict(samples=None, result=None, stats=None)) - bench_profiles.append(None) - if data is not None: - self.bad_output = data - - err = err.strip() - out = out.strip() - - if errcode: - if errcode == util.TIMEOUT_RETCODE: - if err: - err += "\n\n" - err += "asv: benchmark timed out (timeout {0}s)\n".format(self.benchmark['timeout']) - result['errcode'] = errcode - - if err or out: - err += out - if self.benchmark['params']: - head_msg = "\n\nFor parameters: %s\n" % (", ".join(params),) - else: - head_msg = '' + result.append(res.result) + stats.append(None) - result['stderr'] += head_msg - result['stderr'] += err + samples.append(res.samples) + profiles.append(res.profile) - # Produce result - for key in ['samples', 'result', 'stats']: - result[key] = [x[key] for x in bench_results] + if res.stderr: + stderr += "\n\n" + stderr += res.stderr - if self.benchmark['params']: - result['params'] = self.benchmark['params'] - else: - result['params'] = [] + if res.errcode != 0: + errcode = res.errcode - # Combine profile data - if self.prev_job and 'profile' in self.prev_job.result: - bench_profiles.append(self.prev_job.result['profile']) - - profile_data = _combine_profile_data(bench_profiles) - if profile_data is not None: - result['profile'] = profile_data - - result['ended_at'] = datetime.datetime.utcnow() - - self.result = result + self.result = BenchmarkResult( + result=result, + samples=samples, + stats=stats, + params=self.benchmark['params'] if self.benchmark['params'] else [], + errcode=errcode, + stderr=stderr.strip(), + profile=_combine_profile_data(profiles), + started_at=started_at, + ended_at=datetime.datetime.utcnow() + ) class SetupCacheJob(object): @@ -518,13 +519,16 @@ def run(self, env): self.benchmark_id], 
dots=False, display_error=False, return_stderr=True, valid_return_codes=None, + redirect_stderr=True, cwd=cache_dir, timeout=self.timeout) + self.errcode = errcode + if errcode == 0: self.stderr = None self.cache_dir = cache_dir else: - self.stderr = err + self.stderr = out self.clean() def clean(self): @@ -548,29 +552,60 @@ def run(self, env): self.cache_job.clean() -def run_benchmark_single(benchmark, root, env, param_idx, profile, extra_params, cwd): +def get_failed_benchmark_result(name, benchmark, selected_idx=None, + stderr='', errcode=1): + timestamp = datetime.datetime.utcnow() + + if benchmark['params'] and selected_idx is not None: + # Mark only selected parameter combinations skipped + params = itertools.product(*benchmark['params']) + result = [None if idx in selected_idx else util.nan + for idx, _ in enumerate(params)] + else: + result = None + + return BenchmarkResult(result=result, + samples=None, + stats=None, + params=benchmark['params'], + errcode=errcode, + stderr=stderr, + profile=None, + started_at=timestamp, + ended_at=timestamp) + + +def run_benchmark_single(benchmark, benchmark_dir, env, param_idx, profile, extra_params, cwd): """ Run a benchmark, for single parameter combination index in case it is parameterized + Parameters + ---------- + benchmark : dict + Benchmark object dict + benchmark_dir : str + Benchmark directory root + env : Environment + Environment to run in + param_idx : {int, None} + Parameter index to run benchmark for + profile : bool + Whether to run with profile + extra_params : dict + Additional parameters to pass to the benchmark + cwd : {str, None} + Working directory to run the benchmark in. + If None, run in a temporary directory. + Returns ------- - success : bool - Whether test was successful - data - If success, the parsed JSON data. If failure, unparsed json data. - profile_data - Collected profiler data - err - Stderr content - out - Stdout content - errcode - Process return value + result : RawBenchmarkResult + Result data. 
""" name = benchmark['name'] - if param_idx is not None: + if benchmark['params']: name += '-%d' % (param_idx,) if profile: @@ -579,45 +614,78 @@ def run_benchmark_single(benchmark, root, env, param_idx, profile, extra_params, else: profile_path = 'None' + params_str = json.dumps(extra_params) + + if cwd is None: + real_cwd = tempfile.mkdtemp() + else: + real_cwd = cwd + result_file = tempfile.NamedTemporaryFile(delete=False) try: result_file.close() - success = True - params_str = json.dumps(extra_params) - - out, err, errcode = env.run( - [BENCHMARK_RUN_SCRIPT, 'run', os.path.abspath(root), + out, _, errcode = env.run( + [BENCHMARK_RUN_SCRIPT, 'run', os.path.abspath(benchmark_dir), name, params_str, profile_path, result_file.name], dots=False, timeout=benchmark['timeout'], - display_error=False, return_stderr=True, - valid_return_codes=None, cwd=cwd) + display_error=False, return_stderr=True, redirect_stderr=True, + valid_return_codes=None, cwd=real_cwd) - if errcode: - success = False - parsed = None + if errcode != 0: + if errcode == util.TIMEOUT_RETCODE: + out += "\n\nasv: benchmark timed out (timeout {0}s)\n".format(benchmark['timeout']) + + result = None + samples = None + number = None else: with open(result_file.name, 'r') as stream: data = stream.read() + try: - parsed = json.loads(data) - except: - success = False - parsed = data + data = json.loads(data) + except ValueError as exc: + data = None + errcode = JSON_ERROR_RETCODE + out += "\n\nasv: failed to parse benchmark result: {0}\n".format(exc) + + # Special parsing for timing benchmark results + if isinstance(data, dict) and 'samples' in data and 'number' in data: + result = None + samples = data['samples'] + number = data['number'] + else: + result = data + samples = None + number = None + + if benchmark['params'] and out: + params, = itertools.islice(itertools.product(*benchmark['params']), + param_idx, param_idx + 1) + out = "For parameters: {0}\n{1}".format(", ".join(params), out) if profile: with io.open(profile_path, 'rb') as profile_fd: profile_data = profile_fd.read() - if not profile_data: - profile_data = None + profile_data = profile_data if profile_data else None else: profile_data = None - return success, parsed, profile_data, err, out, errcode + return RawBenchmarkResult( + result=result, + samples=samples, + number=number, + errcode=errcode, + stderr=out.strip(), + profile=profile_data) + finally: os.remove(result_file.name) if profile: os.remove(profile_path) + if cwd is None: + util.long_path_rmtree(real_cwd, True) def _combine_profile_data(datasets): diff --git a/asv/util.py b/asv/util.py index 036de4122..787fd152b 100644 --- a/asv/util.py +++ b/asv/util.py @@ -24,11 +24,11 @@ import shutil import stat import operator +import collections import six from six.moves import xrange -from .console import log from .extern import minify_json @@ -363,7 +363,7 @@ def check_call(args, valid_return_codes=(0,), timeout=600, dots=True, def check_output(args, valid_return_codes=(0,), timeout=600, dots=True, display_error=True, shell=False, return_stderr=False, - env=None, cwd=None): + env=None, cwd=None, redirect_stderr=False): """ Runs the given command in a subprocess, raising ProcessError if it fails. Returns stdout as a string on success. @@ -403,7 +403,18 @@ def check_output(args, valid_return_codes=(0,), timeout=600, dots=True, cwd : str, optional Specify the current working directory to use when running the process. + + redirect_stderr : bool, optional + Whether to redirect stderr to stdout. 
In this case the returned + ``stderr`` (when return_stderr == True) is an empty string. + + Returns + ------- + stdout, stderr, retcode : when return_stderr == True + stdout : otherwise """ + from .console import log + # Hide traceback from expected exceptions in pytest reports __tracebackhide__ = operator.methodcaller('errisinstance', ProcessError) @@ -411,13 +422,18 @@ def get_content(header=None): content = [] if header is not None: content.append(header) - content.extend([ - 'STDOUT -------->', - stdout[:-1], - 'STDERR -------->', - stderr[:-1] - ]) - + if redirect_stderr: + content.extend([ + 'OUTPUT -------->', + stdout[:-1] + ]) + else: + content.extend([ + 'STDOUT -------->', + stdout[:-1], + 'STDERR -------->', + stderr[:-1] + ]) return '\n'.join(content) if isinstance(args, six.string_types): @@ -433,6 +449,8 @@ def _fix_env(s): kwargs = dict(shell=shell, env=env, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + if redirect_stderr: + kwargs['stderr'] = subprocess.STDOUT if WIN: kwargs['close_fds'] = False kwargs['creationflags'] = subprocess.CREATE_NEW_PROCESS_GROUP @@ -487,8 +505,9 @@ def watcher_run(): stdout_reader = threading.Thread(target=stdout_reader_run) stdout_reader.start() - stderr_reader = threading.Thread(target=stderr_reader_run) - stderr_reader.start() + if not redirect_stderr: + stderr_reader = threading.Thread(target=stderr_reader_run) + stderr_reader.start() try: proc.wait() @@ -497,11 +516,13 @@ def watcher_run(): proc.terminate() proc.wait() watcher.join() - stderr_reader.join() + if not redirect_stderr: + stderr_reader.join() stdout_reader.join() proc.stdout.close() - proc.stderr.close() + if not redirect_stderr: + proc.stderr.close() is_timeout = was_timeout[0] else: @@ -518,9 +539,10 @@ def sig_forward(signum, frame): signal.signal(signal.SIGCONT, sig_forward) fds = { - proc.stdout.fileno(): stdout_chunks, - proc.stderr.fileno(): stderr_chunks + proc.stdout.fileno(): stdout_chunks } + if not redirect_stderr: + fds[proc.stderr.fileno()] = stderr_chunks while proc.poll() is None: try: @@ -574,13 +596,16 @@ def sig_forward(signum, frame): proc.wait() proc.stdout.flush() - proc.stderr.flush() + if not redirect_stderr: + proc.stderr.flush() stdout_chunks.append(proc.stdout.read()) - stderr_chunks.append(proc.stderr.read()) + if not redirect_stderr: + stderr_chunks.append(proc.stderr.read()) proc.stdout.close() - proc.stderr.close() + if not redirect_stderr: + proc.stderr.close() stdout = b''.join(stdout_chunks) stderr = b''.join(stderr_chunks) @@ -1011,6 +1036,11 @@ def geom_mean_na(values): return None +def ceildiv(numerator, denominator): + """Ceiling division""" + return -((-numerator)//denominator) + + if not WIN: long_path_open = open long_path_rmtree = shutil.rmtree @@ -1073,3 +1103,12 @@ def sanitize_filename(filename): filename = filename + "_" return filename + + +def namedtuple_with_doc(name, slots, doc): + cls = collections.namedtuple(name, slots) + if sys.version_info[0] >= 3: + cls.__doc__ = doc + return cls + else: + return type(str(name), (cls,), {'__doc__': doc}) diff --git a/test/test_benchmarks.py b/test/test_benchmarks.py index 9e8823b6c..4c4084c4c 100644 --- a/test/test_benchmarks.py +++ b/test/test_benchmarks.py @@ -114,81 +114,79 @@ def test_find_benchmarks(tmpdir): assert len(times) == len(b) assert times[ - 'time_examples.TimeSuite.time_example_benchmark_1']['result'] != [None] - assert isinstance(times['time_examples.TimeSuite.time_example_benchmark_1']['stats'][0]['std'], float) + 
'time_examples.TimeSuite.time_example_benchmark_1'].result != [None] + assert isinstance(times['time_examples.TimeSuite.time_example_benchmark_1'].stats[0]['std'], float) # The exact number of samples may vary if the calibration is not fully accurate - assert len(times['time_examples.TimeSuite.time_example_benchmark_1']['samples'][0]) >= 4 + assert len(times['time_examples.TimeSuite.time_example_benchmark_1'].samples[0]) >= 4 # Benchmarks that raise exceptions should have a time of "None" assert times[ - 'time_secondary.TimeSecondary.time_exception']['result'] == [None] + 'time_secondary.TimeSecondary.time_exception'].result == [None] assert times[ - 'subdir.time_subdir.time_foo']['result'] != [None] + 'subdir.time_subdir.time_foo'].result != [None] if not ON_PYPY: # XXX: the memory benchmarks don't work on Pypy, since asizeof # is CPython-only assert times[ - 'mem_examples.mem_list']['result'][0] > 1000 + 'mem_examples.mem_list'].result[0] > 1000 assert times[ - 'time_secondary.track_value']['result'] == [42.0] - assert 'profile' in times[ - 'time_secondary.track_value'] - assert 'stderr' in times[ - 'time_examples.time_with_warnings'] - assert times['time_examples.time_with_warnings']['errcode'] != 0 + 'time_secondary.track_value'].result == [42.0] + assert times['time_secondary.track_value'].profile is not None + assert isinstance(times['time_examples.time_with_warnings'].stderr, type('')) + assert times['time_examples.time_with_warnings'].errcode != 0 - assert times['time_examples.TimeWithBadTimer.time_it']['result'] == [0.0] + assert times['time_examples.TimeWithBadTimer.time_it'].result == [0.0] - assert times['params_examples.track_param']['params'] == [["", + assert times['params_examples.track_param'].params == [["", ""]] - assert times['params_examples.track_param']['result'] == [42, 42] + assert times['params_examples.track_param'].result == [42, 42] - assert times['params_examples.mem_param']['params'] == [['10', '20'], ['2', '3']] - assert len(times['params_examples.mem_param']['result']) == 2*2 + assert times['params_examples.mem_param'].params == [['10', '20'], ['2', '3']] + assert len(times['params_examples.mem_param'].result) == 2*2 - assert times['params_examples.ParamSuite.track_value']['params'] == [["'a'", "'b'", "'c'"]] - assert times['params_examples.ParamSuite.track_value']['result'] == [1+0, 2+0, 3+0] + assert times['params_examples.ParamSuite.track_value'].params == [["'a'", "'b'", "'c'"]] + assert times['params_examples.ParamSuite.track_value'].result == [1+0, 2+0, 3+0] - assert isinstance(times['params_examples.TuningTest.time_it']['result'][0], float) - assert isinstance(times['params_examples.TuningTest.time_it']['result'][1], float) + assert isinstance(times['params_examples.TuningTest.time_it'].result[0], float) + assert isinstance(times['params_examples.TuningTest.time_it'].result[1], float) - assert isinstance(times['params_examples.time_skip']['result'][0], float) - assert isinstance(times['params_examples.time_skip']['result'][1], float) - assert util.is_nan(times['params_examples.time_skip']['result'][2]) + assert isinstance(times['params_examples.time_skip'].result[0], float) + assert isinstance(times['params_examples.time_skip'].result[1], float) + assert util.is_nan(times['params_examples.time_skip'].result[2]) - assert times['peakmem_examples.peakmem_list']['result'][0] >= 4 * 2**20 + assert times['peakmem_examples.peakmem_list'].result[0] >= 4 * 2**20 - assert times['cache_examples.ClassLevelSetup.track_example']['result'] == [500] - assert 
times['cache_examples.ClassLevelSetup.track_example2']['result'] == [500] + assert times['cache_examples.ClassLevelSetup.track_example'].result == [500] + assert times['cache_examples.ClassLevelSetup.track_example2'].result == [500] - assert times['cache_examples.track_cache_foo']['result'] == [42] - assert times['cache_examples.track_cache_bar']['result'] == [12] - assert times['cache_examples.track_my_cache_foo']['result'] == [0] + assert times['cache_examples.track_cache_foo'].result == [42] + assert times['cache_examples.track_cache_bar'].result == [12] + assert times['cache_examples.track_my_cache_foo'].result == [0] - assert times['cache_examples.ClassLevelSetupFail.track_fail']['result'] == None - assert 'raise RuntimeError()' in times['cache_examples.ClassLevelSetupFail.track_fail']['stderr'] + assert times['cache_examples.ClassLevelSetupFail.track_fail'].result == None + assert 'raise RuntimeError()' in times['cache_examples.ClassLevelSetupFail.track_fail'].stderr - assert times['cache_examples.ClassLevelCacheTimeout.track_fail']['result'] == None - assert times['cache_examples.ClassLevelCacheTimeoutSuccess.track_success']['result'] == [0] + assert times['cache_examples.ClassLevelCacheTimeout.track_fail'].result == None + assert times['cache_examples.ClassLevelCacheTimeoutSuccess.track_success'].result == [0] profile_path = join(tmpdir, 'test.profile') with open(profile_path, 'wb') as fd: - fd.write(times['time_secondary.track_value']['profile']) + fd.write(times['time_secondary.track_value'].profile) pstats.Stats(profile_path) # Check for running setup on each repeat (one extra run from profile) # The output would contain error messages if the asserts in the benchmark fail. expected = ["<%d>" % j for j in range(1, 12)] - assert times['time_examples.TimeWithRepeat.time_it']['stderr'].split() == expected + assert times['time_examples.TimeWithRepeat.time_it'].stderr.split() == expected # Calibration of iterations should not rerun setup expected = (['setup']*2, ['setup']*3) - assert times['time_examples.TimeWithRepeatCalibrate.time_it']['stderr'].split() in expected + assert times['time_examples.TimeWithRepeatCalibrate.time_it'].stderr.split() in expected # Check run time timestamps for name, result in times.items(): - assert result['started_at'] >= start_timestamp - assert result['ended_at'] >= result['started_at'] - assert result['ended_at'] <= end_timestamp + assert result.started_at >= start_timestamp + assert result.ended_at >= result.started_at + assert result.ended_at <= end_timestamp def test_invalid_benchmark_tree(tmpdir): @@ -318,14 +316,15 @@ def test_quick(tmpdir): b = benchmarks.Benchmarks.discover(conf, repo, envs, [commit_hash]) skip_names = [name for name in b.keys() if name != 'time_examples.TimeWithRepeat.time_it'] - times = b.run_benchmarks(envs[0], quick=True, show_stderr=True, skip=skip_names) + b2 = b.filter_out(skip_names) + times = b2.run_benchmarks(envs[0], quick=True, show_stderr=True) assert len(times) == 1 # Check that the benchmark was run only once. 
The result for quick==False # is tested above in test_find_benchmarks expected = ["<1>"] - assert times['time_examples.TimeWithRepeat.time_it']['stderr'].split() == expected + assert times['time_examples.TimeWithRepeat.time_it'].stderr.split() == expected def test_code_extraction(tmpdir): @@ -399,3 +398,26 @@ def test_asv_benchmark_timings(): util.check_call([sys.executable, '-masv.benchmark', 'timing', '--setup=import time', 'time.sleep(0)']) + + +def test_skip_param_selection(): + d = {'repo': 'foo'} + d.update(ASV_CONF_JSON) + conf = config.Config.from_json(d) + + class DummyEnv(object): + name = 'env' + + d = [ + {'name': 'test_nonparam', 'params': []}, + {'name': 'test_param', + 'params': [['1', '2', '3']], + 'param_names': ['n']} + ] + + b = benchmarks.Benchmarks(conf, d, [r'test_nonparam', r'test_param\([23]\)']) + result = b.skip_benchmarks(DummyEnv()) + + assert result['test_nonparam'].result == None + assert util.is_nan(result['test_param'].result[0]) + assert result['test_param'].result[1:] == [None, None] diff --git a/test/test_results.py b/test/test_results.py index 6a02e3c3b..cd6230f84 100644 --- a/test/test_results.py +++ b/test/test_results.py @@ -11,7 +11,7 @@ import six -from asv import results, util +from asv import results, runner, util import pytest @@ -46,9 +46,13 @@ def test_results(tmpdir): for key, val in values.items(): val = dict(val) - val['started_at'] = timestamp1 - val['ended_at'] = timestamp2 - r.add_result(key, val, val.pop("version")) + version = val.pop('version') + val = runner.BenchmarkResult(started_at=timestamp1, + ended_at=timestamp2, + errcode=0, + stderr='', + **val) + r.add_result(key, val, version, record_samples=True) # Save / add_existing_results roundtrip r.save(resultsdir) @@ -143,14 +147,17 @@ def test_json_timestamp(tmpdir): r = results.Results({'machine': 'mach'}, {}, 'aaaa', util.datetime_to_timestamp(stamp0), 'py', 'env') - value = { - 'result': [42], - 'params': [], - 'stats': None, - 'samples': None, - 'started_at': stamp1, - 'ended_at': stamp2 - } + value = runner.BenchmarkResult( + result=[42], + params=[], + stats=None, + samples=None, + started_at=stamp1, + ended_at=stamp2, + profile=None, + errcode=0, + stderr='' + ) r.add_result('some_benchmark', value, "some version") r.save(tmpdir) diff --git a/test/test_subprocess.py b/test/test_subprocess.py index f883ec73b..320ff1984 100644 --- a/test/test_subprocess.py +++ b/test/test_subprocess.py @@ -132,6 +132,21 @@ def test_no_timeout(): assert retcode == 0 +def test_stderr_redirect(): + # Check redirecting stderr to stdout works + code = ("import sys;" + "sys.stdout.write('OUT\\n');" + "sys.stdout.flush();" + "sys.stderr.write('ERR\\n')") + out = util.check_output([sys.executable, "-c", code], redirect_stderr=True) + assert out.splitlines() == ['OUT', 'ERR'] + out, err, retcode = util.check_output([sys.executable, "-c", code], + return_stderr=True, redirect_stderr=True) + assert out.splitlines() == ['OUT', 'ERR'] + assert err == '' + assert retcode == 0 + + # This *does* seem to work, only seems untestable somehow... 
# def test_dots(capsys): # code = r""" diff --git a/test/tools.py b/test/tools.py index eae70ff17..0aebfb4ef 100644 --- a/test/tools.py +++ b/test/tools.py @@ -31,6 +31,7 @@ from asv import util from asv import commands from asv import config +from asv import runner from asv.commands.preview import create_httpd from asv.repo import get_repo from asv.results import Results @@ -388,15 +389,17 @@ def generate_result_dir(tmpdir, dvcs, values, branches=None): params = value["params"] result = Results({"machine": "tarzan"}, {}, commit, repo.get_date_from_name(commit), "2.7", None) - value = { - 'result': [value], - 'params': [], - 'started_at': timestamp, - 'ended_at': timestamp, - 'stats': None, - 'samples': None, - 'number': None, - } + value = runner.BenchmarkResult( + result=[value], + samples=None, + stats=None, + params=[], + errcode=0, + stderr='', + profile=None, + started_at=timestamp, + ended_at=timestamp + ) result.add_result("time_func", value, benchmark_version) result.save(result_dir)
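
Reviewer notes (not part of the patch):

With this change, run_benchmarks() and skip_benchmarks() return runner.BenchmarkResult namedtuples instead of plain dicts, so downstream code moves from d['result'] to d.result (see the find.py, profiling.py and results.py hunks above). Below is a minimal sketch of what a consumer now receives, assuming asv with this patch applied; the field values are made up for illustration:

    import datetime

    from asv import runner  # BenchmarkResult is added by this patch

    now = datetime.datetime.utcnow()

    # Field list matches the BenchmarkResult namedtuple defined in asv/runner.py.
    res = runner.BenchmarkResult(
        result=[1.5e-6],   # one value per parameter combination (illustrative)
        samples=[None],
        stats=[None],
        params=[],         # empty list for a non-parameterized benchmark
        errcode=0,
        stderr='',
        profile=None,
        started_at=now,
        ended_at=now,
    )

    # Consumers now use attribute access instead of dict keys.
    assert res.result[0] == 1.5e-6
    assert res.errcode == 0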
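
Benchmark and setup_cache subprocesses now run with stderr folded into stdout (run_benchmark_single and SetupCacheJob pass redirect_stderr=True), so there is a single output stream to attach to the result. The sketch below mirrors the new test_stderr_redirect test and shows both calling conventions of util.check_output with the new flag; it assumes asv with this patch applied:

    import sys

    from asv import util  # redirect_stderr is added by this patch

    code = ("import sys;"
            "sys.stdout.write('OUT\\n');"
            "sys.stdout.flush();"
            "sys.stderr.write('ERR\\n')")

    # With redirect_stderr=True the child's stderr is interleaved into stdout.
    out = util.check_output([sys.executable, "-c", code], redirect_stderr=True)
    assert out.splitlines() == ['OUT', 'ERR']

    # With return_stderr=True as well, the stderr slot of the returned
    # (stdout, stderr, retcode) tuple is an empty string.
    out, err, retcode = util.check_output([sys.executable, "-c", code],
                                          return_stderr=True, redirect_stderr=True)
    assert out.splitlines() == ['OUT', 'ERR']
    assert err == ''
    assert retcode == 0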
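
The console progress accounting and the documented result tuples rely on two small helpers added to asv/util.py: ceildiv (used by Log.set_nitems to keep the displayed percentage non-decreasing) and namedtuple_with_doc (used to define BenchmarkResult and RawBenchmarkResult). The definitions below are copied from that hunk, with comments added, so the sketch runs standalone without asv installed; Point is a made-up example type:

    import collections
    import sys


    def ceildiv(numerator, denominator):
        """Ceiling division"""
        return -((-numerator)//denominator)


    def namedtuple_with_doc(name, slots, doc):
        # Attach a custom docstring to a namedtuple class, on both Python 2 and 3.
        cls = collections.namedtuple(name, slots)
        if sys.version_info[0] >= 3:
            cls.__doc__ = doc
            return cls
        else:
            return type(str(name), (cls,), {'__doc__': doc})


    # ceildiv(7, 2) rounds up where 7 // 2 would round down.
    assert ceildiv(7, 2) == 4
    assert ceildiv(6, 2) == 3

    # namedtuple_with_doc behaves like a regular namedtuple, plus a docstring.
    Point = namedtuple_with_doc('Point', ['x', 'y'], "A documented 2-D point.")
    p = Point(x=1, y=2)
    assert (p.x, p.y) == (1, 2)
    assert 'documented' in Point.__doc__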