Commit 330723c

[Data] Implement accurate memory accounting for all-to-all operations (#50290)

`AllToAllOperator` and `ZipOperator` don't implement accurate memory accounting. As
a result, if a plan contains either of these operators, the streaming
executor falls back to the legacy scheduling algorithm.

Fixes #48104

---------

Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu>
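For context, the pattern this commit applies to both operators is: report every buffered `RefBundle` through the operator metrics hooks (`on_input_queued`, `on_input_dequeued`, `on_output_queued`, `on_output_dequeued`), then declare support by overriding `implements_accurate_memory_accounting()`. The sketch below is illustrative only (the class name and the `_transform` helper are hypothetical); it mirrors the bookkeeping the diff adds to `AllToAllOperator` and `ZipOperator`, not Ray Data's actual implementation.

    # Illustrative sketch -- mirrors the bookkeeping pattern in this commit,
    # not the actual Ray Data classes. `_transform` is a hypothetical stand-in
    # for the operator's bulk function.
    class SketchMaterializingOperator:
        def __init__(self, metrics):
            self._metrics = metrics
            self._input_buffer = []   # RefBundles received so far
            self._output_buffer = []  # RefBundles ready to be emitted

        def _add_input_inner(self, refs, input_index):
            self._input_buffer.append(refs)
            self._metrics.on_input_queued(refs)        # count buffered input bytes

        def all_inputs_done(self):
            self._output_buffer = self._transform(self._input_buffer)
            while self._input_buffer:
                refs = self._input_buffer.pop()
                self._metrics.on_input_dequeued(refs)  # release input accounting
            for ref in self._output_buffer:
                self._metrics.on_output_queued(ref)    # count pending output bytes

        def _get_next_inner(self):
            bundle = self._output_buffer.pop(0)
            self._metrics.on_output_dequeued(bundle)   # release output accounting
            return bundle

        def implements_accurate_memory_accounting(self):
            # Lets the resource manager trust the numbers above, so the
            # streaming executor no longer falls back to legacy scheduling.
            return True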
bveeramani authored and israbbani committed Feb 25, 2025
1 parent d8c012e commit 330723c
Showing 5 changed files with 88 additions and 294 deletions.
python/ray/data/_internal/execution/operators/base_physical_operator.py
@@ -94,23 +94,35 @@ def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert not self.completed()
assert input_index == 0, input_index
self._input_buffer.append(refs)
self._metrics.on_input_queued(refs)

def all_inputs_done(self) -> None:
ctx = TaskContext(
task_idx=self._next_task_index,
sub_progress_bar_dict=self._sub_progress_bar_dict,
target_max_block_size=self.actual_target_max_block_size,
)
# NOTE: We don't account object store memory use from intermediate `bulk_fn`
# outputs (e.g., map outputs for map-reduce).
self._output_buffer, self._stats = self._bulk_fn(self._input_buffer, ctx)

while self._input_buffer:
refs = self._input_buffer.pop()
self._metrics.on_input_dequeued(refs)

for ref in self._output_buffer:
self._metrics.on_output_queued(ref)

self._next_task_index += 1
self._input_buffer.clear()

super().all_inputs_done()

def has_next(self) -> bool:
return len(self._output_buffer) > 0

def _get_next_inner(self) -> RefBundle:
bundle = self._output_buffer.pop(0)
self._metrics.on_output_dequeued(bundle)
self._output_rows += bundle.num_rows()
return bundle

@@ -152,6 +164,9 @@ def close_sub_progress_bars(self):
def supports_fusion(self):
return True

def implements_accurate_memory_accounting(self):
return True


class NAryOperator(PhysicalOperator):
"""An operator that has multiple input dependencies and one output.
21 changes: 18 additions & 3 deletions python/ray/data/_internal/execution/operators/zip_operator.py
@@ -74,26 +74,41 @@ def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
assert input_index == 0 or input_index == 1, input_index
if input_index == 0:
self._left_buffer.append(refs)
self._metrics.on_input_queued(refs)
else:
self._right_buffer.append(refs)
self._metrics.on_input_queued(refs)

def all_inputs_done(self) -> None:
self._output_buffer, self._stats = self._zip(
self._left_buffer, self._right_buffer
)
self._left_buffer.clear()
self._right_buffer.clear()

while self._left_buffer:
refs = self._left_buffer.pop()
self._metrics.on_input_dequeued(refs)
while self._right_buffer:
refs = self._right_buffer.pop()
self._metrics.on_input_dequeued(refs)
for ref in self._output_buffer:
self._metrics.on_output_queued(ref)

super().all_inputs_done()

def has_next(self) -> bool:
return len(self._output_buffer) > 0

def _get_next_inner(self) -> RefBundle:
return self._output_buffer.pop(0)
refs = self._output_buffer.pop(0)
self._metrics.on_output_dequeued(refs)
return refs

def get_stats(self) -> StatsDict:
return self._stats

def implements_accurate_memory_accounting(self):
return True

def _zip(
self, left_input: List[RefBundle], right_input: List[RefBundle]
) -> Tuple[List[RefBundle], StatsDict]:
31 changes: 23 additions & 8 deletions python/ray/data/_internal/execution/resource_manager.py
@@ -1,4 +1,5 @@
import logging
import math
import os
import time
from abc import ABC, abstractmethod
@@ -10,7 +11,11 @@
ExecutionResources,
)
from ray.data._internal.execution.interfaces.physical_operator import PhysicalOperator
from ray.data._internal.execution.operators.base_physical_operator import (
AllToAllOperator,
)
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
from ray.data._internal.execution.operators.zip_operator import ZipOperator
from ray.data._internal.execution.util import memory_string
from ray.data.context import DataContext

@@ -21,6 +26,10 @@
logger = logging.getLogger(__name__)
DEBUG_RESOURCE_MANAGER = os.environ.get("RAY_DATA_DEBUG_RESOURCE_MANAGER", "0") == "1"

# These are physical operators that must receive all inputs before they start
# processing data.
MATERIALIZING_OPERATORS = (AllToAllOperator, ZipOperator)


class ResourceManager:
"""A class that manages the resource usage of a streaming executor."""
@@ -241,14 +250,6 @@ def get_op_usage_str(self, op: PhysicalOperator) -> str:
usage_str += f",object store={budget.object_store_memory_str()})"
return usage_str

def get_downstream_fraction(self, op: PhysicalOperator) -> float:
"""Return the downstream fraction of the given operator."""
return self._downstream_fraction[op]

def get_downstream_object_store_memory(self, op: PhysicalOperator) -> float:
"""Return the downstream object store memory usage of the given operator."""
return self._downstream_object_store_memory[op]

def op_resource_allocator_enabled(self) -> bool:
"""Return whether OpResourceAllocator is enabled."""
return self._op_resource_allocator is not None
@@ -548,6 +549,9 @@ def max_task_output_bytes_to_read(self, op: PhysicalOperator) -> Optional[int]:
# Add the remaining of `_reserved_for_op_outputs`.
op_outputs_usage = self._get_op_outputs_usage_with_downstream(op)
res += max(self._reserved_for_op_outputs[op] - op_outputs_usage, 0)
if math.isinf(res):
return None

res = int(res)
assert res >= 0
if res == 0 and self._should_unblock_streaming_output_backpressure(op):
@@ -649,3 +653,14 @@ def update_usages(self):
# We don't limit GPU resources, as not all operators
# use GPU resources.
self._op_budgets[op].gpu = float("inf")

# A materializing operator like `AllToAllOperator` waits for all its input
# operator’s outputs before processing data. This often forces the input
# operator to exceed its object store memory budget. To prevent deadlock, we
# disable object store memory backpressure for the input operator.
for op in eligible_ops:
if any(
isinstance(next_op, MATERIALIZING_OPERATORS)
for next_op in op.output_dependencies
):
self._op_budgets[op].object_store_memory = float("inf")
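To make the effect of that last loop concrete: in a pipeline like read -> map -> sort, the sort stage is backed by an `AllToAllOperator`, so the upstream map operator's object store memory budget is set to infinity while other operators keep theirs. Below is a minimal, self-contained sketch of the same check; the `FakeOp` classes and the budget number are stand-ins, not Ray Data's types.

    import math
    from dataclasses import dataclass, field
    from typing import List

    @dataclass
    class FakeOp:
        name: str
        output_dependencies: List["FakeOp"] = field(default_factory=list)

    class FakeAllToAllOp(FakeOp):  # stand-in for a materializing operator
        pass

    MATERIALIZING_OPERATORS = (FakeAllToAllOp,)

    def object_store_budgets(ops, default_budget=1 * 1024**3):
        budgets = {op.name: default_budget for op in ops}
        for op in ops:
            if any(
                isinstance(next_op, MATERIALIZING_OPERATORS)
                for next_op in op.output_dependencies
            ):
                # The downstream op must materialize everything this op produces,
                # so backpressuring it on memory could deadlock the pipeline.
                budgets[op.name] = math.inf
        return budgets

    sort = FakeAllToAllOp("sort")
    map_ = FakeOp("map", output_dependencies=[sort])
    read = FakeOp("read", output_dependencies=[map_])
    print(object_store_budgets([read, map_, sort]))
    # {'read': 1073741824, 'map': inf, 'sort': 1073741824}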
81 changes: 4 additions & 77 deletions python/ray/data/_internal/execution/streaming_executor_state.py
@@ -4,7 +4,6 @@
"""

import logging
import math
import threading
import time
from collections import defaultdict
@@ -17,7 +16,6 @@
from ray.data._internal.execution.bundle_queue import create_bundle_queue
from ray.data._internal.execution.interfaces import (
ExecutionOptions,
ExecutionResources,
PhysicalOperator,
RefBundle,
)
@@ -553,12 +551,10 @@ def select_operator_to_run(
# Filter to ops that are eligible for execution.
ops = []
for op, state in topology.items():
if resource_manager.op_resource_allocator_enabled():
under_resource_limits = (
resource_manager.op_resource_allocator.can_submit_new_task(op)
)
else:
under_resource_limits = _execution_allowed(op, resource_manager)
assert resource_manager.op_resource_allocator_enabled(), topology
under_resource_limits = (
resource_manager.op_resource_allocator.can_submit_new_task(op)
)
in_backpressure = not under_resource_limits or any(
not p.can_add_input(op) for p in backpressure_policies
)
@@ -609,72 +605,3 @@ def select_operator_to_run(
topology[selected_op]._scheduling_status.selected = True
autoscaler.try_trigger_scaling()
return selected_op


def _execution_allowed(op: PhysicalOperator, resource_manager: ResourceManager) -> bool:
"""Return whether an operator is allowed to execute given resource usage.
Operators are throttled globally based on CPU and GPU limits for the stream.
For an N operator DAG, we only throttle the kth operator (in the source-to-sink
ordering) on object store utilization if the cumulative object store utilization
for the kth operator and every operator downstream from it is greater than
k/N * global_limit; i.e., the N - k operator sub-DAG is using more object store
memory than its share.
Args:
op: The operator to check.
resource_manager: The ResourceManager of the current dataset.
Returns:
Whether the op is allowed to run.
"""
if op.throttling_disabled():
return True

global_usage = resource_manager.get_global_usage()
global_limits = resource_manager.get_global_limits()

# To avoid starvation problems when dealing with fractional resource types,
# convert all quantities to integer (0 or 1) for deciding admissibility. This
# allows operators with non-integral requests to slightly overshoot the limit.
global_floored = ExecutionResources(
cpu=math.floor(global_usage.cpu or 0),
gpu=math.floor(global_usage.gpu or 0),
object_store_memory=global_usage.object_store_memory,
)
inc_resource_req = op.incremental_resource_usage()
if inc_resource_req.cpu and inc_resource_req.gpu:
raise NotImplementedError(
"Operator incremental resource usage cannot specify both CPU "
"and GPU at the same time, since it may cause deadlock."
)

# NOTE: Resources requests are clamped at 1.0, to allow scheduling of operators
# with larger resource requirements in tight environment
inc_resource_req_adjusted = ExecutionResources(
cpu=min(1.0, inc_resource_req.cpu),
gpu=min(1.0, inc_resource_req.gpu),
object_store_memory=0,
)

# Under global limits; always allow.
new_usage = global_floored.add(inc_resource_req_adjusted)
if new_usage.satisfies_limit(global_limits):
return True

# We're over global limits, but execution may still be allowed if memory is the
# only bottleneck and this wouldn't impact downstream memory limits. This avoids
# stalling the execution for memory bottlenecks that occur upstream.
# See for more context: https://github.com/ray-project/ray/pull/32673
global_limits_sans_memory = ExecutionResources.for_limits(
cpu=global_limits.cpu, gpu=global_limits.gpu
)
global_ok_sans_memory = new_usage.satisfies_limit(global_limits_sans_memory)
downstream_memory = resource_manager.get_downstream_object_store_memory(op)
downstream_limit = global_limits.scale(resource_manager.get_downstream_fraction(op))
downstream_memory_ok = ExecutionResources(
object_store_memory=downstream_memory
).satisfies_limit(downstream_limit)

return global_ok_sans_memory and downstream_memory_ok
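For reference, a worked example of the legacy throttling rule described in the removed docstring, with illustrative numbers and taking the docstring's k/N formulation at face value:

    # Illustrative numbers only, following the removed docstring's k/N rule.
    # 4-operator DAG, 10 GiB global object store limit, checking operator k = 2.
    N = 4
    k = 2
    global_limit = 10 * 1024**3                # 10 GiB
    downstream_limit = (k / N) * global_limit  # 5 GiB share for op k and downstream
    downstream_usage = 6 * 1024**3             # what op k + downstream actually use
    # Memory throttling only kicked in once the downstream sub-DAG exceeded its share.
    print(downstream_usage > downstream_limit)  # True -> op 2 is throttled on memory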