From 8360f68ae51ec4685e62db461b7ed30c058fed38 Mon Sep 17 00:00:00 2001 From: vitsai Date: Thu, 26 Oct 2023 22:19:52 +0000 Subject: [PATCH 1/7] remove dead code Signed-off-by: vitsai --- src/ray/gcs/gcs_client/accessor.cc | 42 ------------------- src/ray/gcs/gcs_client/accessor.h | 17 -------- .../gcs/gcs_client/test/gcs_client_test.cc | 38 ----------------- .../gcs/gcs_server/gcs_resource_manager.cc | 10 ----- src/ray/gcs/gcs_server/gcs_resource_manager.h | 5 --- src/ray/protobuf/gcs_service.proto | 10 ----- src/ray/raylet/node_manager.cc | 14 ------- src/ray/raylet/node_manager.h | 3 -- .../scheduling/local_resource_manager.cc | 5 --- .../scheduling/local_resource_manager.h | 5 --- src/ray/rpc/gcs_server/gcs_rpc_server.h | 5 --- 11 files changed, 154 deletions(-) diff --git a/src/ray/gcs/gcs_client/accessor.cc b/src/ray/gcs/gcs_client/accessor.cc index 10e990ae29a25..d459d2858266a 100644 --- a/src/ray/gcs/gcs_client/accessor.cc +++ b/src/ray/gcs/gcs_client/accessor.cc @@ -731,48 +731,6 @@ Status NodeResourceInfoAccessor::AsyncGetDrainingNodes( return Status::OK(); } -Status NodeResourceInfoAccessor::AsyncReportResourceUsage( - const std::shared_ptr &data_ptr, const StatusCallback &callback) { - absl::MutexLock lock(&mutex_); - last_resource_usage_ = std::make_shared( - ResourceMapToNodeResources(MapFromProtobuf(data_ptr->resources_total()), - MapFromProtobuf(data_ptr->resources_available()))); - cached_resource_usage_.mutable_resources()->CopyFrom(*data_ptr); - client_impl_->GetGcsRpcClient().ReportResourceUsage( - cached_resource_usage_, - [callback](const Status &status, const rpc::ReportResourceUsageReply &reply) { - if (callback) { - callback(status); - } - }); - return Status::OK(); -} - -void NodeResourceInfoAccessor::FillResourceUsageRequest( - rpc::ReportResourceUsageRequest &resources) { - NodeResources cached_resources = *GetLastResourceUsage(); - - auto resources_data = resources.mutable_resources(); - resources_data->clear_resources_total(); - for (const auto &resource_pair : cached_resources.total.GetResourceMap()) { - (*resources_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } - - resources_data->clear_resources_available(); - for (const auto &resource_pair : cached_resources.available.GetResourceMap()) { - (*resources_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - - resources_data->clear_resource_load(); - resources_data->set_resource_load_changed(true); - for (const auto &resource_pair : cached_resources.load.GetResourceMap()) { - (*resources_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; - } -} - void NodeResourceInfoAccessor::AsyncResubscribe() { RAY_LOG(DEBUG) << "Reestablishing subscription for node resource info."; if (subscribe_resource_operation_ != nullptr) { diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index 0ca00bf4499ce..7ec2c284a63cf 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -460,20 +460,6 @@ class NodeResourceInfoAccessor { /// server. virtual void AsyncResubscribe(); - /// Report resource usage of a node to GCS asynchronously. - /// - /// \param data_ptr The data that will be reported to GCS. - /// \param callback Callback that will be called after report finishes. - /// \return Status - virtual Status AsyncReportResourceUsage( - const std::shared_ptr &data_ptr, - const StatusCallback &callback); - - /// Return resources in last report. Used by light heartbeat. - virtual const std::shared_ptr &GetLastResourceUsage() { - return last_resource_usage_; - } - /// Get newest resource usage of all nodes from GCS asynchronously. /// /// \param callback Callback that will be called after lookup finishes. @@ -481,9 +467,6 @@ class NodeResourceInfoAccessor { virtual Status AsyncGetAllResourceUsage( const ItemCallback &callback); - /// Fill resource fields with cached resources. Used by light resource usage report. - virtual void FillResourceUsageRequest(rpc::ReportResourceUsageRequest &resource_usage); - protected: /// Cache which stores resource usage in last report used to check if they are changed. /// Used by light resource usage report. diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index 2d7cec8e39322..0cb97b1839d64 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -595,44 +595,6 @@ TEST_P(GcsClientTest, TestNodeInfo) { ASSERT_TRUE(gcs_client_->Nodes().IsRemoved(node2_id)); } -TEST_P(GcsClientTest, TestNodeResourceUsage) { - // Register node. - auto node_info = Mocker::GenNodeInfo(); - RAY_CHECK(RegisterNode(*node_info)); - - // Report resource usage of a node to GCS. - NodeID node_id = NodeID::FromBinary(node_info->node_id()); - auto resource = std::make_shared(); - resource->set_node_id(node_id.Binary()); - resource->set_should_global_gc(true); - std::string resource_name = "CPU"; - double resource_value = 1.0; - (*resource->mutable_resources_total())[resource_name] = resource_value; - ASSERT_TRUE(ReportResourceUsage(resource)); - - // Get and check last report resource usage. - auto last_resource_usage = gcs_client_->NodeResources().GetLastResourceUsage(); - ASSERT_EQ(last_resource_usage->total.Get(scheduling::ResourceID::CPU()), - resource_value); -} - -TEST_P(GcsClientTest, TestNodeResourceUsageWithLightResourceUsageReport) { - // Register node. - auto node_info = Mocker::GenNodeInfo(); - RAY_CHECK(RegisterNode(*node_info)); - - // Report unchanged resource usage of a node to GCS. - NodeID node_id = NodeID::FromBinary(node_info->node_id()); - auto resource = std::make_shared(); - resource->set_node_id(node_id.Binary()); - ASSERT_TRUE(ReportResourceUsage(resource)); - - // Report changed resource usage of a node to GCS. - auto resource1 = std::make_shared(); - resource1->set_node_id(node_id.Binary()); - ASSERT_TRUE(ReportResourceUsage(resource1)); -} - TEST_P(GcsClientTest, TestGetAllAvailableResources) { // Register node. auto node_info = Mocker::GenNodeInfo(); diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 81ecac15738a9..783dbdba374e6 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -171,16 +171,6 @@ const absl::flat_hash_map return node_resource_usages_; } -void GcsResourceManager::HandleReportResourceUsage( - rpc::ReportResourceUsageRequest request, - rpc::ReportResourceUsageReply *reply, - rpc::SendReplyCallback send_reply_callback) { - UpdateFromResourceView(request.resources()); - - GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); - ++counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST]; -} - void GcsResourceManager::HandleGetAllResourceUsage( rpc::GetAllResourceUsageRequest request, rpc::GetAllResourceUsageReply *reply, diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.h b/src/ray/gcs/gcs_server/gcs_resource_manager.h index d296a3c7c6009..0c596fd654d67 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.h @@ -92,11 +92,6 @@ class GcsResourceManager : public rpc::NodeResourceInfoHandler, rpc::GetDrainingNodesReply *reply, rpc::SendReplyCallback send_reply_callback) override; - /// Handle report resource usage rpc from a raylet. - void HandleReportResourceUsage(rpc::ReportResourceUsageRequest request, - rpc::ReportResourceUsageReply *reply, - rpc::SendReplyCallback send_reply_callback) override; - /// Handle get all resource usage rpc request. /// Autoscaler-specific RPC called from Python. void HandleGetAllResourceUsage(rpc::GetAllResourceUsageRequest request, diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index e623f3e2e1713..06082a0d0435a 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -237,14 +237,6 @@ message GetAllAvailableResourcesReply { repeated AvailableResources resources_list = 2; } -message ReportResourceUsageRequest { - ResourcesData resources = 1; -} - -message ReportResourceUsageReply { - GcsStatus status = 1; -} - message ReportWorkerFailureRequest { WorkerTableData worker_failure = 1; } @@ -619,8 +611,6 @@ service NodeResourceInfoGcsService { // Get available resources of all nodes. rpc GetAllAvailableResources(GetAllAvailableResourcesRequest) returns (GetAllAvailableResourcesReply); - // Report resource usage of a node to GCS Service. - rpc ReportResourceUsage(ReportResourceUsageRequest) returns (ReportResourceUsageReply); // Get resource usage of all nodes from GCS Service. rpc GetAllResourceUsage(GetAllResourceUsageRequest) returns (GetAllResourceUsageReply); // Get ids of draining nodes. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index af258f4bc280e..1cb50823e6482 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -635,20 +635,6 @@ void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job worker_pool_.HandleJobFinished(job_id); } -void NodeManager::FillNormalTaskResourceUsage(rpc::ResourcesData &resources_data) { - auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage(); - auto normal_task_resources = local_task_manager_->CalcNormalTaskResources(); - if (last_heartbeat_resources->normal_task_resources != normal_task_resources) { - RAY_LOG(DEBUG) << "normal_task_resources = " << normal_task_resources.DebugString(); - resources_data.set_resources_normal_task_changed(true); - auto resource_map = normal_task_resources.GetResourceMap(); - resources_data.mutable_resources_normal_task()->insert(resource_map.begin(), - resource_map.end()); - resources_data.set_resources_normal_task_timestamp(absl::GetCurrentTimeNanos()); - last_heartbeat_resources->normal_task_resources = normal_task_resources; - } -} - void NodeManager::DoLocalGC(bool triggered_by_global_gc) { auto all_workers = worker_pool_.GetAllRegisteredWorkers(); for (const auto &driver : worker_pool_.GetAllRegisteredDrivers()) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 5b53adff5464c..0600fa65bdc90 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -285,9 +285,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler, /// \return Void. void TryLocalInfeasibleTaskScheduling(); - /// Fill out the normal task resource report. - void FillNormalTaskResourceUsage(rpc::ResourcesData &resources_data); - /// Write out debug state to a file. void DumpDebugState() const; diff --git a/src/ray/raylet/scheduling/local_resource_manager.cc b/src/ray/raylet/scheduling/local_resource_manager.cc index 8523f60f0cdc3..a7e6b89574b84 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.cc +++ b/src/ray/raylet/scheduling/local_resource_manager.cc @@ -390,11 +390,6 @@ void LocalResourceManager::OnResourceOrStateChanged() { resource_change_subscriber_(ToNodeResources()); } -void LocalResourceManager::ResetLastReportResourceUsage( - const NodeResources &replacement) { - last_report_resources_.reset(new NodeResources(replacement)); -} - bool LocalResourceManager::ResourcesExist(scheduling::ResourceID resource_id) const { return local_resources_.total.Has(resource_id); } diff --git a/src/ray/raylet/scheduling/local_resource_manager.h b/src/ray/raylet/scheduling/local_resource_manager.h index aaa995565e036..353b935c732b4 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.h +++ b/src/ray/raylet/scheduling/local_resource_manager.h @@ -131,11 +131,6 @@ class LocalResourceManager : public syncer::ReporterInterface { /// Get the number of cpus on this node. uint64_t GetNumCpus() const; - /// Replace the local resources by the provided value. - /// - /// \param replacement: the new value. - void ResetLastReportResourceUsage(const NodeResources &replacement); - /// Check whether the specific resource exists or not in local node. /// /// \param resource_name: the specific resource name. diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 2599aa0c7f86f..55f6e5a95b1f1 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -419,10 +419,6 @@ class NodeResourceInfoGcsServiceHandler { rpc::GetDrainingNodesReply *reply, rpc::SendReplyCallback send_reply_callback) = 0; - virtual void HandleReportResourceUsage(ReportResourceUsageRequest request, - ReportResourceUsageReply *reply, - SendReplyCallback send_reply_callback) = 0; - virtual void HandleGetAllResourceUsage(GetAllResourceUsageRequest request, GetAllResourceUsageReply *reply, SendReplyCallback send_reply_callback) = 0; @@ -448,7 +444,6 @@ class NodeResourceInfoGrpcService : public GrpcService { NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetResources); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllAvailableResources); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetDrainingNodes); - NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(ReportResourceUsage); NODE_RESOURCE_INFO_SERVICE_RPC_HANDLER(GetAllResourceUsage); } From c1576145b9b64ba02ab041d583638727566d1eda Mon Sep 17 00:00:00 2001 From: vitsai Date: Fri, 27 Oct 2023 17:19:24 +0000 Subject: [PATCH 2/7] wip Signed-off-by: vitsai --- .../scheduling/local_resource_manager.h | 34 +++++++++++++++++++ .../gcs/gcs_client/test/gcs_client_test.cc | 1 + .../scheduling/local_resource_manager.h | 2 +- 3 files changed, 36 insertions(+), 1 deletion(-) create mode 100644 src/mock/ray/raylet/scheduling/local_resource_manager.h diff --git a/src/mock/ray/raylet/scheduling/local_resource_manager.h b/src/mock/ray/raylet/scheduling/local_resource_manager.h new file mode 100644 index 0000000000000..ddd64bd97f566 --- /dev/null +++ b/src/mock/ray/raylet/scheduling/local_resource_manager.h @@ -0,0 +1,34 @@ +// Copyright 2023 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace ray { +namespace raylet { + +class MockWork : public Work { + public: +}; + +} // namespace raylet +} // namespace ray + +namespace ray { +namespace raylet { + +class MockLocalTaskManager : public LocalTaskManager { + public: + MOCK_METHOD(void, PopulateResourceUsage, (rpc::ResourcesData & data), (const)); +}; + +} // namespace raylet +} // namespace ray diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index 0cb97b1839d64..acc1061ca9247 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -96,6 +96,7 @@ class GcsClientTest : public ::testing::TestWithParam { gcs::GcsClientOptions options("127.0.0.1:5397"); gcs_client_ = std::make_unique(options); ReconnectClient(); + local_resource_manager_ = std::make_unique(); } void TearDown() override { diff --git a/src/ray/raylet/scheduling/local_resource_manager.h b/src/ray/raylet/scheduling/local_resource_manager.h index 353b935c732b4..fb1824f12796d 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.h +++ b/src/ray/raylet/scheduling/local_resource_manager.h @@ -142,7 +142,7 @@ class LocalResourceManager : public syncer::ReporterInterface { int64_t after_version, syncer::MessageType message_type) const override; /// Populate resource usage. - void PopulateResourceUsage(rpc::ResourcesData &resources_data) const; + virtual void PopulateResourceUsage(rpc::ResourcesData &resources_data) const; /// Record the metrics. void RecordMetrics() const; From 783bae3171424df3f3223fd096ffea66ec28ee55 Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 30 Oct 2023 20:58:52 +0000 Subject: [PATCH 3/7] fix client test Signed-off-by: vitsai --- src/ray/gcs/gcs_client/accessor.h | 7 ------ .../gcs/gcs_client/test/gcs_client_test.cc | 23 +++++++++++-------- src/ray/gcs/gcs_server/gcs_server.h | 5 ++++ src/ray/rpc/gcs_server/gcs_rpc_client.h | 6 ----- 4 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/ray/gcs/gcs_client/accessor.h b/src/ray/gcs/gcs_client/accessor.h index 7ec2c284a63cf..462301592a908 100644 --- a/src/ray/gcs/gcs_client/accessor.h +++ b/src/ray/gcs/gcs_client/accessor.h @@ -473,13 +473,6 @@ class NodeResourceInfoAccessor { std::shared_ptr last_resource_usage_ = std::make_shared(); private: - // Mutex to protect the cached_resource_usage_ field. - absl::Mutex mutex_; - - /// Save the resource usage data, so we can resend it again when GCS server restarts - /// from a failure. - rpc::ReportResourceUsageRequest cached_resource_usage_ ABSL_GUARDED_BY(mutex_); - /// Save the subscribe operation in this function, so we can call it again when PubSub /// server restarts from a failure. SubscribeOperation subscribe_resource_operation_; diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index acc1061ca9247..639fbbbea2162 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -96,7 +96,6 @@ class GcsClientTest : public ::testing::TestWithParam { gcs::GcsClientOptions options("127.0.0.1:5397"); gcs_client_ = std::make_unique(options); ReconnectClient(); - local_resource_manager_ = std::make_unique(); } void TearDown() override { @@ -373,11 +372,9 @@ class GcsClientTest : public ::testing::TestWithParam { return resource_map; } - bool ReportResourceUsage(const std::shared_ptr resources) { - std::promise promise; - RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage( - resources, [&promise](Status status) { promise.set_value(status.ok()); })); - return WaitReady(promise.get_future(), timeout_ms_); + void ReportResourceUsage(const std::shared_ptr resources) { + // Do it from the server side. + gcs_server_->GetGcsResourceManager().UpdateFromResourceView(*resources); } std::vector GetAllAvailableResources() { @@ -455,6 +452,14 @@ class GcsClientTest : public ::testing::TestWithParam { std::unique_ptr client_io_service_thread_; std::unique_ptr client_io_service_; std::unique_ptr gcs_client_; + std::unique_ptr syncer_io_service_thread_; + std::unique_ptr syncer_io_service_; + + // Ray Syncer and reporter. + std::unique_ptr ray_syncer_; + std::unique_ptr ray_syncer_service_; + std::unique_ptr syncer_server_; + std::unique_ptr local_resource_manager_; // Timeout waiting for GCS server reply, default is 2s. const std::chrono::milliseconds timeout_ms_{2000}; @@ -613,7 +618,7 @@ TEST_P(GcsClientTest, TestGetAllAvailableResources) { (*resource->mutable_resources_available())["GPU"] = 10.0; (*resource->mutable_resources_total())["CPU"] = 1.0; (*resource->mutable_resources_total())["GPU"] = 10.0; - ASSERT_TRUE(ReportResourceUsage(resource)); + ReportResourceUsage(resource); // Assert get all available resources right. std::vector resources = GetAllAvailableResources(); @@ -757,7 +762,7 @@ TEST_P(GcsClientTest, TestNodeTableResubscribe) { resources->set_node_id(node_info->node_id()); // Set this flag because GCS won't publish unchanged resources. resources->set_should_global_gc(true); - ASSERT_TRUE(ReportResourceUsage(resources)); + ReportResourceUsage(resources); RestartGcsServer(); @@ -765,7 +770,7 @@ TEST_P(GcsClientTest, TestNodeTableResubscribe) { ASSERT_TRUE(RegisterNode(*node_info)); node_id = NodeID::FromBinary(node_info->node_id()); resources->set_node_id(node_info->node_id()); - ASSERT_TRUE(ReportResourceUsage(resources)); + ReportResourceUsage(resources); WaitForExpectedCount(node_change_count, 2); } diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index 49b3d4408db3a..ee517b24460fb 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -110,6 +110,11 @@ class GcsServer { static constexpr char kInMemoryStorage[] = "memory"; static constexpr char kRedisStorage[] = "redis"; + GcsResourceManager &GetGcsResourceManager() { + RAY_CHECK(gcs_resource_manager_ != nullptr); + return *gcs_resource_manager_; + } + protected: /// Generate the redis client options RedisClientOptions GetRedisClientOptions() const; diff --git a/src/ray/rpc/gcs_server/gcs_rpc_client.h b/src/ray/rpc/gcs_server/gcs_rpc_client.h index 516bf285b5aed..205418ae70ad0 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_client.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_client.h @@ -367,12 +367,6 @@ class GcsRpcClient { node_resource_info_grpc_client_, /*method_timeout_ms*/ -1, ) - /// Report resource usage of a node to GCS Service. - VOID_GCS_RPC_CLIENT_METHOD(NodeResourceInfoGcsService, - ReportResourceUsage, - node_resource_info_grpc_client_, - /*method_timeout_ms*/ -1, ) - /// Get resource usage of all nodes from GCS Service. VOID_GCS_RPC_CLIENT_METHOD(NodeResourceInfoGcsService, GetAllResourceUsage, From aa7e7248d3edded9963c9f16806bc98dd1493c24 Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 30 Oct 2023 21:01:07 +0000 Subject: [PATCH 4/7] remove mock Signed-off-by: vitsai --- .../scheduling/local_resource_manager.h | 34 ------------------- 1 file changed, 34 deletions(-) delete mode 100644 src/mock/ray/raylet/scheduling/local_resource_manager.h diff --git a/src/mock/ray/raylet/scheduling/local_resource_manager.h b/src/mock/ray/raylet/scheduling/local_resource_manager.h deleted file mode 100644 index ddd64bd97f566..0000000000000 --- a/src/mock/ray/raylet/scheduling/local_resource_manager.h +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2023 The Ray Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -namespace ray { -namespace raylet { - -class MockWork : public Work { - public: -}; - -} // namespace raylet -} // namespace ray - -namespace ray { -namespace raylet { - -class MockLocalTaskManager : public LocalTaskManager { - public: - MOCK_METHOD(void, PopulateResourceUsage, (rpc::ResourcesData & data), (const)); -}; - -} // namespace raylet -} // namespace ray From d80ebf6a9298f8649782b4b00e006c0860ce555b Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 30 Oct 2023 21:08:01 +0000 Subject: [PATCH 5/7] fix accessor test Signed-off-by: vitsai --- src/mock/ray/gcs/gcs_client/accessor.h | 5 ----- src/mock/ray/gcs/gcs_server/gcs_resource_manager.h | 6 ------ .../gcs/gcs_client/test/global_state_accessor_test.cc | 8 ++------ src/ray/gcs/gcs_server/gcs_resource_manager.cc | 2 -- .../gcs/gcs_server/test/gcs_resource_manager_test.cc | 10 +++++----- 5 files changed, 7 insertions(+), 24 deletions(-) diff --git a/src/mock/ray/gcs/gcs_client/accessor.h b/src/mock/ray/gcs/gcs_client/accessor.h index 3725449e155f1..91f68a47ad5d3 100644 --- a/src/mock/ray/gcs/gcs_client/accessor.h +++ b/src/mock/ray/gcs/gcs_client/accessor.h @@ -181,11 +181,6 @@ class MockNodeResourceInfoAccessor : public NodeResourceInfoAccessor { (const MultiItemCallback &callback), (override)); MOCK_METHOD(void, AsyncResubscribe, (), (override)); - MOCK_METHOD(Status, - AsyncReportResourceUsage, - (const std::shared_ptr &data_ptr, - const StatusCallback &callback), - (override)); MOCK_METHOD(Status, AsyncGetAllResourceUsage, (const ItemCallback &callback), diff --git a/src/mock/ray/gcs/gcs_server/gcs_resource_manager.h b/src/mock/ray/gcs/gcs_server/gcs_resource_manager.h index d34421499c44d..3b22c23c53606 100644 --- a/src/mock/ray/gcs/gcs_server/gcs_resource_manager.h +++ b/src/mock/ray/gcs/gcs_server/gcs_resource_manager.h @@ -52,12 +52,6 @@ class MockGcsResourceManager : public GcsResourceManager { rpc::GetAllAvailableResourcesReply *reply, rpc::SendReplyCallback send_reply_callback), (override)); - MOCK_METHOD(void, - HandleReportResourceUsage, - (rpc::ReportResourceUsageRequest request, - rpc::ReportResourceUsageReply *reply, - rpc::SendReplyCallback send_reply_callback), - (override)); MOCK_METHOD(void, HandleGetAllResourceUsage, (rpc::GetAllResourceUsageRequest request, diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index 82c8fc85e80d9..cce0091b6354c 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -192,9 +192,7 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) { std::promise promise1; auto resources1 = std::make_shared(); resources1->set_node_id(node_table_data->node_id()); - RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage( - resources1, [&promise1](Status status) { promise1.set_value(status.ok()); })); - WaitReady(promise1.get_future(), timeout_ms_); + gcs_server_->GetGcsResourceManager().UpdateFromResourceView(*resources1); resources = global_state_->GetAllResourceUsage(); resource_usage_batch_data.ParseFromString(*resources.get()); @@ -208,9 +206,7 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) { (*heartbeat2->mutable_resources_total())["GPU"] = 10; (*heartbeat2->mutable_resources_available())["CPU"] = 1; (*heartbeat2->mutable_resources_available())["GPU"] = 5; - RAY_CHECK_OK(gcs_client_->NodeResources().AsyncReportResourceUsage( - heartbeat2, [&promise2](Status status) { promise2.set_value(status.ok()); })); - WaitReady(promise2.get_future(), timeout_ms_); + gcs_server_->GetGcsResourceManager().UpdateFromResourceView(*heartbeat2); resources = global_state_->GetAllResourceUsage(); resource_usage_batch_data.ParseFromString(*resources.get()); diff --git a/src/ray/gcs/gcs_server/gcs_resource_manager.cc b/src/ray/gcs/gcs_server/gcs_resource_manager.cc index 783dbdba374e6..5e909a57b3e12 100644 --- a/src/ray/gcs/gcs_server/gcs_resource_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_resource_manager.cc @@ -329,8 +329,6 @@ std::string GcsResourceManager::DebugString() const { << counts_[CountType::GET_RESOURCES_REQUEST] << "\n- GetAllAvailableResources request count" << counts_[CountType::GET_ALL_AVAILABLE_RESOURCES_REQUEST] - << "\n- ReportResourceUsage request count: " - << counts_[CountType::REPORT_RESOURCE_USAGE_REQUEST] << "\n- GetAllResourceUsage request count: " << counts_[CountType::GET_ALL_RESOURCE_USAGE_REQUEST]; return stream.str(); diff --git a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc index a995756dc3b20..a4153f5c96af2 100644 --- a/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_resource_manager_test.cc @@ -107,10 +107,10 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) { gcs_resource_manager_->OnNodeAdd(*node); - rpc::ReportResourceUsageRequest report_request; - (*report_request.mutable_resources()->mutable_resources_available())["CPU"] = 2; - (*report_request.mutable_resources()->mutable_resources_total())["CPU"] = 2; - gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request.resources()); + rpc::ResourcesData resources_data; + (*resources_data.mutable_resources_available())["CPU"] = 2; + (*resources_data.mutable_resources_total())["CPU"] = 2; + gcs_resource_manager_->UpdateNodeResourceUsage(node_id, resources_data); gcs_resource_manager_->HandleGetAllResourceUsage( get_all_request, &get_all_reply, send_reply_callback); @@ -123,7 +123,7 @@ TEST_F(GcsResourceManagerTest, TestResourceUsageAPI) { ASSERT_EQ(get_all_reply2.resource_usage_data().batch().size(), 0); // This will be ignored since the node is dead. - gcs_resource_manager_->UpdateNodeResourceUsage(node_id, report_request.resources()); + gcs_resource_manager_->UpdateNodeResourceUsage(node_id, resources_data); rpc::GetAllResourceUsageReply get_all_reply3; gcs_resource_manager_->HandleGetAllResourceUsage( get_all_request, &get_all_reply3, send_reply_callback); From f6e5934cf181311272919d91b2b5301a51faeb47 Mon Sep 17 00:00:00 2001 From: vitsai Date: Mon, 30 Oct 2023 21:49:06 +0000 Subject: [PATCH 6/7] remove virtual Signed-off-by: vitsai --- src/ray/raylet/scheduling/local_resource_manager.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ray/raylet/scheduling/local_resource_manager.h b/src/ray/raylet/scheduling/local_resource_manager.h index fb1824f12796d..353b935c732b4 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.h +++ b/src/ray/raylet/scheduling/local_resource_manager.h @@ -142,7 +142,7 @@ class LocalResourceManager : public syncer::ReporterInterface { int64_t after_version, syncer::MessageType message_type) const override; /// Populate resource usage. - virtual void PopulateResourceUsage(rpc::ResourcesData &resources_data) const; + void PopulateResourceUsage(rpc::ResourcesData &resources_data) const; /// Record the metrics. void RecordMetrics() const; From afb970cba84404af701a151951df0562f7fff102 Mon Sep 17 00:00:00 2001 From: vitsai Date: Wed, 1 Nov 2023 02:31:53 +0000 Subject: [PATCH 7/7] comments Signed-off-by: vitsai --- .../gcs/gcs_client/test/gcs_client_test.cc | 19 +++---------------- .../test/global_state_accessor_test.cc | 4 ++-- src/ray/gcs/gcs_server/gcs_server.h | 4 ++-- .../scheduling/local_resource_manager.h | 2 -- 4 files changed, 7 insertions(+), 22 deletions(-) diff --git a/src/ray/gcs/gcs_client/test/gcs_client_test.cc b/src/ray/gcs/gcs_client/test/gcs_client_test.cc index 639fbbbea2162..74bf428ee498a 100644 --- a/src/ray/gcs/gcs_client/test/gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/gcs_client_test.cc @@ -372,11 +372,6 @@ class GcsClientTest : public ::testing::TestWithParam { return resource_map; } - void ReportResourceUsage(const std::shared_ptr resources) { - // Do it from the server side. - gcs_server_->GetGcsResourceManager().UpdateFromResourceView(*resources); - } - std::vector GetAllAvailableResources() { std::promise promise; std::vector resources; @@ -452,14 +447,6 @@ class GcsClientTest : public ::testing::TestWithParam { std::unique_ptr client_io_service_thread_; std::unique_ptr client_io_service_; std::unique_ptr gcs_client_; - std::unique_ptr syncer_io_service_thread_; - std::unique_ptr syncer_io_service_; - - // Ray Syncer and reporter. - std::unique_ptr ray_syncer_; - std::unique_ptr ray_syncer_service_; - std::unique_ptr syncer_server_; - std::unique_ptr local_resource_manager_; // Timeout waiting for GCS server reply, default is 2s. const std::chrono::milliseconds timeout_ms_{2000}; @@ -618,7 +605,7 @@ TEST_P(GcsClientTest, TestGetAllAvailableResources) { (*resource->mutable_resources_available())["GPU"] = 10.0; (*resource->mutable_resources_total())["CPU"] = 1.0; (*resource->mutable_resources_total())["GPU"] = 10.0; - ReportResourceUsage(resource); + gcs_server_->UpdateGcsResourceManagerInTest(*resource); // Assert get all available resources right. std::vector resources = GetAllAvailableResources(); @@ -762,7 +749,7 @@ TEST_P(GcsClientTest, TestNodeTableResubscribe) { resources->set_node_id(node_info->node_id()); // Set this flag because GCS won't publish unchanged resources. resources->set_should_global_gc(true); - ReportResourceUsage(resources); + gcs_server_->UpdateGcsResourceManagerInTest(*resources); RestartGcsServer(); @@ -770,7 +757,7 @@ TEST_P(GcsClientTest, TestNodeTableResubscribe) { ASSERT_TRUE(RegisterNode(*node_info)); node_id = NodeID::FromBinary(node_info->node_id()); resources->set_node_id(node_info->node_id()); - ReportResourceUsage(resources); + gcs_server_->UpdateGcsResourceManagerInTest(*resources); WaitForExpectedCount(node_change_count, 2); } diff --git a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc index cce0091b6354c..0a53f86463638 100644 --- a/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc +++ b/src/ray/gcs/gcs_client/test/global_state_accessor_test.cc @@ -192,7 +192,7 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) { std::promise promise1; auto resources1 = std::make_shared(); resources1->set_node_id(node_table_data->node_id()); - gcs_server_->GetGcsResourceManager().UpdateFromResourceView(*resources1); + gcs_server_->UpdateGcsResourceManagerInTest(*resources1); resources = global_state_->GetAllResourceUsage(); resource_usage_batch_data.ParseFromString(*resources.get()); @@ -206,7 +206,7 @@ TEST_P(GlobalStateAccessorTest, TestGetAllResourceUsage) { (*heartbeat2->mutable_resources_total())["GPU"] = 10; (*heartbeat2->mutable_resources_available())["CPU"] = 1; (*heartbeat2->mutable_resources_available())["GPU"] = 5; - gcs_server_->GetGcsResourceManager().UpdateFromResourceView(*heartbeat2); + gcs_server_->UpdateGcsResourceManagerInTest(*heartbeat2); resources = global_state_->GetAllResourceUsage(); resource_usage_batch_data.ParseFromString(*resources.get()); diff --git a/src/ray/gcs/gcs_server/gcs_server.h b/src/ray/gcs/gcs_server/gcs_server.h index ee517b24460fb..47342dbb2713e 100644 --- a/src/ray/gcs/gcs_server/gcs_server.h +++ b/src/ray/gcs/gcs_server/gcs_server.h @@ -110,9 +110,9 @@ class GcsServer { static constexpr char kInMemoryStorage[] = "memory"; static constexpr char kRedisStorage[] = "redis"; - GcsResourceManager &GetGcsResourceManager() { + void UpdateGcsResourceManagerInTest(const rpc::ResourcesData &resources) { RAY_CHECK(gcs_resource_manager_ != nullptr); - return *gcs_resource_manager_; + gcs_resource_manager_->UpdateFromResourceView(resources); } protected: diff --git a/src/ray/raylet/scheduling/local_resource_manager.h b/src/ray/raylet/scheduling/local_resource_manager.h index 353b935c732b4..13e611a90b461 100644 --- a/src/ray/raylet/scheduling/local_resource_manager.h +++ b/src/ray/raylet/scheduling/local_resource_manager.h @@ -209,8 +209,6 @@ class LocalResourceManager : public syncer::ReporterInterface { /// A map storing when the resource was last idle. absl::flat_hash_map> last_idle_times_; - /// Cached resources, used to compare with newest one in light heartbeat mode. - std::unique_ptr last_report_resources_; /// Function to get used object store memory. std::function get_used_object_store_memory_; /// Function to get whether the pull manager is at capacity.