From 634060466561d9e313a19f1f368a68ac05e801f4 Mon Sep 17 00:00:00 2001 From: Brian Gunnarson Date: Mon, 30 Sep 2024 14:34:26 -0700 Subject: [PATCH] move stop and query workers tests to the same file --- tests/integration/commands/base_classes.py | 130 ------ .../commands/test_query_workers.py | 268 ----------- .../commands/test_stop_and_query_workers.py | 442 ++++++++++++++++++ .../integration/commands/test_stop_workers.py | 292 ------------ tests/integration/helper_funcs.py | 4 +- 5 files changed, 444 insertions(+), 692 deletions(-) delete mode 100644 tests/integration/commands/base_classes.py delete mode 100644 tests/integration/commands/test_query_workers.py create mode 100644 tests/integration/commands/test_stop_and_query_workers.py delete mode 100644 tests/integration/commands/test_stop_workers.py diff --git a/tests/integration/commands/base_classes.py b/tests/integration/commands/base_classes.py deleted file mode 100644 index 917e8db6..00000000 --- a/tests/integration/commands/base_classes.py +++ /dev/null @@ -1,130 +0,0 @@ -""" -This module will contain the base classes used for -the integration tests in this command directory. -""" - -import os -import subprocess -from contextlib import contextmanager -from typing import List - -from tests.context_managers.celery_workers_manager import CeleryWorkersManager -from tests.integration.conditions import Condition -from tests.integration.helper_funcs import check_test_conditions, copy_app_yaml_to_cwd, load_workers_from_spec - - -class BaseStopWorkersAndQueryWorkersTest: - """ - Base class for `stop-workers` and `query-workers` tests. - Contains necessary methods for executing the tests. - """ - - @contextmanager - def run_test_with_workers( - self, - path_to_test_specs: str, - merlin_server_dir: str, - conditions: List[Condition], - command: str, - flag: str = None, - ): - """ - Helper method to run common testing logic for tests with workers started. - This method must also be a context manager so we can check the status of the - workers prior to the CeleryWorkersManager running it's exit code that shuts down - all active workers. - - This method will: - 0. Read in the necessary fixtures as parameters. These fixtures grab paths to - our test specs and the merlin server directory created from starting the - containerized redis server. - 1. Load in the worker specifications from the `multiple_workers.yaml` file. - 2. Use a context manager to start up the workers on the celery app connected to - the containerized redis server - 3. Copy the app.yaml file for the containerized redis server to the current working - directory so that merlin will connect to it when we run our test - 4. Run the test command that's provided and check that the conditions given are - passing. - 5. Yield control back to the calling method. - 6. Safely terminate workers that may have not been stopped once the calling method - completes. - - Parameters: - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - conditions: - A list of `Condition` instances that need to pass in order for this test to - be successful. - command: - The command that we're testing. E.g. "merlin stop-workers" - flag: - An optional flag to add to the command that we're testing so we can test - different functionality for the command. - """ - from merlin.celery import app as celery_app - - # Grab worker configurations from the spec file - multiple_worker_spec = os.path.join(path_to_test_specs, "multiple_workers.yaml") - workers_from_spec = load_workers_from_spec(multiple_worker_spec) - - # We use a context manager to start workers so that they'll safely stop even if this test fails - with CeleryWorkersManager(celery_app) as workers_manager: - workers_manager.launch_workers(workers_from_spec) - - # Copy the app.yaml to the cwd so merlin will connect to the testing server - copy_app_yaml_to_cwd(merlin_server_dir) - - # Run the test - cmd_to_test = f"{command} {flag}" if flag else command - result = subprocess.run(cmd_to_test, capture_output=True, text=True, shell=True) - - info = { - "stdout": result.stdout, - "stderr": result.stderr, - "return_code": result.returncode, - } - - # Ensure all test conditions are satisfied - check_test_conditions(conditions, info) - - yield - - def run_test_without_workers(self, merlin_server_dir: str, conditions: List[Condition], command: str): - """ - Helper function to run common testing logic for tests with no workers started. - - This test will: - 0. Read in the `merlin_server_dir` fixture as a parameter. This fixture is a - path to the merlin server directory created from starting the containerized - redis server. - 1. Copy the app.yaml file for the containerized redis server to the current working - directory so that merlin will connect to it when we run our test - 2. Run the test command that's provided and check that the conditions given are - passing. - - Parameters: - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - conditions: - A list of `Condition` instances that need to pass in order for this test to - be successful. - command: - The command that we're testing. E.g. "merlin stop-workers" - """ - # Copy the app.yaml to the cwd so merlin will connect to the testing server - copy_app_yaml_to_cwd(merlin_server_dir) - - # Run the test - result = subprocess.run(command, capture_output=True, text=True, shell=True) - info = { - "stdout": result.stdout, - "stderr": result.stderr, - "return_code": result.returncode, - } - - # Ensure all test conditions are satisfied - check_test_conditions(conditions, info) diff --git a/tests/integration/commands/test_query_workers.py b/tests/integration/commands/test_query_workers.py deleted file mode 100644 index d1b4cf9d..00000000 --- a/tests/integration/commands/test_query_workers.py +++ /dev/null @@ -1,268 +0,0 @@ -""" -Tests for the `merlin query-workers` command. -""" - -import os -from enum import Enum - -from tests.integration.commands.base_classes import BaseStopWorkersAndQueryWorkersTest -from tests.integration.conditions import HasRegex - - -class WorkerMessages(Enum): - """ - Enumerated strings to help keep track of the messages - that we're expecting (or not expecting) to see from the - tests in this module. - """ - - NO_WORKERS_MSG = "No workers found!" - STEP_1_WORKER = "step_1_merlin_test_worker" - STEP_2_WORKER = "step_2_merlin_test_worker" - OTHER_WORKER = "other_merlin_test_worker" - - -class TestQueryWorkers(BaseStopWorkersAndQueryWorkersTest): - """ - Tests for the `merlin query-workers` command. Most of these tests will: - 1. Start workers from a spec file used for testing - - Use CeleryWorkerManager for this to ensure safe stoppage of workers - if something goes wrong - 2. Run the `merlin query-workers` command from a subprocess - """ - - command_to_test = "merlin query-workers" - - def test_no_workers( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - merlin_server_dir: str, - ): - """ - Test the `merlin query-workers` command with no workers started in the first place. - - Run the `merlin query-workers` command and ensure that a "no workers found" message - is written to the output. To see more information on exactly what this test is doing, - see the `run_test_without_workers()` method of the base class. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value), # No workers should be launched so we should see this - HasRegex(WorkerMessages.STEP_1_WORKER.value, negate=True), # None of these workers should be started - HasRegex(WorkerMessages.STEP_2_WORKER.value, negate=True), # None of these workers should be started - HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), # None of these workers should be started - ] - self.run_test_without_workers(merlin_server_dir, conditions, self.command_to_test) - - def test_no_flags( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - path_to_test_specs: str, - merlin_server_dir: str, - ): - """ - Test the `merlin query-workers` command with no flags. - - Run the `merlin query-workers` command and ensure that all workers are queried. - To see more information on exactly what this test is doing, see the - `run_test_with_workers()` method. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value, negate=True), # Some workers should be found - HasRegex(WorkerMessages.STEP_1_WORKER.value), # This worker should be queried - HasRegex(WorkerMessages.STEP_2_WORKER.value), # This worker should be queried - HasRegex(WorkerMessages.OTHER_WORKER.value), # This worker should be queried - ] - with self.run_test_with_workers(path_to_test_specs, merlin_server_dir, conditions, self.command_to_test): - pass - - def test_spec_flag( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - path_to_test_specs: str, - merlin_server_dir: str, - ): - """ - Test the `merlin query-workers` command with the `--spec` flag. - - Run the `merlin query-workers` command with the `--spec` flag and ensure that all - workers are queried. To see more information on exactly what this test is doing, - see the `run_test_with_workers()` method. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value, negate=True), # Some workers should be queried - HasRegex(WorkerMessages.STEP_1_WORKER.value), # This worker should be queried - HasRegex(WorkerMessages.STEP_2_WORKER.value), # This worker should be queried - HasRegex(WorkerMessages.OTHER_WORKER.value), # This worker should be queried - ] - with self.run_test_with_workers( - path_to_test_specs, - merlin_server_dir, - conditions, - self.command_to_test, - flag=f"--spec {os.path.join(path_to_test_specs, 'multiple_workers.yaml')}", - ): - pass - - def test_workers_flag( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - path_to_test_specs: str, - merlin_server_dir: str, - ): - """ - Test the `merlin query-workers` command with the `--workers` flag. - - Run the `merlin query-workers` command with the `--workers` flag and ensure that - only the workers given with this flag are queried. To see more information on - exactly what this test is doing, see the `run_test_with_workers()` method. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value, negate=True), # Some workers should be queried - HasRegex(WorkerMessages.STEP_1_WORKER.value), # This worker should be queried - HasRegex(WorkerMessages.STEP_2_WORKER.value), # This worker should be queried - HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), # This worker should NOT be queried - ] - with self.run_test_with_workers( - path_to_test_specs, - merlin_server_dir, - conditions, - self.command_to_test, - flag=f"--workers {WorkerMessages.STEP_1_WORKER.value} {WorkerMessages.STEP_2_WORKER.value}", - ): - pass - - def test_queues_flag( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - path_to_test_specs: str, - merlin_server_dir: str, - ): - """ - Test the `merlin query-workers` command with the `--queues` flag. - - Run the `merlin query-workers` command with the `--queues` flag and ensure that - only the workers attached to the given queues are queried. To see more information - on exactly what this test is doing, see the `run_test_with_workers()` method. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value, negate=True), # One worker should be queried - HasRegex(WorkerMessages.STEP_1_WORKER.value), # This worker should be queried - HasRegex(WorkerMessages.STEP_2_WORKER.value, negate=True), # This worker should NOT be queried - HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), # This worker should NOT be queried - ] - with self.run_test_with_workers( - path_to_test_specs, - merlin_server_dir, - conditions, - self.command_to_test, - flag="--queues hello_queue", - ): - pass diff --git a/tests/integration/commands/test_stop_and_query_workers.py b/tests/integration/commands/test_stop_and_query_workers.py new file mode 100644 index 00000000..5d86e8d5 --- /dev/null +++ b/tests/integration/commands/test_stop_and_query_workers.py @@ -0,0 +1,442 @@ +""" +This module will contain the base class used for testing +the `stop-workers` and `query-workers` commands. +""" + +import os +import subprocess +from contextlib import contextmanager +from enum import Enum +from typing import List + +import pytest + +from tests.context_managers.celery_workers_manager import CeleryWorkersManager +from tests.integration.conditions import Condition, HasRegex +from tests.integration.helper_funcs import check_test_conditions, copy_app_yaml_to_cwd, load_workers_from_spec + + +# pylint: disable=unused-argument,import-outside-toplevel,too-many-arguments + + +class WorkerMessages(Enum): + """ + Enumerated strings to help keep track of the messages + that we're expecting (or not expecting) to see from the + tests in this module. + """ + + NO_WORKERS_MSG_STOP = "No workers found to stop" + NO_WORKERS_MSG_QUERY = "No workers found!" + STEP_1_WORKER = "step_1_merlin_test_worker" + STEP_2_WORKER = "step_2_merlin_test_worker" + OTHER_WORKER = "other_merlin_test_worker" + + +class TestStopAndQueryWorkersCommands: + """ + Tests for the `merlin stop-workers` and `merlin query-workers` commands. + Most of these tests will: + 1. Start workers from a spec file used for testing + - Use CeleryWorkerManager for this to ensure safe stoppage of workers + if something goes wrong + 2. Run the test command from a subprocess + """ + + @contextmanager + def run_test_with_workers( + self, + path_to_test_specs: str, + merlin_server_dir: str, + conditions: List[Condition], + command: str, + flag: str = None, + ): + """ + Helper method to run common testing logic for tests with workers started. + This method must also be a context manager so we can check the status of the + workers prior to the CeleryWorkersManager running it's exit code that shuts down + all active workers. + + This method will: + 0. Read in the necessary fixtures as parameters. These fixtures grab paths to + our test specs and the merlin server directory created from starting the + containerized redis server. + 1. Load in the worker specifications from the `multiple_workers.yaml` file. + 2. Use a context manager to start up the workers on the celery app connected to + the containerized redis server + 3. Copy the app.yaml file for the containerized redis server to the current working + directory so that merlin will connect to it when we run our test + 4. Run the test command that's provided and check that the conditions given are + passing. + 5. Yield control back to the calling method. + 6. Safely terminate workers that may have not been stopped once the calling method + completes. + + Parameters: + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + conditions: + A list of `Condition` instances that need to pass in order for this test to + be successful. + command: + The command that we're testing. E.g. "merlin stop-workers" + flag: + An optional flag to add to the command that we're testing so we can test + different functionality for the command. + """ + from merlin.celery import app as celery_app + + # Grab worker configurations from the spec file + multiple_worker_spec = os.path.join(path_to_test_specs, "multiple_workers.yaml") + workers_from_spec = load_workers_from_spec(multiple_worker_spec) + + # We use a context manager to start workers so that they'll safely stop even if this test fails + with CeleryWorkersManager(celery_app) as workers_manager: + workers_manager.launch_workers(workers_from_spec) + + # Copy the app.yaml to the cwd so merlin will connect to the testing server + copy_app_yaml_to_cwd(merlin_server_dir) + + # Run the test + cmd_to_test = f"{command} {flag}" if flag else command + result = subprocess.run(cmd_to_test, capture_output=True, text=True, shell=True) + + info = { + "stdout": result.stdout, + "stderr": result.stderr, + "return_code": result.returncode, + } + + # Ensure all test conditions are satisfied + check_test_conditions(conditions, info) + + yield + + def get_no_workers_msg(self, command_to_test: str) -> WorkerMessages: + """ + Retrieve the appropriate "no workers" found message. + + This method checks the command to test and returns a corresponding + message based on whether the command is to stop workers or query for them. + + Returns: + The message indicating that no workers are available, depending on the + command being tested. + """ + no_workers_msg = None + if command_to_test == "merlin stop-workers": + no_workers_msg = WorkerMessages.NO_WORKERS_MSG_STOP.value + else: + no_workers_msg = WorkerMessages.NO_WORKERS_MSG_QUERY.value + return no_workers_msg + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_no_workers( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with no workers + started in the first place. + + This test will: + 0. Setup the pytest fixtures which include: + - starting a containerized Redis server + - updating the CONFIG object to point to the containerized Redis server + - obtaining the path to the merlin server directory created from starting + the containerized Redis server + 1. Copy the app.yaml file for the containerized redis server to the current working + directory so that merlin will connect to it when we run our test + 2. Run the test command that's provided and check that the conditions given are + passing. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + command_to_test: + The command that we're testing, obtained from the parametrize call. + """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test)), + HasRegex(WorkerMessages.STEP_1_WORKER.value, negate=True), + HasRegex(WorkerMessages.STEP_2_WORKER.value, negate=True), + HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), + ] + + # Copy the app.yaml to the cwd so merlin will connect to the testing server + copy_app_yaml_to_cwd(merlin_server_dir) + + # Run the test + result = subprocess.run(command_to_test, capture_output=True, text=True, shell=True) + info = { + "stdout": result.stdout, + "stderr": result.stderr, + "return_code": result.returncode, + } + + # Ensure all test conditions are satisfied + check_test_conditions(conditions, info) + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_no_flags( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + path_to_test_specs: str, + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with no flags. + + Run the commands referenced above and ensure the text output from Merlin is correct. + For the `stop-workers` command, we check if all workers are stopped as well. + To see more information on exactly what this test is doing, see the + `run_test_with_workers()` method. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + command_to_test: + The command that we're testing, obtained from the parametrize call. + """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test), negate=True), + HasRegex(WorkerMessages.STEP_1_WORKER.value), + HasRegex(WorkerMessages.STEP_2_WORKER.value), + HasRegex(WorkerMessages.OTHER_WORKER.value), + ] + with self.run_test_with_workers(path_to_test_specs, merlin_server_dir, conditions, command_to_test): + if command_to_test == "merlin stop-workers": + # After the test runs and before the CeleryWorkersManager exits, ensure there are no workers on the app + from merlin.celery import app as celery_app + + active_queues = celery_app.control.inspect().active_queues() + assert active_queues is None + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_spec_flag( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + path_to_test_specs: str, + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with the `--spec` + flag. + + Run the commands referenced above with the `--spec` flag and ensure the text output + from Merlin is correct. For the `stop-workers` command, we check if all workers defined + in the spec file are stopped as well. To see more information on exactly what this test + is doing, see the `run_test_with_workers()` method. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + command_to_test: + The command that we're testing, obtained from the parametrize call. + """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test), negate=True), + HasRegex(WorkerMessages.STEP_1_WORKER.value), + HasRegex(WorkerMessages.STEP_2_WORKER.value), + HasRegex(WorkerMessages.OTHER_WORKER.value), + ] + with self.run_test_with_workers( + path_to_test_specs, + merlin_server_dir, + conditions, + command_to_test, + flag=f"--spec {os.path.join(path_to_test_specs, 'multiple_workers.yaml')}", + ): + if command_to_test == "merlin stop-workers": + from merlin.celery import app as celery_app + + active_queues = celery_app.control.inspect().active_queues() + assert active_queues is None + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_workers_flag( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + path_to_test_specs: str, + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with the `--workers` + flag. + + Run the commands referenced above with the `--workers` flag and ensure the text output + from Merlin is correct. For the `stop-workers` command, we check to make sure that all + workers given with this flag are stopped. To see more information on exactly what this + test is doing, see the `run_test_with_workers()` method. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + command_to_test: + The command that we're testing, obtained from the parametrize call. + """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test), negate=True), + HasRegex(WorkerMessages.STEP_1_WORKER.value), + HasRegex(WorkerMessages.STEP_2_WORKER.value), + HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), + ] + with self.run_test_with_workers( + path_to_test_specs, + merlin_server_dir, + conditions, + command_to_test, + flag=f"--workers {WorkerMessages.STEP_1_WORKER.value} {WorkerMessages.STEP_2_WORKER.value}", + ): + if command_to_test == "merlin stop-workers": + from merlin.celery import app as celery_app + + active_queues = celery_app.control.inspect().active_queues() + worker_name = f"celery@{WorkerMessages.OTHER_WORKER.value}" + assert worker_name in active_queues + + @pytest.mark.parametrize("command_to_test", ["merlin stop-workers", "merlin query-workers"]) + def test_queues_flag( + self, + redis_server: str, + redis_results_backend_config: "Fixture", # noqa: F821 + redis_broker_config: "Fixture", # noqa: F821 + path_to_test_specs: str, + merlin_server_dir: str, + command_to_test: str, + ): + """ + Test the `merlin stop-workers` and `merlin query-workers` commands with the `--queues` + flag. + + Run the commands referenced above with the `--queues` flag and ensure the text output + from Merlin is correct. For the `stop-workers` command, we check that only the workers + attached to the given queues are stopped. To see more information on exactly what this + test is doing, see the `run_test_with_workers()` method. + + Parameters: + redis_server: + A fixture that starts a containerized redis server instance that runs on + localhost:6379. + redis_results_backend_config: + A fixture that modifies the CONFIG object so that it points the results + backend configuration to the containerized redis server we start up with + the `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + redis_broker_config: + A fixture that modifies the CONFIG object so that it points the broker + configuration to the containerized redis server we start up with the + `redis_server` fixture. The CONFIG object is what merlin uses to connect + to a server. + path_to_test_specs: + A fixture to provide the path to the directory containing test specifications. + merlin_server_dir: + A fixture to provide the path to the merlin_server directory that will be + created by the `redis_server` fixture. + command_to_test: + The command that we're testing, obtained from the parametrize call. + """ + conditions = [ + HasRegex(self.get_no_workers_msg(command_to_test), negate=True), + HasRegex(WorkerMessages.STEP_1_WORKER.value), + HasRegex(WorkerMessages.STEP_2_WORKER.value, negate=True), + HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), + ] + with self.run_test_with_workers( + path_to_test_specs, + merlin_server_dir, + conditions, + command_to_test, + flag="--queues hello_queue", + ): + if command_to_test == "merlin stop-workers": + from merlin.celery import app as celery_app + + active_queues = celery_app.control.inspect().active_queues() + workers_that_should_be_alive = [ + f"celery@{WorkerMessages.OTHER_WORKER.value}", + f"celery@{WorkerMessages.STEP_2_WORKER.value}", + ] + for worker_name in workers_that_should_be_alive: + assert worker_name in active_queues + +# pylint: enable=unused-argument,import-outside-toplevel,too-many-arguments diff --git a/tests/integration/commands/test_stop_workers.py b/tests/integration/commands/test_stop_workers.py deleted file mode 100644 index c8b424f0..00000000 --- a/tests/integration/commands/test_stop_workers.py +++ /dev/null @@ -1,292 +0,0 @@ -""" -Tests for the `merlin stop-workers` command. -""" - -import os -from enum import Enum - -from tests.integration.commands.base_classes import BaseStopWorkersAndQueryWorkersTest -from tests.integration.conditions import HasRegex - - -class WorkerMessages(Enum): - """ - Enumerated strings to help keep track of the messages - that we're expecting (or not expecting) to see from the - tests in this module. - """ - - NO_WORKERS_MSG = "No workers found to stop" - STEP_1_WORKER = "step_1_merlin_test_worker" - STEP_2_WORKER = "step_2_merlin_test_worker" - OTHER_WORKER = "other_merlin_test_worker" - - -class TestStopWorkers(BaseStopWorkersAndQueryWorkersTest): - """ - Tests for the `merlin stop-workers` command. Most of these tests will: - 1. Start workers from a spec file used for testing - - Use CeleryWorkerManager for this to ensure safe stoppage of workers - if something goes wrong - 2. Run the `merlin stop-workers` command from a subprocess - """ - - command_to_test = "merlin stop-workers" - - def test_no_workers( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - merlin_server_dir: str, - ): - """ - Test the `merlin stop-workers` command with no workers started in the first place. - - Run the `merlin stop-workers` command and ensure that a "no workers found" message - is written to the output. To see more information on exactly what this test is doing, - see the `run_test_without_workers()` method of the base class. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value), # No workers should be launched so we should see this - HasRegex(WorkerMessages.STEP_1_WORKER.value, negate=True), # None of these workers should be started - HasRegex(WorkerMessages.STEP_2_WORKER.value, negate=True), # None of these workers should be started - HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), # None of these workers should be started - ] - self.run_test_without_workers(merlin_server_dir, conditions, self.command_to_test) - - def test_no_flags( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - path_to_test_specs: str, - merlin_server_dir: str, - ): - """ - Test the `merlin stop-workers` command with no flags. - - Run the `merlin stop-workers` command and ensure that all workers are stopped. - To see more information on exactly what this test is doing, see the - `run_test_with_workers()` method of the base class. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - from merlin.celery import app as celery_app - - # Define test conditions - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value, negate=True), # Some workers should be stopped - HasRegex(WorkerMessages.STEP_1_WORKER.value), # This worker should be stopped - HasRegex(WorkerMessages.STEP_2_WORKER.value), # This worker should be stopped - HasRegex(WorkerMessages.OTHER_WORKER.value), # This worker should be stopped - ] - - # Run the test - with self.run_test_with_workers(path_to_test_specs, merlin_server_dir, conditions, self.command_to_test): - # After the test runs and before the CeleryWorkersManager exits, ensure there are no workers on the app - active_queues = celery_app.control.inspect().active_queues() - assert active_queues is None - - def test_spec_flag( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - path_to_test_specs: str, - merlin_server_dir: str, - ): - """ - Test the `merlin stop-workers` command with the `--spec` flag. - - Run the `merlin stop-workers` command with the `--spec` flag and ensure that all - workers are stopped. To see more information on exactly what this test is doing, - see the `run_test_with_workers()` method of the base class. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - from merlin.celery import app as celery_app - - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value, negate=True), # Some workers should be stopped - HasRegex(WorkerMessages.STEP_1_WORKER.value), # This worker should be stopped - HasRegex(WorkerMessages.STEP_2_WORKER.value), # This worker should be stopped - HasRegex(WorkerMessages.OTHER_WORKER.value), # This worker should be stopped - ] - with self.run_test_with_workers( - path_to_test_specs, - merlin_server_dir, - conditions, - self.command_to_test, - flag=f"--spec {os.path.join(path_to_test_specs, 'multiple_workers.yaml')}", - ): - active_queues = celery_app.control.inspect().active_queues() - assert active_queues is None - - def test_workers_flag( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - path_to_test_specs: str, - merlin_server_dir: str, - ): - """ - Test the `merlin stop-workers` command with the `--workers` flag. - - Run the `merlin stop-workers` command with the `--workers` flag and ensure that - only the workers given with this flag are stopped. To see more information on - exactly what this test is doing, see the `run_test_with_workers()` method of the - base class. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - from merlin.celery import app as celery_app - - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value, negate=True), # Some workers should be stopped - HasRegex(WorkerMessages.STEP_1_WORKER.value), # This worker should be stopped - HasRegex(WorkerMessages.STEP_2_WORKER.value), # This worker should be stopped - HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), # This worker should NOT be stopped - ] - with self.run_test_with_workers( - path_to_test_specs, - merlin_server_dir, - conditions, - self.command_to_test, - flag=f"--workers {WorkerMessages.STEP_1_WORKER.value} {WorkerMessages.STEP_2_WORKER.value}", - ): - active_queues = celery_app.control.inspect().active_queues() - worker_name = f"celery@{WorkerMessages.OTHER_WORKER.value}" - assert worker_name in active_queues - - def test_queues_flag( - self, - redis_server: str, - redis_results_backend_config: "Fixture", # noqa: F821 - redis_broker_config: "Fixture", # noqa: F821 - path_to_test_specs: str, - merlin_server_dir: str, - ): - """ - Test the `merlin stop-workers` command with the `--queues` flag. - - Run the `merlin stop-workers` command with the `--queues` flag and ensure that - only the workers attached to the given queues are stopped. To see more information - on exactly what this test is doing, see the `run_test_with_workers()` method of the - base class. - - Parameters: - redis_server: - A fixture that starts a containerized redis server instance that runs on - localhost:6379. - redis_results_backend_config: - A fixture that modifies the CONFIG object so that it points the results - backend configuration to the containerized redis server we start up with - the `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - redis_broker_config: - A fixture that modifies the CONFIG object so that it points the broker - configuration to the containerized redis server we start up with the - `redis_server` fixture. The CONFIG object is what merlin uses to connect - to a server. - path_to_test_specs: - A fixture to provide the path to the directory containing test specifications. - merlin_server_dir: - A fixture to provide the path to the merlin_server directory that will be - created by the `redis_server` fixture. - """ - from merlin.celery import app as celery_app - - conditions = [ - HasRegex(WorkerMessages.NO_WORKERS_MSG.value, negate=True), # One workers should be stopped - HasRegex(WorkerMessages.STEP_1_WORKER.value), # This worker should be stopped - HasRegex(WorkerMessages.STEP_2_WORKER.value, negate=True), # This worker should NOT be stopped - HasRegex(WorkerMessages.OTHER_WORKER.value, negate=True), # This worker should NOT be stopped - ] - with self.run_test_with_workers( - path_to_test_specs, - merlin_server_dir, - conditions, - self.command_to_test, - flag="--queues hello_queue", - ): - active_queues = celery_app.control.inspect().active_queues() - workers_that_should_be_alive = [ - f"celery@{WorkerMessages.OTHER_WORKER.value}", - f"celery@{WorkerMessages.STEP_2_WORKER.value}", - ] - for worker_name in workers_that_should_be_alive: - assert worker_name in active_queues diff --git a/tests/integration/helper_funcs.py b/tests/integration/helper_funcs.py index ac3d5d7d..fc976a68 100644 --- a/tests/integration/helper_funcs.py +++ b/tests/integration/helper_funcs.py @@ -102,11 +102,11 @@ def check_test_conditions(conditions: List[Condition], info: Dict[str, str]): condition.ingest_info(info) try: assert condition.passes - except AssertionError: + except AssertionError as exc: error_message = ( f"Condition failed: {condition}\n" f"Captured stdout: {info['stdout']}\n" f"Captured stderr: {info['stderr']}\n" f"Return code: {info['return_code']}\n" ) - raise AssertionError(error_message) + raise AssertionError(error_message) from exc