From 85512e967929c47934f08f2e77d4b6bd472b47e3 Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 11:41:36 -0600 Subject: [PATCH 01/13] trim down `ewms_sidecar.py` --- ewms_sidecar/ewms_sidecar.py | 118 ++++++++--------------------------- 1 file changed, 26 insertions(+), 92 deletions(-) diff --git a/ewms_sidecar/ewms_sidecar.py b/ewms_sidecar/ewms_sidecar.py index 37997b51..c49b79f6 100644 --- a/ewms_sidecar/ewms_sidecar.py +++ b/ewms_sidecar/ewms_sidecar.py @@ -1,4 +1,4 @@ -"""The central module.""" +"""The EWMS Sidecar.""" import argparse @@ -7,14 +7,14 @@ from wipac_dev_tools import argparse_tools, logging_tools -from . import condor, k8s +from . import condor from .config import ENV, LOGGER def main() -> None: """Main.""" parser = argparse.ArgumentParser( - description="Manage Skymap Scanner client workers", + description="Handle EWMS requests beside a Skymap Scanner central server", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) @@ -24,32 +24,8 @@ def main() -> None: help="the uuid for the cluster", ) - # orchestrator - orch_subparsers = parser.add_subparsers( - required=True, - dest="orchestrator", - help="the resource orchestration tool to use for worker scheduling", - ) - OrchestratorArgs.condor( - orch_condor_parser := orch_subparsers.add_parser( - "condor", help="orchestrate with HTCondor" - ) - ) - OrchestratorArgs.k8s( - orch_k8s_parser := orch_subparsers.add_parser( - "k8s", help="orchestrate with Kubernetes" - ) - ) - - # action -- add sub-parser to each sub-parser (can't add multiple sub-parsers) - for p in [orch_condor_parser, orch_k8s_parser]: - act_subparsers = p.add_subparsers( - required=True, - dest="action", - help="the action to perform on the worker cluster", - ) - ActionArgs.starter(act_subparsers.add_parser("start", help="start workers")) - ActionArgs.stopper(act_subparsers.add_parser("stop", help="stop workers")) + SidecarArgs.condor(parser) + SidecarArgs.starter(parser) # parse args & set up logging args = parser.parse_args() @@ -63,60 +39,27 @@ def main() -> None: logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") # Go! - match args.orchestrator: - case "condor": - condor.act(args) - case "k8s": - k8s.act(args) - case other: - raise RuntimeError(f"Orchestrator not supported: {other}") + condor.act(args) -class OrchestratorArgs: +class SidecarArgs: @staticmethod - def condor(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" - sub_parser.add_argument( + def condor(parser: argparse.ArgumentParser) -> None: + """Add args to parser.""" + parser.add_argument( "--collector", default="", help="the full URL address of the HTCondor collector server. Ex: foo-bar.icecube.wisc.edu", ) - sub_parser.add_argument( + parser.add_argument( "--schedd", default="", help="the full DNS name of the HTCondor Schedd server. Ex: baz.icecube.wisc.edu", ) @staticmethod - def k8s(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" - sub_parser.add_argument( - "--host", - required=True, - help="the host server address to connect to for running workers", - ) - sub_parser.add_argument( - "--namespace", - required=True, - help="the k8s namespace to use for running workers", - ) - sub_parser.add_argument( - "--cpu-arch", - default="x64", - help="which CPU architecture to use for running workers", - ) - sub_parser.add_argument( - "--job-config-stub", - type=Path, - default=Path("resources/worker_k8s_job_stub.json"), - help="worker k8s job config file to dynamically complete, then run (json)", - ) - - -class ActionArgs: - @staticmethod - def starter(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" + def starter(parser: argparse.ArgumentParser) -> None: + """Add args to parser.""" def wait_for_file(waitee: Path, wait_time: int) -> Path: """Wait for `waitee` to exist, then return fullly-resolved path.""" @@ -133,13 +76,13 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path: return waitee.resolve() # helper args - sub_parser.add_argument( + parser.add_argument( "--dryrun", default=False, action="store_true", help="does everything except submitting the worker(s)", ) - sub_parser.add_argument( + parser.add_argument( "--spool", default=False, action="store_true", @@ -147,44 +90,44 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path: ) # worker args - sub_parser.add_argument( + parser.add_argument( "--worker-memory-bytes", required=True, type=int, help="amount of worker memory (bytes)", ) - sub_parser.add_argument( + parser.add_argument( "--worker-disk-bytes", required=True, type=int, help="amount of worker disk (bytes)", ) - sub_parser.add_argument( + parser.add_argument( "--n-cores", default=1, type=int, help="number of cores per worker", ) - sub_parser.add_argument( + parser.add_argument( "--n-workers", required=True, type=int, help="number of worker to start", ) - sub_parser.add_argument( + parser.add_argument( "--max-worker-runtime", required=True, type=int, - help="how long each worker is allowed to run -- condor only", # TODO - set for k8s? + help="how long each worker is allowed to run", ) - sub_parser.add_argument( + parser.add_argument( "--priority", required=True, - help="relative priority of this job/jobs -- condor only", # TODO - set for k8s? + help="relative priority of this job/jobs", ) # client args - sub_parser.add_argument( + parser.add_argument( "--client-args", required=False, nargs="*", @@ -195,7 +138,7 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path: ), help="n 'key:value' pairs containing the python CL arguments to pass to skymap_scanner.client", ) - sub_parser.add_argument( + parser.add_argument( "--client-startup-json", help="The 'startup.json' file to startup each client", type=lambda x: wait_for_file( @@ -203,17 +146,8 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path: ENV.CLIENT_STARTER_WAIT_FOR_STARTUP_JSON, ), ) - sub_parser.add_argument( + parser.add_argument( "--image", required=True, help="a path or url to the workers' image", ) - - @staticmethod - def stopper(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" - sub_parser.add_argument( - "--cluster-id", - required=True, - help="the cluster id of the workers to be stopped/removed", - ) From 5e4c5ca9ce33ae1e45577bac280a58933605001a Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 11:45:52 -0600 Subject: [PATCH 02/13] trim `act.py` --- ewms_sidecar/__main__.py | 6 +- ewms_sidecar/condor/act.py | 125 +++++++++++++++++-------------------- 2 files changed, 60 insertions(+), 71 deletions(-) diff --git a/ewms_sidecar/__main__.py b/ewms_sidecar/__main__.py index a84f189f..8e48c21d 100644 --- a/ewms_sidecar/__main__.py +++ b/ewms_sidecar/__main__.py @@ -1,7 +1,7 @@ -"""Entry-point to start up clientmanager service.""" +"""Entry-point to start up EWMS Sidecar.""" -from . import clientmanager, config +from . import config, ewms_sidecar if __name__ == "__main__": - clientmanager.main() + ewms_sidecar.main() config.LOGGER.info("Done.") diff --git a/ewms_sidecar/condor/act.py b/ewms_sidecar/condor/act.py index 5a8d3ab9..91f04fb1 100644 --- a/ewms_sidecar/condor/act.py +++ b/ewms_sidecar/condor/act.py @@ -7,7 +7,7 @@ from .. import utils from ..config import ENV, LOGGER -from . import condor_tools, starter, stopper, watcher +from . import condor_tools, starter, watcher def act(args: argparse.Namespace) -> None: @@ -26,70 +26,59 @@ def act(args: argparse.Namespace) -> None: def _act(args: argparse.Namespace, schedd_obj: htcondor.Schedd) -> None: - match args.action: - case "start": - LOGGER.info( - f"Starting {args.n_workers} Skymap Scanner client workers on {args.collector} / {args.schedd}" - ) - # make connections -- do now so we don't have any surprises downstream - skydriver_rc = utils.connect_to_skydriver() - # start - submit_dict = starter.prep( - spool=args.spool, - # starter CL args -- worker - worker_memory_bytes=args.worker_memory_bytes, - worker_disk_bytes=args.worker_disk_bytes, - n_cores=args.n_cores, - max_worker_runtime=args.max_worker_runtime, - priority=args.priority, - # starter CL args -- client - client_args=args.client_args, - client_startup_json_s3=utils.s3ify(args.client_startup_json), - image=args.image, - ) - # final checks - if args.dryrun: - LOGGER.critical("Script Aborted: dryrun enabled") - return - if utils.skydriver_aborted_scan(skydriver_rc): - LOGGER.critical("Script Aborted: SkyDriver aborted scan") - return - # start - submit_result_obj = starter.start( - schedd_obj=schedd_obj, - n_workers=args.n_workers, - submit_dict=submit_dict, - spool=args.spool, - ) - # report to SkyDriver - skydriver_cluster_obj = dict( - orchestrator="condor", - location={ - "collector": args.collector, - "schedd": args.schedd, - }, - uuid=args.uuid, - cluster_id=submit_result_obj.cluster(), - n_workers=submit_result_obj.num_procs(), - starter_info=submit_dict, - ) - utils.update_skydriver(skydriver_rc, **skydriver_cluster_obj) - LOGGER.info("Sent cluster info to SkyDriver") - watcher.watch( - args.collector, - args.schedd, - submit_result_obj.cluster(), - schedd_obj, - submit_result_obj.num_procs(), - skydriver_rc, - skydriver_cluster_obj, - ) - case "stop": - stopper.stop( - args.collector, - args.schedd, - args.cluster_id, - schedd_obj, - ) - case _: - raise RuntimeError(f"Unknown action: {args.action}") + LOGGER.info( + f"Starting {args.n_workers} Skymap Scanner client workers on {args.collector} / {args.schedd}" + ) + # make connections -- do now so we don't have any surprises downstream + skydriver_rc = utils.connect_to_skydriver() + # start + submit_dict = starter.prep( + spool=args.spool, + # starter CL args -- worker + worker_memory_bytes=args.worker_memory_bytes, + worker_disk_bytes=args.worker_disk_bytes, + n_cores=args.n_cores, + max_worker_runtime=args.max_worker_runtime, + priority=args.priority, + # starter CL args -- client + client_args=args.client_args, + client_startup_json_s3=utils.s3ify(args.client_startup_json), + image=args.image, + ) + # final checks + if args.dryrun: + LOGGER.critical("Script Aborted: dryrun enabled") + return + if utils.skydriver_aborted_scan(skydriver_rc): + LOGGER.critical("Script Aborted: SkyDriver aborted scan") + return + # start + submit_result_obj = starter.start( + schedd_obj=schedd_obj, + n_workers=args.n_workers, + submit_dict=submit_dict, + spool=args.spool, + ) + # report to SkyDriver + skydriver_cluster_obj = dict( + orchestrator="condor", + location={ + "collector": args.collector, + "schedd": args.schedd, + }, + uuid=args.uuid, + cluster_id=submit_result_obj.cluster(), + n_workers=submit_result_obj.num_procs(), + starter_info=submit_dict, + ) + utils.update_skydriver(skydriver_rc, **skydriver_cluster_obj) + LOGGER.info("Sent cluster info to SkyDriver") + watcher.watch( + args.collector, + args.schedd, + submit_result_obj.cluster(), + schedd_obj, + submit_result_obj.num_procs(), + skydriver_rc, + skydriver_cluster_obj, + ) From 1dd07ac764d0d6df29926c8cba836f4bcbf02be5 Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 12:05:23 -0600 Subject: [PATCH 03/13] wording --- ewms_sidecar/ewms_sidecar.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ewms_sidecar/ewms_sidecar.py b/ewms_sidecar/ewms_sidecar.py index c49b79f6..ac9854f0 100644 --- a/ewms_sidecar/ewms_sidecar.py +++ b/ewms_sidecar/ewms_sidecar.py @@ -14,7 +14,7 @@ def main() -> None: """Main.""" parser = argparse.ArgumentParser( - description="Handle EWMS requests beside a Skymap Scanner central server", + description="Handle EWMS requests adjacent to a Skymap Scanner central server", formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) From 082b7f84c13b454766ff25ef1410109b42f653cd Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 12:10:37 -0600 Subject: [PATCH 04/13] add `methos` pos var --- ewms_sidecar/ewms_sidecar.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/ewms_sidecar/ewms_sidecar.py b/ewms_sidecar/ewms_sidecar.py index ac9854f0..8ee88719 100644 --- a/ewms_sidecar/ewms_sidecar.py +++ b/ewms_sidecar/ewms_sidecar.py @@ -18,6 +18,14 @@ def main() -> None: formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) + # method + parser.add_argument( + required=True, + dest="method", + help="how to start up the jobs", # TODO - remove once EWMS is full-time + choices=["remote-condor"], # , "ewms"], + ) + parser.add_argument( "--uuid", required=True, @@ -39,7 +47,13 @@ def main() -> None: logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING") # Go! - condor.act(args) + match args.method: + case "remote-condor": + condor.act(args) + # case "ewms": + # ewms.act(args) + case other: + raise RuntimeError(f"method not supported: {other}") class SidecarArgs: From 7359573273c14ee79d60c162ceb97dee7e07e078 Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 15:07:39 -0600 Subject: [PATCH 05/13] adapt k8s args for testing with ewms_sidecar --- skydriver/k8s/scanner_instance.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/skydriver/k8s/scanner_instance.py b/skydriver/k8s/scanner_instance.py index cc6d1647..c0c4659a 100644 --- a/skydriver/k8s/scanner_instance.py +++ b/skydriver/k8s/scanner_instance.py @@ -127,6 +127,7 @@ def __init__( debug_mode=debug_mode, ), args=self.get_tms_starter_args( + starter_exc="clientmanager", # TODO - handle ewms_sidecar # TODO - remove once tested in prod common_space_volume_path=common_space_volume_path, docker_tag=docker_tag, worker_memory_bytes=worker_memory_bytes, @@ -183,6 +184,7 @@ def get_scanner_server_args( @staticmethod def get_tms_starter_args( + starter_exc: str, # TODO - remove once tested in prod common_space_volume_path: Path, docker_tag: str, worker_memory_bytes: int, @@ -234,6 +236,13 @@ def get_tms_starter_args( if DebugMode.CLIENT_LOGS in debug_mode: args += " --spool " + # ADAPT args for EWMS Sidecar + # TODO - remove once tested in prod + if starter_exc == "ewms_sidecar" and request_cluster.orchestrator == "condor": + args.replace("clientmanager", "ewms_sidecar remote-condor") + args.replace(" condor ", " ") + args.replace(" start ", " ") + return args.split() @staticmethod From f51878325e81e71ca803b087ba50c9c4143a7a0f Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 16:04:39 -0600 Subject: [PATCH 06/13] add `unstable_testing__starter_exc` key under `classifiers` for testing --- skydriver/rest_handlers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/skydriver/rest_handlers.py b/skydriver/rest_handlers.py index c40e7889..c2ecf9bf 100644 --- a/skydriver/rest_handlers.py +++ b/skydriver/rest_handlers.py @@ -455,7 +455,10 @@ async def post(self) -> None: nsides=nsides, is_real_event=real_or_simulated_event in REAL_CHOICES, predictive_scanning_threshold=predictive_scanning_threshold, - # clientmanager + # cluster starter + starter_exc=str( # TODO - remove once tested in prod + classifiers.get("unstable_testing__starter_exc", "clientmanager") + ), request_clusters=request_clusters, worker_memory_bytes=worker_memory_bytes, worker_disk_bytes=worker_disk_bytes, From 9073a2ee6d943ea9256d120f071fca77722565a2 Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 16:05:50 -0600 Subject: [PATCH 07/13] refactor `tms` & `clientmananger` to `cluster_starter` (no DB changes) --- skydriver/config.py | 12 +++--- skydriver/k8s/scanner_instance.py | 61 +++++++++++++-------------- skydriver/rest_handlers.py | 2 +- tests/integration/test_rest_routes.py | 30 +++++++------ 4 files changed, 52 insertions(+), 53 deletions(-) diff --git a/skydriver/config.py b/skydriver/config.py index 4f3f0812..2cca8812 100644 --- a/skydriver/config.py +++ b/skydriver/config.py @@ -24,11 +24,11 @@ DEFAULT_WORKER_DISK_BYTES: int = humanfriendly.parse_size("1GB") K8S_CONTAINER_MEMORY_DEFAULT_BYTES: int = humanfriendly.parse_size("64M") -K8S_CONTAINER_MEMORY_TMS_STOPPER_BYTES: int = humanfriendly.parse_size("256M") -K8S_CONTAINER_MEMORY_TMS_STARTER_BYTES: int = humanfriendly.parse_size("256M") +K8S_CONTAINER_MEMORY_CLUSTER_STOPPER_BYTES: int = humanfriendly.parse_size("256M") +K8S_CONTAINER_MEMORY_CLUSTER_STARTER_BYTES: int = humanfriendly.parse_size("256M") -TMS_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED = 1 * 60 * 60 -TMS_STOPPER_K8S_JOB_N_RETRIES = 6 +CLUSTER_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED = 1 * 60 * 60 +CLUSTER_STOPPER_K8S_JOB_N_RETRIES = 6 SCAN_MIN_PRIORITY_TO_START_NOW = 10 @@ -61,7 +61,7 @@ class EnvConfig: SCAN_BACKLOG_RUNNER_DELAY: int = 5 * 60 SCAN_BACKLOG_PENDING_ENTRY_TTL_REVIVE: int = 5 * 60 # entry is revived after N secs - CLIENTMANAGER_IMAGE_WITH_TAG: str = "" + THIS_IMAGE_WITH_TAG: str = "" # k8s K8S_NAMESPACE: str = "" @@ -96,7 +96,7 @@ def __post_init__(self) -> None: object.__setattr__(self, "LOG_LEVEL", self.LOG_LEVEL.upper()) # b/c frozen # check missing env var(s) - if not self.CLIENTMANAGER_IMAGE_WITH_TAG: + if not self.THIS_IMAGE_WITH_TAG: raise RuntimeError( "Missing required environment variable: 'CLIENTMANAGER_IMAGE_WITH_TAG'" ) diff --git a/skydriver/k8s/scanner_instance.py b/skydriver/k8s/scanner_instance.py index c0c4659a..672861a3 100644 --- a/skydriver/k8s/scanner_instance.py +++ b/skydriver/k8s/scanner_instance.py @@ -11,12 +11,12 @@ from .. import images from ..config import ( + CLUSTER_STOPPER_K8S_JOB_N_RETRIES, + CLUSTER_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED, ENV, - K8S_CONTAINER_MEMORY_TMS_STARTER_BYTES, - K8S_CONTAINER_MEMORY_TMS_STOPPER_BYTES, + K8S_CONTAINER_MEMORY_CLUSTER_STARTER_BYTES, + K8S_CONTAINER_MEMORY_CLUSTER_STOPPER_BYTES, LOGGER, - TMS_STOPPER_K8S_JOB_N_RETRIES, - TMS_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED, DebugMode, ) from ..database import schema @@ -32,7 +32,7 @@ def get_cluster_auth_v1envvars( return info["v1envvars"] # type: ignore[no-any-return] -def get_tms_s3_v1envvars() -> list[kubernetes.client.V1EnvVar]: +def get_cluster_starter_s3_v1envvars() -> list[kubernetes.client.V1EnvVar]: """Get the `V1EnvVar`s for TMS's S3 auth.""" return [ kubernetes.client.V1EnvVar( @@ -70,7 +70,8 @@ def __init__( nsides: dict[int, int], is_real_event: bool, predictive_scanning_threshold: float, - # tms + # cluster starter + starter_exc: str, # TODO - remove once tested in prod worker_memory_bytes: int, worker_disk_bytes: int, request_clusters: list[schema.Cluster], @@ -112,22 +113,22 @@ def __init__( ) self.env_dict["scanner_server"] = [e.to_dict() for e in scanner_server.env] - # CONTAINER(S): TMS Starter(s) + # CONTAINER(S): Cluster Starter(s) tms_starters = [] for i, cluster in enumerate(request_clusters): tms_starters.append( KubeAPITools.create_container( - f"tms-starter-{i}-{scan_id}", - ENV.CLIENTMANAGER_IMAGE_WITH_TAG, - env=self.make_tms_starter_v1envvars( + f"{starter_exc}-{i}-{scan_id}", # TODO - replace once tested in prod + ENV.THIS_IMAGE_WITH_TAG, + env=self.make_cluster_starter_v1envvars( rest_address=rest_address, scan_id=scan_id, cluster=cluster, max_pixel_reco_time=max_pixel_reco_time, debug_mode=debug_mode, ), - args=self.get_tms_starter_args( - starter_exc="clientmanager", # TODO - handle ewms_sidecar # TODO - remove once tested in prod + args=self.get_cluster_starter_args( + starter_exc=starter_exc, # TODO - remove once tested in prod common_space_volume_path=common_space_volume_path, docker_tag=docker_tag, worker_memory_bytes=worker_memory_bytes, @@ -139,10 +140,10 @@ def __init__( ), cpu=0.125, volumes={common_space_volume_path.name: common_space_volume_path}, - memory=K8S_CONTAINER_MEMORY_TMS_STARTER_BYTES, + memory=K8S_CONTAINER_MEMORY_CLUSTER_STARTER_BYTES, ) ) - self.tms_args_list = [" ".join(c.args) for c in tms_starters] + self.cluster_starter_args_list = [" ".join(c.args) for c in tms_starters] self.env_dict["tms_starters"] = [ [e.to_dict() for e in c.env] for c in tms_starters ] @@ -183,7 +184,7 @@ def get_scanner_server_args( return args @staticmethod - def get_tms_starter_args( + def get_cluster_starter_args( starter_exc: str, # TODO - remove once tested in prod common_space_volume_path: Path, docker_tag: str, @@ -194,11 +195,7 @@ def get_tms_starter_args( max_worker_runtime: int, priority: int, ) -> list[str]: - """Make the starter container args. - - This also includes any client args not added by the - clientmanager. - """ + """Make the starter container args.""" args = f"python -m clientmanager --uuid {str(uuid.uuid4().hex)}" match request_cluster.orchestrator: @@ -341,7 +338,7 @@ def make_skyscan_server_v1envvars( return env @staticmethod - def make_tms_starter_v1envvars( + def make_cluster_starter_v1envvars( rest_address: str, scan_id: str, cluster: schema.Cluster, @@ -352,13 +349,13 @@ def make_tms_starter_v1envvars( Also, get the secrets' keys & their values. """ - LOGGER.debug(f"making tms starter env vars for {scan_id=}") + LOGGER.debug(f"making cluster starter env vars for {scan_id=}") env = [] # 1. start w/ secrets # NOTE: the values come from an existing secret in the current namespace env.extend(get_cluster_auth_v1envvars(cluster)) - env.extend(get_tms_s3_v1envvars()) + env.extend(get_cluster_starter_s3_v1envvars()) # 2. add required env vars required = { @@ -464,12 +461,12 @@ def __init__( containers.append( KubeAPITools.create_container( - f"tms-stopper-{i}-{scan_id}", - ENV.CLIENTMANAGER_IMAGE_WITH_TAG, + f"cluster-stopper-{i}-{scan_id}", + ENV.THIS_IMAGE_WITH_TAG, cpu=0.125, env=get_cluster_auth_v1envvars(cluster), args=args.split(), - memory=K8S_CONTAINER_MEMORY_TMS_STOPPER_BYTES, + memory=K8S_CONTAINER_MEMORY_CLUSTER_STOPPER_BYTES, ) ) @@ -477,11 +474,11 @@ def __init__( self.worker_stopper_job_obj = None else: self.worker_stopper_job_obj = KubeAPITools.kube_create_job_object( - f"tms-stopper-{scan_id}", + f"cluster-stopper-{scan_id}", containers, ENV.K8S_NAMESPACE, - TMS_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED, - n_retries=TMS_STOPPER_K8S_JOB_N_RETRIES, + CLUSTER_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED, + n_retries=CLUSTER_STOPPER_K8S_JOB_N_RETRIES, ) def go(self) -> Any: @@ -490,9 +487,9 @@ def go(self) -> Any: # NOTE - we don't want to stop the first k8s job because its containers will stop themselves. # plus, 'K8S_TTL_SECONDS_AFTER_FINISHED' will allow logs & pod status to be retrieved for some time # - # stop first k8s job (server & tms starters) -- may not be instantaneous + # stop first k8s job (server & cluster starters) -- may not be instantaneous # LOGGER.info( - # f"requesting removal of Skymap Scanner Job (server & tms starters) -- {self.scan_id=}..." + # f"requesting removal of Skymap Scanner Job (server & cluster starters) -- {self.scan_id=}..." # ) # resp = self.k8s_batch_api.delete_namespaced_job( # name=SkymapScannerK8sWrapper.get_job_name(self.scan_id), @@ -507,7 +504,7 @@ def go(self) -> Any: # stop workers if self.worker_stopper_job_obj: - LOGGER.info(f"starting k8s TMS-STOPPER job for {self.scan_id=}") + LOGGER.info(f"starting k8s CLUSTER-STOPPER job for {self.scan_id=}") KubeAPITools.start_job(self.k8s_batch_api, self.worker_stopper_job_obj) else: LOGGER.info(f"no workers to stop for {self.scan_id=}") diff --git a/skydriver/rest_handlers.py b/skydriver/rest_handlers.py index c2ecf9bf..2951db5c 100644 --- a/skydriver/rest_handlers.py +++ b/skydriver/rest_handlers.py @@ -477,7 +477,7 @@ async def post(self) -> None: event_i3live_json_dict, scan_id, scanner_wrapper.scanner_server_args, - scanner_wrapper.tms_args_list, + scanner_wrapper.cluster_starter_args_list, from_dict(database.schema.EnvVars, scanner_wrapper.env_dict), classifiers, priority, diff --git a/tests/integration/test_rest_routes.py b/tests/integration/test_rest_routes.py index fe9cac50..da23249d 100644 --- a/tests/integration/test_rest_routes.py +++ b/tests/integration/test_rest_routes.py @@ -63,7 +63,9 @@ async def _launch_scan( - rc: RestClient, post_scan_body: dict, tms_args: list[str] + rc: RestClient, + post_scan_body: dict, + cluster_starter_args: list[str], ) -> dict: # launch scan launch_time = time.time() @@ -115,7 +117,7 @@ async def _launch_scan( # check args (avoid whitespace headaches...) assert resp["scanner_server_args"].split() == scanner_server_args.split() - for got_args, exp_args in zip(resp["ewms_task"]["tms_args"], tms_args): + for got_args, exp_args in zip(resp["ewms_task"]["tms_args"], cluster_starter_args): print(got_args, exp_args) for got, exp in zip(got_args.split(), exp_args.split()): print(got, exp) @@ -124,7 +126,7 @@ async def _launch_scan( else: assert got == exp assert len(got_args.split()) == len(exp_args.split()) - assert len(resp["ewms_task"]["tms_args"]) == len(tms_args) + assert len(resp["ewms_task"]["tms_args"]) == len(cluster_starter_args) # check env vars print(resp["ewms_task"]["env_vars"]) @@ -408,14 +410,14 @@ async def _server_reply_with_event_metadata(rc: RestClient, scan_id: str) -> Str return event_metadata -async def _clientmanager_reply( +async def _cluster_starter_reply( rc: RestClient, scan_id: str, cluster_name__n_workers: tuple[str, int], previous_clusters: list[StrDict], known_clusters: dict, ) -> StrDict: - # reply as the clientmanager with a new cluster + # reply as the cluster_starter with a new cluster cluster = dict( orchestrator=known_clusters[cluster_name__n_workers[0]]["orchestrator"], location=known_clusters[cluster_name__n_workers[0]]["location"], @@ -636,12 +638,12 @@ async def _delete_scan( assert [m["scan_id"] for m in resp["manifests"]] == [scan_id] -def get_tms_args( +def get_cluster_starter_args( clusters: list | dict, docker_tag_expected: str, known_clusters: dict, ) -> list[str]: - tms_args = [] + cluster_starter_args = [] for cluster in clusters if isinstance(clusters, list) else list(clusters.items()): orchestrator = known_clusters[cluster[0]]["orchestrator"] location = known_clusters[cluster[0]]["location"] @@ -650,7 +652,7 @@ def get_tms_args( if orchestrator == "condor" else f"icecube/skymap_scanner:{docker_tag_expected}" ) - tms_args += [ + cluster_starter_args += [ f"python -m clientmanager " f" --uuid {CLUSTER_ID_PLACEHOLDER} " f" {orchestrator} " @@ -666,7 +668,7 @@ def get_tms_args( f" --spool " ] - return tms_args + return cluster_starter_args ######################################################################################## @@ -717,7 +719,7 @@ async def test_00( "docker_tag": docker_tag_input, "cluster": clusters, }, - get_tms_args(clusters, docker_tag_expected, known_clusters), + get_cluster_starter_args(clusters, docker_tag_expected, known_clusters), ) scan_id = manifest["scan_id"] # follow-up query @@ -730,7 +732,7 @@ async def test_00( # INITIAL UPDATES # event_metadata = await _server_reply_with_event_metadata(rc, scan_id) - manifest = await _clientmanager_reply( + manifest = await _cluster_starter_reply( rc, scan_id, clusters[0] if isinstance(clusters, list) else list(clusters.items())[0], @@ -758,7 +760,7 @@ async def test_00( for cluster_name__n_workers in ( clusters[1:] if isinstance(clusters, list) else list(clusters.items())[1:] ): - manifest = await _clientmanager_reply( + manifest = await _cluster_starter_reply( rc, scan_id, cluster_name__n_workers, @@ -880,7 +882,7 @@ async def test_01__bad_data( manifest = await _launch_scan( rc, POST_SCAN_BODY_FOR_TEST_01, - get_tms_args( + get_cluster_starter_args( POST_SCAN_BODY_FOR_TEST_01["cluster"], # type: ignore[arg-type] os.environ["LATEST_TAG"], known_clusters, @@ -897,7 +899,7 @@ async def test_01__bad_data( # INITIAL UPDATES # event_metadata = await _server_reply_with_event_metadata(rc, scan_id) - manifest = await _clientmanager_reply( + manifest = await _cluster_starter_reply( rc, scan_id, ("foobar", random.randint(1, 10000)), From a874abc718f15ef297613cc73bf11016d98143ec Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 16:08:47 -0600 Subject: [PATCH 08/13] typo --- skydriver/k8s/scanner_instance.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/skydriver/k8s/scanner_instance.py b/skydriver/k8s/scanner_instance.py index 672861a3..0a64bf36 100644 --- a/skydriver/k8s/scanner_instance.py +++ b/skydriver/k8s/scanner_instance.py @@ -504,7 +504,7 @@ def go(self) -> Any: # stop workers if self.worker_stopper_job_obj: - LOGGER.info(f"starting k8s CLUSTER-STOPPER job for {self.scan_id=}") + LOGGER.info(f"starting k8s CLUSTER-STOPPER job for {self.scan_id=}") KubeAPITools.start_job(self.k8s_batch_api, self.worker_stopper_job_obj) else: LOGGER.info(f"no workers to stop for {self.scan_id=}") From b13aa6b97e1038e72e4a83f9d576cc492c815802 Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 16:11:34 -0600 Subject: [PATCH 09/13] fix ci --- .github/workflows/wipac-cicd.yml | 2 +- skydriver/config.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/wipac-cicd.yml b/.github/workflows/wipac-cicd.yml index 185397b9..12cf1bb9 100644 --- a/.github/workflows/wipac-cicd.yml +++ b/.github/workflows/wipac-cicd.yml @@ -4,7 +4,7 @@ on: [push] env: CI_TEST: 'yes' - CLIENTMANAGER_IMAGE_WITH_TAG: 'ghcr.io/wipacrepo/skydriver:latest' + THIS_IMAGE_WITH_TAG: 'ghcr.io/wipacrepo/skydriver:latest' EWMS_PILOT_TASK_TIMEOUT: 999 SCAN_BACKLOG_RUNNER_SHORT_DELAY: 1 SCAN_BACKLOG_RUNNER_DELAY: 1 diff --git a/skydriver/config.py b/skydriver/config.py index 2cca8812..8a25e6a3 100644 --- a/skydriver/config.py +++ b/skydriver/config.py @@ -98,7 +98,7 @@ def __post_init__(self) -> None: # check missing env var(s) if not self.THIS_IMAGE_WITH_TAG: raise RuntimeError( - "Missing required environment variable: 'CLIENTMANAGER_IMAGE_WITH_TAG'" + "Missing required environment variable: 'THIS_IMAGE_WITH_TAG'" ) if self.SCAN_BACKLOG_RUNNER_SHORT_DELAY > self.SCAN_BACKLOG_RUNNER_DELAY: From 30f3e360da62211e37a652f669bacb6820df7f29 Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Tue, 19 Dec 2023 16:19:56 -0600 Subject: [PATCH 10/13] revert test changes --- tests/integration/test_rest_routes.py | 30 +++++++++++++-------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/tests/integration/test_rest_routes.py b/tests/integration/test_rest_routes.py index da23249d..fe9cac50 100644 --- a/tests/integration/test_rest_routes.py +++ b/tests/integration/test_rest_routes.py @@ -63,9 +63,7 @@ async def _launch_scan( - rc: RestClient, - post_scan_body: dict, - cluster_starter_args: list[str], + rc: RestClient, post_scan_body: dict, tms_args: list[str] ) -> dict: # launch scan launch_time = time.time() @@ -117,7 +115,7 @@ async def _launch_scan( # check args (avoid whitespace headaches...) assert resp["scanner_server_args"].split() == scanner_server_args.split() - for got_args, exp_args in zip(resp["ewms_task"]["tms_args"], cluster_starter_args): + for got_args, exp_args in zip(resp["ewms_task"]["tms_args"], tms_args): print(got_args, exp_args) for got, exp in zip(got_args.split(), exp_args.split()): print(got, exp) @@ -126,7 +124,7 @@ async def _launch_scan( else: assert got == exp assert len(got_args.split()) == len(exp_args.split()) - assert len(resp["ewms_task"]["tms_args"]) == len(cluster_starter_args) + assert len(resp["ewms_task"]["tms_args"]) == len(tms_args) # check env vars print(resp["ewms_task"]["env_vars"]) @@ -410,14 +408,14 @@ async def _server_reply_with_event_metadata(rc: RestClient, scan_id: str) -> Str return event_metadata -async def _cluster_starter_reply( +async def _clientmanager_reply( rc: RestClient, scan_id: str, cluster_name__n_workers: tuple[str, int], previous_clusters: list[StrDict], known_clusters: dict, ) -> StrDict: - # reply as the cluster_starter with a new cluster + # reply as the clientmanager with a new cluster cluster = dict( orchestrator=known_clusters[cluster_name__n_workers[0]]["orchestrator"], location=known_clusters[cluster_name__n_workers[0]]["location"], @@ -638,12 +636,12 @@ async def _delete_scan( assert [m["scan_id"] for m in resp["manifests"]] == [scan_id] -def get_cluster_starter_args( +def get_tms_args( clusters: list | dict, docker_tag_expected: str, known_clusters: dict, ) -> list[str]: - cluster_starter_args = [] + tms_args = [] for cluster in clusters if isinstance(clusters, list) else list(clusters.items()): orchestrator = known_clusters[cluster[0]]["orchestrator"] location = known_clusters[cluster[0]]["location"] @@ -652,7 +650,7 @@ def get_cluster_starter_args( if orchestrator == "condor" else f"icecube/skymap_scanner:{docker_tag_expected}" ) - cluster_starter_args += [ + tms_args += [ f"python -m clientmanager " f" --uuid {CLUSTER_ID_PLACEHOLDER} " f" {orchestrator} " @@ -668,7 +666,7 @@ def get_cluster_starter_args( f" --spool " ] - return cluster_starter_args + return tms_args ######################################################################################## @@ -719,7 +717,7 @@ async def test_00( "docker_tag": docker_tag_input, "cluster": clusters, }, - get_cluster_starter_args(clusters, docker_tag_expected, known_clusters), + get_tms_args(clusters, docker_tag_expected, known_clusters), ) scan_id = manifest["scan_id"] # follow-up query @@ -732,7 +730,7 @@ async def test_00( # INITIAL UPDATES # event_metadata = await _server_reply_with_event_metadata(rc, scan_id) - manifest = await _cluster_starter_reply( + manifest = await _clientmanager_reply( rc, scan_id, clusters[0] if isinstance(clusters, list) else list(clusters.items())[0], @@ -760,7 +758,7 @@ async def test_00( for cluster_name__n_workers in ( clusters[1:] if isinstance(clusters, list) else list(clusters.items())[1:] ): - manifest = await _cluster_starter_reply( + manifest = await _clientmanager_reply( rc, scan_id, cluster_name__n_workers, @@ -882,7 +880,7 @@ async def test_01__bad_data( manifest = await _launch_scan( rc, POST_SCAN_BODY_FOR_TEST_01, - get_cluster_starter_args( + get_tms_args( POST_SCAN_BODY_FOR_TEST_01["cluster"], # type: ignore[arg-type] os.environ["LATEST_TAG"], known_clusters, @@ -899,7 +897,7 @@ async def test_01__bad_data( # INITIAL UPDATES # event_metadata = await _server_reply_with_event_metadata(rc, scan_id) - manifest = await _cluster_starter_reply( + manifest = await _clientmanager_reply( rc, scan_id, ("foobar", random.randint(1, 10000)), From 712857fb5768f55e481294fbfa99a20db5648b75 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 20 Dec 2023 16:13:37 +0000 Subject: [PATCH 11/13] update dependencies*.log files(s) --- dependencies-from-Dockerfile.log | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/dependencies-from-Dockerfile.log b/dependencies-from-Dockerfile.log index 2dc2b65b..09bc3d1b 100644 --- a/dependencies-from-Dockerfile.log +++ b/dependencies-from-Dockerfile.log @@ -7,8 +7,8 @@ # pip freeze ######################################################################## backoff==2.2.1 -boto3==1.34.3 -botocore==1.34.3 +boto3==1.34.4 +botocore==1.34.4 cachetools==5.3.2 certifi==2023.11.17 cffi==1.16.0 @@ -75,15 +75,15 @@ pip==23.2.1 pipdeptree==2.13.1 setuptools==65.5.1 skydriver-clientmanager-ewms-sidecar -├── boto3 [required: Any, installed: 1.34.3] -│ ├── botocore [required: >=1.34.3,<1.35.0, installed: 1.34.3] +├── boto3 [required: Any, installed: 1.34.4] +│ ├── botocore [required: >=1.34.4,<1.35.0, installed: 1.34.4] │ │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] │ │ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2] │ │ │ └── six [required: >=1.5, installed: 1.16.0] │ │ └── urllib3 [required: >=1.25.4,<2.1, installed: 1.26.18] │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] │ └── s3transfer [required: >=0.9.0,<0.10.0, installed: 0.9.0] -│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.34.3] +│ └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.34.4] │ ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1] │ ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2] │ │ └── six [required: >=1.5, installed: 1.16.0] From d14d7afd94767003702260e8602e701e7b52be1a Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Wed, 20 Dec 2023 10:25:05 -0600 Subject: [PATCH 12/13] fix ci env var --- .github/workflows/wipac-cicd.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/wipac-cicd.yml b/.github/workflows/wipac-cicd.yml index 12cf1bb9..1d7ed544 100644 --- a/.github/workflows/wipac-cicd.yml +++ b/.github/workflows/wipac-cicd.yml @@ -135,6 +135,7 @@ jobs: docker run --network="host" --rm -i --name test \ --env LATEST_TAG=$LATEST_TAG \ + --env THIS_IMAGE_WITH_TAG=$THIS_IMAGE_WITH_TAG \ $(env | grep '^SKYSCAN_' | awk '$0="--env "$0') \ $(env | grep '^EWMS_' | awk '$0="--env "$0') \ $(env | grep '^CLIENTMANAGER_' | awk '$0="--env "$0') \ From 24b9ece5df2defad5273abb902398d29d5953f73 Mon Sep 17 00:00:00 2001 From: Ric Evans Date: Wed, 20 Dec 2023 10:31:44 -0600 Subject: [PATCH 13/13] call it `direct-remote-condor` --- ewms_sidecar/ewms_sidecar.py | 4 ++-- skydriver/k8s/scanner_instance.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ewms_sidecar/ewms_sidecar.py b/ewms_sidecar/ewms_sidecar.py index 8ee88719..d1cea012 100644 --- a/ewms_sidecar/ewms_sidecar.py +++ b/ewms_sidecar/ewms_sidecar.py @@ -23,7 +23,7 @@ def main() -> None: required=True, dest="method", help="how to start up the jobs", # TODO - remove once EWMS is full-time - choices=["remote-condor"], # , "ewms"], + choices=["direct-remote-condor"], # , "ewms"], ) parser.add_argument( @@ -48,7 +48,7 @@ def main() -> None: # Go! match args.method: - case "remote-condor": + case "direct-remote-condor": condor.act(args) # case "ewms": # ewms.act(args) diff --git a/skydriver/k8s/scanner_instance.py b/skydriver/k8s/scanner_instance.py index 0a64bf36..89706c6a 100644 --- a/skydriver/k8s/scanner_instance.py +++ b/skydriver/k8s/scanner_instance.py @@ -236,7 +236,7 @@ def get_cluster_starter_args( # ADAPT args for EWMS Sidecar # TODO - remove once tested in prod if starter_exc == "ewms_sidecar" and request_cluster.orchestrator == "condor": - args.replace("clientmanager", "ewms_sidecar remote-condor") + args.replace("clientmanager", "ewms_sidecar direct-remote-condor") args.replace(" condor ", " ") args.replace(" start ", " ")