Skip to content

Commit

Permalink
refactor: lazy registry (#831)
Browse files Browse the repository at this point in the history
Co-authored-by: sa-github-api <138766536+sa-github-api@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
3 people committed Aug 30, 2024
1 parent 931d1e0 commit ceeb19b
Show file tree
Hide file tree
Showing 21 changed files with 992 additions and 757 deletions.
10 changes: 9 additions & 1 deletion Cargo.Bazel.lock
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"checksum": "2bbd3d15bf606c9af001e99012b5deb363a91b712bf83a21702c95fdf0ccdf21",
"checksum": "b92d8de730e6f789d5322f3f47eaecd7bdbef7250e5d5b835bb493ec0a3ce1da",
"crates": {
"actix-codec 0.5.2": {
"name": "actix-codec",
Expand Down Expand Up @@ -9085,6 +9085,10 @@
"id": "colored 2.1.0",
"target": "colored"
},
{
"id": "futures 0.3.30",
"target": "futures"
},
{
"id": "ic-base-types 0.9.0",
"target": "ic_base_types"
Expand Down Expand Up @@ -20942,6 +20946,10 @@
"id": "ic-nns-governance 0.9.0",
"target": "ic_nns_governance"
},
{
"id": "ic-protobuf 0.9.0",
"target": "ic_protobuf"
},
{
"id": "ic-registry-subnet-type 0.9.0",
"target": "ic_registry_subnet_type"
Expand Down
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rs/cli/src/commands/firewall.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl ExecutableCommand for Firewall {

async fn execute(&self, ctx: crate::ctx::DreContext) -> anyhow::Result<()> {
let registry = ctx.registry().await;
let firewall_ruleset = registry.firewall_rule_set(self.rules_scope.clone())?;
let firewall_ruleset = registry.firewall_rule_set(self.rules_scope.clone()).await?;

let rules: BTreeMap<usize, &FirewallRule> = firewall_ruleset.entries.iter().enumerate().sorted_by(|a, b| a.0.cmp(&b.0)).collect();

Expand Down
206 changes: 75 additions & 131 deletions rs/cli/src/commands/registry.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
use std::{
collections::{BTreeMap, HashMap},
path::PathBuf,
rc::Rc,
str::FromStr,
};
use std::{collections::BTreeMap, path::PathBuf, str::FromStr, sync::Arc};

use clap::Args;
use ic_management_backend::{
health::{HealthClient, HealthStatusQuerier},
lazy_registry::{LazyRegistry, LazyRegistryFamilyEntries},
public_dashboard::query_ic_dashboard_list,
lazy_registry::LazyRegistry,
};
use ic_management_types::{HealthStatus, Network, NodeProvidersResponse};
use ic_management_types::{HealthStatus, Network};
use ic_protobuf::registry::{
api_boundary_node::v1::ApiBoundaryNodeRecord,
dc::v1::DataCenterRecord,
hostos_version::v1::HostosVersionRecord,
node::v1::{ConnectionEndpoint, IPv4InterfaceConfig, NodeRecord},
node_operator::v1::NodeOperatorRecord,
node_rewards::v2::NodeRewardsTable,
node::v1::{ConnectionEndpoint, IPv4InterfaceConfig},
replica_version::v1::ReplicaVersionRecord,
subnet::v1::{ChainKeyConfig, EcdsaConfig, SubnetFeatures, SubnetRecord as SubnetRecordProto},
subnet::v1::{ChainKeyConfig, EcdsaConfig, SubnetFeatures},
unassigned_nodes_config::v1::UnassignedNodesConfigRecord,
};
use ic_registry_subnet_type::SubnetType;
Expand Down Expand Up @@ -80,23 +71,11 @@ impl Registry {
let elected_guest_os_versions = get_elected_guest_os_versions(&local_registry)?;
let elected_host_os_versions = get_elected_host_os_versions(&local_registry)?;

let node_provider_names: HashMap<PrincipalId, String> = HashMap::from_iter(
query_ic_dashboard_list::<NodeProvidersResponse>(ctx.network(), "v3/node-providers")
.await?
.node_providers
.iter()
.map(|np| (np.principal_id, np.display_name.clone())),
);
let mut node_operators = get_node_operators(&local_registry, &node_provider_names, ctx.network())?;
let mut node_operators = get_node_operators(&local_registry, ctx.network()).await?;

let dcs = local_registry
.get_family_entries_versioned::<DataCenterRecord>()
.map_err(|e| anyhow::anyhow!("Couldn't get data centers: {:?}", e))?
.into_iter()
.map(|(_, (_, record))| record)
.collect();
let dcs = local_registry.get_datacenters()?;

let subnets = get_subnets(&local_registry)?;
let subnets = get_subnets(&local_registry).await?;

let unassigned_nodes_config = get_unassigned_nodes(&local_registry)?;

Expand Down Expand Up @@ -142,69 +121,42 @@ impl Registry {
}
}

fn get_elected_guest_os_versions(local_registry: &Rc<LazyRegistry>) -> anyhow::Result<Vec<ReplicaVersionRecord>> {
let elected_versions = local_registry
.get_family_entries_versioned::<ReplicaVersionRecord>()
.map_err(|e| anyhow::anyhow!("Couldn't get elected versions: {:?}", e))?
.into_iter()
.map(|(_, (_, record))| record)
.collect();
Ok(elected_versions)
fn get_elected_guest_os_versions(local_registry: &Arc<dyn LazyRegistry>) -> anyhow::Result<Vec<ReplicaVersionRecord>> {
local_registry.elected_guestos_records()
}

fn get_elected_host_os_versions(local_registry: &Rc<LazyRegistry>) -> anyhow::Result<Vec<HostosVersionRecord>> {
let elected_versions = local_registry
.get_family_entries_versioned::<HostosVersionRecord>()
.map_err(|e| anyhow::anyhow!("Couldn't get elected versions: {:?}", e))?
.into_iter()
.map(|(_, (_, record))| record)
.collect();
Ok(elected_versions)
fn get_elected_host_os_versions(local_registry: &Arc<dyn LazyRegistry>) -> anyhow::Result<Vec<HostosVersionRecord>> {
local_registry.elected_hostos_records()
}

fn get_node_operators(
local_registry: &Rc<LazyRegistry>,
node_provider_names: &HashMap<PrincipalId, String>,
network: &Network,
) -> anyhow::Result<BTreeMap<PrincipalId, NodeOperator>> {
let all_nodes = local_registry
.get_family_entries_versioned::<NodeRecord>()
.map_err(|e| anyhow::anyhow!("Couldn't get nodes: {:?}", e))?
.into_iter()
.map(|(k, (_, record))| (k, record))
.collect::<BTreeMap<_, _>>();
let node_operators = local_registry
.get_family_entries_versioned::<NodeOperatorRecord>()
.map_err(|e| anyhow::anyhow!("Couldn't get node operators: {:?}", e))?
.into_iter()
.map(|(k, (_, record))| {
let node_operator_principal_id = PrincipalId::from_str(k.as_str()).expect("Couldn't parse principal id");
let node_provider_name = node_provider_names
.get(&PrincipalId::try_from(&record.node_provider_principal_id).expect("Couldn't parse principal id"))
.map_or_else(String::default, |v| {
if network.is_mainnet() && v.is_empty() {
panic!("Node provider name should not be empty for mainnet")
}
v.to_string()
});
async fn get_node_operators(local_registry: &Arc<dyn LazyRegistry>, network: &Network) -> anyhow::Result<BTreeMap<PrincipalId, NodeOperator>> {
let all_nodes = local_registry.nodes().await?;
let operators = local_registry
.operators()
.await
.map_err(|e| anyhow::anyhow!("Couldn't get node operators: {:?}", e))?;
let node_operators = operators
.iter()
.map(|(k, record)| {
let node_provider_name = record.provider.name.as_ref().map_or_else(String::default, |v| {
if network.is_mainnet() && v.is_empty() {
panic!("Node provider name should not be empty for mainnet")
}
v.to_string()
});
// Find the number of nodes registered by this operator
let operator_registered_nodes_num = all_nodes
.iter()
.filter(|(_, record)| {
PrincipalId::try_from(&record.node_operator_id).expect("Couldn't parse principal") == node_operator_principal_id
})
.count() as u64;
let operator_registered_nodes_num = all_nodes.iter().filter(|(nk, _)| nk == &k).count() as u64;
(
node_operator_principal_id,
record.provider.principal,
NodeOperator {
node_operator_principal_id,
node_allowance_remaining: record.node_allowance,
node_allowance_total: record.node_allowance + operator_registered_nodes_num,
node_provider_principal_id: PrincipalId::try_from(record.node_provider_principal_id).expect("Couldn't parse principal id"),
node_operator_principal_id: *k,
node_allowance_remaining: record.allowance,
node_allowance_total: record.allowance + operator_registered_nodes_num,
node_provider_principal_id: record.provider.principal,
node_provider_name,
dc_id: record.dc_id,
rewardable_nodes: record.rewardable_nodes,
ipv6: record.ipv6,
dc_id: record.datacenter.as_ref().map(|d| d.name.to_owned()).unwrap_or_default(),
rewardable_nodes: record.rewardable_nodes.clone(),
ipv6: Some(record.ipv6.to_string()),
total_up_nodes: 0,
nodes_health: Default::default(),
rewards_correct: false,
Expand All @@ -215,56 +167,42 @@ fn get_node_operators(
Ok(node_operators)
}

fn get_subnets(local_registry: &Rc<LazyRegistry>) -> anyhow::Result<Vec<SubnetRecord>> {
Ok(local_registry
.get_family_entries::<SubnetRecordProto>()?
.into_iter()
async fn get_subnets(local_registry: &Arc<dyn LazyRegistry>) -> anyhow::Result<Vec<SubnetRecord>> {
let subnets = local_registry.subnets().await?;
Ok(subnets
.iter()
.map(|(subnet_id, record)| SubnetRecord {
subnet_id: PrincipalId::from_str(&subnet_id).expect("Couldn't parse principal id"),
membership: record
.membership
.iter()
.map(|n| {
PrincipalId::try_from(&n[..])
.expect("could not create PrincipalId from membership entry")
.to_string()
})
.collect(),
subnet_id: *subnet_id,
membership: record.nodes.iter().map(|n| n.principal.to_string()).collect(),
nodes: Default::default(),
max_ingress_bytes_per_message: record.max_ingress_bytes_per_message,
max_ingress_messages_per_block: record.max_ingress_messages_per_block,
max_block_payload_size: record.max_block_payload_size,
unit_delay_millis: record.unit_delay_millis,
initial_notary_delay_millis: record.initial_notary_delay_millis,
replica_version_id: record.replica_version_id,
replica_version_id: record.replica_version.clone(),
dkg_interval_length: record.dkg_interval_length,
start_as_nns: record.start_as_nns,
subnet_type: SubnetType::try_from(record.subnet_type).unwrap(),
subnet_type: record.subnet_type,
features: record.features.clone().unwrap_or_default(),
max_number_of_canisters: record.max_number_of_canisters,
ssh_readonly_access: record.ssh_readonly_access,
ssh_backup_access: record.ssh_backup_access,
ecdsa_config: record.ecdsa_config,
ssh_readonly_access: record.ssh_readonly_access.clone(),
ssh_backup_access: record.ssh_backup_access.clone(),
ecdsa_config: record.ecdsa_config.clone(),
dkg_dealings_per_block: record.dkg_dealings_per_block,
is_halted: record.is_halted,
halt_at_cup_height: record.halt_at_cup_height,
chain_key_config: record.chain_key_config,
chain_key_config: record.chain_key_config.clone(),
})
.collect::<Vec<_>>())
}

fn get_unassigned_nodes(local_registry: &Rc<LazyRegistry>) -> anyhow::Result<Option<UnassignedNodesConfigRecord>> {
let unassigned_nodes_config = local_registry
.get_family_entries_versioned::<UnassignedNodesConfigRecord>()
.map_err(|e| anyhow::anyhow!("Couldn't get unassigned nodes config: {:?}", e))?
.into_iter()
.map(|(_, (_, record))| record)
.next();
Ok(unassigned_nodes_config)
fn get_unassigned_nodes(local_registry: &Arc<dyn LazyRegistry>) -> anyhow::Result<Option<UnassignedNodesConfigRecord>> {
local_registry.get_unassigned_nodes()
}

async fn get_nodes(
local_registry: &Rc<LazyRegistry>,
local_registry: &Arc<dyn LazyRegistry>,
node_operators: &BTreeMap<PrincipalId, NodeOperator>,
subnets: &[SubnetRecord],
network: &Network,
Expand All @@ -273,41 +211,47 @@ async fn get_nodes(
let nodes_health = health_client.nodes().await?;

let nodes = local_registry
.get_family_entries_versioned::<NodeRecord>()
.nodes()
.await
.map_err(|e| anyhow::anyhow!("Couldn't get nodes: {:?}", e))?
.into_iter()
.map(|(k, (_, record))| {
let node_operator_id = PrincipalId::try_from(&record.node_operator_id).expect("Couldn't parse principal id");
let node_id = PrincipalId::from_str(&k).expect("Couldn't parse principal id");
.iter()
.map(|(k, record)| {
let node_operator_id = record.operator.principal;
NodeDetails {
node_id,
xnet: record.xnet,
http: record.http,
node_id: *k,
xnet: Some(ConnectionEndpoint {
ip_addr: record.ip_addr.to_string(),
port: 2497,
}),
http: Some(ConnectionEndpoint {
ip_addr: record.ip_addr.to_string(),
port: 8080,
}),
node_operator_id,
chip_id: record.chip_id,
hostos_version_id: record.hostos_version_id,
public_ipv4_config: record.public_ipv4_config,
chip_id: record.chip_id.clone(),
hostos_version_id: Some(record.hostos_version.clone()),
public_ipv4_config: record.public_ipv4_config.clone(),
node_provider_id: match node_operators.get(&node_operator_id) {
Some(no) => no.node_provider_principal_id,
None => PrincipalId::new_anonymous(),
},
subnet_id: subnets
.iter()
.find(|subnet| subnet.membership.contains(&k))
.find(|subnet| subnet.membership.contains(&k.to_string()))
.map(|subnet| subnet.subnet_id),
dc_id: match node_operators.get(&node_operator_id) {
Some(no) => no.dc_id.clone(),
None => "".to_string(),
},
status: nodes_health.get(&node_id).unwrap_or(&ic_management_types::HealthStatus::Unknown).clone(),
status: nodes_health.get(k).unwrap_or(&ic_management_types::HealthStatus::Unknown).clone(),
}
})
.collect::<Vec<_>>();
Ok(nodes)
}

fn get_node_rewards_table(local_registry: &Rc<LazyRegistry>, network: &Network) -> NodeRewardsTableFlattened {
let rewards_table_bytes = local_registry.get_family_entries::<NodeRewardsTable>();
fn get_node_rewards_table(local_registry: &Arc<dyn LazyRegistry>, network: &Network) -> NodeRewardsTableFlattened {
let rewards_table_bytes = local_registry.get_node_rewards_table();

let mut rewards_table = match rewards_table_bytes {
Ok(r) => r,
Expand Down Expand Up @@ -360,12 +304,12 @@ fn get_node_rewards_table(local_registry: &Rc<LazyRegistry>, network: &Network)
}
}

fn get_api_boundary_nodes(local_registry: &Rc<LazyRegistry>) -> anyhow::Result<Vec<ApiBoundaryNodeDetails>> {
fn get_api_boundary_nodes(local_registry: &Arc<dyn LazyRegistry>) -> anyhow::Result<Vec<ApiBoundaryNodeDetails>> {
let api_bns = local_registry
.get_family_entries_versioned::<ApiBoundaryNodeRecord>()
.get_api_boundary_nodes()
.map_err(|e| anyhow::anyhow!("Couldn't get api boundary nodes: {:?}", e))?
.into_iter()
.map(|(k, (_, record))| {
.map(|(k, record)| {
let principal = PrincipalId::from_str(k.as_str()).expect("Couldn't parse principal id");
ApiBoundaryNodeDetails {
principal,
Expand Down
8 changes: 4 additions & 4 deletions rs/cli/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{

use ic_canisters::{governance::governance_canister_version, CanisterClient, IcAgentCanisterClient};
use ic_management_backend::{
lazy_registry::LazyRegistry,
lazy_registry::{LazyRegistry, LazyRegistryImpl},
proposal::ProposalAgent,
registry::{local_registry_path, sync_local_store},
};
Expand All @@ -29,7 +29,7 @@ const STAGING_NEURON_ID: u64 = 49;
#[derive(Clone)]
pub struct DreContext {
network: Network,
registry: RefCell<Option<Rc<LazyRegistry>>>,
registry: RefCell<Option<Arc<dyn LazyRegistry>>>,
ic_admin: Option<Arc<IcAdminWrapper>>,
runner: RefCell<Option<Rc<Runner>>>,
verbose_runner: bool,
Expand Down Expand Up @@ -149,7 +149,7 @@ impl DreContext {
Ok((ic_admin, Some(ic_admin_path)))
}

pub async fn registry(&self) -> Rc<LazyRegistry> {
pub async fn registry(&self) -> Arc<dyn LazyRegistry> {
if let Some(reg) = self.registry.borrow().as_ref() {
return reg.clone();
}
Expand All @@ -162,7 +162,7 @@ impl DreContext {
info!("Using local registry path for network {}: {}", network.name, local_path.display());
let local_registry = LocalRegistry::new(local_path, Duration::from_millis(1000)).expect("Failed to create local registry");

let registry = Rc::new(LazyRegistry::new(local_registry, network.clone(), self.skip_sync));
let registry = Arc::new(LazyRegistryImpl::new(local_registry, network.clone(), self.skip_sync));
*self.registry.borrow_mut() = Some(registry.clone());
registry
}
Expand Down
Loading

0 comments on commit ceeb19b

Please sign in to comment.