Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DAOS-16217 test: Use subprocess.run() for run_local() #14882

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tests/ftest/util/dfuse_utils.py
Original file line number Diff line number Diff line change
@@ -503,7 +503,7 @@ def __init__(self, hosts, namespace="/run/verify_perms/*"):

# run options
self.hosts = hosts.copy()
self.timeout = 240
self.timeout = 120

# Most usage requires root permission
self.run_user = 'root'
45 changes: 2 additions & 43 deletions src/tests/ftest/util/launch_utils.py
Original file line number Diff line number Diff line change
@@ -7,12 +7,9 @@
import logging
import os
import re
import shlex
import subprocess # nosec
import sys
import time
from pathlib import Path
from socket import gethostname

from ClusterShell.NodeSet import NodeSet
from slurm_setup import SlurmSetup, SlurmSetupException
@@ -392,8 +389,9 @@ def execute(self, logger, test, repeat, number, sparse, fail_fast):
"[Test %s/%s] Running the %s test on repetition %s/%s",
number, self.total_tests, test, repeat, self.total_repeats)
start_time = int(time.time())
return_code = self._run_subprocess(logger, " ".join(command)).returncode
result = run_local(logger, " ".join(command), capture_output=False)
end_time = int(time.time())
return_code = result.output[0].returncode
if return_code == 0:
logger.debug("All avocado test variants passed")
elif return_code & 1 == 1:
@@ -836,45 +834,6 @@ def _collect_crash_files(self, logger):
else:
logger.debug("No avocado crash files found in %s", crash_dir)

@staticmethod
def _run_subprocess(log, command):
"""Run the command locally.

Args:
log (logger): logger for the messages produced by this method
command (str): command from which to obtain the output

Raises:
RunException: if the command fails: is interrupted by the user, or
encounters some other exception.

Returns:
subprocess.CompletedProcess: an object representing the result of the command
execution with the following properties:
- args (the command argument)
- returncode
"""
local_host = gethostname().split(".")[0]
kwargs = {"encoding": "utf-8", "shell": False, "check": False, "timeout": None}
log.debug("Running on %s: %s", local_host, command)

try:
# pylint: disable=subprocess-run-check
return subprocess.run(shlex.split(command), **kwargs) # nosec

except KeyboardInterrupt as error:
# User Ctrl-C
message = f"Command '{command}' interrupted by user"
log.debug(message)
raise RunException(message) from error

except Exception as error:
# Catch all
message = f"Command '{command}' encountered unknown error"
log.debug(message)
log.debug(str(error))
raise RunException(message) from error


class TestGroup():
"""Runs a group of tests with same configuration."""
114 changes: 100 additions & 14 deletions src/tests/ftest/util/run_utils.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
SPDX-License-Identifier: BSD-2-Clause-Patent
"""
import os
import subprocess # nosec
import time
from getpass import getuser
from socket import gethostname
@@ -71,6 +72,69 @@ def passed(self):
return self.returncode == 0


class LocalTask():
"""A mock ClusterShell.Task object for subprocess command output."""

def __init__(self, host, return_code, stdout, stderr, timed_out):
"""Create a LocalTask.

Args:
host (NodeSet): host from which the command was executed
return_code (int): executed command's return code
stdout (str): executed command's stdout
stderr (str): executed command's stderr
timed_out (bool) did the executed command time out
"""
self._return_codes = {return_code: [host]}
self._stdout = {stdout if stdout is not None else '': [host]}
self._stderr = {stderr if stderr is not None else '': [host]}
self._timeout_sources = []
if timed_out:
self._timeout_sources.append(host)

def iter_retcodes(self):
"""Iterate over return codes of the local command result.

Yields:
tuple: return code (int), hosts (list)
"""
yield from self._return_codes.items()

def iter_keys_timeout(self):
"""Iterate over hosts that timed out.

Yields:
str: host where the command timed out
"""
yield from self._timeout_sources

def iter_buffers(self, match_keys=None):
"""Iterate over the command stdout for each host.

Args:
match_keys (list, optional): only return output matching these hosts. Defaults to None.

Returns:
tuple: command stdout (str), hosts (list)
"""
for output, hosts in self._stdout.items():
if match_keys is None or hosts[0] in match_keys:
yield output, hosts

def iter_errors(self, match_keys=None):
"""Iterate over the command stderr for each host.

Args:
match_keys (list, optional): only return output matching these hosts. Defaults to None.

Returns:
tuple: command stderr (str), hosts (list)
"""
for output, hosts in self._stderr.items():
if match_keys is None or hosts[0] in match_keys:
yield output, hosts


class CommandResult():
"""Groups of command results from the same hosts with the same return status."""

@@ -305,7 +369,7 @@ def get_clush_command(hosts, args=None, command="", command_env=None, command_su
return " ".join(cmd_list)


def run_local(log, command, verbose=True, timeout=None, task_debug=False, stderr=False):
def run_local(log, command, verbose=True, timeout=None, stderr=False, capture_output=True):
"""Run the command on the local host.

Args:
@@ -314,27 +378,49 @@ def run_local(log, command, verbose=True, timeout=None, task_debug=False, stderr
verbose (bool, optional): log the command output. Defaults to True.
timeout (int, optional): number of seconds to wait for the command to complete.
Defaults to None.
task_debug (bool, optional): whether to enable debug for the task object. Defaults to False.
stderr (bool, optional): whether to enable stdout/stderr separation. Defaults to False.
capture_output (bool, optional): whether to include stdout/stderr in the CommandResult.
Defaults to True.

Returns:
CommandResult: groups of command results from the same hosts with the same return status
"""
local_host = NodeSet(gethostname().split(".")[0])
task = task_self()
task.set_info('debug', task_debug)
task.set_default("stderr", stderr)
if verbose:
if timeout is None:
log.debug("Running on %s without a timeout: %s", local_host, command)
else:
log.debug("Running on %s with a %s second timeout: %s", local_host, timeout, command)
task.run(command=command, key=str(local_host), timeout=timeout)
kwargs = {
"encoding": "utf-8",
"shell": True,
"check": False,
"timeout": timeout,
"env": os.environ.copy()
}
if capture_output:
kwargs["stdout"] = subprocess.PIPE
kwargs["stderr"] = subprocess.PIPE if stderr else subprocess.STDOUT
daltonbohning marked this conversation as resolved.
Show resolved Hide resolved

if timeout and verbose:
log.debug("Running on %s with a %s timeout: %s", local_host, timeout, command)
elif verbose:
log.debug("Running on %s: %s", local_host, command)

try:
# pylint: disable=subprocess-run-check
process = subprocess.run(command, **kwargs) # nosec
task = LocalTask(local_host, process.returncode, process.stdout, process.stderr, False)

except subprocess.TimeoutExpired as error:
# Raised if command times out
task = LocalTask(local_host, 124, error.stdout, error.stderr, True)

except Exception as error: # pylint: disable=broad-except
# Catch all
task = LocalTask(local_host, 255, None, str(error), False)

results = CommandResult(command, task)
if verbose:
if capture_output and verbose:
# Log any captured command output when requested
results.log_output(log)
else:
# Always log any failed commands
elif capture_output:
# Always log any failed commands whose output was captured
for data in results.output:
if not data.passed:
log_result_data(log, data)
Loading