Skip to content

Commit

Permalink
[Core] Fix check failure due to negative available resource (ray-proj…
Browse files Browse the repository at this point in the history
…ect#50517)

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
  • Loading branch information
jjyao authored and xsuler committed Mar 4, 2025
1 parent 4d3cc25 commit d0306df
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 20 deletions.
1 change: 1 addition & 0 deletions src/ray/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ ray_cc_library(
":runtime_env",
"//:node_manager_fbs",
"//src/ray/util",
"//src/ray/util:container_util",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/strings",
Expand Down
17 changes: 17 additions & 0 deletions src/ray/common/bundle_spec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,23 @@ std::string GetOriginalResourceNameFromWildcardResource(const std::string &resou
}
}

bool IsCPUOrPlacementGroupCPUResource(ResourceID resource_id) {
// Check whether the resource is CPU resource or CPU resource inside PG.
if (resource_id == ResourceID::CPU()) {
return true;
}

auto possible_pg_resource = ParsePgFormattedResource(resource_id.Binary(),
/*for_wildcard_resource*/ true,
/*for_indexed_resource*/ true);
if (possible_pg_resource.has_value() &&
possible_pg_resource->original_resource == ResourceID::CPU().Binary()) {
return true;
}

return false;
}

std::optional<PgFormattedResourceData> ParsePgFormattedResource(
const std::string &resource, bool for_wildcard_resource, bool for_indexed_resource) {
// Check if it is a wildcard pg resource.
Expand Down
4 changes: 4 additions & 0 deletions src/ray/common/bundle_spec.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ std::string GetOriginalResourceName(const std::string &resource);
// Returns "" if the resource is not a wildcard resource.
std::string GetOriginalResourceNameFromWildcardResource(const std::string &resource);

/// Return whether the resource specified by the resource_id is a CPU resource
/// or CPU resource inside a placement group.
bool IsCPUOrPlacementGroupCPUResource(ResourceID resource_id);

/// Parse the given resource and get the pg related information.
///
/// \param resource name of the resource.
Expand Down
11 changes: 10 additions & 1 deletion src/ray/common/scheduling/resource_instance_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <utility>

#include "ray/common/bundle_spec.h"
#include "ray/util/container_util.h"
#include "ray/util/logging.h"

namespace ray {
Expand Down Expand Up @@ -371,7 +372,15 @@ void NodeResourceInstanceSet::AllocateWithReference(
RAY_CHECK_EQ(available.size(), ref_allocation.size());

for (size_t i = 0; i < ref_allocation.size(); i++) {
RAY_CHECK_GE(available[i], ref_allocation[i]);
if (available[i] < ref_allocation[i]) {
// Only CPU resource can go negative due to the behavior
// that ray.get() will temporarily release the CPU resource.
// See https://github.com/ray-project/ray/pull/50517.
RAY_CHECK(IsCPUOrPlacementGroupCPUResource(resource_id))
<< "Resource " << resource_id.Binary()
<< " has less availability than requested. Available: "
<< debug_string(available) << ", requested: " << debug_string(ref_allocation);
}
available[i] -= ref_allocation[i];
}

Expand Down
2 changes: 1 addition & 1 deletion src/ray/common/scheduling/resource_instance_set.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class NodeResourceInstanceSet {
///
/// The function assumes and also verifies that (1) the resource_id exists in the
/// node; (2) the available resources with resource_id on the node can satisfy the
/// provided ref_allocation.
/// provided ref_allocation (with the exception of CPU resources which can go negative).
///
/// \param ref_allocation: The reference allocation used to allocate the resource_id
/// \param resource_id: The id of the resource to be allocated
Expand Down
17 changes: 0 additions & 17 deletions src/ray/raylet/local_task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,6 @@
namespace ray {
namespace raylet {

bool IsCPUOrPlacementGroupCPUResource(ResourceID resource_id) {
// Check whether the resource is CPU resource or CPU resource inside PG.
if (resource_id == ResourceID::CPU()) {
return true;
}

auto possible_pg_resource = ParsePgFormattedResource(resource_id.Binary(),
/*for_wildcard_resource*/ true,
/*for_indexed_resource*/ true);
if (possible_pg_resource.has_value() &&
possible_pg_resource->original_resource == ResourceID::CPU().Binary()) {
return true;
}

return false;
}

LocalTaskManager::LocalTaskManager(
const NodeID &self_node_id,
ClusterResourceScheduler &cluster_resource_scheduler,
Expand Down
46 changes: 45 additions & 1 deletion src/ray/raylet/scheduling/cluster_task_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1829,7 +1829,51 @@ TEST_F(ClusterTaskManagerTest, FeasibleToNonFeasible) {
task1.GetTaskSpecification().TaskId());
}

TEST_F(ClusterTaskManagerTestWithGPUsAtHead, RleaseAndReturnWorkerCpuResources) {
TEST_F(ClusterTaskManagerTest, NegativePlacementGroupCpuResources) {
// Add PG CPU resources.
scheduler_->GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID("CPU_group_aaa"), std::vector<FixedPoint>{FixedPoint(2)});
scheduler_->GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID("CPU_group_0_aaa"), std::vector<FixedPoint>{FixedPoint(1)});
scheduler_->GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID("CPU_group_1_aaa"), std::vector<FixedPoint>{FixedPoint(1)});

const NodeResources &node_resources =
scheduler_->GetClusterResourceManager().GetNodeResources(
scheduling::NodeID(id_.Binary()));

auto worker1 = std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
auto allocated_instances = std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(scheduler_->GetLocalResourceManager().AllocateLocalTaskResources(
{{"CPU_group_aaa", 1.}, {"CPU_group_0_aaa", 1.}}, allocated_instances));
worker1->SetAllocatedInstances(allocated_instances);
// worker1 calls ray.get() and release the CPU resource
ASSERT_TRUE(local_task_manager_->ReleaseCpuResourcesFromBlockedWorker(worker1));

// the released CPU resource is acquired by worker2
auto worker2 = std::make_shared<MockWorker>(WorkerID::FromRandom(), 5678);
allocated_instances = std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(scheduler_->GetLocalResourceManager().AllocateLocalTaskResources(
{{"CPU_group_aaa", 1.}, {"CPU_group_0_aaa", 1.}}, allocated_instances));
worker2->SetAllocatedInstances(allocated_instances);

// ray.get() returns and worker1 acquires the CPU resource again
ASSERT_TRUE(local_task_manager_->ReturnCpuResourcesToUnblockedWorker(worker1));
ASSERT_EQ(node_resources.available.Get(scheduling::ResourceID("CPU_group_aaa")), 0);
ASSERT_EQ(node_resources.available.Get(scheduling::ResourceID("CPU_group_0_aaa")), -1);
ASSERT_EQ(node_resources.available.Get(scheduling::ResourceID("CPU_group_1_aaa")), 1);

auto worker3 = std::make_shared<MockWorker>(WorkerID::FromRandom(), 7678);
allocated_instances = std::make_shared<TaskResourceInstances>();
ASSERT_TRUE(scheduler_->GetLocalResourceManager().AllocateLocalTaskResources(
{{"CPU_group_aaa", 1.}, {"CPU_group_1_aaa", 1.}}, allocated_instances));
worker3->SetAllocatedInstances(allocated_instances);
ASSERT_EQ(node_resources.available.Get(scheduling::ResourceID("CPU_group_aaa")), -1);
ASSERT_EQ(node_resources.available.Get(scheduling::ResourceID("CPU_group_0_aaa")), -1);
ASSERT_EQ(node_resources.available.Get(scheduling::ResourceID("CPU_group_1_aaa")), 0);
}

TEST_F(ClusterTaskManagerTestWithGPUsAtHead, ReleaseAndReturnWorkerCpuResources) {
// Add PG CPU and GPU resources.
scheduler_->GetLocalResourceManager().AddLocalResourceInstances(
scheduling::ResourceID("CPU_group_aaa"), std::vector<FixedPoint>{FixedPoint(1)});
Expand Down

0 comments on commit d0306df

Please sign in to comment.