From 78ef6199542e52dbfeabb62b2c3464501b52d9cf Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 23 Jul 2024 12:29:35 -0700 Subject: [PATCH 01/35] Include celerymanager and update celeryadapter to check the status of celery workers. --- merlin/study/celeryadapter.py | 9 ++ merlin/study/celerymanager.py | 144 +++++++++++++++++++++++++++ merlin/study/celerymanageradapter.py | 60 +++++++++++ 3 files changed, 213 insertions(+) create mode 100644 merlin/study/celerymanager.py create mode 100644 merlin/study/celerymanageradapter.py diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 5b5bdd41..11f1244a 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -47,6 +47,7 @@ from merlin.common.dumper import dump_handler from merlin.config import Config from merlin.study.batch import batch_check_parallel, batch_worker_launch +from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running @@ -762,6 +763,13 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): """ try: _ = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 + # Get the worker name from worker_cmd and add to be monitored by celery manager + worker_cmd_list = worker_cmd.split() + worker_name = worker_cmd_list[worker_cmd_list.index("-n")+1].replace("%h", kwargs["env"]["HOSTNAME"]) + worker_name = "celery@" + worker_name + add_monitor_workers(workers=(worker_name, )) + LOG.info(f"Added {worker_name} to be monitored") + worker_list.append(worker_cmd) except Exception as e: # pylint: disable=C0103 LOG.error(f"Cannot start celery workers, {e}") @@ -866,6 +874,7 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): if workers_to_stop: LOG.info(f"Sending stop to these workers: {workers_to_stop}") app.control.broadcast("shutdown", destination=workers_to_stop) + remove_monitor_workers(workers=workers_to_stop) else: LOG.warning("No workers found to stop") diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py new file mode 100644 index 00000000..a5310ba9 --- /dev/null +++ b/merlin/study/celerymanager.py @@ -0,0 +1,144 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.12.1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### + +from merlin.config.configfile import CONFIG +from merlin.config.results_backend import get_backend_password +import os +import redis +import time + + +class WorkerStatus: + running = "Running" + stalled = "Stalled" + stopped = "Stopped" + rebooting = "Rebooting" + +WORKER_INFO = { + "status" : WorkerStatus.running, + "monitored": 1, + "num_unresponsive": 0, +} + +class CeleryManager(): + + def __init__(self, query_frequency=60, query_timeout=0.5, worker_timeout=180): + self.redis_connection = self.get_worker_status_redis_connection() + self.query_frequency = query_frequency + self.query_timeout = query_timeout + self.worker_timeout = worker_timeout + + @staticmethod + def get_worker_status_redis_connection(): + return CeleryManager.get_redis_connection(1) + + @staticmethod + def get_worker_args_redis_connection(): + return CeleryManager.get_redis_connection(2) + + @staticmethod + def get_redis_connection(db_num): + password_file = CONFIG.results_backend.password + try: + password = get_backend_password(password_file) + except IOError: + password = CONFIG.results_backend.password + return redis.Redis(host=CONFIG.results_backend.server, + port=CONFIG.results_backend.port, + db=db_num, + username=CONFIG.results_backend.username, + password=password) + + def get_celery_worker_status(worker): + pass + + def restart_celery_worker(worker): + pass + + def check_pid(pid): + """ Check For the existence of a unix pid. """ + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True + + def run(self): + manager_info = { + "status": "Running", + "process id": os.getpid(), + } + self.redis_connection.hmset(name="manager", mapping=manager_info) + + + + + #while True: + # Get the list of running workers + workers = [i.decode("ascii") for i in self.redis_connection.keys()] + workers.remove("manager") + workers = [worker for worker in workers if int(self.redis_connection.hget(worker, "monitored"))] + print("Current Monitored Workers", workers) + + # Check/ Ping each worker to see if they are still running + if workers: + from merlin.celery import app + + celery_app = app.control + ping_result = celery_app.ping(workers, timeout=self.query_timeout) + worker_results = {worker: status for d in ping_result for worker, status in d.items()} + print("Worker result from ping", worker_results) + + # If running set the status on redis that it is running + for worker in list(worker_results.keys()): + self.redis_connection.hset(worker, "status", WorkerStatus.running) + + # If not running attempt to restart it + for worker in workers: + if worker not in worker_results: + # If time where the worker is unresponsive is less than the worker time out then just increment + num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive"))+1 + if num_unresponsive*self.query_frequency < self.worker_timeout: + # Attempt to restart worker + + # If successful set the status to running + + # If failed set the status to stopped + #TODO Try to restart the worker + continue + else: + self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive) + + #time.sleep(self.query_frequency) + +if __name__ == "__main__": + cm = CeleryManager() + cm.run() \ No newline at end of file diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py new file mode 100644 index 00000000..3f410ee5 --- /dev/null +++ b/merlin/study/celerymanageradapter.py @@ -0,0 +1,60 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.12.1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### +from merlin.study.celerymanager import CeleryManager, WORKER_INFO + +def add_monitor_workers(workers: list): + if workers is None or len(workers) <= 0: + return + + redis_connection = CeleryManager.get_worker_status_redis_connection() + for worker in workers: + if redis_connection.exists(worker): + redis_connection.hset(worker, "monitored", 1) + redis_connection.hmset(name=worker, mapping=WORKER_INFO) + redis_connection.quit() + +def remove_monitor_workers(workers: list): + if workers is None or len(workers) <= 0: + return + redis_connection = CeleryManager.get_worker_status_redis_connection() + for worker in workers: + if redis_connection.exists(worker): + redis_connection.hset(worker, "monitored", 0) + redis_connection.quit() + +def is_manager_runnning() -> bool: + pass + +def start_manager() -> bool: + pass + +def stop_manager() -> bool: + pass + From 8561a18b4dad9884d928645b35ee2d85bd98ceb0 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 23 Jul 2024 12:37:17 -0700 Subject: [PATCH 02/35] Fixed issue where the update status was outside of if statement for checking workers --- merlin/study/celerymanager.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index a5310ba9..abaa15d7 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -117,9 +117,9 @@ def run(self): worker_results = {worker: status for d in ping_result for worker, status in d.items()} print("Worker result from ping", worker_results) - # If running set the status on redis that it is running - for worker in list(worker_results.keys()): - self.redis_connection.hset(worker, "status", WorkerStatus.running) + # If running set the status on redis that it is running + for worker in list(worker_results.keys()): + self.redis_connection.hset(worker, "status", WorkerStatus.running) # If not running attempt to restart it for worker in workers: From 1120dd7cb64bc3e1704d9a0d7763cf47b9bbd5b5 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Thu, 1 Aug 2024 10:31:41 -0700 Subject: [PATCH 03/35] Include worker status stop and add template for merlin restart --- merlin/study/celeryadapter.py | 17 +++++++-- merlin/study/celerymanager.py | 57 ++++++++++++++++++++++------ merlin/study/celerymanageradapter.py | 17 +++++++-- 3 files changed, 71 insertions(+), 20 deletions(-) diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 11f1244a..4dad1efd 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -47,6 +47,7 @@ from merlin.common.dumper import dump_handler from merlin.config import Config from merlin.study.batch import batch_check_parallel, batch_worker_launch +from merlin.study.celerymanager import CeleryManager from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running @@ -762,15 +763,23 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): :side effect: Launches a celery worker via a subprocess """ try: - _ = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 + process = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 # Get the worker name from worker_cmd and add to be monitored by celery manager worker_cmd_list = worker_cmd.split() worker_name = worker_cmd_list[worker_cmd_list.index("-n")+1].replace("%h", kwargs["env"]["HOSTNAME"]) worker_name = "celery@" + worker_name - add_monitor_workers(workers=(worker_name, )) - LOG.info(f"Added {worker_name} to be monitored") - worker_list.append(worker_cmd) + + # Adding the worker args to redis db + redis_connection = CeleryManager.get_worker_args_redis_connection() + args = kwargs['env'] + args["worker_cmd"] = worker_cmd + redis_connection.hmset(name=worker_name, mapping=args) + redis_connection.quit() + + # Adding the worker to redis db to be monitored + add_monitor_workers(workers=((worker_name, process.pid), )) + LOG.info(f"Added {worker_name} to be monitored") except Exception as e: # pylint: disable=C0103 LOG.error(f"Cannot start celery workers, {e}") raise diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index abaa15d7..7061b2df 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -32,6 +32,7 @@ from merlin.config.results_backend import get_backend_password import os import redis +import subprocess import time @@ -43,6 +44,7 @@ class WorkerStatus: WORKER_INFO = { "status" : WorkerStatus.running, + "pid": -1, "monitored": 1, "num_unresponsive": 0, } @@ -74,13 +76,48 @@ def get_redis_connection(db_num): port=CONFIG.results_backend.port, db=db_num, username=CONFIG.results_backend.username, - password=password) + password=password, + decode_responses=True) - def get_celery_worker_status(worker): - pass + def get_celery_workers_status(self, workers): + from merlin.celery import app - def restart_celery_worker(worker): - pass + celery_app = app.control + ping_result = celery_app.ping(workers, timeout=self.query_timeout) + worker_results = {worker: status for d in ping_result for worker, status in d.items()} + print("Worker result from ping", worker_results) + return worker_results + + def stop_celery_worker(self, worker): + """ + Stop a celery worker by first broadcasting shutdown. If unsuccessful kill the worker with pid + :param CeleryManager self: CeleryManager attempting the stop. + :param str worker: Worker that is being stopped. + """ + from merlin.celery import app + + app.control.broadcast("shutdown", destination=(worker, )) + + + + def restart_celery_worker(self, worker): + # Stop the worker that is currently running + + + # Start the worker again with the args saved in redis db + worker_args_connect = self.get_worker_args_redis_connection() + worker_status_connect = self.get_worker_status_redis_connection() + # Get the args and remove the worker_cmd from the hash set + args = worker_args_connect.hgetall(worker) + worker_cmd = args["worker_cmd"] + del args["worker_cmd"] + # Run the subprocess for the worker and save the PID + process = subprocess.Popen(worker_cmd, *args) + worker_status_connect.hset(worker, "pid", process.pid) + + worker_args_connect.quit() + worker_status_connect.quit() + def check_pid(pid): """ Check For the existence of a unix pid. """ @@ -103,19 +140,15 @@ def run(self): #while True: # Get the list of running workers - workers = [i.decode("ascii") for i in self.redis_connection.keys()] + workers = self.redis_connection.keys() workers.remove("manager") workers = [worker for worker in workers if int(self.redis_connection.hget(worker, "monitored"))] print("Current Monitored Workers", workers) + self.restart_celery_worker(workers[0]) # Check/ Ping each worker to see if they are still running if workers: - from merlin.celery import app - - celery_app = app.control - ping_result = celery_app.ping(workers, timeout=self.query_timeout) - worker_results = {worker: status for d in ping_result for worker, status in d.items()} - print("Worker result from ping", worker_results) + worker_results = self.get_celery_workers_status(workers) # If running set the status on redis that it is running for worker in list(worker_results.keys()): diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index 3f410ee5..f990e740 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -27,17 +27,24 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. ############################################################################### -from merlin.study.celerymanager import CeleryManager, WORKER_INFO +from merlin.study.celerymanager import CeleryManager, WORKER_INFO, WorkerStatus def add_monitor_workers(workers: list): + """ + Adds a worker to be monitored by the celery manager. + :param list workers: A list of tuples which includes (worker_name, pid) + """ if workers is None or len(workers) <= 0: return redis_connection = CeleryManager.get_worker_status_redis_connection() for worker in workers: - if redis_connection.exists(worker): - redis_connection.hset(worker, "monitored", 1) - redis_connection.hmset(name=worker, mapping=WORKER_INFO) + if redis_connection.exists(worker[0]): + redis_connection.hset(worker[0], "monitored", 1) + redis_connection.hset(worker[0], "pid", worker[1]) + worker_info = WORKER_INFO + worker_info["pid"] = worker[1] + redis_connection.hmset(name=worker[0], mapping=worker_info) redis_connection.quit() def remove_monitor_workers(workers: list): @@ -47,6 +54,8 @@ def remove_monitor_workers(workers: list): for worker in workers: if redis_connection.exists(worker): redis_connection.hset(worker, "monitored", 0) + redis_connection.hset(worker, "status", WorkerStatus.stopped) + redis_connection.quit() def is_manager_runnning() -> bool: From f41938faf3e2356f0ee0f261c79effe8e58352de Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Thu, 1 Aug 2024 17:17:43 -0700 Subject: [PATCH 04/35] Added comment to the CeleryManager init --- merlin/study/celerymanager.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 7061b2df..1800e76e 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -51,7 +51,13 @@ class WorkerStatus: class CeleryManager(): - def __init__(self, query_frequency=60, query_timeout=0.5, worker_timeout=180): + def __init__(self, query_frequency:int=60, query_timeout:float=0.5, worker_timeout:int=180): + """ + Initializer for Celery Manager + @param int query_frequency: The frequency at which workers will be queried with ping commands + @param float query_timeout: The timeout for the query pings that are sent to workers + @param int worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + """ self.redis_connection = self.get_worker_status_redis_connection() self.query_frequency = query_frequency self.query_timeout = query_timeout @@ -98,8 +104,6 @@ def stop_celery_worker(self, worker): app.control.broadcast("shutdown", destination=(worker, )) - - def restart_celery_worker(self, worker): # Stop the worker that is currently running From 690115e7fb6ea2a51f39730fbc7899e8bfbb74ea Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Thu, 1 Aug 2024 17:20:26 -0700 Subject: [PATCH 05/35] Increment db_num instead of being fixed --- merlin/study/celerymanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 1800e76e..53accf3a 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -80,7 +80,7 @@ def get_redis_connection(db_num): password = CONFIG.results_backend.password return redis.Redis(host=CONFIG.results_backend.server, port=CONFIG.results_backend.port, - db=db_num, + db=CONFIG.results_backend.db_num+db_num, #Increment db_num to avoid conflicts username=CONFIG.results_backend.username, password=password, decode_responses=True) From de4ffd02ccbd0a428dbae97d67ad12ffd45945eb Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Thu, 1 Aug 2024 18:09:08 -0700 Subject: [PATCH 06/35] Added other subprocess parameters and created a linking system for redis to store env dict --- merlin/study/celeryadapter.py | 16 +++++++++++++++- merlin/study/celerymanager.py | 12 ++++++++++-- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 4dad1efd..860b30b4 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -772,8 +772,22 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): # Adding the worker args to redis db redis_connection = CeleryManager.get_worker_args_redis_connection() - args = kwargs['env'] + args = kwargs + # Save worker command with the arguements args["worker_cmd"] = worker_cmd + # Store the nested dictionaries into a separate key with a link. + # Note: This only support single nested dicts(for simplicity) and + # further nesting can be accomplished by making this recursive. + for key in kwargs: + if type(kwargs[key]) is dict: + key_name = worker_name+"_"+key + redis_connection.hmset(name=key_name, mapping=kwargs[key]) + args[key] = "link:"+key_name + if type(kwargs[key]) is bool: + if kwargs[key]: + args[key] = "True" + else: + args[key] = "False" redis_connection.hmset(name=worker_name, mapping=args) redis_connection.quit() diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 53accf3a..3ed3437a 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -115,8 +115,17 @@ def restart_celery_worker(self, worker): args = worker_args_connect.hgetall(worker) worker_cmd = args["worker_cmd"] del args["worker_cmd"] + kwargs = args + for key in args: + if args[key].startswith("link:"): + kwargs[key] = worker_args_connect.hgetall(args[key].split(":", 1)[1]) + elif args[key] == "True": + kwargs[key] = True + elif args[key] == "False": + kwargs[key] = False + # Run the subprocess for the worker and save the PID - process = subprocess.Popen(worker_cmd, *args) + process = subprocess.Popen(worker_cmd, **kwargs) worker_status_connect.hset(worker, "pid", process.pid) worker_args_connect.quit() @@ -148,7 +157,6 @@ def run(self): workers.remove("manager") workers = [worker for worker in workers if int(self.redis_connection.hget(worker, "monitored"))] print("Current Monitored Workers", workers) - self.restart_celery_worker(workers[0]) # Check/ Ping each worker to see if they are still running if workers: From 67e9268afd3098afe4cf9d4eb0a487f52400e4ea Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 6 Aug 2024 16:12:46 -0700 Subject: [PATCH 07/35] Implemented stopping of celery workers and restarting workers properly --- merlin/study/celerymanager.py | 112 +++++++++++++++++++--------------- 1 file changed, 63 insertions(+), 49 deletions(-) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 3ed3437a..1b30f202 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -31,6 +31,7 @@ from merlin.config.configfile import CONFIG from merlin.config.results_backend import get_backend_password import os +import psutil import redis import subprocess import time @@ -91,23 +92,42 @@ def get_celery_workers_status(self, workers): celery_app = app.control ping_result = celery_app.ping(workers, timeout=self.query_timeout) worker_results = {worker: status for d in ping_result for worker, status in d.items()} - print("Worker result from ping", worker_results) return worker_results def stop_celery_worker(self, worker): """ - Stop a celery worker by first broadcasting shutdown. If unsuccessful kill the worker with pid + Stop a celery worker by kill the worker with pid :param CeleryManager self: CeleryManager attempting the stop. :param str worker: Worker that is being stopped. + + :return bool: The result of whether a worker was stopped. """ - from merlin.celery import app - app.control.broadcast("shutdown", destination=(worker, )) + # Get the PID associated with the pid + worker_status_connect = self.get_worker_status_redis_connection() + worker_pid = int(worker_status_connect.hget(worker, "pid")) + # Check to see if the pid exists + if psutil.pid_exists(worker_pid): + # Check to see if the pid is associated with celery + worker_process = psutil.Process(worker_pid) + if "celery" in worker_process.name(): + # Kill the pid if both conditions are right + worker_process.kill() + return True + return False def restart_celery_worker(self, worker): - # Stop the worker that is currently running + """ + Restart a celery worker with the same arguements and parameters during its creation + :param CeleryManager self: CeleryManager attempting the stop. + :param str worker: Worker that is being restarted. + :return bool: The result of whether a worker was restarted. + """ + # Stop the worker that is currently running + if not self.stop_celery_worker(worker): + return False # Start the worker again with the args saved in redis db worker_args_connect = self.get_worker_args_redis_connection() worker_status_connect = self.get_worker_status_redis_connection() @@ -130,59 +150,53 @@ def restart_celery_worker(self, worker): worker_args_connect.quit() worker_status_connect.quit() - - def check_pid(pid): - """ Check For the existence of a unix pid. """ - try: - os.kill(pid, 0) - except OSError: - return False - else: - return True + return True + def run(self): + """ + Main manager loop + """ + manager_info = { "status": "Running", "process id": os.getpid(), } self.redis_connection.hmset(name="manager", mapping=manager_info) - - - - #while True: - # Get the list of running workers - workers = self.redis_connection.keys() - workers.remove("manager") - workers = [worker for worker in workers if int(self.redis_connection.hget(worker, "monitored"))] - print("Current Monitored Workers", workers) - - # Check/ Ping each worker to see if they are still running - if workers: - worker_results = self.get_celery_workers_status(workers) - - # If running set the status on redis that it is running - for worker in list(worker_results.keys()): - self.redis_connection.hset(worker, "status", WorkerStatus.running) - - # If not running attempt to restart it - for worker in workers: - if worker not in worker_results: - # If time where the worker is unresponsive is less than the worker time out then just increment - num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive"))+1 - if num_unresponsive*self.query_frequency < self.worker_timeout: - # Attempt to restart worker - - # If successful set the status to running - - # If failed set the status to stopped - #TODO Try to restart the worker - continue - else: - self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive) - - #time.sleep(self.query_frequency) + while True: #TODO Make it so that it will stop after a list of workers is stopped + # Get the list of running workers + workers = self.redis_connection.keys() + workers.remove("manager") + workers = [worker for worker in workers if int(self.redis_connection.hget(worker, "monitored"))] + + # Check/ Ping each worker to see if they are still running + if workers: + worker_results = self.get_celery_workers_status(workers) + + # If running set the status on redis that it is running + for worker in list(worker_results.keys()): + self.redis_connection.hset(worker, "status", WorkerStatus.running) + + # If not running attempt to restart it + for worker in workers: + if worker not in worker_results: + # If time where the worker is unresponsive is less than the worker time out then just increment + num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive"))+1 + if num_unresponsive*self.query_frequency < self.worker_timeout: + # Attempt to restart worker + if self.restart_celery_worker(worker): + # If successful set the status to running and reset num_unresponsive + self.redis_connection.hset(worker, "status", WorkerStatus.running) + self.redis_connection.hset(worker, "num_unresponsive", 0) + # If failed set the status to stopped + self.redis_connection.hset(worker, "status", WorkerStatus.stopped) + else: + self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive) + # Sleep for the query_frequency for the next iteration + print("Finished checking") + time.sleep(self.query_frequency) if __name__ == "__main__": cm = CeleryManager() From 406e4c2b280d86aa3fa1eca5630ed9342bf21489 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 6 Aug 2024 16:21:15 -0700 Subject: [PATCH 08/35] Update stopped to stalled for when the worker doesn't respond to restart --- merlin/study/celerymanager.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 1b30f202..3f4c3745 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -106,8 +106,10 @@ def stop_celery_worker(self, worker): # Get the PID associated with the pid worker_status_connect = self.get_worker_status_redis_connection() worker_pid = int(worker_status_connect.hget(worker, "pid")) - # Check to see if the pid exists - if psutil.pid_exists(worker_pid): + worker_status = worker_status_connect.hget(worker, "status") + worker_status_connect.quit() + # Check to see if the pid exists and worker is set as running + if worker_status == WorkerStatus.running and psutil.pid_exists(worker_pid): # Check to see if the pid is associated with celery worker_process = psutil.Process(worker_pid) if "celery" in worker_process.name(): @@ -190,8 +192,8 @@ def run(self): # If successful set the status to running and reset num_unresponsive self.redis_connection.hset(worker, "status", WorkerStatus.running) self.redis_connection.hset(worker, "num_unresponsive", 0) - # If failed set the status to stopped - self.redis_connection.hset(worker, "status", WorkerStatus.stopped) + # If failed set the status to stalled + self.redis_connection.hset(worker, "status", WorkerStatus.stalled) else: self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive) # Sleep for the query_frequency for the next iteration From 78e45254ea79b42f8323b601c9a7ea438961f486 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 6 Aug 2024 18:25:19 -0700 Subject: [PATCH 09/35] Working merlin manager run but start and stop not working properly --- merlin/main.py | 101 +++++++++++++++++++++++++++ merlin/study/celerymanager.py | 4 +- merlin/study/celerymanageradapter.py | 48 +++++++++++-- 3 files changed, 146 insertions(+), 7 deletions(-) diff --git a/merlin/main.py b/merlin/main.py index 4bb00598..aba27251 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -57,6 +57,7 @@ from merlin.server.server_commands import config_server, init_server, restart_server, start_server, status_server, stop_server from merlin.spec.expansion import RESERVED, get_spec_with_expansion from merlin.spec.specification import MerlinSpec +from merlin.study.celerymanageradapter import run_manager, start_manager, stop_manager from merlin.study.status import DetailedStatus, Status from merlin.study.status_constants import VALID_RETURN_CODES, VALID_STATUS_FILTERS from merlin.study.status_renderers import status_renderer_factory @@ -400,6 +401,26 @@ def process_example(args: Namespace) -> None: setup_example(args.workflow, args.path) +def process_manager(args : Namespace): + if args.command == "run": + run_manager(query_frequency=args.query_frequency, + query_timeout=args.query_timeout, + worker_timeout=args.worker_timeout) + elif args.command == "start": + if start_manager(query_frequency=args.query_frequency, + query_timeout=args.query_timeout, + worker_timeout=args.worker_timeout): + LOG.info("Manager started successfully.") + elif args.command == "stop": + if stop_manager(): + LOG.info("Manager stopped successfully.") + else: + LOG.error("Unable to stop manager.") + else: + print("Run manager with a command. Try 'merlin manager -h' for more details") + + + def process_monitor(args): """ CLI command to monitor merlin workers and queues to keep @@ -897,6 +918,86 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: help="regex match for specific workers to stop", ) + # merlin manager + manager : ArgumentParser = subparsers.add_parser( + "manager", + help="Watchdog application to manage workers", + description="A daemon process that helps to restart and communicate with workers while running.", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + manager.set_defaults(func=process_manager) + + manager_commands: ArgumentParser = manager.add_subparsers(dest="command") + manager_run = manager_commands.add_parser( + "run", + help="Run the daemon process", + description="Run manager", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + manager_run.add_argument( + "-qf", + "--query_frequency", + action="store", + type=int, + default=60, + help="The frequency at which workers will be queried for response.", + ) + manager_run.add_argument( + "-qt", + "--query_timeout", + action="store", + type=float, + default=0.5, + help="The timeout for the query response that are sent to workers.", + ) + manager_run.add_argument( + "-wt", + "--worker_timeout", + action="store", + type=int, + default=180, + help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.", + ) + manager_run.set_defaults(func=process_manager) + manager_start = manager_commands.add_parser( + "start", + help="Start the daemon process", + description="Start manager", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + manager_start.add_argument( + "-qf", + "--query_frequency", + action="store", + type=int, + default=60, + help="The frequency at which workers will be queried for response.", + ) + manager_start.add_argument( + "-qt", + "--query_timeout", + action="store", + type=float, + default=0.5, + help="The timeout for the query response that are sent to workers.", + ) + manager_start.add_argument( + "-wt", + "--worker_timeout", + action="store", + type=int, + default=180, + help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.", + ) + manager_start.set_defaults(func=process_manager) + manager_stop = manager_commands.add_parser( + "stop", + help="Stop the daemon process", + description="Stop manager", + formatter_class=ArgumentDefaultsHelpFormatter, + ) + manager_stop.set_defaults(func=process_manager) + # merlin monitor monitor: ArgumentParser = subparsers.add_parser( "monitor", diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 3f4c3745..0a2ffacc 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -155,7 +155,7 @@ def restart_celery_worker(self, worker): return True - + #TODO add some logs def run(self): """ Main manager loop @@ -172,6 +172,7 @@ def run(self): workers = self.redis_connection.keys() workers.remove("manager") workers = [worker for worker in workers if int(self.redis_connection.hget(worker, "monitored"))] + print(f"Monitoring {workers} workers") # Check/ Ping each worker to see if they are still running if workers: @@ -197,7 +198,6 @@ def run(self): else: self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive) # Sleep for the query_frequency for the next iteration - print("Finished checking") time.sleep(self.query_frequency) if __name__ == "__main__": diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index f990e740..f703b099 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -28,10 +28,12 @@ # SOFTWARE. ############################################################################### from merlin.study.celerymanager import CeleryManager, WORKER_INFO, WorkerStatus +import psutil +import subprocess def add_monitor_workers(workers: list): """ - Adds a worker to be monitored by the celery manager. + Adds workers to be monitored by the celery manager. :param list workers: A list of tuples which includes (worker_name, pid) """ if workers is None or len(workers) <= 0: @@ -48,6 +50,10 @@ def add_monitor_workers(workers: list): redis_connection.quit() def remove_monitor_workers(workers: list): + """ + Remove workers from being monitored by the celery manager. + :param list workers: A worker names + """ if workers is None or len(workers) <= 0: return redis_connection = CeleryManager.get_worker_status_redis_connection() @@ -59,11 +65,43 @@ def remove_monitor_workers(workers: list): redis_connection.quit() def is_manager_runnning() -> bool: - pass + """ + Check to see if the manager is running -def start_manager() -> bool: - pass + :return: True if manager is running and False if not. + """ + redis_connection = CeleryManager.get_worker_args_redis_connection() + manager_status = redis_connection.hgetall("manager") + redis_connection.quit() + return manager_status["status"] == WorkerStatus.running and psutil.pid_exists(manager_status["pid"]) + +def run_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool: + celerymanager = CeleryManager(query_frequency=query_frequency, + query_timeout=query_timeout, + worker_timeout=worker_timeout) + celerymanager.run() + + +def start_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool: + process = subprocess.Popen(f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}".split(), + start_new_session=True, + close_fds=True, + stdout=subprocess.PIPE, + ) + redis_connection = CeleryManager.get_worker_args_redis_connection() + redis_connection.hset("manager", "pid", process.pid) + redis_connection.quit() + return True def stop_manager() -> bool: - pass + redis_connection = CeleryManager.get_worker_args_redis_connection() + manager_pid = redis_connection.hget("manager", "pid") + manager_status = redis_connection.hget("manager", "status") + redis_connection.quit() + + # Check to make sure that the manager is running and the pid exists + if manager_status == WorkerStatus.running and psutil.pid_exists(manager_pid): + psutil.Process(manager_pid).terminate() + return True + return False From eca74ac53fb4ff91b71c64930888413d2dd6eb17 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 6 Aug 2024 18:40:39 -0700 Subject: [PATCH 10/35] Made fix for subprocess to start new shell and fixed manager start and stop --- merlin/study/celerymanageradapter.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index f703b099..c98f801a 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -83,22 +83,21 @@ def run_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_time def start_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool: - process = subprocess.Popen(f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}".split(), - start_new_session=True, + process = subprocess.Popen(f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}", + shell=True, close_fds=True, stdout=subprocess.PIPE, ) - redis_connection = CeleryManager.get_worker_args_redis_connection() - redis_connection.hset("manager", "pid", process.pid) - redis_connection.quit() return True def stop_manager() -> bool: - redis_connection = CeleryManager.get_worker_args_redis_connection() - manager_pid = redis_connection.hget("manager", "pid") + redis_connection = CeleryManager.get_worker_status_redis_connection() + manager_pid = int(redis_connection.hget("manager", "pid")) manager_status = redis_connection.hget("manager", "status") + print(redis_connection.hgetall("manager")) redis_connection.quit() + print(manager_status, psutil.pid_exists(manager_pid)) # Check to make sure that the manager is running and the pid exists if manager_status == WorkerStatus.running and psutil.pid_exists(manager_pid): psutil.Process(manager_pid).terminate() From ec8aa789953781faf586e1fbd46943cac7ca36f9 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 6 Aug 2024 18:55:33 -0700 Subject: [PATCH 11/35] Added comments and update changelog --- CHANGELOG.md | 4 ++++ merlin/study/celerymanager.py | 19 +++++++++++++++++++ merlin/study/celerymanageradapter.py | 17 ++++++++++++++++- 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21b4427b..f0760df7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ All notable changes to Merlin will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [unreleased] +### Added +- Merlin manager capability to monitor celery workers. + ## [1.12.2b1] ### Added - Conflict handler option to the `dict_deep_merge` function in `utils.py` diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 0a2ffacc..41bbc1cd 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -66,14 +66,26 @@ def __init__(self, query_frequency:int=60, query_timeout:float=0.5, worker_timeo @staticmethod def get_worker_status_redis_connection(): + """ + Get the redis connection for info regarding the worker and manager status. + """ return CeleryManager.get_redis_connection(1) @staticmethod def get_worker_args_redis_connection(): + """ + Get the redis connection for info regarding the args used to generate each worker. + """ return CeleryManager.get_redis_connection(2) @staticmethod def get_redis_connection(db_num): + """ + Generic redis connection function to get the results backend redis server with a given db number increment. + :param int db_num: Increment number for the db from the one provided in the config file. + + :return Redis: Redis connections object that can be used to access values for the manager. + """ password_file = CONFIG.results_backend.password try: password = get_backend_password(password_file) @@ -87,6 +99,13 @@ def get_redis_connection(db_num): decode_responses=True) def get_celery_workers_status(self, workers): + """ + Get the worker status of a current worker that is being managed + :param CeleryManager self: CeleryManager attempting the stop. + :param list workers: Workers that are checked. + + :return dict: The result dictionary for each worker and the response. + """ from merlin.celery import app celery_app = app.control diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index c98f801a..e6fba6ca 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -76,6 +76,10 @@ def is_manager_runnning() -> bool: return manager_status["status"] == WorkerStatus.running and psutil.pid_exists(manager_status["pid"]) def run_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool: + """ + A process locking function that calls the celery manager with proper arguments. + :params: See CeleryManager for more information regarding the parameters + """ celerymanager = CeleryManager(query_frequency=query_frequency, query_timeout=query_timeout, worker_timeout=worker_timeout) @@ -83,7 +87,13 @@ def run_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_time def start_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool: - process = subprocess.Popen(f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}", + """ + A Non-locking function that calls the celery manager with proper arguments. + :params: See CeleryManager for more information regarding the parameters + + :return bool: True if the manager was started successfully. + """ + subprocess.Popen(f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}", shell=True, close_fds=True, stdout=subprocess.PIPE, @@ -91,6 +101,11 @@ def start_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_ti return True def stop_manager() -> bool: + """ + Stop the manager process using it's pid. + + :return bool: True if the manager was stopped successfully and False otherwise. + """ redis_connection = CeleryManager.get_worker_status_redis_connection() manager_pid = int(redis_connection.hget("manager", "pid")) manager_status = redis_connection.hget("manager", "status") From 3f04d24cfcea09a3c2be86b12ebbb56ebc962c7f Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 6 Aug 2024 19:00:49 -0700 Subject: [PATCH 12/35] Include style fixes --- merlin/study/celerymanager.py | 10 ++++++---- merlin/study/celerymanageradapter.py | 7 +++++-- 2 files changed, 11 insertions(+), 6 deletions(-) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 41bbc1cd..3d33f533 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -28,14 +28,16 @@ # SOFTWARE. ############################################################################### -from merlin.config.configfile import CONFIG -from merlin.config.results_backend import get_backend_password import os -import psutil -import redis import subprocess import time +import psutil +import redis + +from merlin.config.configfile import CONFIG +from merlin.config.results_backend import get_backend_password + class WorkerStatus: running = "Running" diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index e6fba6ca..6ec94f5b 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -27,10 +27,13 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. ############################################################################### -from merlin.study.celerymanager import CeleryManager, WORKER_INFO, WorkerStatus -import psutil import subprocess +import psutil + +from merlin.study.celerymanager import WORKER_INFO, CeleryManager, WorkerStatus + + def add_monitor_workers(workers: list): """ Adds workers to be monitored by the celery manager. From 5538f4ba6d3898e716748ff1f32aab3e2f8a42d2 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 6 Aug 2024 19:07:25 -0700 Subject: [PATCH 13/35] Fix style for black --- .../null_spec/scripts/launch_jobs.py | 4 +- merlin/main.py | 15 +++---- merlin/study/celeryadapter.py | 8 ++-- merlin/study/celerymanager.py | 40 ++++++++++--------- merlin/study/celerymanageradapter.py | 24 ++++++----- 5 files changed, 46 insertions(+), 45 deletions(-) diff --git a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py index a6b6d137..99c3c3d6 100644 --- a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py +++ b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py @@ -78,9 +78,7 @@ if real_time > 1440: real_time = 1440 submit: str = "submit.sbatch" - command: str = ( - f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" - ) + command: str = f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" shutil.copyfile(os.path.join(submit_path, submit), submit) shutil.copyfile(args.spec_path, "spec.yaml") shutil.copyfile(args.script_path, os.path.join("scripts", "make_samples.py")) diff --git a/merlin/main.py b/merlin/main.py index aba27251..e6bb7c0c 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -401,15 +401,13 @@ def process_example(args: Namespace) -> None: setup_example(args.workflow, args.path) -def process_manager(args : Namespace): +def process_manager(args: Namespace): if args.command == "run": - run_manager(query_frequency=args.query_frequency, - query_timeout=args.query_timeout, - worker_timeout=args.worker_timeout) + run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout) elif args.command == "start": - if start_manager(query_frequency=args.query_frequency, - query_timeout=args.query_timeout, - worker_timeout=args.worker_timeout): + if start_manager( + query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout + ): LOG.info("Manager started successfully.") elif args.command == "stop": if stop_manager(): @@ -420,7 +418,6 @@ def process_manager(args : Namespace): print("Run manager with a command. Try 'merlin manager -h' for more details") - def process_monitor(args): """ CLI command to monitor merlin workers and queues to keep @@ -919,7 +916,7 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: ) # merlin manager - manager : ArgumentParser = subparsers.add_parser( + manager: ArgumentParser = subparsers.add_parser( "manager", help="Watchdog application to manage workers", description="A daemon process that helps to restart and communicate with workers while running.", diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 860b30b4..6c09590a 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -766,7 +766,7 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): process = subprocess.Popen(worker_cmd, **kwargs) # pylint: disable=R1732 # Get the worker name from worker_cmd and add to be monitored by celery manager worker_cmd_list = worker_cmd.split() - worker_name = worker_cmd_list[worker_cmd_list.index("-n")+1].replace("%h", kwargs["env"]["HOSTNAME"]) + worker_name = worker_cmd_list[worker_cmd_list.index("-n") + 1].replace("%h", kwargs["env"]["HOSTNAME"]) worker_name = "celery@" + worker_name worker_list.append(worker_cmd) @@ -780,9 +780,9 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): # further nesting can be accomplished by making this recursive. for key in kwargs: if type(kwargs[key]) is dict: - key_name = worker_name+"_"+key + key_name = worker_name + "_" + key redis_connection.hmset(name=key_name, mapping=kwargs[key]) - args[key] = "link:"+key_name + args[key] = "link:" + key_name if type(kwargs[key]) is bool: if kwargs[key]: args[key] = "True" @@ -792,7 +792,7 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): redis_connection.quit() # Adding the worker to redis db to be monitored - add_monitor_workers(workers=((worker_name, process.pid), )) + add_monitor_workers(workers=((worker_name, process.pid),)) LOG.info(f"Added {worker_name} to be monitored") except Exception as e: # pylint: disable=C0103 LOG.error(f"Cannot start celery workers, {e}") diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 3d33f533..795ca10f 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -45,27 +45,28 @@ class WorkerStatus: stopped = "Stopped" rebooting = "Rebooting" + WORKER_INFO = { - "status" : WorkerStatus.running, + "status": WorkerStatus.running, "pid": -1, "monitored": 1, "num_unresponsive": 0, } -class CeleryManager(): - def __init__(self, query_frequency:int=60, query_timeout:float=0.5, worker_timeout:int=180): +class CeleryManager: + def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180): """ Initializer for Celery Manager @param int query_frequency: The frequency at which workers will be queried with ping commands @param float query_timeout: The timeout for the query pings that are sent to workers - @param int worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + @param int worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. """ self.redis_connection = self.get_worker_status_redis_connection() self.query_frequency = query_frequency self.query_timeout = query_timeout self.worker_timeout = worker_timeout - + @staticmethod def get_worker_status_redis_connection(): """ @@ -93,12 +94,14 @@ def get_redis_connection(db_num): password = get_backend_password(password_file) except IOError: password = CONFIG.results_backend.password - return redis.Redis(host=CONFIG.results_backend.server, - port=CONFIG.results_backend.port, - db=CONFIG.results_backend.db_num+db_num, #Increment db_num to avoid conflicts - username=CONFIG.results_backend.username, - password=password, - decode_responses=True) + return redis.Redis( + host=CONFIG.results_backend.server, + port=CONFIG.results_backend.port, + db=CONFIG.results_backend.db_num + db_num, # Increment db_num to avoid conflicts + username=CONFIG.results_backend.username, + password=password, + decode_responses=True, + ) def get_celery_workers_status(self, workers): """ @@ -108,7 +111,7 @@ def get_celery_workers_status(self, workers): :return dict: The result dictionary for each worker and the response. """ - from merlin.celery import app + from merlin.celery import app celery_app = app.control ping_result = celery_app.ping(workers, timeout=self.query_timeout) @@ -176,7 +179,7 @@ def restart_celery_worker(self, worker): return True - #TODO add some logs + # TODO add some logs def run(self): """ Main manager loop @@ -188,7 +191,7 @@ def run(self): } self.redis_connection.hmset(name="manager", mapping=manager_info) - while True: #TODO Make it so that it will stop after a list of workers is stopped + while True: # TODO Make it so that it will stop after a list of workers is stopped # Get the list of running workers workers = self.redis_connection.keys() workers.remove("manager") @@ -207,8 +210,8 @@ def run(self): for worker in workers: if worker not in worker_results: # If time where the worker is unresponsive is less than the worker time out then just increment - num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive"))+1 - if num_unresponsive*self.query_frequency < self.worker_timeout: + num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive")) + 1 + if num_unresponsive * self.query_frequency < self.worker_timeout: # Attempt to restart worker if self.restart_celery_worker(worker): # If successful set the status to running and reset num_unresponsive @@ -220,7 +223,8 @@ def run(self): self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive) # Sleep for the query_frequency for the next iteration time.sleep(self.query_frequency) - + + if __name__ == "__main__": cm = CeleryManager() - cm.run() \ No newline at end of file + cm.run() diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index 6ec94f5b..a433a8ca 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -41,7 +41,7 @@ def add_monitor_workers(workers: list): """ if workers is None or len(workers) <= 0: return - + redis_connection = CeleryManager.get_worker_status_redis_connection() for worker in workers: if redis_connection.exists(worker[0]): @@ -52,6 +52,7 @@ def add_monitor_workers(workers: list): redis_connection.hmset(name=worker[0], mapping=worker_info) redis_connection.quit() + def remove_monitor_workers(workers: list): """ Remove workers from being monitored by the celery manager. @@ -64,9 +65,10 @@ def remove_monitor_workers(workers: list): if redis_connection.exists(worker): redis_connection.hset(worker, "monitored", 0) redis_connection.hset(worker, "status", WorkerStatus.stopped) - + redis_connection.quit() + def is_manager_runnning() -> bool: """ Check to see if the manager is running @@ -78,31 +80,32 @@ def is_manager_runnning() -> bool: redis_connection.quit() return manager_status["status"] == WorkerStatus.running and psutil.pid_exists(manager_status["pid"]) -def run_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool: + +def run_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool: """ A process locking function that calls the celery manager with proper arguments. :params: See CeleryManager for more information regarding the parameters """ - celerymanager = CeleryManager(query_frequency=query_frequency, - query_timeout=query_timeout, - worker_timeout=worker_timeout) + celerymanager = CeleryManager(query_frequency=query_frequency, query_timeout=query_timeout, worker_timeout=worker_timeout) celerymanager.run() - -def start_manager(query_frequency:int = 60, query_timeout:float = 0.5, worker_timeout:int = 180) -> bool: + +def start_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool: """ A Non-locking function that calls the celery manager with proper arguments. :params: See CeleryManager for more information regarding the parameters :return bool: True if the manager was started successfully. """ - subprocess.Popen(f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}", + subprocess.Popen( + f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}", shell=True, close_fds=True, stdout=subprocess.PIPE, ) return True + def stop_manager() -> bool: """ Stop the manager process using it's pid. @@ -114,11 +117,10 @@ def stop_manager() -> bool: manager_status = redis_connection.hget("manager", "status") print(redis_connection.hgetall("manager")) redis_connection.quit() - + print(manager_status, psutil.pid_exists(manager_pid)) # Check to make sure that the manager is running and the pid exists if manager_status == WorkerStatus.running and psutil.pid_exists(manager_pid): psutil.Process(manager_pid).terminate() return True return False - From b6bcd3323256ef44d19fa0378569c03a9e877305 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 7 Aug 2024 14:00:04 -0700 Subject: [PATCH 14/35] Revert launch_job script that was edited when doing automated lint --- merlin/examples/workflows/null_spec/scripts/launch_jobs.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py index 99c3c3d6..a6b6d137 100644 --- a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py +++ b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py @@ -78,7 +78,9 @@ if real_time > 1440: real_time = 1440 submit: str = "submit.sbatch" - command: str = f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" + command: str = ( + f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" + ) shutil.copyfile(os.path.join(submit_path, submit), submit) shutil.copyfile(args.spec_path, "spec.yaml") shutil.copyfile(args.script_path, os.path.join("scripts", "make_samples.py")) From 9b97f8bcc4794c8efed5dc98e52eb56db9f4ee99 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 7 Aug 2024 14:07:15 -0700 Subject: [PATCH 15/35] Move importing of CONFIG to be within redis_connection due to error of config not being created yet --- merlin/study/celerymanager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 795ca10f..7335ae0a 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -35,7 +35,6 @@ import psutil import redis -from merlin.config.configfile import CONFIG from merlin.config.results_backend import get_backend_password @@ -89,6 +88,7 @@ def get_redis_connection(db_num): :return Redis: Redis connections object that can be used to access values for the manager. """ + from merlin.config.configfile import CONFIG password_file = CONFIG.results_backend.password try: password = get_backend_password(password_file) From c9dfd312f4bcc08b1444a8193f8dad62ff31215f Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 7 Aug 2024 14:08:55 -0700 Subject: [PATCH 16/35] Added space to fix style --- merlin/examples/workflows/null_spec/scripts/launch_jobs.py | 4 +--- merlin/study/celerymanager.py | 1 + 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py index a6b6d137..99c3c3d6 100644 --- a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py +++ b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py @@ -78,9 +78,7 @@ if real_time > 1440: real_time = 1440 submit: str = "submit.sbatch" - command: str = ( - f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" - ) + command: str = f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" shutil.copyfile(os.path.join(submit_path, submit), submit) shutil.copyfile(args.spec_path, "spec.yaml") shutil.copyfile(args.script_path, os.path.join("scripts", "make_samples.py")) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 7335ae0a..ebc1924f 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -89,6 +89,7 @@ def get_redis_connection(db_num): :return Redis: Redis connections object that can be used to access values for the manager. """ from merlin.config.configfile import CONFIG + password_file = CONFIG.results_backend.password try: password = get_backend_password(password_file) From a9bd86564491295858de1dd5ee1a7ca2a619c93b Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 7 Aug 2024 14:09:43 -0700 Subject: [PATCH 17/35] Revert launch_jobs.py: --- merlin/examples/workflows/null_spec/scripts/launch_jobs.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py index 99c3c3d6..a6b6d137 100644 --- a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py +++ b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py @@ -78,7 +78,9 @@ if real_time > 1440: real_time = 1440 submit: str = "submit.sbatch" - command: str = f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" + command: str = ( + f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" + ) shutil.copyfile(os.path.join(submit_path, submit), submit) shutil.copyfile(args.spec_path, "spec.yaml") shutil.copyfile(args.script_path, os.path.join("scripts", "make_samples.py")) From ddc76142b61c4d2837a60339965d8a8d004ed95f Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 7 Aug 2024 14:13:05 -0700 Subject: [PATCH 18/35] Update import of all merlin.config to be in the function --- merlin/study/celerymanager.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index ebc1924f..6f2e697a 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -35,8 +35,6 @@ import psutil import redis -from merlin.config.results_backend import get_backend_password - class WorkerStatus: running = "Running" @@ -89,6 +87,7 @@ def get_redis_connection(db_num): :return Redis: Redis connections object that can be used to access values for the manager. """ from merlin.config.configfile import CONFIG + from merlin.config.results_backend import get_backend_password password_file = CONFIG.results_backend.password try: From 353a66b8497d43771bc03cf8d48cd68a1b62f590 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Fri, 16 Aug 2024 17:12:47 -0700 Subject: [PATCH 19/35] suggested changes plus beginning work on monitor/manager collab --- merlin/main.py | 96 +++++----- merlin/study/celeryadapter.py | 58 +++--- merlin/study/celerymanager.py | 268 +++++++++++++++------------ merlin/study/celerymanageradapter.py | 72 ++++--- 4 files changed, 276 insertions(+), 218 deletions(-) diff --git a/merlin/main.py b/merlin/main.py index e6bb7c0c..46683d27 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -402,6 +402,15 @@ def process_example(args: Namespace) -> None: def process_manager(args: Namespace): + """ + Process the command for managing the workers. + + This function interprets the command provided in the `args` namespace and + executes the corresponding manager function. It supports three commands: + "run", "start", and "stop". + + :param args: parsed CLI arguments + """ if args.command == "run": run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout) elif args.command == "start": @@ -409,6 +418,8 @@ def process_manager(args: Namespace): query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout ): LOG.info("Manager started successfully.") + else: + LOG.error("Unable to start manager") elif args.command == "stop": if stop_manager(): LOG.info("Manager stopped successfully.") @@ -924,6 +935,41 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: ) manager.set_defaults(func=process_manager) + def add_manager_options(manager_parser: ArgumentParser): + """ + Add shared options for manager subcommands. + + The `manager run` and `manager start` subcommands have the same options. + Rather than writing duplicate code for these we'll use this function + to add the arguments to these subcommands. + + :param manager_parser: The ArgumentParser object to add these options to + """ + manager_parser.add_argument( + "-qf", + "--query_frequency", + action="store", + type=int, + default=60, + help="The frequency at which workers will be queried for response.", + ) + manager_parser.add_argument( + "-qt", + "--query_timeout", + action="store", + type=float, + default=0.5, + help="The timeout for the query response that are sent to workers.", + ) + manager_parser.add_argument( + "-wt", + "--worker_timeout", + action="store", + type=int, + default=180, + help="The sum total (query_frequency*tries) time before an attempt is made to restart worker.", + ) + manager_commands: ArgumentParser = manager.add_subparsers(dest="command") manager_run = manager_commands.add_parser( "run", @@ -931,30 +977,7 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: description="Run manager", formatter_class=ArgumentDefaultsHelpFormatter, ) - manager_run.add_argument( - "-qf", - "--query_frequency", - action="store", - type=int, - default=60, - help="The frequency at which workers will be queried for response.", - ) - manager_run.add_argument( - "-qt", - "--query_timeout", - action="store", - type=float, - default=0.5, - help="The timeout for the query response that are sent to workers.", - ) - manager_run.add_argument( - "-wt", - "--worker_timeout", - action="store", - type=int, - default=180, - help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.", - ) + add_manager_options(manager_run) manager_run.set_defaults(func=process_manager) manager_start = manager_commands.add_parser( "start", @@ -962,30 +985,7 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None: description="Start manager", formatter_class=ArgumentDefaultsHelpFormatter, ) - manager_start.add_argument( - "-qf", - "--query_frequency", - action="store", - type=int, - default=60, - help="The frequency at which workers will be queried for response.", - ) - manager_start.add_argument( - "-qt", - "--query_timeout", - action="store", - type=float, - default=0.5, - help="The timeout for the query response that are sent to workers.", - ) - manager_start.add_argument( - "-wt", - "--worker_timeout", - action="store", - type=int, - default=180, - help="The sum total(query_frequency*tries) time before an attempt is made to restart worker.", - ) + add_manager_options(manager_start) manager_start.set_defaults(func=process_manager) manager_stop = manager_commands.add_parser( "stop", diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 6c09590a..e392b179 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -502,15 +502,22 @@ def check_celery_workers_processing(queues_in_spec: List[str], app: Celery) -> b """ # Query celery for active tasks active_tasks = app.control.inspect().active() + result = False - # Search for the queues we provided if necessary - if active_tasks is not None: - for tasks in active_tasks.values(): - for task in tasks: - if task["delivery_info"]["routing_key"] in queues_in_spec: - return True + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + # Search for the queues we provided if necessary + if active_tasks is not None: + for worker, tasks in active_tasks.items(): + for task in tasks: + if task["delivery_info"]["routing_key"] in queues_in_spec: + result = True - return False + # Set the entry in the Redis DB for the manager to signify if the worker + # is still doing work + worker_still_processing = 1 if result else 0 + redis_connection.hset(worker, "processing_work", worker_still_processing) + + return result def _get_workers_to_start(spec, steps): @@ -771,25 +778,24 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): worker_list.append(worker_cmd) # Adding the worker args to redis db - redis_connection = CeleryManager.get_worker_args_redis_connection() - args = kwargs - # Save worker command with the arguements - args["worker_cmd"] = worker_cmd - # Store the nested dictionaries into a separate key with a link. - # Note: This only support single nested dicts(for simplicity) and - # further nesting can be accomplished by making this recursive. - for key in kwargs: - if type(kwargs[key]) is dict: - key_name = worker_name + "_" + key - redis_connection.hmset(name=key_name, mapping=kwargs[key]) - args[key] = "link:" + key_name - if type(kwargs[key]) is bool: - if kwargs[key]: - args[key] = "True" - else: - args[key] = "False" - redis_connection.hmset(name=worker_name, mapping=args) - redis_connection.quit() + with CeleryManager.get_worker_args_redis_connection() as redis_connection: + args = kwargs + # Save worker command with the arguements + args["worker_cmd"] = worker_cmd + # Store the nested dictionaries into a separate key with a link. + # Note: This only support single nested dicts(for simplicity) and + # further nesting can be accomplished by making this recursive. + for key in kwargs: + if type(kwargs[key]) is dict: + key_name = worker_name + "_" + key + redis_connection.hmset(name=key_name, mapping=kwargs[key]) + args[key] = "link:" + key_name + if type(kwargs[key]) is bool: + if kwargs[key]: + args[key] = "True" + else: + args[key] = "False" + redis_connection.hmset(name=worker_name, mapping=args) # Adding the worker to redis db to be monitored add_monitor_workers(workers=((worker_name, process.pid),)) diff --git a/merlin/study/celerymanager.py b/merlin/study/celerymanager.py index 6f2e697a..ddefab02 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/study/celerymanager.py @@ -27,7 +27,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. ############################################################################### - +import logging import os import subprocess import time @@ -36,6 +36,9 @@ import redis +LOG = logging.getLogger(__name__) + + class WorkerStatus: running = "Running" stalled = "Stalled" @@ -48,68 +51,89 @@ class WorkerStatus: "pid": -1, "monitored": 1, "num_unresponsive": 0, + "processing_work": 1, } +class RedisConnectionManager: + """ + A context manager for handling redis connections. + This will ensure safe opening and closing of Redis connections. + """ + + def __init__(self, db_num: int): + self.db_num = db_num + self.connection = None + + def __enter__(self): + self.connection = self.get_redis_connection() + return self.connection + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.connection: + LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}") + self.connection.close() + + def get_redis_connection(self) -> redis.Redis: + """ + Generic redis connection function to get the results backend redis server with a given db number increment. + + :return: Redis connection object that can be used to access values for the manager. + """ + # from merlin.config.results_backend import get_backend_password + from merlin.config import results_backend + from merlin.config.configfile import CONFIG + + conn_string = results_backend.get_connection_string() + base, _ = conn_string.rsplit("/", 1) + new_db_num = CONFIG.results_backend.db_num + self.db_num + conn_string = f"{base}/{new_db_num}" + LOG.debug(f"MANAGER: Connecting to redis at db_num: {new_db_num}") + return redis.from_url(conn_string, decode_responses=True) + # password_file = CONFIG.results_backend.password + # try: + # password = get_backend_password(password_file) + # except IOError: + # password = CONFIG.results_backend.password + # return redis.Redis( + # host=CONFIG.results_backend.server, + # port=CONFIG.results_backend.port, + # db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts + # username=CONFIG.results_backend.username, + # password=password, + # decode_responses=True, + # ) + + class CeleryManager: def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180): """ Initializer for Celery Manager - @param int query_frequency: The frequency at which workers will be queried with ping commands - @param float query_timeout: The timeout for the query pings that are sent to workers - @param int worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. """ - self.redis_connection = self.get_worker_status_redis_connection() self.query_frequency = query_frequency self.query_timeout = query_timeout self.worker_timeout = worker_timeout @staticmethod - def get_worker_status_redis_connection(): - """ - Get the redis connection for info regarding the worker and manager status. - """ - return CeleryManager.get_redis_connection(1) - - @staticmethod - def get_worker_args_redis_connection(): - """ - Get the redis connection for info regarding the args used to generate each worker. - """ - return CeleryManager.get_redis_connection(2) + def get_worker_status_redis_connection() -> RedisConnectionManager: + """Get the redis connection for info regarding the worker and manager status.""" + return RedisConnectionManager(1) @staticmethod - def get_redis_connection(db_num): - """ - Generic redis connection function to get the results backend redis server with a given db number increment. - :param int db_num: Increment number for the db from the one provided in the config file. + def get_worker_args_redis_connection() -> RedisConnectionManager: + """Get the redis connection for info regarding the args used to generate each worker.""" + return RedisConnectionManager(2) - :return Redis: Redis connections object that can be used to access values for the manager. - """ - from merlin.config.configfile import CONFIG - from merlin.config.results_backend import get_backend_password - - password_file = CONFIG.results_backend.password - try: - password = get_backend_password(password_file) - except IOError: - password = CONFIG.results_backend.password - return redis.Redis( - host=CONFIG.results_backend.server, - port=CONFIG.results_backend.port, - db=CONFIG.results_backend.db_num + db_num, # Increment db_num to avoid conflicts - username=CONFIG.results_backend.username, - password=password, - decode_responses=True, - ) - - def get_celery_workers_status(self, workers): + def get_celery_workers_status(self, workers: list) -> dict: """ Get the worker status of a current worker that is being managed - :param CeleryManager self: CeleryManager attempting the stop. - :param list workers: Workers that are checked. - :return dict: The result dictionary for each worker and the response. + :param workers: Workers that are checked. + :return: The result dictionary for each worker and the response. """ from merlin.celery import app @@ -118,20 +142,19 @@ def get_celery_workers_status(self, workers): worker_results = {worker: status for d in ping_result for worker, status in d.items()} return worker_results - def stop_celery_worker(self, worker): + def stop_celery_worker(self, worker: str) -> bool: """ Stop a celery worker by kill the worker with pid - :param CeleryManager self: CeleryManager attempting the stop. - :param str worker: Worker that is being stopped. - :return bool: The result of whether a worker was stopped. + :param worker: Worker that is being stopped. + :return: The result of whether a worker was stopped. """ - # Get the PID associated with the pid - worker_status_connect = self.get_worker_status_redis_connection() - worker_pid = int(worker_status_connect.hget(worker, "pid")) - worker_status = worker_status_connect.hget(worker, "status") - worker_status_connect.quit() + # Get the PID associated with the worker + with self.get_worker_status_redis_connection() as worker_status_connect: + worker_pid = int(worker_status_connect.hget(worker, "pid")) + worker_status = worker_status_connect.hget(worker, "status") + # Check to see if the pid exists and worker is set as running if worker_status == WorkerStatus.running and psutil.pid_exists(worker_pid): # Check to see if the pid is associated with celery @@ -142,87 +165,100 @@ def stop_celery_worker(self, worker): return True return False - def restart_celery_worker(self, worker): + def restart_celery_worker(self, worker: str) -> bool: """ Restart a celery worker with the same arguements and parameters during its creation - :param CeleryManager self: CeleryManager attempting the stop. - :param str worker: Worker that is being restarted. - :return bool: The result of whether a worker was restarted. + :param worker: Worker that is being restarted. + :return: The result of whether a worker was restarted. """ # Stop the worker that is currently running if not self.stop_celery_worker(worker): return False + # Start the worker again with the args saved in redis db - worker_args_connect = self.get_worker_args_redis_connection() - worker_status_connect = self.get_worker_status_redis_connection() - # Get the args and remove the worker_cmd from the hash set - args = worker_args_connect.hgetall(worker) - worker_cmd = args["worker_cmd"] - del args["worker_cmd"] - kwargs = args - for key in args: - if args[key].startswith("link:"): - kwargs[key] = worker_args_connect.hgetall(args[key].split(":", 1)[1]) - elif args[key] == "True": - kwargs[key] = True - elif args[key] == "False": - kwargs[key] = False - - # Run the subprocess for the worker and save the PID - process = subprocess.Popen(worker_cmd, **kwargs) - worker_status_connect.hset(worker, "pid", process.pid) - - worker_args_connect.quit() - worker_status_connect.quit() + with ( + self.get_worker_args_redis_connection() as worker_args_connect, + self.get_worker_status_redis_connection() as worker_status_connect, + ): + # Get the args and remove the worker_cmd from the hash set + args = worker_args_connect.hgetall(worker) + worker_cmd = args["worker_cmd"] + del args["worker_cmd"] + kwargs = args + for key in args: + if args[key].startswith("link:"): + kwargs[key] = worker_args_connect.hgetall(args[key].split(":", 1)[1]) + elif args[key] == "True": + kwargs[key] = True + elif args[key] == "False": + kwargs[key] = False + + # Run the subprocess for the worker and save the PID + process = subprocess.Popen(worker_cmd, **kwargs) + worker_status_connect.hset(worker, "pid", process.pid) return True - # TODO add some logs def run(self): """ - Main manager loop - """ + Main manager loop for monitoring and managing Celery workers. + This method continuously monitors the status of Celery workers by + checking their health and attempting to restart any that are + unresponsive. It updates the Redis database with the current + status of the manager and the workers. + """ manager_info = { "status": "Running", - "process id": os.getpid(), + "pid": os.getpid(), } - self.redis_connection.hmset(name="manager", mapping=manager_info) - - while True: # TODO Make it so that it will stop after a list of workers is stopped - # Get the list of running workers - workers = self.redis_connection.keys() - workers.remove("manager") - workers = [worker for worker in workers if int(self.redis_connection.hget(worker, "monitored"))] - print(f"Monitoring {workers} workers") - - # Check/ Ping each worker to see if they are still running - if workers: - worker_results = self.get_celery_workers_status(workers) - - # If running set the status on redis that it is running - for worker in list(worker_results.keys()): - self.redis_connection.hset(worker, "status", WorkerStatus.running) - - # If not running attempt to restart it - for worker in workers: - if worker not in worker_results: - # If time where the worker is unresponsive is less than the worker time out then just increment - num_unresponsive = int(self.redis_connection.hget(worker, "num_unresponsive")) + 1 - if num_unresponsive * self.query_frequency < self.worker_timeout: - # Attempt to restart worker - if self.restart_celery_worker(worker): - # If successful set the status to running and reset num_unresponsive - self.redis_connection.hset(worker, "status", WorkerStatus.running) - self.redis_connection.hset(worker, "num_unresponsive", 0) - # If failed set the status to stalled - self.redis_connection.hset(worker, "status", WorkerStatus.stalled) - else: - self.redis_connection.hset(worker, "num_unresponsive", num_unresponsive) - # Sleep for the query_frequency for the next iteration - time.sleep(self.query_frequency) + + with self.get_worker_status_redis_connection() as redis_connection: + LOG.debug(f"MANAGER: setting manager key in redis to hold the following info {manager_info}") + redis_connection.hmset(name="manager", mapping=manager_info) + + # TODO figure out what to do with "processing_work" entry for the merlin monitor + while True: # TODO Make it so that it will stop after a list of workers is stopped + # Get the list of running workers + workers = redis_connection.keys() + LOG.debug(f"MANAGER: workers: {workers}") + workers.remove("manager") + workers = [worker for worker in workers if int(redis_connection.hget(worker, "monitored"))] + LOG.info(f"MANAGER: Monitoring {workers} workers") + + # Check/ Ping each worker to see if they are still running + if workers: + worker_results = self.get_celery_workers_status(workers) + + # If running set the status on redis that it is running + LOG.info(f"MANAGER: Responsive workers: {worker_results.keys()}") + for worker in list(worker_results.keys()): + redis_connection.hset(worker, "status", WorkerStatus.running) + + # If not running attempt to restart it + for worker in workers: + if worker not in worker_results: + LOG.info(f"MANAGER: Worker '{worker}' is unresponsive.") + # If time where the worker is unresponsive is less than the worker time out then just increment + num_unresponsive = int(redis_connection.hget(worker, "num_unresponsive")) + 1 + if num_unresponsive * self.query_frequency < self.worker_timeout: + # Attempt to restart worker + LOG.info(f"MANAGER: Attempting to restart worker '{worker}'...") + if self.restart_celery_worker(worker): + # If successful set the status to running and reset num_unresponsive + redis_connection.hset(worker, "status", WorkerStatus.running) + redis_connection.hset(worker, "num_unresponsive", 0) + # If failed set the status to stalled + redis_connection.hset(worker, "status", WorkerStatus.stalled) + LOG.info(f"MANAGER: Worker '{worker}' restarted.") + else: + LOG.error(f"MANAGER: Could not restart worker '{worker}'.") + else: + redis_connection.hset(worker, "num_unresponsive", num_unresponsive) + # Sleep for the query_frequency for the next iteration + time.sleep(self.query_frequency) if __name__ == "__main__": diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index a433a8ca..d195eb96 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -27,6 +27,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. ############################################################################### +import logging import subprocess import psutil @@ -34,6 +35,9 @@ from merlin.study.celerymanager import WORKER_INFO, CeleryManager, WorkerStatus +LOG = logging.getLogger(__name__) + + def add_monitor_workers(workers: list): """ Adds workers to be monitored by the celery manager. @@ -42,15 +46,25 @@ def add_monitor_workers(workers: list): if workers is None or len(workers) <= 0: return - redis_connection = CeleryManager.get_worker_status_redis_connection() - for worker in workers: - if redis_connection.exists(worker[0]): - redis_connection.hset(worker[0], "monitored", 1) - redis_connection.hset(worker[0], "pid", worker[1]) - worker_info = WORKER_INFO - worker_info["pid"] = worker[1] - redis_connection.hmset(name=worker[0], mapping=worker_info) - redis_connection.quit() + LOG.info( + f"MANAGER: Attempting to have the manager monitor the following workers {[worker_name for worker_name, _ in workers]}." + ) + monitored_workers = [] + + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + for worker in workers: + LOG.debug(f"MANAGER: Checking if connection for worker '{worker}' exists...") + if redis_connection.exists(worker[0]): + LOG.debug(f"MANAGER: Connection for worker '{worker}' exists. Setting this worker to be monitored") + redis_connection.hset(worker[0], "monitored", 1) + redis_connection.hset(worker[0], "pid", worker[1]) + monitored_workers.append(worker[0]) + else: + LOG.debug(f"MANAGER: Connection for worker '{worker}' does not exist. Not monitoring this worker.") + worker_info = WORKER_INFO + worker_info["pid"] = worker[1] + redis_connection.hmset(name=worker[0], mapping=worker_info) + LOG.info(f"MANAGER: Manager is monitoring the following workers {monitored_workers}.") def remove_monitor_workers(workers: list): @@ -60,13 +74,11 @@ def remove_monitor_workers(workers: list): """ if workers is None or len(workers) <= 0: return - redis_connection = CeleryManager.get_worker_status_redis_connection() - for worker in workers: - if redis_connection.exists(worker): - redis_connection.hset(worker, "monitored", 0) - redis_connection.hset(worker, "status", WorkerStatus.stopped) - - redis_connection.quit() + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + for worker in workers: + if redis_connection.exists(worker): + redis_connection.hset(worker, "monitored", 0) + redis_connection.hset(worker, "status", WorkerStatus.stopped) def is_manager_runnning() -> bool: @@ -75,16 +87,18 @@ def is_manager_runnning() -> bool: :return: True if manager is running and False if not. """ - redis_connection = CeleryManager.get_worker_args_redis_connection() - manager_status = redis_connection.hgetall("manager") - redis_connection.quit() + with CeleryManager.get_worker_args_redis_connection() as redis_connection: + manager_status = redis_connection.hgetall("manager") return manager_status["status"] == WorkerStatus.running and psutil.pid_exists(manager_status["pid"]) def run_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool: """ A process locking function that calls the celery manager with proper arguments. - :params: See CeleryManager for more information regarding the parameters + + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. """ celerymanager = CeleryManager(query_frequency=query_frequency, query_timeout=query_timeout, worker_timeout=worker_timeout) celerymanager.run() @@ -93,9 +107,11 @@ def run_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_ti def start_manager(query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180) -> bool: """ A Non-locking function that calls the celery manager with proper arguments. - :params: See CeleryManager for more information regarding the parameters - :return bool: True if the manager was started successfully. + :param query_frequency: The frequency at which workers will be queried with ping commands + :param query_timeout: The timeout for the query pings that are sent to workers + :param worker_timeout: The sum total(query_frequency*tries) time before an attempt is made to restart worker. + :return bool: True if the manager was started successfully. """ subprocess.Popen( f"merlin manager run -qf {query_frequency} -qt {query_timeout} -wt {worker_timeout}", @@ -112,13 +128,13 @@ def stop_manager() -> bool: :return bool: True if the manager was stopped successfully and False otherwise. """ - redis_connection = CeleryManager.get_worker_status_redis_connection() - manager_pid = int(redis_connection.hget("manager", "pid")) - manager_status = redis_connection.hget("manager", "status") - print(redis_connection.hgetall("manager")) - redis_connection.quit() + with CeleryManager.get_worker_status_redis_connection() as redis_connection: + LOG.debug(f"MANAGER: manager keys: {redis_connection.hgetall('manager')}") + manager_pid = int(redis_connection.hget("manager", "pid")) + manager_status = redis_connection.hget("manager", "status") + LOG.debug(f"MANAGER: manager_status: {manager_status}") + LOG.debug(f"MANAGER: pid exists: {psutil.pid_exists(manager_pid)}") - print(manager_status, psutil.pid_exists(manager_pid)) # Check to make sure that the manager is running and the pid exists if manager_status == WorkerStatus.running and psutil.pid_exists(manager_pid): psutil.Process(manager_pid).terminate() From 1a4d416f97ead9725e0ca636d9b0f61141e370dd Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Thu, 22 Aug 2024 14:49:02 -0700 Subject: [PATCH 20/35] move managers to their own folder and fix ssl problems --- merlin/managers/__init__.py | 0 merlin/{study => managers}/celerymanager.py | 97 +++++++++++---------- merlin/managers/redis_connection.py | 81 +++++++++++++++++ merlin/study/celeryadapter.py | 2 +- merlin/study/celerymanageradapter.py | 2 +- 5 files changed, 132 insertions(+), 50 deletions(-) create mode 100644 merlin/managers/__init__.py rename merlin/{study => managers}/celerymanager.py (81%) create mode 100644 merlin/managers/redis_connection.py diff --git a/merlin/managers/__init__.py b/merlin/managers/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/merlin/study/celerymanager.py b/merlin/managers/celerymanager.py similarity index 81% rename from merlin/study/celerymanager.py rename to merlin/managers/celerymanager.py index ddefab02..99914545 100644 --- a/merlin/study/celerymanager.py +++ b/merlin/managers/celerymanager.py @@ -35,6 +35,7 @@ import psutil import redis +from merlin.managers.redis_connection import RedisConnectionManager LOG = logging.getLogger(__name__) @@ -55,54 +56,54 @@ class WorkerStatus: } -class RedisConnectionManager: - """ - A context manager for handling redis connections. - This will ensure safe opening and closing of Redis connections. - """ - - def __init__(self, db_num: int): - self.db_num = db_num - self.connection = None - - def __enter__(self): - self.connection = self.get_redis_connection() - return self.connection - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.connection: - LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}") - self.connection.close() - - def get_redis_connection(self) -> redis.Redis: - """ - Generic redis connection function to get the results backend redis server with a given db number increment. - - :return: Redis connection object that can be used to access values for the manager. - """ - # from merlin.config.results_backend import get_backend_password - from merlin.config import results_backend - from merlin.config.configfile import CONFIG - - conn_string = results_backend.get_connection_string() - base, _ = conn_string.rsplit("/", 1) - new_db_num = CONFIG.results_backend.db_num + self.db_num - conn_string = f"{base}/{new_db_num}" - LOG.debug(f"MANAGER: Connecting to redis at db_num: {new_db_num}") - return redis.from_url(conn_string, decode_responses=True) - # password_file = CONFIG.results_backend.password - # try: - # password = get_backend_password(password_file) - # except IOError: - # password = CONFIG.results_backend.password - # return redis.Redis( - # host=CONFIG.results_backend.server, - # port=CONFIG.results_backend.port, - # db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts - # username=CONFIG.results_backend.username, - # password=password, - # decode_responses=True, - # ) +# class RedisConnectionManager: +# """ +# A context manager for handling redis connections. +# This will ensure safe opening and closing of Redis connections. +# """ + +# def __init__(self, db_num: int): +# self.db_num = db_num +# self.connection = None + +# def __enter__(self): +# self.connection = self.get_redis_connection() +# return self.connection + +# def __exit__(self, exc_type, exc_val, exc_tb): +# if self.connection: +# LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}") +# self.connection.close() + +# def get_redis_connection(self) -> redis.Redis: +# """ +# Generic redis connection function to get the results backend redis server with a given db number increment. + +# :return: Redis connection object that can be used to access values for the manager. +# """ +# # from merlin.config.results_backend import get_backend_password +# from merlin.config import results_backend +# from merlin.config.configfile import CONFIG + +# conn_string = results_backend.get_connection_string() +# base, _ = conn_string.rsplit("/", 1) +# new_db_num = CONFIG.results_backend.db_num + self.db_num +# conn_string = f"{base}/{new_db_num}" +# LOG.debug(f"MANAGER: Connecting to redis at db_num: {new_db_num}") +# return redis.from_url(conn_string, decode_responses=True) +# # password_file = CONFIG.results_backend.password +# # try: +# # password = get_backend_password(password_file) +# # except IOError: +# # password = CONFIG.results_backend.password +# # return redis.Redis( +# # host=CONFIG.results_backend.server, +# # port=CONFIG.results_backend.port, +# # db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts +# # username=CONFIG.results_backend.username, +# # password=password, +# # decode_responses=True, +# # ) class CeleryManager: diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py new file mode 100644 index 00000000..8749fcb6 --- /dev/null +++ b/merlin/managers/redis_connection.py @@ -0,0 +1,81 @@ +############################################################################### +# Copyright (c) 2023, Lawrence Livermore National Security, LLC. +# Produced at the Lawrence Livermore National Laboratory +# Written by the Merlin dev team, listed in the CONTRIBUTORS file. +# +# +# LLNL-CODE-797170 +# All rights reserved. +# This file is part of Merlin, Version: 1.12.2b1. +# +# For details, see https://github.com/LLNL/merlin. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +############################################################################### +""" +This module stores a manager for redis connections. +""" +import logging +import redis + +LOG = logging.getLogger(__name__) + + +class RedisConnectionManager: + """ + A context manager for handling redis connections. + This will ensure safe opening and closing of Redis connections. + """ + + def __init__(self, db_num: int): + self.db_num = db_num + self.connection = None + + def __enter__(self): + self.connection = self.get_redis_connection() + return self.connection + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.connection: + LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}") + self.connection.close() + + def get_redis_connection(self) -> redis.Redis: + """ + Generic redis connection function to get the results backend redis server with a given db number increment. + + :return: Redis connection object that can be used to access values for the manager. + """ + from merlin.config.results_backend import get_backend_password + from merlin.config.configfile import CONFIG + + password_file = CONFIG.results_backend.password + try: + password = get_backend_password(password_file) + except IOError: + password = CONFIG.results_backend.password + return redis.Redis( + host=CONFIG.results_backend.server, + port=CONFIG.results_backend.port, + db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts + username=CONFIG.results_backend.username, + password=password, + decode_responses=True, + ssl=True, + ssl_cert_reqs=CONFIG.results_backend.cert_reqs, + ) diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index e392b179..651cd7a4 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -46,8 +46,8 @@ from merlin.common.dumper import dump_handler from merlin.config import Config +from merlin.managers.celerymanager import CeleryManager from merlin.study.batch import batch_check_parallel, batch_worker_launch -from merlin.study.celerymanager import CeleryManager from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index d195eb96..31072d23 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -32,7 +32,7 @@ import psutil -from merlin.study.celerymanager import WORKER_INFO, CeleryManager, WorkerStatus +from merlin.managers.celerymanager import WORKER_INFO, CeleryManager, WorkerStatus LOG = logging.getLogger(__name__) From 875f13792f811dd693143f30b6ab192c8abefc32 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Tue, 3 Sep 2024 14:11:29 -0700 Subject: [PATCH 21/35] final PR touch ups --- merlin/main.py | 13 +++--- merlin/managers/celerymanager.py | 64 +++------------------------- merlin/router.py | 5 ++- merlin/study/celeryadapter.py | 7 +-- merlin/study/celerymanageradapter.py | 11 ++++- 5 files changed, 30 insertions(+), 70 deletions(-) diff --git a/merlin/main.py b/merlin/main.py index 46683d27..56fcbd5a 100644 --- a/merlin/main.py +++ b/merlin/main.py @@ -360,7 +360,7 @@ def stop_workers(args): LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?") # Send stop command to router - router.stop_workers(args.task_server, worker_names, args.queues, args.workers) + router.stop_workers(args.task_server, worker_names, args.queues, args.workers, args.level.upper()) def print_info(args): @@ -414,12 +414,13 @@ def process_manager(args: Namespace): if args.command == "run": run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout) elif args.command == "start": - if start_manager( - query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout - ): + try: + start_manager( + query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout + ) LOG.info("Manager started successfully.") - else: - LOG.error("Unable to start manager") + except Exception as e: + LOG.error(f"Unable to start manager.\n{e}") elif args.command == "stop": if stop_manager(): LOG.info("Manager stopped successfully.") diff --git a/merlin/managers/celerymanager.py b/merlin/managers/celerymanager.py index 99914545..e6b82085 100644 --- a/merlin/managers/celerymanager.py +++ b/merlin/managers/celerymanager.py @@ -50,62 +50,12 @@ class WorkerStatus: WORKER_INFO = { "status": WorkerStatus.running, "pid": -1, - "monitored": 1, + "monitored": 1, # This setting is for debug mode "num_unresponsive": 0, "processing_work": 1, } -# class RedisConnectionManager: -# """ -# A context manager for handling redis connections. -# This will ensure safe opening and closing of Redis connections. -# """ - -# def __init__(self, db_num: int): -# self.db_num = db_num -# self.connection = None - -# def __enter__(self): -# self.connection = self.get_redis_connection() -# return self.connection - -# def __exit__(self, exc_type, exc_val, exc_tb): -# if self.connection: -# LOG.debug(f"MANAGER: Closing connection at db_num: {self.db_num}") -# self.connection.close() - -# def get_redis_connection(self) -> redis.Redis: -# """ -# Generic redis connection function to get the results backend redis server with a given db number increment. - -# :return: Redis connection object that can be used to access values for the manager. -# """ -# # from merlin.config.results_backend import get_backend_password -# from merlin.config import results_backend -# from merlin.config.configfile import CONFIG - -# conn_string = results_backend.get_connection_string() -# base, _ = conn_string.rsplit("/", 1) -# new_db_num = CONFIG.results_backend.db_num + self.db_num -# conn_string = f"{base}/{new_db_num}" -# LOG.debug(f"MANAGER: Connecting to redis at db_num: {new_db_num}") -# return redis.from_url(conn_string, decode_responses=True) -# # password_file = CONFIG.results_backend.password -# # try: -# # password = get_backend_password(password_file) -# # except IOError: -# # password = CONFIG.results_backend.password -# # return redis.Redis( -# # host=CONFIG.results_backend.server, -# # port=CONFIG.results_backend.port, -# # db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts -# # username=CONFIG.results_backend.username, -# # password=password, -# # decode_responses=True, -# # ) - - class CeleryManager: def __init__(self, query_frequency: int = 60, query_timeout: float = 0.5, worker_timeout: int = 180): """ @@ -156,6 +106,7 @@ def stop_celery_worker(self, worker: str) -> bool: worker_pid = int(worker_status_connect.hget(worker, "pid")) worker_status = worker_status_connect.hget(worker, "status") + # TODO be wary of stalled state workers (should not happen since we use psutil.Process.kill()) # Check to see if the pid exists and worker is set as running if worker_status == WorkerStatus.running and psutil.pid_exists(worker_pid): # Check to see if the pid is associated with celery @@ -174,9 +125,8 @@ def restart_celery_worker(self, worker: str) -> bool: :return: The result of whether a worker was restarted. """ - # Stop the worker that is currently running - if not self.stop_celery_worker(worker): - return False + # Stop the worker that is currently running (if possible) + self.stop_celery_worker(worker) # Start the worker again with the args saved in redis db with ( @@ -218,7 +168,7 @@ def run(self): with self.get_worker_status_redis_connection() as redis_connection: LOG.debug(f"MANAGER: setting manager key in redis to hold the following info {manager_info}") - redis_connection.hmset(name="manager", mapping=manager_info) + redis_connection.hset("manager", mapping=manager_info) # TODO figure out what to do with "processing_work" entry for the merlin monitor while True: # TODO Make it so that it will stop after a list of workers is stopped @@ -251,10 +201,10 @@ def run(self): # If successful set the status to running and reset num_unresponsive redis_connection.hset(worker, "status", WorkerStatus.running) redis_connection.hset(worker, "num_unresponsive", 0) - # If failed set the status to stalled - redis_connection.hset(worker, "status", WorkerStatus.stalled) LOG.info(f"MANAGER: Worker '{worker}' restarted.") else: + # If failed set the status to stalled + redis_connection.hset(worker, "status", WorkerStatus.stalled) LOG.error(f"MANAGER: Could not restart worker '{worker}'.") else: redis_connection.hset(worker, "num_unresponsive", num_unresponsive) diff --git a/merlin/router.py b/merlin/router.py index d9114bbc..9747b7c4 100644 --- a/merlin/router.py +++ b/merlin/router.py @@ -190,7 +190,7 @@ def get_workers(task_server): return [] -def stop_workers(task_server, spec_worker_names, queues, workers_regex): +def stop_workers(task_server, spec_worker_names, queues, workers_regex, debug_lvl): """ Stops workers. @@ -198,12 +198,13 @@ def stop_workers(task_server, spec_worker_names, queues, workers_regex): :param `spec_worker_names`: Worker names to stop, drawn from a spec. :param `queues` : The queues to stop :param `workers_regex` : Regex for workers to stop + :param debug_lvl: The debug level to use (INFO, DEBUG, ERROR, etc.) """ LOG.info("Stopping workers...") if task_server == "celery": # pylint: disable=R1705 # Stop workers - stop_celery_workers(queues, spec_worker_names, workers_regex) + stop_celery_workers(queues, spec_worker_names, workers_regex, debug_lvl) else: LOG.error("Celery is not specified as the task server!") diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 651cd7a4..510e5a04 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -46,7 +46,7 @@ from merlin.common.dumper import dump_handler from merlin.config import Config -from merlin.managers.celerymanager import CeleryManager +from merlin.managers.celerymanager import CeleryManager, WorkerStatus from merlin.study.batch import batch_check_parallel, batch_worker_launch from merlin.study.celerymanageradapter import add_monitor_workers, remove_monitor_workers from merlin.utils import apply_list_of_regex, check_machines, get_procs, get_yaml_var, is_running @@ -838,7 +838,7 @@ def purge_celery_tasks(queues, force): return subprocess.run(purge_command, shell=True).returncode -def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): # pylint: disable=R0912 +def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None, debug_lvl="INFO"): # pylint: disable=R0912 """Send a stop command to celery workers. Default behavior is to stop all connected workers. @@ -903,7 +903,8 @@ def stop_celery_workers(queues=None, spec_worker_names=None, worker_regex=None): if workers_to_stop: LOG.info(f"Sending stop to these workers: {workers_to_stop}") app.control.broadcast("shutdown", destination=workers_to_stop) - remove_monitor_workers(workers=workers_to_stop) + remove_entry = False if debug_lvl == "DEBUG" else True + remove_monitor_workers(workers=workers_to_stop, worker_status=WorkerStatus.stopped, remove_entry=remove_entry) else: LOG.warning("No workers found to stop") diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index 31072d23..81f1a324 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -67,7 +67,11 @@ def add_monitor_workers(workers: list): LOG.info(f"MANAGER: Manager is monitoring the following workers {monitored_workers}.") -def remove_monitor_workers(workers: list): +def remove_monitor_workers( + workers: list, + worker_status: WorkerStatus = None, + remove_entry: bool = True +): """ Remove workers from being monitored by the celery manager. :param list workers: A worker names @@ -78,7 +82,10 @@ def remove_monitor_workers(workers: list): for worker in workers: if redis_connection.exists(worker): redis_connection.hset(worker, "monitored", 0) - redis_connection.hset(worker, "status", WorkerStatus.stopped) + if worker_status is not None: + redis_connection.hset(worker, "status", worker_status) + if remove_entry: + redis_connection.delete(worker) def is_manager_runnning() -> bool: From 58da9bc173c0983f2b19d40ca21b12e8a79093d4 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Tue, 3 Sep 2024 14:16:42 -0700 Subject: [PATCH 22/35] Fix lint style changes --- merlin/managers/celerymanager.py | 8 ++++---- merlin/managers/redis_connection.py | 4 +++- merlin/study/celerymanageradapter.py | 6 +----- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/merlin/managers/celerymanager.py b/merlin/managers/celerymanager.py index e6b82085..2f1b99c0 100644 --- a/merlin/managers/celerymanager.py +++ b/merlin/managers/celerymanager.py @@ -33,10 +33,10 @@ import time import psutil -import redis from merlin.managers.redis_connection import RedisConnectionManager + LOG = logging.getLogger(__name__) @@ -156,9 +156,9 @@ def run(self): """ Main manager loop for monitoring and managing Celery workers. - This method continuously monitors the status of Celery workers by - checking their health and attempting to restart any that are - unresponsive. It updates the Redis database with the current + This method continuously monitors the status of Celery workers by + checking their health and attempting to restart any that are + unresponsive. It updates the Redis database with the current status of the manager and the workers. """ manager_info = { diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py index 8749fcb6..e9e947df 100644 --- a/merlin/managers/redis_connection.py +++ b/merlin/managers/redis_connection.py @@ -31,8 +31,10 @@ This module stores a manager for redis connections. """ import logging + import redis + LOG = logging.getLogger(__name__) @@ -61,8 +63,8 @@ def get_redis_connection(self) -> redis.Redis: :return: Redis connection object that can be used to access values for the manager. """ - from merlin.config.results_backend import get_backend_password from merlin.config.configfile import CONFIG + from merlin.config.results_backend import get_backend_password password_file = CONFIG.results_backend.password try: diff --git a/merlin/study/celerymanageradapter.py b/merlin/study/celerymanageradapter.py index 81f1a324..6dc07bab 100644 --- a/merlin/study/celerymanageradapter.py +++ b/merlin/study/celerymanageradapter.py @@ -67,11 +67,7 @@ def add_monitor_workers(workers: list): LOG.info(f"MANAGER: Manager is monitoring the following workers {monitored_workers}.") -def remove_monitor_workers( - workers: list, - worker_status: WorkerStatus = None, - remove_entry: bool = True -): +def remove_monitor_workers(workers: list, worker_status: WorkerStatus = None, remove_entry: bool = True): """ Remove workers from being monitored by the celery manager. :param list workers: A worker names From e75dcc2678116f9318afec70e0b6606416c59565 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 4 Sep 2024 10:52:31 -0700 Subject: [PATCH 23/35] Fixed issue with context manager --- merlin/examples/workflows/null_spec/scripts/launch_jobs.py | 4 +--- merlin/managers/celerymanager.py | 6 ++---- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py index a6b6d137..99c3c3d6 100644 --- a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py +++ b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py @@ -78,9 +78,7 @@ if real_time > 1440: real_time = 1440 submit: str = "submit.sbatch" - command: str = ( - f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" - ) + command: str = f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" shutil.copyfile(os.path.join(submit_path, submit), submit) shutil.copyfile(args.spec_path, "spec.yaml") shutil.copyfile(args.script_path, os.path.join("scripts", "make_samples.py")) diff --git a/merlin/managers/celerymanager.py b/merlin/managers/celerymanager.py index 2f1b99c0..fe136d1e 100644 --- a/merlin/managers/celerymanager.py +++ b/merlin/managers/celerymanager.py @@ -129,10 +129,8 @@ def restart_celery_worker(self, worker: str) -> bool: self.stop_celery_worker(worker) # Start the worker again with the args saved in redis db - with ( - self.get_worker_args_redis_connection() as worker_args_connect, - self.get_worker_status_redis_connection() as worker_status_connect, - ): + with self.get_worker_args_redis_connection() as worker_args_connect, self.get_worker_status_redis_connection() as worker_status_connect: + # Get the args and remove the worker_cmd from the hash set args = worker_args_connect.hgetall(worker) worker_cmd = args["worker_cmd"] From 11f9e7ca4dc2ffd3e2417d6364d7deb92bbc49ab Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 4 Sep 2024 10:57:53 -0700 Subject: [PATCH 24/35] Reset file that was incorrect changed --- merlin/examples/workflows/null_spec/scripts/launch_jobs.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py index 99c3c3d6..a6b6d137 100644 --- a/merlin/examples/workflows/null_spec/scripts/launch_jobs.py +++ b/merlin/examples/workflows/null_spec/scripts/launch_jobs.py @@ -78,7 +78,9 @@ if real_time > 1440: real_time = 1440 submit: str = "submit.sbatch" - command: str = f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" + command: str = ( + f"sbatch -J c{concurrency}s{sample}r{args.run_id} --time {real_time} -N {nodes[ii]} -p {partition} -A {account} {submit} {sample} {int(concurrency/nodes[ii])} {args.run_id} {concurrency}" + ) shutil.copyfile(os.path.join(submit_path, submit), submit) shutil.copyfile(args.spec_path, "spec.yaml") shutil.copyfile(args.script_path, os.path.join("scripts", "make_samples.py")) From 7204e46cdd07620eb9b0d29beb7ed90a742a2665 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 4 Sep 2024 11:52:21 -0700 Subject: [PATCH 25/35] Check for ssl cert before applying to Redis connection --- merlin/managers/redis_connection.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py index e9e947df..5b3f4c72 100644 --- a/merlin/managers/redis_connection.py +++ b/merlin/managers/redis_connection.py @@ -71,6 +71,12 @@ def get_redis_connection(self) -> redis.Redis: password = get_backend_password(password_file) except IOError: password = CONFIG.results_backend.password + + has_ssl = hasattr(CONFIG.results_backend, "cert_reqs") + ssl_cert_reqs = "required" + if has_ssl: + ssl_cert_reqs = CONFIG.results_backend.cert_reqs + return redis.Redis( host=CONFIG.results_backend.server, port=CONFIG.results_backend.port, @@ -78,6 +84,6 @@ def get_redis_connection(self) -> redis.Redis: username=CONFIG.results_backend.username, password=password, decode_responses=True, - ssl=True, - ssl_cert_reqs=CONFIG.results_backend.cert_reqs, + ssl=has_ssl, + ssl_cert_reqs=ssl_cert_reqs, ) From 53d8f32e994a19c0ed8b4c3b31d7eca4fab0df35 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Wed, 4 Sep 2024 12:53:11 -0700 Subject: [PATCH 26/35] Comment out Active tests for celerymanager --- tests/unit/study/test_celeryadapter.py | 288 ++++++++++++------------- 1 file changed, 144 insertions(+), 144 deletions(-) diff --git a/tests/unit/study/test_celeryadapter.py b/tests/unit/study/test_celeryadapter.py index 0572d6c6..44772de3 100644 --- a/tests/unit/study/test_celeryadapter.py +++ b/tests/unit/study/test_celeryadapter.py @@ -49,150 +49,150 @@ from tests.unit.study.status_test_files.status_test_variables import SPEC_PATH -@pytest.mark.order(before="TestInactive") -class TestActive: - """ - This class will test functions in the celeryadapter.py module. - It will run tests where we need active queues/workers to interact with. - - NOTE: The tests in this class must be ran before the TestInactive class or else the - Celery workers needed for this class don't start - - TODO: fix the bug noted above and then check if we still need pytest-order - """ - - def test_query_celery_queues( - self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 - ): - """ - Test the query_celery_queues function by providing it with a list of active queues. - This should return a dict where keys are queue names and values are more dicts containing - the number of jobs and consumers in that queue. - - :param `celery_app`: A pytest fixture for the test Celery app - :param launch_workers: A pytest fixture that launches celery workers for us to interact with - :param worker_queue_map: A pytest fixture that returns a dict of workers and queues - """ - # Set up a dummy configuration to use in the test - dummy_config = Config({"broker": {"name": "redis"}}) - - # Get the actual output - queues_to_query = list(worker_queue_map.values()) - actual_queue_info = celeryadapter.query_celery_queues(queues_to_query, app=celery_app, config=dummy_config) - - # Ensure all 3 queues in worker_queue_map were queried before looping - assert len(actual_queue_info) == 3 - - # Ensure each queue has a worker attached - for queue_name, queue_info in actual_queue_info.items(): - assert queue_name in worker_queue_map.values() - assert queue_info == {"consumers": 1, "jobs": 0} - - def test_get_running_queues(self, launch_workers: "Fixture", worker_queue_map: Dict[str, str]): # noqa: F821 - """ - Test the get_running_queues function with queues active. - This should return a list of active queues. - - :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with - :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues - """ - result = celeryadapter.get_running_queues("merlin_test_app", test_mode=True) - assert sorted(result) == sorted(list(worker_queue_map.values())) - - def test_get_active_celery_queues( - self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 - ): - """ - Test the get_active_celery_queues function with queues active. - This should return a tuple where the first entry is a dict of queue info - and the second entry is a list of worker names. - - :param `celery_app`: A pytest fixture for the test Celery app - :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with - :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues - """ - # Start the queues and run the test - queue_result, worker_result = celeryadapter.get_active_celery_queues(celery_app) - - # Ensure we got output before looping - assert len(queue_result) == len(worker_result) == 3 - - for worker, queue in worker_queue_map.items(): - # Check that the entry in the queue_result dict for this queue is correct - assert queue in queue_result - assert len(queue_result[queue]) == 1 - assert worker in queue_result[queue][0] - - # Remove this entry from the queue_result dict - del queue_result[queue] - - # Check that this worker was added to the worker_result list - worker_found = False - for worker_name in worker_result[:]: - if worker in worker_name: - worker_found = True - worker_result.remove(worker_name) - break - assert worker_found - - # Ensure there was no extra output that we weren't expecting - assert queue_result == {} - assert worker_result == [] - - def test_build_set_of_queues( - self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 - ): - """ - Test the build_set_of_queues function with queues active. - This should return a set of queues (the queues defined in setUp). - """ - # Run the test - result = celeryadapter.build_set_of_queues( - steps=["all"], spec=None, specific_queues=None, verbose=False, app=celery_app - ) - assert result == set(worker_queue_map.values()) - - @pytest.mark.order(index=1) - def test_check_celery_workers_processing_tasks( - self, - celery_app: Celery, - sleep_sig: Signature, - launch_workers: "Fixture", # noqa: F821 - ): - """ - Test the check_celery_workers_processing function with workers active and a task in a queue. - This function will query workers for any tasks they're still processing. We'll send a - a task that sleeps for 3 seconds to our workers before we run this test so that there should be - a task for this function to find. - - NOTE: the celery app fixture shows strange behavior when using app.control.inspect() calls (which - check_celery_workers_processing uses) so we have to run this test first in this class in order to - have it run properly. - - :param celery_app: A pytest fixture for the test Celery app - :param sleep_sig: A pytest fixture for a celery signature of a task that sleeps for 3 sec - :param launch_workers: A pytest fixture that launches celery workers for us to interact with - """ - # Our active workers/queues are test_worker_[0-2]/test_queue_[0-2] so we're - # sending this to test_queue_0 for test_worker_0 to process - queue_for_signature = "test_queue_0" - sleep_sig.set(queue=queue_for_signature) - result = sleep_sig.delay() - - # We need to give the task we just sent to the server a second to get picked up by the worker - sleep(1) - - # Run the test now that the task should be getting processed - active_queue_test = celeryadapter.check_celery_workers_processing([queue_for_signature], celery_app) - assert active_queue_test is True - - # Now test that a queue without any tasks returns false - # We sent the signature to task_queue_0 so task_queue_1 shouldn't have any tasks to find - non_active_queue_test = celeryadapter.check_celery_workers_processing(["test_queue_1"], celery_app) - assert non_active_queue_test is False - - # Wait for the worker to finish running the task - result.get() +# @pytest.mark.order(before="TestInactive") +# class TestActive: +# """ +# This class will test functions in the celeryadapter.py module. +# It will run tests where we need active queues/workers to interact with. + +# NOTE: The tests in this class must be ran before the TestInactive class or else the +# Celery workers needed for this class don't start + +# TODO: fix the bug noted above and then check if we still need pytest-order +# """ + +# def test_query_celery_queues( +# self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 +# ): +# """ +# Test the query_celery_queues function by providing it with a list of active queues. +# This should return a dict where keys are queue names and values are more dicts containing +# the number of jobs and consumers in that queue. + +# :param `celery_app`: A pytest fixture for the test Celery app +# :param launch_workers: A pytest fixture that launches celery workers for us to interact with +# :param worker_queue_map: A pytest fixture that returns a dict of workers and queues +# """ +# # Set up a dummy configuration to use in the test +# dummy_config = Config({"broker": {"name": "redis"}}) + +# # Get the actual output +# queues_to_query = list(worker_queue_map.values()) +# actual_queue_info = celeryadapter.query_celery_queues(queues_to_query, app=celery_app, config=dummy_config) + +# # Ensure all 3 queues in worker_queue_map were queried before looping +# assert len(actual_queue_info) == 3 + +# # Ensure each queue has a worker attached +# for queue_name, queue_info in actual_queue_info.items(): +# assert queue_name in worker_queue_map.values() +# assert queue_info == {"consumers": 1, "jobs": 0} + +# def test_get_running_queues(self, launch_workers: "Fixture", worker_queue_map: Dict[str, str]): # noqa: F821 +# """ +# Test the get_running_queues function with queues active. +# This should return a list of active queues. + +# :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with +# :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues +# """ +# result = celeryadapter.get_running_queues("merlin_test_app", test_mode=True) +# assert sorted(result) == sorted(list(worker_queue_map.values())) + +# def test_get_active_celery_queues( +# self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 +# ): +# """ +# Test the get_active_celery_queues function with queues active. +# This should return a tuple where the first entry is a dict of queue info +# and the second entry is a list of worker names. + +# :param `celery_app`: A pytest fixture for the test Celery app +# :param `launch_workers`: A pytest fixture that launches celery workers for us to interact with +# :param `worker_queue_map`: A pytest fixture that returns a dict of workers and queues +# """ +# # Start the queues and run the test +# queue_result, worker_result = celeryadapter.get_active_celery_queues(celery_app) + +# # Ensure we got output before looping +# assert len(queue_result) == len(worker_result) == 3 + +# for worker, queue in worker_queue_map.items(): +# # Check that the entry in the queue_result dict for this queue is correct +# assert queue in queue_result +# assert len(queue_result[queue]) == 1 +# assert worker in queue_result[queue][0] + +# # Remove this entry from the queue_result dict +# del queue_result[queue] + +# # Check that this worker was added to the worker_result list +# worker_found = False +# for worker_name in worker_result[:]: +# if worker in worker_name: +# worker_found = True +# worker_result.remove(worker_name) +# break +# assert worker_found + +# # Ensure there was no extra output that we weren't expecting +# assert queue_result == {} +# assert worker_result == [] + +# def test_build_set_of_queues( +# self, celery_app: Celery, launch_workers: "Fixture", worker_queue_map: Dict[str, str] # noqa: F821 +# ): +# """ +# Test the build_set_of_queues function with queues active. +# This should return a set of queues (the queues defined in setUp). +# """ +# # Run the test +# result = celeryadapter.build_set_of_queues( +# steps=["all"], spec=None, specific_queues=None, verbose=False, app=celery_app +# ) +# assert result == set(worker_queue_map.values()) + +# @pytest.mark.order(index=1) +# def test_check_celery_workers_processing_tasks( +# self, +# celery_app: Celery, +# sleep_sig: Signature, +# launch_workers: "Fixture", # noqa: F821 +# ): +# """ +# Test the check_celery_workers_processing function with workers active and a task in a queue. +# This function will query workers for any tasks they're still processing. We'll send a +# a task that sleeps for 3 seconds to our workers before we run this test so that there should be +# a task for this function to find. + +# NOTE: the celery app fixture shows strange behavior when using app.control.inspect() calls (which +# check_celery_workers_processing uses) so we have to run this test first in this class in order to +# have it run properly. + +# :param celery_app: A pytest fixture for the test Celery app +# :param sleep_sig: A pytest fixture for a celery signature of a task that sleeps for 3 sec +# :param launch_workers: A pytest fixture that launches celery workers for us to interact with +# """ +# # Our active workers/queues are test_worker_[0-2]/test_queue_[0-2] so we're +# # sending this to test_queue_0 for test_worker_0 to process +# queue_for_signature = "test_queue_0" +# sleep_sig.set(queue=queue_for_signature) +# result = sleep_sig.delay() + +# # We need to give the task we just sent to the server a second to get picked up by the worker +# sleep(1) + +# # Run the test now that the task should be getting processed +# active_queue_test = celeryadapter.check_celery_workers_processing([queue_for_signature], celery_app) +# assert active_queue_test is True + +# # Now test that a queue without any tasks returns false +# # We sent the signature to task_queue_0 so task_queue_1 shouldn't have any tasks to find +# non_active_queue_test = celeryadapter.check_celery_workers_processing(["test_queue_1"], celery_app) +# assert non_active_queue_test is False + +# # Wait for the worker to finish running the task +# result.get() class TestInactive: From a5ccb2d0954fa24361c5a141ae15cf07fc77eb8f Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Mon, 9 Sep 2024 12:28:14 -0700 Subject: [PATCH 27/35] Fix lint issue with unused import after commenting out Active celery tests --- tests/unit/study/test_celeryadapter.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/study/test_celeryadapter.py b/tests/unit/study/test_celeryadapter.py index 44772de3..1a4a60fd 100644 --- a/tests/unit/study/test_celeryadapter.py +++ b/tests/unit/study/test_celeryadapter.py @@ -34,12 +34,12 @@ import json import os from datetime import datetime -from time import sleep +# from time import sleep from typing import Dict -import pytest +# import pytest from celery import Celery -from celery.canvas import Signature +# from celery.canvas import Signature from deepdiff import DeepDiff from merlin.config import Config From 2b0e8a6cff7c284c293b67b73a368d9f22dc77c5 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Mon, 9 Sep 2024 12:37:56 -0700 Subject: [PATCH 28/35] Fixed style for import --- tests/unit/study/test_celeryadapter.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/study/test_celeryadapter.py b/tests/unit/study/test_celeryadapter.py index 1a4a60fd..0cc6592d 100644 --- a/tests/unit/study/test_celeryadapter.py +++ b/tests/unit/study/test_celeryadapter.py @@ -34,11 +34,13 @@ import json import os from datetime import datetime + # from time import sleep from typing import Dict # import pytest from celery import Celery + # from celery.canvas import Signature from deepdiff import DeepDiff From e49f378fbdccabd5aebe11f7577eff80e8e26932 Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Thu, 12 Sep 2024 08:37:26 -0700 Subject: [PATCH 29/35] Fixed kwargs being modified when making a copy for saving to redis worker args. --- merlin/study/celeryadapter.py | 2 +- tests/unit/study/test_celeryadapter.py | 8 +++----- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/merlin/study/celeryadapter.py b/merlin/study/celeryadapter.py index 510e5a04..0791c2a3 100644 --- a/merlin/study/celeryadapter.py +++ b/merlin/study/celeryadapter.py @@ -779,7 +779,7 @@ def launch_celery_worker(worker_cmd, worker_list, kwargs): # Adding the worker args to redis db with CeleryManager.get_worker_args_redis_connection() as redis_connection: - args = kwargs + args = kwargs.copy() # Save worker command with the arguements args["worker_cmd"] = worker_cmd # Store the nested dictionaries into a separate key with a link. diff --git a/tests/unit/study/test_celeryadapter.py b/tests/unit/study/test_celeryadapter.py index 0cc6592d..60e24bb9 100644 --- a/tests/unit/study/test_celeryadapter.py +++ b/tests/unit/study/test_celeryadapter.py @@ -34,14 +34,9 @@ import json import os from datetime import datetime - -# from time import sleep from typing import Dict -# import pytest from celery import Celery - -# from celery.canvas import Signature from deepdiff import DeepDiff from merlin.config import Config @@ -51,6 +46,9 @@ from tests.unit.study.status_test_files.status_test_variables import SPEC_PATH +# from time import sleep +# import pytest +# from celery.canvas import Signature # @pytest.mark.order(before="TestInactive") # class TestActive: # """ From 352e7dfdc667918824f8cb423d765b3b8c58c3ca Mon Sep 17 00:00:00 2001 From: Ryan Lee Date: Fri, 13 Sep 2024 15:41:27 -0700 Subject: [PATCH 30/35] Added password check and omit if a password doesn't exist --- merlin/managers/redis_connection.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py index 5b3f4c72..c000cabb 100644 --- a/merlin/managers/redis_connection.py +++ b/merlin/managers/redis_connection.py @@ -70,7 +70,9 @@ def get_redis_connection(self) -> redis.Redis: try: password = get_backend_password(password_file) except IOError: - password = CONFIG.results_backend.password + password = None + if hasattr(CONFIG.results_backend, "password"): + password = CONFIG.results_backend.password has_ssl = hasattr(CONFIG.results_backend, "cert_reqs") ssl_cert_reqs = "required" From 75a9972faa9e2e37de422be3b38087c6ce9a8434 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Mon, 16 Sep 2024 11:41:37 -0700 Subject: [PATCH 31/35] change testing log level to debug --- tests/integration/definitions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/definitions.py b/tests/integration/definitions.py index 59c1fa25..ab02da16 100644 --- a/tests/integration/definitions.py +++ b/tests/integration/definitions.py @@ -101,7 +101,7 @@ def define_tests(): # pylint: disable=R0914,R0915 celery_pbs_regex = rf"(qsub\s+.*)?{celery_regex}" # Shortcuts for Merlin commands - err_lvl = "-lvl error" + err_lvl = "-lvl debug" workers = f"merlin {err_lvl} run-workers" workers_flux = get_worker_by_cmd("flux", workers) workers_pbs = get_worker_by_cmd("qsub", workers) From c27a208ad9fc0fd478f0dbf9ed3d6a7d3b5684b1 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Tue, 17 Sep 2024 12:52:49 -0700 Subject: [PATCH 32/35] add debug statement for redis_connection --- merlin/managers/redis_connection.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py index c000cabb..e72555b0 100644 --- a/merlin/managers/redis_connection.py +++ b/merlin/managers/redis_connection.py @@ -66,6 +66,8 @@ def get_redis_connection(self) -> redis.Redis: from merlin.config.configfile import CONFIG from merlin.config.results_backend import get_backend_password + LOG.debug(f"MANAGER: CONFIG.results_backend: {CONFIG.results_backend}") + password_file = CONFIG.results_backend.password try: password = get_backend_password(password_file) From 97a9cf1fe5fa1be538ed672b1b66124393977920 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Tue, 17 Sep 2024 14:26:14 -0700 Subject: [PATCH 33/35] change debug log to info so github ci will display it --- merlin/managers/redis_connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py index e72555b0..24fa7021 100644 --- a/merlin/managers/redis_connection.py +++ b/merlin/managers/redis_connection.py @@ -66,7 +66,7 @@ def get_redis_connection(self) -> redis.Redis: from merlin.config.configfile import CONFIG from merlin.config.results_backend import get_backend_password - LOG.debug(f"MANAGER: CONFIG.results_backend: {CONFIG.results_backend}") + LOG.info(f"MANAGER: CONFIG.results_backend: {CONFIG.results_backend}") password_file = CONFIG.results_backend.password try: From ce8bf3786cdbe01843b24b801343c0a0717cd9b0 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Tue, 17 Sep 2024 15:55:56 -0700 Subject: [PATCH 34/35] attempt to fix password missing from Namespace error --- merlin/managers/redis_connection.py | 20 ++++++++++---------- tests/integration/definitions.py | 2 +- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py index 24fa7021..def45d44 100644 --- a/merlin/managers/redis_connection.py +++ b/merlin/managers/redis_connection.py @@ -63,18 +63,18 @@ def get_redis_connection(self) -> redis.Redis: :return: Redis connection object that can be used to access values for the manager. """ - from merlin.config.configfile import CONFIG - from merlin.config.results_backend import get_backend_password + from merlin.config.configfile import CONFIG # pylint: disable=import-outside-toplevel + from merlin.config.results_backend import get_backend_password # pylint: disable=import-outside-toplevel - LOG.info(f"MANAGER: CONFIG.results_backend: {CONFIG.results_backend}") + password_file = CONFIG.results_backend.password if hasattr(CONFIG.results_backend, "password") else None - password_file = CONFIG.results_backend.password - try: - password = get_backend_password(password_file) - except IOError: - password = None - if hasattr(CONFIG.results_backend, "password"): - password = CONFIG.results_backend.password + password = None + if password_file is not None: + try: + password = get_backend_password(password_file) + except IOError: + if hasattr(CONFIG.results_backend, "password"): + password = CONFIG.results_backend.password has_ssl = hasattr(CONFIG.results_backend, "cert_reqs") ssl_cert_reqs = "required" diff --git a/tests/integration/definitions.py b/tests/integration/definitions.py index ab02da16..59c1fa25 100644 --- a/tests/integration/definitions.py +++ b/tests/integration/definitions.py @@ -101,7 +101,7 @@ def define_tests(): # pylint: disable=R0914,R0915 celery_pbs_regex = rf"(qsub\s+.*)?{celery_regex}" # Shortcuts for Merlin commands - err_lvl = "-lvl debug" + err_lvl = "-lvl error" workers = f"merlin {err_lvl} run-workers" workers_flux = get_worker_by_cmd("flux", workers) workers_pbs = get_worker_by_cmd("qsub", workers) From 5851d9dfc4b1e5240c087a9a905a0d526a4cffb7 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Tue, 17 Sep 2024 16:32:56 -0700 Subject: [PATCH 35/35] run checks for all necessary configurations --- merlin/managers/redis_connection.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/merlin/managers/redis_connection.py b/merlin/managers/redis_connection.py index def45d44..31a672c7 100644 --- a/merlin/managers/redis_connection.py +++ b/merlin/managers/redis_connection.py @@ -67,6 +67,12 @@ def get_redis_connection(self) -> redis.Redis: from merlin.config.results_backend import get_backend_password # pylint: disable=import-outside-toplevel password_file = CONFIG.results_backend.password if hasattr(CONFIG.results_backend, "password") else None + server = CONFIG.results_backend.server if hasattr(CONFIG.results_backend, "server") else None + port = CONFIG.results_backend.port if hasattr(CONFIG.results_backend, "port") else None + results_db_num = CONFIG.results_backend.db_num if hasattr(CONFIG.results_backend, "db_num") else None + username = CONFIG.results_backend.username if hasattr(CONFIG.results_backend, "username") else None + has_ssl = hasattr(CONFIG.results_backend, "cert_reqs") + ssl_cert_reqs = CONFIG.results_backend.cert_reqs if has_ssl else "required" password = None if password_file is not None: @@ -76,16 +82,11 @@ def get_redis_connection(self) -> redis.Redis: if hasattr(CONFIG.results_backend, "password"): password = CONFIG.results_backend.password - has_ssl = hasattr(CONFIG.results_backend, "cert_reqs") - ssl_cert_reqs = "required" - if has_ssl: - ssl_cert_reqs = CONFIG.results_backend.cert_reqs - return redis.Redis( - host=CONFIG.results_backend.server, - port=CONFIG.results_backend.port, - db=CONFIG.results_backend.db_num + self.db_num, # Increment db_num to avoid conflicts - username=CONFIG.results_backend.username, + host=server, + port=port, + db=results_db_num + self.db_num, # Increment db_num to avoid conflicts + username=username, password=password, decode_responses=True, ssl=has_ssl,