Skip to content

Commit

Permalink
[core] Remove GCS ReportResourceUsage RPC (#40808)
Browse files Browse the repository at this point in the history
Signed-off-by: vitsai <vitsai@cs.stanford.edu>
  • Loading branch information
vitsai authored Nov 1, 2023
1 parent 5f25ff4 commit c85e15a
Show file tree
Hide file tree
Showing 17 changed files with 15 additions and 203 deletions.
5 changes: 0 additions & 5 deletions src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,6 @@ class MockNodeResourceInfoAccessor : public NodeResourceInfoAccessor {
(const MultiItemCallback<rpc::AvailableResources> &callback),
(override));
MOCK_METHOD(void, AsyncResubscribe, (), (override));
MOCK_METHOD(Status,
AsyncReportResourceUsage,
(const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status,
AsyncGetAllResourceUsage,
(const ItemCallback<rpc::ResourceUsageBatchData> &callback),
Expand Down
6 changes: 0 additions & 6 deletions src/mock/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ class MockGcsResourceManager : public GcsResourceManager {
rpc::GetAllAvailableResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleReportResourceUsage,
(rpc::ReportResourceUsageRequest request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleGetAllResourceUsage,
(rpc::GetAllResourceUsageRequest request,
Expand Down
42 changes: 0 additions & 42 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -731,48 +731,6 @@ Status NodeResourceInfoAccessor::AsyncGetDrainingNodes(
return Status::OK();
}

Status NodeResourceInfoAccessor::AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr, const StatusCallback &callback) {
absl::MutexLock lock(&mutex_);
last_resource_usage_ = std::make_shared<NodeResources>(
ResourceMapToNodeResources(MapFromProtobuf(data_ptr->resources_total()),
MapFromProtobuf(data_ptr->resources_available())));
cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,
[callback](const Status &status, const rpc::ReportResourceUsageReply &reply) {
if (callback) {
callback(status);
}
});
return Status::OK();
}

void NodeResourceInfoAccessor::FillResourceUsageRequest(
rpc::ReportResourceUsageRequest &resources) {
NodeResources cached_resources = *GetLastResourceUsage();

auto resources_data = resources.mutable_resources();
resources_data->clear_resources_total();
for (const auto &resource_pair : cached_resources.total.GetResourceMap()) {
(*resources_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}

resources_data->clear_resources_available();
for (const auto &resource_pair : cached_resources.available.GetResourceMap()) {
(*resources_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}

resources_data->clear_resource_load();
resources_data->set_resource_load_changed(true);
for (const auto &resource_pair : cached_resources.load.GetResourceMap()) {
(*resources_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
}

void NodeResourceInfoAccessor::AsyncResubscribe() {
RAY_LOG(DEBUG) << "Reestablishing subscription for node resource info.";
if (subscribe_resource_operation_ != nullptr) {
Expand Down
24 changes: 0 additions & 24 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,43 +460,19 @@ class NodeResourceInfoAccessor {
/// server.
virtual void AsyncResubscribe();

/// Report resource usage of a node to GCS asynchronously.
///
/// \param data_ptr The data that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.
/// \return Status
virtual Status AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback);

/// Return resources in last report. Used by light heartbeat.
virtual const std::shared_ptr<NodeResources> &GetLastResourceUsage() {
return last_resource_usage_;
}

/// Get newest resource usage of all nodes from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback);

/// Fill resource fields with cached resources. Used by light resource usage report.
virtual void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage);

protected:
/// Cache which stores resource usage in last report used to check if they are changed.
/// Used by light resource usage report.
std::shared_ptr<NodeResources> last_resource_usage_ = std::make_shared<NodeResources>();

private:
// Mutex to protect the cached_resource_usage_ field.
absl::Mutex mutex_;

/// Save the resource usage data, so we can resend it again when GCS server restarts
/// from a failure.
rpc::ReportResourceUsageRequest cached_resource_usage_ ABSL_GUARDED_BY(mutex_);

/// Save the subscribe operation in this function, so we can call it again when PubSub
/// server restarts from a failure.
SubscribeOperation subscribe_resource_operation_;
Expand Down
51 changes: 3 additions & 48 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,6 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
return resource_map;
}

bool ReportResourceUsage(const std::shared_ptr<rpc::ResourcesData> resources) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(
resources, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}

std::vector<rpc::AvailableResources> GetAllAvailableResources() {
std::promise<bool> promise;
std::vector<rpc::AvailableResources> resources;
Expand Down Expand Up @@ -595,44 +588,6 @@ TEST_P(GcsClientTest, TestNodeInfo) {
ASSERT_TRUE(gcs_client_->Nodes().IsRemoved(node2_id));
}

TEST_P(GcsClientTest, TestNodeResourceUsage) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
RAY_CHECK(RegisterNode(*node_info));

// Report resource usage of a node to GCS.
NodeID node_id = NodeID::FromBinary(node_info->node_id());
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
resource->set_should_global_gc(true);
std::string resource_name = "CPU";
double resource_value = 1.0;
(*resource->mutable_resources_total())[resource_name] = resource_value;
ASSERT_TRUE(ReportResourceUsage(resource));

// Get and check last report resource usage.
auto last_resource_usage = gcs_client_->NodeResources().GetLastResourceUsage();
ASSERT_EQ(last_resource_usage->total.Get(scheduling::ResourceID::CPU()),
resource_value);
}

TEST_P(GcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
RAY_CHECK(RegisterNode(*node_info));

// Report unchanged resource usage of a node to GCS.
NodeID node_id = NodeID::FromBinary(node_info->node_id());
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
ASSERT_TRUE(ReportResourceUsage(resource));

// Report changed resource usage of a node to GCS.
auto resource1 = std::make_shared<rpc::ResourcesData>();
resource1->set_node_id(node_id.Binary());
ASSERT_TRUE(ReportResourceUsage(resource1));
}

TEST_P(GcsClientTest, TestGetAllAvailableResources) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
Expand All @@ -650,7 +605,7 @@ TEST_P(GcsClientTest, TestGetAllAvailableResources) {
(*resource->mutable_resources_available())["GPU"] = 10.0;
(*resource->mutable_resources_total())["CPU"] = 1.0;
(*resource->mutable_resources_total())["GPU"] = 10.0;
ASSERT_TRUE(ReportResourceUsage(resource));
gcs_server_->UpdateGcsResourceManagerInTest(*resource);

// Assert get all available resources right.
std::vector<rpc::AvailableResources> resources = GetAllAvailableResources();
Expand Down Expand Up @@ -794,15 +749,15 @@ TEST_P(GcsClientTest, TestNodeTableResubscribe) {
resources->set_node_id(node_info->node_id());
// Set this flag because GCS won't publish unchanged resources.
resources->set_should_global_gc(true);
ASSERT_TRUE(ReportResourceUsage(resources));
gcs_server_->UpdateGcsResourceManagerInTest(*resources);

RestartGcsServer();

node_info = Mocker::GenNodeInfo(1);
ASSERT_TRUE(RegisterNode(*node_info));
node_id = NodeID::FromBinary(node_info->node_id());
resources->set_node_id(node_info->node_id());
ASSERT_TRUE(ReportResourceUsage(resources));
gcs_server_->UpdateGcsResourceManagerInTest(*resources);

WaitForExpectedCount(node_change_count, 2);
}
Expand Down
8 changes: 2 additions & 6 deletions src/ray/gcs/gcs_client/test/global_state_accessor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) {
std::promise<bool> promise1;
auto resources1 = std::make_shared<rpc::ResourcesData>();
resources1->set_node_id(node_table_data->node_id());
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(
resources1, [&promise1](Status status) { promise1.set_value(status.ok()); }));
WaitReady(promise1.get_future(), timeout_ms_);
gcs_server_->UpdateGcsResourceManagerInTest(*resources1);

resources = global_state_->GetAllResourceUsage();
resource_usage_batch_data.ParseFromString(*resources.get());
Expand All @@ -208,9 +206,7 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) {
(*heartbeat2->mutable_resources_total())["GPU"] = 10;
(*heartbeat2->mutable_resources_available())["CPU"] = 1;
(*heartbeat2->mutable_resources_available())["GPU"] = 5;
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(
heartbeat2, [&promise2](Status status) { promise2.set_value(status.ok()); }));
WaitReady(promise2.get_future(), timeout_ms_);
gcs_server_->UpdateGcsResourceManagerInTest(*heartbeat2);

resources = global_state_->GetAllResourceUsage();
resource_usage_batch_data.ParseFromString(*resources.get());
Expand Down
12 changes: 0 additions & 12 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,6 @@ const absl::flat_hash_map<NodeID, rpc::ResourcesData>
return node_resource_usages_;
}

void GcsResourceManager::HandleReportResourceUsage(
rpc::ReportResourceUsageRequest request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
UpdateFromResourceView(request.resources());

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
}

void GcsResourceManager::HandleGetAllResourceUsage(
rpc::GetAllResourceUsageRequest request,
rpc::GetAllResourceUsageReply *reply,
Expand Down Expand Up @@ -339,8 +329,6 @@ std::string GcsResourceManager::DebugString() const {
<< counts_[CountType::GET_RESOURCES_REQUEST]
<< "\n- GetAllAvailableResources request count"
<< counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST]
<< "\n- ReportResourceUsage request count: "
<< counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]
<< "\n- GetAllResourceUsage request count: "
<< counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
return stream.str();
Expand Down
5 changes: 0 additions & 5 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
rpc::GetDrainingNodesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle report resource usage rpc from a raylet.
void HandleReportResourceUsage(rpc::ReportResourceUsageRequest request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle get all resource usage rpc request.
/// Autoscaler-specific RPC called from Python.
void HandleGetAllResourceUsage(rpc::GetAllResourceUsageRequest request,
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ class GcsServer {
static constexpr char kInMemoryStorage[] = "memory";
static constexpr char kRedisStorage[] = "redis";

void UpdateGcsResourceManagerInTest(const rpc::ResourcesData &resources) {
RAY_CHECK(gcs_resource_manager_ != nullptr);
gcs_resource_manager_->UpdateFromResourceView(resources);
}

protected:
/// Generate the redis client options
RedisClientOptions GetRedisClientOptions() const;
Expand Down
10 changes: 5 additions & 5 deletions src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) {

gcs_resource_manager_->OnNodeAdd(*node);

rpc::ReportResourceUsageRequest report_request;
(*report_request.mutable_resources()->mutable_resources_available())["CPU"] = 2;
(*report_request.mutable_resources()->mutable_resources_total())["CPU"] = 2;
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request.resources());
rpc::ResourcesData resources_data;
(*resources_data.mutable_resources_available())["CPU"] = 2;
(*resources_data.mutable_resources_total())["CPU"] = 2;
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, resources_data);

gcs_resource_manager_->HandleGetAllResourceUsage(
get_all_request, &get_all_reply, send_reply_callback);
Expand All @@ -123,7 +123,7 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) {
ASSERT_EQ(get_all_reply2.resource_usage_data().batch().size(), 0);

// This will be ignored since the node is dead.
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request.resources());
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, resources_data);
rpc::GetAllResourceUsageReply get_all_reply3;
gcs_resource_manager_->HandleGetAllResourceUsage(
get_all_request, &get_all_reply3, send_reply_callback);
Expand Down
10 changes: 0 additions & 10 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,6 @@ message GetAllAvailableResourcesReply {
repeated AvailableResources resources_list = 2;
}

message ReportResourceUsageRequest {
ResourcesData resources = 1;
}

message ReportResourceUsageReply {
GcsStatus status = 1;
}

message ReportWorkerFailureRequest {
WorkerTableData worker_failure = 1;
}
Expand Down Expand Up @@ -619,8 +611,6 @@ service NodeResourceInfoGcsService {
// Get available resources of all nodes.
rpc GetAllAvailableResources(GetAllAvailableResourcesRequest)
returns (GetAllAvailableResourcesReply);
// Report resource usage of a node to GCS Service.
rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply);
// Get resource usage of all nodes from GCS Service.
rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply);
// Get ids of draining nodes.
Expand Down
14 changes: 0 additions & 14 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,20 +635,6 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
worker_pool_.HandleJobFinished(job_id);
}

void NodeManager::FillNormalTaskResourceUsage(rpc::ResourcesData &resources_data) {
auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage();
auto normal_task_resources = local_task_manager_->CalcNormalTaskResources();
if (last_heartbeat_resources->normal_task_resources != normal_task_resources) {
RAY_LOG(DEBUG) << "normal_task_resources = " << normal_task_resources.DebugString();
resources_data.set_resources_normal_task_changed(true);
auto resource_map = normal_task_resources.GetResourceMap();
resources_data.mutable_resources_normal_task()->insert(resource_map.begin(),
resource_map.end());
resources_data.set_resources_normal_task_timestamp(absl::GetCurrentTimeNanos());
last_heartbeat_resources->normal_task_resources = normal_task_resources;
}
}

void NodeManager::DoLocalGC(bool triggered_by_global_gc) {
auto all_workers = worker_pool_.GetAllRegisteredWorkers();
for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) {
Expand Down
3 changes: 0 additions & 3 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \return Void.
void TryLocalInfeasibleTaskScheduling();

/// Fill out the normal task resource report.
void FillNormalTaskResourceUsage(rpc::ResourcesData &resources_data);

/// Write out debug state to a file.
void DumpDebugState() const;

Expand Down
5 changes: 0 additions & 5 deletions src/ray/raylet/scheduling/local_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,6 @@ void LocalResourceManager::OnResourceOrStateChanged() {
resource_change_subscriber_(ToNodeResources());
}

void LocalResourceManager::ResetLastReportResourceUsage(
const NodeResources &replacement) {
last_report_resources_.reset(new NodeResources(replacement));
}

bool LocalResourceManager::ResourcesExist(scheduling::ResourceID resource_id) const {
return local_resources_.total.Has(resource_id);
}
Expand Down
7 changes: 0 additions & 7 deletions src/ray/raylet/scheduling/local_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ class LocalResourceManager : public syncer::ReporterInterface {
/// Get the number of cpus on this node.
uint64_t GetNumCpus() const;

/// Replace the local resources by the provided value.
///
/// \param replacement: the new value.
void ResetLastReportResourceUsage(const NodeResources &replacement);

/// Check whether the specific resource exists or not in local node.
///
/// \param resource_name: the specific resource name.
Expand Down Expand Up @@ -214,8 +209,6 @@ class LocalResourceManager : public syncer::ReporterInterface {

/// A map storing when the resource was last idle.
absl::flat_hash_map<WorkArtifact, absl::optional<absl::Time>> last_idle_times_;
/// Cached resources, used to compare with newest one in light heartbeat mode.
std::unique_ptr<NodeResources> last_report_resources_;
/// Function to get used object store memory.
std::function<int64_t(void)> get_used_object_store_memory_;
/// Function to get whether the pull manager is at capacity.
Expand Down
Loading

0 comments on commit c85e15a

Please sign in to comment.