Merge pull request #697 from pv/commit-interleave-3
Implement --interleave-processes for spreading processes across commits

Interleaving commits can reduce measurement bias due to commit
ordering, e.g. from CPU heating.

Turn it on by default for "asv continuous".
pv authored Aug 18, 2018
2 parents f363a22 + 89f9204 commit b652684
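
A note on what the interleaving does may help when reading the run.py changes below. The snippet is a simplified, hypothetical sketch (not code from this commit) of the commit/round traversal that iter_rounds_commits implements: one measurement process ("round") per pass over the commits, rounds counted down from max_processes to 1, with every other round walking the commits in reverse so that no commit is systematically measured first on a cold machine.

```python
# Simplified sketch (not asv code) of the traversal implemented by
# iter_rounds_commits() in asv/commands/run.py below.  Each "round" runs
# one measurement process per benchmark for every commit; even-numbered
# rounds visit the commits in reverse order.
def interleaved_order(commit_hashes, max_processes):
    for run_round in range(max_processes, 0, -1):   # final round is 1
        if run_round % 2 == 0:
            commits = commit_hashes[::-1]
        else:
            commits = commit_hashes
        for commit_hash in commits:
            yield run_round, commit_hash

# Example: two commits, benchmarks with up to 3 processes.
print(list(interleaved_order(["base", "head"], 3)))
# [(3, 'base'), (3, 'head'), (2, 'head'), (2, 'base'), (1, 'base'), (1, 'head')]
```

The feature is toggled with --interleave-processes / --no-interleave-processes on "asv run"; "asv continuous" passes None, and run.py then enables it whenever neither --dry-run nor an existing environment (python=same) rules it out.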
Showing 6 changed files with 193 additions and 35 deletions.
asv/commands/continuous.py: 10 additions & 2 deletions
@@ -41,6 +41,13 @@ def setup_arguments(cls, subparsers):
run only once. This is useful to find basic errors in the
benchmark functions faster. The results are unlikely to
be useful, and thus are not saved.""")
parser.add_argument(
"--interleave-processes", action="store_true", default=None,
help="""Interleave benchmarks with multiple processes across
commits. This can avoid measurement biases from commit ordering,
but can take longer.""")
parser.add_argument(
"--no-interleave-processes", action="store_false", dest="interleave_processes")
common_args.add_compare(parser, sort_default='ratio', only_changed_default=True)
common_args.add_show_stderr(parser)
common_args.add_bench(parser)
@@ -60,15 +67,15 @@ def run_from_conf_args(cls, conf, args, **kwargs):
machine=args.machine,
env_spec=args.env_spec, record_samples=args.record_samples,
append_samples=args.append_samples,
quick=args.quick, **kwargs
quick=args.quick, interleave_processes=args.interleave_processes, **kwargs
)

@classmethod
def run(cls, conf, branch=None, base=None,
factor=None, split=False, only_changed=True, sort='ratio',
show_stderr=False, bench=None,
attribute=None, machine=None, env_spec=None, record_samples=False, append_samples=False,
quick=False, _machine_file=None):
quick=False, interleave_processes=None, _machine_file=None):
repo = get_repo(conf)
repo.pull()

@@ -88,6 +95,7 @@ def run(cls, conf, branch=None, base=None,
conf, range_spec=commit_hashes, bench=bench, attribute=attribute,
show_stderr=show_stderr, machine=machine, env_spec=env_spec,
record_samples=record_samples, append_samples=append_samples, quick=quick,
interleave_processes=interleave_processes,
_returns=run_objs, _machine_file=_machine_file)
if result:
return result
asv/commands/run.py: 98 additions & 18 deletions
@@ -4,12 +4,14 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

import six

import logging
import traceback
import itertools

from collections import defaultdict

import six

from . import Command
from ..benchmarks import Benchmarks
from ..console import log
@@ -107,6 +109,13 @@ def setup_arguments(cls, subparsers):
help="""Skip running benchmarks that have previous successful
or failed results""")
common_args.add_record_samples(parser)
parser.add_argument(
"--interleave-processes", action="store_true", default=False,
help="""Interleave benchmarks with multiple processes across
commits. This can avoid measurement biases from commit ordering,
but can take longer.""")
parser.add_argument(
"--no-interleave-processes", action="store_false", dest="interleave_processes")
parser.add_argument(
"--no-pull", action="store_true",
help="Do not pull the repository")
@@ -127,7 +136,7 @@ def run_from_conf_args(cls, conf, args, **kwargs):
skip_failed=args.skip_existing_failed or args.skip_existing,
skip_existing_commits=args.skip_existing_commits,
record_samples=args.record_samples, append_samples=args.append_samples,
pull=not args.no_pull,
pull=not args.no_pull, interleave_processes=args.interleave_processes,
**kwargs
)

@@ -136,7 +145,7 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
show_stderr=False, quick=False, profile=False, env_spec=None,
dry_run=False, machine=None, _machine_file=None, skip_successful=False,
skip_failed=False, skip_existing_commits=False, record_samples=False,
append_samples=False, pull=True, _returns={}):
append_samples=False, pull=True, interleave_processes=False, _returns={}):
machine_params = Machine.load(
machine_name=machine,
_path=_machine_file, interactive=True)
@@ -148,6 +157,22 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
# No repository required, so skip using it
conf.dvcs = "none"

has_existing_env = any(isinstance(env, environment.ExistingEnvironment)
for env in environments)

if interleave_processes:
if dry_run:
raise util.UserError("--interleave-commits and --dry-run cannot be used together")
if has_existing_env:
raise util.UserError("--interleave-commits cannot be used with existing environment "
"(or python=same)")
elif interleave_processes is None:
# Enable if possible
interleave_processes = not (dry_run or has_existing_env)

if append_samples:
record_samples = True

repo = get_repo(conf)
if pull:
repo.pull()
@@ -205,24 +230,31 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
"({1} commits * {2} environments * {3} benchmarks)".format(
steps, len(commit_hashes),
len(environments), len(benchmarks)))
log.set_nitems(steps)

parallel, multiprocessing = util.get_multiprocessing(parallel)

_returns['benchmarks'] = benchmarks
_returns['environments'] = environments
_returns['machine_params'] = machine_params.__dict__

for commit_hash in commit_hashes:
skipped_benchmarks = defaultdict(lambda: set())
if attribute and 'processes' in attribute:
max_processes = int(attribute['processes'])
else:
max_processes = max(b.get('processes', 1)
for b in six.itervalues(benchmarks))

log.set_nitems(steps * max_processes)

skipped_benchmarks = defaultdict(lambda: set())

for commit_hash in commit_hashes:
if skip_successful or skip_failed or skip_existing_commits:
try:
for result in iter_results_for_machine_and_hash(
conf.results_dir, machine_params.machine, commit_hash):

if skip_existing_commits:
skipped_benchmarks[result.env_name].update(benchmarks)
skipped_benchmarks[commit_hash] = True
break

for key in result.get_result_keys(benchmarks):
@@ -234,28 +266,65 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
failed = value is None or (isinstance(value, list) and None in value)

if skip_failed and failed:
skipped_benchmarks[result.env_name].add(key)
skipped_benchmarks[(commit_hash, result.env_name)].add(key)
if skip_successful and not failed:
skipped_benchmarks[result.env_name].add(key)
skipped_benchmarks[(commit_hash, result.env_name)].add(key)
except IOError:
pass

if interleave_processes:
run_round_set = [[j] for j in range(max_processes, 0, -1)]
else:
run_round_set = [None]

def iter_rounds_commits():
for run_rounds in run_round_set:
if interleave_processes and run_rounds[0] % 2 == 0:
for commit_hash in commit_hashes[::-1]:
yield run_rounds, commit_hash
else:
for commit_hash in commit_hashes:
yield run_rounds, commit_hash

for run_rounds, commit_hash in iter_rounds_commits():
if commit_hash in skipped_benchmarks:
for env in environments:
for bench in benchmarks:
if interleave_processes:
log.step()
else:
for j in range(max_processes):
log.step()
continue

for env in environments:
skip_list = skipped_benchmarks[(commit_hash, env.name)]
for bench in benchmarks:
if bench in skipped_benchmarks[env.name]:
log.step()
if bench in skip_list:
if interleave_processes:
log.step()
else:
for j in range(max_processes):
log.step()

active_environments = [env for env in environments
if set(six.iterkeys(benchmarks))
.difference(skipped_benchmarks[env.name])]
.difference(skipped_benchmarks[(commit_hash, env.name)])]

if not active_environments:
continue

if commit_hash:
if interleave_processes:
round_info = " (round {}/{})".format(
max_processes - run_rounds[0] + 1,
max_processes)
else:
round_info = ""

log.info(
"For {0} commit hash {1}:".format(
conf.project, commit_hash[:8]))
"For {0} commit hash {1}{2}:".format(
conf.project, commit_hash[:8], round_info))

with log.indent():

@@ -288,7 +357,8 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para

skip_save = (dry_run or isinstance(env, environment.ExistingEnvironment))

benchmark_set = benchmarks.filter_out(skipped_benchmarks[env.name])
skip_list = skipped_benchmarks[(commit_hash, env.name)]
benchmark_set = benchmarks.filter_out(skip_list)

result = Results(
params,
@@ -301,13 +371,23 @@ def run(cls, conf, range_spec=None, steps=None, bench=None, attribute=None, para
if not skip_save:
result.load_data(conf.results_dir)

# If we are interleaving commits, we need to
# append samples (except for the first round)
# and record samples (except for the final
# round).
force_append_samples = (interleave_processes and
run_rounds[0] < max_processes)
force_record_samples = (interleave_processes and
run_rounds[0] > 1)

if success:
run_benchmarks(
benchmark_set, env, results=result,
show_stderr=show_stderr, quick=quick,
profile=profile, extra_params=attribute,
record_samples=record_samples,
append_samples=append_samples)
record_samples=(record_samples or force_record_samples),
append_samples=(append_samples or force_append_samples),
run_rounds=run_rounds)
else:
skip_benchmarks(benchmark_set, env, results=result)

asv/results.py: 13 additions & 0 deletions
@@ -402,6 +402,19 @@ def remove_result(self, key):
# Remove version (may be missing)
self._benchmark_version.pop(key, None)

def remove_samples(self, key, selected_idx=None):
"""
Remove measurement samples from the selected benchmark.
"""
if key not in self._results:
raise ValueError(key)

if selected_idx is None:
self._samples[key] = None
elif self._samples[key] is not None:
for j in selected_idx:
self._samples[key][j] = None

def add_result(self, benchmark, result,
started_at=None, ended_at=None,
record_samples=False,
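
The bookkeeping that remove_samples() touches can be pictured with a plain dict. The mock below is hypothetical, not the real Results class: for a parameterized benchmark the stored samples are a list with one entry per parameter combination, and selected_idx restricts the clearing to the combinations that were actually re-run, which is how the runner.py hunk below uses it via benchmarks.benchmark_selection.

```python
# Hypothetical mock of the per-benchmark sample store; not the real Results class.
samples = {"suite.TimeThing.time_it": [[0.011, 0.012], [0.021, 0.022]]}

def remove_samples(store, key, selected_idx=None):
    if key not in store:
        raise ValueError(key)
    if selected_idx is None:
        store[key] = None            # forget all samples for this benchmark
    elif store[key] is not None:
        for j in selected_idx:
            store[key][j] = None     # forget only the re-run parameter combinations

remove_samples(samples, "suite.TimeThing.time_it", selected_idx=[1])
print(samples)   # {'suite.TimeThing.time_it': [[0.011, 0.012], None]}
```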
asv/runner.py: 27 additions & 13 deletions
@@ -8,8 +8,6 @@
import json
import os
import re
import sys
import shutil
import time
import tempfile
import itertools
@@ -18,10 +16,10 @@

import six

from .console import log, truncate_left
from .console import log
from .results import Results
from . import util
from . import statistics
from . import util


WIN = (os.name == "nt")
@@ -108,7 +106,8 @@ def skip_benchmarks(benchmarks, env, results=None):
def run_benchmarks(benchmarks, env, results=None,
show_stderr=False, quick=False, profile=False,
extra_params=None,
record_samples=False, append_samples=False):
record_samples=False, append_samples=False,
run_rounds=None):
"""
Run all of the benchmarks in the given `Environment`.
@@ -138,6 +137,9 @@ def run_benchmarks(benchmarks, env, results=None,
append_samples : bool, optional
Whether to retain any previously measured result samples
and use them in statistics computations.
run_rounds : sequence of int, optional
Run rounds for benchmarks with multiple processes.
If None, run all rounds.
Returns
-------
@@ -160,9 +162,6 @@
if results is None:
results = Results.unnamed()

if append_samples:
record_samples = True

# Find all setup_cache routines needed
setup_cache_timeout = {}
benchmark_order = {}
@@ -185,14 +184,32 @@ def get_processes(benchmark):
max_processes = max(max_processes, get_processes(benchmark))
cache_users.setdefault(key, set()).add(name)

if run_rounds is None:
run_rounds = list(range(1, max_processes + 1))

# Interleave benchmark runs, in setup_cache order
existing_results = results.get_result_keys(benchmarks)

def iter_run_items():
for run_round in range(max_processes, 0, -1):
for run_round in run_rounds[::-1]:
for setup_cache_key, benchmark_set in six.iteritems(benchmark_order):
for name, benchmark in benchmark_set:
log.step()

processes = get_processes(benchmark)

if run_round > processes:
if (not append_samples and
run_round == run_rounds[-1] and
name in existing_results):
# We need to remove samples here so that
# append_samples=False has an effect on all
# benchmarks regardless of whether they were
# run this round.
selected_idx = benchmarks.benchmark_selection.get(name)
results.remove_samples(name, selected_idx)
continue

is_final = (run_round == 1)
yield name, benchmark, setup_cache_key, is_final

@@ -204,7 +221,7 @@ def iter_run_items():
started_at = datetime.datetime.utcnow()

if append_samples:
previous_result_keys = results.get_result_keys(benchmarks)
previous_result_keys = existing_results
else:
previous_result_keys = set()

@@ -216,9 +233,6 @@

try:
for name, benchmark, setup_cache_key, is_final in iter_run_items():
if is_final:
log.step()

selected_idx = benchmarks.benchmark_selection.get(name)

# Don't try to rerun failed benchmarks