diff --git a/rs/ic-observability/config-writer-common/src/config_builder.rs b/rs/ic-observability/config-writer-common/src/config_builder.rs index 1dd80820c..2614141b6 100644 --- a/rs/ic-observability/config-writer-common/src/config_builder.rs +++ b/rs/ic-observability/config-writer-common/src/config_builder.rs @@ -1,7 +1,7 @@ use std::collections::BTreeSet; use std::fmt::Debug; -use service_discovery::{jobs::Job, TargetGroup}; +use service_discovery::{job_types::JobType, TargetGroup}; pub trait Config: erased_serde::Serialize + Debug { fn updated(&self) -> bool; @@ -10,5 +10,5 @@ pub trait Config: erased_serde::Serialize + Debug { erased_serde::serialize_trait_object!(Config); pub trait ConfigBuilder { - fn build(&mut self, target_groups: BTreeSet, job: Job) -> Box; + fn build(&mut self, target_groups: BTreeSet, job_type: JobType) -> Box; } diff --git a/rs/ic-observability/config-writer-common/src/config_updater_loop.rs b/rs/ic-observability/config-writer-common/src/config_updater_loop.rs index 7b0b28843..0031fc691 100644 --- a/rs/ic-observability/config-writer-common/src/config_updater_loop.rs +++ b/rs/ic-observability/config-writer-common/src/config_updater_loop.rs @@ -3,19 +3,17 @@ use std::{collections::BTreeSet, sync::Arc}; use crossbeam::select; use crossbeam_channel::Receiver; use service_discovery::metrics::Metrics; -use service_discovery::{jobs::Job, IcServiceDiscovery, TargetGroup}; +use service_discovery::{job_types::JobType, IcServiceDiscovery, TargetGroup}; use slog::{info, warn}; -use crate::{ - config_builder::ConfigBuilder, config_updater::ConfigUpdater, filters::TargetGroupFilter, -}; +use crate::{config_builder::ConfigBuilder, config_updater::ConfigUpdater, filters::TargetGroupFilter}; pub fn config_updater_loop( log: slog::Logger, discovery: Arc, filters: Arc, shutdown_signal: Receiver<()>, - jobs: Vec, + jobs: Vec, update_signal_recv: Receiver<()>, mut config_builder: impl ConfigBuilder, config_updater: impl ConfigUpdater, @@ -23,13 +21,10 @@ pub fn config_updater_loop( ) -> impl FnMut() { move || loop { for job in &jobs { - let target_groups = match discovery.get_target_groups(job._type, log.clone()) { + let target_groups = match discovery.get_target_groups(*job, log.clone()) { Ok(t) => t, Err(e) => { - warn!( - log, - "Failed to retrieve targets for job {}: {:?}", job._type, e - ); + warn!(log, "Failed to retrieve targets for job {}: {:?}", job, e); continue; } }; @@ -41,10 +36,10 @@ pub fn config_updater_loop( metrics .total_targets - .with_label_values(&[job._type.to_string().as_str()]) + .with_label_values(&[job.to_string().as_str()]) .set(target_groups.len().try_into().unwrap()); - let config = config_builder.build(filtered_target_groups, job.clone()); + let config = config_builder.build(filtered_target_groups, *job); let config_binding = config.as_ref(); if let Err(e) = config_updater.update(config_binding) { warn!(log, "Failed to write config {}: {:?}", &config.name(), e); diff --git a/rs/ic-observability/multiservice-discovery-downloader/src/downloader_loop.rs b/rs/ic-observability/multiservice-discovery-downloader/src/downloader_loop.rs index 56c373705..15d15022d 100644 --- a/rs/ic-observability/multiservice-discovery-downloader/src/downloader_loop.rs +++ b/rs/ic-observability/multiservice-discovery-downloader/src/downloader_loop.rs @@ -6,12 +6,12 @@ use multiservice_discovery_shared::filters::node_regex_id_filter::NodeIDRegexFil use multiservice_discovery_shared::filters::{TargetGroupFilter, TargetGroupFilterList}; use multiservice_discovery_shared::{ builders::{ - log_vector_config_structure::VectorConfigBuilderImpl, - prometheus_config_structure::PrometheusConfigBuilder, ConfigBuilder, + log_vector_config_structure::VectorConfigBuilderImpl, prometheus_config_structure::PrometheusConfigBuilder, + ConfigBuilder, }, contracts::target::TargetDto, }; -use service_discovery::job_types::{JobType, NodeOS}; +use service_discovery::job_types::JobType; use slog::{debug, info, warn, Logger}; use std::{ collections::hash_map::DefaultHasher, @@ -49,10 +49,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece }, recv(interval) -> msg => msg.expect("tick failed!") }; - info!( - logger, - "Downloading from {} @ interval {:?}", cli.sd_url, tick - ); + info!(logger, "Downloading from {} @ interval {:?}", cli.sd_url, tick); let response = match client.get(cli.sd_url.clone()).send().await { Ok(res) => res, @@ -91,10 +88,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece let mut hasher = DefaultHasher::new(); - let targets = targets - .into_iter() - .filter(|f| filters.filter(f)) - .collect::>(); + let targets = targets.into_iter().filter(|f| filters.filter(f)).collect::>(); for target in &targets { target.hash(&mut hasher); @@ -103,10 +97,7 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece let hash = hasher.finish(); if current_hash != hash { - info!( - logger, - "Received new targets from {} @ interval {:?}", cli.sd_url, tick - ); + info!(logger, "Received new targets from {} @ interval {:?}", cli.sd_url, tick); current_hash = hash; generate_config(&cli, targets, logger.clone()); @@ -116,17 +107,8 @@ pub async fn run_downloader_loop(logger: Logger, cli: CliArgs, stop_signal: Rece fn generate_config(cli: &CliArgs, targets: Vec, logger: Logger) { let jobs = match cli.generator { - crate::Generator::Log(_) => vec![ - JobType::NodeExporter(NodeOS::Guest), - JobType::NodeExporter(NodeOS::Host), - ], - crate::Generator::Metric => vec![ - JobType::NodeExporter(NodeOS::Guest), - JobType::NodeExporter(NodeOS::Host), - JobType::Orchestrator, - JobType::Replica, - JobType::MetricsProxy, - ], + crate::Generator::Log(_) => JobType::all_for_logs(), + crate::Generator::Metric => JobType::all_for_ic_nodes(), }; if std::fs::metadata(&cli.output_dir).is_err() { @@ -148,8 +130,7 @@ fn generate_config(cli: &CliArgs, targets: Vec, logger: Logger) { let config = match &cli.generator { crate::Generator::Log(subtype) => match &subtype.subcommands { Subtype::SystemdJournalGatewayd { batch_size } => { - VectorConfigBuilderImpl::new(*batch_size, subtype.port, subtype.bn_port) - .build(targets_with_job) + VectorConfigBuilderImpl::new(*batch_size, subtype.port, subtype.bn_port).build(targets_with_job) } Subtype::ExecAndJournald { script_path, diff --git a/rs/ic-observability/multiservice-discovery-shared/src/builders/exec_log_config_structure.rs b/rs/ic-observability/multiservice-discovery-shared/src/builders/exec_log_config_structure.rs index c59a7c94d..4ea9f5b81 100644 --- a/rs/ic-observability/multiservice-discovery-shared/src/builders/exec_log_config_structure.rs +++ b/rs/ic-observability/multiservice-discovery-shared/src/builders/exec_log_config_structure.rs @@ -6,7 +6,7 @@ use serde::Serialize; use crate::contracts::target::TargetDto; use super::{ - log_vector_config_structure::{handle_ip, VectorRemapTransform}, + log_vector_config_structure::VectorRemapTransform, vector_config_enriched::{VectorConfigEnriched, VectorSource, VectorTransform}, ConfigBuilder, }; @@ -22,10 +22,7 @@ pub struct ExecLogConfigBuilderImpl { } impl ConfigBuilder for ExecLogConfigBuilderImpl { - fn build( - &self, - target_groups: std::collections::BTreeSet, - ) -> String { + fn build(&self, target_groups: std::collections::BTreeSet) -> String { let mut config = VectorConfigEnriched::new(); let mut edited_records: Vec = vec![]; @@ -60,7 +57,7 @@ impl ConfigBuilder for ExecLogConfigBuilderImpl { "--url", format!( "http://[{}]:{}/entries", - handle_ip(record.clone(), job, is_bn), + job.ip(*record.targets.first().unwrap(), is_bn), match is_bn { true => self.bn_port, false => self.port, @@ -82,8 +79,7 @@ impl ConfigBuilder for ExecLogConfigBuilderImpl { include_stderr: self.include_stderr, }; - let transform = - VectorRemapTransform::from(record.clone(), *job, key.clone(), is_bn); + let transform = VectorRemapTransform::from(record.clone(), *job, key.clone(), is_bn); let mut source_map = HashMap::new(); source_map.insert(key.clone(), Box::new(source) as Box); diff --git a/rs/ic-observability/multiservice-discovery-shared/src/builders/log_vector_config_structure.rs b/rs/ic-observability/multiservice-discovery-shared/src/builders/log_vector_config_structure.rs index 85bc74191..3f02538e0 100644 --- a/rs/ic-observability/multiservice-discovery-shared/src/builders/log_vector_config_structure.rs +++ b/rs/ic-observability/multiservice-discovery-shared/src/builders/log_vector_config_structure.rs @@ -3,8 +3,6 @@ use std::collections::{BTreeSet, HashMap}; use ic_types::PrincipalId; use serde::Serialize; -use service_discovery::guest_to_host_address; -use service_discovery::job_types::NodeOS; use service_discovery::{job_types::JobType, TargetGroup}; use crate::builders::vector_config_enriched::VectorSource; @@ -68,7 +66,7 @@ pub(crate) fn from_targets_into_vector_config( let key = format!("{}-{}", key, job); let source = VectorSystemdGatewayJournaldSource { _type: "systemd_journal_gatewayd".into(), - endpoint: handle_ip(record.clone(), job, is_bn), + endpoint: job.ip(*record.targets.first().unwrap(), is_bn).to_string(), data_dir: "logs".to_string(), batch_size: builder.batch_size, port: match is_bn { @@ -77,8 +75,7 @@ pub(crate) fn from_targets_into_vector_config( }, }; let source_key = format!("{}-source", key); - let transform = - VectorRemapTransform::from(record.clone(), *job, source_key.clone(), is_bn); + let transform = VectorRemapTransform::from(record.clone(), *job, source_key.clone(), is_bn); let mut sources_map = HashMap::new(); sources_map.insert(source_key, Box::new(source) as Box); @@ -134,7 +131,6 @@ const NODE_PROVIDER_ID: &str = "node_provider_id"; impl VectorRemapTransform { pub fn from(target: TargetDto, job: JobType, input: String, is_bn: bool) -> Self { let target_group = Into::::into(&target); - let mut labels: HashMap = HashMap::new(); let anonymous = PrincipalId::new_anonymous().to_string(); let mut node_id = target_group.node_id.to_string(); @@ -142,20 +138,22 @@ impl VectorRemapTransform { node_id = target.clone().name } - let endpoint = handle_ip(target.clone(), &job, is_bn); - - labels.insert(IC_NAME.into(), target_group.ic_name.to_string()); - labels.insert(IC_NODE.into(), node_id.clone()); - labels.insert(ADDRESS.into(), endpoint); - labels.insert( - NODE_PROVIDER_ID.into(), - target_group.node_provider_id.to_string(), - ); - labels.insert(DC.into(), target_group.dc_id); - labels.extend(target.custom_labels); - if let Some(subnet_id) = target_group.subnet_id { - labels.insert(IC_SUBNET.into(), subnet_id.to_string()); - } + let ip = job.ip(*target.targets.first().unwrap(), is_bn).to_string(); + let labels = HashMap::from([ + (IC_NAME.into(), target_group.ic_name.to_string()), + (IC_NODE.into(), node_id.clone()), + (ADDRESS.into(), ip), + (NODE_PROVIDER_ID.into(), target_group.node_provider_id.to_string()), + (DC.into(), target_group.dc_id), + ]) + .into_iter() + .chain(target.custom_labels.into_iter()) + .chain(match target_group.subnet_id { + Some(subnet_id) => vec![(IC_SUBNET.into(), subnet_id.to_string())], + None => vec![], + }) + .collect::>(); + Self { _type: "remap".into(), inputs: vec![input], @@ -170,32 +168,6 @@ impl VectorRemapTransform { } } -pub fn handle_ip(target_group: TargetDto, job_type: &JobType, is_bn: bool) -> String { - match job_type { - JobType::NodeExporter(NodeOS::Guest) => { - target_group.targets.first().unwrap().ip().to_string() - } - JobType::NodeExporter(NodeOS::Host) => match is_bn { - true => target_group.targets.first().unwrap().ip().to_string(), - false => guest_to_host_address(*target_group.targets.first().unwrap()) - .unwrap() - .ip() - .to_string(), - }, - JobType::MetricsProxy => match is_bn { - // It should not be possible for this to ever be true. - // There is a structural typing problem somewhere here. - true => target_group.targets.first().unwrap().ip().to_string(), - false => guest_to_host_address(*target_group.targets.first().unwrap()) - .unwrap() - .ip() - .to_string(), - }, - JobType::Replica => panic!("Unsupported job type for handle_ip"), - JobType::Orchestrator => panic!("Unsupported job type for handle_ip"), - } -} - #[cfg(test)] mod tests { use std::collections::{BTreeMap, BTreeSet}; @@ -217,9 +189,7 @@ mod tests { let mut parts = ipv6.split(':'); for item in &mut array { - *item = u16::from_str_radix(parts.next().unwrap(), 16) - .unwrap() - .to_be(); + *item = u16::from_str_radix(parts.next().unwrap(), 16).unwrap().to_be(); } array diff --git a/rs/ic-observability/multiservice-discovery-shared/src/builders/prometheus_config_structure.rs b/rs/ic-observability/multiservice-discovery-shared/src/builders/prometheus_config_structure.rs index ada6dadf1..de79bb9f6 100644 --- a/rs/ic-observability/multiservice-discovery-shared/src/builders/prometheus_config_structure.rs +++ b/rs/ic-observability/multiservice-discovery-shared/src/builders/prometheus_config_structure.rs @@ -2,8 +2,6 @@ use std::collections::{BTreeMap, BTreeSet}; use ic_types::PrincipalId; use serde::{Serialize, Serializer}; -use service_discovery::job_types::JobType; -use service_discovery::jobs::Job; use crate::{builders::ConfigBuilder, contracts::target::TargetDto}; @@ -31,26 +29,6 @@ impl Serialize for PrometheusFileSdConfig { #[derive(Debug, Clone)] pub struct PrometheusConfigBuilder {} -fn get_endpoints(target_group: TargetDto, job: JobType) -> BTreeSet { - let binding = Job::all(); - let job = binding.iter().find(|j| j._type == job).unwrap(); - - target_group - .targets - .iter() - .map(|g| { - let mut g = *g; - g.set_port(job.port); - format!( - "{}://{}/{}", - job.scheme, - g, - job.endpoint.trim_start_matches('/'), - ) - }) - .collect() -} - const IC_NAME: &str = "ic"; const IC_NODE: &str = "ic_node"; const IC_SUBNET: &str = "ic_subnet"; @@ -61,34 +39,39 @@ const JOB: &str = "job"; // const NODE_PROVIDER_ID: &str = "node_provider_id"; // const NODE_OPERATOR_ID: &str = "node_operator_id"; -pub fn map_target_group(target_groups: BTreeSet) -> BTreeSet { +pub fn map_target_group(target_groups: Vec) -> Vec { target_groups .into_iter() .flat_map(|tg| { let mut ret = vec![]; for job in &tg.jobs { ret.push(PrometheusStaticConfig { - targets: get_endpoints(tg.clone(), *job), + targets: tg.targets.iter().map(|sa| job.url(*sa, false)).collect(), labels: { - let anonymous = PrincipalId::new_anonymous().to_string(); - let mut node_id = tg.node_id.to_string(); - if node_id == anonymous { - node_id = tg.name.clone() - } - let mut labels = BTreeMap::new(); - labels.insert(IC_NAME.into(), tg.ic_name.clone()); - labels.insert(IC_NODE.into(), node_id); - if let Some(subnet_id) = tg.subnet_id { - labels.insert(IC_SUBNET.into(), subnet_id.to_string()); - } - labels.insert(JOB.into(), job.to_string()); - labels.extend(tg.custom_labels.clone().into_iter()); + BTreeMap::from([ + (IC_NAME.into(), tg.ic_name.clone()), + ( + IC_NODE.into(), + if tg.node_id.to_string() == PrincipalId::new_anonymous().to_string() { + tg.name.clone() + } else { + tg.node_id.to_string() + }, + ), + (JOB.into(), job.to_string()), + ]) + .into_iter() + .chain(match tg.subnet_id { + Some(subnet_id) => vec![(IC_SUBNET.into(), subnet_id.to_string())], + None => vec![], + }) + .chain(tg.custom_labels.clone().into_iter()) + .collect() // TODO: Re-add the labels below once we resolve the issues with the public dashboard queries // https://dfinity.atlassian.net/browse/OB-442 // labels.insert(DC.into(), tg.dc_id.clone()); // labels.insert(NODE_PROVIDER_ID.into(), tg.node_provider_id.to_string()); // labels.insert(NODE_OPERATOR_ID.into(), tg.operator_id.to_string()); - labels }, }) } @@ -99,8 +82,7 @@ pub fn map_target_group(target_groups: BTreeSet) -> BTreeSet) -> String { - let new_configs: BTreeSet = map_target_group(target_groups); - + let new_configs: Vec = map_target_group(target_groups.into_iter().collect()); serde_json::to_string_pretty(&new_configs).unwrap() } } diff --git a/rs/ic-observability/multiservice-discovery-shared/src/builders/script_log_config_structure.rs b/rs/ic-observability/multiservice-discovery-shared/src/builders/script_log_config_structure.rs index ddf1801cd..38b6f609a 100644 --- a/rs/ic-observability/multiservice-discovery-shared/src/builders/script_log_config_structure.rs +++ b/rs/ic-observability/multiservice-discovery-shared/src/builders/script_log_config_structure.rs @@ -6,7 +6,7 @@ use serde::Serialize; use crate::contracts::target::TargetDto; use super::{ - log_vector_config_structure::{handle_ip, VectorRemapTransform}, + log_vector_config_structure::VectorRemapTransform, vector_config_enriched::{VectorConfigEnriched, VectorSource, VectorTransform}, ConfigBuilder, }; @@ -23,10 +23,7 @@ pub struct ScriptLogConfigBuilderImpl { } impl ConfigBuilder for ScriptLogConfigBuilderImpl { - fn build( - &self, - target_groups: std::collections::BTreeSet, - ) -> String { + fn build(&self, target_groups: std::collections::BTreeSet) -> String { let mut config = VectorConfigEnriched::new(); let mut edited_records: Vec = vec![]; @@ -63,7 +60,7 @@ impl ConfigBuilder for ScriptLogConfigBuilderImpl { "--url", format!( "http://[{}]:{}/entries", - handle_ip(record.clone(), job, is_bn), + job.ip(*record.targets.first().unwrap(), is_bn), match is_bn { true => self.bn_port, false => self.port, @@ -75,11 +72,7 @@ impl ConfigBuilder for ScriptLogConfigBuilderImpl { "--cursor-path", format!("{}/{}/checkpoint.txt", self.worker_cursor_folder, key).as_str(), "--expected-vector-cursor-path", - format!( - "{}/{}/checkpoint.txt", - self.data_folder, journald_source_key - ) - .as_str(), + format!("{}/{}/checkpoint.txt", self.data_folder, journald_source_key).as_str(), ] .into_iter() .map(|s| s.to_string()) @@ -96,22 +89,14 @@ impl ConfigBuilder for ScriptLogConfigBuilderImpl { journal_directory: format!("{}/{}", self.journals_folder, key), }; - let transform = VectorRemapTransform::from( - record.clone(), - *job, - journald_source_key.clone(), - is_bn, - ); + let transform = VectorRemapTransform::from(record.clone(), *job, journald_source_key.clone(), is_bn); let mut source_map = HashMap::new(); source_map.insert( format!("{}-script", key), Box::new(script_source) as Box, ); - source_map.insert( - journald_source_key, - Box::new(journald_source) as Box, - ); + source_map.insert(journald_source_key, Box::new(journald_source) as Box); let mut transform_map = HashMap::new(); transform_map.insert( diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index 9597d2338..e70e0dfa3 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -1,11 +1,8 @@ use crossbeam_channel::Receiver; use crossbeam_channel::Sender; use ic_registry_client::client::ThresholdSigPublicKey; -use service_discovery::job_types::map_jobs; use service_discovery::job_types::JobType; -use service_discovery::{ - job_types::JobAndPort, registry_sync::sync_local_registry, IcServiceDiscoveryImpl, -}; +use service_discovery::{registry_sync::sync_local_registry, IcServiceDiscoveryImpl}; use slog::{debug, info, warn, Logger}; use std::collections::BTreeMap; use std::collections::BTreeSet; @@ -44,8 +41,7 @@ impl Definition { registry_query_timeout: Duration, stop_signal_sender: Sender<()>, ) -> Self { - let global_registry_path = - std::fs::canonicalize(global_registry_path).expect("Invalid global registry path"); + let global_registry_path = std::fs::canonicalize(global_registry_path).expect("Invalid global registry path"); let registry_path = global_registry_path.join(name.clone()); if std::fs::metadata(®istry_path).is_err() { std::fs::create_dir_all(registry_path.clone()).unwrap(); @@ -60,26 +56,14 @@ impl Definition { stop_signal, registry_query_timeout, stop_signal_sender, - ic_discovery: Arc::new( - IcServiceDiscoveryImpl::new( - log, - registry_path, - registry_query_timeout, - map_jobs(&JobAndPort::all()), - ) - .unwrap(), - ), + ic_discovery: Arc::new(IcServiceDiscoveryImpl::new(log, registry_path, registry_query_timeout).unwrap()), boundary_nodes: vec![], } } async fn initial_registry_sync(&self) { info!(self.log, "Syncing local registry for {} started", self.name); - info!( - self.log, - "Using local registry path: {}", - self.registry_path.display() - ); + info!(self.log, "Using local registry path: {}", self.registry_path.display()); sync_local_registry( self.log.clone(), @@ -90,10 +74,7 @@ impl Definition { ) .await; - info!( - self.log, - "Syncing local registry for {} completed", self.name - ); + info!(self.log, "Syncing local registry for {} completed", self.name); } async fn poll_loop(&mut self) { @@ -107,10 +88,7 @@ impl Definition { if let Err(e) = self.ic_discovery.load_new_ics(self.log.clone()) { warn!( self.log, - "Failed to load new scraping targets for {} @ interval {:?}: {:?}", - self.name, - tick, - e + "Failed to load new scraping targets for {} @ interval {:?}: {:?}", self.name, tick, e ); } debug!(self.log, "Update registries for {}", self.name); @@ -134,10 +112,7 @@ impl Definition { async fn run(&mut self) { self.initial_registry_sync().await; - info!( - self.log, - "Starting to watch for changes for definition {}", self.name - ); + info!(self.log, "Starting to watch for changes for definition {}", self.name); self.poll_loop().await; diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs index db4b08ee1..622c9cac5 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_boundary_node_to_definition_handler.rs @@ -21,10 +21,7 @@ pub async fn add_boundary_node( ) -> WebResult { let mut definitions = binding.definitions.lock().await; - let definition = match definitions - .iter_mut() - .find(|d| d.name == boundary_node.ic_name) - { + let definition = match definitions.iter_mut().find(|d| d.name == boundary_node.ic_name) { Some(def) => def, None => { return Ok(warp::reply::with_status( @@ -46,15 +43,25 @@ pub async fn add_boundary_node( }; let job_type = match JobType::from_str(&boundary_node.job_type) { - Ok(jt) => jt, Err(e) => { + // We don't have this job type here. return Ok(warp::reply::with_status( - format!( - "Job type {} is not supported: {}", - boundary_node.job_type, e - ), + format!("Job type {} is not known: {}", boundary_node.job_type, e), warp::http::StatusCode::BAD_REQUEST, - )) + )); + } + Ok(jt) => { + // Forbid addition of any job type not known to be supported by boundary nodes. + if !JobType::all_for_boundary_nodes().contains(&jt) { + return Ok(warp::reply::with_status( + format!( + "Job type {} is not supported for boundary nodes.", + boundary_node.job_type + ), + warp::http::StatusCode::BAD_REQUEST, + )); + } + jt } }; @@ -66,7 +73,7 @@ pub async fn add_boundary_node( }); Ok(warp::reply::with_status( - "success".to_string(), - warp::http::StatusCode::OK, + "".to_string(), + warp::http::StatusCode::CREATED, )) } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs index b7b42dd8d..7ba3ad798 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_prometheus_config_handler.rs @@ -1,124 +1,95 @@ -use std::{collections::BTreeMap, sync::Arc}; - +use super::WebResult; +use crate::definition::Definition; +use multiservice_discovery_shared::builders::prometheus_config_structure::{map_target_group, PrometheusStaticConfig}; +use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto}; use service_discovery::{ job_types::{JobType, NodeOS}, - jobs::Job, IcServiceDiscovery, }; use slog::Logger; +use std::{collections::BTreeMap, sync::Arc}; use tokio::sync::Mutex; use warp::reply::Reply; -use crate::definition::Definition; -use multiservice_discovery_shared::{ - builders::prometheus_config_structure::{map_target_group, PrometheusStaticConfig}, - contracts::target::TargetDto, -}; - -use super::WebResult; - pub struct ExportDefinitionConfigBinding { pub definitions: Arc>>, pub log: Logger, } -pub async fn export_prometheus_config( - binding: ExportDefinitionConfigBinding, -) -> WebResult { +pub async fn export_prometheus_config(binding: ExportDefinitionConfigBinding) -> WebResult { let definitions = binding.definitions.lock().await; - let all_jobs = [ - JobType::Replica, - JobType::Orchestrator, - JobType::NodeExporter(NodeOS::Guest), - JobType::NodeExporter(NodeOS::Host), - JobType::MetricsProxy, - ]; - - let mut total_targets: Vec = vec![]; + let mut ic_node_targets: Vec = vec![]; for def in definitions.iter() { - for job_type in all_jobs { - let targets = match def - .ic_discovery - .get_target_groups(job_type, binding.log.clone()) - { + for job_type in JobType::all_for_ic_nodes() { + let targets = match def.ic_discovery.get_target_groups(job_type, binding.log.clone()) { Ok(targets) => targets, Err(_) => continue, }; - for target in targets { - if let Some(entry) = total_targets - .iter_mut() - .find(|t| t.node_id == target.node_id) - { - entry.jobs.push(job_type); + targets.iter().for_each(|target_group| { + if let Some(target) = ic_node_targets.iter_mut().find(|t| t.node_id == target_group.node_id) { + target.jobs.push(job_type); } else { - let mut mapped = Into::::into(&target); - mapped.ic_name = def.name.clone(); - total_targets.push(TargetDto { - jobs: vec![job_type], - ..mapped - }); + ic_node_targets.push(map_to_target_dto( + target_group, + job_type, + BTreeMap::new(), + target_group.node_id.to_string(), + def.name.clone(), + )); } - } + }); } } - let mut total_set = map_target_group(total_targets.into_iter().collect()); - - definitions.iter().for_each(|def| { - def.boundary_nodes.iter().for_each(|bn| { - // Boundary nodes do not get the metrics-proxy installed. - if bn.job_type == JobType::MetricsProxy { - return; - } - - // If this boundary node is under the test environment, - // and the job is Node Exporter, then skip adding this - // target altogether. - if bn - .custom_labels - .iter() - .any(|(k, v)| k.as_str() == "env" && v.as_str() == "test") - && bn.job_type == JobType::NodeExporter(NodeOS::Host) - { - return; - } - - let binding = Job::all(); - let job = binding.iter().find(|j| j._type == bn.job_type).unwrap(); - - total_set.insert(PrometheusStaticConfig { - targets: bn - .targets - .clone() + let ic_node_targets: Vec = map_target_group(ic_node_targets.into_iter().collect()); + + let boundary_nodes_targets = definitions + .iter() + .flat_map(|def| { + def.boundary_nodes.iter().filter_map(|bn| { + // Since boundary nodes have been checked for correct job + // type when they were added via POST, then we can trust + // the correct job type is at play here. + // If, however, this boundary node is under the test environment, + // and the job is Node Exporter, then skip adding this + // target altogether. + if bn + .custom_labels .iter() - .map(|g| { - let mut g = *g; - g.set_port(job.port); - format!("http://{}/{}", g, job.endpoint.trim_start_matches('/'),) - }) - .collect(), - labels: { - let mut labels = BTreeMap::new(); - labels.insert("ic".to_string(), def.name.clone()); - labels.insert("name".to_string(), bn.name.clone()); - labels.extend(bn.custom_labels.clone()); - labels.insert("job".to_string(), bn.job_type.to_string()); - labels - }, - }); + .any(|(k, v)| k.as_str() == "env" && v.as_str() == "test") + && bn.job_type == JobType::NodeExporter(NodeOS::Host) + { + return None; + } + Some(PrometheusStaticConfig { + targets: bn.targets.clone().iter().map(|g| bn.job_type.url(*g, true)).collect(), + labels: { + BTreeMap::from([ + ("ic", def.name.clone()), + ("name", bn.name.clone()), + ("job", bn.job_type.to_string()), + ]) + .into_iter() + .map(|(k, v)| (k.to_string(), v)) + .chain(bn.custom_labels.clone()) + .collect::>() + }, + }) + }) }) - }); - - let prom_config = serde_json::to_string_pretty(&total_set).unwrap(); - - let status_code = if !total_set.is_empty() { - warp::http::StatusCode::OK - } else { - warp::http::StatusCode::NOT_FOUND - }; - - Ok(warp::reply::with_status(prom_config, status_code)) + .collect(); + + let total_targets = [ic_node_targets, boundary_nodes_targets].concat(); + + Ok(warp::reply::with_status( + serde_json::to_string_pretty(&total_targets).unwrap(), + if !total_targets.is_empty() { + warp::http::StatusCode::OK + } else { + warp::http::StatusCode::NOT_FOUND + }, + )) } diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs index 18a2688cf..44e017cb2 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/export_targets_handler.rs @@ -1,18 +1,15 @@ -use std::{collections::BTreeMap, sync::Arc}; - +use super::WebResult; +use crate::definition::Definition; use ic_types::{NodeId, PrincipalId}; +use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto}; use service_discovery::{ job_types::{JobType, NodeOS}, IcServiceDiscovery, }; use slog::Logger; -use warp::reply::Reply; - -use crate::definition::Definition; - -use super::WebResult; -use multiservice_discovery_shared::contracts::target::{map_to_target_dto, TargetDto}; +use std::{collections::BTreeMap, sync::Arc}; use tokio::sync::Mutex; +use warp::reply::Reply; pub struct ExportTargetsBinding { pub definitions: Arc>>, @@ -22,34 +19,20 @@ pub struct ExportTargetsBinding { pub async fn export_targets(binding: ExportTargetsBinding) -> WebResult { let definitions = binding.definitions.lock().await; - let all_jobs = [ - JobType::Replica, - JobType::Orchestrator, - JobType::NodeExporter(NodeOS::Guest), - JobType::NodeExporter(NodeOS::Host), - JobType::MetricsProxy, - ]; - - let mut total_targets: Vec = vec![]; + let mut ic_node_targets: Vec = vec![]; for def in definitions.iter() { - for job_type in all_jobs { - let targets = match def - .ic_discovery - .get_target_groups(job_type, binding.log.clone()) - { + for job_type in JobType::all_for_ic_nodes() { + let targets = match def.ic_discovery.get_target_groups(job_type, binding.log.clone()) { Ok(targets) => targets, Err(_) => continue, }; targets.iter().for_each(|target_group| { - if let Some(target) = total_targets - .iter_mut() - .find(|t| t.node_id == target_group.node_id) - { + if let Some(target) = ic_node_targets.iter_mut().find(|t| t.node_id == target_group.node_id) { target.jobs.push(job_type); } else { - total_targets.push(map_to_target_dto( + ic_node_targets.push(map_to_target_dto( target_group, job_type, BTreeMap::new(), @@ -59,35 +42,50 @@ pub async fn export_targets(binding: ExportTargetsBinding) -> WebResult>(); + + let total_targets = [ic_node_targets, boundary_nodes_targets].concat(); Ok(warp::reply::with_status( - prom_config, - warp::http::StatusCode::OK, + serde_json::to_string_pretty(&total_targets).unwrap(), + if !total_targets.is_empty() { + warp::http::StatusCode::OK + } else { + warp::http::StatusCode::NOT_FOUND + }, )) } diff --git a/rs/ic-observability/node-status-updater/src/main.rs b/rs/ic-observability/node-status-updater/src/main.rs index 59a83780f..f694927ea 100644 --- a/rs/ic-observability/node-status-updater/src/main.rs +++ b/rs/ic-observability/node-status-updater/src/main.rs @@ -8,8 +8,7 @@ use ic_metrics::MetricsRegistry; use obs_canister_clients::node_status_canister_client::NodeStatusCanister; use prometheus_http_query::Client; use service_discovery::{ - job_types::JobType, metrics::Metrics, poll_loop::make_poll_loop, - registry_sync::sync_local_registry, IcServiceDiscoveryImpl, + metrics::Metrics, poll_loop::make_poll_loop, registry_sync::sync_local_registry, IcServiceDiscoveryImpl, }; use slog::{info, o, Drain}; use std::{path::PathBuf, sync::Arc, time::Duration}; @@ -47,10 +46,6 @@ fn main() -> Result<()> { log.clone(), cli_args.targets_dir, cli_args.registry_query_timeout, - [(JobType::Replica, 9090)] - .iter() - .map(|(j, p)| (*j, *p)) - .collect(), )?); let (stop_signal_sender, stop_signal_rcv) = crossbeam::channel::bounded::<()>(0); @@ -66,10 +61,7 @@ fn main() -> Result<()> { 1, ); - info!( - log, - "Spawning scraping thread. Interval: {:?}", cli_args.poll_interval - ); + info!(log, "Spawning scraping thread. Interval: {:?}", cli_args.poll_interval); let join_handle = std::thread::spawn(poll_loop); handles.push(join_handle); diff --git a/rs/ic-observability/prometheus-config-updater/src/main.rs b/rs/ic-observability/prometheus-config-updater/src/main.rs index 010be4b2d..821ab0a44 100644 --- a/rs/ic-observability/prometheus-config-updater/src/main.rs +++ b/rs/ic-observability/prometheus-config-updater/src/main.rs @@ -14,8 +14,7 @@ use ic_crypto_utils_threshold_sig_der::parse_threshold_sig_key_from_der; use ic_http_endpoints_metrics::MetricsHttpEndpoint; use ic_metrics::MetricsRegistry; use regex::Regex; -use service_discovery::job_types::{map_jobs, JobAndPort, JobType, NodeOS}; -use service_discovery::jobs::Job; +use service_discovery::job_types::{JobType, NodeOS}; use service_discovery::registry_sync::sync_local_registry; use service_discovery::{metrics::Metrics, poll_loop::make_poll_loop, IcServiceDiscoveryImpl}; use slog::{info, o, Drain, Logger}; @@ -65,14 +64,11 @@ fn main() -> Result<()> { public_key, )); - let jobs = map_jobs(&JobAndPort::all()); - info!(log, "Starting IcServiceDiscovery ..."); let ic_discovery = Arc::new(IcServiceDiscoveryImpl::new( log.clone(), cli_args.targets_dir, cli_args.registry_query_timeout, - jobs, )?); let metrics = Metrics::new(metrics_registry.clone()); @@ -108,9 +104,9 @@ fn main() -> Result<()> { // We need to filter old nodes for host node exporters, but not for everything else // To do that, we will create 2 separate updated nodes, with different filters for them let jobs = vec![ - Job::from(JobType::NodeExporter(NodeOS::Guest)), - Job::from(JobType::Orchestrator), - Job::from(JobType::Replica), + JobType::NodeExporter(NodeOS::Guest), + JobType::Orchestrator, + JobType::Replica, ]; let filters = Arc::new(TargetGroupFilterList::new(filters_vec)); @@ -134,7 +130,7 @@ fn main() -> Result<()> { filters_vec.push(Box::new(NodeIDRegexFilter::new(filter_node_id_regex.clone()))); }; // Second loop, with the old machines filter - let jobs = vec![Job::from(JobType::NodeExporter(NodeOS::Host))]; + let jobs = vec![JobType::NodeExporter(NodeOS::Host)]; filters_vec.push(Box::new(OldMachinesFilter {})); diff --git a/rs/ic-observability/prometheus-config-updater/src/prometheus_config.rs b/rs/ic-observability/prometheus-config-updater/src/prometheus_config.rs index 5a093d664..fc82a32cb 100644 --- a/rs/ic-observability/prometheus-config-updater/src/prometheus_config.rs +++ b/rs/ic-observability/prometheus-config-updater/src/prometheus_config.rs @@ -6,7 +6,7 @@ use config_writer_common::{ }; use serde::{Serialize, Serializer}; use service_discovery::job_types::JobType; -use service_discovery::{jobs::Job, TargetGroup}; +use service_discovery::TargetGroup; #[derive(Serialize, Debug, Clone, PartialEq, PartialOrd, Ord, Eq)] pub struct PrometheusStaticConfig { @@ -60,34 +60,40 @@ impl PrometheusConfigBuilder { } } -fn get_endpoints(target_group: TargetGroup, _job: Job) -> BTreeSet { +fn get_endpoints(target_group: TargetGroup, job_type: JobType, is_boundary_node: bool) -> BTreeSet { target_group .targets .into_iter() - .map(|g| g.to_string()) + .map(|g| job_type.sockaddr(g, is_boundary_node).to_string()) .collect() } impl ConfigBuilder for PrometheusConfigBuilder { - fn build(&mut self, target_groups: BTreeSet, job: Job) -> Box { + fn build(&mut self, target_groups: BTreeSet, job_type: JobType) -> Box { let new_configs: BTreeSet = target_groups .into_iter() .map(|tg| PrometheusStaticConfig { - targets: get_endpoints(tg.clone(), job.clone()), + targets: get_endpoints(tg.clone(), job_type, false), labels: { - let mut labels = BTreeMap::new(); - labels.insert(labels_keys::IC_NAME.into(), tg.ic_name); - labels.insert(labels_keys::IC_NODE.into(), tg.node_id.to_string()); - if let Some(subnet_id) = tg.subnet_id { - labels.insert(labels_keys::IC_SUBNET.into(), subnet_id.to_string()); - } - labels.insert(labels_keys::JOB.into(), job._type.to_string()); - labels + BTreeMap::from([ + (labels_keys::IC_NAME, tg.ic_name), + (labels_keys::IC_NODE, tg.node_id.to_string()), + (labels_keys::JOB, job_type.to_string()), + ]) + .into_iter() + .map(|k| (k.0.to_string(), k.1)) + .chain(match tg.subnet_id { + Some(subnet_id) => { + BTreeMap::from([(labels_keys::IC_SUBNET.to_string(), subnet_id.to_string())]) + } + None => BTreeMap::new(), + }) + .collect::>() }, }) .collect(); - let updated = match self.get_old_config(job._type) { + let updated = match self.get_old_config(job_type) { None => true, Some(config) if config.configs == new_configs => false, Some(_) => true, @@ -95,12 +101,12 @@ impl ConfigBuilder for PrometheusConfigBuilder { let new_file_config = PrometheusFileSdConfig { configs: new_configs, - job: job._type, + job: job_type, updated, }; if updated { - self.set_old_config(job._type, new_file_config.clone()); + self.set_old_config(job_type, new_file_config.clone()); } Box::new(new_file_config) @@ -118,15 +124,12 @@ mod prometheus_serialize { use crate::prometheus_config::PrometheusConfigBuilder; use config_writer_common::config_builder::ConfigBuilder; - use service_discovery::jobs::Job; use super::get_endpoints; fn create_dummy_target_group(ipv6: &str, with_subnet_id: bool) -> TargetGroup { let mut targets = BTreeSet::new(); - targets.insert(std::net::SocketAddr::V6( - SocketAddrV6::from_str(ipv6).unwrap(), - )); + targets.insert(std::net::SocketAddr::V6(SocketAddrV6::from_str(ipv6).unwrap())); let subnet_id = match with_subnet_id { true => Some(SubnetId::from(PrincipalId::new_anonymous())), false => None, @@ -153,13 +156,13 @@ mod prometheus_serialize { let tg2 = create_dummy_target_group("[2a02:800:2:2003:6801:f6ff:fec4:4c87]:9091", false); target_groups.insert(tg2.clone()); - let config = cb.build(target_groups, Job::from(JobType::Replica)); + let config = cb.build(target_groups, JobType::Replica); let expected_config = json!( [ { "targets": [ - "[2a02:800:2:2003:6801:f6ff:fec4:4c86]:9091" + "[2a02:800:2:2003:6801:f6ff:fec4:4c86]:9090" ], "labels": { "ic": tg1.ic_name, @@ -170,7 +173,7 @@ mod prometheus_serialize { }, { "targets": [ - "[2a02:800:2:2003:6801:f6ff:fec4:4c87]:9091" + "[2a02:800:2:2003:6801:f6ff:fec4:4c87]:9090" ], "labels": { "ic": tg2.ic_name, @@ -191,26 +194,26 @@ mod prometheus_serialize { let tg1 = create_dummy_target_group("[2a02:800:2:2003:6801:f6ff:fec4:4c86]:9091", true); target_groups.insert(tg1); - let config = cb.build(target_groups.clone(), Job::from(JobType::Replica)); + let config = cb.build(target_groups.clone(), JobType::Replica); assert!(config.updated()); - let config = cb.build(target_groups.clone(), Job::from(JobType::Replica)); + let config = cb.build(target_groups.clone(), JobType::Replica); assert!(!config.updated()); let tg2 = create_dummy_target_group("[2a02:800:2:2003:6801:f6ff:fec4:4c87]:9091", true); target_groups.insert(tg2); - let config = cb.build(target_groups.clone(), Job::from(JobType::Replica)); + let config = cb.build(target_groups.clone(), JobType::Replica); assert!(config.updated()); } #[test] fn test_get_endpoints() { - let target_group = - create_dummy_target_group("[2a02:800:2:2003:6801:f6ff:fec4:4c87]:9091", true); - let endpoints = get_endpoints(target_group, Job::from(JobType::Replica)); + // Whatever the port supplied, the get_endpoints() function should substitute with the correct port for the service type. + let target_group = create_dummy_target_group("[2a02:800:2:2003:6801:f6ff:fec4:4c87]:9091", true); + let endpoints = get_endpoints(target_group, JobType::Replica, false); let mut expected_endpoints = BTreeSet::new(); - expected_endpoints.insert("[2a02:800:2:2003:6801:f6ff:fec4:4c87]:9091".to_string()); + expected_endpoints.insert("[2a02:800:2:2003:6801:f6ff:fec4:4c87]:9090".to_string()); assert_eq!(endpoints, expected_endpoints) } diff --git a/rs/ic-observability/prometheus-config-updater/src/prometheus_updater.rs b/rs/ic-observability/prometheus-config-updater/src/prometheus_updater.rs index 807d7785f..ade4bd05e 100644 --- a/rs/ic-observability/prometheus-config-updater/src/prometheus_updater.rs +++ b/rs/ic-observability/prometheus-config-updater/src/prometheus_updater.rs @@ -21,11 +21,7 @@ pub struct VectorConfigBuilderImpl { } impl VectorConfigBuilderImpl { - pub fn new( - proxy_url: Option, - scrape_interval: u64, - jobs_parameters: HashMap, - ) -> Self { + pub fn new(proxy_url: Option, scrape_interval: u64, jobs_parameters: HashMap) -> Self { Self { proxy_url, scrape_interval, @@ -33,11 +29,7 @@ impl VectorConfigBuilderImpl { } } - fn add_target_groups_with_job( - &self, - targets: BTreeSet, - job: JobType, - ) -> VectorConfigEnriched { + fn add_target_groups_with_job(&self, targets: BTreeSet, job: JobType) -> VectorConfigEnriched { let mut config = VectorConfigEnriched::new(); for target in targets { let key = target @@ -54,8 +46,7 @@ impl VectorConfigBuilderImpl { self.scrape_interval, self.proxy_url.as_ref().cloned(), ); - let transform = - VectorPrometheusScrapeTransform::from_target_group_with_job(target, &job); + let transform = VectorPrometheusScrapeTransform::from_target_group_with_job(target, &job); config.add_target_group(key, Box::new(source), Box::new(transform)) } config @@ -151,13 +142,17 @@ impl VectorTransform for VectorPrometheusScrapeTransform { impl VectorPrometheusScrapeTransform { fn from_target_group_with_job(tg: TargetGroup, job: &JobType) -> Self { - let mut labels: HashMap = HashMap::new(); - labels.insert(IC_NAME.into(), tg.ic_name); - labels.insert(IC_NODE.into(), tg.node_id.to_string()); - if let Some(subnet_id) = tg.subnet_id { - labels.insert(IC_SUBNET.into(), subnet_id.to_string()); - } - labels.insert("job".into(), job.to_string()); + let labels = HashMap::from([ + (IC_NAME.into(), tg.ic_name), + (IC_NODE.into(), tg.node_id.to_string()), + ("job".into(), job.to_string()), + ]) + .into_iter() + .chain(match tg.subnet_id { + Some(subnet_id) => vec![(IC_SUBNET.into(), subnet_id.to_string())], + None => vec![], + }) + .collect(); Self { _type: "remap".into(), inputs: tg @@ -202,24 +197,16 @@ mod tests { let sources_key = String::from(original_addr) + "-source"; let mut targets = BTreeSet::new(); - targets.insert(SocketAddr::V6( - SocketAddrV6::from_str(original_addr).unwrap(), - )); + targets.insert(SocketAddr::V6(SocketAddrV6::from_str(original_addr).unwrap())); let ptg = TargetGroup { node_id: NodeId::from( - PrincipalId::from_str( - "iylgr-zpxwq-kqgmf-4srtx-o4eey-d6bln-smmq6-we7px-ibdea-nondy-eae", - ) - .unwrap(), + PrincipalId::from_str("iylgr-zpxwq-kqgmf-4srtx-o4eey-d6bln-smmq6-we7px-ibdea-nondy-eae").unwrap(), ), ic_name: "mercury".into(), targets, subnet_id: Some(SubnetId::from( - PrincipalId::from_str( - "x33ed-h457x-bsgyx-oqxqf-6pzwv-wkhzr-rm2j3-npodi-purzm-n66cg-gae", - ) - .unwrap(), + PrincipalId::from_str("x33ed-h457x-bsgyx-oqxqf-6pzwv-wkhzr-rm2j3-npodi-purzm-n66cg-gae").unwrap(), )), dc_id: None, operator_id: None, @@ -236,10 +223,7 @@ mod tests { let sources_config_endpoint = binding.get(&sources_key); if let Some(conf) = sources_config_endpoint { - let downcast = conf - .as_any() - .downcast_ref::() - .unwrap(); + let downcast = conf.as_any().downcast_ref::().unwrap(); assert_eq!( downcast.endpoints[0], url::Url::parse(&("http://".to_owned() + original_addr)) diff --git a/rs/ic-observability/service-discovery/src/job_types.rs b/rs/ic-observability/service-discovery/src/job_types.rs index 952e87c43..0bc24582e 100644 --- a/rs/ic-observability/service-discovery/src/job_types.rs +++ b/rs/ic-observability/service-discovery/src/job_types.rs @@ -1,6 +1,6 @@ -use std::{collections::HashMap, fmt, str::FromStr}; - use serde::{Deserialize, Serialize}; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; +use std::{fmt, str::FromStr}; #[derive(PartialEq, Eq, Hash, Clone, Copy, Debug, PartialOrd, Ord, Serialize, Deserialize)] pub enum NodeOS { @@ -13,7 +13,24 @@ pub enum JobType { Replica, NodeExporter(NodeOS), Orchestrator, - MetricsProxy, + MetricsProxy(NodeOS), +} + +/// By convention, the first two bytes of the host-part of the replica's IP +/// address are 0x6801. The corresponding segment for the host is 0x6800. +/// +/// (The MAC starts with 0x6a00. The 7'th bit of the first byte is flipped. See +/// https://en.wikipedia.org/wiki/MAC_address) +fn guest_to_host_address(sockaddr: SocketAddr) -> SocketAddr { + match sockaddr.ip() { + IpAddr::V6(a) if a.segments()[4] == 0x6801 => { + let s = a.segments(); + let new_addr = Ipv6Addr::new(s[0], s[1], s[2], s[3], 0x6800, s[5], s[6], s[7]); + let ip = IpAddr::V6(new_addr); + SocketAddr::new(ip, sockaddr.port()) + } + _ip => sockaddr, + } } // The type of discovered job. @@ -24,27 +41,95 @@ impl JobType { Self::NodeExporter(NodeOS::Host) => 9100, Self::NodeExporter(NodeOS::Guest) => 9100, Self::Orchestrator => 9091, - Self::MetricsProxy => 19100, + Self::MetricsProxy(NodeOS::Host) => 19100, + Self::MetricsProxy(NodeOS::Guest) => 19100, } } pub fn endpoint(&self) -> &'static str { match self { Self::Replica => "/", - Self::NodeExporter(NodeOS::Host) => "/metrics", - Self::NodeExporter(NodeOS::Guest) => "/metrics", + Self::NodeExporter(_) => "/metrics", Self::Orchestrator => "/", - Self::MetricsProxy => "/metrics", + Self::MetricsProxy(_) => "/metrics", } } pub fn scheme(&self) -> &'static str { match self { Self::Replica => "http", - Self::NodeExporter(NodeOS::Host) => "https", - Self::NodeExporter(NodeOS::Guest) => "https", + Self::NodeExporter(_) => "https", Self::Orchestrator => "http", - Self::MetricsProxy => "https", + Self::MetricsProxy(_) => "https", + } + } + + // Return the socket address with the correct port and IP address. + // Any non-guest IP address is returned unchanged. Any guest IP + // address that needs changing to host is returned with host IP. + // Boundary nodes are correctly handled. + // FIXME: make me private! + pub fn sockaddr(&self, s: SocketAddr, is_boundary_node: bool) -> SocketAddr { + let mut ss = s; + ss.set_port(self.port()); + if *self == Self::NodeExporter(NodeOS::Host) { + guest_to_host_address(ss) + } else if *self == Self::MetricsProxy(NodeOS::Host) { + match is_boundary_node { + // This is a boundary node IP. Return it unchanged. + true => ss, + // Change GuestOS IP to HostOS IP. + false => guest_to_host_address(ss), + } + } else { + ss } } + + pub fn ip(&self, s: SocketAddr, is_boundary_node: bool) -> IpAddr { + self.sockaddr(s, is_boundary_node).ip() + } + + pub fn url(&self, s: SocketAddr, is_boundary_node: bool) -> String { + format!( + "{}://{}/{}", + self.scheme(), + self.sockaddr(s, is_boundary_node), + self.endpoint().trim_start_matches('/'), + ) + } +} + +/// This is duplicated in impl Job. +impl JobType { + pub fn all_for_ic_nodes() -> Vec { + [ + JobType::Replica, + JobType::Orchestrator, + JobType::NodeExporter(NodeOS::Guest), + JobType::NodeExporter(NodeOS::Host), + JobType::MetricsProxy(NodeOS::Host), + JobType::MetricsProxy(NodeOS::Guest), + ] + .into_iter() + .collect::>() + } + + pub fn all_for_boundary_nodes() -> Vec { + [ + JobType::NodeExporter(NodeOS::Guest), + JobType::NodeExporter(NodeOS::Host), + ] + .into_iter() + .collect::>() + } + + pub fn all_for_logs() -> Vec { + [ + JobType::NodeExporter(NodeOS::Guest), + JobType::NodeExporter(NodeOS::Host), + ] + .into_iter() + .collect::>() + } } #[derive(Debug)] @@ -64,14 +149,15 @@ impl FromStr for JobType { fn from_str(s: &str) -> Result { match s { + // When a new job type is added, please do not forget to + // update its antipode method at fmt() below. "replica" => Ok(JobType::Replica), "node_exporter" => Ok(JobType::NodeExporter(NodeOS::Guest)), "host_node_exporter" => Ok(JobType::NodeExporter(NodeOS::Host)), "orchestrator" => Ok(JobType::Orchestrator), - "metrics-proxy" => Ok(JobType::MetricsProxy), - _ => Err(JobTypeParseError { - input: s.to_string(), - }), + "host_metrics_proxy" => Ok(JobType::MetricsProxy(NodeOS::Host)), + "guest_metrics_proxy" => Ok(JobType::MetricsProxy(NodeOS::Guest)), + _ => Err(JobTypeParseError { input: s.to_string() }), } } } @@ -94,69 +180,8 @@ impl fmt::Display for JobType { JobType::NodeExporter(NodeOS::Guest) => write!(f, "node_exporter"), JobType::NodeExporter(NodeOS::Host) => write!(f, "host_node_exporter"), JobType::Orchestrator => write!(f, "orchestrator"), - JobType::MetricsProxy => write!(f, "metrics-proxy"), + JobType::MetricsProxy(NodeOS::Host) => write!(f, "host_metrics_proxy"), + JobType::MetricsProxy(NodeOS::Guest) => write!(f, "guest_metrics_proxy"), } } } - -#[derive(Clone)] -pub struct JobAndPort { - pub job_type: JobType, - pub port: u16, -} - -impl FromStr for JobAndPort { - type Err = JobTypeParseError; - - fn from_str(s: &str) -> Result { - let elements = s.split(':').collect::>(); - - Ok(JobAndPort { - job_type: elements.first().unwrap().to_string().into(), - port: elements.get(1).unwrap().parse().unwrap(), - }) - } -} - -/// This is duplicated in impl Job. -impl JobAndPort { - pub fn all() -> Vec { - [ - JobAndPort { - job_type: JobType::Replica, - port: JobType::Replica.port(), - }, - JobAndPort { - job_type: JobType::Orchestrator, - port: JobType::Orchestrator.port(), - }, - JobAndPort { - job_type: JobType::NodeExporter(NodeOS::Guest), - port: JobType::NodeExporter(NodeOS::Guest).port(), - }, - JobAndPort { - job_type: JobType::NodeExporter(NodeOS::Host), - port: JobType::NodeExporter(NodeOS::Host).port(), - }, - JobAndPort { - job_type: JobType::MetricsProxy, - port: JobType::MetricsProxy.port(), - }, - ] - .into_iter() - .collect::>() - } -} - -impl fmt::Debug for JobAndPort { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "<{}, {}>", self.job_type, self.port) - } -} - -pub fn map_jobs(jobs_and_ports: &[JobAndPort]) -> HashMap { - jobs_and_ports - .iter() - .map(|job| (job.job_type, job.port)) - .collect() -} diff --git a/rs/ic-observability/service-discovery/src/jobs.rs b/rs/ic-observability/service-discovery/src/jobs.rs deleted file mode 100644 index 0c87303ef..000000000 --- a/rs/ic-observability/service-discovery/src/jobs.rs +++ /dev/null @@ -1,34 +0,0 @@ -use crate::job_types::JobType; -use crate::job_types::NodeOS; - -#[derive(Clone)] -pub struct Job { - pub _type: JobType, - pub port: u16, - pub endpoint: &'static str, - pub scheme: &'static str, -} - -impl From for Job { - fn from(value: JobType) -> Self { - Job { - _type: value, - port: value.port(), - endpoint: value.endpoint(), - scheme: value.scheme(), - } - } -} - -/// This is duplicated in impl JobAndPort. -impl Job { - pub fn all() -> Vec { - vec![ - Job::from(JobType::NodeExporter(NodeOS::Guest)), - Job::from(JobType::NodeExporter(NodeOS::Host)), - Job::from(JobType::Orchestrator), - Job::from(JobType::Replica), - Job::from(JobType::MetricsProxy), - ] - } -} diff --git a/rs/ic-observability/service-discovery/src/lib.rs b/rs/ic-observability/service-discovery/src/lib.rs index 3fa3506c4..aca2cfe0f 100644 --- a/rs/ic-observability/service-discovery/src/lib.rs +++ b/rs/ic-observability/service-discovery/src/lib.rs @@ -7,9 +7,9 @@ //! #![allow(clippy::await_holding_lock, clippy::result_large_err)] use std::{ - collections::{btree_map::Entry, BTreeMap, BTreeSet, HashMap}, + collections::{btree_map::Entry, BTreeMap, BTreeSet}, convert::TryFrom, - net::{IpAddr, Ipv6Addr, SocketAddr}, + net::SocketAddr, path::{Path, PathBuf}, sync::{Arc, RwLock}, time::Duration, @@ -25,7 +25,7 @@ use ic_registry_client_helpers::{ }; use ic_registry_local_registry::{LocalRegistry, LocalRegistryError}; use ic_types::{registry::RegistryClientError, NodeId, PrincipalId, RegistryVersion, SubnetId}; -use job_types::{JobType, NodeOS}; +use job_types::JobType; use regex::Regex; use serde::{Deserialize, Serialize}; use slog::{warn, Logger}; @@ -33,7 +33,6 @@ use thiserror::Error; pub mod file_sd; pub mod job_types; -pub mod jobs; pub mod mainnet_registry; pub mod metrics; pub mod poll_loop; @@ -104,8 +103,6 @@ pub struct IcServiceDiscoveryImpl { /// An in-memory representation of the registries that is updated when /// calling `load_new_scraping_targets`. registries: Arc>>, - - jobs: HashMap, } impl IcServiceDiscoveryImpl { @@ -116,7 +113,6 @@ impl IcServiceDiscoveryImpl { log: Logger, ic_scraping_targets_dir: P, registry_query_timeout: Duration, - jobs: HashMap, ) -> Result { let ic_scraping_targets_dir = PathBuf::from(ic_scraping_targets_dir.as_ref()); if !ic_scraping_targets_dir.is_dir() { @@ -129,7 +125,6 @@ impl IcServiceDiscoveryImpl { ic_scraping_targets_dir, registry_query_timeout, registries, - jobs, }; self_.load_new_ics(log)?; Ok(self_) @@ -224,9 +219,7 @@ impl IcServiceDiscoveryImpl { Err(e) => { warn!( log, - "Error while fetching get_subnet_transport_info for node id {}: {:?}", - subnet_id, - e + "Error while fetching get_subnet_transport_info for node id {}: {:?}", subnet_id, e ); continue; } @@ -285,8 +278,7 @@ impl IcServiceDiscoveryImpl { subnet_id: Option, ic_name: &str, ) -> Result<(), IcServiceDiscoveryError> { - let socket_addr = - Self::node_record_to_target_addr(node_id, latest_version, node_record.clone())?; + let socket_addr = Self::node_record_to_target_addr(node_id, latest_version, node_record.clone())?; let operator_id = PrincipalId::try_from(node_record.node_operator_id).unwrap_or_default(); @@ -302,8 +294,7 @@ impl IcServiceDiscoveryImpl { ic_name: ic_name.into(), dc_id: node_operator.dc_id, operator_id, - node_provider_id: PrincipalId::try_from(node_operator.node_provider_principal_id) - .unwrap_or_default(), + node_provider_id: PrincipalId::try_from(node_operator.node_provider_principal_id).unwrap_or_default(), }); Ok(()) @@ -332,58 +323,25 @@ impl IcServiceDiscoveryImpl { impl IcServiceDiscovery for IcServiceDiscoveryImpl { fn get_target_groups( &self, - job: JobType, + job_type: JobType, log: Logger, ) -> Result, IcServiceDiscoveryError> { - let mut mapping: Option Option>> = None; - - if job == JobType::NodeExporter(NodeOS::Host) { - mapping = Some(Box::new(|sockaddr: SocketAddr| { - guest_to_host_address((set_port(job.port()))(sockaddr)) - })); - } else if job == JobType::MetricsProxy { - mapping = Some(Box::new(|sockaddr: SocketAddr| { - guest_to_host_address((set_port(job.port()))(sockaddr)) - })); - } - - for (listed_job, port) in &self.jobs { - if mapping.is_some() { - break; - } - - if *listed_job == job { - mapping = Some(some_after(set_port(*port))); - break; - } - } - - if mapping.is_none() { - return Err(IcServiceDiscoveryError::JobNameNotFound { - job_name: job.to_string(), - }); - } - + let mapping = Box::new(|sockaddr: SocketAddr| job_type.sockaddr(sockaddr, false)); let registries_lock_guard = self.registries.read().unwrap(); - let target_list = registries_lock_guard.iter().try_fold( - BTreeSet::new(), - |mut a, (ic_name, registry)| { + let target_list = registries_lock_guard + .iter() + .try_fold(BTreeSet::new(), |mut a, (ic_name, registry)| { a.append(&mut Self::get_targets(registry, ic_name, log.clone())?); Ok::<_, IcServiceDiscoveryError>(a) - }, - )?; + })?; Ok(target_list .into_iter() .filter_map(|target_group| { // replica targets are only exposed if they are assigned to a // subnet (i.e. if the subnet id is set) - if job != JobType::Replica || target_group.subnet_id.is_some() { - let targets: BTreeSet<_> = target_group - .targets - .into_iter() - .filter_map(&mapping.as_ref().unwrap()) - .collect(); + if job_type != JobType::Replica || target_group.subnet_id.is_some() { + let targets: BTreeSet<_> = target_group.targets.into_iter().map(&mapping).collect(); if !targets.is_empty() { return Some(TargetGroup { targets, @@ -397,51 +355,12 @@ impl IcServiceDiscovery for IcServiceDiscoveryImpl { } } -fn set_port(port: u16) -> Box SocketAddr> { - Box::new(move |mut sockaddr: SocketAddr| { - sockaddr.set_port(port); - sockaddr - }) -} - -/// Take a function f and return `Some . f` -fn some_after( - f: Box SocketAddr>, -) -> Box Option> { - Box::new(move |s| Some(f(s))) -} - -/// By convention, the first two bytes of the host-part of the replica's IP -/// address are 0x6801. The corresponding segment for the host is 0x6800. -/// -/// (The MAC starts with 0x6a00. The 7'th bit of the first byte is flipped. See -/// https://en.wikipedia.org/wiki/MAC_address) -pub fn guest_to_host_address(sockaddr: SocketAddr) -> Option { - match sockaddr.ip() { - IpAddr::V6(a) if a.segments()[4] == 0x6801 => { - let s = a.segments(); - let new_addr = Ipv6Addr::new(s[0], s[1], s[2], s[3], 0x6800, s[5], s[6], s[7]); - let ip = IpAddr::V6(new_addr); - Some(SocketAddr::new(ip, sockaddr.port())) - } - _ip => None, - } -} - trait MapRegistryClientErr { - fn map_registry_err( - self, - version: RegistryVersion, - context: &str, - ) -> Result; + fn map_registry_err(self, version: RegistryVersion, context: &str) -> Result; } impl MapRegistryClientErr for RegistryClientResult { - fn map_registry_err( - self, - version: RegistryVersion, - context: &str, - ) -> Result { + fn map_registry_err(self, version: RegistryVersion, context: &str) -> Result { use IcServiceDiscoveryError::*; match self { Ok(Some(v)) => Ok(v), @@ -469,10 +388,7 @@ pub enum IcServiceDiscoveryError { source: std::io::Error, }, #[error("Missing registry value. context: {context} version: {version}")] - MissingRegistryValue { - version: RegistryVersion, - context: String, - }, + MissingRegistryValue { version: RegistryVersion, context: String }, #[error("RegistryClientError")] RegistryClient { #[from] @@ -501,7 +417,7 @@ pub enum IcServiceDiscoveryError { #[cfg(test)] mod tests { - use std::collections::{HashMap, HashSet}; + use std::collections::HashSet; use slog::o; use tempfile::TempDir; @@ -517,17 +433,12 @@ mod tests { let tempdir = TempDir::new().unwrap(); let ic_dir = PathBuf::from(tempdir.path()).join("mainnet"); let _store = create_local_store_from_changelog(ic_dir, get_mainnet_delta_6d_c1()); - let mut jobs: HashMap = HashMap::new(); - jobs.insert(JobType::Replica, 9090); let log = slog::Logger::root(slog::Discard, o!()); - let ic_scraper = - IcServiceDiscoveryImpl::new(log.clone(), tempdir.path(), QUERY_TIMEOUT, jobs).unwrap(); + let ic_scraper = IcServiceDiscoveryImpl::new(log.clone(), tempdir.path(), QUERY_TIMEOUT).unwrap(); ic_scraper.load_new_ics(log.clone()).unwrap(); - let target_groups = ic_scraper - .get_target_groups(JobType::Replica, log.clone()) - .unwrap(); + let target_groups = ic_scraper.get_target_groups(JobType::Replica, log.clone()).unwrap(); let nns_targets: HashSet<_> = target_groups .iter() diff --git a/rs/ic-observability/service-discovery/src/service_discovery_record.rs b/rs/ic-observability/service-discovery/src/service_discovery_record.rs index 222f06745..e668561d5 100644 --- a/rs/ic-observability/service-discovery/src/service_discovery_record.rs +++ b/rs/ic-observability/service-discovery/src/service_discovery_record.rs @@ -14,15 +14,19 @@ pub struct ServiceDiscoveryRecord { impl From for ServiceDiscoveryRecord { fn from(group: TargetGroup) -> Self { - let targets: Vec<_> = group.targets.into_iter().map(|x| x.to_string()).collect(); - let mut labels = BTreeMap::new(); - - labels.insert(IC_NAME.into(), group.ic_name); - labels.insert(IC_NODE.into(), group.node_id.to_string()); - if let Some(subnet_id) = group.subnet_id { - labels.insert(IC_SUBNET.into(), subnet_id.to_string()); + Self { + targets: group.targets.into_iter().map(|x| x.to_string()).collect(), + labels: BTreeMap::from([ + (IC_NAME.into(), group.ic_name), + (IC_NODE.into(), group.node_id.to_string()), + ]) + .into_iter() + .chain(match group.subnet_id { + Some(subnet_id) => vec![(IC_SUBNET.into(), subnet_id.to_string())], + None => vec![], + }) + .collect(), } - Self { targets, labels } } }