Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add dynamic scheduling operation mode #271

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
41 changes: 36 additions & 5 deletions stestr/commands/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import os
import subprocess
import sys
import warnings

from cliff import command
import subunit
Expand Down Expand Up @@ -244,6 +245,12 @@ def get_parser(self, prog_name):
help="If set, show non-text attachments. This is "
"generally only useful for debug purposes.",
)
parser.add_argument(
"--dynamic",
action="store_true",
default=False,
help="Enable the EXPERIMENTAL dynamic scheduler",
)
return parser

def take_action(self, parsed_args):
Expand Down Expand Up @@ -335,6 +342,7 @@ def take_action(self, parsed_args):
all_attachments=all_attachments,
show_binary_attachments=args.show_binary_attachments,
pdb=args.pdb,
dynamic=args.dynamic,
)

# Always output slowest test info if requested, regardless of other
Expand Down Expand Up @@ -396,6 +404,7 @@ def run_command(
all_attachments=False,
show_binary_attachments=True,
pdb=False,
dynamic=False,
):
"""Function to execute the run command

Expand Down Expand Up @@ -460,6 +469,7 @@ def run_command(
:param str pdb: Takes in a single test_id to bypasses test
discover and just execute the test specified without launching any
additional processes. A file name may be used in place of a test name.
:param bool dynamic: Enable dynamic scheduling

:return return_code: The exit code for the command. 0 for success and > 0
for failures.
Expand Down Expand Up @@ -501,6 +511,11 @@ def run_command(
)
stdout.write(msg)
return 2
if dynamic:
warnings.warn(
"WARNING: The dynamic scheduler is still experimental. "
"You might encounter issues while using it"
)
if combine:
latest_id = repo.latest_id()
combine_id = str(latest_id)
Expand Down Expand Up @@ -542,7 +557,8 @@ def run_tests():
(
"subunit",
output.ReturnCodeToSubunit(
subprocess.Popen(run_cmd, shell=True, stdout=subprocess.PIPE)
subprocess.Popen(run_cmd, shell=True, stdout=subprocess.PIPE),
dynamic=False,
),
)
]
Expand Down Expand Up @@ -645,6 +661,7 @@ def run_tests():
top_dir=top_dir,
test_path=test_path,
randomize=random,
dynamic=dynamic,
)
if isolated:
result = 0
Expand Down Expand Up @@ -684,6 +701,7 @@ def run_tests():
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments,
dynamic=dynamic,
)
if run_result > result:
result = run_result
Expand All @@ -702,6 +720,7 @@ def run_tests():
suppress_attachments=suppress_attachments,
all_attachments=all_attachments,
show_binary_attachments=show_binary_attachments,
dynamic=dynamic,
)
else:
# Where do we source data about the cause of conflicts.
Expand Down Expand Up @@ -771,16 +790,28 @@ def _run_tests(
suppress_attachments=False,
all_attachments=False,
show_binary_attachments=False,
dynamic=False,
):
"""Run the tests cmd was parameterised with."""
cmd.setUp()
try:

def run_tests():
run_procs = [
("subunit", output.ReturnCodeToSubunit(proc))
for proc in cmd.run_tests()
]
if not dynamic or cmd.concurrency == 1:
run_procs = [
("subunit", output.ReturnCodeToSubunit(proc, dynamic=False))
for proc in cmd.run_tests()
]
else:
run_procs = [
(
"subunit",
output.ReturnCodeToSubunit(
os.fdopen(proc["stream"]), proc["proc"]
),
)
for proc in cmd.run_tests()
]
if not run_procs:
stdout.write("The specified regex doesn't match with anything")
return 1
Expand Down
3 changes: 3 additions & 0 deletions stestr/config_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ def get_run_command(
exclude_regex=None,
randomize=False,
parallel_class=None,
dynamic=False,
):
"""Get a test_processor.TestProcessorFixture for this config file

Expand Down Expand Up @@ -158,6 +159,7 @@ def get_run_command(
stestr scheduler by class. If both this and the corresponding
config file option which includes `group-regex` are set, this value
will be used.
:param bool dynamic: Enable dynamic scheduling

:returns: a TestProcessorFixture object for the specified config file
and any arguments passed into this function
Expand Down Expand Up @@ -236,4 +238,5 @@ def group_callback(test_id, regex=re.compile(group_regex)):
exclude_regex=exclude_regex,
include_list=include_list,
randomize=randomize,
dynamic=dynamic,
)
20 changes: 16 additions & 4 deletions stestr/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,33 @@ class ReturnCodeToSubunit:
generating subunit.
"""

def __init__(self, process):
def __init__(self, process, thread=None, dynamic=True):
"""Adapt a process to a readable stream."""
self.proc = process
self.done = False
self.source = self.proc.stdout
if dynamic:
self.source = process
self.proc = thread
else:
self.source = self.proc.stdout
self.dynamic = dynamic
self.lastoutput = bytes((b"\n")[0])

def __del__(self):
self.proc.wait()
if hasattr(self.proc, "wait"):
self.proc.wait()
else:
self.proc.join()

def _append_return_code_as_test(self):
if self.done is True:
return
self.source = io.BytesIO()
returncode = self.proc.wait()
if not self.dynamic:
returncode = self.proc.wait()
else:
self.proc.join()
returncode = self.proc.exitcode
if returncode != 0:
if self.lastoutput != bytes((b"\n")[0]):
# Subunit V1 is line orientated, it has to start on a fresh
Expand Down
68 changes: 68 additions & 0 deletions stestr/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,74 @@
from stestr import selection


def get_dynamic_test_list(
    test_ids, repository=None, group_callback=None, randomize=False
):
    """Build a flat, priority-ordered list of test ids for dynamic scheduling.

    Tests (or groups of tests, when ``group_callback`` is set) are ordered
    so that the entries with the longest recorded runtimes come first:
    fully timed groups, then partially timed groups, then groups with no
    timing data at all.

    :param list test_ids: The list of test ids to order.
    :param repository: An optional repository object providing
        ``get_test_times`` to look up prior run times for the tests.
    :param group_callback: An optional callable mapping a test id to a
        group id. Tests sharing a group id are scheduled as one unit and
        the returned list contains the group id for those entries.
    :param bool randomize: If True, return the test ids in random order,
        ignoring timing data and grouping.
    :returns: A list of test ids (or group ids) in scheduling order.
    """
    dynamic_test_list = []
    _group_callback = group_callback
    time_data = {}
    if randomize:
        # random.shuffle() shuffles in place and returns None, so shuffle
        # first and return the (now shuffled) list instead of the call's
        # result.
        random.shuffle(test_ids)
        return test_ids
    if repository:
        time_data = repository.get_test_times(test_ids)
        timed_tests = time_data["known"]
        unknown_tests = time_data["unknown"]
    else:
        # No repository: no timing data, every test is unknown.
        timed_tests = {}
        unknown_tests = set(test_ids)
    # Group tests: generate group_id -> test_ids.
    group_ids = collections.defaultdict(list)
    if _group_callback is None:

        def group_callback(_):
            return None

    else:
        group_callback = _group_callback
    for test_id in test_ids:
        # Ungrouped tests fall back to their own id as the group id.
        group_id = group_callback(test_id) or test_id
        group_ids[group_id].append(test_id)
    # Time groups: generate three sets of groups:
    # - fully timed dict(group_id -> time),
    # - partially timed dict(group_id -> time) and
    # - unknown (set of group_id)
    # We may in future treat partially timed different for scheduling, but
    # at least today we just schedule them after the fully timed groups.
    timed = {}
    partial = {}
    unknown = []
    for group_id, group_tests in group_ids.items():
        untimed_ids = unknown_tests.intersection(group_tests)
        # Sum the recorded runtimes of the tests in this group that do
        # have timing data (the symmetric difference drops the untimed).
        group_time = sum(
            timed_tests[test_id]
            for test_id in untimed_ids.symmetric_difference(group_tests)
        )
        if not untimed_ids:
            timed[group_id] = group_time
        elif group_time:
            partial[group_id] = group_time
        else:
            unknown.append(group_id)

    # Scheduling is NP complete in general, so we avoid aiming for
    # perfection. A quick approximation that is sufficient for our general
    # needs: emit the slowest groups first so idle workers pick up the
    # long-running work as early as possible.
    def consume_queue(groups):
        queue = sorted(groups.items(), key=operator.itemgetter(1), reverse=True)
        dynamic_test_list.extend([group[0] for group in queue])

    consume_queue(timed)
    consume_queue(partial)
    dynamic_test_list.extend(unknown)

    return dynamic_test_list


def partition_tests(test_ids, concurrency, repository, group_callback, randomize=False):
"""Partition test_ids by concurrency.

Expand Down
81 changes: 67 additions & 14 deletions stestr/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.

import functools
import io
import multiprocessing
import os
import re
import signal
Expand All @@ -24,6 +26,8 @@
from stestr import results
from stestr import scheduler
from stestr import selection
from stestr.subunit_runner import program
from stestr.subunit_runner import run
from stestr import testlist


Expand Down Expand Up @@ -94,6 +98,7 @@ def __init__(
exclude_regex=None,
include_list=None,
randomize=False,
dynamic=False,
):
"""Create a TestProcessorFixture."""

Expand All @@ -115,6 +120,7 @@ def __init__(
self.include_list = include_list
self.exclude_regex = exclude_regex
self.randomize = randomize
self.dynamic = dynamic

def setUp(self):
super().setUp()
Expand Down Expand Up @@ -249,6 +255,31 @@ def list_tests(self):
ids = testlist.parse_enumeration(out)
return ids

    def _dynamic_run_tests(self, job_queue, subunit_pipe):
        """Worker loop for the dynamic scheduler.

        Runs in a child process: pulls test ids off ``job_queue`` one at a
        time and executes each via the in-process subunit runner, writing
        the subunit stream to ``subunit_pipe``. Returns (ending the worker)
        when the queue is empty or a get fails.

        :param job_queue: A multiprocessing queue of test id strings shared
            by all workers.
        :param subunit_pipe: The write end of a pipe (an object with a
            ``fileno()``) that receives the subunit output.
        :raises ValueError: If a falsy/blank test id is pulled off the queue.
        """
        while True:
            # NOTE(mtreinish): Open on each loop iteration with a dup to
            # remove the chance of being garbage collected. Without this
            # you'll be fighting random Bad file descriptor errors
            subunit_pipe = os.fdopen(os.dup(subunit_pipe.fileno()), "wb")
            if job_queue.empty():
                subunit_pipe.close()
                return
            try:
                # Non-blocking get: another worker may have drained the
                # queue between the empty() check and here, so treat any
                # failure as "no more work" and exit cleanly.
                test_id = job_queue.get(block=False)
            except Exception:
                subunit_pipe.close()
                return
            if not test_id:
                os.close(subunit_pipe.fileno())
                raise ValueError("Invalid blank test_id: %s" % test_id)
            cmd_list = [self.cmd, test_id]
            test_runner = run.SubunitTestRunner
            # Run the single test in-process; the runner's stdout is bound
            # to our dup'd pipe so the parent can stream the subunit output.
            program.TestProgram(
                module=None,
                argv=cmd_list,
                testRunner=functools.partial(test_runner, stdout=subunit_pipe),
            )

def run_tests(self):
"""Run the tests defined by the command

Expand Down Expand Up @@ -280,19 +311,41 @@ def run_tests(self):
test_id_groups = scheduler.partition_tests(
test_ids, self.concurrency, self.repository, self._group_callback
)
for test_ids in test_id_groups:
if not test_ids:
# No tests in this partition
continue
fixture = self.useFixture(
TestProcessorFixture(
test_ids,
self.template,
self.listopt,
self.idoption,
self.repository,
parallel=False,
if not self.dynamic:
for test_ids in test_id_groups:
if not test_ids:
# No tests in this partition
continue
fixture = self.useFixture(
TestProcessorFixture(
test_ids,
self.template,
self.listopt,
self.idoption,
self.repository,
parallel=False,
)
)
result.extend(fixture.run_tests())
return result
else:
test_id_list = scheduler.get_dynamic_test_list(
test_ids, self.repository, self._group_callback
)
result.extend(fixture.run_tests())
return result
test_list = multiprocessing.Queue()

for test_id in test_id_list:
test_list.put(test_id)

for i in range(self.concurrency):
fd_pipe_r, fd_pipe_w = multiprocessing.Pipe(False)
name = "worker-%s" % i
proc = multiprocessing.Process(
target=self._dynamic_run_tests,
name=name,
args=(test_list, fd_pipe_w),
)
proc.start()
stream_read = os.dup(fd_pipe_r.fileno())
result.append({"stream": stream_read, "proc": proc})
return result
1 change: 1 addition & 0 deletions stestr/tests/test_config_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def _check_get_run_command(
exclude_regex=None,
exclude_list=None,
concurrency=0,
dynamic=False,
group_callback=expected_group_callback,
test_filters=None,
randomize=False,
Expand Down
1 change: 0 additions & 1 deletion stestr/tests/test_return_codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ def assertRunExit(self, cmd, expected, subunit=False, stdin=None):
"%s" % cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
out, err = p.communicate()

if not subunit:
self.assertEqual(
p.returncode, expected, "Stdout: {}; Stderr: {}".format(out, err)
Expand Down
Loading
Loading