diff --git a/scripts/tf-tests/README.md b/scripts/tf-tests/README.md
index ee0caed1..712707a0 100644
--- a/scripts/tf-tests/README.md
+++ b/scripts/tf-tests/README.md
@@ -1,7 +1,7 @@
 # Manual integration tests using Testflinger
 
-The purpose of these scripts is to test hardware-api client and server
-on the machines that are accessible via
+The purpose of the `run-jobs.py` script is to test hardware-api client
+and server on the machines that are accessible via
 [Testflinger](https://github.com/canonical/testflinger). It allows us
 to test the project components on multiple machines automatically.
 
@@ -15,7 +15,8 @@ installed on your system:
 sudo snap install testflinger-cli
 ```
 
-Also, the following files are required to be present in this directory:
+Also, the following files are required to be present in this
+directory:
 
 - `machines.txt`: This file lists the Canonical IDs of the machines on
   which jobs will be run. Each Canonical ID should be on a separate
@@ -42,76 +43,87 @@ Also, the following files are required to be present in this directory:
   Then copy the created file to this directory:
 
   ```sh
-  cp target/release/hwctl scripts/tf-tests/
+  cp target/release/hwctl scripts/tf-tests/
  ```
 
-## Running the scripts
+## Running the script
 
-After you meel the described requirements, make sure you have access
+After you meet the described requirements, make sure you have access
 to
 [https://testflinger.canonical.com](https://testflinger.canonical.com).
 
-These scripts are designed to work sequentially, where `run-jobs.py`
-must be executed first to submit jobs and create job directories,
-followed by `check-status.py` to monitor and fetch job results.
+The script `run-jobs.py` can be used to submit jobs, monitor their
+status, or both, depending on the options you provide.
 
 ```sh
-./run-jobs.py
-# wait until the script is completed
-./check-status.py
+./run-jobs.py [options]
 ```
 
-## Scripts overview
+Examples:
 
-### `run-jobs.py`
+* Submit jobs and monitor their statuses sequentially: `./run-jobs.py`
+* Only submit jobs: `./run-jobs.py --send-jobs`
+* Only monitor job statuses: `./run-jobs.py --check-status`
+* Use a custom machines file and poll interval: `./run-jobs.py
+  --machines-file custom_machines.txt --poll-interval 60`
 
-This script submits jobs based on the Canonical IDs listed in machines.txt and generates directories for each ID.
-How it works:
-1. Reads each Canonical ID from machines.txt.
-2. Replaces `$CANONICAL_ID` in `tf-job.yaml` with the actual ID.
+
+## Script overview
+
+The script performs two main functions:
+
+- Job Submission
+- Job Monitoring
+
+### Job Submission
+
+When submitting jobs, the script:
+
+1. Reads each Canonical ID from `machines.txt` (or the file specified
+   with `--machines-file`).
+2. Replaces `$CANONICAL_ID` in `tf-job.yaml` (or the file specified
+   with `--template-file`) with the actual ID; a sketch of such a
+   template is shown below.
 3. Submits the job with `testflinger submit <job file>`.
 4. Captures the job UUID returned after submission.
-5. Creates a directory for each Canonical ID in `job_outputs/` and saves
-   the job UUID in a file named `tf_job_id.txt` within that directory.
+5. Creates a directory for each Canonical ID in `job_outputs/` (or the
+   directory specified with `--output-dir`) and saves the job UUID in
+   a file named `tf_job_id.txt` within that directory.
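+
+For illustration, a job template might look like the following
+sketch. This is not the actual `tf-job.yaml` used here; the queue
+name, distro, and test commands are placeholders:
+
+```yaml
+# Hypothetical minimal Testflinger job template (illustrative only).
+job_queue: $CANONICAL_ID
+provision_data:
+  distro: jammy
+test_data:
+  test_cmds: |
+    # Run the hardware-api client and print its JSON output so the
+    # monitoring step can extract the "status" field.
+    ./hwctl
+```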
 
-It creates a directory with the following structure:
+Example directory structure after job submission:
 
 ```
 job_outputs/
 ├── 202101-28595/
-│   ├── tf_job_id.txt    # Contains the job UUID
+│   └── tf_job_id.txt    # Contains the job UUID
 ├── 202012-28526/
-│   ├── tf_job_id.txt
+│   └── tf_job_id.txt
 ```
 
-Each `tf_job_id.txt` file will contain the job UUID for the respective Canonical ID.
-
-### `check-status.py`
-
-This script monitors the status of each job until all jobs are
-completed. For each completed job, it retrieves the test results and
-saves them in the corresponding directory.
+### Job Monitoring
 
-How it works:
+When monitoring jobs, the script:
 
 1. Reads `tf_job_id.txt` files in `job_outputs/` to get the job UUIDs.
-2. Enters a loop, checking the status of each job using `testflinger status <job-id>`.
-3. For jobs with status "complete", retrieves results using `testflinger-cli results <job-id> | jq -r .test_output`.
-4. Saves the test output to `output.txt` within the respective Canonical ID’s directory.
-5. Extracts `hwapi` status and writes it to the `hw_status.txt`
+2. Enters a loop, checking the status of each job using `testflinger
+   status <job-id>`.
+3. For jobs with status "complete", retrieves results using
+   `testflinger-cli results <job-id>`.
+4. Saves the test output to `output.txt` within the respective
+   Canonical ID’s directory.
+5. Extracts the status field from the test output and writes it to
+   `hw_status.txt`.
 6. Continues monitoring until all jobs are completed.
 
-Each Canonical ID directory will contain output.txt with the test results.
+Example directory structure after job monitoring:
 
 ```
 job_outputs/
 ├── 202101-28595/
 │   ├── tf_job_id.txt
 │   ├── output.txt       # Contains test output
-|   ├── hw_status.txt    # hwapi status
+│   └── hw_status.txt    # Contains the hardware API status
 ├── 202012-28526/
 │   ├── tf_job_id.txt
 │   ├── output.txt
-│   ├── hw_status.txt
+│   └── hw_status.txt
 ```
diff --git a/scripts/tf-tests/check-status.py b/scripts/tf-tests/check-status.py
deleted file mode 100755
index fdefba0d..00000000
--- a/scripts/tf-tests/check-status.py
+++ /dev/null
@@ -1,127 +0,0 @@
-#!/usr/bin/env python3
-
-import re
-import subprocess
-import json
-from time import sleep
-from pathlib import Path
-import argparse
-
-
-def parse_arguments():
-    parser = argparse.ArgumentParser(
-        description="Monitor job status and retrieve results."
-    )
-    parser.add_argument(
-        "--output-dir",
-        type=Path,
-        default="job_outputs",
-        help="Path to job outputs directory",
-    )
-    parser.add_argument(
-        "--poll-interval",
-        type=int,
-        default=30,
-        help="Time delay between status checks in seconds",
-    )
-    return parser.parse_args()
-
-
-def load_jobs(output_dir):
-    """Load job IDs and directories from the job outputs directory."""
-    jobs = []
-    for id_dir in output_dir.iterdir():
-        if id_dir.is_dir():
-            job_id_file = id_dir / "tf_job_id.txt"
-            with open(job_id_file, "r") as file:
-                job_id = file.read().strip()
-            jobs.append((job_id, id_dir))
-    return {job_id: id_dir for job_id, id_dir in jobs}
-
-
-def check_job_status(job_id):
-    """Check the status of a job by its job ID."""
-    try:
-        status_result = subprocess.run(
-            ["testflinger", "status", job_id],
-            capture_output=True,
-            text=True,
-            check=True,
-        )
-        return status_result.stdout.strip()
-    except subprocess.CalledProcessError as e:
-        print(f"Error checking status for job {job_id}: {e.stderr}")
-        return None
-
-
-def extract_status_from_output(test_output):
-    """Extracts the status value from the test output JSON."""
-    match_ = re.search(r'"status":\s*"([^"]+)"', test_output)
-    return match_.group(1) if match_ else "Unknown"
-
-
-def retrieve_job_results(job_id, id_dir):
-    """Retrieve and save the results of a completed job, including extracting and saving the status."""
-    try:
-        results_result = subprocess.run(
-            ["testflinger-cli", "results", job_id],
-            capture_output=True,
-            text=True,
-            check=True,
-        )
-
-        results_data = json.loads(results_result.stdout)
-        test_output = results_data.get("test_output", "")
-
-        # Write test output to output.txt in the Canonical ID directory
-        with open(id_dir / "output.txt", "w") as file:
-            file.write(test_output)
-
-        # Extract the "status" field from the test output and write to hw_status.txt
-        status = extract_status_from_output(test_output)
-        with open(id_dir / "hw_status.txt", "w") as status_file:
-            status_file.write(status)
-
-        print(
-            f"Results saved for job {job_id} in {id_dir.name}/output.txt and status in hw_status.txt"
-        )
-
-    except subprocess.CalledProcessError as e:
-        print(f"Error fetching results for job {job_id}: {e.stderr}")
-    except json.JSONDecodeError:
-        print(f"Error decoding JSON for job {job_id} results.")
-
-
-def monitor_jobs(remaining_jobs, poll_interval):
-    """Monitor jobs until all are completed, fetching results as jobs finish."""
-    while remaining_jobs:
-        for job_id, id_dir in list(remaining_jobs.items()):
-            job_status = check_job_status(job_id)
-
-            if job_status:
-                print(
-                    f"Status for job {job_id} (Canonical ID: {id_dir.name}): {job_status}"
-                )
-
-            # Retrieve results if the job is complete
-            if job_status == "complete":
-                retrieve_job_results(job_id, id_dir)
-                del remaining_jobs[job_id]
-
-        # Wait before the next round of checks if there are still jobs left
-        if remaining_jobs:
-            print(f"Waiting {poll_interval} seconds before checking again...")
-            sleep(poll_interval)
-
-    print("All jobs complete and results retrieved.")
-
-
-def main():
-    """Main function to load jobs and monitor their status."""
-    args = parse_arguments()
-    remaining_jobs = load_jobs(args.output_dir)
-    monitor_jobs(remaining_jobs, args.poll_interval)
-
-
-if __name__ == "__main__":
-    main()
diff --git a/scripts/tf-tests/run-jobs.py b/scripts/tf-tests/run-jobs.py
index 17611991..2244587a 100755
--- a/scripts/tf-tests/run-jobs.py
+++ b/scripts/tf-tests/run-jobs.py
@@ -1,12 +1,19 @@
 #!/usr/bin/env python3
 
+import re
+import argparse
 import subprocess
+import json
+import logging
+from time import sleep
 from pathlib import Path
-import argparse
+from typing import Dict, Optional
 
 
 def parse_arguments():
-    parser = argparse.ArgumentParser(description="Submit jobs to testflinger")
+    parser = argparse.ArgumentParser(
+        description="Submit jobs and monitor their status on Testflinger."
+    )
     parser.add_argument(
         "--machines-file",
         type=Path,
@@ -25,36 +32,48 @@ def parse_arguments():
         default="job_outputs",
         help="Path to job outputs directory",
     )
+    parser.add_argument(
+        "--poll-interval",
+        type=int,
+        default=30,
+        help="Time delay between status checks in seconds",
+    )
+    group = parser.add_mutually_exclusive_group()
+    group.add_argument(
+        "--send-jobs",
+        action="store_true",
+        help="Only submit jobs without monitoring their statuses",
+    )
+    group.add_argument(
+        "--check-status",
+        action="store_true",
+        help="Only check job statuses without submitting new jobs",
+    )
     return parser.parse_args()
 
 
-def load_canonical_ids(filename):
+# Job Submission Functions
+def load_canonical_ids(filename: Path) -> list:
     """Reads the Canonical IDs from a file."""
-    with open(filename, "r") as file:
+    with open(filename, "r", encoding="utf8") as file:
         return file.read().strip().splitlines()
 
 
-def prepare_output_directory(directory):
-    """Creates the main output directory if it doesn't exist."""
-    directory.mkdir(exist_ok=True)
-
-
-def create_job_yaml(template_file, canonical_id):
+def create_job_yaml(template_file: Path, canonical_id: str) -> str:
     """Creates a modified job YAML for a specific Canonical ID."""
-    with open(template_file, "r") as file:
+    with open(template_file, "r", encoding="utf8") as file:
         job_yaml = file.read()
     return job_yaml.replace("$CANONICAL_ID", canonical_id)
 
 
-def write_temp_job_file(job_yaml, output_dir, canonical_id):
+def write_temp_job_file(job_yaml: str, output_dir: Path, canonical_id: str) -> Path:
     """Writes the modified job YAML to a temporary file."""
     temp_job_file = output_dir / f"{canonical_id}_tf-job.yaml"
-    with open(temp_job_file, "w") as file:
-        file.write(job_yaml)
+    temp_job_file.write_text(job_yaml)
     return temp_job_file
 
 
-def submit_job(temp_job_file, canonical_id):
+def submit_job(temp_job_file: Path, canonical_id: str) -> Optional[str]:
     """Submits the job and returns the job UUID."""
     try:
         result = subprocess.run(
@@ -66,45 +85,134 @@ def submit_job(temp_job_file, canonical_id):
         for line in result.stdout.splitlines():
             if line.startswith("job_id:"):
                 return line.split(": ")[1].strip()
-        print(f"Failed to retrieve job_id for {canonical_id}")
+        logging.warning("Failed to retrieve job_id for %s", canonical_id)
     except subprocess.CalledProcessError as e:
-        print(f"Error submitting job for {canonical_id}: {e.stderr}")
+        logging.error("Error submitting job for %s: %s", canonical_id, e.stderr)
     return None
 
 
-def save_job_uuid(job_uuid, output_dir, canonical_id):
+def save_job_uuid(job_uuid: str, output_dir: Path, canonical_id: str):
     """Creates a directory for the Canonical ID and saves the job UUID."""
     id_dir = output_dir / canonical_id
     id_dir.mkdir(exist_ok=True)
-    with open(id_dir / "tf_job_id.txt", "w") as file:
-        file.write(job_uuid)
-    print(f"Job submitted successfully for {canonical_id} with job_id: {job_uuid}")
-    print(f"TF URL: https://testflinger.canonical.com/jobs/{job_uuid}")
+    (id_dir / "tf_job_id.txt").write_text(job_uuid)
+    logging.info("Job submitted for %s with job_id: %s", canonical_id, job_uuid)
 
 
-def clean_temp_file(temp_job_file):
-    """Deletes the temporary job YAML file."""
-    temp_job_file.unlink()
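 
 
+# Submission and monitoring are implemented as separate helpers so the
+# mutually exclusive --send-jobs / --check-status flags can run either
+# phase on its own (see main() below).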
+def submit_all_jobs(
+    machines_file: Path, template_file: Path, output_dir: Path
+) -> Dict[str, Path]:
+    """Submit all jobs for the given machines."""
+    canonical_ids = load_canonical_ids(machines_file)
+    job_ids = {}
+    for canonical_id in canonical_ids:
+        job_yaml = create_job_yaml(template_file, canonical_id)
+        temp_job_file = write_temp_job_file(job_yaml, output_dir, canonical_id)
+        job_uuid = submit_job(temp_job_file, canonical_id)
 
-def main():
-    """Main function to execute job submission workflow."""
-    args = parse_arguments()
+        if job_uuid:
+            save_job_uuid(job_uuid, output_dir, canonical_id)
+            job_ids[job_uuid] = output_dir / canonical_id
 
-    # Load canonical IDs
-    canonical_ids = load_canonical_ids(args.machines_file)
-    prepare_output_directory(args.output_dir)
+        temp_job_file.unlink()  # Clean up temporary YAML file
 
-    # Submit each job
-    for canonical_id in canonical_ids:
-        job_yaml = create_job_yaml(args.template_file, canonical_id)
-        temp_job_file = write_temp_job_file(job_yaml, args.output_dir, canonical_id)
+    return job_ids
+
+
+# Job Monitoring Functions
+def load_jobs(output_dir: Path) -> Dict[str, Path]:
+    """Load job IDs and directories from the job outputs directory."""
+    jobs = {}
+    for id_dir in output_dir.iterdir():
+        if id_dir.is_dir():
+            job_id_file = id_dir / "tf_job_id.txt"
+            if job_id_file.exists():
+                job_id = job_id_file.read_text().strip()
+                jobs[job_id] = id_dir
+    return jobs
 
-        job_uuid = submit_job(temp_job_file, canonical_id)
-        if job_uuid:
-            save_job_uuid(job_uuid, args.output_dir, canonical_id)
 
-        clean_temp_file(temp_job_file)
+def check_job_status(job_id: str) -> Optional[str]:
+    """Check the status of a job by its job ID."""
+    try:
+        result = subprocess.run(
+            ["testflinger", "status", job_id],
+            capture_output=True,
+            text=True,
+            check=True,
+        )
+        return result.stdout.strip()
+    except subprocess.CalledProcessError as e:
+        logging.error("Error checking status for job %s: %s", job_id, e.stderr)
+        return None
+
+
+def extract_status_from_output(test_output):
+    """Extracts the status value from the test output JSON."""
+    match_ = re.search(r'"status":\s*"([^"]+)"', test_output)
+    return match_.group(1) if match_ else "Unknown"
+
+
+def retrieve_job_results(job_id: str, id_dir: Path):
+    """Retrieve and save the results of a completed job."""
+    try:
+        results_result = subprocess.run(
+            ["testflinger-cli", "results", job_id],
+            capture_output=True,
+            text=True,
+            check=True,
+        )
+        results_data = json.loads(results_result.stdout)
+        test_output = results_data.get("test_output", "")
+        (id_dir / "output.txt").write_text(test_output)
+        status = extract_status_from_output(test_output)
+        (id_dir / "hw_status.txt").write_text(status)
+        logging.info("Results and status saved for job %s in %s", job_id, id_dir)
+    except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
+        logging.error("Error fetching results for job %s: %s", job_id, str(e))
+
+
+def monitor_jobs(remaining_jobs: Dict[str, Path], poll_interval: int):
+    """Monitor jobs until all are completed, fetching results as jobs finish."""
+    while remaining_jobs:
+        for job_id, id_dir in list(remaining_jobs.items()):
+            job_status = check_job_status(job_id)
+            logging.info(
+                "Status for job %s (Canonical ID: %s): %s",
+                job_id,
+                id_dir.name,
+                job_status,
+            )
+            if job_status == "cancelled":
+                logging.error("The job %s got cancelled.", job_id)
+                del remaining_jobs[job_id]
+            if job_status == "complete":
+                retrieve_job_results(job_id, id_dir)
+                del remaining_jobs[job_id]
+
+        if remaining_jobs:
logging.info("Waiting %d seconds before checking again...", poll_interval) + sleep(poll_interval) + + logging.info("All jobs complete and results retrieved.") + + +def main(): + logging.basicConfig(level=logging.INFO, format="%(message)s") + args = parse_arguments() + args.output_dir.mkdir(exist_ok=True) + + if args.send_jobs: + submit_all_jobs(args.machines_file, args.template_file, args.output_dir) + elif args.check_status: + remaining_jobs = load_jobs(args.output_dir) + monitor_jobs(remaining_jobs, args.poll_interval) + else: + job_ids = submit_all_jobs( + args.machines_file, args.template_file, args.output_dir + ) + monitor_jobs(job_ids, args.poll_interval) if __name__ == "__main__":