Skip to content

Commit

Permalink
Merge branch 'master' into dynamic-schedule
Browse files Browse the repository at this point in the history
  • Loading branch information
mtreinish authored Aug 9, 2020
2 parents e47ad6e + bbc839f commit e8989ca
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 98 deletions.
192 changes: 96 additions & 96 deletions stestr/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,103 +86,103 @@ def consume_queue(groups):

def partition_tests(test_ids, concurrency, repository, group_callback,
                    randomize=False):
    """Partition test_ids by concurrency.

    Test durations from the repository are used to get partitions which
    have roughly the same expected runtime. New tests - those with no
    recorded duration - are allocated in round-robin fashion to the
    partitions created using test durations.

    :param list test_ids: The list of test_ids to be partitioned
    :param int concurrency: The concurrency that will be used for running
        the tests. This is the number of partitions that test_ids will be
        split into.
    :param repository: A repository object that provides recorded test
        durations via ``get_test_times(test_ids)``. May be None (or any
        falsy value), in which case every test is treated as having an
        unknown duration.
    :param group_callback: A callback function that is used as a scheduler
        hint to group test_ids together and treat them as a single unit for
        scheduling. This function expects a single test_id parameter and it
        will return a group identifier. Tests_ids that have the same group
        identifier will be kept on the same worker. May be None, in which
        case every test is its own group.
    :param bool randomize: If true each partition's test order will be
        randomized
    :return: A list where each element is a distinct subset of test_ids,
        and the union of all the elements is equal to set(test_ids).
    """
    def noop(_):
        return None

    _group_callback = group_callback
    partitions = [[] for _ in range(concurrency)]
    # Each entry pairs a running duration total with its partition list so
    # the cheapest partition can be found by sorting.
    timed_partitions = [[0.0, partition] for partition in partitions]
    time_data = {}
    if repository:
        time_data = repository.get_test_times(test_ids)
        timed_tests = time_data['known']
        unknown_tests = time_data['unknown']
    else:
        timed_tests = {}
        unknown_tests = set(test_ids)
    # Group tests: generate group_id -> test_ids.
    group_ids = collections.defaultdict(list)
    if _group_callback is None:
        group_callback = noop
    else:
        group_callback = _group_callback
    for test_id in test_ids:
        # A group_id of None (from noop or the callback) means the test
        # stands alone, so fall back to the test_id itself.
        group_id = group_callback(test_id) or test_id
        group_ids[group_id].append(test_id)
    # Time groups: generate three sets of groups:
    # - fully timed dict(group_id -> time),
    # - partially timed dict(group_id -> time) and
    # - unknown (set of group_id)
    # We may in future treat partially timed different for scheduling, but
    # at least today we just schedule them after the fully timed groups.
    timed = {}
    partial = {}
    unknown = []
    for group_id, group_tests in group_ids.items():
        untimed_ids = unknown_tests.intersection(group_tests)
        # Sum the durations of only the group members that have a recorded
        # time (the symmetric difference removes the untimed ones).
        group_time = sum(
            [timed_tests[test_id]
             for test_id in untimed_ids.symmetric_difference(
                 group_tests)])
        if not untimed_ids:
            timed[group_id] = group_time
        elif group_time:
            partial[group_id] = group_time
        else:
            unknown.append(group_id)

    # Scheduling is NP complete in general, so we avoid aiming for
    # perfection. A quick approximation that is sufficient for our general
    # needs:
    # sort the groups by time
    # allocate to partitions by putting each group in to the partition with
    # the current (lowest time, shortest length[in tests])
    def consume_queue(groups):
        queue = sorted(
            groups.items(), key=operator.itemgetter(1), reverse=True)
        for group_id, duration in queue:
            # timed_partitions is kept sorted, so index 0 is always the
            # least-loaded partition.
            timed_partitions[0][0] = timed_partitions[0][0] + duration
            timed_partitions[0][1].extend(group_ids[group_id])
            timed_partitions.sort(key=lambda item: (item[0], len(item[1])))

    consume_queue(timed)
    consume_queue(partial)
    # Assign groups with entirely unknown times in round robin fashion to
    # the partitions.
    for partition, group_id in zip(itertools.cycle(partitions), unknown):
        partition.extend(group_ids[group_id])
    if randomize:
        out_parts = []
        for partition in partitions:
            temp_part = list(partition)
            random.shuffle(temp_part)
            out_parts.append(list(temp_part))
        return out_parts
    else:
        return partitions


def local_concurrency():
Expand Down
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.

hacking<1.2.0,>=1.1.0
hacking<3.2.0,>=3.1.0
sphinx>2.1.0 # BSD
subunit2sql>=1.8.0
coverage>=4.0 # Apache-2.0
Expand Down
3 changes: 2 additions & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ commands = sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasen
# E129 skipped because it is too limiting when combined with other rules
# H305 skipped because it is inconsistent between python versions
# E711 skipped because sqlalchemy filter() requires using == instead of is
ignore = E125,H402,E123,E129,H305,E711
# W504 skipped because it makes multiline operations too hard with W503
ignore = E125,H402,E123,E129,H305,E711,W504
exclude = .venv,.git,.tox,dist,doc,*egg,build,releasenotes

[testenv:pip-check-reqs]
Expand Down

0 comments on commit e8989ca

Please sign in to comment.