diff --git a/.gitignore b/.gitignore index 56b81a8f39c8f..d7d691afa816a 100644 --- a/.gitignore +++ b/.gitignore @@ -16,6 +16,9 @@ .jar /dashboard/client/build +# Kuberay config lives in a separate repository +python/ray/autoscaler/kuberay/config + # Files generated by flatc should be ignored /src/ray/gcs/format/*_generated.h /src/ray/object_manager/format/*_generated.h diff --git a/ci/lint/check_api_annotations.py b/ci/lint/check_api_annotations.py index 2445afc1a72d6..4f5ee74b3ce81 100755 --- a/ci/lint/check_api_annotations.py +++ b/ci/lint/check_api_annotations.py @@ -99,7 +99,13 @@ def verify(symbol, scanned, ok, output, prefix=None, ignore=None): verify(ray.air, set(), ok, output) verify(ray.train, set(), ok, output) verify(ray.tune, set(), ok, output) - verify(ray, set(), ok, output, ignore=["ray.workflow", "ray.tune", "ray.serve"]) + verify( + ray, + set(), + ok, + output, + ignore=["ray.workflow", "ray.tune", "ray.serve"], + ) verify(ray.serve, set(), ok, output) assert len(ok) >= 500, len(ok) # TODO(ekl) enable it for all modules. diff --git a/dashboard/agent.py b/dashboard/agent.py index df57590ff0b6b..86309615ce8b7 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -5,6 +5,7 @@ import logging import logging.handlers import os +import pathlib import sys import signal @@ -18,7 +19,10 @@ from ray._private.gcs_pubsub import GcsAioPublisher from ray._raylet import GcsClient from ray._private.gcs_utils import GcsAioClient -from ray._private.ray_logging import setup_component_logger +from ray._private.ray_logging import ( + setup_component_logger, + configure_log_file, +) from ray.core.generated import agent_manager_pb2, agent_manager_pb2_grpc from ray.experimental.internal_kv import ( _initialize_internal_kv, @@ -341,6 +345,14 @@ async def _check_parent(): await self.http_server.cleanup() +def open_capture_files(log_dir): + filename = f"agent-{args.agent_id}" + return ( + ray._private.utils.open_log(pathlib.Path(log_dir) / f"{filename}.out"), + ray._private.utils.open_log(pathlib.Path(log_dir) / f"{filename}.err"), + ) + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Dashboard agent.") parser.add_argument( @@ -507,6 +519,10 @@ async def _check_parent(): # w.r.t grpc server init in the DashboardAgent initializer. loop = ray._private.utils.get_or_create_event_loop() + # Setup stdout/stderr redirect files + out_file, err_file = open_capture_files(args.log_dir) + configure_log_file(out_file, err_file) + agent = DashboardAgent( args.node_ip_address, args.dashboard_agent_port, diff --git a/doc/source/ray-observability/ray-logging.rst b/doc/source/ray-observability/ray-logging.rst index ac238cc4a715e..b0b931db00373 100644 --- a/doc/source/ray-observability/ray-logging.rst +++ b/doc/source/ray-observability/ray-logging.rst @@ -2,7 +2,43 @@ Logging ======= -This document will explain Ray's logging system and its best practices. +This document explains Ray's logging system and related best practices. + +Internal Ray Logging Configuration +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +When ``import ray`` is executed, Ray's logger is initialized, generating a sensible configuration given in ``python/ray/_private/log.py``. The default logging level is ``logging.INFO``. + +All ray loggers are automatically configured in ``ray._private.ray_logging``. To change the Ray library logging configuration: + +.. code-block:: python + + import logging + + logger = logging.getLogger("ray") + logger # Modify the ray logging config + +Similarly, to modify the logging configuration for any Ray subcomponent, specify the appropriate logger name: + +.. code-block:: python + + import logging + + # First, get the handle for the logger you want to modify + ray_data_logger = logging.getLogger("ray.data") + ray_tune_logger = logging.getLogger("ray.tune") + ray_rllib_logger = logging.getLogger("ray.rllib") + ray_air_logger = logging.getLogger("ray.air") + ray_train_logger = logging.getLogger("ray.train") + ray_workflow_logger = logging.getLogger("ray.workflow") + + # Modify the ray.data logging level + ray_data_logger.setLevel(logging.WARNING) + + # Other loggers can be modified similarly. + # Here's how to add an aditional file handler for ray tune: + ray_tune_logger.addHandler(logging.FileHandler("extra_ray_tune_log.log")) + +For more information about logging in workers, see :ref:`Customizing worker loggers`. Driver logs ~~~~~~~~~~~ @@ -16,12 +52,12 @@ The log file consists of the stdout of the entrypoint command of the job. For t .. _ray-worker-logs: -Worker logs +Worker stdout and stderr ~~~~~~~~~~~ -Ray's tasks or actors are executed remotely within Ray's worker processes. Ray has special support to improve the visibility of logs produced by workers. +Ray's tasks or actors are executed remotely within Ray's worker processes. Ray has special support to improve the visibility of stdout and stderr produced by workers. -- By default, all of the tasks/actors stdout and stderr are redirected to the worker log files. Check out :ref:`Logging directory structure ` to learn how Ray's logging directory is structured. -- By default, all of the tasks/actors stdout and stderr that is redirected to worker log files are published to the driver. Drivers display logs generated from its tasks/actors to its stdout and stderr. +- By default, stdout and stderr from all tasks and actors are redirected to the worker log files, including any log messages generated by the worker. See :ref:`Logging directory structure ` to understand the structure of the Ray logging directory. +- By default, the driver reads the worker log files to which the stdout and stderr for all tasks and actors are redirected. Drivers display all stdout and stderr generated from their tasks or actors to their own stdout and stderr. Let's look at a code example to see how this works. @@ -37,7 +73,7 @@ Let's look at a code example to see how this works. ray.get(task.remote()) -You should be able to see the string `task` from your driver stdout. +You should be able to see the string `task` from your driver stdout. When logs are printed, the process id (pid) and an IP address of the node that executes tasks/actors are printed together. Check out the output below. @@ -139,10 +175,9 @@ Limitations: By default, the builtin print will also be patched to use `ray.experimental.tqdm_ray.safe_print` when `tqdm_ray` is used. This avoids progress bar corruption on driver print statements. To disable this, set `RAY_TQDM_PATCH_PRINT=0`. -How to set up loggers +Customizing Worker Loggers ~~~~~~~~~~~~~~~~~~~~~ -When using ray, all of the tasks and actors are executed remotely in Ray's worker processes. -Since Python logger module creates a singleton logger per process, loggers should be configured on per task/actor basis. +When using Ray, all tasks and actors are executed remotely in Ray's worker processes. .. note:: @@ -164,7 +199,8 @@ Since Python logger module creates a singleton logger per process, loggers shoul logging.basicConfig(level=logging.INFO) def log(self, msg): - logging.info(msg) + logger = logging.getLogger(__name__) + logger.info(msg) actor = Actor.remote() ray.get(actor.log.remote("A log message for an actor.")) @@ -172,14 +208,15 @@ Since Python logger module creates a singleton logger per process, loggers shoul @ray.remote def f(msg): logging.basicConfig(level=logging.INFO) - logging.info(msg) + logger = logging.getLogger(__name__) + logger.info(msg) - ray.get(f.remote("A log message for a task")) + ray.get(f.remote("A log message for a task.")) .. code-block:: bash - (pid=95193) INFO:root:A log message for a task - (pid=95192) INFO:root:A log message for an actor. + (Actor pid=179641) INFO:__main__:A log message for an actor. + (f pid=177572) INFO:__main__:A log message for a task. How to use structured logging ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -206,11 +243,11 @@ Logging directory structure --------------------------- .. _logging-directory-structure: -By default, Ray logs are stored in a ``/tmp/ray/session_*/logs`` directory. +By default, Ray logs are stored in a ``/tmp/ray/session_*/logs`` directory. .. note:: - The default temp directory is ``/tmp/ray`` (for Linux and Mac OS). If you'd like to change the temp directory, you can specify it when ``ray start`` or ``ray.init()`` is called. + The default temp directory is ``/tmp/ray`` (for Linux and MacOS). To change the temp directory, specify it when you call ``ray start`` or ``ray.init()``. A new Ray instance creates a new session ID to the temp directory. The latest session ID is symlinked to ``/tmp/ray/session_latest``. @@ -235,7 +272,7 @@ Here's a Ray log directory structure. Note that ``.out`` is logs from stdout/std For the logs of the actual installations (including e.g. ``pip install`` logs), see the ``runtime_env_setup-[job_id].log`` file (see below). - ``runtime_env_setup-[job_id].log``: Logs from installing :ref:`runtime environments ` for a task, actor or job. This file will only be present if a runtime environment is installed. - ``runtime_env_setup-ray_client_server_[port].log``: Logs from installing :ref:`runtime environments ` for a job when connecting via :ref:`Ray Client `. -- ``worker-[worker_id]-[job_id]-[pid].[out|err]``: Python/Java part of Ray drivers and workers. All of stdout and stderr from tasks/actors are streamed here. Note that job_id is an id of the driver.- +- ``worker-[worker_id]-[job_id]-[pid].[out|err]``: Python or Java part of Ray drivers and workers. All of stdout and stderr from tasks or actors are streamed here. Note that job_id is an id of the driver.- .. _ray-log-rotation: diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 08fafe10cd5bf..358832866eeb0 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -1,8 +1,10 @@ # isort: skip_file +from ray._private import log # isort: skip # noqa: F401 import logging import os import sys +log.generate_logging_config() logger = logging.getLogger(__name__) diff --git a/python/ray/_private/log.py b/python/ray/_private/log.py new file mode 100644 index 0000000000000..ea19fbae7926b --- /dev/null +++ b/python/ray/_private/log.py @@ -0,0 +1,136 @@ +import logging +import re +from logging.config import dictConfig +import threading + + +class ContextFilter(logging.Filter): + """A filter that adds ray context info to log records. + + This filter adds a package name to append to the message as well as information + about what worker emitted the message, if applicable. + """ + + logger_regex = re.compile(r"ray(\.(?P\w+))?(\..*)?") + package_message_names = { + "air": "AIR", + "data": "Data", + "rllib": "RLlib", + "serve": "Serve", + "train": "Train", + "tune": "Tune", + "workflow": "Workflow", + } + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def filter(self, record: logging.LogRecord) -> bool: + """Add context information to the log record. + + This filter adds a package name from where the message was generated as + well as the worker IP address, if applicable. + + Args: + record: Record to be filtered + + Returns: + True if the record is to be logged, False otherwise. (This filter only + adds context, so records are always logged.) + """ + match = self.logger_regex.search(record.name) + if match and match["subpackage"] in self.package_message_names: + record.package = f"[Ray {self.package_message_names[match['subpackage']]}]" + else: + record.package = "" + + return True + + +class PlainRayHandler(logging.StreamHandler): + """A plain log handler. + + This handler writes to whatever sys.stderr points to at emit-time, + not at instantiation time. See docs for logging._StderrHandler. + """ + + def __init__(self): + super().__init__() + self.plain_handler = logging._StderrHandler() + self.plain_handler.level = self.level + self.plain_handler.formatter = logging.Formatter(fmt="%(message)s") + + def emit(self, record: logging.LogRecord): + """Emit the log message. + + If this is a worker, bypass fancy logging and just emit the log record. + If this is the driver, emit the message using the appropriate console handler. + + Args: + record: Log record to be emitted + """ + import ray + + if ( + hasattr(ray, "_private") + and ray._private.worker.global_worker.mode + == ray._private.worker.WORKER_MODE + ): + self.plain_handler.emit(record) + else: + logging._StderrHandler.emit(self, record) + + +logger_initialized = False +logging_config_lock = threading.Lock() + + +def generate_logging_config(): + """Generate the default Ray logging configuration.""" + with logging_config_lock: + global logger_initialized + if logger_initialized: + return + logger_initialized = True + + formatters = { + "plain": { + "datefmt": "[%Y-%m-%d %H:%M:%S]", + "format": "%(asctime)s %(package)s %(levelname)s %(name)s::%(message)s", + }, + } + filters = {"context_filter": {"()": ContextFilter}} + handlers = { + "default": { + "()": PlainRayHandler, + "formatter": "plain", + "filters": ["context_filter"], + } + } + + loggers = { + # Default ray logger; any log message that gets propagated here will be + # logged to the console. Disable propagation, as many users will use + # basicConfig to set up a default handler. If so, logs will be + # printed twice unless we prevent propagation here. + "ray": { + "level": "INFO", + "handlers": ["default"], + "propagate": False, + }, + # Special handling for ray.rllib: only warning-level messages passed through + # See https://github.com/ray-project/ray/pull/31858 for related PR + "ray.rllib": { + "level": "WARN", + }, + } + + dictConfig( + { + "version": 1, + "formatters": formatters, + "filters": filters, + "handlers": handlers, + "loggers": loggers, + } + ) diff --git a/python/ray/_private/ray_logging.py b/python/ray/_private/ray_logging.py index fb0609a64017c..8ef95a6a3c8a7 100644 --- a/python/ray/_private/ray_logging.py +++ b/python/ray/_private/ray_logging.py @@ -20,8 +20,6 @@ from ray._private.utils import binary_to_hex from ray.util.debug import log_once -_default_handler = None - def setup_logger( logging_level: int, @@ -32,14 +30,6 @@ def setup_logger( if type(logging_level) is str: logging_level = logging.getLevelName(logging_level.upper()) logger.setLevel(logging_level) - global _default_handler - if _default_handler is None: - _default_handler = logging._StderrHandler() - logger.addHandler(_default_handler) - _default_handler.setFormatter(logging.Formatter(logging_format)) - # Setting this will avoid the message - # being propagated to the parent logger. - logger.propagate = False def setup_component_logger( @@ -53,11 +43,18 @@ def setup_component_logger( logger_name=None, propagate=True, ): - """Configure the root logger that is used for Ray's python components. + """Configure the logger that is used for Ray's python components. For example, it should be used for monitor, dashboard, and log monitor. The only exception is workers. They use the different logging config. + Ray's python components generally should not write to stdout/stderr, because + messages written there will be redirected to the head node. For deployments where + there may be thousands of workers, this would create unacceptable levels of log + spam. For this reason, we disable the "ray" logger's handlers, and enable + propagation so that log messages that actually do need to be sent to the head node + can reach it. + Args: logging_level: Logging level in string or logging enum. logging_format: Logging format string. @@ -73,6 +70,10 @@ def setup_component_logger( Returns: the created or modified logger. """ + ray_logger = logging.getLogger("ray") + ray_logger.propagate = True + ray_logger.handlers.clear() + logger = logging.getLogger(logger_name) if type(logging_level) is str: logging_level = logging.getLevelName(logging_level.upper()) diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index 122787c6050d9..26c92830ca86e 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -1863,3 +1863,33 @@ def get_current_unused_port(): port = sock.getsockname()[1] sock.close() return port + + +def search_words(string: str, words: str): + """Check whether each word is in the given string. + + Args: + string: String to search + words: Space-separated string of words to search for + """ + return [word in string for word in words.split(" ")] + + +def has_all_words(string: str, words: str): + """Check that string has all of the given words. + + Args: + string: String to search + words: Space-separated string of words to search for + """ + return all(search_words(string, words)) + + +def has_no_words(string, words): + """Check that string has none of the given words. + + Args: + string: String to search + words: Space-separated string of words to search for + """ + return not any(search_words(string, words)) diff --git a/python/ray/_private/worker.py b/python/ray/_private/worker.py index 69a8327173c96..b4d1f8154c866 100644 --- a/python/ray/_private/worker.py +++ b/python/ray/_private/worker.py @@ -1286,6 +1286,8 @@ def init( """ if configure_logging: setup_logger(logging_level, logging_format or ray_constants.LOGGER_FORMAT) + else: + logging.getLogger("ray").handlers.clear() # Parse the hidden options: _enable_object_reconstruction: bool = kwargs.pop( diff --git a/python/ray/autoscaler/_private/kuberay/run_autoscaler.py b/python/ray/autoscaler/_private/kuberay/run_autoscaler.py index 89896800d3d3c..a02598c5f1f7a 100644 --- a/python/ray/autoscaler/_private/kuberay/run_autoscaler.py +++ b/python/ray/autoscaler/_private/kuberay/run_autoscaler.py @@ -86,8 +86,16 @@ def _setup_logging() -> None: filename=ray_constants.MONITOR_LOG_FILE_NAME, # monitor.log max_bytes=ray_constants.LOGGING_ROTATE_BYTES, backup_count=ray_constants.LOGGING_ROTATE_BACKUP_COUNT, - logger_name="ray", # Root of the logging hierarchy for Ray code. ) - # Logs will also be written to the container's stdout. + + # For the autoscaler, the root logger _also_ needs to write to stderr, not just + # ray_constants.MONITOR_LOG_FILE_NAME. + level = logging.getLevelName(ray_constants.LOGGER_LEVEL.upper()) + stderr_handler = logging._StderrHandler() + stderr_handler.setFormatter(logging.Formatter(ray_constants.LOGGER_FORMAT)) + stderr_handler.setLevel(level) + logging.root.setLevel(level) + logging.root.addHandler(stderr_handler) + # The stdout handler was set up in the Ray CLI entry point. # See ray.scripts.scripts::cli(). diff --git a/python/ray/autoscaler/_private/monitor.py b/python/ray/autoscaler/_private/monitor.py index f15e109fc9d40..8db6404b09532 100644 --- a/python/ray/autoscaler/_private/monitor.py +++ b/python/ray/autoscaler/_private/monitor.py @@ -2,7 +2,7 @@ import argparse import json -import logging.handlers +import logging import os import signal import sys @@ -370,6 +370,7 @@ def update_resource_requests(self): def _run(self): """Run the monitor loop.""" + while True: try: gcs_request_start_time = time.time() diff --git a/python/ray/serve/tests/conftest.py b/python/ray/serve/tests/conftest.py index 9ef903494b954..b1e6812fa96cc 100644 --- a/python/ray/serve/tests/conftest.py +++ b/python/ray/serve/tests/conftest.py @@ -10,7 +10,7 @@ from ray import serve from ray._private.test_utils import wait_for_condition -from ray.tests.conftest import pytest_runtest_makereport # noqa +from ray.tests.conftest import pytest_runtest_makereport, propagate_logs # noqa # https://tools.ietf.org/html/rfc6335#section-6 MIN_DYNAMIC_PORT = 49152 diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index 65ed21cd61db3..c584074fd7217 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -735,7 +735,7 @@ def verify_snapshot(): assert hello_deployment["status"] == "RUNNING" -def test_serve_start_different_http_checkpoint_options_warning(caplog): +def test_serve_start_different_http_checkpoint_options_warning(propagate_logs, caplog): logger = logging.getLogger("ray.serve") caplog.set_level(logging.WARNING, logger="ray.serve") diff --git a/python/ray/tests/kuberay/Dockerfile b/python/ray/tests/kuberay/Dockerfile new file mode 100644 index 0000000000000..acbd041104d48 --- /dev/null +++ b/python/ray/tests/kuberay/Dockerfile @@ -0,0 +1,8 @@ +# Use the latest Ray master as base. +FROM rayproject/ray:nightly-py310 +# Invalidate the cache so that fresh code is pulled in the next step. +ARG BUILD_DATE +# Retrieve your development code. +ADD . ray +# Install symlinks to your modified Python code. +RUN python ray/python/ray/setup-dev.py -y diff --git a/python/ray/tests/kuberay/README.md b/python/ray/tests/kuberay/README.md index 6a33519034cb3..3c72dec1df37c 100644 --- a/python/ray/tests/kuberay/README.md +++ b/python/ray/tests/kuberay/README.md @@ -3,54 +3,39 @@ This page provides suggestions on running the test `test_autoscaling_e2e` locally. You might want to do this if your PR is breaking this test in CI and you want to debug why. -## Build a docker image with your code changes. -First, push your code changes to your git fork. -The Dockerfile below will work if you've only made Python changes. -```dockerfile -# Use the latest Ray master as base. -FROM rayproject/ray:nightly-py37 -# Invalidate the cache so that fresh code is pulled in the next step. -ARG BUILD_DATE -# Retrieve your development code. -RUN git clone -b https://github.com//ray -# Install symlinks to your modified Python code. -RUN python ray/python/ray/setup-dev.py -y -``` - -Build the image and push it to your docker account or registry. Assuming your Dockerfile is named "Dockerfile": -```shell -docker build --build-arg BUILD_DATE=$(date +%Y-%m-%d:%H:%M:%S) -t /: - < Dockerfile -docker push /: -``` - -## Setup Access to a Kubernetes cluster. -Gain access to a Kubernetes cluster. -The easiest thing to do is to use KinD. -```shell -brew install kind -kind create cluster -``` - -## Install master Ray -The test uses Ray client, so you should either -- install nightly Ray in your environment -- install Ray from source in your environment (`pip install -e`) - -Match your environment's Python version with the Ray image you are using. - -## Run the test. - -```shell -# Set up the operator. -python setup/setup_kuberay.py -# Run the test. -RAY_IMAGE= python test_autoscaling_e2e.py -# Tear RayClusters and operator down. -python setup/teardown_kuberay.py -``` +Running the test must happen in stages: + +1. Tear down any running `kind` cluster +2. Remove the existing ray docker image that will be deployed to the cluster +3. Build a new docker image containing the local ray repository +4. Create a new `kind` cluster +5. Load the docker image into the cluster +6. Set up kuberay +7. Run the test + +To help with this, there is a `Dockerfile` and a `rune2e.sh` bash script which +together run these things for you. + +## Test requirements + +1. Ensure `kind` and `kustomize` are both installed +2. Run `ray/autoscaler/kuberay/init-config.sh` to clone `ray-project/kuberay`, + which contains config files needed to set up kuberay. +3. Finally, make sure that the `Dockerfile` is using the same python version as + what you're using to run the test. By default, this dockerfile is built using + the `rayproject/ray:nightly-py310` build. + +Now you're ready to run the test. + +## Running the test + +Run `./rune2e.sh` to run the test. The test itself does not tear down resources on failure; you can -- examine a Ray cluster from a failed test (`kubectl get pod`, `kubectl get raycluster`) +- examine a Ray cluster from a failed test (`kubectl get pods`, `kubectl get pod`, `kubectl get raycluster`) +- view all logs (`kubectl logs `) or just logs associated with the autoscaler (`kubectl logs -c autoscaler`) - delete the Ray cluster (`kubectl delete raycluster -A`) -- rerun the test without tearing the operator down (`RAY_IMAGE= python test_autoscaling_e2e.py`) +- rerun the test without tearing the operator down (`RAY_IMAGE=/: python test_autoscaling_e2e.py`) - tear down the operator when you're done `python setup/teardown_kuberay.py` +- copy files from a pod to your filesystem (`kubectl cp :/path/to/file /target/path/in/local/filesystem`) +- access a bash prompt inside the pod (`kubectl exec -it bash`) diff --git a/python/ray/tests/kuberay/rune2e.sh b/python/ray/tests/kuberay/rune2e.sh new file mode 100755 index 0000000000000..491e93f1aa18e --- /dev/null +++ b/python/ray/tests/kuberay/rune2e.sh @@ -0,0 +1,15 @@ +#!/bin/bash + +set -x +RAY_IMAGE=rayproject/autoscaling_e2e_test_image +kind delete cluster +docker image rm $RAY_IMAGE + +pushd ../../../.. +docker build --build-arg BUILD_DATE="$(date +%Y-%m-%d:%H:%M:%S)" -t $RAY_IMAGE -f ./python/ray/tests/kuberay/Dockerfile . || exit +popd || exit + +kind create cluster || exit +kind load docker-image $RAY_IMAGE || exit +python setup/setup_kuberay.py +RAY_IMAGE=$RAY_IMAGE python test_autoscaling_e2e.py diff --git a/python/ray/tests/kuberay/setup/setup_kuberay.py b/python/ray/tests/kuberay/setup/setup_kuberay.py index 1d72e99713c91..8a8c100092981 100644 --- a/python/ray/tests/kuberay/setup/setup_kuberay.py +++ b/python/ray/tests/kuberay/setup/setup_kuberay.py @@ -1,10 +1,8 @@ from ray.tests.kuberay.utils import ( setup_kuberay_operator, wait_for_raycluster_crd, - setup_logging, ) if __name__ == "__main__": - setup_logging() setup_kuberay_operator() wait_for_raycluster_crd() diff --git a/python/ray/tests/kuberay/setup/teardown_kuberay.py b/python/ray/tests/kuberay/setup/teardown_kuberay.py index e7e82c23c7a18..d9cda64d81e3a 100644 --- a/python/ray/tests/kuberay/setup/teardown_kuberay.py +++ b/python/ray/tests/kuberay/setup/teardown_kuberay.py @@ -1,5 +1,4 @@ -from ray.tests.kuberay.utils import teardown_kuberay_operator, setup_logging +from ray.tests.kuberay.utils import teardown_kuberay_operator if __name__ == "__main__": - setup_logging() teardown_kuberay_operator() diff --git a/python/ray/tests/kuberay/test_autoscaling_e2e.py b/python/ray/tests/kuberay/test_autoscaling_e2e.py index 748c2a83a5450..73aac77fa9f72 100644 --- a/python/ray/tests/kuberay/test_autoscaling_e2e.py +++ b/python/ray/tests/kuberay/test_autoscaling_e2e.py @@ -15,7 +15,6 @@ get_raycluster, ray_client_port_forward, ray_job_submit, - setup_logging, switch_to_ray_parent_dir, kubectl_exec_python_script, kubectl_logs, @@ -40,7 +39,7 @@ # By default, use the same image for the autoscaler and Ray containers. AUTOSCALER_IMAGE = os.environ.get("AUTOSCALER_IMAGE", RAY_IMAGE) # Set to IfNotPresent in kind CI. -PULL_POLICY = os.environ.get("PULL_POLICY", "Always") +PULL_POLICY = os.environ.get("PULL_POLICY", "IfNotPresent") logger.info(f"Using image `{RAY_IMAGE}` for Ray containers.") logger.info(f"Using image `{AUTOSCALER_IMAGE}` for Autoscaler containers.") logger.info(f"Using pull policy `{PULL_POLICY}` for all images.") @@ -403,5 +402,4 @@ def testAutoscaling(self): import pytest import sys - setup_logging() sys.exit(pytest.main(["-vv", __file__])) diff --git a/python/ray/tests/kuberay/utils.py b/python/ray/tests/kuberay/utils.py index 345cc09bcf6d8..ef0016f3ca7b5 100644 --- a/python/ray/tests/kuberay/utils.py +++ b/python/ray/tests/kuberay/utils.py @@ -32,13 +32,6 @@ LOG_FORMAT = "[%(levelname)s %(asctime)s] " "%(filename)s: %(lineno)d " "%(message)s" -def setup_logging(): - logging.basicConfig( - level=logging.INFO, - format=LOG_FORMAT, - ) - - def switch_to_ray_parent_dir(): # Switch to parent of Ray repo, because that's what the doc examples do. logger.info("Switching to parent of Ray directory.") diff --git a/python/ray/tests/test_cli.py b/python/ray/tests/test_cli.py index 865099a5ab1f4..e4fd4530a0ced 100644 --- a/python/ray/tests/test_cli.py +++ b/python/ray/tests/test_cli.py @@ -169,6 +169,7 @@ def _die_on_error(result): def _debug_check_line_by_line(result, expected_lines): + """Print the result and expected output line-by-line.""" output_lines = result.output.split("\n") i = 0 @@ -195,7 +196,7 @@ def _debug_check_line_by_line(result, expected_lines): for line in expected_lines[i:]: print(repr(line)) - assert False + assert False, (result.output, expected_lines) @contextmanager diff --git a/python/ray/tests/test_get_or_create_actor.py b/python/ray/tests/test_get_or_create_actor.py index f91055a3e2484..17ec8692527bc 100644 --- a/python/ray/tests/test_get_or_create_actor.py +++ b/python/ray/tests/test_get_or_create_actor.py @@ -96,7 +96,7 @@ def do_run(name): if "local Ray instance" not in line and "The object store" not in line: out.append(line) valid = "".join(out) - assert valid.strip() == "DONE", out_str + assert "DONE" in valid, out_str if __name__ == "__main__": diff --git a/python/ray/tests/test_logging.py b/python/ray/tests/test_logging.py index 5ef89b6cc7616..33b0674c2e895 100644 --- a/python/ray/tests/test_logging.py +++ b/python/ray/tests/test_logging.py @@ -213,7 +213,12 @@ def f(): # Create a runtime env to make sure dashboard agent is alive. ray.get(f.options(runtime_env={"env_vars": {"A": "a", "B": "b"}}).remote()) - paths = list(log_dir_path.iterdir()) + # Filter out only paths that end in .log, .log.1, etc. + # These paths are handled by the logger; the others (.out, .err) are not. + paths = [] + for path in log_dir_path.iterdir(): + if re.search(r".*\.log(\.\d+)?", str(path)): + paths.append(path) def component_exist(component, paths): for path in paths: @@ -380,11 +385,11 @@ def print_after(_obj): assert msgs[0][0] == "done" -def test_log_redirect_to_stderr(shutdown_only, capfd): +def test_log_redirect_to_stderr(shutdown_only): log_components = { ray_constants.PROCESS_TYPE_DASHBOARD: "Dashboard head grpc address", - ray_constants.PROCESS_TYPE_DASHBOARD_AGENT: "Dashboard agent grpc address", + ray_constants.PROCESS_TYPE_DASHBOARD_AGENT: "", ray_constants.PROCESS_TYPE_GCS_SERVER: "Loading job table data", # No log monitor output if all components are writing to stderr. ray_constants.PROCESS_TYPE_LOG_MONITOR: "", @@ -438,7 +443,6 @@ def f(): # Make sure that the expected startup log records for each of the # components appears in the stderr stream. - # stderr = capfd.readouterr().err for component, canonical_record in log_components.items(): if not canonical_record: # Process not run or doesn't generate logs; skip. @@ -851,8 +855,8 @@ class MySubclass(MyClass): def test_ray_does_not_break_makeRecord(): - """Importing Ray used to cause `logging.makeRecord` to use the default record factory, - rather than the factory set by `logging.setRecordFactory`. + """Importing Ray used to cause `logging.makeRecord` to use the default record + factory, rather than the factory set by `logging.setRecordFactory`. This tests validates that this bug is fixed. """ @@ -874,6 +878,70 @@ def test_ray_does_not_break_makeRecord(): logging.setLogRecordFactory(logging.LogRecord) +@pytest.mark.parametrize( + "logger_name,package_name", + ( + ("ray", ""), + ("ray.air", "[Ray AIR]"), + ("ray.data", "[Ray Data]"), + ("ray.rllib", "[Ray RLlib]"), + ("ray.serve", "[Ray Serve]"), + ("ray.train", "[Ray Train]"), + ("ray.tune", "[Ray Tune]"), + ("ray.workflow", "[Ray Workflow]"), + ), +) +def test_log_library_context(propagate_logs, caplog, logger_name, package_name): + """Test that the log configuration injects the correct context into log messages.""" + logger = logging.getLogger(logger_name) + logger.critical("Test!") + + assert ( + caplog.records[-1].package == package_name + ), "Missing ray package name in log record." + + +@pytest.mark.parametrize( + "logger_name,logger_level", + ( + ("ray", logging.INFO), + ("ray.air", logging.INFO), + ("ray.data", logging.INFO), + ("ray.rllib", logging.WARNING), + ("ray.serve", logging.INFO), + ("ray.train", logging.INFO), + ("ray.tune", logging.INFO), + ("ray.workflow", logging.INFO), + ), +) +@pytest.mark.parametrize( + "test_level", + ( + logging.NOTSET, + logging.DEBUG, + logging.INFO, + logging.WARNING, + logging.ERROR, + logging.CRITICAL, + ), +) +def test_log_level_settings( + propagate_logs, caplog, logger_name, logger_level, test_level +): + """Test that logs of lower level than the ray subpackage is + configured for are rejected. + """ + logger = logging.getLogger(logger_name) + + logger.log(test_level, "Test!") + + if test_level >= logger_level: + assert caplog.records, "Log message missing where one is expected." + assert caplog.records[-1].levelno == test_level, "Log message level mismatch." + else: + assert len(caplog.records) == 0, "Log message found where none are expected." + + if __name__ == "__main__": import sys diff --git a/python/ray/tests/test_multi_node_3.py b/python/ray/tests/test_multi_node_3.py index 4c7b03ac8dd75..48634c16ea31b 100644 --- a/python/ray/tests/test_multi_node_3.py +++ b/python/ray/tests/test_multi_node_3.py @@ -24,7 +24,6 @@ def test_calling_start_ray_head(call_ray_stop_only): - # Test that we can call ray start with various command line # parameters. @@ -200,7 +199,6 @@ def test_calling_start_ray_head(call_ray_stop_only): def test_ray_start_non_head(call_ray_stop_only, monkeypatch): - # Test that we can call ray start to connect to an existing cluster. # Test starting Ray with a port specified. @@ -433,8 +431,7 @@ def f(): def test_multi_driver_logging(ray_start_regular): - address_info = ray_start_regular - address = address_info["address"] + address = ray_start_regular["address"] # ray.init(address=address) driver1_wait = Semaphore.options(name="driver1_wait").remote(value=0) @@ -479,10 +476,10 @@ def remote_print(s, file=None): """ p1 = run_string_as_driver_nonblocking( - driver_script_template.format(address, "driver1_wait", "1", "2") + driver_script_template.format(address, "driver1_wait", "message1", "message2") ) p2 = run_string_as_driver_nonblocking( - driver_script_template.format(address, "driver2_wait", "3", "4") + driver_script_template.format(address, "driver2_wait", "message3", "message4") ) ray.get(main_wait.acquire.remote()) @@ -492,29 +489,24 @@ def remote_print(s, file=None): ray.get(driver1_wait.release.remote()) ray.get(driver2_wait.release.remote()) - # At this point driver1 should receive '1' and driver2 '3' + # At this point driver1 should receive 'message1' and driver2 'message3' ray.get(main_wait.acquire.remote()) ray.get(main_wait.acquire.remote()) ray.get(driver1_wait.release.remote()) ray.get(driver2_wait.release.remote()) - # At this point driver1 should receive '2' and driver2 '4' + # At this point driver1 should receive 'message2' and driver2 'message4' ray.get(main_wait.acquire.remote()) ray.get(main_wait.acquire.remote()) driver1_out = p1.stdout.read().decode("ascii") driver2_out = p2.stdout.read().decode("ascii") - if sys.platform == "win32": - driver1_out = driver1_out.replace("\r", "") - driver2_out = driver2_out.replace("\r", "") - driver1_out_split = driver1_out.split("\n") - driver2_out_split = driver2_out.split("\n") - - assert driver1_out_split[0][-1] == "1", driver1_out_split - assert driver1_out_split[1][-1] == "2", driver1_out_split - assert driver2_out_split[0][-1] == "3", driver2_out_split - assert driver2_out_split[1][-1] == "4", driver2_out_split + + assert "message1" in driver1_out + assert "message2" in driver1_out + assert "message3" in driver2_out + assert "message4" in driver2_out @pytest.fixture diff --git a/python/ray/tests/test_output.py b/python/ray/tests/test_output.py index 9299cb4e5d4fd..2df9f53c9305f 100644 --- a/python/ray/tests/test_output.py +++ b/python/ray/tests/test_output.py @@ -36,19 +36,18 @@ def verbose(): assert out_str.count("[repeated 9x across cluster]") == 1 -def test_logger_config(): +def test_logger_config_with_ray_init(): + """Test that the logger is correctly configured when ray.init is called.""" + script = """ import ray ray.init(num_cpus=1) """ - proc = run_string_as_driver_nonblocking(script) - out_str = proc.stdout.read().decode("ascii") - err_str = proc.stderr.read().decode("ascii") - - print(out_str, err_str) - assert "INFO worker.py:" in err_str, err_str + out_str = run_string_as_driver(script) + assert "INFO" in out_str, out_str + assert "ray._private.worker" in out_str, out_str @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") @@ -501,17 +500,13 @@ def test_output_local_ray(): """ output = run_string_as_driver(script) lines = output.strip("\n").split("\n") - for line in lines: - print(line) lines = [line for line in lines if "The object store is using /tmp" not in line] - assert len(lines) == 1 - line = lines[0] - print(line) - assert "Started a local Ray instance." in line + assert len(lines) >= 1 + assert "Started a local Ray instance." in output if os.environ.get("RAY_MINIMAL") == "1": - assert "View the dashboard" not in line + assert "View the dashboard" not in output else: - assert "View the dashboard" in line + assert "View the dashboard" in output def test_output_ray_cluster(call_ray_start): @@ -521,15 +516,13 @@ def test_output_ray_cluster(call_ray_start): """ output = run_string_as_driver(script) lines = output.strip("\n").split("\n") - for line in lines: - print(line) - assert len(lines) == 2 - assert "Connecting to existing Ray cluster at address:" in lines[0] - assert "Connected to Ray cluster." in lines[1] + assert len(lines) >= 1 + assert "Connecting to existing Ray cluster at address:" in output + assert "Connected to Ray cluster." in output if os.environ.get("RAY_MINIMAL") == "1": - assert "View the dashboard" not in lines[1] + assert "View the dashboard" not in output else: - assert "View the dashboard" in lines[1] + assert "View the dashboard" in output @pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.") diff --git a/python/ray/tests/test_runtime_env_working_dir_2.py b/python/ray/tests/test_runtime_env_working_dir_2.py index ae4dd5de13e59..4afd286725069 100644 --- a/python/ray/tests/test_runtime_env_working_dir_2.py +++ b/python/ray/tests/test_runtime_env_working_dir_2.py @@ -4,10 +4,13 @@ import tempfile import pytest -from ray._private.test_utils import run_string_as_driver +from ray._private.test_utils import ( + chdir, + run_string_as_driver, +) + import ray -from ray._private.test_utils import chdir from ray._private.runtime_env import RAY_WORKER_DEV_EXCLUDES from ray._private.runtime_env.packaging import GCS_STORAGE_MAX_SIZE from ray.exceptions import RuntimeEnvSetupError @@ -147,7 +150,6 @@ def test_large_dir_upload_message(start_cluster, option): output = run_string_as_driver(driver_script) assert "Pushing file package" in output assert "Successfully pushed file package" in output - assert "warning" not in output.lower() # TODO(architkulkarni): Deflake and reenable this test. diff --git a/python/ray/tests/test_usage_stats.py b/python/ray/tests/test_usage_stats.py index 3786519c1aff6..cc95f39e32de6 100644 --- a/python/ray/tests/test_usage_stats.py +++ b/python/ray/tests/test_usage_stats.py @@ -1281,18 +1281,10 @@ def test_usage_report_disabled(monkeypatch, ray_start_cluster, reset_usage_stats if "dashboard.log" in str(path): with open(str(path), "r") as f: contents = f.readlines() + break assert contents is not None - - keyword_found = False - for c in contents: - if "Usage reporting is disabled" in c: - keyword_found = True - - # Make sure the module was disabled. - assert keyword_found - - for c in contents: - assert "Failed to report usage stats" not in c + assert any(["Usage reporting is disabled" in c for c in contents]) + assert all(["Failed to report usage stats" not in c for c in contents]) def test_usage_file_error_message(monkeypatch, ray_start_cluster, reset_usage_stats): diff --git a/python/ray/tests/test_widgets.py b/python/ray/tests/test_widgets.py index 32200b8857285..be1f8714c0e9d 100644 --- a/python/ray/tests/test_widgets.py +++ b/python/ray/tests/test_widgets.py @@ -6,7 +6,9 @@ @mock.patch("importlib.import_module") @mock.patch("ray.widgets.util.in_notebook") -def test_ensure_notebook_dep_missing(mock_in_notebook, mock_import_module, caplog): +def test_ensure_notebook_dep_missing( + mock_in_notebook, mock_import_module, propagate_logs, caplog +): """Test that missing notebook dependencies trigger a warning.""" class MockDep: @@ -32,7 +34,9 @@ def dummy_ipython_display(self): @mock.patch("importlib.import_module") @mock.patch("ray.widgets.util.in_notebook") -def test_ensure_notebook_dep_outdated(mock_in_notebook, mock_import_module, caplog): +def test_ensure_notebook_dep_outdated( + mock_in_notebook, mock_import_module, propagate_logs, caplog +): """Test that outdated notebook dependencies trigger a warning.""" class MockDep: @@ -54,7 +58,9 @@ def dummy_ipython_display(): @mock.patch("importlib.import_module") @mock.patch("ray.widgets.util.in_notebook") -def test_ensure_notebook_valid(mock_in_notebook, mock_import_module, caplog): +def test_ensure_notebook_valid( + mock_in_notebook, mock_import_module, propagate_logs, caplog +): """Test that valid notebook dependencies don't trigger a warning.""" class MockDep: diff --git a/python/ray/train/tests/test_transformers_gpu.py b/python/ray/train/tests/test_transformers_gpu.py index 584a29028f7f0..1780b9b812ee4 100644 --- a/python/ray/train/tests/test_transformers_gpu.py +++ b/python/ray/train/tests/test_transformers_gpu.py @@ -43,7 +43,7 @@ def __init__(self, pipeline=None, preprocessor=None, use_gpu: bool = False): # TODO(ml-team): Add np.ndarray to batch_type @pytest.mark.parametrize("batch_type", [pd.DataFrame]) @pytest.mark.parametrize("device", [None, 0]) -def test_predict_batch(ray_start_4_cpus, caplog, batch_type, device): +def test_predict_batch(ray_start_4_cpus, batch_type, device): checkpoint = create_checkpoint() kwargs = {} diff --git a/python/ray/tune/automl/search_policy.py b/python/ray/tune/automl/search_policy.py index deb46da332ef1..497235980e6b8 100644 --- a/python/ray/tune/automl/search_policy.py +++ b/python/ray/tune/automl/search_policy.py @@ -181,7 +181,7 @@ def on_trial_complete(self, trial_id, result=None, error=False): "reward_attr": self.reward_attr, "reward": self.best_trial.best_result[self.reward_attr] if self.best_trial - else None, + else 0, }, ) diff --git a/python/ray/tune/tests/test_commands.py b/python/ray/tune/tests/test_commands.py index 4148aee1125c2..07d098416f4ec 100644 --- a/python/ray/tune/tests/test_commands.py +++ b/python/ray/tune/tests/test_commands.py @@ -4,6 +4,7 @@ import subprocess import sys import time +from unittest import mock try: from cStringIO import StringIO @@ -61,7 +62,11 @@ def test_time(start_ray, tmpdir): assert sum(times) / len(times) < 7.0, "CLI is taking too long!" -def test_ls(start_ray, tmpdir): +@mock.patch( + "ray.tune.cli.commands.print_format_output", + wraps=ray.tune.cli.commands.print_format_output, +) +def test_ls(mock_print_format_output, start_ray, tmpdir): """This test captures output of list_trials.""" experiment_name = "test_ls" experiment_path = os.path.join(str(tmpdir), experiment_name) @@ -76,23 +81,26 @@ def test_ls(start_ray, tmpdir): columns = ["episode_reward_mean", "training_iteration", "trial_id"] limit = 2 - with Capturing() as output: - commands.list_trials(experiment_path, info_keys=columns, limit=limit) - lines = output.captured - - assert all(col in lines[1] for col in columns) - assert lines[1].count("|") == len(columns) + 1 - assert len(lines) == 3 + limit + 1 - - with Capturing() as output: - commands.list_trials( - experiment_path, - sort=["trial_id"], - info_keys=("trial_id", "training_iteration"), - filter_op="training_iteration == 1", - ) - lines = output.captured - assert len(lines) == 3 + num_samples + 1 + commands.list_trials(experiment_path, info_keys=columns, limit=limit) + + # The dataframe that is printed as a table is the first arg of the last + # call made to `ray.tune.cli.commands.print_format_output`. + mock_print_format_output.assert_called() + args, _ = mock_print_format_output.call_args_list[-1] + df = args[0] + assert sorted(df.columns.to_list()) == sorted(columns), df + assert len(df.index) == limit, df + + commands.list_trials( + experiment_path, + sort=["trial_id"], + info_keys=("trial_id", "training_iteration"), + filter_op="training_iteration == 1", + ) + args, _ = mock_print_format_output.call_args_list[-1] + df = args[0] + assert sorted(df.columns.to_list()) == sorted(["trial_id", "training_iteration"]) + assert len(df.index) == num_samples with pytest.raises(click.ClickException): commands.list_trials( @@ -103,7 +111,11 @@ def test_ls(start_ray, tmpdir): commands.list_trials(experiment_path, info_keys=("asdf",)) -def test_ls_with_cfg(start_ray, tmpdir): +@mock.patch( + "ray.tune.cli.commands.print_format_output", + wraps=ray.tune.cli.commands.print_format_output, +) +def test_ls_with_cfg(mock_print_format_output, start_ray, tmpdir): experiment_name = "test_ls_with_cfg" experiment_path = os.path.join(str(tmpdir), experiment_name) tune.run( @@ -116,12 +128,16 @@ def test_ls_with_cfg(start_ray, tmpdir): columns = [CONFIG_PREFIX + "/test_variable", "trial_id"] limit = 4 - with Capturing() as output: - commands.list_trials(experiment_path, info_keys=columns, limit=limit) - lines = output.captured - assert all(col in lines[1] for col in columns) - assert lines[1].count("|") == len(columns) + 1 - assert len(lines) == 3 + limit + 1 + + commands.list_trials(experiment_path, info_keys=columns, limit=limit) + + # The dataframe that is printed as a table is the first arg of the last + # call made to `ray.tune.cli.commands.print_format_output`. + mock_print_format_output.assert_called() + args, _ = mock_print_format_output.call_args_list[-1] + df = args[0] + assert sorted(df.columns.to_list()) == sorted(columns), df + assert len(df.index) == limit, df def test_lsx(start_ray, tmpdir): diff --git a/python/ray/tune/tests/test_syncer.py b/python/ray/tune/tests/test_syncer.py index e3bff59fb67e7..c9c1bef9857b0 100644 --- a/python/ray/tune/tests/test_syncer.py +++ b/python/ray/tune/tests/test_syncer.py @@ -31,6 +31,17 @@ from ray.tune.utils.file_transfer import _pack_dir, _unpack_dir +@pytest.fixture +def propagate_logs(): + # Ensure that logs are propagated to ancestor handles. This is required if using the + # caplog fixture with Ray's logging. + # NOTE: This only enables log propagation in the driver process, not the workers! + logger = logging.getLogger("ray") + logger.propagate = True + yield + logger.propagate = False + + @pytest.fixture def ray_start_4_cpus(): address_info = ray.init(num_cpus=4, configure_logging=False) @@ -470,7 +481,7 @@ def sync_up(): syncer.wait() -def test_syncer_not_running_sync_last_failed(caplog, temp_data_dirs): +def test_syncer_not_running_sync_last_failed(propagate_logs, caplog, temp_data_dirs): """Check that new sync is issued if old sync completed""" caplog.set_level(logging.WARNING) diff --git a/python/ray/tune/tests/test_syncer_callback.py b/python/ray/tune/tests/test_syncer_callback.py index bd34291438957..eb49e00acd8d4 100644 --- a/python/ray/tune/tests/test_syncer_callback.py +++ b/python/ray/tune/tests/test_syncer_callback.py @@ -27,6 +27,17 @@ from ray.tune.utils.file_transfer import sync_dir_between_nodes +@pytest.fixture +def propagate_logs(): + # Ensure that logs are propagated to ancestor handles. This is required if using the + # caplog fixture with Ray's logging. + # NOTE: This only enables log propagation in the driver process, not the workers! + logger = logging.getLogger("ray") + logger.propagate = True + yield + logger.propagate = False + + @pytest.fixture def ray_start_2_cpus(): address_info = ray.init(num_cpus=2, configure_logging=False) @@ -439,7 +450,9 @@ def test_syncer_callback_wait_for_all_error(ray_start_2_cpus, temp_data_dirs): assert "At least one" in e -def test_syncer_callback_log_error(caplog, ray_start_2_cpus, temp_data_dirs): +def test_syncer_callback_log_error( + propagate_logs, caplog, ray_start_2_cpus, temp_data_dirs +): """Check that errors in a previous sync are logged correctly""" caplog.set_level(logging.ERROR, logger="ray.tune.syncer") @@ -477,7 +490,9 @@ def test_syncer_callback_log_error(caplog, ray_start_2_cpus, temp_data_dirs): assert_file(True, tmp_target, "level0.txt") -def test_syncer_callback_dead_node_log_error(caplog, ray_start_2_cpus, temp_data_dirs): +def test_syncer_callback_dead_node_log_error( + propagate_logs, caplog, ray_start_2_cpus, temp_data_dirs +): """Check that we catch + log errors when trying syncing with a dead remote node.""" caplog.set_level(logging.ERROR, logger="ray.tune.syncer") diff --git a/python/ray/tune/tests/test_tuner_restore.py b/python/ray/tune/tests/test_tuner_restore.py index fcc9fb566618b..46b29a4e5771d 100644 --- a/python/ray/tune/tests/test_tuner_restore.py +++ b/python/ray/tune/tests/test_tuner_restore.py @@ -36,6 +36,17 @@ from ray.tune.tuner import Tuner +@pytest.fixture +def propagate_logs(): + # Ensure that logs are propagated to ancestor handles. This is required if using the + # caplog fixture with Ray's logging. + # NOTE: This only enables log propagation in the driver process, not the workers! + logger = logging.getLogger("ray") + logger.propagate = True + yield + logger.propagate = False + + @pytest.fixture def ray_start_2_cpus(): address_info = ray.init(num_cpus=2, configure_logging=False) @@ -1166,7 +1177,7 @@ def train_fn(config): assert r.config["test2"].name in ["11", "12", "13", "14"] -def test_tuner_pkl_backwards_compatibility(tmp_path, caplog): +def test_tuner_pkl_backwards_compatibility(tmp_path, propagate_logs, caplog): tuner_internal = Tuner( _train_fn_sometimes_failing, param_space={"a": 1} )._local_tuner