diff --git a/asv/commands/continuous.py b/asv/commands/continuous.py
index 17f1f16fb..05656729e 100644
--- a/asv/commands/continuous.py
+++ b/asv/commands/continuous.py
@@ -41,6 +41,13 @@ def setup_arguments(cls, subparsers):
             run only once. This is useful to find basic errors in the
             benchmark functions faster. The results are unlikely to be
             useful, and thus are not saved.""")
+        parser.add_argument(
+            "--interleave-processes", action="store_true", default=None,
+            help="""Interleave benchmarks with multiple processes across
+            commits. This can avoid measurement biases from commit ordering,
+            but can take longer.""")
+        parser.add_argument(
+            "--no-interleave-processes", action="store_false", dest="interleave_processes")
         common_args.add_compare(parser, sort_default='ratio', only_changed_default=True)
         common_args.add_show_stderr(parser)
         common_args.add_bench(parser)
@@ -60,7 +67,7 @@ def run_from_conf_args(cls, conf, args, **kwargs):
             machine=args.machine, env_spec=args.env_spec,
             record_samples=args.record_samples,
             append_samples=args.append_samples,
-            quick=args.quick, **kwargs
+            quick=args.quick, interleave_processes=args.interleave_processes, **kwargs
         )
 
     @classmethod
@@ -68,7 +75,7 @@ def run(cls, conf, branch=None, base=None,
             factor=None, split=False, only_changed=True, sort='ratio',
             show_stderr=False, bench=None, attribute=None, machine=None,
             env_spec=None, record_samples=False, append_samples=False,
-            quick=False, _machine_file=None):
+            quick=False, interleave_processes=None, _machine_file=None):
         repo = get_repo(conf)
         repo.pull()
 
@@ -88,6 +95,7 @@ def run(cls, conf, branch=None, base=None,
             conf, range_spec=commit_hashes, bench=bench, attribute=attribute,
             show_stderr=show_stderr, machine=machine, env_spec=env_spec,
             record_samples=record_samples, append_samples=append_samples, quick=quick,
+            interleave_processes=interleave_processes,
             _returns=run_objs, _machine_file=_machine_file)
         if result:
             return result
diff --git a/asv/commands/run.py b/asv/commands/run.py
index 353e5cc13..399e1b0bf 100644
--- a/asv/commands/run.py
+++ b/asv/commands/run.py
@@ -4,12 +4,14 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
-import six
-
 import logging
 import traceback
+import itertools
+
 from collections import defaultdict
 
+import six
+
 from . import Command
 from ..benchmarks import Benchmarks
 from ..console import log
@@ -107,6 +109,13 @@ def setup_arguments(cls, subparsers):
             help="""Skip running benchmarks that have previous successful
             or failed results""")
         common_args.add_record_samples(parser)
+        parser.add_argument(
+            "--interleave-processes", action="store_true", default=False,
+            help="""Interleave benchmarks with multiple processes across
+            commits. This can avoid measurement biases from commit ordering,
+            but can take longer.""")
+        parser.add_argument(
+            "--no-interleave-processes", action="store_false", dest="interleave_processes")
         parser.add_argument(
             "--no-pull", action="store_true",
             help="Do not pull the repository")
@@ -127,7 +136,7 @@ def run_from_conf_args(cls, conf, args, **kwargs):
             skip_failed=args.skip_existing_failed or args.skip_existing,
             skip_existing_commits=args.skip_existing_commits,
             record_samples=args.record_samples, append_samples=args.append_samples,
-            pull=not args.no_pull,
+            pull=not args.no_pull, interleave_processes=args.interleave_processes,
             **kwargs
         )
 
@@ -136,7 +145,7 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
             show_stderr=False, quick=False, profile=False, env_spec=None,
             dry_run=False, machine=None, _machine_file=None,
             skip_successful=False, skip_failed=False, skip_existing_commits=False, record_samples=False,
-            append_samples=False, pull=True, _returns={}):
+            append_samples=False, pull=True, interleave_processes=False, _returns={}):
         machine_params = Machine.load(
             machine_name=machine, _path=_machine_file, interactive=True)
 
@@ -148,6 +157,22 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
             # No repository required, so skip using it
             conf.dvcs = "none"
 
+        has_existing_env = any(isinstance(env, environment.ExistingEnvironment)
+                               for env in environments)
+
+        if interleave_processes:
+            if dry_run:
+                raise util.UserError("--interleave-processes and --dry-run cannot be used together")
+            if has_existing_env:
+                raise util.UserError("--interleave-processes cannot be used with existing environment "
+                                     "(or python=same)")
+        elif interleave_processes is None:
+            # Enable if possible
+            interleave_processes = not (dry_run or has_existing_env)
+
+        if append_samples:
+            record_samples = True
+
         repo = get_repo(conf)
         if pull:
             repo.pull()
@@ -205,7 +230,6 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
             "({1} commits * {2} environments * {3} benchmarks)".format(
                 steps, len(commit_hashes), len(environments), len(benchmarks)))
 
-        log.set_nitems(steps)
 
         parallel, multiprocessing = util.get_multiprocessing(parallel)
 
@@ -213,16 +237,24 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
         _returns['environments'] = environments
         _returns['machine_params'] = machine_params.__dict__
 
-        for commit_hash in commit_hashes:
-            skipped_benchmarks = defaultdict(lambda: set())
+        if attribute and 'processes' in attribute:
+            max_processes = int(attribute['processes'])
+        else:
+            max_processes = max(b.get('processes', 1)
+                                for b in six.itervalues(benchmarks))
+
+        log.set_nitems(steps * max_processes)
+
+        skipped_benchmarks = defaultdict(lambda: set())
+        for commit_hash in commit_hashes:
             if skip_successful or skip_failed or skip_existing_commits:
                 try:
                     for result in iter_results_for_machine_and_hash(
                             conf.results_dir, machine_params.machine, commit_hash):
                         if skip_existing_commits:
-                            skipped_benchmarks[result.env_name].update(benchmarks)
+                            skipped_benchmarks[commit_hash] = True
                             break
 
                         for key in result.get_result_keys(benchmarks):
@@ -234,28 +266,65 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
                             failed = value is None or (isinstance(value, list) and None in value)
                             if skip_failed and failed:
-                                skipped_benchmarks[result.env_name].add(key)
+                                skipped_benchmarks[(commit_hash, result.env_name)].add(key)
                             if skip_successful and not failed:
-                                skipped_benchmarks[result.env_name].add(key)
+                                skipped_benchmarks[(commit_hash, result.env_name)].add(key)
                 except IOError:
                     pass
 
+        if interleave_processes:
+            run_round_set = [[j] for j in range(max_processes, 0, -1)]
+        else:
+            run_round_set = [None]
+
+        def iter_rounds_commits():
+            for run_rounds in run_round_set:
+                if interleave_processes and run_rounds[0] % 2 == 0:
+                    for commit_hash in commit_hashes[::-1]:
+                        yield run_rounds, commit_hash
+                else:
+                    for commit_hash in commit_hashes:
+                        yield run_rounds, commit_hash
+
+        for run_rounds, commit_hash in iter_rounds_commits():
+            if commit_hash in skipped_benchmarks:
+                for env in environments:
+                    for bench in benchmarks:
+                        if interleave_processes:
+                            log.step()
+                        else:
+                            for j in range(max_processes):
+                                log.step()
+                continue
+
             for env in environments:
+                skip_list = skipped_benchmarks[(commit_hash, env.name)]
                 for bench in benchmarks:
-                    if bench in skipped_benchmarks[env.name]:
-                        log.step()
+                    if bench in skip_list:
+                        if interleave_processes:
+                            log.step()
+                        else:
+                            for j in range(max_processes):
+                                log.step()
 
             active_environments = [env for env in environments
                                    if set(six.iterkeys(benchmarks))
-                                   .difference(skipped_benchmarks[env.name])]
+                                   .difference(skipped_benchmarks[(commit_hash, env.name)])]
 
             if not active_environments:
                 continue
 
             if commit_hash:
+                if interleave_processes:
+                    round_info = " (round {}/{})".format(
+                        max_processes - run_rounds[0] + 1,
+                        max_processes)
+                else:
+                    round_info = ""
+
                 log.info(
-                    "For {0} commit hash {1}:".format(
-                        conf.project, commit_hash[:8]))
+                    "For {0} commit hash {1}{2}:".format(
+                        conf.project, commit_hash[:8], round_info))
 
             with log.indent():
@@ -288,7 +357,8 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
                     skip_save = (dry_run or
                                  isinstance(env, environment.ExistingEnvironment))
 
-                    benchmark_set = benchmarks.filter_out(skipped_benchmarks[env.name])
+                    skip_list = skipped_benchmarks[(commit_hash, env.name)]
+                    benchmark_set = benchmarks.filter_out(skip_list)
 
                     result = Results(
                         params,
@@ -301,13 +371,23 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
                     if not skip_save:
                         result.load_data(conf.results_dir)
 
+                    # If we are interleaving commits, we need to
+                    # append samples (except for the first round)
+                    # and record samples (except for the final
+                    # round).
+                    force_append_samples = (interleave_processes and
+                                            run_rounds[0] < max_processes)
+                    force_record_samples = (interleave_processes and
+                                            run_rounds[0] > 1)
+
                     if success:
                         run_benchmarks(
                             benchmark_set, env, results=result,
                             show_stderr=show_stderr, quick=quick,
                             profile=profile, extra_params=attribute,
-                            record_samples=record_samples,
-                            append_samples=append_samples)
+                            record_samples=(record_samples or force_record_samples),
+                            append_samples=(append_samples or force_append_samples),
+                            run_rounds=run_rounds)
                     else:
                         skip_benchmarks(benchmark_set, env, results=result)
diff --git a/asv/results.py b/asv/results.py
index af658dbf3..5abe2f162 100644
--- a/asv/results.py
+++ b/asv/results.py
@@ -402,6 +402,19 @@ def remove_result(self, key):
         # Remove version (may be missing)
         self._benchmark_version.pop(key, None)
 
+    def remove_samples(self, key, selected_idx=None):
+        """
+        Remove measurement samples from the selected benchmark.
+        """
+        if key not in self._results:
+            raise ValueError(key)
+
+        if selected_idx is None:
+            self._samples[key] = None
+        elif self._samples[key] is not None:
+            for j in selected_idx:
+                self._samples[key][j] = None
+
     def add_result(self, benchmark, result,
                    started_at=None, ended_at=None,
                    record_samples=False,
diff --git a/asv/runner.py b/asv/runner.py
index 93db841cd..517137cbc 100644
--- a/asv/runner.py
+++ b/asv/runner.py
@@ -8,8 +8,6 @@
 import json
 import os
 import re
-import sys
-import shutil
 import time
 import tempfile
 import itertools
@@ -18,10 +16,10 @@
 import six
 
-from .console import log, truncate_left
+from .console import log
 from .results import Results
-from . import util
 from . import statistics
+from . import util
 
 
 WIN = (os.name == "nt")
 
@@ -108,7 +106,8 @@ def skip_benchmarks(benchmarks, env, results=None):
 
 def run_benchmarks(benchmarks, env, results=None,
                    show_stderr=False, quick=False, profile=False, extra_params=None,
-                   record_samples=False, append_samples=False):
+                   record_samples=False, append_samples=False,
+                   run_rounds=None):
     """
     Run all of the benchmarks in the given `Environment`.
 
@@ -138,6 +137,9 @@ def run_benchmarks(benchmarks, env, results=None,
     append_samples : bool, optional
         Whether to retain any previously measured result samples
        and use them in statistics computations.
+    run_rounds : sequence of int, optional
+        Run rounds for benchmarks with multiple processes.
+        If None, run all rounds.
 
     Returns
     -------
@@ -160,9 +162,6 @@ def run_benchmarks(benchmarks, env, results=None,
     if results is None:
         results = Results.unnamed()
 
-    if append_samples:
-        record_samples = True
-
     # Find all setup_cache routines needed
     setup_cache_timeout = {}
     benchmark_order = {}
@@ -185,14 +184,32 @@ def get_processes(benchmark):
         max_processes = max(max_processes, get_processes(benchmark))
         cache_users.setdefault(key, set()).add(name)
 
+    if run_rounds is None:
+        run_rounds = list(range(1, max_processes + 1))
+
     # Interleave benchmark runs, in setup_cache order
 
+    existing_results = results.get_result_keys(benchmarks)
+
     def iter_run_items():
-        for run_round in range(max_processes, 0, -1):
+        for run_round in run_rounds[::-1]:
             for setup_cache_key, benchmark_set in six.iteritems(benchmark_order):
                 for name, benchmark in benchmark_set:
+                    log.step()
+
                     processes = get_processes(benchmark)
+
                     if run_round > processes:
+                        if (not append_samples and
+                                run_round == run_rounds[-1] and
+                                name in existing_results):
+                            # We need to remove samples here so that
+                            # append_samples=False has an effect on all
+                            # benchmarks regardless of whether they were
+                            # run this round.
+                            selected_idx = benchmarks.benchmark_selection.get(name)
+                            results.remove_samples(name, selected_idx)
                         continue
+
                     is_final = (run_round == 1)
                     yield name, benchmark, setup_cache_key, is_final
@@ -204,7 +221,7 @@ def iter_run_items():
     started_at = datetime.datetime.utcnow()
 
     if append_samples:
-        previous_result_keys = results.get_result_keys(benchmarks)
+        previous_result_keys = existing_results
     else:
         previous_result_keys = set()
 
@@ -216,9 +233,6 @@ def iter_run_items():
 
     try:
         for name, benchmark, setup_cache_key, is_final in iter_run_items():
-            if is_final:
-                log.step()
-
             selected_idx = benchmarks.benchmark_selection.get(name)
 
             # Don't try to rerun failed benchmarks
diff --git a/test/test_results.py b/test/test_results.py
index 2d9f3efc2..9e960376c 100644
--- a/test/test_results.py
+++ b/test/test_results.py
@@ -210,3 +210,30 @@ def test_filename_format():
 
     r = results.Results({'machine': 'foo'}, [], "hash", 0, "", "a"*128)
     assert r._filename == join("foo", "hash-env-e510683b3f5ffe4093d021808bc6ff70.json")
+
+
+def test_remove_samples():
+    benchmark1 = {'name': 'a', 'version': '1', 'params': []}
+    benchmark2 = {'name': 'b', 'version': '1', 'params': [['1', '2', '3']]}
+
+    r = results.Results.unnamed()
+
+    v1 = runner.BenchmarkResult(result=[True], samples=[[1]], number=[1],
+                                profile=None, errcode=0, stderr='')
+    v2 = runner.BenchmarkResult(result=[True]*3, samples=[[1],[2],[3]], number=[1,1,1],
+                                profile=None, errcode=0, stderr='')
+
+    r.add_result(benchmark1, v1, record_samples=True)
+    r.add_result(benchmark2, v2, record_samples=True)
+
+    assert r.get_result_samples(benchmark1['name'], benchmark1['params']) == v1.samples
+    assert r.get_result_samples(benchmark2['name'], benchmark2['params']) == v2.samples
+
+    r.remove_samples(benchmark1['name'])
+    assert r.get_result_samples(benchmark1['name'], benchmark1['params']) == [None]
+
+    r.remove_samples(benchmark2['name'], selected_idx=[1])
+    assert r.get_result_samples(benchmark2['name'], benchmark2['params']) == [[1], None, [3]]
+
+    r.remove_samples(benchmark2['name'])
+    assert r.get_result_samples(benchmark2['name'], benchmark2['params']) == [None, None, None]
diff --git a/test/test_workflow.py b/test/test_workflow.py
index 9608595a2..fbb72a7a7 100644
--- a/test/test_workflow.py
+++ b/test/test_workflow.py
@@ -4,18 +4,21 @@
 from __future__ import (absolute_import, division, print_function,
                         unicode_literals)
 
+import re
 import glob
 import os
-from os.path import abspath, dirname, join, isfile, relpath
 import shutil
 import sys
 import datetime
+import json
 
 import six
-import json
 import pytest
 
+from os.path import abspath, dirname, join, isfile, relpath
+
 from asv import config, environment, util
+from asv.results import iter_results_for_machine
 from asv.util import check_output, which
 
 from . import tools
@@ -179,6 +182,9 @@ def test_continuous(capfd, basic_conf):
     tools.run_asv_with_conf(conf, 'continuous', "master^", '--show-stderr',
                             '--bench=params_examples.track_find_test',
                             '--bench=params_examples.track_param',
+                            '--bench=time_examples.TimeSuite.time_example_benchmark_1',
+                            '--attribute=repeat=1', '--attribute=number=1',
+                            '--attribute=warmup_time=0',
                             *env_spec, _machine_file=machine_file)
     text, err = capfd.readouterr()
 
@@ -186,6 +192,16 @@ def test_continuous(capfd, basic_conf):
     assert "+ 1.00s 6.00s 6.00 params_examples.track_find_test(2)" in text
     assert "params_examples.ClassOne" in text
 
+    # Check processes were interleaved (timing benchmark was run twice)
+    assert re.search(r"For.*commit hash [a-f0-9]+ \(round 1/2\)", text, re.M)
+
+    result_found = False
+    for results in iter_results_for_machine(conf.results_dir, "orangutan"):
+        result_found = True
+        stats = results.get_result_stats('time_examples.TimeSuite.time_example_benchmark_1', [])
+        assert stats[0]['repeat'] == 2
+    assert result_found
+
 
 def test_find(capfd, basic_conf):
     tmpdir, local, conf, machine_file = basic_conf
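
Note (illustration only, not part of the patch): the snippet below is a standalone sketch of the commit/round visiting order that the run.py changes above implement, assuming two placeholder commit hashes and a maximum of two processes per benchmark. Rounds are walked from last to first, and every other round traverses the commits in reverse, so no single commit is always measured first.

# Standalone sketch; `commit_hashes` and `max_processes` are made-up inputs,
# not asv API calls.
def iter_rounds_commits(commit_hashes, max_processes):
    for run_round in range(max_processes, 0, -1):
        # Alternate the commit direction on even-numbered rounds to reduce
        # ordering bias between adjacent commits.
        order = commit_hashes[::-1] if run_round % 2 == 0 else commit_hashes
        for commit_hash in order:
            yield run_round, commit_hash

print(list(iter_rounds_commits(["aaaa111", "bbbb222"], max_processes=2)))
# [(2, 'bbbb222'), (2, 'aaaa111'), (1, 'aaaa111'), (1, 'bbbb222')]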