diff --git a/rs/cli/src/clients.rs b/rs/cli/src/clients.rs deleted file mode 100644 index d36a9feb2..000000000 --- a/rs/cli/src/clients.rs +++ /dev/null @@ -1,146 +0,0 @@ -use async_trait::async_trait; -use decentralization::SubnetChangeResponse; -use ic_base_types::PrincipalId; -use ic_management_types::{ - requests::{MembershipReplaceRequest, SubnetCreateRequest, SubnetResizeRequest}, - Artifact, Network, NetworkError, Release, TopologyChangeProposal, -}; -use log::error; -use serde::de::DeserializeOwned; - -#[derive(Clone)] -pub struct DashboardBackendClient { - pub(crate) url: reqwest::Url, -} - -impl DashboardBackendClient { - // Only used in tests, which should be cleaned up together with this code. - #[allow(dead_code)] - pub fn new(network: &Network, dev: bool) -> DashboardBackendClient { - Self { - url: reqwest::Url::parse(if !dev { - "https://dashboard.internal.dfinity.network/" - } else { - "http://localhost:17000/" - }) - .expect("invalid base url") - .join("api/proxy/registry/") - .expect("failed to join url") - .join(&network.name) - .expect("failed to join url"), - } - } - - pub fn new_with_backend_url(url: String) -> Self { - Self { - url: reqwest::Url::parse(&url).unwrap(), - } - } - - pub async fn subnet_pending_action(&self, subnet: PrincipalId) -> anyhow::Result> { - reqwest::Client::new() - .get( - self.url - .join(&format!("subnet/{subnet}/pending_action")) - .map_err(|e| anyhow::anyhow!(e))?, - ) - .rest_send() - .await - } - - pub async fn membership_replace(&self, request: MembershipReplaceRequest) -> anyhow::Result { - reqwest::Client::new() - .post(self.url.join("subnet/membership/replace").map_err(|e| anyhow::anyhow!(e))?) - .json(&request) - .rest_send() - .await - } - - pub async fn subnet_resize(&self, request: SubnetResizeRequest) -> anyhow::Result { - reqwest::Client::new() - .post(self.url.join("subnet/membership/resize").map_err(|e| anyhow::anyhow!(e))?) - .json(&request) - .rest_send() - .await - } - - pub async fn subnet_create(&self, request: SubnetCreateRequest) -> anyhow::Result { - reqwest::Client::new() - .post(self.url.join("subnet/create").map_err(|e| anyhow::anyhow!(e))?) - .json(&request) - .rest_send() - .await - } - - pub async fn get_retireable_versions(&self, release_artifact: &Artifact) -> anyhow::Result> { - reqwest::Client::new() - .get( - self.url - .join(&format!("release/retireable/{}", release_artifact)) - .map_err(|e| anyhow::anyhow!(e))?, - ) - .rest_send() - .await - } - - pub async fn get_nns_replica_version(&self) -> anyhow::Result { - reqwest::Client::new() - .get(self.url.join("release/versions/nns").map_err(|e| anyhow::anyhow!(e))?) - .rest_send() - .await - } -} - -#[async_trait] -trait RESTRequestBuilder { - async fn rest_send(self) -> anyhow::Result; -} - -#[async_trait] -impl RESTRequestBuilder for reqwest::RequestBuilder { - async fn rest_send(self) -> anyhow::Result { - let response_result = self.send().await?; - if let Err(e) = response_result.error_for_status_ref() { - let response = response_result.text().await?; - match serde_json::from_str(&response) { - Ok(NetworkError::ResizeFailed(s)) => { - error!("{}", s); - Err(anyhow::anyhow!("failed request (error: {})", e)) - } - _ => Err(anyhow::anyhow!("failed request (error: {}, response: {})", e, response)), - } - } else { - response_result.text().await.map_err(|e| anyhow::anyhow!(e)).and_then(|body| { - serde_json::from_str::(&body) - .map_err(|e| anyhow::anyhow!("Error decoding {} from backend output: {}\n{}", std::any::type_name::(), body, e)) - }) - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn dashboard_backend_client_url() { - let mainnet = Network::new("mainnet", &vec![]).await.expect("failed to create mainnet network"); - let staging = Network::new("staging", &vec![]).await.expect("failed to create staging network"); - assert_eq!( - DashboardBackendClient::new(&mainnet, false).url.to_string(), - "https://dashboard.internal.dfinity.network/api/proxy/registry/mainnet" - ); - assert_eq!( - DashboardBackendClient::new(&staging, false).url.to_string(), - "https://dashboard.internal.dfinity.network/api/proxy/registry/staging" - ); - assert_eq!( - DashboardBackendClient::new(&mainnet, true).url.to_string(), - "http://localhost:17000/api/proxy/registry/mainnet" - ); - assert_eq!( - DashboardBackendClient::new(&staging, true).url.to_string(), - "http://localhost:17000/api/proxy/registry/staging" - ); - } -} diff --git a/rs/cli/src/lib.rs b/rs/cli/src/lib.rs index 8af5816d3..cb8166b20 100644 --- a/rs/cli/src/lib.rs +++ b/rs/cli/src/lib.rs @@ -1,5 +1,4 @@ pub mod cli; -pub mod clients; pub(crate) mod defaults; pub mod detect_neuron; pub mod general; @@ -9,13 +8,3 @@ pub mod ops_subnet_node_replace; pub mod parsed_cli; pub mod registry_dump; pub mod runner; - -/// Get a localhost socket address with random, unused port. -pub fn local_unused_port() -> u16 { - let addr: std::net::SocketAddr = "127.0.0.1:0".parse().unwrap(); - let socket = socket2::Socket::new(socket2::Domain::IPV4, socket2::Type::STREAM, Some(socket2::Protocol::TCP)).unwrap(); - socket.bind(&addr.into()).unwrap(); - socket.set_reuse_address(true).unwrap(); - let tcp = std::net::TcpListener::from(socket); - tcp.local_addr().unwrap().port() -} diff --git a/rs/cli/src/main.rs b/rs/cli/src/main.rs index cb6770511..501290043 100644 --- a/rs/cli/src/main.rs +++ b/rs/cli/src/main.rs @@ -1,6 +1,6 @@ use crate::ic_admin::IcAdminWrapper; use clap::{error::ErrorKind, CommandFactory, Parser}; -use decentralization::subnets::NodesRemover; +use decentralization::subnets::{MembershipReplace, NodesRemover, ReplaceTarget}; use dotenv::dotenv; use dre::cli::proposals::ProposalStatus; use dre::detect_neuron::Auth; @@ -80,7 +80,7 @@ async fn main() -> Result<(), anyhow::Error> { .await .expect("Failed to create a runner"); - let r = match &cli_opts.subcommand { + match &cli_opts.subcommand { // Covered above cli::Commands::Upgrade => Ok(()), cli::Commands::DerToPrincipal { path } => { @@ -139,12 +139,12 @@ async fn main() -> Result<(), anyhow::Error> { let min_nakamoto_coefficients = parse_min_nakamoto_coefficients(&mut cmd, min_nakamoto_coefficients); runner_instance .membership_replace( - ic_management_types::requests::MembershipReplaceRequest { + MembershipReplace { target: match &subnet.id { - Some(subnet) => ic_management_types::requests::ReplaceTarget::Subnet(*subnet), + Some(subnet) => ReplaceTarget::Subnet(*subnet), None => { if let Some(motivation) = motivation.clone() { - ic_management_types::requests::ReplaceTarget::Nodes { + ReplaceTarget::Nodes { nodes: nodes.clone(), motivation, } @@ -551,9 +551,7 @@ async fn main() -> Result<(), anyhow::Error> { }; } }, - }; - let _ = runner_instance.stop_backend().await; - r + } }) .await; diff --git a/rs/cli/src/runner.rs b/rs/cli/src/runner.rs index bee10dda0..955575fa7 100644 --- a/rs/cli/src/runner.rs +++ b/rs/cli/src/runner.rs @@ -1,12 +1,10 @@ -use crate::clients::DashboardBackendClient; +use crate::ic_admin; use crate::ic_admin::ProposeOptions; use crate::operations::hostos_rollout::{HostosRollout, HostosRolloutResponse, NodeGroupUpdate}; use crate::ops_subnet_node_replace; -use crate::{ic_admin, local_unused_port}; -use actix_web::dev::ServerHandle; use decentralization::network::{AvailableNodesQuerier, SubnetChange, SubnetQuerier, SubnetQueryBy}; use decentralization::network::{NetworkHealRequest, TopologyManager}; -use decentralization::subnets::NodesRemover; +use decentralization::subnets::{MembershipReplace, NodesRemover, ReplaceTarget}; use decentralization::SubnetChangeResponse; use futures::future::join_all; use futures::TryFutureExt; @@ -15,16 +13,15 @@ use ic_base_types::PrincipalId; use ic_management_backend::proposal::ProposalAgent; use ic_management_backend::public_dashboard::query_ic_dashboard_list; use ic_management_backend::registry::{self, RegistryState}; -use ic_management_backend::{endpoints, health, health::HealthStatusQuerier}; -use ic_management_types::TopologyChangePayload; +use ic_management_backend::{health, health::HealthStatusQuerier}; use ic_management_types::{Artifact, Network, Node, NodeFeature, NodeProvidersResponse}; +use ic_management_types::{NetworkError, TopologyChangePayload}; use itertools::Itertools; use log::{info, warn}; use registry_canister::mutations::do_change_subnet_membership::ChangeSubnetMembershipPayload; use std::cell::RefCell; use std::collections::BTreeMap; -use std::sync::{mpsc, Arc}; -use std::thread; +use std::sync::Arc; use tabled::builder::Builder; use tabled::settings::Style; @@ -32,49 +29,9 @@ pub struct Runner { pub ic_admin: ic_admin::IcAdminWrapper, registry: RefCell>>, network: Network, - dashboard_backend_client: RefCell>, - backend_srv: RefCell>, } impl Runner { - pub async fn get_backend_client(&self) -> anyhow::Result { - if let Some(dashboard_backend_client) = &*self.dashboard_backend_client.borrow() { - return Ok(dashboard_backend_client.clone()); - }; - - // This will be executed just once creating the backend - let backend_port = local_unused_port(); - let backend_url = format!("http://localhost:{}/", backend_port); - let (tx, rx) = mpsc::channel(); - - let target_network_backend = self.registry().await.network(); - thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().unwrap(); - rt.block_on(async move { - endpoints::run_backend(&target_network_backend, "127.0.0.1", backend_port, true, Some(tx)) - .await - .expect("failed") - }); - }); - let srv = rx.recv().unwrap(); - let dashboard_backend_client = DashboardBackendClient::new_with_backend_url(backend_url); - - self.dashboard_backend_client - .borrow_mut() - .get_or_insert_with(|| dashboard_backend_client.clone()); - self.backend_srv.borrow_mut().get_or_insert_with(|| srv.clone()); - - Ok(dashboard_backend_client) - } - - pub async fn stop_backend(&self) -> anyhow::Result<()> { - let backend_srv_opt = self.backend_srv.borrow().clone(); - if let Some(backend_srv) = backend_srv_opt { - backend_srv.stop(false).await; - } - Ok(()) - } - pub async fn registry(&self) -> Arc { { if let Some(ref registry) = *self.registry.borrow() { @@ -110,8 +67,6 @@ impl Runner { ic_admin, registry: RefCell::new(None), network: network.clone(), - dashboard_backend_client: RefCell::new(None), - backend_srv: RefCell::new(None), }) } @@ -150,7 +105,17 @@ impl Runner { dry_run: bool, ) -> anyhow::Result<()> { let subnet = request.subnet; - let change = self.get_backend_client().await?.subnet_resize(request).await?; + let change = self + .registry() + .await + .modify_subnet_nodes(SubnetQueryBy::SubnetId(request.subnet)) + .await? + .excluding_from_available(request.exclude.clone().unwrap_or_default()) + .including_from_available(request.only.clone().unwrap_or_default()) + .including_from_available(request.include.clone().unwrap_or_default()) + .resize(request.add, request.remove)?; + let change = SubnetChangeResponse::from(&change); + if verbose { if let Some(run_log) = &change.run_log { println!("{}\n", run_log.join("\n")); @@ -198,18 +163,30 @@ impl Runner { println!("{}", self.ic_admin.grep_subcommand_arguments("propose-to-create-subnet")); return Ok(()); } - let subnet_creation_data = self.get_backend_client().await?.subnet_create(request).await?; + + let subnet_creation_data = self + .registry() + .await + .create_subnet( + request.size, + request.min_nakamoto_coefficients.clone(), + request.include.clone().unwrap_or_default(), + request.exclude.clone().unwrap_or_default(), + request.only.clone().unwrap_or_default(), + ) + .await?; + let subnet_creation_data = SubnetChangeResponse::from(&subnet_creation_data); + if verbose { if let Some(run_log) = &subnet_creation_data.run_log { println!("{}\n", run_log.join("\n")); } } println!("{}", subnet_creation_data); - let replica_version = replica_version.unwrap_or( - self.get_backend_client() - .await? - .get_nns_replica_version() + self.registry() + .await + .nns_replica_version() .await .expect("Failed to get a GuestOS version of the NNS subnet"), ); @@ -232,13 +209,44 @@ impl Runner { Ok(()) } - pub async fn membership_replace( - &self, - request: ic_management_types::requests::MembershipReplaceRequest, - verbose: bool, - dry_run: bool, - ) -> anyhow::Result<()> { - let change = self.get_backend_client().await?.membership_replace(request).await?; + /// Simulates replacement of nodes in a subnet. + /// There are multiple ways to replace nodes. For instance: + /// 1. Setting `heal` to `true` in the request to replace unhealthy nodes + /// 2. Replace `optimize` nodes to optimize subnet decentralization. + /// 3. Explicitly add or remove nodes from the subnet specifying their + /// Principals. + /// + /// All nodes in the request must belong to exactly one subnet. + pub async fn membership_replace(&self, request: MembershipReplace, verbose: bool, dry_run: bool) -> anyhow::Result<()> { + let mut motivations: Vec = vec![]; + let health_client = health::HealthClient::new(self.registry().await.network()); + let registry_nodes = self.registry().await.nodes(); + let change_request = match &request.target { + ReplaceTarget::Subnet(subnet) => self.registry().await.modify_subnet_nodes(SubnetQueryBy::SubnetId(*subnet)).await?, + ReplaceTarget::Nodes { + nodes: nodes_to_replace, + motivation, + } => { + motivations.push(motivation.clone()); + let nodes_to_replace = nodes_to_replace + .iter() + .filter_map(|n| registry_nodes.get(n)) + .map(decentralization::network::Node::from) + .collect::>(); + self.registry() + .await + .modify_subnet_nodes(SubnetQueryBy::NodeList(nodes_to_replace)) + .await? + } + } + .excluding_from_available(request.exclude.clone().unwrap_or_default()) + .including_from_available(request.only.clone()) + .including_from_available(request.include.clone().unwrap_or_default()) + .with_min_nakamoto_coefficients(request.min_nakamoto_coefficients.clone()); + let subnet_health: BTreeMap = health_client.subnet(change_request.subnet().id).await?; + + let change = request.replace(subnet_health, registry_nodes, change_request).await?; + if verbose { if let Some(run_log) = &change.run_log { println!("{}\n", run_log.join("\n")); @@ -255,7 +263,15 @@ impl Runner { async fn run_membership_change(&self, change: SubnetChangeResponse, options: ProposeOptions, dry_run: bool) -> anyhow::Result<()> { let subnet_id = change.subnet_id.ok_or_else(|| anyhow::anyhow!("subnet_id is required"))?; - let pending_action = self.get_backend_client().await?.subnet_pending_action(subnet_id).await?; + let pending_action = self + .registry() + .await + .subnets_with_proposals() + .await? + .get(&subnet_id) + .map(|s| s.proposal.clone()) + .ok_or(NetworkError::SubnetNotFound(subnet_id))?; + if let Some(proposal) = pending_action { return Err(anyhow::anyhow!(format!( "There is a pending proposal for this subnet: https://dashboard.internetcomputer.org/proposal/{}", @@ -279,8 +295,7 @@ impl Runner { } pub async fn prepare_versions_to_retire(&self, release_artifact: &Artifact, edit_summary: bool) -> anyhow::Result<(String, Option>)> { - let retireable_versions = self.get_backend_client().await?.get_retireable_versions(release_artifact).await?; - + let retireable_versions = self.registry().await.retireable_versions(release_artifact).await?; let versions = if retireable_versions.is_empty() { Vec::new() } else { diff --git a/rs/decentralization/src/subnets.rs b/rs/decentralization/src/subnets.rs index 31c61def9..593eba7b2 100644 --- a/rs/decentralization/src/subnets.rs +++ b/rs/decentralization/src/subnets.rs @@ -3,11 +3,15 @@ use std::collections::BTreeMap; use ic_base_types::PrincipalId; use ic_management_types::{ requests::{NodeRemoval, NodeRemovalReason}, - Status, Subnet, + MinNakamotoCoefficients, Status, Subnet, }; use itertools::Itertools; +use log::{info, warn}; -use crate::network::Node; +use crate::{ + network::{Node, SubnetChangeRequest}, + SubnetChangeResponse, +}; pub async fn unhealthy_with_nodes( subnets: &BTreeMap, @@ -118,3 +122,131 @@ impl NodesRemover { (nodes_to_rm, motivation) } } + +pub enum ReplaceTarget { + /// Subnet targeted for replacements + Subnet(PrincipalId), + /// Nodes on the same subnet that need to be replaced for other reasons + Nodes { nodes: Vec, motivation: String }, +} + +pub struct MembershipReplace { + pub target: ReplaceTarget, + pub heal: bool, + pub optimize: Option, + pub exclude: Option>, + pub only: Vec, + pub include: Option>, + pub min_nakamoto_coefficients: Option, +} + +impl MembershipReplace { + pub async fn replace( + &self, + subnet_health: BTreeMap, + registry_nodes: BTreeMap, + subnet_change_request: SubnetChangeRequest, + ) -> anyhow::Result { + let mut motivations: Vec = vec![]; + + let mut replacements_unhealthy: Vec = Vec::new(); + if self.heal { + let subnet = subnet_change_request.subnet(); + let unhealthy: Vec = subnet + .nodes + .into_iter() + .filter_map(|n| match subnet_health.get(&n.id) { + Some(health) => { + if *health == ic_management_types::Status::Healthy { + None + } else { + info!("Node {} is {:?}", n.id, health); + Some(n) + } + } + None => { + warn!("Node {} has no known health, assuming unhealthy", n.id); + Some(n) + } + }) + .collect::>(); + + if !unhealthy.is_empty() { + // Do not check the health of the force-included nodes + let unhealthy = unhealthy + .into_iter() + .filter(|n| !self.include.as_ref().unwrap_or(&vec![]).contains(&n.id)) + .collect::>(); + replacements_unhealthy.extend(unhealthy); + } + } + let req_replace_nodes = if let ReplaceTarget::Nodes { + nodes: req_replace_node_ids, + motivation: _, + } = &self.target + { + let req_replace_nodes = req_replace_node_ids + .iter() + .filter_map(|n| registry_nodes.get(n)) + .map(Node::from) + .collect::>(); + replacements_unhealthy.retain(|n| !req_replace_node_ids.contains(&n.id)); + req_replace_nodes + } else { + vec![] + }; + + let num_unhealthy = replacements_unhealthy.len(); + if !replacements_unhealthy.is_empty() { + let replace_target = if num_unhealthy == 1 { "node" } else { "nodes" }; + motivations.push(format!("replacing {num_unhealthy} unhealthy {replace_target}")); + } + // Optimize the requested number of nodes, and remove unhealthy nodes if there + // are any + let replacements = replacements_unhealthy.into_iter().chain(req_replace_nodes).collect(); + let change = subnet_change_request.optimize(self.optimize.unwrap_or(0), &replacements)?; + let num_optimized = change.removed().len() - replacements.len(); + if num_optimized > 0 { + let replace_target = if num_optimized == 1 { "node" } else { "nodes" }; + motivations.push(format!("replacing {num_optimized} {replace_target} to improve subnet decentralization")); + } + + Ok(SubnetChangeResponse::from(&change).with_motivation(motivations.join("; "))) + } +} + +// impl Display for MembershipReplaceRequest +impl std::fmt::Display for MembershipReplace { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let target = match &self.target { + ReplaceTarget::Subnet(subnet) => format!("subnet {}", subnet), + ReplaceTarget::Nodes { nodes, motivation } => { + format!("nodes {:?} ({})", nodes, motivation) + } + }; + write!(f, "target: {}", target)?; + if self.heal { + write!(f, " heal: {}", self.heal)?; + } + if let Some(optimize) = self.optimize { + write!(f, " optimize: {}", optimize)?; + } + if let Some(exclude) = &self.exclude { + if !exclude.is_empty() { + write!(f, " exclude: {:?}", self.exclude)?; + } + } + if !self.only.is_empty() { + write!(f, " only: {:?}", self.only)?; + } + if let Some(include) = &self.include { + if !include.is_empty() { + write!(f, " include: {:?}", include)?; + } + } + if let Some(min_nakamoto_coefficients) = &self.min_nakamoto_coefficients { + write!(f, " min_nakamoto_coefficients: {:?}", min_nakamoto_coefficients)?; + } + Ok(()) + } +} diff --git a/rs/ic-management-backend/src/endpoints/mod.rs b/rs/ic-management-backend/src/endpoints/mod.rs index 6a0177609..b2e0f98e5 100644 --- a/rs/ic-management-backend/src/endpoints/mod.rs +++ b/rs/ic-management-backend/src/endpoints/mod.rs @@ -6,7 +6,7 @@ pub mod subnet; use crate::health::HealthStatusQuerier; use crate::{health, prometheus, proposal, registry, registry::RegistryState, release::list_subnets_release_statuses, release::RolloutBuilder}; use actix_web::dev::Service; -use actix_web::{get, post, web, App, Error, HttpResponse, HttpServer, Responder, Result}; +use actix_web::{get, web, App, Error, HttpResponse, HttpServer, Responder, Result}; use decentralization::network::AvailableNodesQuerier; use ic_management_types::Network; use ic_registry_nns_data_provider::registry::RegistryCanister; @@ -80,17 +80,11 @@ pub async fn run_backend( .service(operators) .service(nodes_healths) .service(get_subnet) - .service(self::subnet::pending_action) - .service(self::subnet::replace) - .service(self::subnet::create_subnet) - .service(self::subnet::resize) .service(self::subnet::change_preview) .service(self::query_decentralization::decentralization_subnet_query) .service(self::query_decentralization::decentralization_whatif_query) .service(self::release::releases_list_all) - .service(self::release::retireable) .service(self::release::blessed) - .service(self::release::get_nns_replica_version) .service(self::governance_canister::governance_canister_version_endpoint) }) .shutdown_timeout(10) diff --git a/rs/ic-management-backend/src/endpoints/release.rs b/rs/ic-management-backend/src/endpoints/release.rs index 9fd9f2140..8f6ee80ba 100644 --- a/rs/ic-management-backend/src/endpoints/release.rs +++ b/rs/ic-management-backend/src/endpoints/release.rs @@ -13,20 +13,8 @@ pub(crate) async fn releases_list_all(registry: web::Data, registry: web::Data>>) -> Result { - let registry = registry.read().await; - response_from_result(registry.retireable_versions(&request.release_artifact).await) -} - #[get("release/versions/blessed/{release_artifact}")] pub(crate) async fn blessed(request: web::Path, registry: web::Data>>) -> Result { let registry = registry.read().await; response_from_result(registry.blessed_versions(&request.release_artifact).await) } - -#[get("/release/versions/nns")] -pub(crate) async fn get_nns_replica_version(registry: web::Data>>) -> Result { - let registry = registry.read().await; - Ok(HttpResponse::Ok().json(registry.nns_replica_version().await)) -} diff --git a/rs/ic-management-backend/src/endpoints/subnet.rs b/rs/ic-management-backend/src/endpoints/subnet.rs index 89dcd6fea..ab1b4ec93 100644 --- a/rs/ic-management-backend/src/endpoints/subnet.rs +++ b/rs/ic-management-backend/src/endpoints/subnet.rs @@ -1,11 +1,7 @@ use super::*; -use crate::health::HealthStatusQuerier; -use crate::{health, subnets::get_proposed_subnet_changes}; -use decentralization::network::{SubnetQueryBy, TopologyManager}; +use crate::subnets::get_proposed_subnet_changes; use ic_base_types::PrincipalId; -use ic_management_types::requests::{MembershipReplaceRequest, ReplaceTarget, SubnetCreateRequest, SubnetResizeRequest}; use ic_management_types::Node; -use log::warn; use serde::Deserialize; use std::collections::BTreeMap; @@ -14,26 +10,6 @@ struct SubnetRequest { subnet: PrincipalId, } -#[get("/subnet/{subnet}/pending_action")] -pub(crate) async fn pending_action( - request: web::Path, - registry: web::Data>>, -) -> Result { - match registry.read().await.subnets_with_proposals().await { - Ok(subnets) => { - if let Some(subnet) = subnets.get(&request.subnet) { - Ok(HttpResponse::Ok().json(&subnet.proposal)) - } else { - Err(actix_web::error::ErrorNotFound(anyhow::format_err!( - "subnet {} not found", - request.subnet - ))) - } - } - Err(e) => Err(actix_web::error::ErrorInternalServerError(format!("failed to fetch subnets: {}", e))), - } -} - #[get("/subnet/{subnet}/change_preview")] pub(crate) async fn change_preview( request: web::Path, @@ -53,155 +29,3 @@ pub(crate) async fn change_preview( Err(e) => Err(actix_web::error::ErrorInternalServerError(format!("failed to fetch subnets: {}", e))), } } - -/// Simulates replacement of nodes in a subnet. -/// There are multiple ways to replace nodes. For instance: -/// 1. Setting `heal` to `true` in the request to replace unhealthy nodes -/// 2. Replace `optimize` nodes to optimize subnet decentralization. -/// 3. Explicitly add or remove nodes from the subnet specifying their -/// Principals. -/// -/// All nodes in the request must belong to exactly one subnet. -#[post("/subnet/membership/replace")] -pub(crate) async fn replace( - request: web::Json, - registry: web::Data>>, -) -> Result { - let registry = registry.read().await; - let all_nodes = registry.nodes(); - - let mut motivations: Vec = vec![]; - - info!("Received MembershipReplaceRequest: {}", request); - - let change_request = match &request.target { - ReplaceTarget::Subnet(subnet) => registry.modify_subnet_nodes(SubnetQueryBy::SubnetId(*subnet)).await?, - ReplaceTarget::Nodes { - nodes: nodes_to_replace, - motivation, - } => { - motivations.push(motivation.clone()); - let nodes_to_replace = nodes_to_replace - .iter() - .filter_map(|n| all_nodes.get(n)) - .map(decentralization::network::Node::from) - .collect::>(); - registry.modify_subnet_nodes(SubnetQueryBy::NodeList(nodes_to_replace)).await? - } - } - .excluding_from_available(request.exclude.clone().unwrap_or_default()) - .including_from_available(request.only.clone()) - .including_from_available(request.include.clone().unwrap_or_default()) - .with_min_nakamoto_coefficients(request.min_nakamoto_coefficients.clone()); - - let mut replacements_unhealthy: Vec = Vec::new(); - if request.heal { - let subnet = change_request.subnet(); - let health_client = health::HealthClient::new(registry.network()); - let healths = health_client - .subnet(subnet.id) - .await - .map_err(|_| actix_web::error::ErrorInternalServerError("failed to fetch subnet health".to_string()))?; - let unhealthy: Vec = subnet - .nodes - .into_iter() - .filter_map(|n| match healths.get(&n.id) { - Some(health) => { - if *health == ic_management_types::Status::Healthy { - None - } else { - info!("Node {} is {:?}", n.id, health); - Some(n) - } - } - None => { - warn!("Node {} has no known health, assuming unhealthy", n.id); - Some(n) - } - }) - .collect::>(); - - if !unhealthy.is_empty() { - // Do not check the health of the force-included nodes - let unhealthy = unhealthy - .into_iter() - .filter(|n| !request.include.as_ref().unwrap_or(&vec![]).contains(&n.id)) - .collect::>(); - replacements_unhealthy.extend(unhealthy); - } - } - let req_replace_nodes = if let ReplaceTarget::Nodes { - nodes: req_replace_node_ids, - motivation: _, - } = &request.target - { - let req_replace_nodes = req_replace_node_ids - .iter() - .filter_map(|n| all_nodes.get(n)) - .map(decentralization::network::Node::from) - .collect::>(); - replacements_unhealthy.retain(|n| !req_replace_node_ids.contains(&n.id)); - req_replace_nodes - } else { - vec![] - }; - - let num_unhealthy = replacements_unhealthy.len(); - if !replacements_unhealthy.is_empty() { - let replace_target = if num_unhealthy == 1 { "node" } else { "nodes" }; - motivations.push(format!("replacing {num_unhealthy} unhealthy {replace_target}")); - } - // Optimize the requested number of nodes, and remove unhealthy nodes if there - // are any - let replacements = replacements_unhealthy.into_iter().chain(req_replace_nodes).collect(); - let change = change_request.optimize(request.optimize.unwrap_or(0), &replacements)?; - let num_optimized = change.removed().len() - replacements.len(); - if num_optimized > 0 { - let replace_target = if num_optimized == 1 { "node" } else { "nodes" }; - motivations.push(format!("replacing {num_optimized} {replace_target} to improve subnet decentralization")); - } - - Ok(HttpResponse::Ok().json(decentralization::SubnetChangeResponse::from(&change).with_motivation(motivations.join("; ")))) -} - -/// Simulates creation of a new subnet -#[post("/subnet/create")] -pub(crate) async fn create_subnet( - registry: web::Data>>, - request: web::Json, -) -> Result { - let registry = registry.read().await; - println!( - "Received a request to create a subnet of size {:?} and MinNakamotoCoefficients {}", - request.size, - serde_json::to_string(&request.min_nakamoto_coefficients).unwrap() - ); - - Ok(HttpResponse::Ok().json(decentralization::SubnetChangeResponse::from( - ®istry - .create_subnet( - request.size, - request.min_nakamoto_coefficients.clone(), - request.include.clone().unwrap_or_default(), - request.exclude.clone().unwrap_or_default(), - request.only.clone().unwrap_or_default(), - ) - .await?, - ))) -} - -/// Simulates resizing the subnet, i.e. adding or removing nodes to a subnet. -#[post("/subnet/membership/resize")] -pub(crate) async fn resize(request: web::Json, registry: web::Data>>) -> Result { - let registry = registry.read().await; - - let change = registry - .modify_subnet_nodes(SubnetQueryBy::SubnetId(request.subnet)) - .await? - .excluding_from_available(request.exclude.clone().unwrap_or_default()) - .including_from_available(request.only.clone().unwrap_or_default()) - .including_from_available(request.include.clone().unwrap_or_default()) - .resize(request.add, request.remove)?; - - Ok(HttpResponse::Ok().json(decentralization::SubnetChangeResponse::from(&change))) -} diff --git a/rs/ic-management-types/src/requests.rs b/rs/ic-management-types/src/requests.rs index 44cf1d27f..5874790e3 100644 --- a/rs/ic-management-types/src/requests.rs +++ b/rs/ic-management-types/src/requests.rs @@ -2,62 +2,6 @@ use crate::{MinNakamotoCoefficients, Node, Status}; use ic_base_types::PrincipalId; use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize)] -pub struct MembershipReplaceRequest { - pub target: ReplaceTarget, - pub heal: bool, - pub optimize: Option, - pub exclude: Option>, - pub only: Vec, - pub include: Option>, - pub min_nakamoto_coefficients: Option, -} - -// impl Display for MembershipReplaceRequest -impl std::fmt::Display for MembershipReplaceRequest { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let target = match &self.target { - ReplaceTarget::Subnet(subnet) => format!("subnet {}", subnet), - ReplaceTarget::Nodes { nodes, motivation } => { - format!("nodes {:?} ({})", nodes, motivation) - } - }; - write!(f, "target: {}", target)?; - if self.heal { - write!(f, " heal: {}", self.heal)?; - } - if let Some(optimize) = self.optimize { - write!(f, " optimize: {}", optimize)?; - } - if let Some(exclude) = &self.exclude { - if !exclude.is_empty() { - write!(f, " exclude: {:?}", self.exclude)?; - } - } - if !self.only.is_empty() { - write!(f, " only: {:?}", self.only)?; - } - if let Some(include) = &self.include { - if !include.is_empty() { - write!(f, " include: {:?}", include)?; - } - } - if let Some(min_nakamoto_coefficients) = &self.min_nakamoto_coefficients { - write!(f, " min_nakamoto_coefficients: {:?}", min_nakamoto_coefficients)?; - } - Ok(()) - } -} - -#[derive(Serialize, Deserialize)] -#[serde(rename_all = "lowercase")] -pub enum ReplaceTarget { - /// Subnet targeted for replacements - Subnet(PrincipalId), - /// Nodes on the same subnet that need to be replaced for other reasons - Nodes { nodes: Vec, motivation: String }, -} - #[derive(Serialize, Deserialize)] pub struct SubnetCreateRequest { pub size: usize,