Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[10/N][VirtualCluster] Implement GetVirtualClusters #424

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,17 @@ bool ExclusiveCluster::IsIdleNodeInstance(const std::string &job_cluster_id,
return job_cluster_id == kEmptyJobClusterId;
}

void ExclusiveCluster::ForeachJobCluster(
const std::function<void(const std::string &, const std::shared_ptr<JobCluster> &)>
&fn) const {
if (fn == nullptr) {
return;
}
for (const auto &[job_cluster_id, job_cluster] : job_clusters_) {
fn(job_cluster_id, job_cluster);
}
}

///////////////////////// MixedCluster /////////////////////////
bool MixedCluster::IsIdleNodeInstance(const std::string &job_cluster_id,
const gcs::NodeInstance &node_instance) const {
Expand Down Expand Up @@ -464,5 +475,45 @@ Status PrimaryCluster::RemoveLogicalCluster(const std::string &logical_cluster_i
return async_data_flusher_(std::move(data), std::move(callback));
}

void PrimaryCluster::GetVirtualClustersData(rpc::GetVirtualClustersRequest request,
GetVirtualClustersDataCallback callback) {
std::vector<std::shared_ptr<rpc::VirtualClusterTableData>> virtual_cluster_data_list;
auto virtual_cluster_id = request.virtual_cluster_id();
bool include_job_clusters = request.include_job_clusters();

auto visit_proto_data = [&](const VirtualCluster *cluster) {
if (include_job_clusters && cluster->GetMode() == rpc::AllocationMode::EXCLUSIVE) {
auto exclusive_cluster = dynamic_cast<const ExclusiveCluster *>(cluster);
exclusive_cluster->ForeachJobCluster(
[&](const std::string &_, const auto &job_cluster) {
callback(job_cluster->ToProto());
});
}
if (cluster->GetID() != kPrimaryClusterID) {
// Skip the primary cluster's proto data.
callback(cluster->ToProto());
}
};

if (virtual_cluster_id.empty()) {
// Get all virtual clusters data.
for (const auto &[_, logical_cluster] : logical_clusters_) {
visit_proto_data(logical_cluster.get());
}
visit_proto_data(this);
return;
}

if (virtual_cluster_id == kPrimaryClusterID) {
visit_proto_data(this);
return;
}

auto logical_cluster = GetLogicalCluster(virtual_cluster_id);
if (logical_cluster != nullptr) {
visit_proto_data(logical_cluster.get());
}
}

} // namespace gcs
} // namespace ray
13 changes: 11 additions & 2 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ using CreateOrUpdateVirtualClusterCallback =
std::function<void(const Status &, std::shared_ptr<rpc::VirtualClusterTableData>)>;

using RemoveVirtualClusterCallback = CreateOrUpdateVirtualClusterCallback;
using GetVirtualClustersDataCallback =
std::function<void(std::shared_ptr<rpc::VirtualClusterTableData>)>;

/// <template_id, _>
/// |
Expand Down Expand Up @@ -222,6 +224,11 @@ class ExclusiveCluster : public VirtualCluster {
/// \return The job cluster if it exists, otherwise return nullptr.
std::shared_ptr<JobCluster> GetJobCluster(const std::string &job_name) const;

/// Iterate all job clusters.
void ForeachJobCluster(
const std::function<void(const std::string &, const std::shared_ptr<JobCluster> &)>
&fn) const;

/// Check if the virtual cluster is in use.
///
/// \return True if the virtual cluster is in use, false otherwise.
Expand All @@ -231,8 +238,6 @@ class ExclusiveCluster : public VirtualCluster {
bool IsIdleNodeInstance(const std::string &job_cluster_id,
const gcs::NodeInstance &node_instance) const override;

/// The id of the virtual cluster.
std::string id_;
// The mapping from job cluster id to `JobCluster` instance.
absl::flat_hash_map<std::string, std::shared_ptr<JobCluster>> job_clusters_;
// The async data flusher.
Expand Down Expand Up @@ -295,6 +300,10 @@ class PrimaryCluster : public ExclusiveCluster {
Status RemoveLogicalCluster(const std::string &logical_cluster_id,
RemoveVirtualClusterCallback callback);

/// Get virtual cluster's proto data.
void GetVirtualClustersData(rpc::GetVirtualClustersRequest request,
GetVirtualClustersDataCallback callback);

/// Handle the node added event.
///
/// \param node The node that is added.
Expand Down
13 changes: 8 additions & 5 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,15 @@ void GcsVirtualClusterManager::HandleRemoveVirtualCluster(
}
}

void GcsVirtualClusterManager::HandleGetAllVirtualClusters(
rpc::GetAllVirtualClustersRequest request,
rpc::GetAllVirtualClustersReply *reply,
void GcsVirtualClusterManager::HandleGetVirtualClusters(
rpc::GetVirtualClustersRequest request,
rpc::GetVirtualClustersReply *reply,
rpc::SendReplyCallback send_reply_callback) {
RAY_LOG(DEBUG) << "Getting all virtual clusters.";
// TODO(Shanly): To be implement.
RAY_LOG(DEBUG) << "Getting virtual clusters.";
primary_cluster_->GetVirtualClustersData(
std::move(request), [reply, send_reply_callback](auto data) {
reply->add_virtual_cluster_data_list()->CopyFrom(*data);
});
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
}

Expand Down
6 changes: 3 additions & 3 deletions src/ray/gcs/gcs_server/gcs_virtual_cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,9 @@ class GcsVirtualClusterManager : public rpc::VirtualClusterInfoHandler {
rpc::RemoveVirtualClusterReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

void HandleGetAllVirtualClusters(rpc::GetAllVirtualClustersRequest request,
rpc::GetAllVirtualClustersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;
void HandleGetVirtualClusters(rpc::GetVirtualClustersRequest request,
rpc::GetVirtualClustersReply *reply,
rpc::SendReplyCallback send_reply_callback) override;

Status VerifyRequest(const rpc::CreateOrUpdateVirtualClusterRequest &request);

Expand Down
Loading