Skip to content

Commit

Permalink
remove manager functionality from this PR
Browse files Browse the repository at this point in the history
  • Loading branch information
bgunnar5 committed Nov 12, 2024
1 parent 3bc1fa6 commit 3be8963
Show file tree
Hide file tree
Showing 9 changed files with 13 additions and 618 deletions.
1 change: 0 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
### Added
- Merlin manager capability to monitor celery workers.
- Added additional tests for the `merlin run` and `merlin purge` commands
- Aliased types to represent different types of pytest fixtures
- New test condition `StepFinishedFilesCount` to help search for `MERLIN_FINISHED` files in output workspaces
Expand Down
101 changes: 1 addition & 100 deletions merlin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
from merlin.server.server_commands import config_server, init_server, restart_server, start_server, status_server, stop_server
from merlin.spec.expansion import RESERVED, get_spec_with_expansion
from merlin.spec.specification import MerlinSpec
from merlin.study.celerymanageradapter import run_manager, start_manager, stop_manager
from merlin.study.status import DetailedStatus, Status
from merlin.study.status_constants import VALID_RETURN_CODES, VALID_STATUS_FILTERS
from merlin.study.status_renderers import status_renderer_factory
Expand Down Expand Up @@ -360,7 +359,7 @@ def stop_workers(args):
LOG.warning(f"Worker '{worker_name}' is unexpanded. Target provenance spec instead?")

# Send stop command to router
router.stop_workers(args.task_server, worker_names, args.queues, args.workers, args.level.upper())
router.stop_workers(args.task_server, worker_names, args.queues, args.workers)


def print_info(args):
Expand Down Expand Up @@ -401,35 +400,6 @@ def process_example(args: Namespace) -> None:
setup_example(args.workflow, args.path)


def process_manager(args: Namespace):
"""
Process the command for managing the workers.
This function interprets the command provided in the `args` namespace and
executes the corresponding manager function. It supports three commands:
"run", "start", and "stop".
:param args: parsed CLI arguments
"""
if args.command == "run":
run_manager(query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout)
elif args.command == "start":
try:
start_manager(
query_frequency=args.query_frequency, query_timeout=args.query_timeout, worker_timeout=args.worker_timeout
)
LOG.info("Manager started successfully.")
except Exception as e:
LOG.error(f"Unable to start manager.\n{e}")
elif args.command == "stop":
if stop_manager():
LOG.info("Manager stopped successfully.")
else:
LOG.error("Unable to stop manager.")
else:
print("Run manager with a command. Try 'merlin manager -h' for more details")


def process_monitor(args):
"""
CLI command to monitor merlin workers and queues to keep
Expand Down Expand Up @@ -935,75 +905,6 @@ def generate_worker_touching_parsers(subparsers: ArgumentParser) -> None:
help="regex match for specific workers to stop",
)

# merlin manager
manager: ArgumentParser = subparsers.add_parser(
"manager",
help="Watchdog application to manage workers",
description="A daemon process that helps to restart and communicate with workers while running.",
formatter_class=ArgumentDefaultsHelpFormatter,
)
manager.set_defaults(func=process_manager)

def add_manager_options(manager_parser: ArgumentParser):
"""
Add shared options for manager subcommands.
The `manager run` and `manager start` subcommands have the same options.
Rather than writing duplicate code for these we'll use this function
to add the arguments to these subcommands.
:param manager_parser: The ArgumentParser object to add these options to
"""
manager_parser.add_argument(
"-qf",
"--query_frequency",
action="store",
type=int,
default=60,
help="The frequency at which workers will be queried for response.",
)
manager_parser.add_argument(
"-qt",
"--query_timeout",
action="store",
type=float,
default=0.5,
help="The timeout for the query response that are sent to workers.",
)
manager_parser.add_argument(
"-wt",
"--worker_timeout",
action="store",
type=int,
default=180,
help="The sum total (query_frequency*tries) time before an attempt is made to restart worker.",
)

manager_commands: ArgumentParser = manager.add_subparsers(dest="command")
manager_run = manager_commands.add_parser(
"run",
help="Run the daemon process",
description="Run manager",
formatter_class=ArgumentDefaultsHelpFormatter,
)
add_manager_options(manager_run)
manager_run.set_defaults(func=process_manager)
manager_start = manager_commands.add_parser(
"start",
help="Start the daemon process",
description="Start manager",
formatter_class=ArgumentDefaultsHelpFormatter,
)
add_manager_options(manager_start)
manager_start.set_defaults(func=process_manager)
manager_stop = manager_commands.add_parser(
"stop",
help="Stop the daemon process",
description="Stop manager",
formatter_class=ArgumentDefaultsHelpFormatter,
)
manager_stop.set_defaults(func=process_manager)

# merlin monitor
monitor: ArgumentParser = subparsers.add_parser(
"monitor",
Expand Down
Empty file removed merlin/managers/__init__.py
Empty file.
214 changes: 0 additions & 214 deletions merlin/managers/celerymanager.py

This file was deleted.

Loading

0 comments on commit 3be8963

Please sign in to comment.