Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Remove unused GCS RPCs #40808

Merged
merged 8 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/mock/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,6 @@ class MockNodeResourceInfoAccessor : public NodeResourceInfoAccessor {
(const MultiItemCallback<rpc::AvailableResources> &callback),
(override));
MOCK_METHOD(void, AsyncResubscribe, (), (override));
MOCK_METHOD(Status,
AsyncReportResourceUsage,
(const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback),
(override));
MOCK_METHOD(Status,
AsyncGetAllResourceUsage,
(const ItemCallback<rpc::ResourceUsageBatchData> &callback),
Expand Down
6 changes: 0 additions & 6 deletions src/mock/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ class MockGcsResourceManager : public GcsResourceManager {
rpc::GetAllAvailableResourcesReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleReportResourceUsage,
(rpc::ReportResourceUsageRequest request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback),
(override));
MOCK_METHOD(void,
HandleGetAllResourceUsage,
(rpc::GetAllResourceUsageRequest request,
Expand Down
42 changes: 0 additions & 42 deletions src/ray/gcs/gcs_client/accessor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -731,48 +731,6 @@ Status NodeResourceInfoAccessor::AsyncGetDrainingNodes(
return Status::OK();
}

Status NodeResourceInfoAccessor::AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr, const StatusCallback &callback) {
absl::MutexLock lock(&mutex_);
last_resource_usage_ = std::make_shared<NodeResources>(
ResourceMapToNodeResources(MapFromProtobuf(data_ptr->resources_total()),
MapFromProtobuf(data_ptr->resources_available())));
cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr);
client_impl_->GetGcsRpcClient().ReportResourceUsage(
cached_resource_usage_,
[callback](const Status &status, const rpc::ReportResourceUsageReply &reply) {
if (callback) {
callback(status);
}
});
return Status::OK();
}

void NodeResourceInfoAccessor::FillResourceUsageRequest(
rpc::ReportResourceUsageRequest &resources) {
NodeResources cached_resources = *GetLastResourceUsage();

auto resources_data = resources.mutable_resources();
resources_data->clear_resources_total();
for (const auto &resource_pair : cached_resources.total.GetResourceMap()) {
(*resources_data->mutable_resources_total())[resource_pair.first] =
resource_pair.second;
}

resources_data->clear_resources_available();
for (const auto &resource_pair : cached_resources.available.GetResourceMap()) {
(*resources_data->mutable_resources_available())[resource_pair.first] =
resource_pair.second;
}

resources_data->clear_resource_load();
resources_data->set_resource_load_changed(true);
for (const auto &resource_pair : cached_resources.load.GetResourceMap()) {
(*resources_data->mutable_resource_load())[resource_pair.first] =
resource_pair.second;
}
}

void NodeResourceInfoAccessor::AsyncResubscribe() {
RAY_LOG(DEBUG) << "Reestablishing subscription for node resource info.";
if (subscribe_resource_operation_ != nullptr) {
Expand Down
24 changes: 0 additions & 24 deletions src/ray/gcs/gcs_client/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,43 +460,19 @@ class NodeResourceInfoAccessor {
/// server.
virtual void AsyncResubscribe();

/// Report resource usage of a node to GCS asynchronously.
///
/// \param data_ptr The data that will be reported to GCS.
/// \param callback Callback that will be called after report finishes.
/// \return Status
virtual Status AsyncReportResourceUsage(
const std::shared_ptr<rpc::ResourcesData> &data_ptr,
const StatusCallback &callback);

/// Return resources in last report. Used by light heartbeat.
virtual const std::shared_ptr<NodeResources> &GetLastResourceUsage() {
return last_resource_usage_;
}

/// Get newest resource usage of all nodes from GCS asynchronously.
///
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGetAllResourceUsage(
const ItemCallback<rpc::ResourceUsageBatchData> &callback);

/// Fill resource fields with cached resources. Used by light resource usage report.
virtual void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage);

protected:
/// Cache which stores resource usage in last report used to check if they are changed.
/// Used by light resource usage report.
std::shared_ptr<NodeResources> last_resource_usage_ = std::make_shared<NodeResources>();

private:
// Mutex to protect the cached_resource_usage_ field.
absl::Mutex mutex_;

/// Save the resource usage data, so we can resend it again when GCS server restarts
/// from a failure.
rpc::ReportResourceUsageRequest cached_resource_usage_ ABSL_GUARDED_BY(mutex_);

/// Save the subscribe operation in this function, so we can call it again when PubSub
/// server restarts from a failure.
SubscribeOperation subscribe_resource_operation_;
Expand Down
51 changes: 3 additions & 48 deletions src/ray/gcs/gcs_client/test/gcs_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -372,13 +372,6 @@ class GcsClientTest : public ::testing::TestWithParam<bool> {
return resource_map;
}

bool ReportResourceUsage(const std::shared_ptr<rpc::ResourcesData> resources) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(
resources, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
}

std::vector<rpc::AvailableResources> GetAllAvailableResources() {
std::promise<bool> promise;
std::vector<rpc::AvailableResources> resources;
Expand Down Expand Up @@ -595,44 +588,6 @@ TEST_P(GcsClientTest, TestNodeInfo) {
ASSERT_TRUE(gcs_client_->Nodes().IsRemoved(node2_id));
}

TEST_P(GcsClientTest, TestNodeResourceUsage) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
RAY_CHECK(RegisterNode(*node_info));

// Report resource usage of a node to GCS.
NodeID node_id = NodeID::FromBinary(node_info->node_id());
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
resource->set_should_global_gc(true);
std::string resource_name = "CPU";
double resource_value = 1.0;
(*resource->mutable_resources_total())[resource_name] = resource_value;
ASSERT_TRUE(ReportResourceUsage(resource));

// Get and check last report resource usage.
auto last_resource_usage = gcs_client_->NodeResources().GetLastResourceUsage();
ASSERT_EQ(last_resource_usage->total.Get(scheduling::ResourceID::CPU()),
resource_value);
}

TEST_P(GcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
RAY_CHECK(RegisterNode(*node_info));

// Report unchanged resource usage of a node to GCS.
NodeID node_id = NodeID::FromBinary(node_info->node_id());
auto resource = std::make_shared<rpc::ResourcesData>();
resource->set_node_id(node_id.Binary());
ASSERT_TRUE(ReportResourceUsage(resource));

// Report changed resource usage of a node to GCS.
auto resource1 = std::make_shared<rpc::ResourcesData>();
resource1->set_node_id(node_id.Binary());
ASSERT_TRUE(ReportResourceUsage(resource1));
}

TEST_P(GcsClientTest, TestGetAllAvailableResources) {
// Register node.
auto node_info = Mocker::GenNodeInfo();
Expand All @@ -650,7 +605,7 @@ TEST_P(GcsClientTest, TestGetAllAvailableResources) {
(*resource->mutable_resources_available())["GPU"] = 10.0;
(*resource->mutable_resources_total())["CPU"] = 1.0;
(*resource->mutable_resources_total())["GPU"] = 10.0;
ASSERT_TRUE(ReportResourceUsage(resource));
gcs_server_->UpdateGcsResourceManagerInTest(*resource);

// Assert get all available resources right.
std::vector<rpc::AvailableResources> resources = GetAllAvailableResources();
Expand Down Expand Up @@ -794,15 +749,15 @@ TEST_P(GcsClientTest, TestNodeTableResubscribe) {
resources->set_node_id(node_info->node_id());
// Set this flag because GCS won't publish unchanged resources.
resources->set_should_global_gc(true);
ASSERT_TRUE(ReportResourceUsage(resources));
gcs_server_->UpdateGcsResourceManagerInTest(*resources);

RestartGcsServer();

node_info = Mocker::GenNodeInfo(1);
ASSERT_TRUE(RegisterNode(*node_info));
node_id = NodeID::FromBinary(node_info->node_id());
resources->set_node_id(node_info->node_id());
ASSERT_TRUE(ReportResourceUsage(resources));
gcs_server_->UpdateGcsResourceManagerInTest(*resources);

WaitForExpectedCount(node_change_count, 2);
}
Expand Down
8 changes: 2 additions & 6 deletions src/ray/gcs/gcs_client/test/global_state_accessor_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) {
std::promise<bool> promise1;
auto resources1 = std::make_shared<rpc::ResourcesData>();
resources1->set_node_id(node_table_data->node_id());
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(
resources1, [&promise1](Status status) { promise1.set_value(status.ok()); }));
WaitReady(promise1.get_future(), timeout_ms_);
gcs_server_->UpdateGcsResourceManagerInTest(*resources1);

resources = global_state_->GetAllResourceUsage();
resource_usage_batch_data.ParseFromString(*resources.get());
Expand All @@ -208,9 +206,7 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) {
(*heartbeat2->mutable_resources_total())["GPU"] = 10;
(*heartbeat2->mutable_resources_available())["CPU"] = 1;
(*heartbeat2->mutable_resources_available())["GPU"] = 5;
RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage(
heartbeat2, [&promise2](Status status) { promise2.set_value(status.ok()); }));
WaitReady(promise2.get_future(), timeout_ms_);
gcs_server_->UpdateGcsResourceManagerInTest(*heartbeat2);

resources = global_state_->GetAllResourceUsage();
resource_usage_batch_data.ParseFromString(*resources.get());
Expand Down
12 changes: 0 additions & 12 deletions src/ray/gcs/gcs_server/gcs_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,6 @@ const absl::flat_hash_map<NodeID, rpc::ResourcesData>
return node_resource_usages_;
}

void GcsResourceManager::HandleReportResourceUsage(
rpc::ReportResourceUsageRequest request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) {
UpdateFromResourceView(request.resources());

GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST];
}

void GcsResourceManager::HandleGetAllResourceUsage(
rpc::GetAllResourceUsageRequest request,
rpc::GetAllResourceUsageReply *reply,
Expand Down Expand Up @@ -339,8 +329,6 @@ std::string GcsResourceManager::DebugString() const {
<< counts_[CountType::GET_RESOURCES_REQUEST]
<< "\n- GetAllAvailableResources request count"
<< counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST]
<< "\n- ReportResourceUsage request count: "
<< counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]
<< "\n- GetAllResourceUsage request count: "
<< counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST];
return stream.str();
Expand Down
5 changes: 0 additions & 5 deletions src/ray/gcs/gcs_server/gcs_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler,
rpc::GetDrainingNodesReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle report resource usage rpc from a raylet.
void HandleReportResourceUsage(rpc::ReportResourceUsageRequest request,
rpc::ReportResourceUsageReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

/// Handle get all resource usage rpc request.
/// Autoscaler-specific RPC called from Python.
void HandleGetAllResourceUsage(rpc::GetAllResourceUsageRequest request,
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ class GcsServer {
static constexpr char kInMemoryStorage[] = "memory";
static constexpr char kRedisStorage[] = "redis";

void UpdateGcsResourceManagerInTest(const rpc::ResourcesData &resources) {
RAY_CHECK(gcs_resource_manager_ != nullptr);
gcs_resource_manager_->UpdateFromResourceView(resources);
}

protected:
/// Generate the redis client options
RedisClientOptions GetRedisClientOptions() const;
Expand Down
10 changes: 5 additions & 5 deletions src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) {

gcs_resource_manager_->OnNodeAdd(*node);

rpc::ReportResourceUsageRequest report_request;
(*report_request.mutable_resources()->mutable_resources_available())["CPU"] = 2;
(*report_request.mutable_resources()->mutable_resources_total())["CPU"] = 2;
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request.resources());
rpc::ResourcesData resources_data;
(*resources_data.mutable_resources_available())["CPU"] = 2;
(*resources_data.mutable_resources_total())["CPU"] = 2;
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, resources_data);

gcs_resource_manager_->HandleGetAllResourceUsage(
get_all_request, &get_all_reply, send_reply_callback);
Expand All @@ -123,7 +123,7 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) {
ASSERT_EQ(get_all_reply2.resource_usage_data().batch().size(), 0);

// This will be ignored since the node is dead.
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request.resources());
gcs_resource_manager_->UpdateNodeResourceUsage(node_id, resources_data);
rpc::GetAllResourceUsageReply get_all_reply3;
gcs_resource_manager_->HandleGetAllResourceUsage(
get_all_request, &get_all_reply3, send_reply_callback);
Expand Down
10 changes: 0 additions & 10 deletions src/ray/protobuf/gcs_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -237,14 +237,6 @@ message GetAllAvailableResourcesReply {
repeated AvailableResources resources_list = 2;
}

message ReportResourceUsageRequest {
ResourcesData resources = 1;
}

message ReportResourceUsageReply {
GcsStatus status = 1;
}

message ReportWorkerFailureRequest {
WorkerTableData worker_failure = 1;
}
Expand Down Expand Up @@ -619,8 +611,6 @@ service NodeResourceInfoGcsService {
// Get available resources of all nodes.
rpc GetAllAvailableResources(GetAllAvailableResourcesRequest)
returns (GetAllAvailableResourcesReply);
// Report resource usage of a node to GCS Service.
rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply);
// Get resource usage of all nodes from GCS Service.
rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply);
// Get ids of draining nodes.
Expand Down
14 changes: 0 additions & 14 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -635,20 +635,6 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job
worker_pool_.HandleJobFinished(job_id);
}

void NodeManager::FillNormalTaskResourceUsage(rpc::ResourcesData &resources_data) {
auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage();
auto normal_task_resources = local_task_manager_->CalcNormalTaskResources();
if (last_heartbeat_resources->normal_task_resources != normal_task_resources) {
RAY_LOG(DEBUG) << "normal_task_resources = " << normal_task_resources.DebugString();
resources_data.set_resources_normal_task_changed(true);
auto resource_map = normal_task_resources.GetResourceMap();
resources_data.mutable_resources_normal_task()->insert(resource_map.begin(),
resource_map.end());
resources_data.set_resources_normal_task_timestamp(absl::GetCurrentTimeNanos());
last_heartbeat_resources->normal_task_resources = normal_task_resources;
}
}

void NodeManager::DoLocalGC(bool triggered_by_global_gc) {
auto all_workers = worker_pool_.GetAllRegisteredWorkers();
for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) {
Expand Down
3 changes: 0 additions & 3 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,9 +285,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \return Void.
void TryLocalInfeasibleTaskScheduling();

/// Fill out the normal task resource report.
void FillNormalTaskResourceUsage(rpc::ResourcesData &resources_data);

/// Write out debug state to a file.
void DumpDebugState() const;

Expand Down
5 changes: 0 additions & 5 deletions src/ray/raylet/scheduling/local_resource_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -390,11 +390,6 @@ void LocalResourceManager::OnResourceOrStateChanged() {
resource_change_subscriber_(ToNodeResources());
}

void LocalResourceManager::ResetLastReportResourceUsage(
const NodeResources &replacement) {
last_report_resources_.reset(new NodeResources(replacement));
}

bool LocalResourceManager::ResourcesExist(scheduling::ResourceID resource_id) const {
return local_resources_.total.Has(resource_id);
}
Expand Down
7 changes: 0 additions & 7 deletions src/ray/raylet/scheduling/local_resource_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,6 @@ class LocalResourceManager : public syncer::ReporterInterface {
/// Get the number of cpus on this node.
uint64_t GetNumCpus() const;

/// Replace the local resources by the provided value.
///
/// \param replacement: the new value.
void ResetLastReportResourceUsage(const NodeResources &replacement);

/// Check whether the specific resource exists or not in local node.
///
/// \param resource_name: the specific resource name.
Expand Down Expand Up @@ -214,8 +209,6 @@ class LocalResourceManager : public syncer::ReporterInterface {

/// A map storing when the resource was last idle.
absl::flat_hash_map<WorkArtifact, absl::optional<absl::Time>> last_idle_times_;
/// Cached resources, used to compare with newest one in light heartbeat mode.
std::unique_ptr<NodeResources> last_report_resources_;
/// Function to get used object store memory.
std::function<int64_t(void)> get_used_object_store_memory_;
/// Function to get whether the pull manager is at capacity.
Expand Down
Loading