diff --git a/.github/workflows/wipac-cicd.yml b/.github/workflows/wipac-cicd.yml
index 185397b9..1d7ed544 100644
--- a/.github/workflows/wipac-cicd.yml
+++ b/.github/workflows/wipac-cicd.yml
@@ -4,7 +4,7 @@ on: [push]
 
 env:
   CI_TEST: 'yes'
-  CLIENTMANAGER_IMAGE_WITH_TAG: 'ghcr.io/wipacrepo/skydriver:latest'
+  THIS_IMAGE_WITH_TAG: 'ghcr.io/wipacrepo/skydriver:latest'
   EWMS_PILOT_TASK_TIMEOUT: 999
   SCAN_BACKLOG_RUNNER_SHORT_DELAY: 1
   SCAN_BACKLOG_RUNNER_DELAY: 1
@@ -135,6 +135,7 @@ jobs:
           docker run --network="host" --rm -i --name test \
             --env LATEST_TAG=$LATEST_TAG \
+            --env THIS_IMAGE_WITH_TAG=$THIS_IMAGE_WITH_TAG \
             $(env | grep '^SKYSCAN_' | awk '$0="--env "$0') \
             $(env | grep '^EWMS_' | awk '$0="--env "$0') \
             $(env | grep '^CLIENTMANAGER_' | awk '$0="--env "$0') \
diff --git a/dependencies-from-Dockerfile.log b/dependencies-from-Dockerfile.log
index 2dc2b65b..09bc3d1b 100644
--- a/dependencies-from-Dockerfile.log
+++ b/dependencies-from-Dockerfile.log
@@ -7,8 +7,8 @@
 # pip freeze
 ########################################################################
 backoff==2.2.1
-boto3==1.34.3
-botocore==1.34.3
+boto3==1.34.4
+botocore==1.34.4
 cachetools==5.3.2
 certifi==2023.11.17
 cffi==1.16.0
@@ -75,15 +75,15 @@ pip==23.2.1
 pipdeptree==2.13.1
 setuptools==65.5.1
 skydriver-clientmanager-ewms-sidecar
-├── boto3 [required: Any, installed: 1.34.3]
-│   ├── botocore [required: >=1.34.3,<1.35.0, installed: 1.34.3]
+├── boto3 [required: Any, installed: 1.34.4]
+│   ├── botocore [required: >=1.34.4,<1.35.0, installed: 1.34.4]
 │   │   ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
 │   │   ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2]
 │   │   │   └── six [required: >=1.5, installed: 1.16.0]
 │   │   └── urllib3 [required: >=1.25.4,<2.1, installed: 1.26.18]
 │   ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
 │   └── s3transfer [required: >=0.9.0,<0.10.0, installed: 0.9.0]
-│       └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.34.3]
+│       └── botocore [required: >=1.33.2,<2.0a.0, installed: 1.34.4]
 │           ├── jmespath [required: >=0.7.1,<2.0.0, installed: 1.0.1]
 │           ├── python-dateutil [required: >=2.1,<3.0.0, installed: 2.8.2]
 │           │   └── six [required: >=1.5, installed: 1.16.0]
diff --git a/ewms_sidecar/__main__.py b/ewms_sidecar/__main__.py
index a84f189f..8e48c21d 100644
--- a/ewms_sidecar/__main__.py
+++ b/ewms_sidecar/__main__.py
@@ -1,7 +1,7 @@
-"""Entry-point to start up clientmanager service."""
+"""Entry-point to start up EWMS Sidecar."""
 
-from . import clientmanager, config
+from . import config, ewms_sidecar
 
 if __name__ == "__main__":
-    clientmanager.main()
+    ewms_sidecar.main()
     config.LOGGER.info("Done.")
diff --git a/ewms_sidecar/condor/act.py b/ewms_sidecar/condor/act.py
index 5a8d3ab9..91f04fb1 100644
--- a/ewms_sidecar/condor/act.py
+++ b/ewms_sidecar/condor/act.py
@@ -7,7 +7,7 @@
 
 from .. import utils
 from ..config import ENV, LOGGER
-from . import condor_tools, starter, stopper, watcher
+from . import condor_tools, starter, watcher
 
 
 def act(args: argparse.Namespace) -> None:
@@ -26,70 +26,59 @@ def act(args: argparse.Namespace) -> None:
 
 
 def _act(args: argparse.Namespace, schedd_obj: htcondor.Schedd) -> None:
-    match args.action:
-        case "start":
-            LOGGER.info(
-                f"Starting {args.n_workers} Skymap Scanner client workers on {args.collector} / {args.schedd}"
-            )
-            # make connections -- do now so we don't have any surprises downstream
-            skydriver_rc = utils.connect_to_skydriver()
-            # start
-            submit_dict = starter.prep(
-                spool=args.spool,
-                # starter CL args -- worker
-                worker_memory_bytes=args.worker_memory_bytes,
-                worker_disk_bytes=args.worker_disk_bytes,
-                n_cores=args.n_cores,
-                max_worker_runtime=args.max_worker_runtime,
-                priority=args.priority,
-                # starter CL args -- client
-                client_args=args.client_args,
-                client_startup_json_s3=utils.s3ify(args.client_startup_json),
-                image=args.image,
-            )
-            # final checks
-            if args.dryrun:
-                LOGGER.critical("Script Aborted: dryrun enabled")
-                return
-            if utils.skydriver_aborted_scan(skydriver_rc):
-                LOGGER.critical("Script Aborted: SkyDriver aborted scan")
-                return
-            # start
-            submit_result_obj = starter.start(
-                schedd_obj=schedd_obj,
-                n_workers=args.n_workers,
-                submit_dict=submit_dict,
-                spool=args.spool,
-            )
-            # report to SkyDriver
-            skydriver_cluster_obj = dict(
-                orchestrator="condor",
-                location={
-                    "collector": args.collector,
-                    "schedd": args.schedd,
-                },
-                uuid=args.uuid,
-                cluster_id=submit_result_obj.cluster(),
-                n_workers=submit_result_obj.num_procs(),
-                starter_info=submit_dict,
-            )
-            utils.update_skydriver(skydriver_rc, **skydriver_cluster_obj)
-            LOGGER.info("Sent cluster info to SkyDriver")
-            watcher.watch(
-                args.collector,
-                args.schedd,
-                submit_result_obj.cluster(),
-                schedd_obj,
-                submit_result_obj.num_procs(),
-                skydriver_rc,
-                skydriver_cluster_obj,
-            )
-        case "stop":
-            stopper.stop(
-                args.collector,
-                args.schedd,
-                args.cluster_id,
-                schedd_obj,
-            )
-        case _:
-            raise RuntimeError(f"Unknown action: {args.action}")
+    LOGGER.info(
+        f"Starting {args.n_workers} Skymap Scanner client workers on {args.collector} / {args.schedd}"
+    )
+    # make connections -- do now so we don't have any surprises downstream
+    skydriver_rc = utils.connect_to_skydriver()
+    # start
+    submit_dict = starter.prep(
+        spool=args.spool,
+        # starter CL args -- worker
+        worker_memory_bytes=args.worker_memory_bytes,
+        worker_disk_bytes=args.worker_disk_bytes,
+        n_cores=args.n_cores,
+        max_worker_runtime=args.max_worker_runtime,
+        priority=args.priority,
+        # starter CL args -- client
+        client_args=args.client_args,
+        client_startup_json_s3=utils.s3ify(args.client_startup_json),
+        image=args.image,
+    )
+    # final checks
+    if args.dryrun:
+        LOGGER.critical("Script Aborted: dryrun enabled")
+        return
+    if utils.skydriver_aborted_scan(skydriver_rc):
+        LOGGER.critical("Script Aborted: SkyDriver aborted scan")
+        return
+    # start
+    submit_result_obj = starter.start(
+        schedd_obj=schedd_obj,
+        n_workers=args.n_workers,
+        submit_dict=submit_dict,
+        spool=args.spool,
+    )
+    # report to SkyDriver
+    skydriver_cluster_obj = dict(
+        orchestrator="condor",
+        location={
+            "collector": args.collector,
+            "schedd": args.schedd,
+        },
+        uuid=args.uuid,
+        cluster_id=submit_result_obj.cluster(),
+        n_workers=submit_result_obj.num_procs(),
+        starter_info=submit_dict,
+    )
+    utils.update_skydriver(skydriver_rc, **skydriver_cluster_obj)
+    LOGGER.info("Sent cluster info to SkyDriver")
+    watcher.watch(
+        args.collector,
+        args.schedd,
+        submit_result_obj.cluster(),
+        schedd_obj,
+        submit_result_obj.num_procs(),
+        skydriver_rc,
+        skydriver_cluster_obj,
+    )
diff --git a/ewms_sidecar/ewms_sidecar.py b/ewms_sidecar/ewms_sidecar.py
index 37997b51..d1cea012 100644
--- a/ewms_sidecar/ewms_sidecar.py
+++ b/ewms_sidecar/ewms_sidecar.py
@@ -1,4 +1,4 @@
-"""The central module."""
+"""The EWMS Sidecar."""
 
 
 import argparse
@@ -7,49 +7,33 @@
 
 from wipac_dev_tools import argparse_tools, logging_tools
 
-from . import condor, k8s
+from . import condor
 from .config import ENV, LOGGER
 
 
 def main() -> None:
     """Main."""
     parser = argparse.ArgumentParser(
-        description="Manage Skymap Scanner client workers",
+        description="Handle EWMS requests adjacent to a Skymap Scanner central server",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
     )
+    # method
     parser.add_argument(
-        "--uuid",
         required=True,
-        help="the uuid for the cluster",
+        dest="method",
+        help="how to start up the jobs",  # TODO - remove once EWMS is full-time
+        choices=["direct-remote-condor"],  # , "ewms"],
     )
-    # orchestrator
-    orch_subparsers = parser.add_subparsers(
+    parser.add_argument(
+        "--uuid",
         required=True,
-        dest="orchestrator",
-        help="the resource orchestration tool to use for worker scheduling",
-    )
-    OrchestratorArgs.condor(
-        orch_condor_parser := orch_subparsers.add_parser(
-            "condor", help="orchestrate with HTCondor"
-        )
-    )
-    OrchestratorArgs.k8s(
-        orch_k8s_parser := orch_subparsers.add_parser(
-            "k8s", help="orchestrate with Kubernetes"
-        )
+        help="the uuid for the cluster",
     )
-    # action -- add sub-parser to each sub-parser (can't add multiple sub-parsers)
-    for p in [orch_condor_parser, orch_k8s_parser]:
-        act_subparsers = p.add_subparsers(
-            required=True,
-            dest="action",
-            help="the action to perform on the worker cluster",
-        )
-        ActionArgs.starter(act_subparsers.add_parser("start", help="start workers"))
-        ActionArgs.stopper(act_subparsers.add_parser("stop", help="stop workers"))
+    SidecarArgs.condor(parser)
+    SidecarArgs.starter(parser)
 
     # parse args & set up logging
     args = parser.parse_args()
@@ -63,60 +47,33 @@ def main() -> None:
     logging_tools.log_argparse_args(args, logger=LOGGER, level="WARNING")
 
     # Go!
-    match args.orchestrator:
-        case "condor":
+    match args.method:
+        case "direct-remote-condor":
             condor.act(args)
-        case "k8s":
-            k8s.act(args)
+        # case "ewms":
+        #     ewms.act(args)
         case other:
-            raise RuntimeError(f"Orchestrator not supported: {other}")
+            raise RuntimeError(f"method not supported: {other}")
 
 
-class OrchestratorArgs:
+class SidecarArgs:
     @staticmethod
-    def condor(sub_parser: argparse.ArgumentParser) -> None:
-        """Add args to subparser."""
-        sub_parser.add_argument(
+    def condor(parser: argparse.ArgumentParser) -> None:
+        """Add args to parser."""
+        parser.add_argument(
             "--collector",
             default="",
             help="the full URL address of the HTCondor collector server. Ex: foo-bar.icecube.wisc.edu",
         )
-        sub_parser.add_argument(
+        parser.add_argument(
             "--schedd",
             default="",
             help="the full DNS name of the HTCondor Schedd server. Ex: baz.icecube.wisc.edu",
         )
 
     @staticmethod
-    def k8s(sub_parser: argparse.ArgumentParser) -> None:
-        """Add args to subparser."""
-        sub_parser.add_argument(
-            "--host",
-            required=True,
-            help="the host server address to connect to for running workers",
-        )
-        sub_parser.add_argument(
-            "--namespace",
-            required=True,
-            help="the k8s namespace to use for running workers",
-        )
-        sub_parser.add_argument(
-            "--cpu-arch",
-            default="x64",
-            help="which CPU architecture to use for running workers",
-        )
-        sub_parser.add_argument(
-            "--job-config-stub",
-            type=Path,
-            default=Path("resources/worker_k8s_job_stub.json"),
-            help="worker k8s job config file to dynamically complete, then run (json)",
-        )
-
-
-class ActionArgs:
-    @staticmethod
-    def starter(sub_parser: argparse.ArgumentParser) -> None:
-        """Add args to subparser."""
+    def starter(parser: argparse.ArgumentParser) -> None:
+        """Add args to parser."""
 
         def wait_for_file(waitee: Path, wait_time: int) -> Path:
             """Wait for `waitee` to exist, then return fullly-resolved path."""
@@ -133,13 +90,13 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path:
             return waitee.resolve()
 
         # helper args
-        sub_parser.add_argument(
+        parser.add_argument(
             "--dryrun",
             default=False,
             action="store_true",
             help="does everything except submitting the worker(s)",
         )
-        sub_parser.add_argument(
+        parser.add_argument(
             "--spool",
             default=False,
             action="store_true",
@@ -147,44 +104,44 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path:
         )
 
         # worker args
-        sub_parser.add_argument(
+        parser.add_argument(
             "--worker-memory-bytes",
             required=True,
             type=int,
             help="amount of worker memory (bytes)",
         )
-        sub_parser.add_argument(
+        parser.add_argument(
             "--worker-disk-bytes",
             required=True,
             type=int,
             help="amount of worker disk (bytes)",
         )
-        sub_parser.add_argument(
+        parser.add_argument(
             "--n-cores",
             default=1,
             type=int,
             help="number of cores per worker",
         )
-        sub_parser.add_argument(
+        parser.add_argument(
             "--n-workers",
             required=True,
             type=int,
             help="number of worker to start",
         )
-        sub_parser.add_argument(
+        parser.add_argument(
             "--max-worker-runtime",
             required=True,
             type=int,
-            help="how long each worker is allowed to run -- condor only",  # TODO - set for k8s?
+            help="how long each worker is allowed to run",
         )
-        sub_parser.add_argument(
+        parser.add_argument(
             "--priority",
             required=True,
-            help="relative priority of this job/jobs -- condor only",  # TODO - set for k8s?
+ help="relative priority of this job/jobs", ) # client args - sub_parser.add_argument( + parser.add_argument( "--client-args", required=False, nargs="*", @@ -195,7 +152,7 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path: ), help="n 'key:value' pairs containing the python CL arguments to pass to skymap_scanner.client", ) - sub_parser.add_argument( + parser.add_argument( "--client-startup-json", help="The 'startup.json' file to startup each client", type=lambda x: wait_for_file( @@ -203,17 +160,8 @@ def wait_for_file(waitee: Path, wait_time: int) -> Path: ENV.CLIENT_STARTER_WAIT_FOR_STARTUP_JSON, ), ) - sub_parser.add_argument( + parser.add_argument( "--image", required=True, help="a path or url to the workers' image", ) - - @staticmethod - def stopper(sub_parser: argparse.ArgumentParser) -> None: - """Add args to subparser.""" - sub_parser.add_argument( - "--cluster-id", - required=True, - help="the cluster id of the workers to be stopped/removed", - ) diff --git a/skydriver/config.py b/skydriver/config.py index 4f3f0812..8a25e6a3 100644 --- a/skydriver/config.py +++ b/skydriver/config.py @@ -24,11 +24,11 @@ DEFAULT_WORKER_DISK_BYTES: int = humanfriendly.parse_size("1GB") K8S_CONTAINER_MEMORY_DEFAULT_BYTES: int = humanfriendly.parse_size("64M") -K8S_CONTAINER_MEMORY_TMS_STOPPER_BYTES: int = humanfriendly.parse_size("256M") -K8S_CONTAINER_MEMORY_TMS_STARTER_BYTES: int = humanfriendly.parse_size("256M") +K8S_CONTAINER_MEMORY_CLUSTER_STOPPER_BYTES: int = humanfriendly.parse_size("256M") +K8S_CONTAINER_MEMORY_CLUSTER_STARTER_BYTES: int = humanfriendly.parse_size("256M") -TMS_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED = 1 * 60 * 60 -TMS_STOPPER_K8S_JOB_N_RETRIES = 6 +CLUSTER_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED = 1 * 60 * 60 +CLUSTER_STOPPER_K8S_JOB_N_RETRIES = 6 SCAN_MIN_PRIORITY_TO_START_NOW = 10 @@ -61,7 +61,7 @@ class EnvConfig: SCAN_BACKLOG_RUNNER_DELAY: int = 5 * 60 SCAN_BACKLOG_PENDING_ENTRY_TTL_REVIVE: int = 5 * 60 # entry is revived after N secs - CLIENTMANAGER_IMAGE_WITH_TAG: str = "" + THIS_IMAGE_WITH_TAG: str = "" # k8s K8S_NAMESPACE: str = "" @@ -96,9 +96,9 @@ def __post_init__(self) -> None: object.__setattr__(self, "LOG_LEVEL", self.LOG_LEVEL.upper()) # b/c frozen # check missing env var(s) - if not self.CLIENTMANAGER_IMAGE_WITH_TAG: + if not self.THIS_IMAGE_WITH_TAG: raise RuntimeError( - "Missing required environment variable: 'CLIENTMANAGER_IMAGE_WITH_TAG'" + "Missing required environment variable: 'THIS_IMAGE_WITH_TAG'" ) if self.SCAN_BACKLOG_RUNNER_SHORT_DELAY > self.SCAN_BACKLOG_RUNNER_DELAY: diff --git a/skydriver/k8s/scanner_instance.py b/skydriver/k8s/scanner_instance.py index cc6d1647..89706c6a 100644 --- a/skydriver/k8s/scanner_instance.py +++ b/skydriver/k8s/scanner_instance.py @@ -11,12 +11,12 @@ from .. 
 from ..config import (
+    CLUSTER_STOPPER_K8S_JOB_N_RETRIES,
+    CLUSTER_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED,
     ENV,
-    K8S_CONTAINER_MEMORY_TMS_STARTER_BYTES,
-    K8S_CONTAINER_MEMORY_TMS_STOPPER_BYTES,
+    K8S_CONTAINER_MEMORY_CLUSTER_STARTER_BYTES,
+    K8S_CONTAINER_MEMORY_CLUSTER_STOPPER_BYTES,
     LOGGER,
-    TMS_STOPPER_K8S_JOB_N_RETRIES,
-    TMS_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED,
     DebugMode,
 )
 from ..database import schema
@@ -32,7 +32,7 @@ def get_cluster_auth_v1envvars(
     return info["v1envvars"]  # type: ignore[no-any-return]
 
 
-def get_tms_s3_v1envvars() -> list[kubernetes.client.V1EnvVar]:
+def get_cluster_starter_s3_v1envvars() -> list[kubernetes.client.V1EnvVar]:
     """Get the `V1EnvVar`s for TMS's S3 auth."""
     return [
         kubernetes.client.V1EnvVar(
@@ -70,7 +70,8 @@ def __init__(
         nsides: dict[int, int],
         is_real_event: bool,
         predictive_scanning_threshold: float,
-        # tms
+        # cluster starter
+        starter_exc: str,  # TODO - remove once tested in prod
         worker_memory_bytes: int,
         worker_disk_bytes: int,
         request_clusters: list[schema.Cluster],
@@ -112,21 +113,22 @@ def __init__(
         )
         self.env_dict["scanner_server"] = [e.to_dict() for e in scanner_server.env]
 
-        # CONTAINER(S): TMS Starter(s)
+        # CONTAINER(S): Cluster Starter(s)
         tms_starters = []
         for i, cluster in enumerate(request_clusters):
             tms_starters.append(
                 KubeAPITools.create_container(
-                    f"tms-starter-{i}-{scan_id}",
-                    ENV.CLIENTMANAGER_IMAGE_WITH_TAG,
-                    env=self.make_tms_starter_v1envvars(
+                    f"{starter_exc}-{i}-{scan_id}",  # TODO - replace once tested in prod
+                    ENV.THIS_IMAGE_WITH_TAG,
+                    env=self.make_cluster_starter_v1envvars(
                         rest_address=rest_address,
                         scan_id=scan_id,
                         cluster=cluster,
                         max_pixel_reco_time=max_pixel_reco_time,
                         debug_mode=debug_mode,
                     ),
-                    args=self.get_tms_starter_args(
+                    args=self.get_cluster_starter_args(
+                        starter_exc=starter_exc,  # TODO - remove once tested in prod
                         common_space_volume_path=common_space_volume_path,
                         docker_tag=docker_tag,
                         worker_memory_bytes=worker_memory_bytes,
@@ -138,10 +140,10 @@ def __init__(
                     ),
                     cpu=0.125,
                     volumes={common_space_volume_path.name: common_space_volume_path},
-                    memory=K8S_CONTAINER_MEMORY_TMS_STARTER_BYTES,
+                    memory=K8S_CONTAINER_MEMORY_CLUSTER_STARTER_BYTES,
                 )
             )
-        self.tms_args_list = [" ".join(c.args) for c in tms_starters]
+        self.cluster_starter_args_list = [" ".join(c.args) for c in tms_starters]
         self.env_dict["tms_starters"] = [
             [e.to_dict() for e in c.env] for c in tms_starters
         ]
@@ -182,7 +184,8 @@ def get_scanner_server_args(
         return args
 
     @staticmethod
-    def get_tms_starter_args(
+    def get_cluster_starter_args(
+        starter_exc: str,  # TODO - remove once tested in prod
         common_space_volume_path: Path,
         docker_tag: str,
         worker_memory_bytes: int,
@@ -192,11 +195,7 @@ def get_tms_starter_args(
         max_worker_runtime: int,
         priority: int,
     ) -> list[str]:
-        """Make the starter container args.
-
-        This also includes any client args not added by the
-        clientmanager.
- """ + """Make the starter container args.""" args = f"python -m clientmanager --uuid {str(uuid.uuid4().hex)}" match request_cluster.orchestrator: @@ -234,6 +233,13 @@ def get_tms_starter_args( if DebugMode.CLIENT_LOGS in debug_mode: args += " --spool " + # ADAPT args for EWMS Sidecar + # TODO - remove once tested in prod + if starter_exc == "ewms_sidecar" and request_cluster.orchestrator == "condor": + args.replace("clientmanager", "ewms_sidecar direct-remote-condor") + args.replace(" condor ", " ") + args.replace(" start ", " ") + return args.split() @staticmethod @@ -332,7 +338,7 @@ def make_skyscan_server_v1envvars( return env @staticmethod - def make_tms_starter_v1envvars( + def make_cluster_starter_v1envvars( rest_address: str, scan_id: str, cluster: schema.Cluster, @@ -343,13 +349,13 @@ def make_tms_starter_v1envvars( Also, get the secrets' keys & their values. """ - LOGGER.debug(f"making tms starter env vars for {scan_id=}") + LOGGER.debug(f"making cluster starter env vars for {scan_id=}") env = [] # 1. start w/ secrets # NOTE: the values come from an existing secret in the current namespace env.extend(get_cluster_auth_v1envvars(cluster)) - env.extend(get_tms_s3_v1envvars()) + env.extend(get_cluster_starter_s3_v1envvars()) # 2. add required env vars required = { @@ -455,12 +461,12 @@ def __init__( containers.append( KubeAPITools.create_container( - f"tms-stopper-{i}-{scan_id}", - ENV.CLIENTMANAGER_IMAGE_WITH_TAG, + f"cluster-stopper-{i}-{scan_id}", + ENV.THIS_IMAGE_WITH_TAG, cpu=0.125, env=get_cluster_auth_v1envvars(cluster), args=args.split(), - memory=K8S_CONTAINER_MEMORY_TMS_STOPPER_BYTES, + memory=K8S_CONTAINER_MEMORY_CLUSTER_STOPPER_BYTES, ) ) @@ -468,11 +474,11 @@ def __init__( self.worker_stopper_job_obj = None else: self.worker_stopper_job_obj = KubeAPITools.kube_create_job_object( - f"tms-stopper-{scan_id}", + f"cluster-stopper-{scan_id}", containers, ENV.K8S_NAMESPACE, - TMS_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED, - n_retries=TMS_STOPPER_K8S_JOB_N_RETRIES, + CLUSTER_STOPPER_K8S_TTL_SECONDS_AFTER_FINISHED, + n_retries=CLUSTER_STOPPER_K8S_JOB_N_RETRIES, ) def go(self) -> Any: @@ -481,9 +487,9 @@ def go(self) -> Any: # NOTE - we don't want to stop the first k8s job because its containers will stop themselves. # plus, 'K8S_TTL_SECONDS_AFTER_FINISHED' will allow logs & pod status to be retrieved for some time # - # stop first k8s job (server & tms starters) -- may not be instantaneous + # stop first k8s job (server & cluster starters) -- may not be instantaneous # LOGGER.info( - # f"requesting removal of Skymap Scanner Job (server & tms starters) -- {self.scan_id=}..." + # f"requesting removal of Skymap Scanner Job (server & cluster starters) -- {self.scan_id=}..." 
         # )
         # resp = self.k8s_batch_api.delete_namespaced_job(
         #     name=SkymapScannerK8sWrapper.get_job_name(self.scan_id),
@@ -498,7 +504,7 @@ def go(self) -> Any:
 
         # stop workers
         if self.worker_stopper_job_obj:
-            LOGGER.info(f"starting k8s TMS-STOPPER job for {self.scan_id=}")
+            LOGGER.info(f"starting k8s CLUSTER-STOPPER job for {self.scan_id=}")
             KubeAPITools.start_job(self.k8s_batch_api, self.worker_stopper_job_obj)
         else:
             LOGGER.info(f"no workers to stop for {self.scan_id=}")
diff --git a/skydriver/rest_handlers.py b/skydriver/rest_handlers.py
index c40e7889..2951db5c 100644
--- a/skydriver/rest_handlers.py
+++ b/skydriver/rest_handlers.py
@@ -455,7 +455,10 @@ async def post(self) -> None:
             nsides=nsides,
             is_real_event=real_or_simulated_event in REAL_CHOICES,
             predictive_scanning_threshold=predictive_scanning_threshold,
-            # clientmanager
+            # cluster starter
+            starter_exc=str(  # TODO - remove once tested in prod
+                classifiers.get("unstable_testing__starter_exc", "clientmanager")
+            ),
             request_clusters=request_clusters,
             worker_memory_bytes=worker_memory_bytes,
             worker_disk_bytes=worker_disk_bytes,
@@ -474,7 +477,7 @@ async def post(self) -> None:
             event_i3live_json_dict,
             scan_id,
             scanner_wrapper.scanner_server_args,
-            scanner_wrapper.tms_args_list,
+            scanner_wrapper.cluster_starter_args_list,
             from_dict(database.schema.EnvVars, scanner_wrapper.env_dict),
             classifiers,
             priority,