Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Project check #158

Merged
merged 6 commits into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions src/jobflow_remote/cli/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,14 @@ def check(
help="Print the errors at the end of the checks",
),
] = False,
full: Annotated[
bool,
typer.Option(
"--full",
davidwaroquiers marked this conversation as resolved.
Show resolved Hide resolved
"-f",
help="Perform a full check",
),
] = False,
) -> None:
"""Check that the connection to the different elements of the projects are working."""
check_incompatible_opt({"jobstore": jobstore, "queue": queue, "worker": worker})
Expand All @@ -183,6 +191,7 @@ def check(
workers_to_test = [worker]

tick = "[bold green]✓[/] "
tick_warn = "[bold yellow]✓[/] "
cross = "[bold red]x[/] "
errors = []
with loading_spinner(processing=False) as progress:
Expand All @@ -192,12 +201,18 @@ def check(
worker_to_test = project.workers[worker_name]
if worker_to_test.get_host().interactive_login:
with hide_progress(progress):
err = check_worker(worker_to_test)
err, worker_warn = check_worker(worker_to_test, full_check=full)
else:
err = check_worker(worker_to_test)
err, worker_warn = check_worker(worker_to_test, full_check=full)
header = tick
# At the moment the check_worker should return either an error or a
# warning. The code below also deals with the case where both are
# returned in the future.
if worker_warn:
errors.append((f"Worker {worker_name} warning ", worker_warn))
header = tick_warn
if err:
errors.append((f"Worker {worker_name}", err))
errors.append((f"Worker {worker_name} ", err))
header = cross
progress.print(Text.from_markup(header + f"Worker {worker_name}"))

Expand Down
84 changes: 79 additions & 5 deletions src/jobflow_remote/config/helper.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import importlib.metadata
import json
import logging
import traceback
from typing import TYPE_CHECKING
Expand Down Expand Up @@ -150,32 +152,43 @@ def _check_workdir(worker: WorkerBase, host: BaseHost) -> str | None:
host.execute(f"rm {str(canary_file)!r}")


def check_worker(worker: WorkerBase) -> str | None:
def check_worker(
worker: WorkerBase, full_check: bool = False
) -> tuple[str | None, str | None]:
"""Check that a connection to the configured worker can be made."""
host = worker.get_host()
worker_warn = None
try:
host.connect()
host_error = host.test()
if host_error:
return host_error
return host_error, None

from jobflow_remote.remote.queue import QueueManager

qm = QueueManager(scheduler_io=worker.get_scheduler_io(), host=host)
qm.get_jobs_list()

_check_workdir(worker=worker, host=host)
workdir_err = _check_workdir(worker=worker, host=host)
if workdir_err:
return workdir_err, None

# don't perform the environment check, as they will be equivalent
if worker.type != "local":
worker_warn = _check_environment(
worker=worker, host=host, full_check=full_check
)

except Exception:
exc = traceback.format_exc()
return f"Error while testing worker:\n {exc}"
return f"Error while testing worker:\n {exc}", worker_warn
finally:
try:
host.close()
except Exception:
logger.warning(f"error while closing connection to host {host}")

return None
return None, worker_warn


def _check_store(store: Store) -> str | None:
Expand Down Expand Up @@ -206,3 +219,64 @@ def check_jobstore(jobstore: JobStore) -> str | None:
if err:
return f"Error while checking additional store {store_name}:\n{err}"
return None


def _check_environment(
worker: WorkerBase, host: BaseHost, full_check: bool = False
) -> str | None:
"""Check that the worker has a python environment with the same versions of libraries.

Parameters
----------
host: A connected host.
full_check: Whether to check the entire environment and not just jobflow and jobflow-remote.

Returns
-------
str | None
A message describing the environment mismatches. None if no mismatch is found.
"""
installed_packages = importlib.metadata.distributions()
local_package_versions = {
package.metadata["Name"]: package.version for package in installed_packages
}
cmd = "pip list --format=json"
if worker.pre_run:
cmd = "; ".join(worker.pre_run.strip().splitlines()) + "; " + cmd

stdout, stderr, errcode = host.execute(cmd)
if errcode != 0:
return f"Error while checking the compatibility of the environments: {stderr}"
host_package_versions = {
package_dict["name"]: package_dict["version"]
for package_dict in json.loads(stdout)
}
if full_check:
packages_to_check = list(local_package_versions.keys())
else:
packages_to_check = ["jobflow", "jobflow-remote"]
missing = []
mismatch = []
for package in packages_to_check:
if package not in host_package_versions:
missing.append((package, local_package_versions[package]))
continue
if local_package_versions[package] != host_package_versions[package]:
mismatch.append(
(
package,
local_package_versions[package],
host_package_versions[package],
)
)
msg = None
if mismatch or missing:
msg = "Note: inconsistencies may be due to the proper python environment not being correctly loaded.\n"
if missing:
missing_str = [f"{m[0]} - {m[1]}" for m in missing]
msg += f"Missing packages: {', '.join(missing_str)}. "
if mismatch:
mismatch_str = [f"{m[0]} - {m[1]} vs {m[2]}" for m in mismatch]
msg += f"Mismatching versions: {', '.join(mismatch_str)}"

return msg
73 changes: 73 additions & 0 deletions tests/db/cli/test_project.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,79 @@ def test_check(job_controller) -> None:
run_check_cli(["project", "check"], required_out=output)


def test_check_fail(job_controller, monkeypatch, tmp_dir) -> None:
import json
import os

from maggma.stores.mongolike import MongoStore
from monty.serialization import dumpfn

from jobflow_remote import SETTINGS
from jobflow_remote.config import helper
from jobflow_remote.remote.host.remote import RemoteHost
from jobflow_remote.remote.queue import QueueManager
from jobflow_remote.testing.cli import run_check_cli

def return_none(*args, **kwargs):
return None

def exec_jobflow_version(*args, **kwargs):
d = [
{"name": "jobflow", "version": "0.1.0"},
{"name": "jobflow-remote", "version": "0.1.0"},
]
return json.dumps(d), "", 0

# change project directory and test options there
with monkeypatch.context() as m:
m.setattr(SETTINGS, "projects_folder", os.getcwd())
m.setattr(SETTINGS, "project", "testtest")
run_check_cli(["project", "list"], required_out="No project available in")

# create a project with a fake remote worker
worker_dict = {
"scheduler_type": "shell",
"work_dir": "/fake/path",
"type": "remote",
"host": "fake_host",
"timeout_execute": 1,
}
queue_dict = {"store": MongoStore("xxx", "yyy").as_dict()}
dumpfn(
{
"name": "testtest",
"workers": {"fake_remote_worker": worker_dict},
"queue": queue_dict,
},
"testtest.yaml",
)

# project check fails as it cannot connect
err_required = ["Errors:", "x Worker fake_remote_worker"]
run_check_cli(
["project", "check", "-e", "-w", "fake_remote_worker"],
required_out=err_required,
)

# mock all the functions to make the check succeed, except the mismatching jobflow versions
m.setattr(RemoteHost, "connect", return_none)
m.setattr(RemoteHost, "test", return_none)
m.setattr(RemoteHost, "write_text_file", return_none)
m.setattr(RemoteHost, "execute", exec_jobflow_version)
m.setattr(QueueManager, "get_jobs_list", return_none)
m.setattr(helper, "_check_workdir", return_none)
warn_required = [
"✓ Worker fake_remote_worker",
"Errors:",
"Mismatching versions: jobflow",
"jobflow-remote",
]
run_check_cli(
["project", "check", "-e", "-w", "fake_remote_worker"],
required_out=warn_required,
)


def test_remove(job_controller, random_project_name, monkeypatch, tmp_dir) -> None:
import os

Expand Down
3 changes: 3 additions & 0 deletions tests/integration/test_slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ def test_project_check(job_controller, capsys) -> None:
"✓ Jobstore",
"✓ Queue store",
]
# Here there will be "errors" reported as there likely be a mismatch of the
# jobflow-remote version between the local environment and the one in the
# container
run_check_cli(["project", "check", "-e"], required_out=expected)


Expand Down