Skip to content

Commit

Permalink
[10/N][VirtualCluster] Implement GetVirtualClusters (#424)
Browse files Browse the repository at this point in the history
Signed-off-by: 黑驰 <senlin.zsl@antgroup.com>
  • Loading branch information
wumuzi520 authored Dec 19, 2024
1 parent 02a4262 commit d439fdd
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 149 deletions.
51 changes: 51 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ bool ExclusiveCluster::IsIdleNodeInstance(const std::string &job_cluster_id,
return job_cluster_id == kEmptyJobClusterId;
}

void ExclusiveCluster::ForeachJobCluster(
const std::function<void(const std::string &, const std::shared_ptr<JobCluster> &)>
&fn) const {
if (fn == nullptr) {
return;
}
for (const auto &[job_cluster_id, job_cluster] : job_clusters_) {
fn(job_cluster_id, job_cluster);
}
}

///////////////////////// MixedCluster /////////////////////////
bool MixedCluster::IsIdleNodeInstance(const std::string &job_cluster_id,
const gcs::NodeInstance &node_instance) const {
Expand Down Expand Up @@ -464,5 +475,45 @@ Status PrimaryCluster::RemoveLogicalCluster(const std::string &logical_cluster_i
return async_data_flusher_(std::move(data), std::move(callback));
}

/// Report the proto data of the requested virtual cluster(s) through `callback`.
///
/// The callback is invoked synchronously, once per reported cluster, before this
/// function returns. The primary cluster's own proto data is never reported; only
/// its job clusters are (when requested).
///
/// \param request The request. An empty `virtual_cluster_id` selects every logical
/// cluster plus the primary cluster; otherwise only the cluster with that id is
/// visited (nothing is reported if no such cluster exists). When
/// `include_job_clusters` is set, the job clusters of every visited cluster whose
/// mode is EXCLUSIVE are reported before the cluster itself.
/// \param callback Receives the proto data of each reported cluster. If it is
/// empty, nothing is done.
void PrimaryCluster::GetVirtualClustersData(rpc::GetVirtualClustersRequest request,
                                            GetVirtualClustersDataCallback callback) {
  if (callback == nullptr) {
    // Without a receiver there is nothing to report; also avoids invoking an
    // empty std::function below.
    return;
  }

  // `request` lives for the whole call, so bind to the accessor's result instead
  // of taking a separate copy of the id.
  const auto &virtual_cluster_id = request.virtual_cluster_id();
  const bool include_job_clusters = request.include_job_clusters();

  auto visit_proto_data = [&](const VirtualCluster *cluster) {
    if (include_job_clusters && cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) {
      // The mode says EXCLUSIVE, but guard the downcast anyway so that a
      // mismatching type cannot lead to a null dereference.
      const auto *exclusive_cluster = dynamic_cast<const ExclusiveCluster *>(cluster);
      if (exclusive_cluster != nullptr) {
        exclusive_cluster->ForeachJobCluster(
            [&](const std::string & /*job_cluster_id*/, const auto &job_cluster) {
              callback(job_cluster->ToProto());
            });
      }
    }
    // The primary cluster's own proto data is intentionally skipped.
    if (cluster->GetID() != kPrimaryClusterID) {
      callback(cluster->ToProto());
    }
  };

  if (virtual_cluster_id.empty()) {
    // Get all virtual clusters data: every logical cluster first, then the
    // primary cluster (which only contributes its job clusters).
    for (const auto &[_, logical_cluster] : logical_clusters_) {
      visit_proto_data(logical_cluster.get());
    }
    visit_proto_data(this);
    return;
  }

  if (virtual_cluster_id == kPrimaryClusterID) {
    visit_proto_data(this);
    return;
  }

  // A specific logical cluster was requested; an unknown id reports nothing.
  auto logical_cluster = GetLogicalCluster(virtual_cluster_id);
  if (logical_cluster != nullptr) {
    visit_proto_data(logical_cluster.get());
  }
}

} // namespace gcs
} // namespace ray
13 changes: 11 additions & 2 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ using CreateOrUpdateVirtualClusterCallback =
std::function<void(const Status &, std::shared_ptr<rpc::VirtualClusterTableData>)>;

using RemoveVirtualClusterCallback = CreateOrUpdateVirtualClusterCallback;
/// Callback that receives the table data of one reported cluster at a time
/// (it is called once per virtual cluster or job cluster being reported).
using GetVirtualClustersDataCallback =
std::function<void(std::shared_ptr<rpc::VirtualClusterTableData>)>;

/// <template_id, _>
/// |
Expand Down Expand Up @@ -222,6 +224,11 @@ class ExclusiveCluster : public VirtualCluster {
/// \return The job cluster if it exists, otherwise return nullptr.
std::shared_ptr<JobCluster> GetJobCluster(const std::string &job_name) const;

/// Iterate all job clusters, invoking `fn` once for each of them.
///
/// \param fn Visitor called with the job cluster id and the job cluster
/// instance. If it is empty, the call does nothing.
void ForeachJobCluster(
const std::function<void(const std::string &, const std::shared_ptr<JobCluster> &)>
&fn) const;

/// Check if the virtual cluster is in use.
///
/// \return True if the virtual cluster is in use, false otherwise.
Expand All @@ -231,8 +238,6 @@ class ExclusiveCluster : public VirtualCluster {
bool IsIdleNodeInstance(const std::string &job_cluster_id,
const gcs::NodeInstance &node_instance) const override;

/// The id of the virtual cluster.
std::string id_;
// The mapping from job cluster id to `JobCluster` instance.
absl::flat_hash_map<std::string, std::shared_ptr<JobCluster>> job_clusters_;
// The async data flusher.
Expand Down Expand Up @@ -295,6 +300,10 @@ class PrimaryCluster : public ExclusiveCluster {
Status RemoveLogicalCluster(const std::string &logical_cluster_id,
RemoveVirtualClusterCallback callback);

/// Get virtual cluster's proto data.
///
/// \param request The request. An empty `virtual_cluster_id` selects all logical
/// clusters plus the primary cluster; otherwise only the cluster with that id is
/// visited. `include_job_clusters` additionally reports the job clusters of
/// visited clusters whose mode is EXCLUSIVE.
/// \param callback Invoked once per reported cluster, before this function
/// returns. The primary cluster's own proto data is never passed to it.
void GetVirtualClustersData(rpc::GetVirtualClustersRequest request,
GetVirtualClustersDataCallback callback);

/// Handle the node added event.
///
/// \param node The node that is added.
Expand Down
13 changes: 8 additions & 5 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,15 @@ void GcsVirtualClusterManager::HandleRemoveVirtualCluster(
}
}

// Handles the GetVirtualClusters RPC: collects the requested virtual cluster data
// from the primary cluster into `reply` and then sends the reply with an OK status.
void GcsVirtualClusterManager::HandleGetAllVirtualClusters(
rpc::GetAllVirtualClustersRequest request,
rpc::GetAllVirtualClustersReply *reply,
void GcsVirtualClusterManager::HandleGetVirtualClusters(
rpc::GetVirtualClustersRequest request,
rpc::GetVirtualClustersReply *reply,
rpc::SendReplyCallback send_reply_callback) {
RAY_LOG(DEBUG) << "Getting all virtual clusters.";
// TODO(Shanly): To be implemented.
RAY_LOG(DEBUG) << "Getting virtual clusters.";
// Each reported cluster's proto data is copied into a new entry of the reply's
// virtual cluster data list.
// NOTE(review): `send_reply_callback` is captured by the lambda but never used
// inside it; the capture looks unnecessary.
primary_cluster_->GetVirtualClustersData(
std::move(request), [reply, send_reply_callback](auto data) {
reply->add_virtual_cluster_data_list()->CopyFrom(*data);
});
// GetVirtualClustersData invokes the callback synchronously, so the reply is
// fully populated here. The status is OK even when no cluster matched.
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ class GcsVirtualClusterManager : public rpc::VirtualClusterInfoHandler {
rpc::RemoveVirtualClusterReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleGetAllVirtualClusters(rpc::GetAllVirtualClustersRequest request,
rpc::GetAllVirtualClustersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
/// Handle the GetVirtualClusters RPC.
///
/// \param request The request naming the virtual cluster(s) to report.
/// \param reply The reply that the reported virtual cluster data is appended to.
/// \param send_reply_callback Used to send the reply back to the caller.
void HandleGetVirtualClusters(rpc::GetVirtualClustersRequest request,
rpc::GetVirtualClustersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

Status VerifyRequest(const rpc::CreateOrUpdateVirtualClusterRequest &request);

Expand Down
Loading

0 comments on commit d439fdd

Please sign in to comment.