diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index 8ee99fb3b..3a8f33484 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "2bbd3d15bf606c9af001e99012b5deb363a91b712bf83a21702c95fdf0ccdf21", + "checksum": "b92d8de730e6f789d5322f3f47eaecd7bdbef7250e5d5b835bb493ec0a3ce1da", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -9085,6 +9085,10 @@ "id": "colored 2.1.0", "target": "colored" }, + { + "id": "futures 0.3.30", + "target": "futures" + }, { "id": "ic-base-types 0.9.0", "target": "ic_base_types" @@ -20942,6 +20946,10 @@ "id": "ic-nns-governance 0.9.0", "target": "ic_nns_governance" }, + { + "id": "ic-protobuf 0.9.0", + "target": "ic_protobuf" + }, { "id": "ic-registry-subnet-type 0.9.0", "target": "ic_registry_subnet_type" diff --git a/Cargo.lock b/Cargo.lock index e8099275a..862a3d085 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1794,6 +1794,7 @@ dependencies = [ "ahash 0.8.11", "anyhow", "colored", + "futures", "ic-base-types", "ic-management-types", "itertools 0.13.0", @@ -4213,6 +4214,7 @@ dependencies = [ "futures", "ic-base-types", "ic-nns-governance", + "ic-protobuf", "ic-registry-subnet-type", "ic-types", "registry-canister", diff --git a/rs/cli/src/commands/firewall.rs b/rs/cli/src/commands/firewall.rs index bfe8f4325..8d22db019 100644 --- a/rs/cli/src/commands/firewall.rs +++ b/rs/cli/src/commands/firewall.rs @@ -37,7 +37,7 @@ impl ExecutableCommand for Firewall { async fn execute(&self, ctx: crate::ctx::DreContext) -> anyhow::Result<()> { let registry = ctx.registry().await; - let firewall_ruleset = registry.firewall_rule_set(self.rules_scope.clone())?; + let firewall_ruleset = registry.firewall_rule_set(self.rules_scope.clone()).await?; let rules: BTreeMap = firewall_ruleset.entries.iter().enumerate().sorted_by(|a, b| a.0.cmp(&b.0)).collect(); diff --git a/rs/cli/src/commands/registry.rs b/rs/cli/src/commands/registry.rs index 6204c3a48..28b62cdb3 100644 --- a/rs/cli/src/commands/registry.rs +++ b/rs/cli/src/commands/registry.rs @@ -1,26 +1,17 @@ -use std::{ - collections::{BTreeMap, HashMap}, - path::PathBuf, - rc::Rc, - str::FromStr, -}; +use std::{collections::BTreeMap, path::PathBuf, str::FromStr, sync::Arc}; use clap::Args; use ic_management_backend::{ health::{HealthClient, HealthStatusQuerier}, - lazy_registry::{LazyRegistry, LazyRegistryFamilyEntries}, - public_dashboard::query_ic_dashboard_list, + lazy_registry::LazyRegistry, }; -use ic_management_types::{HealthStatus, Network, NodeProvidersResponse}; +use ic_management_types::{HealthStatus, Network}; use ic_protobuf::registry::{ - api_boundary_node::v1::ApiBoundaryNodeRecord, dc::v1::DataCenterRecord, hostos_version::v1::HostosVersionRecord, - node::v1::{ConnectionEndpoint, IPv4InterfaceConfig, NodeRecord}, - node_operator::v1::NodeOperatorRecord, - node_rewards::v2::NodeRewardsTable, + node::v1::{ConnectionEndpoint, IPv4InterfaceConfig}, replica_version::v1::ReplicaVersionRecord, - subnet::v1::{ChainKeyConfig, EcdsaConfig, SubnetFeatures, SubnetRecord as SubnetRecordProto}, + subnet::v1::{ChainKeyConfig, EcdsaConfig, SubnetFeatures}, unassigned_nodes_config::v1::UnassignedNodesConfigRecord, }; use ic_registry_subnet_type::SubnetType; @@ -80,23 +71,11 @@ impl Registry { let elected_guest_os_versions = get_elected_guest_os_versions(&local_registry)?; let elected_host_os_versions = get_elected_host_os_versions(&local_registry)?; - let node_provider_names: HashMap = HashMap::from_iter( - query_ic_dashboard_list::(ctx.network(), "v3/node-providers") - .await? - .node_providers - .iter() - .map(|np| (np.principal_id, np.display_name.clone())), - ); - let mut node_operators = get_node_operators(&local_registry, &node_provider_names, ctx.network())?; + let mut node_operators = get_node_operators(&local_registry, ctx.network()).await?; - let dcs = local_registry - .get_family_entries_versioned::() - .map_err(|e| anyhow::anyhow!("Couldn't get data centers: {:?}", e))? - .into_iter() - .map(|(_, (_, record))| record) - .collect(); + let dcs = local_registry.get_datacenters()?; - let subnets = get_subnets(&local_registry)?; + let subnets = get_subnets(&local_registry).await?; let unassigned_nodes_config = get_unassigned_nodes(&local_registry)?; @@ -142,69 +121,42 @@ impl Registry { } } -fn get_elected_guest_os_versions(local_registry: &Rc) -> anyhow::Result> { - let elected_versions = local_registry - .get_family_entries_versioned::() - .map_err(|e| anyhow::anyhow!("Couldn't get elected versions: {:?}", e))? - .into_iter() - .map(|(_, (_, record))| record) - .collect(); - Ok(elected_versions) +fn get_elected_guest_os_versions(local_registry: &Arc) -> anyhow::Result> { + local_registry.elected_guestos_records() } -fn get_elected_host_os_versions(local_registry: &Rc) -> anyhow::Result> { - let elected_versions = local_registry - .get_family_entries_versioned::() - .map_err(|e| anyhow::anyhow!("Couldn't get elected versions: {:?}", e))? - .into_iter() - .map(|(_, (_, record))| record) - .collect(); - Ok(elected_versions) +fn get_elected_host_os_versions(local_registry: &Arc) -> anyhow::Result> { + local_registry.elected_hostos_records() } -fn get_node_operators( - local_registry: &Rc, - node_provider_names: &HashMap, - network: &Network, -) -> anyhow::Result> { - let all_nodes = local_registry - .get_family_entries_versioned::() - .map_err(|e| anyhow::anyhow!("Couldn't get nodes: {:?}", e))? - .into_iter() - .map(|(k, (_, record))| (k, record)) - .collect::>(); - let node_operators = local_registry - .get_family_entries_versioned::() - .map_err(|e| anyhow::anyhow!("Couldn't get node operators: {:?}", e))? - .into_iter() - .map(|(k, (_, record))| { - let node_operator_principal_id = PrincipalId::from_str(k.as_str()).expect("Couldn't parse principal id"); - let node_provider_name = node_provider_names - .get(&PrincipalId::try_from(&record.node_provider_principal_id).expect("Couldn't parse principal id")) - .map_or_else(String::default, |v| { - if network.is_mainnet() && v.is_empty() { - panic!("Node provider name should not be empty for mainnet") - } - v.to_string() - }); +async fn get_node_operators(local_registry: &Arc, network: &Network) -> anyhow::Result> { + let all_nodes = local_registry.nodes().await?; + let operators = local_registry + .operators() + .await + .map_err(|e| anyhow::anyhow!("Couldn't get node operators: {:?}", e))?; + let node_operators = operators + .iter() + .map(|(k, record)| { + let node_provider_name = record.provider.name.as_ref().map_or_else(String::default, |v| { + if network.is_mainnet() && v.is_empty() { + panic!("Node provider name should not be empty for mainnet") + } + v.to_string() + }); // Find the number of nodes registered by this operator - let operator_registered_nodes_num = all_nodes - .iter() - .filter(|(_, record)| { - PrincipalId::try_from(&record.node_operator_id).expect("Couldn't parse principal") == node_operator_principal_id - }) - .count() as u64; + let operator_registered_nodes_num = all_nodes.iter().filter(|(nk, _)| nk == &k).count() as u64; ( - node_operator_principal_id, + record.provider.principal, NodeOperator { - node_operator_principal_id, - node_allowance_remaining: record.node_allowance, - node_allowance_total: record.node_allowance + operator_registered_nodes_num, - node_provider_principal_id: PrincipalId::try_from(record.node_provider_principal_id).expect("Couldn't parse principal id"), + node_operator_principal_id: *k, + node_allowance_remaining: record.allowance, + node_allowance_total: record.allowance + operator_registered_nodes_num, + node_provider_principal_id: record.provider.principal, node_provider_name, - dc_id: record.dc_id, - rewardable_nodes: record.rewardable_nodes, - ipv6: record.ipv6, + dc_id: record.datacenter.as_ref().map(|d| d.name.to_owned()).unwrap_or_default(), + rewardable_nodes: record.rewardable_nodes.clone(), + ipv6: Some(record.ipv6.to_string()), total_up_nodes: 0, nodes_health: Default::default(), rewards_correct: false, @@ -215,56 +167,42 @@ fn get_node_operators( Ok(node_operators) } -fn get_subnets(local_registry: &Rc) -> anyhow::Result> { - Ok(local_registry - .get_family_entries::()? - .into_iter() +async fn get_subnets(local_registry: &Arc) -> anyhow::Result> { + let subnets = local_registry.subnets().await?; + Ok(subnets + .iter() .map(|(subnet_id, record)| SubnetRecord { - subnet_id: PrincipalId::from_str(&subnet_id).expect("Couldn't parse principal id"), - membership: record - .membership - .iter() - .map(|n| { - PrincipalId::try_from(&n[..]) - .expect("could not create PrincipalId from membership entry") - .to_string() - }) - .collect(), + subnet_id: *subnet_id, + membership: record.nodes.iter().map(|n| n.principal.to_string()).collect(), nodes: Default::default(), max_ingress_bytes_per_message: record.max_ingress_bytes_per_message, max_ingress_messages_per_block: record.max_ingress_messages_per_block, max_block_payload_size: record.max_block_payload_size, unit_delay_millis: record.unit_delay_millis, initial_notary_delay_millis: record.initial_notary_delay_millis, - replica_version_id: record.replica_version_id, + replica_version_id: record.replica_version.clone(), dkg_interval_length: record.dkg_interval_length, start_as_nns: record.start_as_nns, - subnet_type: SubnetType::try_from(record.subnet_type).unwrap(), + subnet_type: record.subnet_type, features: record.features.clone().unwrap_or_default(), max_number_of_canisters: record.max_number_of_canisters, - ssh_readonly_access: record.ssh_readonly_access, - ssh_backup_access: record.ssh_backup_access, - ecdsa_config: record.ecdsa_config, + ssh_readonly_access: record.ssh_readonly_access.clone(), + ssh_backup_access: record.ssh_backup_access.clone(), + ecdsa_config: record.ecdsa_config.clone(), dkg_dealings_per_block: record.dkg_dealings_per_block, is_halted: record.is_halted, halt_at_cup_height: record.halt_at_cup_height, - chain_key_config: record.chain_key_config, + chain_key_config: record.chain_key_config.clone(), }) .collect::>()) } -fn get_unassigned_nodes(local_registry: &Rc) -> anyhow::Result> { - let unassigned_nodes_config = local_registry - .get_family_entries_versioned::() - .map_err(|e| anyhow::anyhow!("Couldn't get unassigned nodes config: {:?}", e))? - .into_iter() - .map(|(_, (_, record))| record) - .next(); - Ok(unassigned_nodes_config) +fn get_unassigned_nodes(local_registry: &Arc) -> anyhow::Result> { + local_registry.get_unassigned_nodes() } async fn get_nodes( - local_registry: &Rc, + local_registry: &Arc, node_operators: &BTreeMap, subnets: &[SubnetRecord], network: &Network, @@ -273,41 +211,47 @@ async fn get_nodes( let nodes_health = health_client.nodes().await?; let nodes = local_registry - .get_family_entries_versioned::() + .nodes() + .await .map_err(|e| anyhow::anyhow!("Couldn't get nodes: {:?}", e))? - .into_iter() - .map(|(k, (_, record))| { - let node_operator_id = PrincipalId::try_from(&record.node_operator_id).expect("Couldn't parse principal id"); - let node_id = PrincipalId::from_str(&k).expect("Couldn't parse principal id"); + .iter() + .map(|(k, record)| { + let node_operator_id = record.operator.principal; NodeDetails { - node_id, - xnet: record.xnet, - http: record.http, + node_id: *k, + xnet: Some(ConnectionEndpoint { + ip_addr: record.ip_addr.to_string(), + port: 2497, + }), + http: Some(ConnectionEndpoint { + ip_addr: record.ip_addr.to_string(), + port: 8080, + }), node_operator_id, - chip_id: record.chip_id, - hostos_version_id: record.hostos_version_id, - public_ipv4_config: record.public_ipv4_config, + chip_id: record.chip_id.clone(), + hostos_version_id: Some(record.hostos_version.clone()), + public_ipv4_config: record.public_ipv4_config.clone(), node_provider_id: match node_operators.get(&node_operator_id) { Some(no) => no.node_provider_principal_id, None => PrincipalId::new_anonymous(), }, subnet_id: subnets .iter() - .find(|subnet| subnet.membership.contains(&k)) + .find(|subnet| subnet.membership.contains(&k.to_string())) .map(|subnet| subnet.subnet_id), dc_id: match node_operators.get(&node_operator_id) { Some(no) => no.dc_id.clone(), None => "".to_string(), }, - status: nodes_health.get(&node_id).unwrap_or(&ic_management_types::HealthStatus::Unknown).clone(), + status: nodes_health.get(k).unwrap_or(&ic_management_types::HealthStatus::Unknown).clone(), } }) .collect::>(); Ok(nodes) } -fn get_node_rewards_table(local_registry: &Rc, network: &Network) -> NodeRewardsTableFlattened { - let rewards_table_bytes = local_registry.get_family_entries::(); +fn get_node_rewards_table(local_registry: &Arc, network: &Network) -> NodeRewardsTableFlattened { + let rewards_table_bytes = local_registry.get_node_rewards_table(); let mut rewards_table = match rewards_table_bytes { Ok(r) => r, @@ -360,12 +304,12 @@ fn get_node_rewards_table(local_registry: &Rc, network: &Network) } } -fn get_api_boundary_nodes(local_registry: &Rc) -> anyhow::Result> { +fn get_api_boundary_nodes(local_registry: &Arc) -> anyhow::Result> { let api_bns = local_registry - .get_family_entries_versioned::() + .get_api_boundary_nodes() .map_err(|e| anyhow::anyhow!("Couldn't get api boundary nodes: {:?}", e))? .into_iter() - .map(|(k, (_, record))| { + .map(|(k, record)| { let principal = PrincipalId::from_str(k.as_str()).expect("Couldn't parse principal id"); ApiBoundaryNodeDetails { principal, diff --git a/rs/cli/src/ctx.rs b/rs/cli/src/ctx.rs index 7a6573d68..96b21f4e5 100644 --- a/rs/cli/src/ctx.rs +++ b/rs/cli/src/ctx.rs @@ -9,7 +9,7 @@ use std::{ use ic_canisters::{governance::governance_canister_version, CanisterClient, IcAgentCanisterClient}; use ic_management_backend::{ - lazy_registry::LazyRegistry, + lazy_registry::{LazyRegistry, LazyRegistryImpl}, proposal::ProposalAgent, registry::{local_registry_path, sync_local_store}, }; @@ -29,7 +29,7 @@ const STAGING_NEURON_ID: u64 = 49; #[derive(Clone)] pub struct DreContext { network: Network, - registry: RefCell>>, + registry: RefCell>>, ic_admin: Option>, runner: RefCell>>, verbose_runner: bool, @@ -149,7 +149,7 @@ impl DreContext { Ok((ic_admin, Some(ic_admin_path))) } - pub async fn registry(&self) -> Rc { + pub async fn registry(&self) -> Arc { if let Some(reg) = self.registry.borrow().as_ref() { return reg.clone(); } @@ -162,7 +162,7 @@ impl DreContext { info!("Using local registry path for network {}: {}", network.name, local_path.display()); let local_registry = LocalRegistry::new(local_path, Duration::from_millis(1000)).expect("Failed to create local registry"); - let registry = Rc::new(LazyRegistry::new(local_registry, network.clone(), self.skip_sync)); + let registry = Arc::new(LazyRegistryImpl::new(local_registry, network.clone(), self.skip_sync)); *self.registry.borrow_mut() = Some(registry.clone()); registry } diff --git a/rs/cli/src/lib.rs b/rs/cli/src/lib.rs index ca5782f8e..a28d04d95 100644 --- a/rs/cli/src/lib.rs +++ b/rs/cli/src/lib.rs @@ -8,3 +8,5 @@ mod operations; mod qualification; mod runner; mod subnet_manager; +#[cfg(test)] +mod unit_tests; diff --git a/rs/cli/src/operations/hostos_rollout.rs b/rs/cli/src/operations/hostos_rollout.rs index 0ba73454d..4903abbdb 100644 --- a/rs/cli/src/operations/hostos_rollout.rs +++ b/rs/cli/src/operations/hostos_rollout.rs @@ -699,6 +699,8 @@ pub mod test { }, allowance: 23933, datacenter: None, + rewardable_nodes: BTreeMap::new(), + ipv6: "".to_string(), }, hostname: None, hostos_release: None, @@ -709,6 +711,8 @@ pub mod test { hostos_version: hostos_version.clone(), dfinity_owned: Some(dfinity_owned), is_api_boundary_node, + chip_id: None, + public_ipv4_config: None, }; n.insert(node.principal, node); } diff --git a/rs/cli/src/qualification/ensure_blessed_versions.rs b/rs/cli/src/qualification/ensure_blessed_versions.rs index 0fa3591f5..767e86b7b 100644 --- a/rs/cli/src/qualification/ensure_blessed_versions.rs +++ b/rs/cli/src/qualification/ensure_blessed_versions.rs @@ -24,7 +24,7 @@ impl Step for EnsureBlessedRevisions { async fn execute(&self, ctx: &StepCtx) -> anyhow::Result<()> { let registry = ctx.dre_ctx().registry().await; - let blessed_versions = registry.elected_guestos()?; + let blessed_versions = registry.elected_guestos().await?; if blessed_versions.contains(&self.version) { return Ok(()); @@ -62,7 +62,7 @@ impl Step for EnsureBlessedRevisions { place_proposal.retry(&ExponentialBuilder::default()).await?; registry.sync_with_nns().await?; - let blessed_versions = registry.elected_guestos()?; + let blessed_versions = registry.elected_guestos().await?; let table = Table::new() .with_columns(&[("Blessed versions", CellAlignment::Center)]) diff --git a/rs/cli/src/qualification/retire_blessed_versions.rs b/rs/cli/src/qualification/retire_blessed_versions.rs index 9637c80da..5303b276a 100644 --- a/rs/cli/src/qualification/retire_blessed_versions.rs +++ b/rs/cli/src/qualification/retire_blessed_versions.rs @@ -22,7 +22,7 @@ impl Step for RetireBlessedVersions { async fn execute(&self, ctx: &StepCtx) -> anyhow::Result<()> { let registry = ctx.dre_ctx().registry().await; - let blessed_versions = registry.elected_guestos()?; + let blessed_versions = registry.elected_guestos().await?; let mut to_unelect = vec![]; for version in &self.versions { if blessed_versions.contains(version) { @@ -57,7 +57,7 @@ impl Step for RetireBlessedVersions { place_proposal.retry(&ExponentialBuilder::default()).await?; registry.sync_with_nns().await?; - let blessed_versions = registry.elected_guestos()?; + let blessed_versions = registry.elected_guestos().await?; let table = Table::new() .with_columns(&[("Blessed versions", CellAlignment::Center)]) diff --git a/rs/cli/src/qualification/upgrade_subnets.rs b/rs/cli/src/qualification/upgrade_subnets.rs index a4bb49634..e3fd0e428 100644 --- a/rs/cli/src/qualification/upgrade_subnets.rs +++ b/rs/cli/src/qualification/upgrade_subnets.rs @@ -120,7 +120,7 @@ impl Step for UpgradeSubnets { } } else { let registry = ctx.dre_ctx().registry().await; - let unassigned_nodes_version = registry.unassigned_nodes_replica_version()?; + let unassigned_nodes_version = registry.unassigned_nodes_replica_version().await?; if unassigned_nodes_version.to_string() == self.to_version { ctx.print_text(format!("Unassigned nodes are already on {}, skipping", self.to_version)); return Ok(()); diff --git a/rs/cli/src/qualification/util.rs b/rs/cli/src/qualification/util.rs index fc46fad0e..a992d74ce 100644 --- a/rs/cli/src/qualification/util.rs +++ b/rs/cli/src/qualification/util.rs @@ -155,7 +155,7 @@ impl StepCtx { let subnets = registry.subnets().await?; let subnets = subnets.values(); - let unassigned = registry.unassigned_nodes_replica_version()?; + let unassigned = registry.unassigned_nodes_replica_version().await?; let table = Table::new() .with_columns(&[ ("Subnet type", CellAlignment::Left), diff --git a/rs/cli/src/runner.rs b/rs/cli/src/runner.rs index a23f12aa6..5c741c810 100644 --- a/rs/cli/src/runner.rs +++ b/rs/cli/src/runner.rs @@ -4,13 +4,10 @@ use std::collections::BTreeSet; use std::rc::Rc; use std::sync::Arc; -use decentralization::network::AvailableNodesQuerier; use decentralization::network::DecentralizedSubnet; use decentralization::network::NetworkHealRequest; use decentralization::network::SubnetChange; -use decentralization::network::SubnetQuerier; use decentralization::network::SubnetQueryBy; -use decentralization::network::TopologyManager; use decentralization::network::{generate_added_node_description, generate_removed_nodes_description}; use decentralization::subnets::NodesRemover; use decentralization::SubnetChangeResponse; @@ -47,7 +44,7 @@ use crate::operations::hostos_rollout::NodeGroupUpdate; pub struct Runner { ic_admin: Arc, - registry: Rc, + registry: Arc, ic_repo: RefCell>>, network: Network, proposal_agent: ProposalAgent, @@ -55,7 +52,7 @@ pub struct Runner { } impl Runner { - pub fn new(ic_admin: Arc, registry: Rc, network: Network, agent: ProposalAgent, verbose: bool) -> Self { + pub fn new(ic_admin: Arc, registry: Arc, network: Network, agent: ProposalAgent, verbose: bool) -> Self { Self { ic_admin, registry, @@ -66,7 +63,7 @@ impl Runner { } } - fn ic_repo(&self) -> Rc { + async fn ic_repo(&self) -> Rc { if let Some(ic_repo) = self.ic_repo.borrow().as_ref() { return ic_repo.clone(); } @@ -76,10 +73,12 @@ impl Runner { self.network.clone(), self.registry .elected_guestos() + .await .expect("Should be able to fetch elected guestos versions") .to_vec(), self.registry .elected_hostos() + .await .expect("Should be able to fetch elected hostos versions") .to_vec(), ) @@ -319,7 +318,7 @@ impl Runner { only: &[String], exclude: &[String], ) -> anyhow::Result, String)>> { - let elected_versions = self.registry.elected_hostos().unwrap(); + let elected_versions = self.registry.elected_hostos().await.unwrap(); if !elected_versions.contains(&version.to_string()) { return Err(anyhow::anyhow!(format!( "The version {} has not being elected.\nVersions elected are: {:?}", @@ -610,7 +609,7 @@ impl Runner { } async fn retireable_hostos_versions(&self) -> anyhow::Result> { - let ic_repo = self.ic_repo(); + let ic_repo = self.ic_repo().await; let hosts = ic_repo.hostos_releases().await?; let active_releases = hosts.get_active_branches(); let hostos_versions: BTreeSet = self.registry.nodes().await?.values().map(|s| s.hostos_version.clone()).collect(); @@ -637,11 +636,11 @@ impl Runner { } async fn retireable_guestos_versions(&self) -> anyhow::Result> { - let ic_repo = self.ic_repo(); + let ic_repo = self.ic_repo().await; let guests = ic_repo.guestos_releases().await?; let active_releases = guests.get_active_branches(); let subnet_versions: BTreeSet = self.registry.subnets().await?.values().map(|s| s.replica_version.clone()).collect(); - let version_on_unassigned_nodes = self.registry.unassigned_nodes_replica_version()?; + let version_on_unassigned_nodes = self.registry.unassigned_nodes_replica_version().await?; let versions_in_proposals: BTreeSet = self .proposal_agent .list_open_elect_replica_proposals() @@ -704,7 +703,7 @@ impl Runner { None => return Err(anyhow::anyhow!("Couldn't find nns subnet with id '{}'", nns_subnet_id)), }; - let unassigned_version = self.registry.unassigned_nodes_replica_version()?; + let unassigned_version = self.registry.unassigned_nodes_replica_version().await?; if unassigned_version == nns.replica_version.clone().into() { info!( diff --git a/rs/cli/src/subnet_manager.rs b/rs/cli/src/subnet_manager.rs index 7a76eaa56..3406aa265 100644 --- a/rs/cli/src/subnet_manager.rs +++ b/rs/cli/src/subnet_manager.rs @@ -1,11 +1,11 @@ use core::fmt; use std::collections::HashSet; -use std::rc::Rc; +use std::sync::Arc; use anyhow::anyhow; use anyhow::Ok; use decentralization::{ - network::{DecentralizedSubnet, Node as DecentralizedNode, NodesConverter, SubnetQueryBy, TopologyManager}, + network::{DecentralizedSubnet, Node as DecentralizedNode, SubnetQueryBy}, SubnetChangeResponse, }; use ic_management_backend::health::{self, HealthStatusQuerier}; @@ -37,12 +37,12 @@ impl fmt::Display for SubnetManagerError { pub struct SubnetManager { subnet_target: Option, - registry_instance: Rc, + registry_instance: Arc, network: Network, } impl SubnetManager { - pub fn new(registry_instance: Rc, network: Network) -> Self { + pub fn new(registry_instance: Arc, network: Network) -> Self { Self { subnet_target: None, registry_instance, diff --git a/rs/cli/src/unit_tests/mod.rs b/rs/cli/src/unit_tests/mod.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/rs/cli/src/unit_tests/mod.rs @@ -0,0 +1 @@ + diff --git a/rs/decentralization/Cargo.toml b/rs/decentralization/Cargo.toml index 0aab16250..35a80b5cc 100644 --- a/rs/decentralization/Cargo.toml +++ b/rs/decentralization/Cargo.toml @@ -23,6 +23,7 @@ serde = { workspace = true } serde_json = { workspace = true } strum_macros = { workspace = true } tabular = { workspace = true } +futures.workspace = true [dev-dependencies] tokio = { workspace = true } diff --git a/rs/decentralization/src/network.rs b/rs/decentralization/src/network.rs index da72abf90..339f57fea 100644 --- a/rs/decentralization/src/network.rs +++ b/rs/decentralization/src/network.rs @@ -5,6 +5,7 @@ use actix_web::http::StatusCode; use actix_web::{HttpResponse, ResponseError}; use ahash::{AHashMap, AHashSet, HashSet}; use anyhow::anyhow; +use futures::future::BoxFuture; use ic_base_types::PrincipalId; use ic_management_types::{HealthStatus, MinNakamotoCoefficients, NetworkError, NodeFeature}; use itertools::Itertools; @@ -825,7 +826,7 @@ impl From for DecentralizedSubnet { } pub trait AvailableNodesQuerier { - fn available_nodes(&self) -> impl std::future::Future, NetworkError>>; + fn available_nodes(&self) -> BoxFuture<'_, Result, NetworkError>>; } #[derive(Clone)] @@ -835,11 +836,11 @@ pub enum SubnetQueryBy { } pub trait NodesConverter { - fn get_nodes(&self, from: &[PrincipalId]) -> impl std::future::Future, NetworkError>>; + fn get_nodes<'a>(&'a self, from: &'a [PrincipalId]) -> BoxFuture<'a, Result, NetworkError>>; } pub trait SubnetQuerier { - fn subnet(&self, by: SubnetQueryBy) -> impl std::future::Future>; + fn subnet(&self, by: SubnetQueryBy) -> BoxFuture<'_, Result>; } #[derive(Clone, Serialize, Deserialize, Debug, strum_macros::Display)] @@ -861,34 +862,37 @@ impl ResponseError for DecentralizationError { } } -#[allow(async_fn_in_trait)] -pub trait TopologyManager: SubnetQuerier + AvailableNodesQuerier { - async fn modify_subnet_nodes(&self, by: SubnetQueryBy) -> Result { - Ok(SubnetChangeRequest { - available_nodes: self.available_nodes().await?, - subnet: self.subnet(by).await?, - ..Default::default() +pub trait TopologyManager: SubnetQuerier + AvailableNodesQuerier + Sync { + fn modify_subnet_nodes(&self, by: SubnetQueryBy) -> BoxFuture<'_, Result> { + Box::pin(async { + Ok(SubnetChangeRequest { + available_nodes: self.available_nodes().await?, + subnet: self.subnet(by).await?, + ..Default::default() + }) }) } - async fn create_subnet( - &self, + fn create_subnet<'a>( + &'a self, size: usize, min_nakamoto_coefficients: Option, include_nodes: Vec, exclude_nodes: Vec, only_nodes: Vec, - health_of_nodes: &BTreeMap, - ) -> Result { - SubnetChangeRequest { - available_nodes: self.available_nodes().await?, - min_nakamoto_coefficients, - ..Default::default() - } - .including_from_available(include_nodes.clone()) - .excluding_from_available(exclude_nodes.clone()) - .including_from_available(only_nodes.clone()) - .resize(size, 0, 0, health_of_nodes) + health_of_nodes: &'a BTreeMap, + ) -> BoxFuture<'a, Result> { + Box::pin(async move { + SubnetChangeRequest { + available_nodes: self.available_nodes().await?, + min_nakamoto_coefficients, + ..Default::default() + } + .including_from_available(include_nodes.clone()) + .excluding_from_available(exclude_nodes.clone()) + .including_from_available(only_nodes.clone()) + .resize(size, 0, 0, health_of_nodes) + }) } } diff --git a/rs/ic-management-backend/src/lazy_registry.rs b/rs/ic-management-backend/src/lazy_registry.rs index edb9ac73b..58d7f6319 100644 --- a/rs/ic-management-backend/src/lazy_registry.rs +++ b/rs/ic-management-backend/src/lazy_registry.rs @@ -1,10 +1,11 @@ +use std::collections::BTreeMap; use std::collections::{BTreeSet, HashSet}; use std::net::Ipv6Addr; use std::str::FromStr; use std::sync::Arc; -use std::{cell::RefCell, collections::BTreeMap}; use decentralization::network::{AvailableNodesQuerier, DecentralizedSubnet, NodesConverter, SubnetQuerier, SubnetQueryBy}; +use futures::future::BoxFuture; use ic_interfaces_registry::RegistryClient; use ic_interfaces_registry::{RegistryClientVersionedResult, RegistryValue}; use ic_management_types::{ @@ -28,6 +29,7 @@ use ic_registry_subnet_type::SubnetType; use ic_types::{NodeId, PrincipalId, RegistryVersion}; use itertools::Itertools; use log::warn; +use tokio::sync::RwLock; use tokio::try_join; use crate::health::HealthStatusQuerier; @@ -48,18 +50,136 @@ const KNOWN_SUBNETS: &[(&str, &str)] = &[ ("bkfrj-6k62g-dycql-7h53p-atvkj-zg4to-gaogh-netha-ptybj-ntsgw-rqe", "European"), ]; -pub struct LazyRegistry { +pub trait LazyRegistry: + LazyRegistryFamilyEntries + NodesConverter + SubnetQuerier + decentralization::network::TopologyManager + AvailableNodesQuerier + Send + Sync +{ + fn node_labels(&self) -> BoxFuture<'_, anyhow::Result>>>; + + fn elected_guestos(&self) -> BoxFuture<'_, anyhow::Result>>>; + + fn elected_hostos(&self) -> BoxFuture<'_, anyhow::Result>>>; + + fn sync_with_nns(&self) -> BoxFuture<'_, anyhow::Result<()>>; + + fn operators(&self) -> BoxFuture<'_, anyhow::Result>>>; + + fn nodes(&self) -> BoxFuture<'_, anyhow::Result>>>; + + fn firewall_rule_set(&self, firewall_rule_scope: FirewallRulesScope) -> BoxFuture<'_, anyhow::Result>; + + fn subnets(&self) -> BoxFuture<'_, anyhow::Result>>>; + + fn nodes_with_proposals(&self) -> BoxFuture<'_, anyhow::Result>>>; + + fn nns_replica_version(&self) -> BoxFuture<'_, anyhow::Result>> { + Box::pin(async { + Ok(self.subnets().await?.values().find_map(|s| { + if s.subnet_type.eq(&SubnetType::System) { + return Some(s.replica_version.clone()); + } + None + })) + }) + } + + fn missing_guests(&self) -> BoxFuture<'_, anyhow::Result>> { + Box::pin(async { + let nodes = self.nodes().await?; + let mut missing_guests = self + .node_labels() + .await? + .iter() + .filter(|g| !nodes.iter().any(|(_, n)| n.label.clone().unwrap_or_default() == g.name)) + .cloned() + .collect_vec(); + + missing_guests.sort_by_key(|g| g.name.to_owned()); + missing_guests.dedup_by_key(|g| g.name.to_owned()); + + Ok(missing_guests) + }) + } + + fn get_decentralized_nodes<'a>(&'a self, principals: &'a [PrincipalId]) -> BoxFuture<'a, anyhow::Result>> { + Box::pin(async { + Ok(self + .nodes() + .await? + .values() + .filter(|n| principals.contains(&n.principal)) + .map(decentralization::network::Node::from) + .collect_vec()) + }) + } + + fn unassigned_nodes_replica_version(&self) -> BoxFuture<'_, anyhow::Result>>; + + fn get_api_boundary_nodes(&self) -> anyhow::Result>; + + fn get_node_rewards_table(&self) -> anyhow::Result>; + + fn get_unassigned_nodes(&self) -> anyhow::Result>; + + fn get_datacenters(&self) -> anyhow::Result>; + + fn elected_guestos_records(&self) -> anyhow::Result>; + + fn elected_hostos_records(&self) -> anyhow::Result>; + + fn update_proposal_data(&self) -> BoxFuture<'_, anyhow::Result<()>>; + + fn subnets_and_proposals(&self) -> BoxFuture<'_, anyhow::Result>>> { + Box::pin(async { + let subnets = self.subnets().await?; + + self.update_proposal_data().await?; + + if subnets.iter().any(|(_, s)| s.proposal.is_some()) { + return Ok(subnets); + } + + self.subnets().await + }) + } +} + +impl NodesConverter for Box { + fn get_nodes<'a>( + &'a self, + from: &'a [PrincipalId], + ) -> BoxFuture<'a, Result, ic_management_types::NetworkError>> { + Box::pin(async { + let nodes = self + .nodes() + .await + .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))?; + from.iter() + .map(|n| { + nodes + .get(n) + .ok_or(ic_management_types::NetworkError::NodeNotFound(*n)) + .map(decentralization::network::Node::from) + }) + .collect() + }) + } +} + +pub struct LazyRegistryImpl +where + Self: Send + Sync, +{ local_registry: LocalRegistry, network: Network, - subnets: RefCell>>>, - nodes: RefCell>>>, - operators: RefCell>>>, - node_labels_guests: RefCell>>>, - elected_guestos: RefCell>>>, - elected_hostos: RefCell>>>, - unassigned_nodes_replica_version: RefCell>>, - firewall_rule_set: RefCell>>>, + subnets: RwLock>>>, + nodes: RwLock>>>, + operators: RwLock>>>, + node_labels_guests: RwLock>>>, + elected_guestos: RwLock>>>, + elected_hostos: RwLock>>>, + unassigned_nodes_replica_version: RwLock>>, + firewall_rule_set: RwLock>>>, no_sync: bool, } @@ -111,35 +231,38 @@ pub trait LazyRegistryFamilyEntries { fn get_key_family(&self, key_prefix: &str, version: RegistryVersion) -> anyhow::Result>; fn get_versioned_value(&self, key: &str, version: RegistryVersion) -> RegistryClientVersionedResult>; fn get_latest_version(&self) -> RegistryVersion; +} - fn get_family_entries(&self) -> anyhow::Result> { - let family = self.get_family_entries_versioned::()?; - Ok(family.into_iter().map(|(k, (_, v))| (k, v)).collect()) - } - fn get_family_entries_versioned(&self) -> anyhow::Result> { - self.get_family_entries_of_version(self.get_latest_version()) - } - fn get_family_entries_of_version(&self, version: RegistryVersion) -> anyhow::Result> { - let prefix_length = T::KEY_PREFIX.len(); - Ok(self - .get_key_family(T::KEY_PREFIX, version)? - .iter() - .filter_map(|key| { - let r = self - .get_versioned_value(key, version) - .unwrap_or_else(|_| panic!("Failed to get entry {} for type {}", key, std::any::type_name::())); - r.as_ref().map(|v| { - ( - key[prefix_length..].to_string(), - (r.version.get(), T::decode(v.as_slice()).expect("Invalid registry value")), - ) - }) +fn get_family_entries(reg: &impl LazyRegistryFamilyEntries) -> anyhow::Result> { + let family = get_family_entries_versioned::(reg)?; + Ok(family.into_iter().map(|(k, (_, v))| (k, v)).collect()) +} +fn get_family_entries_versioned(reg: &impl LazyRegistryFamilyEntries) -> anyhow::Result> { + get_family_entries_of_version(reg, reg.get_latest_version()) +} +fn get_family_entries_of_version( + reg: &impl LazyRegistryFamilyEntries, + version: RegistryVersion, +) -> anyhow::Result> { + let prefix_length = T::KEY_PREFIX.len(); + Ok(reg + .get_key_family(T::KEY_PREFIX, version)? + .iter() + .filter_map(|key| { + let r = reg + .get_versioned_value(key, version) + .unwrap_or_else(|_| panic!("Failed to get entry {} for type {}", key, std::any::type_name::())); + r.as_ref().map(|v| { + ( + key[prefix_length..].to_string(), + (r.version.get(), T::decode(v.as_slice()).expect("Invalid registry value")), + ) }) - .collect()) - } + }) + .collect()) } -impl LazyRegistryFamilyEntries for LazyRegistry { +impl LazyRegistryFamilyEntries for LazyRegistryImpl { fn get_key_family(&self, key_prefix: &str, version: RegistryVersion) -> anyhow::Result> { Ok(self.local_registry.get_key_family(key_prefix, version)?) } @@ -153,561 +276,636 @@ impl LazyRegistryFamilyEntries for LazyRegistry { } } -impl LazyRegistry { +impl LazyRegistryImpl { pub fn new(local_registry: LocalRegistry, network: Network, no_sync: bool) -> Self { Self { local_registry, network, - subnets: RefCell::new(None), - nodes: RefCell::new(None), - operators: RefCell::new(None), - node_labels_guests: RefCell::new(None), - elected_guestos: RefCell::new(None), - elected_hostos: RefCell::new(None), - unassigned_nodes_replica_version: RefCell::new(None), - firewall_rule_set: RefCell::new(None), + subnets: RwLock::new(None), + nodes: RwLock::new(None), + operators: RwLock::new(None), + node_labels_guests: RwLock::new(None), + elected_guestos: RwLock::new(None), + elected_hostos: RwLock::new(None), + unassigned_nodes_replica_version: RwLock::new(None), + firewall_rule_set: RwLock::new(None), no_sync, } } - // See if making it async would change anything - pub async fn node_labels(&self) -> anyhow::Result>> { - if let Some(guests) = self.node_labels_guests.borrow().as_ref() { - return Ok(guests.to_owned()); - } + fn node_record_guest(guests: Arc>, nr: &NodeRecord) -> Option { + guests + .iter() + .find(|g| g.ipv6 == Ipv6Addr::from_str(&nr.http.clone().unwrap().ip_addr).unwrap()) + .cloned() + } - if self.no_sync || (!self.network.is_mainnet() && !self.network.eq(&Network::staging_unchecked().unwrap())) { - let res = Arc::new(vec![]); - *self.node_labels_guests.borrow_mut() = Some(res.clone()); - return Ok(res); - } + fn node_ip_addr(nr: &NodeRecord) -> Ipv6Addr { + Ipv6Addr::from_str(&nr.http.clone().expect("missing ipv6 address").ip_addr).expect("invalid ipv6 address") + } +} - let guests = match node_labels::query_guests(&self.network.name).await { - Ok(g) => g, - Err(e) => { - warn!("Failed to query node labels: {}", e); - vec![] +impl LazyRegistry for LazyRegistryImpl { + // See if making it async would change anything + fn node_labels(&self) -> BoxFuture<'_, anyhow::Result>>> { + Box::pin(async { + if let Some(guests) = self.node_labels_guests.read().await.as_ref() { + return Ok(guests.to_owned()); } - }; - let guests = Arc::new(guests); - *self.node_labels_guests.borrow_mut() = Some(guests.clone()); - Ok(guests) - } - - pub fn elected_guestos(&self) -> anyhow::Result>> { - if let Some(elected) = self.elected_guestos.borrow().as_ref() { - return Ok(elected.to_owned()); - } + if self.no_sync || (!self.network.is_mainnet() && !self.network.eq(&Network::staging_unchecked().unwrap())) { + let res = Arc::new(vec![]); + *self.node_labels_guests.write().await = Some(res.clone()); + return Ok(res); + } - let record = self - .get_family_entries::()? - .first_entry() - .ok_or(anyhow::anyhow!("No blessed replica versions found"))? - .get() - .to_owned(); + let guests = match node_labels::query_guests(&self.network.name).await { + Ok(g) => g, + Err(e) => { + warn!("Failed to query node labels: {}", e); + vec![] + } + }; - let record = Arc::new(record.blessed_version_ids); - *self.elected_guestos.borrow_mut() = Some(record.clone()); - Ok(record) + let guests = Arc::new(guests); + *self.node_labels_guests.write().await = Some(guests.clone()); + Ok(guests) + }) } - pub fn elected_hostos(&self) -> anyhow::Result>> { - if let Some(elected) = self.elected_hostos.borrow().as_ref() { - return Ok(elected.to_owned()); - } + fn elected_guestos(&self) -> BoxFuture<'_, anyhow::Result>>> { + Box::pin(async { + if let Some(elected) = self.elected_guestos.read().await.as_ref() { + return Ok(elected.to_owned()); + } - let record = self - .get_family_entries::()? - .values() - .map(|v| v.hostos_version_id.to_owned()) - .collect_vec(); + let record = get_family_entries::(self)? + .first_entry() + .ok_or(anyhow::anyhow!("No blessed replica versions found"))? + .get() + .to_owned(); - let record = Arc::new(record); - *self.elected_hostos.borrow_mut() = Some(record.clone()); - Ok(record) + let record = Arc::new(record.blessed_version_ids); + *self.elected_guestos.write().await = Some(record.clone()); + Ok(record) + }) } - // Resets the whole state of after fetching so that targets can be - // recalculated - pub async fn sync_with_nns(&self) -> anyhow::Result<()> { - self.local_registry.sync_with_nns().await.map_err(|e| anyhow::anyhow!(e))?; - *self.subnets.borrow_mut() = None; - *self.nodes.borrow_mut() = None; - *self.operators.borrow_mut() = None; - - *self.elected_guestos.borrow_mut() = None; - *self.elected_hostos.borrow_mut() = None; - *self.unassigned_nodes_replica_version.borrow_mut() = None; - *self.firewall_rule_set.borrow_mut() = None; - - Ok(()) + fn elected_guestos_records(&self) -> anyhow::Result> { + Ok(get_family_entries_versioned::(self) + .map_err(|e| anyhow::anyhow!("Couldn't get elected versions: {:?}", e))? + .into_iter() + .map(|(_, (_, record))| record) + .collect()) } - pub async fn operators(&self) -> anyhow::Result>> { - if let Some(operators) = self.operators.borrow().as_ref() { - return Ok(operators.to_owned()); - } + fn elected_hostos(&self) -> BoxFuture<'_, anyhow::Result>>> { + Box::pin(async { + if let Some(elected) = self.elected_hostos.read().await.as_ref() { + return Ok(elected.to_owned()); + } - // Fetch node providers - let node_providers = match self.no_sync { - false => query_ic_dashboard_list::(&self.network, "v3/node-providers").await?, - true => NodeProvidersResponse { node_providers: vec![] }, - }; - let node_providers: BTreeMap<_, _> = node_providers.node_providers.iter().map(|p| (p.principal_id, p)).collect(); - let data_centers = self.get_family_entries::()?; - let operators = self.get_family_entries::()?; + let record = get_family_entries::(self)? + .values() + .map(|v| v.hostos_version_id.to_owned()) + .collect_vec(); - let records: BTreeMap<_, _> = operators - .iter() - .map(|(p, or)| { - let principal = PrincipalId::from_str(p).expect("Invalid operator principal id"); - ( - principal, - Operator { - principal, - provider: PrincipalId::try_from(or.node_provider_principal_id.as_slice()) - .map(|p| { - let maybe_provider = node_providers.get(&p).map(|node_provider| Provider { - name: Some(node_provider.display_name.to_owned()), - website: node_provider.website.to_owned(), - principal: p, - }); - - if maybe_provider.is_none() && self.network.is_mainnet() { - panic!("Node provider not found for operator: {}", principal); - } - maybe_provider.unwrap_or_default() - }) - .unwrap(), - allowance: or.node_allowance, - datacenter: data_centers.get(&or.dc_id).map(|dc| { - let (continent, country, city): (_, _, _) = dc.region.splitn(3, ',').map(|s| s.to_string()).collect_tuple().unwrap_or(( - "Unknown".to_string(), - "Unknown".to_string(), - "Unknown".to_string(), - )); - - Datacenter { - name: dc.id.clone(), - city, - country, - continent, - owner: DatacenterOwner { name: dc.owner.clone() }, - latitude: dc.gps.clone().map(|l| l.latitude as f64), - longitude: dc.gps.clone().map(|l| l.longitude as f64), - } - }), - }, - ) - }) - .collect(); + let record = Arc::new(record); + *self.elected_hostos.write().await = Some(record.clone()); + Ok(record) + }) + } - let records = Arc::new(records); - *self.operators.borrow_mut() = Some(records.clone()); - Ok(records) + fn elected_hostos_records(&self) -> anyhow::Result> { + Ok(get_family_entries_versioned::(self) + .map_err(|e| anyhow::anyhow!("Couldn't get elected versions: {:?}", e))? + .into_iter() + .map(|(_, (_, record))| record) + .collect()) } - pub async fn nodes(&self) -> anyhow::Result>> { - if let Some(nodes) = self.nodes.borrow().as_ref() { - return Ok(nodes.to_owned()); - } + // Resets the whole state of after fetching so that targets can be + // recalculated + fn sync_with_nns(&self) -> BoxFuture<'_, anyhow::Result<()>> { + Box::pin(async { + self.local_registry.sync_with_nns().await.map_err(|e| anyhow::anyhow!(e))?; + *self.subnets.write().await = None; + *self.nodes.write().await = None; + *self.operators.write().await = None; + + *self.elected_guestos.write().await = None; + *self.elected_hostos.write().await = None; + *self.unassigned_nodes_replica_version.write().await = None; + *self.firewall_rule_set.write().await = None; + + Ok(()) + }) + } - let node_entries = self.get_family_entries::()?; - let versioned_node_entries = self.get_family_entries_versioned::()?; - let dfinity_dcs = DFINITY_DCS.split(' ').map(|dc| dc.to_string().to_lowercase()).collect::>(); - let api_boundary_nodes = self.get_family_entries::()?; - let guests = self.node_labels().await?; - let operators = self.operators().await?; - let nodes: BTreeMap<_, _> = node_entries - .iter() - .map(|(p, nr)| { - let guest = Self::node_record_guest(guests.clone(), nr); - let operator = operators - .iter() - .find(|(op, _)| op.to_vec() == nr.node_operator_id) - .map(|(_, o)| o.to_owned()); - if operator.is_none() && self.network.is_mainnet() { - panic!("Operator cannot be none on mainnet") - } + fn operators(&self) -> BoxFuture<'_, anyhow::Result>>> { + Box::pin(async { + if let Some(operators) = self.operators.read().await.as_ref() { + return Ok(operators.to_owned()); + } - let principal = PrincipalId::from_str(p).expect("Invalid node principal id"); - let ip_addr = Self::node_ip_addr(nr); - let dc_name = operator - .clone() - .map(|op| match op.datacenter { - Some(dc) => dc.name.to_lowercase(), - None => "".to_string(), - }) - .unwrap_or_default(); - ( - principal, - Node { + // Fetch node providers + let node_providers = match self.no_sync { + false => query_ic_dashboard_list::(&self.network, "v3/node-providers").await?, + true => NodeProvidersResponse { node_providers: vec![] }, + }; + let node_providers: BTreeMap<_, _> = node_providers.node_providers.iter().map(|p| (p.principal_id, p)).collect(); + let data_centers = get_family_entries::(self)?; + let operators = get_family_entries::(self)?; + + let records: BTreeMap<_, _> = operators + .iter() + .map(|(p, or)| { + let principal = PrincipalId::from_str(p).expect("Invalid operator principal id"); + ( principal, - dfinity_owned: Some(dfinity_dcs.contains(&dc_name) || guest.as_ref().map(|g| g.dfinity_owned).unwrap_or_default()), - ip_addr, - hostname: guest - .as_ref() - .map(|g| g.name.clone()) - .unwrap_or_else(|| { - format!( - "{}-{}", - operator - .clone() - .map(|operator| operator.datacenter.as_ref().map(|d| d.name.clone()).unwrap_or_else(|| "??".to_string())) - .unwrap_or_default(), - p.to_string().split_once('-').map(|(first, _)| first).unwrap_or("?????") - ) - }) - .into(), - subnet_id: self - .local_registry - .get_subnet_id_from_node_id(NodeId::new(principal), self.local_registry.get_latest_version()) - .expect("failed to get subnet id") - .map(|s| s.get()), - hostos_version: nr.hostos_version_id.clone().unwrap_or_default(), - // TODO: map hostos release - hostos_release: None, - operator: operator.clone().unwrap_or_default(), - proposal: None, - label: guest.map(|g| g.name), - duplicates: versioned_node_entries - .iter() - .filter(|(_, (_, nr2))| Self::node_ip_addr(nr2) == Self::node_ip_addr(nr)) - .max_by_key(|(_, (version, _))| version) - .and_then(|(p2, _)| { - if p2 == p { - None - } else { - Some(PrincipalId::from_str(p2).expect("invalid node principal id")) + Operator { + principal, + provider: PrincipalId::try_from(or.node_provider_principal_id.as_slice()) + .map(|p| { + let maybe_provider = node_providers.get(&p).map(|node_provider| Provider { + name: Some(node_provider.display_name.to_owned()), + website: node_provider.website.to_owned(), + principal: p, + }); + + if maybe_provider.is_none() && self.network.is_mainnet() { + panic!("Node provider not found for operator: {}", principal); + } + maybe_provider.unwrap_or_default() + }) + .unwrap(), + allowance: or.node_allowance, + datacenter: data_centers.get(&or.dc_id).map(|dc| { + let (continent, country, city): (_, _, _) = dc + .region + .splitn(3, ',') + .map(|s| s.to_string()) + .collect_tuple() + .unwrap_or(("Unknown".to_string(), "Unknown".to_string(), "Unknown".to_string())); + + Datacenter { + name: dc.id.clone(), + city, + country, + continent, + owner: DatacenterOwner { name: dc.owner.clone() }, + latitude: dc.gps.clone().map(|l| l.latitude as f64), + longitude: dc.gps.clone().map(|l| l.longitude as f64), } }), - is_api_boundary_node: api_boundary_nodes.contains_key(p), - }, - ) - }) - .collect(); + rewardable_nodes: or.rewardable_nodes.clone(), + ipv6: or.ipv6().to_string(), + }, + ) + }) + .collect(); - let nodes = Arc::new(nodes); - *self.nodes.borrow_mut() = Some(nodes.clone()); - Ok(nodes) + let records = Arc::new(records); + *self.operators.write().await = Some(records.clone()); + Ok(records) + }) } - pub fn firewall_rule_set(&self, firewall_rule_scope: FirewallRulesScope) -> anyhow::Result { - let key = make_firewall_rules_record_key(&firewall_rule_scope); - if let Some(firewall_rule_set) = self.firewall_rule_set.borrow().as_ref() { - if let Some(entry) = firewall_rule_set.get(&key) { - return Ok(entry.to_owned()); + fn nodes(&self) -> BoxFuture<'_, anyhow::Result>>> { + Box::pin(async { + if let Some(nodes) = self.nodes.read().await.as_ref() { + return Ok(nodes.to_owned()); } - } - let value = match self - .local_registry - .get_value(&key, self.get_latest_version()) - .map_err(|e| anyhow::anyhow!(e))? - { - Some(v) => FirewallRuleSet::decode(v.as_slice())?, - None => FirewallRuleSet::default(), - }; - - let mut opt_arc_map = self.firewall_rule_set.borrow_mut(); - if let Some(arc_map) = opt_arc_map.as_mut() { - let bag = Arc::make_mut(arc_map); - bag.insert(key.to_owned(), value.clone()); - } else { - let mut all = BTreeMap::new(); - all.insert(key.to_owned(), value.clone()); - *opt_arc_map = Some(Arc::new(all)); - } + let node_entries = get_family_entries::(self)?; + let versioned_node_entries = get_family_entries_versioned::(self)?; + let dfinity_dcs = DFINITY_DCS.split(' ').map(|dc| dc.to_string().to_lowercase()).collect::>(); + let api_boundary_nodes = get_family_entries::(self)?; + let guests = self.node_labels().await?; + let operators = self.operators().await?; + let nodes: BTreeMap<_, _> = node_entries + .iter() + .map(|(p, nr)| { + let guest = Self::node_record_guest(guests.clone(), nr); + let operator = operators + .iter() + .find(|(op, _)| op.to_vec() == nr.node_operator_id) + .map(|(_, o)| o.to_owned()); + if operator.is_none() && self.network.is_mainnet() { + panic!("Operator cannot be none on mainnet") + } + + let principal = PrincipalId::from_str(p).expect("Invalid node principal id"); + let ip_addr = Self::node_ip_addr(nr); + let dc_name = operator + .clone() + .map(|op| match op.datacenter { + Some(dc) => dc.name.to_lowercase(), + None => "".to_string(), + }) + .unwrap_or_default(); + ( + principal, + Node { + principal, + dfinity_owned: Some(dfinity_dcs.contains(&dc_name) || guest.as_ref().map(|g| g.dfinity_owned).unwrap_or_default()), + ip_addr, + hostname: guest + .as_ref() + .map(|g| g.name.clone()) + .unwrap_or_else(|| { + format!( + "{}-{}", + operator + .clone() + .map(|operator| operator.datacenter.as_ref().map(|d| d.name.clone()).unwrap_or_else(|| "??".to_string())) + .unwrap_or_default(), + p.to_string().split_once('-').map(|(first, _)| first).unwrap_or("?????") + ) + }) + .into(), + subnet_id: self + .local_registry + .get_subnet_id_from_node_id(NodeId::new(principal), self.local_registry.get_latest_version()) + .expect("failed to get subnet id") + .map(|s| s.get()), + hostos_version: nr.hostos_version_id.clone().unwrap_or_default(), + // TODO: map hostos release + hostos_release: None, + operator: operator.clone().unwrap_or_default(), + proposal: None, + label: guest.map(|g| g.name), + duplicates: versioned_node_entries + .iter() + .filter(|(_, (_, nr2))| Self::node_ip_addr(nr2) == Self::node_ip_addr(nr)) + .max_by_key(|(_, (version, _))| version) + .and_then(|(p2, _)| { + if p2 == p { + None + } else { + Some(PrincipalId::from_str(p2).expect("invalid node principal id")) + } + }), + is_api_boundary_node: api_boundary_nodes.contains_key(p), + chip_id: nr.chip_id.clone(), + public_ipv4_config: nr.public_ipv4_config.clone(), + }, + ) + }) + .collect(); - Ok(value) + let nodes = Arc::new(nodes); + *self.nodes.write().await = Some(nodes.clone()); + Ok(nodes) + }) } - fn node_record_guest(guests: Arc>, nr: &NodeRecord) -> Option { - guests - .iter() - .find(|g| g.ipv6 == Ipv6Addr::from_str(&nr.http.clone().unwrap().ip_addr).unwrap()) - .cloned() - } + fn firewall_rule_set(&self, firewall_rule_scope: FirewallRulesScope) -> BoxFuture<'_, anyhow::Result> { + Box::pin(async move { + let key = make_firewall_rules_record_key(&firewall_rule_scope); + if let Some(firewall_rule_set) = self.firewall_rule_set.read().await.as_ref() { + if let Some(entry) = firewall_rule_set.get(&key) { + return Ok(entry.to_owned()); + } + } - fn node_ip_addr(nr: &NodeRecord) -> Ipv6Addr { - Ipv6Addr::from_str(&nr.http.clone().expect("missing ipv6 address").ip_addr).expect("invalid ipv6 address") - } + let value = match self + .local_registry + .get_value(&key, self.get_latest_version()) + .map_err(|e| anyhow::anyhow!(e))? + { + Some(v) => FirewallRuleSet::decode(v.as_slice())?, + None => FirewallRuleSet::default(), + }; + + let mut opt_arc_map = self.firewall_rule_set.write().await; + if let Some(arc_map) = opt_arc_map.as_mut() { + let bag = Arc::make_mut(arc_map); + bag.insert(key.to_owned(), value.clone()); + } else { + let mut all = BTreeMap::new(); + all.insert(key.to_owned(), value.clone()); + *opt_arc_map = Some(Arc::new(all)); + } - pub async fn subnets(&self) -> anyhow::Result>> { - if let Some(subnets) = self.subnets.borrow().as_ref() { - return Ok(subnets.to_owned()); - } + Ok(value) + }) + } - let all_nodes = self.nodes().await?; + fn subnets(&self) -> BoxFuture<'_, anyhow::Result>>> { + Box::pin(async { + if let Some(subnets) = self.subnets.read().await.as_ref() { + return Ok(subnets.to_owned()); + } - let subnets: BTreeMap<_, _> = self - .get_family_entries::()? - .iter() - .enumerate() - .map(|(i, (p, sr))| { - let principal = PrincipalId::from_str(p).expect("Invalid subnet principal id"); - let subnet_nodes = all_nodes - .iter() - .filter(|(_, n)| n.subnet_id.map_or(false, |s| s == principal)) - .map(|(_, n)| n) - .cloned() - .collect_vec(); - - let subnet_type = SubnetType::try_from(sr.subnet_type).unwrap(); - ( - principal, - Subnet { - nodes: subnet_nodes, + let all_nodes = self.nodes().await?; + + let subnets: BTreeMap<_, _> = get_family_entries::(self)? + .iter() + .enumerate() + .map(|(i, (p, sr))| { + let principal = PrincipalId::from_str(p).expect("Invalid subnet principal id"); + let subnet_nodes = all_nodes + .iter() + .filter(|(_, n)| n.subnet_id.map_or(false, |s| s == principal)) + .map(|(_, n)| n) + .cloned() + .collect_vec(); + + let subnet_type = SubnetType::try_from(sr.subnet_type).unwrap(); + ( principal, - subnet_type, - metadata: SubnetMetadata { - name: if let Some((_, val)) = KNOWN_SUBNETS.iter().find(|(key, _)| key == p) { - val.to_string() - } else if i == 0 { - NNS_SUBNET_NAME.to_string() - } else { - format!( - "{} {}", - match subnet_type { - SubnetType::System => "System", - SubnetType::Application | SubnetType::VerifiedApplication => "App", - }, - i - ) + Subnet { + nodes: subnet_nodes, + principal, + subnet_type, + metadata: SubnetMetadata { + name: if let Some((_, val)) = KNOWN_SUBNETS.iter().find(|(key, _)| key == p) { + val.to_string() + } else if i == 0 { + NNS_SUBNET_NAME.to_string() + } else { + format!( + "{} {}", + match subnet_type { + SubnetType::System => "System", + SubnetType::Application | SubnetType::VerifiedApplication => "App", + }, + i + ) + }, + ..Default::default() }, - ..Default::default() + replica_version: sr.replica_version_id.to_owned(), + // TODO: map replica release + replica_release: None, + proposal: None, + max_ingress_bytes_per_message: sr.max_ingress_bytes_per_message, + max_ingress_messages_per_block: sr.max_ingress_messages_per_block, + max_block_payload_size: sr.max_block_payload_size, + unit_delay_millis: sr.unit_delay_millis, + initial_notary_delay_millis: sr.initial_notary_delay_millis, + dkg_interval_length: sr.dkg_interval_length, + start_as_nns: sr.start_as_nns, + features: sr.features.clone(), + max_number_of_canisters: sr.max_number_of_canisters, + ssh_readonly_access: sr.ssh_readonly_access.clone(), + ssh_backup_access: sr.ssh_backup_access.clone(), + ecdsa_config: sr.ecdsa_config.clone(), + dkg_dealings_per_block: sr.dkg_dealings_per_block, + is_halted: sr.is_halted, + halt_at_cup_height: sr.halt_at_cup_height, + chain_key_config: sr.chain_key_config.clone(), }, - replica_version: sr.replica_version_id.to_owned(), - // TODO: map replica release - replica_release: None, - proposal: None, - }, - ) - }) - .filter(|(_, s)| !s.nodes.is_empty()) - .collect(); - - let subnets = Arc::new(subnets); - *self.subnets.borrow_mut() = Some(subnets.clone()); - Ok(subnets) - } - - pub async fn nodes_with_proposals(&self) -> anyhow::Result>> { - let nodes = self.nodes().await?; - if nodes.iter().any(|(_, n)| n.proposal.is_some()) { - return Ok(nodes); - } + ) + }) + .filter(|(_, s)| !s.nodes.is_empty()) + .collect(); - self.update_proposal_data().await?; - self.nodes().await + let subnets = Arc::new(subnets); + *self.subnets.write().await = Some(subnets.clone()); + Ok(subnets) + }) } - /// Get the list of subnets, and the list of open proposal for each subnet, if any - pub async fn subnets_and_proposals(&self) -> anyhow::Result>> { - let subnets = self.subnets().await?; - - self.update_proposal_data().await?; - - if subnets.iter().any(|(_, s)| s.proposal.is_some()) { - return Ok(subnets); - } + fn nodes_with_proposals(&self) -> BoxFuture<'_, anyhow::Result>>> { + Box::pin(async { + let nodes = self.nodes().await?; + if nodes.iter().any(|(_, n)| n.proposal.is_some()) { + return Ok(nodes); + } - self.subnets().await + self.update_proposal_data().await?; + self.nodes().await + }) } - async fn update_proposal_data(&self) -> anyhow::Result<()> { - if self.no_sync { - return Ok(()); - } - - let proposal_agent = proposal::ProposalAgent::new(self.network.get_nns_urls()); - let nodes = self.nodes().await?; - let subnets = self.subnets().await?; + fn update_proposal_data(&self) -> BoxFuture<'_, anyhow::Result<()>> { + Box::pin(async { + if self.no_sync { + return Ok(()); + } - let topology_proposals = proposal_agent.list_open_topology_proposals().await?; - let nodes: BTreeMap<_, _> = nodes - .iter() - .map(|(p, n)| { - let proposal = topology_proposals - .iter() - .find(|p| p.node_ids_added.contains(&n.principal) || p.node_ids_removed.contains(&n.principal)) - .cloned(); + let proposal_agent = proposal::ProposalAgent::new(self.network.get_nns_urls()); + let nodes = self.nodes().await?; + let subnets = self.subnets().await?; - (*p, Node { proposal, ..n.clone() }) - }) - .collect(); + let topology_proposals = proposal_agent.list_open_topology_proposals().await?; + let nodes: BTreeMap<_, _> = nodes + .iter() + .map(|(p, n)| { + let proposal = topology_proposals + .iter() + .find(|p| p.node_ids_added.contains(&n.principal) || p.node_ids_removed.contains(&n.principal)) + .cloned(); - let subnets: BTreeMap<_, _> = subnets - .iter() - .map(|(p, s)| { - let proposal = topology_proposals - .iter() - .find(|pr| { - pr.subnet_id.unwrap_or_default() == *p - || s.nodes - .iter() - .any(|n| pr.node_ids_added.contains(&n.principal) || pr.node_ids_removed.contains(&n.principal)) - }) - .cloned(); + (*p, Node { proposal, ..n.clone() }) + }) + .collect(); + + let subnets: BTreeMap<_, _> = subnets + .iter() + .map(|(p, s)| { + let proposal = topology_proposals + .iter() + .find(|pr| { + pr.subnet_id.unwrap_or_default() == *p + || s.nodes + .iter() + .any(|n| pr.node_ids_added.contains(&n.principal) || pr.node_ids_removed.contains(&n.principal)) + }) + .cloned(); + + (*p, Subnet { proposal, ..s.clone() }) + }) + .collect(); - (*p, Subnet { proposal, ..s.clone() }) - }) - .collect(); + *self.nodes.write().await = Some(Arc::new(nodes)); + *self.subnets.write().await = Some(Arc::new(subnets)); - *self.nodes.borrow_mut() = Some(Arc::new(nodes)); - *self.subnets.borrow_mut() = Some(Arc::new(subnets)); + Ok(()) + }) + } - Ok(()) + fn missing_guests(&self) -> BoxFuture<'_, anyhow::Result>> { + Box::pin(async { + let nodes = self.nodes().await?; + let mut missing_guests = self + .node_labels() + .await? + .iter() + .filter(|g| !nodes.iter().any(|(_, n)| n.label.clone().unwrap_or_default() == g.name)) + .cloned() + .collect_vec(); + + missing_guests.sort_by_key(|g| g.name.to_owned()); + missing_guests.dedup_by_key(|g| g.name.to_owned()); + + Ok(missing_guests) + }) } - // TODO: valid only for mainnet, on testnets its not mandatory that the first subnet - // is the NNS. Should query by subnet_type - pub async fn nns_replica_version(&self) -> anyhow::Result> { - Ok(self - .subnets() - .await? - .get(&PrincipalId::from_str("tdb26-jop6k-aogll-7ltgs-eruif-6kk7m-qpktf-gdiqx-mxtrf-vb5e6-eqe").unwrap()) - .map(|s| s.replica_version.clone())) + fn get_decentralized_nodes<'a>(&'a self, principals: &'a [PrincipalId]) -> BoxFuture<'a, anyhow::Result>> { + Box::pin(async { + Ok(self + .nodes() + .await? + .values() + .filter(|n| principals.contains(&n.principal)) + .map(decentralization::network::Node::from) + .collect_vec()) + }) } - pub async fn missing_guests(&self) -> anyhow::Result> { - let nodes = self.nodes().await?; - let mut missing_guests = self - .node_labels() - .await? - .iter() - .filter(|g| !nodes.iter().any(|(_, n)| n.label.clone().unwrap_or_default() == g.name)) - .cloned() - .collect_vec(); + fn unassigned_nodes_replica_version(&self) -> BoxFuture<'_, anyhow::Result>> { + Box::pin(async { + if let Some(v) = self.unassigned_nodes_replica_version.read().await.as_ref() { + return Ok(v.to_owned()); + } - missing_guests.sort_by_key(|g| g.name.to_owned()); - missing_guests.dedup_by_key(|g| g.name.to_owned()); + let version = get_family_entries::(self)? + .first_entry() + .map(|v| v.get().to_owned()) + .ok_or(anyhow::anyhow!("No unassigned nodes version"))?; - Ok(missing_guests) + let version = Arc::new(version.replica_version); + *self.unassigned_nodes_replica_version.write().await = Some(version.clone()); + Ok(version) + }) } - pub async fn get_decentralized_nodes(&self, principals: &[PrincipalId]) -> anyhow::Result> { - Ok(self - .nodes() - .await? - .values() - .filter(|n| principals.contains(&n.principal)) - .map(decentralization::network::Node::from) + fn get_api_boundary_nodes(&self) -> anyhow::Result> { + Ok(get_family_entries_versioned::(self) + .map_err(|e| anyhow::anyhow!("Couldn't get api boundary nodes: {:?}", e))? + .into_iter() + .map(|(k, (_, r))| (k, r)) .collect_vec()) } - pub fn unassigned_nodes_replica_version(&self) -> anyhow::Result> { - if let Some(v) = self.unassigned_nodes_replica_version.borrow().as_ref() { - return Ok(v.to_owned()); - } - - let version = self - .get_family_entries::()? - .first_entry() - .map(|v| v.get().to_owned()) - .ok_or(anyhow::anyhow!("No unassigned nodes version"))?; + fn get_node_rewards_table(&self) -> anyhow::Result> { + get_family_entries::(self).map_err(|e| anyhow::anyhow!("Couldn't get node rewards table: {:?}", e)) + } - let version = Arc::new(version.replica_version); - *self.unassigned_nodes_replica_version.borrow_mut() = Some(version.clone()); - Ok(version) + fn get_unassigned_nodes(&self) -> anyhow::Result> { + Ok(get_family_entries_versioned::(self) + .map_err(|e| anyhow::anyhow!("Couldn't get unassigned nodes config: {:?}", e))? + .into_iter() + .map(|(_, (_, record))| record) + .next()) } -} -impl NodesConverter for LazyRegistry { - async fn get_nodes(&self, from: &[PrincipalId]) -> Result, ic_management_types::NetworkError> { - let nodes = self - .nodes() - .await - .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))?; - from.iter() - .map(|n| { - nodes - .get(n) - .ok_or(ic_management_types::NetworkError::NodeNotFound(*n)) - .map(decentralization::network::Node::from) - }) - .collect() + fn get_datacenters(&self) -> anyhow::Result> { + Ok(get_family_entries_versioned::(self) + .map_err(|e| anyhow::anyhow!("Couldn't get data centers: {:?}", e))? + .into_iter() + .map(|(_, (_, record))| record) + .collect()) } } -impl SubnetQuerier for LazyRegistry { - async fn subnet(&self, by: SubnetQueryBy) -> Result { - match by { - SubnetQueryBy::SubnetId(id) => self - .subnets() +impl NodesConverter for LazyRegistryImpl { + fn get_nodes<'a>( + &'a self, + from: &'a [PrincipalId], + ) -> BoxFuture<'a, Result, ic_management_types::NetworkError>> { + Box::pin(async { + let nodes = self + .nodes() .await - .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))? - .get(&id) - .map(|s| DecentralizedSubnet { - id: s.principal, - nodes: s.nodes.iter().map(decentralization::network::Node::from).collect(), - added_nodes_desc: vec![], - removed_nodes_desc: vec![], - min_nakamoto_coefficients: None, - comment: None, - run_log: vec![], + .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))?; + from.iter() + .map(|n| { + nodes + .get(n) + .ok_or(ic_management_types::NetworkError::NodeNotFound(*n)) + .map(decentralization::network::Node::from) }) - .ok_or(ic_management_types::NetworkError::SubnetNotFound(id)), - SubnetQueryBy::NodeList(nodes) => { - let reg_nodes = self.nodes().await.map_err(|e| NetworkError::DataRequestError(e.to_string()))?; - let subnets = nodes - .iter() - .map(|n| reg_nodes.get(&n.id).and_then(|n| n.subnet_id)) - .collect::>(); - if subnets.len() > 1 { - return Err(NetworkError::IllegalRequest("Nodes don't belong to the same subnet".to_owned())); - } - if let Some(Some(subnet)) = subnets.first() { - Ok(decentralization::network::DecentralizedSubnet { - id: *subnet, - nodes: self - .subnets() - .await - .map_err(|e| NetworkError::IllegalRequest(e.to_string()))? - .get(subnet) - .ok_or(NetworkError::SubnetNotFound(*subnet))? - .nodes - .iter() - .map(decentralization::network::Node::from) - .collect(), + .collect() + }) + } +} + +impl SubnetQuerier for LazyRegistryImpl { + fn subnet(&self, by: SubnetQueryBy) -> BoxFuture<'_, Result> { + Box::pin(async { + match by { + SubnetQueryBy::SubnetId(id) => self + .subnets() + .await + .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))? + .get(&id) + .map(|s| DecentralizedSubnet { + id: s.principal, + nodes: s.nodes.iter().map(decentralization::network::Node::from).collect(), added_nodes_desc: vec![], removed_nodes_desc: vec![], min_nakamoto_coefficients: None, comment: None, run_log: vec![], }) - } else { - Err(NetworkError::IllegalRequest("no subnets found".to_string())) + .ok_or(ic_management_types::NetworkError::SubnetNotFound(id)), + SubnetQueryBy::NodeList(nodes) => { + let reg_nodes = self.nodes().await.map_err(|e| NetworkError::DataRequestError(e.to_string()))?; + let subnets = nodes + .iter() + .map(|n| reg_nodes.get(&n.id).and_then(|n| n.subnet_id)) + .collect::>(); + if subnets.len() > 1 { + return Err(NetworkError::IllegalRequest("Nodes don't belong to the same subnet".to_owned())); + } + if let Some(Some(subnet)) = subnets.first() { + Ok(decentralization::network::DecentralizedSubnet { + id: *subnet, + nodes: self + .subnets() + .await + .map_err(|e| NetworkError::IllegalRequest(e.to_string()))? + .get(subnet) + .ok_or(NetworkError::SubnetNotFound(*subnet))? + .nodes + .iter() + .map(decentralization::network::Node::from) + .collect(), + added_nodes_desc: vec![], + removed_nodes_desc: vec![], + min_nakamoto_coefficients: None, + comment: None, + run_log: vec![], + }) + } else { + Err(NetworkError::IllegalRequest("no subnets found".to_string())) + } } } - } + }) } } -impl decentralization::network::TopologyManager for LazyRegistry {} - -impl AvailableNodesQuerier for LazyRegistry { - async fn available_nodes(&self) -> Result, ic_management_types::NetworkError> { - let health_client = crate::health::HealthClient::new(self.network.clone()); - let (nodes, healths) = try_join!(self.nodes_with_proposals(), health_client.nodes()) - .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))?; - let nodes = nodes - .values() - .filter(|n| n.subnet_id.is_none() && n.proposal.is_none() && n.duplicates.is_none() && !n.is_api_boundary_node) - .cloned() - .collect_vec(); - - Ok(nodes - .iter() - .filter(|n| { - // Keep only healthy nodes. - healths - .get(&n.principal) - .map(|s| matches!(*s, ic_management_types::HealthStatus::Healthy)) - .unwrap_or(false) - }) - .map(decentralization::network::Node::from) - .sorted_by(|n1, n2| n1.id.cmp(&n2.id)) - .collect()) +impl decentralization::network::TopologyManager for LazyRegistryImpl {} + +impl AvailableNodesQuerier for LazyRegistryImpl { + fn available_nodes(&self) -> BoxFuture<'_, Result, ic_management_types::NetworkError>> { + Box::pin(async { + let health_client = crate::health::HealthClient::new(self.network.clone()); + let (nodes, healths) = try_join!(self.nodes_with_proposals(), health_client.nodes()) + .map_err(|e| ic_management_types::NetworkError::DataRequestError(e.to_string()))?; + let nodes = nodes + .values() + .filter(|n| n.subnet_id.is_none() && n.proposal.is_none() && n.duplicates.is_none() && !n.is_api_boundary_node) + .cloned() + .collect_vec(); + + Ok(nodes + .iter() + .filter(|n| { + // Keep only healthy nodes. + healths + .get(&n.principal) + .map(|s| matches!(*s, ic_management_types::HealthStatus::Healthy)) + .unwrap_or(false) + }) + .map(decentralization::network::Node::from) + .sorted_by(|n1, n2| n1.id.cmp(&n2.id)) + .collect()) + }) } } diff --git a/rs/ic-management-backend/src/registry.rs b/rs/ic-management-backend/src/registry.rs index ec71567ab..ecdf392fd 100644 --- a/rs/ic-management-backend/src/registry.rs +++ b/rs/ic-management-backend/src/registry.rs @@ -4,6 +4,7 @@ use crate::node_labels; use crate::proposal::{self, SubnetUpdateProposal, UpdateUnassignedNodesProposal}; use crate::public_dashboard::query_ic_dashboard_list; use decentralization::network::{AvailableNodesQuerier, NodesConverter, SubnetQuerier, SubnetQueryBy}; +use futures::future::BoxFuture; use futures::TryFutureExt; use ic_base_types::NodeId; use ic_base_types::{RegistryVersion, SubnetId}; @@ -420,6 +421,8 @@ impl RegistryState { longitude: dc.gps.clone().map(|l| l.longitude as f64), } }), + rewardable_nodes: or.rewardable_nodes.clone(), + ipv6: or.ipv6().to_string(), }, ) }) @@ -502,6 +505,8 @@ impl RegistryState { } }), is_api_boundary_node: api_boundary_nodes.contains_key(p), + chip_id: nr.chip_id.clone(), + public_ipv4_config: nr.public_ipv4_config.clone(), }, ) }) @@ -567,6 +572,22 @@ impl RegistryState { .find(|r| r.commit_hash == sr.replica_version_id) .cloned(), proposal: None, + max_ingress_bytes_per_message: sr.max_ingress_bytes_per_message, + max_ingress_messages_per_block: sr.max_ingress_messages_per_block, + max_block_payload_size: sr.max_block_payload_size, + unit_delay_millis: sr.unit_delay_millis, + initial_notary_delay_millis: sr.initial_notary_delay_millis, + dkg_interval_length: sr.dkg_interval_length, + start_as_nns: sr.start_as_nns, + features: sr.features.clone(), + max_number_of_canisters: sr.max_number_of_canisters, + ssh_readonly_access: sr.ssh_readonly_access.clone(), + ssh_backup_access: sr.ssh_backup_access.clone(), + ecdsa_config: sr.ecdsa_config.clone(), + dkg_dealings_per_block: sr.dkg_dealings_per_block, + is_halted: sr.is_halted, + halt_at_cup_height: sr.halt_at_cup_height, + chain_key_config: sr.chain_key_config.clone(), }, ) }) @@ -799,95 +820,101 @@ impl RegistryState { impl decentralization::network::TopologyManager for RegistryState {} impl NodesConverter for RegistryState { - async fn get_nodes(&self, from: &[PrincipalId]) -> std::result::Result, NetworkError> { - from.iter() - .map(|n| { - self.nodes() - .get(n) - .ok_or(NetworkError::NodeNotFound(*n)) - .map(decentralization::network::Node::from) - }) - .collect() + fn get_nodes<'a>(&'a self, from: &'a [PrincipalId]) -> BoxFuture<'a, std::result::Result, NetworkError>> { + Box::pin(async { + from.iter() + .map(|n| { + self.nodes() + .get(n) + .ok_or(NetworkError::NodeNotFound(*n)) + .map(decentralization::network::Node::from) + }) + .collect() + }) } } impl SubnetQuerier for RegistryState { - async fn subnet(&self, by: SubnetQueryBy) -> Result { - match by { - SubnetQueryBy::SubnetId(id) => self - .subnets - .get(&id) - .map(|s| decentralization::network::DecentralizedSubnet { - id: s.principal, - nodes: s.nodes.iter().map(decentralization::network::Node::from).collect(), - added_nodes_desc: Vec::new(), - removed_nodes_desc: Vec::new(), - min_nakamoto_coefficients: None, - comment: None, - run_log: Vec::new(), - }) - .ok_or(NetworkError::SubnetNotFound(id)), - SubnetQueryBy::NodeList(nodes) => { - let subnets = nodes - .to_vec() - .iter() - .map(|n| self.nodes.get(&n.id).and_then(|n| n.subnet_id)) - .collect::>(); - if subnets.len() > 1 { - return Err(NetworkError::IllegalRequest("nodes don't belong to the same subnet".to_string())); - } - if let Some(Some(subnet)) = subnets.into_iter().next() { - Ok(decentralization::network::DecentralizedSubnet { - id: subnet, - nodes: self - .subnets - .get(&subnet) - .ok_or(NetworkError::SubnetNotFound(subnet))? - .nodes - .iter() - .map(decentralization::network::Node::from) - .collect(), + fn subnet(&self, by: SubnetQueryBy) -> BoxFuture<'_, Result> { + Box::pin(async { + match by { + SubnetQueryBy::SubnetId(id) => self + .subnets + .get(&id) + .map(|s| decentralization::network::DecentralizedSubnet { + id: s.principal, + nodes: s.nodes.iter().map(decentralization::network::Node::from).collect(), added_nodes_desc: Vec::new(), removed_nodes_desc: Vec::new(), min_nakamoto_coefficients: None, comment: None, run_log: Vec::new(), }) - } else { - Err(NetworkError::IllegalRequest("no subnet found".to_string())) + .ok_or(NetworkError::SubnetNotFound(id)), + SubnetQueryBy::NodeList(nodes) => { + let subnets = nodes + .to_vec() + .iter() + .map(|n| self.nodes.get(&n.id).and_then(|n| n.subnet_id)) + .collect::>(); + if subnets.len() > 1 { + return Err(NetworkError::IllegalRequest("nodes don't belong to the same subnet".to_string())); + } + if let Some(Some(subnet)) = subnets.into_iter().next() { + Ok(decentralization::network::DecentralizedSubnet { + id: subnet, + nodes: self + .subnets + .get(&subnet) + .ok_or(NetworkError::SubnetNotFound(subnet))? + .nodes + .iter() + .map(decentralization::network::Node::from) + .collect(), + added_nodes_desc: Vec::new(), + removed_nodes_desc: Vec::new(), + min_nakamoto_coefficients: None, + comment: None, + run_log: Vec::new(), + }) + } else { + Err(NetworkError::IllegalRequest("no subnet found".to_string())) + } } } - } + }) } } impl AvailableNodesQuerier for RegistryState { - async fn available_nodes(&self) -> Result, NetworkError> { - let nodes = self - .nodes_with_proposals() - .await - .map_err(|err| NetworkError::DataRequestError(err.to_string()))? - .into_values() - .filter(|n| n.subnet_id.is_none() && n.proposal.is_none() && n.duplicates.is_none() && !n.is_api_boundary_node) - .collect::>(); - - let health_client = crate::health::HealthClient::new(self.network()); - let healths = health_client - .nodes() - .await - .map_err(|err| NetworkError::DataRequestError(err.to_string()))?; - Ok(nodes - .iter() - .filter(|n| { - // Keep only healthy nodes. - healths - .get(&n.principal) - .map(|s| matches!(*s, ic_management_types::HealthStatus::Healthy)) - .unwrap_or(false) - }) - .map(decentralization::network::Node::from) - .sorted_by(|n1, n2| n1.id.cmp(&n2.id)) - .collect()) + fn available_nodes(&self) -> BoxFuture<'_, Result, NetworkError>> { + Box::pin(async { + let nodes = self + .nodes_with_proposals() + .await + .map_err(|err| NetworkError::DataRequestError(err.to_string()))? + .into_values() + .filter(|n| n.subnet_id.is_none() && n.proposal.is_none() && n.duplicates.is_none() && !n.is_api_boundary_node) + .collect::>(); + + let health_client = crate::health::HealthClient::new(self.network()); + let healths = health_client + .nodes() + .await + .map_err(|err| NetworkError::DataRequestError(err.to_string()))?; + Ok(nodes + .iter() + .filter(|n| { + // Keep only healthy nodes. + healths + .get(&n.principal) + .map(|s| matches!(*s, ic_management_types::HealthStatus::Healthy)) + .unwrap_or(false) + }) + .map(decentralization::network::Node::from) + .sorted_by(|n1, n2| n1.id.cmp(&n2.id)) + .collect()) + }) } } diff --git a/rs/ic-management-backend/src/subnets.rs b/rs/ic-management-backend/src/subnets.rs index c6d6d2246..b94bf88a1 100644 --- a/rs/ic-management-backend/src/subnets.rs +++ b/rs/ic-management-backend/src/subnets.rs @@ -160,6 +160,8 @@ mod tests { label: None, hostos_version: "".to_string(), is_api_boundary_node: false, + chip_id: None, + public_ipv4_config: None, }; nodes.insert(node.principal, node); } diff --git a/rs/ic-management-types/Cargo.toml b/rs/ic-management-types/Cargo.toml index 8c1fb95b3..e1e971cc7 100644 --- a/rs/ic-management-types/Cargo.toml +++ b/rs/ic-management-types/Cargo.toml @@ -24,6 +24,7 @@ tokio = { workspace = true } url = { workspace = true } anyhow = { workspace = true } candid = { workspace = true } +ic-protobuf.workspace = true [dev-dependencies] wiremock = { workspace = true } diff --git a/rs/ic-management-types/src/lib.rs b/rs/ic-management-types/src/lib.rs index 5eedb36c6..195aa18b5 100644 --- a/rs/ic-management-types/src/lib.rs +++ b/rs/ic-management-types/src/lib.rs @@ -9,6 +9,10 @@ use ic_nns_governance::pb::v1::proposal::Action; use ic_nns_governance::pb::v1::NnsFunction; use ic_nns_governance::pb::v1::ProposalInfo; use ic_nns_governance::pb::v1::ProposalStatus; +use ic_protobuf::registry::node::v1::IPv4InterfaceConfig; +use ic_protobuf::registry::subnet::v1::ChainKeyConfig; +use ic_protobuf::registry::subnet::v1::EcdsaConfig; +use ic_protobuf::registry::subnet::v1::SubnetFeatures; use ic_registry_subnet_type::SubnetType; use ic_types::PrincipalId; use registry_canister::mutations::do_add_nodes_to_subnet::AddNodesToSubnetPayload; @@ -239,6 +243,38 @@ pub struct Subnet { #[serde(skip_serializing_if = "Option::is_none")] pub proposal: Option, pub replica_release: Option, + #[serde(default)] + pub max_ingress_bytes_per_message: u64, + #[serde(default)] + pub max_ingress_messages_per_block: u64, + #[serde(default)] + pub max_block_payload_size: u64, + #[serde(default)] + pub unit_delay_millis: u64, + #[serde(default)] + pub initial_notary_delay_millis: u64, + #[serde(default)] + pub dkg_interval_length: u64, + #[serde(default)] + pub start_as_nns: bool, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub features: Option, + #[serde(default)] + pub max_number_of_canisters: u64, + #[serde(default)] + pub ssh_readonly_access: Vec, + #[serde(default)] + pub ssh_backup_access: Vec, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub ecdsa_config: Option, + #[serde(default)] + pub dkg_dealings_per_block: u64, + #[serde(default)] + pub is_halted: bool, + #[serde(default)] + pub halt_at_cup_height: bool, + #[serde(skip_serializing_if = "Option::is_none", default)] + pub chain_key_config: Option, } type Application = String; @@ -271,6 +307,8 @@ pub struct Node { #[serde(default)] pub duplicates: Option, pub is_api_boundary_node: bool, + pub chip_id: Option>, + pub public_ipv4_config: Option, } #[derive(strum_macros::Display, EnumString, VariantNames, Hash, Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize, Debug)] @@ -351,6 +389,10 @@ pub struct Operator { #[serde(skip_serializing_if = "Option::is_none")] pub datacenter: Option, + #[serde(default)] + pub rewardable_nodes: BTreeMap, + #[serde(default)] + pub ipv6: String, } #[derive(Clone, Serialize, Default, Debug, Deserialize, PartialEq, Eq, PartialOrd, Ord)]