diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc index b740dbd768fb..3cf548b6bf3b 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.cc @@ -302,6 +302,17 @@ bool ExclusiveCluster::IsIdleNodeInstance(const std::string &job_cluster_id, return job_cluster_id == kEmptyJobClusterId; } +void ExclusiveCluster::ForeachJobCluster( + const std::function &)> + &fn) const { + if (fn == nullptr) { + return; + } + for (const auto &[job_cluster_id, job_cluster] : job_clusters_) { + fn(job_cluster_id, job_cluster); + } +} + ///////////////////////// MixedCluster ///////////////////////// bool MixedCluster::IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const { @@ -464,5 +475,45 @@ Status PrimaryCluster::RemoveLogicalCluster(const std::string &logical_cluster_i return async_data_flusher_(std::move(data), std::move(callback)); } +void PrimaryCluster::GetVirtualClustersData(rpc::GetVirtualClustersRequest request, + GetVirtualClustersDataCallback callback) { + std::vector> virtual_cluster_data_list; + auto virtual_cluster_id = request.virtual_cluster_id(); + bool include_job_clusters = request.include_job_clusters(); + + auto visit_proto_data = [&](const VirtualCluster *cluster) { + if (include_job_clusters && cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) { + auto exclusive_cluster = dynamic_cast(cluster); + exclusive_cluster->ForeachJobCluster( + [&](const std::string &_, const auto &job_cluster) { + callback(job_cluster->ToProto()); + }); + } + if (cluster->GetID() != kPrimaryClusterID) { + // Skip the primary cluster's proto data. + callback(cluster->ToProto()); + } + }; + + if (virtual_cluster_id.empty()) { + // Get all virtual clusters data. 
+ for (const auto &[_, logical_cluster] : logical_clusters_) { + visit_proto_data(logical_cluster.get()); + } + visit_proto_data(this); + return; + } + + if (virtual_cluster_id == kPrimaryClusterID) { + visit_proto_data(this); + return; + } + + auto logical_cluster = GetLogicalCluster(virtual_cluster_id); + if (logical_cluster != nullptr) { + visit_proto_data(logical_cluster.get()); + } +} + } // namespace gcs } // namespace ray \ No newline at end of file diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h index 525ce7793750..040105eaf183 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster.h +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster.h @@ -61,6 +61,8 @@ using CreateOrUpdateVirtualClusterCallback = std::function)>; using RemoveVirtualClusterCallback = CreateOrUpdateVirtualClusterCallback; +using GetVirtualClustersDataCallback = + std::function)>; /// /// | @@ -222,6 +224,11 @@ class ExclusiveCluster : public VirtualCluster { /// \return The job cluster if it exists, otherwise return nullptr. std::shared_ptr GetJobCluster(const std::string &job_name) const; + /// Iterate all job clusters. + void ForeachJobCluster( + const std::function &)> + &fn) const; + /// Check if the virtual cluster is in use. /// /// \return True if the virtual cluster is in use, false otherwise. @@ -231,8 +238,6 @@ class ExclusiveCluster : public VirtualCluster { bool IsIdleNodeInstance(const std::string &job_cluster_id, const gcs::NodeInstance &node_instance) const override; - /// The id of the virtual cluster. - std::string id_; // The mapping from job cluster id to `JobCluster` instance. absl::flat_hash_map> job_clusters_; // The async data flusher. @@ -295,6 +300,10 @@ class PrimaryCluster : public ExclusiveCluster { Status RemoveLogicalCluster(const std::string &logical_cluster_id, RemoveVirtualClusterCallback callback); + /// Get virtual cluster's proto data. 
+ void GetVirtualClustersData(rpc::GetVirtualClustersRequest request, + GetVirtualClustersDataCallback callback); + /// Handle the node added event. /// /// \param node The node that is added. diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc index 72b8e0f5ccd8..267177308c91 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc @@ -97,12 +97,15 @@ void GcsVirtualClusterManager::HandleRemoveVirtualCluster( } } -void GcsVirtualClusterManager::HandleGetAllVirtualClusters( - rpc::GetAllVirtualClustersRequest request, - rpc::GetAllVirtualClustersReply *reply, +void GcsVirtualClusterManager::HandleGetVirtualClusters( + rpc::GetVirtualClustersRequest request, + rpc::GetVirtualClustersReply *reply, rpc::SendReplyCallback send_reply_callback) { - RAY_LOG(DEBUG) << "Getting all virtual clusters."; - // TODO(Shanly): To be implement. + RAY_LOG(DEBUG) << "Getting virtual clusters."; + primary_cluster_->GetVirtualClustersData( + std::move(request), [reply, send_reply_callback](auto data) { + reply->add_virtual_cluster_data_list()->CopyFrom(*data); + }); GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK()); } diff --git a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h index 3a7e2068d94b..9c74d6e83bf8 100644 --- a/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h +++ b/src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h @@ -61,9 +61,9 @@ class GcsVirtualClusterManager : public rpc::VirtualClusterInfoHandler { rpc::RemoveVirtualClusterReply *reply, rpc::SendReplyCallback send_reply_callback) override; - void HandleGetAllVirtualClusters(rpc::GetAllVirtualClustersRequest request, - rpc::GetAllVirtualClustersReply *reply, - rpc::SendReplyCallback send_reply_callback) override; + void HandleGetVirtualClusters(rpc::GetVirtualClustersRequest request, + 
rpc::GetVirtualClustersReply *reply, + rpc::SendReplyCallback send_reply_callback) override; Status VerifyRequest(const rpc::CreateOrUpdateVirtualClusterRequest &request); diff --git a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc index a9d7a698e729..187858cb447b 100644 --- a/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc +++ b/src/ray/gcs/gcs_server/test/gcs_virtual_cluster_manager_test.cc @@ -41,24 +41,65 @@ class GcsVirtualClusterManagerTest : public ::testing::Test { std::unique_ptr gcs_virtual_cluster_manager_; }; -class PrimaryClusterTest : public ::testing::Test {}; +class VirtualClusterTest : public ::testing::Test { + public: + std::shared_ptr InitPrimaryCluster( + size_t node_count, + size_t template_count, + absl::flat_hash_map>> + *template_id_to_nodes = nullptr) { + auto primary_cluster = + std::make_shared([](auto data, auto callback) { + callback(Status::OK(), data); + return Status::OK(); + }); -TEST_F(PrimaryClusterTest, NodeAddAndRemove) { - auto primary_cluster = std::make_shared( - [](auto data, auto callback) { return Status::OK(); }); + for (size_t i = 0; i < node_count; ++i) { + auto node = Mocker::GenNodeInfo(); + auto template_id = std::to_string(i % template_count); + node->set_template_id(template_id); + primary_cluster->OnNodeAdd(*node); + if (template_id_to_nodes != nullptr) { + (*template_id_to_nodes)[template_id].emplace(NodeID::FromBinary(node->node_id()), + node); + } + } + return primary_cluster; + } - size_t node_count = 200; - size_t template_count = 10; + Status CreateVirtualCluster( + std::shared_ptr primary_cluster, + const std::string &virtual_cluster_id, + const absl::flat_hash_map &replica_sets, + rpc::AllocationMode allocation_mode = rpc::AllocationMode::EXCLUSIVE) { + rpc::CreateOrUpdateVirtualClusterRequest request; + request.set_virtual_cluster_id(virtual_cluster_id); + request.set_mode(rpc::AllocationMode::EXCLUSIVE); + 
request.set_revision(0); + request.mutable_replica_sets()->insert(replica_sets.begin(), replica_sets.end()); + auto status = primary_cluster->CreateOrUpdateVirtualCluster( + request, + [this](const Status &status, std::shared_ptr data) { + ASSERT_TRUE(status.ok()); + }); + return status; + } +}; + +class PrimaryClusterTest : public VirtualClusterTest { + public: + using VirtualClusterTest::VirtualClusterTest; +}; + +TEST_F(PrimaryClusterTest, NodeAddAndRemove) { absl::flat_hash_map>> template_id_to_nodes; - for (size_t i = 0; i < node_count; ++i) { - auto node = Mocker::GenNodeInfo(); - auto template_id = std::to_string(i % template_count); - node->set_template_id(template_id); - primary_cluster->OnNodeAdd(*node); - template_id_to_nodes[template_id].emplace(NodeID::FromBinary(node->node_id()), node); - } + size_t node_count = 200; + size_t template_count = 10; + auto primary_cluster = + InitPrimaryCluster(node_count, template_count, &template_id_to_nodes); const auto &visiable_node_instances = primary_cluster->GetVisibleNodeInstances(); EXPECT_EQ(visiable_node_instances.size(), template_count); @@ -124,39 +165,19 @@ TEST_F(PrimaryClusterTest, NodeAddAndRemove) { } TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { - auto primary_cluster = - std::make_shared([](auto data, auto callback) { - callback(Status::OK(), data); - return Status::OK(); - }); - size_t node_count = 200; size_t template_count = 10; - for (size_t i = 0; i < node_count; ++i) { - auto node = Mocker::GenNodeInfo(); - auto template_id = std::to_string(i % template_count); - node->set_template_id(template_id); - primary_cluster->OnNodeAdd(*node); - } + auto primary_cluster = InitPrimaryCluster(node_count, template_count); std::string template_id_0 = "0"; std::string template_id_1 = "1"; size_t node_count_per_template = node_count / template_count; - { - rpc::CreateOrUpdateVirtualClusterRequest request; - request.set_virtual_cluster_id("virtual_cluster_id_0"); - 
request.set_mode(rpc::AllocationMode::EXCLUSIVE); - request.set_revision(0); - request.mutable_replica_sets()->insert({template_id_0, 5}); - request.mutable_replica_sets()->insert({template_id_1, 10}); - auto status = primary_cluster->CreateOrUpdateVirtualCluster( - request, - [this](const Status &status, std::shared_ptr data) { - ASSERT_TRUE(status.ok()); - }); - ASSERT_TRUE(status.ok()); - } + std::string virtual_cluster_id_0 = "virtual_cluster_id_0"; + ASSERT_TRUE(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_0, + {{template_id_0, 5}, {template_id_1, 10}}) + .ok()); { // Check the logical cluster virtual_cluster_id_0 visible node instances. @@ -194,21 +215,13 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { EXPECT_NE(primary_cluster->GetRevision(), 0); } - { - // Create virtual_cluster_id_1 and check that the status is ok. - rpc::CreateOrUpdateVirtualClusterRequest request; - request.set_virtual_cluster_id("virtual_cluster_id_1"); - request.set_mode(rpc::AllocationMode::EXCLUSIVE); - request.set_revision(0); - request.mutable_replica_sets()->insert({template_id_0, node_count_per_template - 5}); - request.mutable_replica_sets()->insert({template_id_1, node_count_per_template - 10}); - auto status = primary_cluster->CreateOrUpdateVirtualCluster( - request, - [this](const Status &status, std::shared_ptr data) { - ASSERT_TRUE(status.ok()); - }); - ASSERT_TRUE(status.ok()); - } + // Create virtual_cluster_id_1 and check that the status is ok. + std::string virtual_cluster_id_1 = "virtual_cluster_id_1"; + ASSERT_TRUE(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_1, + {{template_id_0, node_count_per_template - 5}, + {template_id_1, node_count_per_template - 10}}) + .ok()); { // Check the logical cluster virtual_cluster_id_1 visible node instances. 
@@ -242,21 +255,14 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { EXPECT_NE(primary_cluster->GetRevision(), 0); } - { - // Create virtual_cluster_id_2 and check that the status is succeed. - rpc::CreateOrUpdateVirtualClusterRequest request; - request.set_virtual_cluster_id("virtual_cluster_id_2"); - request.set_mode(rpc::AllocationMode::EXCLUSIVE); - request.set_revision(0); - request.mutable_replica_sets()->insert({template_id_0, 0}); - request.mutable_replica_sets()->insert({template_id_1, 0}); - auto status = primary_cluster->CreateOrUpdateVirtualCluster( - request, - [this](const Status &status, std::shared_ptr data) { - ASSERT_TRUE(status.ok()); - }); - ASSERT_TRUE(status.ok()); + // Create virtual_cluster_id_2 and check that the creation succeeds. + std::string virtual_cluster_id_2 = "virtual_cluster_id_2"; + ASSERT_TRUE(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_2, + {{template_id_0, 0}, {template_id_1, 0}}) + .ok()); + { auto logical_cluster = primary_cluster->GetLogicalCluster("virtual_cluster_id_2"); ASSERT_NE(logical_cluster, nullptr); ASSERT_EQ(logical_cluster->GetVisibleNodeInstances().size(), 2); @@ -270,37 +276,19 @@ TEST_F(PrimaryClusterTest, CreateOrUpdateVirtualCluster) { { // Create virtual_cluster_id_3 and check that the status is failed. 
- rpc::CreateOrUpdateVirtualClusterRequest request; - request.set_virtual_cluster_id("virtual_cluster_id_3"); - request.set_mode(rpc::AllocationMode::EXCLUSIVE); - request.set_revision(0); - request.mutable_replica_sets()->insert({template_id_0, 1}); - request.mutable_replica_sets()->insert({template_id_1, 0}); - auto status = primary_cluster->CreateOrUpdateVirtualCluster( - request, - [this](const Status &status, std::shared_ptr data) { - ASSERT_TRUE(status.ok()); - }); - ASSERT_FALSE(status.ok()); - ASSERT_EQ(primary_cluster->GetLogicalCluster("virtual_cluster_id_3"), nullptr); + std::string virtual_cluster_id_3 = "virtual_cluster_id_3"; + ASSERT_FALSE(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_3, + {{template_id_0, 5}, {template_id_1, 10}}) + .ok()); + ASSERT_EQ(primary_cluster->GetLogicalCluster(virtual_cluster_id_3), nullptr); } } TEST_F(PrimaryClusterTest, CreateJobCluster) { - auto primary_cluster = - std::make_shared([](auto data, auto callback) { - callback(Status::OK(), data); - return Status::OK(); - }); - size_t node_count = 200; size_t template_count = 10; - for (size_t i = 0; i < node_count; ++i) { - auto node = Mocker::GenNodeInfo(); - auto template_id = std::to_string(i % template_count); - node->set_template_id(template_id); - primary_cluster->OnNodeAdd(*node); - } + auto primary_cluster = InitPrimaryCluster(node_count, template_count); std::string template_id_0 = "0"; std::string template_id_1 = "1"; @@ -410,20 +398,9 @@ TEST_F(PrimaryClusterTest, CreateJobCluster) { } TEST_F(PrimaryClusterTest, RemoveJobCluster) { - auto primary_cluster = - std::make_shared([](auto data, auto callback) { - callback(Status::OK(), data); - return Status::OK(); - }); - size_t node_count = 200; size_t template_count = 10; - for (size_t i = 0; i < node_count; ++i) { - auto node = Mocker::GenNodeInfo(); - auto template_id = std::to_string(i % template_count); - node->set_template_id(template_id); - primary_cluster->OnNodeAdd(*node); - } + auto 
primary_cluster = InitPrimaryCluster(node_count, template_count); std::string template_id_0 = "0"; std::string template_id_1 = "1"; @@ -515,41 +492,20 @@ TEST_F(PrimaryClusterTest, RemoveJobCluster) { } TEST_F(PrimaryClusterTest, RemoveLogicalCluster) { - auto primary_cluster = - std::make_shared([](auto data, auto callback) { - callback(Status::OK(), data); - return Status::OK(); - }); - size_t node_count = 200; size_t template_count = 10; - for (size_t i = 0; i < node_count; ++i) { - auto node = Mocker::GenNodeInfo(); - auto template_id = std::to_string(i % template_count); - node->set_template_id(template_id); - primary_cluster->OnNodeAdd(*node); - } + auto primary_cluster = InitPrimaryCluster(node_count, template_count); std::string template_id_0 = "0"; std::string template_id_1 = "1"; size_t node_count_per_template = node_count / template_count; + // Create virtual_cluster_id_0 and check that the creation succeeds. std::string virtual_cluster_id_0 = "virtual_cluster_id_0"; - - { - rpc::CreateOrUpdateVirtualClusterRequest request; - request.set_virtual_cluster_id(virtual_cluster_id_0); - request.set_mode(rpc::AllocationMode::EXCLUSIVE); - request.set_revision(0); - request.mutable_replica_sets()->insert({template_id_0, 5}); - request.mutable_replica_sets()->insert({template_id_1, 10}); - auto status = primary_cluster->CreateOrUpdateVirtualCluster( - request, - [this](const Status &status, std::shared_ptr data) { - ASSERT_TRUE(status.ok()); - }); - ASSERT_TRUE(status.ok()); - } + ASSERT_TRUE(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_0, + {{template_id_0, 5}, {template_id_1, 10}}) + .ok()); { // Check the logical cluster virtual_cluster_id_0 visible node instances. 
@@ -624,5 +580,74 @@ TEST_F(PrimaryClusterTest, RemoveLogicalCluster) { } } +TEST_F(PrimaryClusterTest, GetVirtualClusters) { + size_t node_count = 200; + size_t template_count = 10; + auto primary_cluster = InitPrimaryCluster(node_count, template_count); + + std::string template_id_0 = "0"; + std::string template_id_1 = "1"; + + std::string virtual_cluster_id_0 = "virtual_cluster_id_0"; + std::string virtual_cluster_id_1 = "virtual_cluster_id_1"; + + ASSERT_TRUE(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_0, + {{template_id_0, 5}, {template_id_1, 5}}) + .ok()); + + ASSERT_TRUE(CreateVirtualCluster(primary_cluster, + virtual_cluster_id_1, + {{template_id_0, 5}, {template_id_1, 5}}) + .ok()); + + ASSERT_TRUE(primary_cluster + ->CreateJobCluster("job_0", + {{template_id_0, 10}, {template_id_1, 10}}, + [this](const Status &status, auto data) { + ASSERT_TRUE(status.ok()); + }) + .ok()); + + auto virtual_cluster_0 = std::dynamic_pointer_cast( + primary_cluster->GetLogicalCluster(virtual_cluster_id_0)); + ASSERT_TRUE(virtual_cluster_0 != nullptr); + ASSERT_TRUE(virtual_cluster_0 + ->CreateJobCluster("job_1", + {{template_id_0, 2}, {template_id_1, 2}}, + [this](const Status &status, auto data) { + ASSERT_TRUE(status.ok()); + }) + .ok()); + + { + rpc::GetVirtualClustersRequest request; + request.set_virtual_cluster_id(virtual_cluster_id_0); + request.set_include_job_clusters(false); + absl::flat_hash_map> + virtual_clusters_data_map; + primary_cluster->GetVirtualClustersData( + request, [this, &virtual_clusters_data_map](auto data) { + RAY_LOG(INFO) << "xxx: " << data->id(); + virtual_clusters_data_map.emplace(data->id(), data); + }); + ASSERT_EQ(virtual_clusters_data_map.size(), 1); + ASSERT_TRUE(virtual_clusters_data_map.contains(virtual_cluster_id_0)); + + virtual_clusters_data_map.clear(); + request.set_include_job_clusters(true); + primary_cluster->GetVirtualClustersData( + request, [this, &virtual_clusters_data_map](auto data) { + 
+ virtual_clusters_data_map.emplace(data->id(), data); + }); + ASSERT_EQ(virtual_clusters_data_map.size(), 2); + ASSERT_TRUE(virtual_clusters_data_map.contains(virtual_cluster_id_0)); + + auto job_cluster = virtual_cluster_0->GetJobCluster("job_1"); + ASSERT_TRUE(job_cluster != nullptr); + ASSERT_TRUE(virtual_clusters_data_map.contains(job_cluster->GetID())); + } +} + } // namespace gcs } // namespace ray diff --git a/src/ray/protobuf/gcs_service.proto b/src/ray/protobuf/gcs_service.proto index f9ee1a290a57..8ba6d0146da0 100644 --- a/src/ray/protobuf/gcs_service.proto +++ b/src/ray/protobuf/gcs_service.proto @@ -846,10 +846,15 @@ message RemoveVirtualClusterReply { GcsStatus status = 1; } -message GetAllVirtualClustersRequest { +message GetVirtualClustersRequest { + // ID of the virtual cluster to get. + // If virtual_cluster_id is empty, data for all virtual clusters is returned. + string virtual_cluster_id = 1; + // Whether to include job clusters. + bool include_job_clusters = 2; } -message GetAllVirtualClustersReply { +message GetVirtualClustersReply { GcsStatus status = 1; repeated VirtualClusterTableData virtual_cluster_data_list = 2; } @@ -861,5 +866,5 @@ service VirtualClusterInfoGcsService { // Remove a virtual cluster. rpc RemoveVirtualCluster(RemoveVirtualClusterRequest) returns (RemoveVirtualClusterReply); // Get all the virtual clusters. 
- rpc GetAllVirtualClusters(GetAllVirtualClustersRequest) returns (GetAllVirtualClustersReply); + rpc GetVirtualClusters(GetVirtualClustersRequest) returns (GetVirtualClustersReply); } diff --git a/src/ray/rpc/gcs_server/gcs_rpc_server.h b/src/ray/rpc/gcs_server/gcs_rpc_server.h index 9acba3306deb..c8404c28882b 100644 --- a/src/ray/rpc/gcs_server/gcs_rpc_server.h +++ b/src/ray/rpc/gcs_server/gcs_rpc_server.h @@ -730,9 +730,9 @@ class VirtualClusterInfoGcsServiceHandler { RemoveVirtualClusterReply *reply, SendReplyCallback send_reply_callback) = 0; - virtual void HandleGetAllVirtualClusters(GetAllVirtualClustersRequest request, - GetAllVirtualClustersReply *reply, - SendReplyCallback send_reply_callback) = 0; + virtual void HandleGetVirtualClusters(GetVirtualClustersRequest request, + GetVirtualClustersReply *reply, + SendReplyCallback send_reply_callback) = 0; }; class VirtualClusterInfoGrpcService : public GrpcService { @@ -753,7 +753,7 @@ class VirtualClusterInfoGrpcService : public GrpcService { const ClusterID &cluster_id) override { VIRTUAL_CLUSTER_SERVICE_RPC_HANDLER(CreateOrUpdateVirtualCluster); VIRTUAL_CLUSTER_SERVICE_RPC_HANDLER(RemoveVirtualCluster); - VIRTUAL_CLUSTER_SERVICE_RPC_HANDLER(GetAllVirtualClusters); + VIRTUAL_CLUSTER_SERVICE_RPC_HANDLER(GetVirtualClusters); } private: