[Datasets] Emit warning when starting Dataset execution with no CPU resources available (#31574)

The existing Ray/Ray autoscaler warnings for a cluster with no available CPU resources are fairly generic and can be difficult for users to understand and debug. This change checks at the start of Dataset execution whether any CPU resources are available and, if none are, emits a more detailed warning message with a [helpful link to documentation](https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune).
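
As a rough illustration of the scenario the warning targets (a sketch, not code from this PR), the snippet below starts a local Ray session with `num_cpus=0`, standing in for a cluster whose CPUs are fully claimed by actors or Tune trials. How early execution is triggered varies with the Ray version, so the consuming call is left commented out; as the warning itself explains, the job would hang until CPUs free up.

```python
# Illustrative sketch only; assumes a fresh local Ray session.
import ray

# num_cpus=0 mimics a cluster whose CPUs are all held by actors or Tune trials.
ray.init(num_cpus=0)

# Starting Dataset execution on such a cluster now logs the detailed message
# ("Warning: The Ray cluster currently does not have any available CPUs. ...")
# with a link to the Datasets-and-Tune docs, rather than only the generic
# autoscaler warning. Left commented out because the job would then hang:
# ray.data.range(10).map_batches(lambda batch: batch).take()
```
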
scottjlee authored Jan 12, 2023
1 parent 251d236 commit 6d00593
Showing 2 changed files with 61 additions and 2 deletions.
12 changes: 10 additions & 2 deletions python/ray/data/_internal/plan.py
@@ -335,9 +335,17 @@ def execute(
         Returns:
             The blocks of the output dataset.
         """
+        context = DatasetContext.get_current()
+        if not ray.available_resources().get("CPU"):
+            logger.get_logger().warning(
+                "Warning: The Ray cluster currently does not have "
+                "any available CPUs. The Dataset job will hang unless more CPUs "
+                "are freed up. A common reason is that cluster resources are "
+                "used by Actors or Tune trials; see the following link "
+                "for more details: "
+                "https://docs.ray.io/en/master/data/dataset-internals.html#datasets-and-tune"  # noqa: E501
+            )
         if not self.has_computed_output():
-            context = DatasetContext.get_current()
-
             # Read stage is handled with the legacy execution impl for now.
             if (
                 context.new_execution_backend
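
For context on the condition being added, a minimal sketch under stated assumptions (not the PR's implementation): `ray.available_resources()` returns a dict mapping resource names to the amount currently free, and when a cluster has no free CPUs the `"CPU"` key is either missing or `0.0`, so `.get("CPU")` is falsy either way. An equivalent guard written against the standard `logging` module (the logger name here is hypothetical; the real code goes through Ray Data's `DatasetLogger`) looks roughly like this:

```python
# Minimal sketch of an equivalent guard using the standard logging module
# instead of Ray Data's DatasetLogger; assumes ray.init() was already called.
import logging

import ray

logger = logging.getLogger("cpu_guard_sketch")  # hypothetical logger name

# ray.available_resources() maps resource name -> amount currently free.
# With no free CPUs the "CPU" key is absent or 0.0, so .get("CPU") is falsy.
if not ray.available_resources().get("CPU"):
    logger.warning(
        "No CPU resources are currently available; Dataset execution may hang."
    )
```
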
51 changes: 51 additions & 0 deletions python/ray/data/tests/test_dataset.py
@@ -4,6 +4,7 @@
 import random
 import signal
 import time
+from unittest.mock import patch
 
 import numpy as np
 import pandas as pd
@@ -14,6 +15,7 @@
 import ray
 from ray._private.test_utils import wait_for_condition
 from ray.air.util.tensor_extensions.arrow import ArrowVariableShapedTensorType
+from ray.data._internal.dataset_logger import DatasetLogger
 from ray.data._internal.stats import _StatsActor
 from ray.data._internal.arrow_block import ArrowRow
 from ray.data._internal.block_builder import BlockBuilder
@@ -5411,6 +5413,55 @@ def test_ragged_tensors(ray_start_regular_shared):
     ]
 
 
+class LoggerWarningCalled(Exception):
+    """Custom exception used in test_warning_execute_with_no_cpu() and
+    test_nowarning_execute_with_cpu(). Raised when the `logger.warning` method
+    is called, so that we can kick out of `plan.execute()` by catching this Exception
+    and check logging was done properly."""
+
+    pass
+
+
+def test_warning_execute_with_no_cpu(ray_start_cluster):
+    """Tests ExecutionPlan.execute() to ensure a warning is logged
+    when no CPU resources are available."""
+    # Create one node with no CPUs to trigger the Dataset warning
+    cluster = ray_start_cluster
+    cluster.add_node(num_cpus=0)
+
+    logger = DatasetLogger("ray.data._internal.plan").get_logger()
+    with patch.object(
+        logger,
+        "warning",
+        side_effect=LoggerWarningCalled,
+    ) as mock_logger:
+        try:
+            ds = ray.data.range(10)
+            ds = ds.map_batches(lambda x: x)
+            ds.take()
+        except LoggerWarningCalled:
+            logger_args, logger_kwargs = mock_logger.call_args
+            assert "Warning: The Ray cluster currently does not have " in logger_args[0]
+
+
+def test_nowarning_execute_with_cpu(ray_start_cluster_init):
+    """Tests ExecutionPlan.execute() to ensure no warning is logged
+    when there are available CPU resources."""
+    # Create one node with CPUs to avoid triggering the Dataset warning
+    ray.init(ray_start_cluster_init.address)
+
+    logger = DatasetLogger("ray.data._internal.plan").get_logger()
+    with patch.object(
+        logger,
+        "warning",
+        side_effect=LoggerWarningCalled,
+    ) as mock_logger:
+        ds = ray.data.range(10)
+        ds = ds.map_batches(lambda x: x)
+        ds.take()
+        mock_logger.assert_not_called()
+
+
 if __name__ == "__main__":
     import sys
 
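
For readers unfamiliar with the mocking pattern these tests lean on, here is a standalone sketch (a plain `logging.Logger` stands in for the `DatasetLogger` used above): passing `side_effect=` to `patch.object` makes the patched `warning` method both record its call and raise, which is what lets the real test escape an otherwise-hanging zero-CPU run and then inspect `call_args`.

```python
# Standalone sketch of the patch.object + side_effect pattern; a plain
# logging.Logger stands in for the DatasetLogger used by the real tests.
import logging
from unittest.mock import patch


class WarningCalled(Exception):
    """Raised by the mocked warning() so the caller can bail out early."""


logger = logging.getLogger("sketch")

with patch.object(logger, "warning", side_effect=WarningCalled) as mock_warning:
    try:
        # Stands in for the Dataset call that would emit the warning and hang.
        logger.warning("Warning: The Ray cluster currently does not have any CPUs")
    except WarningCalled:
        args, _ = mock_warning.call_args
        assert "does not have" in args[0]
```
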
