diff --git a/Cargo.Bazel.lock b/Cargo.Bazel.lock index a6d9f6b9b..14faf6646 100644 --- a/Cargo.Bazel.lock +++ b/Cargo.Bazel.lock @@ -1,5 +1,5 @@ { - "checksum": "33a7792c0242d506d6be14d79c6146ea57bd5f73b4092afc33864713df44d83a", + "checksum": "5f20b91851f0676df47d73f7fc00816fa99dd7895eb15cf8e406976340d609c1", "crates": { "actix-codec 0.5.2": { "name": "actix-codec", @@ -3711,6 +3711,85 @@ }, "license": "MIT" }, + "axum-otel-metrics 0.8.0": { + "name": "axum-otel-metrics", + "version": "0.8.0", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/axum-otel-metrics/0.8.0/download", + "sha256": "05498d33363e05a88a33e7071053c43bb3585c89e0bccb71b2ad06425b23b14f" + } + }, + "targets": [ + { + "Library": { + "crate_name": "axum_otel_metrics", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "axum_otel_metrics", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "axum 0.7.4", + "target": "axum" + }, + { + "id": "futures-util 0.3.30", + "target": "futures_util" + }, + { + "id": "http 1.0.0", + "target": "http" + }, + { + "id": "http-body 1.0.0", + "target": "http_body" + }, + { + "id": "opentelemetry 0.21.0", + "target": "opentelemetry" + }, + { + "id": "opentelemetry-prometheus 0.14.1", + "target": "opentelemetry_prometheus" + }, + { + "id": "opentelemetry-semantic-conventions 0.13.0", + "target": "opentelemetry_semantic_conventions" + }, + { + "id": "opentelemetry_sdk 0.21.2", + "target": "opentelemetry_sdk" + }, + { + "id": "pin-project-lite 0.2.13", + "target": "pin_project_lite" + }, + { + "id": "prometheus 0.13.3", + "target": "prometheus" + }, + { + "id": "tower 0.4.13", + "target": "tower" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.8.0" + }, + "license": "MIT" + }, "backoff 0.4.0": { "name": "backoff", "version": "0.4.0", @@ -13672,6 +13751,7 @@ ], "crate_features": { "common": [ + "default", "std" ], "selects": {} @@ -29375,6 +29455,10 @@ "id": "axum 0.7.4", "target": "axum" }, + { + "id": "axum-otel-metrics 0.8.0", + "target": "axum_otel_metrics" + }, { "id": "base64 0.21.7", "target": "base64" @@ -29427,6 +29511,10 @@ "id": "ic-utils 0.9.0", "target": "ic_utils" }, + { + "id": "opentelemetry 0.21.0", + "target": "opentelemetry" + }, { "id": "regex 1.10.3", "target": "regex" @@ -31719,6 +31807,297 @@ }, "license": "MIT" }, + "opentelemetry 0.21.0": { + "name": "opentelemetry", + "version": "0.21.0", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/opentelemetry/0.21.0/download", + "sha256": "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" + } + }, + "targets": [ + { + "Library": { + "crate_name": "opentelemetry", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "opentelemetry", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "default", + "metrics", + "pin-project-lite", + "trace" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "futures-core 0.3.30", + "target": "futures_core" + }, + { + "id": "futures-sink 0.3.30", + "target": "futures_sink" + }, + { + "id": "indexmap 2.2.2", + "target": "indexmap" + }, + { + "id": "once_cell 1.19.0", + "target": "once_cell" + }, + { + "id": "pin-project-lite 0.2.13", + "target": "pin_project_lite" + }, + { + "id": "thiserror 1.0.56", + "target": "thiserror" + }, + { + "id": "urlencoding 2.1.3", + "target": "urlencoding" + } + ], + "selects": { + "cfg(all(target_arch = \"wasm32\", not(target_os = \"wasi\")))": [ + { + "id": "js-sys 0.3.67", + "target": "js_sys" + } + ] + } + }, + "edition": "2021", + "version": "0.21.0" + }, + "license": "Apache-2.0" + }, + "opentelemetry-prometheus 0.14.1": { + "name": "opentelemetry-prometheus", + "version": "0.14.1", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/opentelemetry-prometheus/0.14.1/download", + "sha256": "6f8f082da115b0dcb250829e3ed0b8792b8f963a1ad42466e48422fbe6a079bd" + } + }, + "targets": [ + { + "Library": { + "crate_name": "opentelemetry_prometheus", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "opentelemetry_prometheus", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "prometheus-encoding" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "once_cell 1.19.0", + "target": "once_cell" + }, + { + "id": "opentelemetry 0.21.0", + "target": "opentelemetry" + }, + { + "id": "opentelemetry_sdk 0.21.2", + "target": "opentelemetry_sdk" + }, + { + "id": "prometheus 0.13.3", + "target": "prometheus" + }, + { + "id": "protobuf 2.28.0", + "target": "protobuf" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.14.1" + }, + "license": "Apache-2.0" + }, + "opentelemetry-semantic-conventions 0.13.0": { + "name": "opentelemetry-semantic-conventions", + "version": "0.13.0", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/opentelemetry-semantic-conventions/0.13.0/download", + "sha256": "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" + } + }, + "targets": [ + { + "Library": { + "crate_name": "opentelemetry_semantic_conventions", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "opentelemetry_semantic_conventions", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "deps": { + "common": [ + { + "id": "opentelemetry 0.21.0", + "target": "opentelemetry" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "0.13.0" + }, + "license": "Apache-2.0" + }, + "opentelemetry_sdk 0.21.2": { + "name": "opentelemetry_sdk", + "version": "0.21.2", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/opentelemetry_sdk/0.21.2/download", + "sha256": "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" + } + }, + "targets": [ + { + "Library": { + "crate_name": "opentelemetry_sdk", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "opentelemetry_sdk", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "async-trait", + "crossbeam-channel", + "default", + "glob", + "metrics", + "percent-encoding", + "rand", + "rt-tokio", + "tokio", + "tokio-stream", + "trace" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "crossbeam-channel 0.5.11", + "target": "crossbeam_channel" + }, + { + "id": "futures-channel 0.3.30", + "target": "futures_channel" + }, + { + "id": "futures-executor 0.3.30", + "target": "futures_executor" + }, + { + "id": "futures-util 0.3.30", + "target": "futures_util" + }, + { + "id": "glob 0.3.1", + "target": "glob" + }, + { + "id": "once_cell 1.19.0", + "target": "once_cell" + }, + { + "id": "opentelemetry 0.21.0", + "target": "opentelemetry" + }, + { + "id": "ordered-float 4.2.0", + "target": "ordered_float" + }, + { + "id": "percent-encoding 2.3.1", + "target": "percent_encoding" + }, + { + "id": "rand 0.8.5", + "target": "rand" + }, + { + "id": "thiserror 1.0.56", + "target": "thiserror" + }, + { + "id": "tokio 1.36.0", + "target": "tokio" + }, + { + "id": "tokio-stream 0.1.14", + "target": "tokio_stream" + } + ], + "selects": {} + }, + "edition": "2021", + "proc_macro_deps": { + "common": [ + { + "id": "async-trait 0.1.77", + "target": "async_trait" + } + ], + "selects": {} + }, + "version": "0.21.2" + }, + "license": "Apache-2.0" + }, "option-ext 0.2.0": { "name": "option-ext", "version": "0.2.0", @@ -31749,6 +32128,52 @@ }, "license": "MPL-2.0" }, + "ordered-float 4.2.0": { + "name": "ordered-float", + "version": "4.2.0", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/ordered-float/4.2.0/download", + "sha256": "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" + } + }, + "targets": [ + { + "Library": { + "crate_name": "ordered_float", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "ordered_float", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "crate_features": { + "common": [ + "default", + "std" + ], + "selects": {} + }, + "deps": { + "common": [ + { + "id": "num-traits 0.2.17", + "target": "num_traits" + } + ], + "selects": {} + }, + "edition": "2021", + "version": "4.2.0" + }, + "license": "MIT" + }, "ordered-stream 0.2.0": { "name": "ordered-stream", "version": "0.2.0", @@ -44835,6 +45260,36 @@ }, "license": "MIT OR Apache-2.0" }, + "urlencoding 2.1.3": { + "name": "urlencoding", + "version": "2.1.3", + "repository": { + "Http": { + "url": "https://crates.io/api/v1/crates/urlencoding/2.1.3/download", + "sha256": "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + } + }, + "targets": [ + { + "Library": { + "crate_name": "urlencoding", + "crate_root": "src/lib.rs", + "srcs": [ + "**/*.rs" + ] + } + } + ], + "library_target_name": "urlencoding", + "common_attrs": { + "compile_data_glob": [ + "**" + ], + "edition": "2021", + "version": "2.1.3" + }, + "license": "MIT" + }, "utf8-width 0.1.7": { "name": "utf8-width", "version": "0.1.7", diff --git a/Cargo.lock b/Cargo.lock index 2880c2692..e3f932a71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -754,6 +754,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "axum-otel-metrics" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05498d33363e05a88a33e7071053c43bb3585c89e0bccb71b2ad06425b23b14f" +dependencies = [ + "axum 0.7.4", + "futures-util", + "http 1.0.0", + "http-body 1.0.0", + "opentelemetry", + "opentelemetry-prometheus", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk", + "pin-project-lite", + "prometheus", + "tower", +] + [[package]] name = "backoff" version = "0.4.0" @@ -5899,6 +5918,7 @@ dependencies = [ "anyhow", "assert_cmd", "axum 0.7.4", + "axum-otel-metrics", "base64 0.21.7", "clap 4.4.18", "crossbeam", @@ -5916,6 +5936,7 @@ dependencies = [ "ic-types", "ic-utils 0.9.0", "multiservice-discovery-shared", + "opentelemetry", "regex", "reqwest", "retry", @@ -6332,12 +6353,81 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" +dependencies = [ + "futures-core", + "futures-sink", + "indexmap 2.2.2", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", +] + +[[package]] +name = "opentelemetry-prometheus" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8f082da115b0dcb250829e3ed0b8792b8f963a1ad42466e48422fbe6a079bd" +dependencies = [ + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "prometheus", + "protobuf", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" +dependencies = [ + "opentelemetry", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.21.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f16aec8a98a457a52664d69e0091bac3a0abd18ead9b641cb00202ba4e0efe4" +dependencies = [ + "async-trait", + "crossbeam-channel", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "ordered-float", + "percent-encoding", + "rand 0.8.5", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" +[[package]] +name = "ordered-float" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-stream" version = "0.2.0" @@ -8868,6 +8958,12 @@ dependencies = [ "serde", ] +[[package]] +name = "urlencoding" +version = "2.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" + [[package]] name = "utf8-width" version = "0.1.7" diff --git a/rs/ic-observability/config-writer-common/src/config_writer.rs b/rs/ic-observability/config-writer-common/src/config_writer.rs index 18cadb305..b8fe05179 100644 --- a/rs/ic-observability/config-writer-common/src/config_writer.rs +++ b/rs/ic-observability/config-writer-common/src/config_writer.rs @@ -22,11 +22,7 @@ pub struct ConfigWriter { } impl ConfigWriter { - pub fn new>( - write_path: P, - filters: Arc, - log: Logger, - ) -> Self { + pub fn new>(write_path: P, filters: Arc, log: Logger) -> Self { ConfigWriter { base_directory: PathBuf::from(write_path.as_ref()), last_targets: Default::default(), @@ -48,16 +44,10 @@ impl ConfigWriter { ) -> std::io::Result<()> { let last_job_targets = self.last_targets.entry(job.to_string()).or_default(); if last_job_targets == &target_groups { - debug!( - self.log, - "Targets didn't change, skipped regenerating config" - ); + debug!(self.log, "Targets didn't change, skipped regenerating config"); return Ok(()); } - debug!( - self.log, - "Targets changed, proceeding with regenerating config" - ); + debug!(self.log, "Targets changed, proceeding with regenerating config"); let target_path = self.base_directory.join(format!("{}.json", job)); let filtered_target_groups: BTreeSet = target_groups @@ -69,12 +59,8 @@ impl ConfigWriter { let vector_config = vector_config_builder.build(filtered_target_groups, job); ic_utils::fs::write_atomically(target_path.as_path(), |f| { - serde_json::to_writer_pretty(f, &vector_config).map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Serialization error: {:?}", e), - ) - }) + serde_json::to_writer_pretty(f, &vector_config) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("Serialization error: {:?}", e))) })?; self.last_targets.insert(job.to_string(), target_groups); Ok(()) @@ -84,25 +70,15 @@ impl ConfigWriter { impl ConfigUpdater for ConfigWriter { fn update(&self, config: &dyn Config) -> Result<(), Box> { if !config.updated() { - debug!( - self.log, - "Targets didn't change, skipped regenerating config" - ); + debug!(self.log, "Targets didn't change, skipped regenerating config"); return Ok(()); } - debug!( - self.log, - "Targets changed, proceeding with regenerating config" - ); + debug!(self.log, "Targets changed, proceeding with regenerating config"); let target_path = self.base_directory.join(format!("{}.json", config.name())); ic_utils::fs::write_atomically(target_path.as_path(), |f| { - serde_json::to_writer_pretty(f, &config).map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Serialization error: {:?}", e), - ) - }) + serde_json::to_writer_pretty(f, &config) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("Serialization error: {:?}", e))) })?; Ok(()) } diff --git a/rs/ic-observability/config-writer-common/src/config_writer_loop.rs b/rs/ic-observability/config-writer-common/src/config_writer_loop.rs index 4f206ba15..7f6ddfae6 100644 --- a/rs/ic-observability/config-writer-common/src/config_writer_loop.rs +++ b/rs/ic-observability/config-writer-common/src/config_writer_loop.rs @@ -26,8 +26,7 @@ pub fn config_writer_loop( metrics: Metrics, ) -> impl FnMut() { move || { - let mut config_writer = - ConfigWriter::new(vector_config_dir.clone(), filters.clone(), log.clone()); + let mut config_writer = ConfigWriter::new(vector_config_dir.clone(), filters.clone(), log.clone()); loop { for job in &jobs { let targets = match discovery.get_target_groups(*job, log.clone()) { @@ -42,10 +41,7 @@ pub fn config_writer_loop( .with_label_values(&[job.to_string().as_str()]) .set(targets.len().try_into().unwrap()); if let Err(e) = config_writer.write_config(*job, targets, &vector_config_builder) { - warn!( - log, - "Failed to write config for targets for job {}: {:?}", job, e - ); + warn!(log, "Failed to write config for targets for job {}: {:?}", job, e); }; } select! { diff --git a/rs/ic-observability/config-writer-common/src/filters.rs b/rs/ic-observability/config-writer-common/src/filters.rs index 655c1c4ef..3a3a86813 100644 --- a/rs/ic-observability/config-writer-common/src/filters.rs +++ b/rs/ic-observability/config-writer-common/src/filters.rs @@ -62,9 +62,7 @@ mod tests { fn create_dummy_target_group(ipv6: &str) -> TargetGroup { let mut targets = BTreeSet::new(); - targets.insert(std::net::SocketAddr::V6( - SocketAddrV6::from_str(ipv6).unwrap(), - )); + targets.insert(std::net::SocketAddr::V6(SocketAddrV6::from_str(ipv6).unwrap())); TargetGroup { node_id: NodeId::from(PrincipalId::new_anonymous()), ic_name: "mercury".into(), @@ -82,10 +80,7 @@ mod tests { let accepted_tg = TargetGroup { node_id: NodeId::from( - PrincipalId::from_str( - "iylgr-zpxwq-kqgmf-4srtx-o4eey-d6bln-smmq6-we7px-ibdea-nondy-eae", - ) - .unwrap(), + PrincipalId::from_str("iylgr-zpxwq-kqgmf-4srtx-o4eey-d6bln-smmq6-we7px-ibdea-nondy-eae").unwrap(), ), ic_name: "mercury".into(), targets: BTreeSet::new(), @@ -98,10 +93,7 @@ mod tests { let rejected_tg = TargetGroup { node_id: NodeId::from( - PrincipalId::from_str( - "x33ed-h457x-bsgyx-oqxqf-6pzwv-wkhzr-rm2j3-npodi-purzm-n66cg-gae", - ) - .unwrap(), + PrincipalId::from_str("x33ed-h457x-bsgyx-oqxqf-6pzwv-wkhzr-rm2j3-npodi-purzm-n66cg-gae").unwrap(), ), ic_name: "mercury".into(), targets: BTreeSet::new(), @@ -121,10 +113,7 @@ mod tests { let accepted_tg = TargetGroup { node_id: NodeId::from( - PrincipalId::from_str( - "iylgr-zpxwq-kqgmf-4srtx-o4eey-d6bln-smmq6-we7px-ibdea-nondy-eae", - ) - .unwrap(), + PrincipalId::from_str("iylgr-zpxwq-kqgmf-4srtx-o4eey-d6bln-smmq6-we7px-ibdea-nondy-eae").unwrap(), ), ic_name: "mercury".into(), targets: BTreeSet::new(), @@ -137,10 +126,7 @@ mod tests { let rejected_tg_1 = TargetGroup { node_id: NodeId::from( - PrincipalId::from_str( - "x33ed-h457x-bsgyx-oqxqf-6pzwv-wkhzr-rm2j3-npodi-purzm-n66cg-gae", - ) - .unwrap(), + PrincipalId::from_str("x33ed-h457x-bsgyx-oqxqf-6pzwv-wkhzr-rm2j3-npodi-purzm-n66cg-gae").unwrap(), ), ic_name: "mercury".into(), targets: BTreeSet::new(), diff --git a/rs/ic-observability/multiservice-discovery-downloader/src/main.rs b/rs/ic-observability/multiservice-discovery-downloader/src/main.rs index a4c556915..661e7c47a 100644 --- a/rs/ic-observability/multiservice-discovery-downloader/src/main.rs +++ b/rs/ic-observability/multiservice-discovery-downloader/src/main.rs @@ -21,11 +21,7 @@ fn main() { info!(logger, "Starting downloader loop"; "cli_args" => ?cli_args); - let downloader_handle = rt.spawn(run_downloader_loop( - logger.clone(), - cli_args, - stop_signal_rcv, - )); + let downloader_handle = rt.spawn(run_downloader_loop(logger.clone(), cli_args, stop_signal_rcv)); rt.block_on(shutdown_signal); info!(logger, "Received shutdown signal, shutting down ..."); @@ -127,11 +123,7 @@ pub mod log_subtype { use super::*; #[derive(Parser, Clone, Debug)] pub struct LogSubtype { - #[clap( - long = "port", - help = "Custom port for standard nodes", - default_value = "19531" - )] + #[clap(long = "port", help = "Custom port for standard nodes", default_value = "19531")] pub port: u64, #[clap( long = "boundary-nodes-port", @@ -158,10 +150,7 @@ pub mod log_subtype { #[clap(long = "journals-folder", help = "Path to the root journals folder")] journals_folder: String, - #[clap( - long = "worker-cursor-folder", - help = "Path for the root worker cursors folder" - )] + #[clap(long = "worker-cursor-folder", help = "Path for the root worker cursors folder")] worker_cursor_folder: String, #[clap(long = "data-folder", help = "Path for the data folder")] diff --git a/rs/ic-observability/multiservice-discovery-shared/src/filters/mod.rs b/rs/ic-observability/multiservice-discovery-shared/src/filters/mod.rs index 8f2c3a171..4bebe026d 100644 --- a/rs/ic-observability/multiservice-discovery-shared/src/filters/mod.rs +++ b/rs/ic-observability/multiservice-discovery-shared/src/filters/mod.rs @@ -31,10 +31,7 @@ impl TargetGroupFilter for TargetGroupFilterList { if self.filters.is_empty() { true } else { - self.filters - .iter() - .map(|f| f.filter(target_group)) - .all(|status| status) + self.filters.iter().map(|f| f.filter(target_group)).all(|status| status) } } } diff --git a/rs/ic-observability/multiservice-discovery/Cargo.toml b/rs/ic-observability/multiservice-discovery/Cargo.toml index 85b65433a..818774186 100644 --- a/rs/ic-observability/multiservice-discovery/Cargo.toml +++ b/rs/ic-observability/multiservice-discovery/Cargo.toml @@ -31,6 +31,8 @@ tokio = { workspace = true } url = { workspace = true } futures.workspace = true axum = "0.7.4" +axum-otel-metrics = "0.8.0" +opentelemetry = { version = "0.21.0", features = ["metrics"] } retry = { workspace = true } [dev-dependencies] diff --git a/rs/ic-observability/multiservice-discovery/src/definition.rs b/rs/ic-observability/multiservice-discovery/src/definition.rs index 70e390dd1..daa53a2ab 100644 --- a/rs/ic-observability/multiservice-discovery/src/definition.rs +++ b/rs/ic-observability/multiservice-discovery/src/definition.rs @@ -30,6 +30,7 @@ use tokio::sync::Mutex; use url::Url; use crate::make_logger; +use crate::metrics::RunningDefinitionsMetrics; #[derive(Clone, Serialize, Deserialize)] pub struct FSDefinition { @@ -120,6 +121,7 @@ pub struct RunningDefinition { pub(crate) definition: Definition, stop_signal: Receiver<()>, ender: Arc>>, + metrics: RunningDefinitionsMetrics, } pub struct TestDefinition { @@ -127,7 +129,7 @@ pub struct TestDefinition { } impl TestDefinition { - pub(crate) fn new(definition: Definition) -> Self { + pub(crate) fn new(definition: Definition, metrics: RunningDefinitionsMetrics) -> Self { let (_, stop_signal) = crossbeam::channel::bounded::<()>(0); let ender: Arc>> = Arc::new(Mutex::new(None)); Self { @@ -135,6 +137,7 @@ impl TestDefinition { definition, stop_signal, ender, + metrics, }, } } @@ -185,8 +188,8 @@ impl Definition { } } - pub(crate) async fn run(self, rt: tokio::runtime::Handle) -> RunningDefinition { - fn wrap(definition: RunningDefinition, rt: tokio::runtime::Handle) -> impl FnMut() { + pub(crate) async fn run(self, rt: tokio::runtime::Handle, metrics: RunningDefinitionsMetrics) -> RunningDefinition { + fn wrap(mut definition: RunningDefinition, rt: tokio::runtime::Handle) -> impl FnMut() { move || { rt.block_on(definition.run()); } @@ -199,6 +202,7 @@ impl Definition { definition: self, stop_signal, ender: ender.clone(), + metrics, }; let join_handle = std::thread::spawn(wrap(d.clone(), rt)); ender.lock().await.replace(Ender { @@ -271,7 +275,7 @@ impl RunningDefinition { r } - async fn poll_loop(&self) { + async fn poll_loop(&mut self) { let interval = crossbeam::channel::tick(self.definition.poll_interval); let mut tick = Instant::now(); loop { @@ -284,6 +288,16 @@ impl RunningDefinition { self.definition.log, "Failed to load new scraping targets for {} @ interval {:?}: {:?}", self.definition.name, tick, e ); + self.metrics + .inc_load_errors(self.name(), self.definition.log.clone()) + .await; + self.metrics + .set_failed_load(self.name(), self.definition.log.clone()) + .await + } else { + self.metrics + .set_successful_load(self.name(), self.definition.log.clone()) + .await } debug!(self.definition.log, "Update registries for {}", self.definition.name); if let Err(e) = self.definition.ic_discovery.update_registries().await { @@ -291,6 +305,16 @@ impl RunningDefinition { self.definition.log, "Failed to sync registry for {} @ interval {:?}: {:?}", self.definition.name, tick, e ); + self.metrics + .inc_sync_errors(self.name(), self.definition.log.clone()) + .await; + self.metrics + .set_failed_sync(self.name(), self.definition.log.clone()) + .await + } else { + self.metrics + .set_successful_sync(self.name(), self.definition.log.clone()) + .await } tick = crossbeam::select! { @@ -305,7 +329,7 @@ impl RunningDefinition { // Syncs the registry and keeps running, syncing as new // registry versions come in. - async fn run(&self) { + async fn run(&mut self) { if self.initial_registry_sync().await.is_err() { // FIXME: Error has been logged, but ideally, it should be handled. // E.g. telemetry should collect this. @@ -439,13 +463,18 @@ impl DefinitionsSupervisor { } } - pub(crate) async fn load_or_create_defs(&self, networks_state_file: PathBuf) -> Result<(), Box> { + pub(crate) async fn load_or_create_defs( + &self, + networks_state_file: PathBuf, + metrics: RunningDefinitionsMetrics, + ) -> Result<(), Box> { if networks_state_file.exists() { let file_content = fs::read_to_string(networks_state_file)?; let initial_definitions: Vec = serde_json::from_str(&file_content)?; self.start( initial_definitions.into_iter().map(|def| def.into()).collect(), StartMode::AddToDefinitions, + metrics, ) .await?; } @@ -463,6 +492,7 @@ impl DefinitionsSupervisor { let fs_def: Vec = existing .values() .cloned() + .into_iter() .map(|running_def| running_def.definition.into()) .collect::>(); @@ -478,6 +508,7 @@ impl DefinitionsSupervisor { existing: &mut BTreeMap, definitions: Vec, start_mode: StartMode, + metrics: RunningDefinitionsMetrics, ) -> Result<(), StartDefinitionsError> { let mut error = StartDefinitionsError { errors: vec![] }; let mut ic_names_to_add: HashSet = HashSet::new(); @@ -537,7 +568,10 @@ impl DefinitionsSupervisor { // Now we add the incoming definitions. for definition in definitions.into_iter() { - existing.insert(definition.name.clone(), definition.run(self.rt.clone()).await); + existing.insert( + definition.name.clone(), + definition.run(self.rt.clone(), metrics.clone()).await, + ); } Ok(()) } @@ -552,9 +586,14 @@ impl DefinitionsSupervisor { &self, definitions: Vec, start_mode: StartMode, + metrics: RunningDefinitionsMetrics, ) -> Result<(), StartDefinitionsError> { let mut existing = self.definitions.lock().await; - self.start_inner(&mut existing, definitions, start_mode).await + self.start_inner(&mut existing, definitions, start_mode, metrics).await + } + + pub(crate) async fn definition_names(&self) -> Vec { + self.definitions.lock().await.clone().into_keys().collect() } /// Stop all definitions and end. diff --git a/rs/ic-observability/multiservice-discovery/src/main.rs b/rs/ic-observability/multiservice-discovery/src/main.rs index 5fe3110d7..513873d74 100644 --- a/rs/ic-observability/multiservice-discovery/src/main.rs +++ b/rs/ic-observability/multiservice-discovery/src/main.rs @@ -3,6 +3,7 @@ use std::path::PathBuf; use std::time::Duration; use std::vec; +use axum_otel_metrics::HttpMetricsLayerBuilder; use clap::Parser; use humantime::parse_duration; use slog::{info, o, Drain, Logger}; @@ -15,10 +16,12 @@ use ic_async_utils::shutdown_signal; use ic_management_types::Network; use crate::definition::{RunningDefinition, TestDefinition}; +use crate::metrics::{MSDMetrics, RunningDefinitionsMetrics}; use crate::server_handlers::export_prometheus_config_handler::serialize_definitions_to_prometheus_config; use crate::server_handlers::Server; mod definition; +mod metrics; mod server_handlers; fn main() { @@ -46,7 +49,7 @@ fn main() { shutdown_signal: impl futures_util::Future, ) -> Option { let def = get_mainnet_definition(cli_args, log.clone()); - let mut test_def = TestDefinition::new(def); + let mut test_def = TestDefinition::new(def, RunningDefinitionsMetrics::new()); let sync_fut = test_def.sync_and_stop(); tokio::select! { _ = sync_fut => { @@ -67,35 +70,47 @@ fn main() { } } else { let supervisor = DefinitionsSupervisor::new(rt.handle().clone(), cli_args.start_without_mainnet); - if let Some(networks_state_file) = cli_args.networks_state_file.clone() { - rt.block_on(supervisor.load_or_create_defs(networks_state_file)).unwrap(); - } - let (server_stop, server_stop_receiver) = oneshot::channel(); - //Configure server - let server_handle = rt.spawn( - Server::new( - log.clone(), - supervisor.clone(), - cli_args.poll_interval, - cli_args.registry_query_timeout, - cli_args.targets_dir.clone(), + // Initialize the metrics layer because in the build method the `global::provider` + // is set. We can use global::meter only after that call. + let metrics_layer = HttpMetricsLayerBuilder::new().build(); + let metrics = MSDMetrics::new(); + + if let Some(networks_state_file) = cli_args.networks_state_file.clone() { + rt.block_on( + supervisor.load_or_create_defs(networks_state_file, metrics.running_definition_metrics.clone()), ) - .run(server_stop_receiver), - ); + .unwrap(); + } + // First check if we should start the mainnet definition so we can + // serve it right after the server starts. if !cli_args.start_without_mainnet { rt.block_on(async { let _ = supervisor .start( vec![get_mainnet_definition(&cli_args, log.clone())], StartMode::AddToDefinitions, + metrics.running_definition_metrics.clone(), ) .await; }); } + //Configure server + let server_handle = rt.spawn( + Server::new( + log.clone(), + supervisor.clone(), + cli_args.poll_interval, + cli_args.registry_query_timeout, + cli_args.targets_dir.clone(), + metrics, + ) + .run(server_stop_receiver, metrics_layer), + ); + // Wait for shutdown signal. rt.block_on(shutdown_signal); @@ -106,7 +121,7 @@ fn main() { if let Some(networks_state_file) = cli_args.networks_state_file.clone() { rt.block_on(supervisor.persist_defs(networks_state_file)).unwrap(); } - + //Stop all definitions. End happens in parallel with server stop. rt.block_on(supervisor.end()); @@ -189,7 +204,7 @@ the Prometheus targets of mainnet as a JSON structure on stdout. "# )] render_prom_targets_to_stdout: bool, - + #[clap( long = "networks-state-file", default_value = None, diff --git a/rs/ic-observability/multiservice-discovery/src/metrics.rs b/rs/ic-observability/multiservice-discovery/src/metrics.rs new file mode 100644 index 000000000..9a9da9760 --- /dev/null +++ b/rs/ic-observability/multiservice-discovery/src/metrics.rs @@ -0,0 +1,308 @@ +use std::{collections::HashMap, sync::Arc}; + +use opentelemetry::{ + global, + metrics::{CallbackRegistration, ObservableGauge}, + KeyValue, +}; +use slog::{error, info, Logger}; +use tokio::sync::Mutex; + +const NETWORK: &str = "network"; +const AXUM_APP: &str = "axum-app"; +const LOAD: &str = "load"; +const SYNC: &str = "sync"; + +type StatusCallbacks = Arc>>>>; +type ValueCallbacks = Arc>>>>; + +#[derive(Clone)] +pub struct MSDMetrics { + pub running_definition_metrics: RunningDefinitionsMetrics, +} + +impl Default for MSDMetrics { + fn default() -> Self { + Self::new() + } +} + +impl MSDMetrics { + pub fn new() -> Self { + Self { + running_definition_metrics: RunningDefinitionsMetrics::new(), + } + } +} + +#[derive(Clone)] +pub struct RunningDefinitionsMetrics { + pub load_new_targets_error: ObservableGauge, + pub definitions_load_successful: ObservableGauge, + + pub sync_registry_error: ObservableGauge, + pub definitions_sync_successful: ObservableGauge, + + definition_status_callbacks: StatusCallbacks, + definition_value_callbacks: ValueCallbacks, +} + +impl RunningDefinitionsMetrics { + pub fn new() -> Self { + let meter = global::meter(AXUM_APP); + let load_new_targets_error = meter + .i64_observable_gauge("msd.definitions.load.errors") + .with_description("Total number of errors while loading new targets per definition") + .init(); + + let sync_registry_error = meter + .i64_observable_gauge("msd.definitions.sync.errors") + .with_description("Total number of errors while syncing the registry per definition") + .init(); + + let definitions_load_successful = meter + .i64_observable_gauge("msd.definitions.load.successful") + .with_description("Status of last load of the registry per definition") + .init(); + + let definitions_sync_successful = meter + .i64_observable_gauge("msd.definitions.sync.successful") + .with_description("Status of last sync of the registry with NNS of definition") + .init(); + + Self { + load_new_targets_error, + definitions_load_successful, + sync_registry_error, + definitions_sync_successful, + definition_status_callbacks: Arc::new(Mutex::new(HashMap::new())), + definition_value_callbacks: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn inc_load_errors(&self, network: String, logger: Logger) { + Self::inc_counter( + network, + logger, + &self.definition_value_callbacks, + &self.load_new_targets_error, + LOAD.to_string(), + ) + .await + } + + pub async fn inc_sync_errors(&self, network: String, logger: Logger) { + Self::inc_counter( + network, + logger, + &self.definition_value_callbacks, + &self.sync_registry_error, + SYNC.to_string(), + ) + .await + } + + pub async fn set_successful_sync(&mut self, network: String, logger: Logger) { + Self::set_status( + network, + logger, + 1, + &self.definitions_sync_successful, + &self.definition_status_callbacks, + ) + .await + } + + pub async fn set_failed_sync(&mut self, network: String, logger: Logger) { + Self::set_status( + network, + logger, + 0, + &self.definitions_sync_successful, + &self.definition_status_callbacks, + ) + .await + } + + pub async fn set_successful_load(&mut self, network: String, logger: Logger) { + Self::set_status( + network, + logger, + 1, + &self.definitions_load_successful, + &self.definition_status_callbacks, + ) + .await + } + + pub async fn set_failed_load(&mut self, network: String, logger: Logger) { + Self::set_status( + network, + logger, + 0, + &self.definitions_load_successful, + &self.definition_status_callbacks, + ) + .await + } + + async fn set_status( + network: String, + logger: Logger, + status: i64, + gague: &ObservableGauge, + callbacks: &StatusCallbacks, + ) { + let meter = global::meter(AXUM_APP); + let network_clone = network.clone(); + let local_clone = gague.clone(); + + match meter.register_callback(&[local_clone.as_any()], move |observer| { + observer.observe_i64(&local_clone, status, &[KeyValue::new(NETWORK, network.clone())]) + }) { + Ok(callback) => { + info!(logger, "Registering callback for '{}'", &network_clone); + let mut locked = callbacks.lock().await; + + if let Some(definition) = locked.get_mut(&network_clone) { + definition.push(callback) + } else { + locked.insert(network_clone, vec![callback]); + } + } + Err(e) => error!( + logger, + "Couldn't register callback for network '{}': {:?}", network_clone, e + ), + } + } + + pub async fn unregister_callback(&self, network: String, logger: Logger) { + self.unregister_unnamed_callback(network.clone(), logger.clone()).await; + self.unregister_named_callback(network, logger).await + } + + async fn unregister_named_callback(&self, network: String, logger: Logger) { + let mut locked = self.definition_value_callbacks.lock().await; + + if let Some(callbacks) = locked.remove(&network) { + for mut nc in callbacks { + if let Err(e) = nc.callback.unregister() { + error!( + logger, + "Couldn't unregister callback for network '{}': {:?}", network, e + ) + } + } + } + } + + async fn unregister_unnamed_callback(&self, network: String, logger: Logger) { + let mut locked = self.definition_status_callbacks.lock().await; + + if let Some(callbacks) = locked.remove(&network) { + for mut callback in callbacks { + if let Err(e) = callback.unregister() { + error!( + logger, + "Couldn't unregister callback for network '{}': {:?}", network, e + ) + } + } + } else { + error!( + logger, + "Couldn't unregister callbacks for network '{}': key not found", &network + ) + } + } + + async fn inc_counter( + network: String, + logger: Logger, + callbacks: &ValueCallbacks, + counter: &ObservableGauge, + metric_name: String, + ) { + let mut locked = callbacks.lock().await; + let network_clone = network.clone(); + let meter = global::meter(AXUM_APP); + let local_clone = counter.clone(); + + match locked.get_mut(&network) { + Some(callbacks) => match callbacks.iter_mut().find(|nc| nc.name == metric_name) { + Some(nc) => { + info!(logger, "Updating the named callback for network '{}'", network.clone()); + if let Err(e) = nc.callback.unregister() { + error!(logger, "Couldn't unregister metric for network '{}': {:?}", network, e); + return; + } + + nc.value += 1; + let cloned = nc.value; + + match meter.register_callback(&[local_clone.as_any()], move |observer| { + observer.observe_i64(&local_clone, cloned, &[KeyValue::new(NETWORK, network.clone())]) + }) { + Ok(callback) => nc.callback = callback, + Err(e) => { + error!( + logger, + "Couldn't register counter for network '{}': {:?}", network_clone, e + ) + } + } + } + None => { + match meter.register_callback(&[local_clone.as_any()], move |observer| { + observer.observe_i64(&local_clone, 1, &[KeyValue::new(NETWORK, network.clone())]) + }) { + Ok(callback) => { + let named = NamedCallbackWithValue { + value: 1_i64, + callback, + name: metric_name, + }; + + callbacks.push(named) + } + Err(e) => { + error!( + logger, + "Couldn't register counter for network '{}': {:?}", network_clone, e + ) + } + } + } + }, + None => { + match meter.register_callback(&[local_clone.as_any()], move |observer| { + observer.observe_i64(&local_clone, 1, &[KeyValue::new(NETWORK, network.clone())]) + }) { + Ok(callback) => { + info!(logger, "Registering new counter for '{}'", network_clone); + let named = NamedCallbackWithValue { + value: 1_i64, + callback, + name: metric_name, + }; + + locked.insert(network_clone, vec![named]); + } + Err(e) => { + error!( + logger, + "Couldn't register counter for network '{}': {:?}", network_clone, e + ) + } + } + } + } + } +} + +struct NamedCallbackWithValue { + callback: Box, + value: T, + name: String, +} diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs index b5d43bf5d..c729749e3 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/add_definition_handler.rs @@ -27,7 +27,11 @@ pub(super) async fn add_definition( }; match binding .supervisor - .start(vec![new_definition], StartMode::AddToDefinitions) + .start( + vec![new_definition], + StartMode::AddToDefinitions, + binding.metrics.running_definition_metrics.clone(), + ) .await { Ok(()) => ok(binding.log, format!("Definition {} added successfully", dname)), diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs index b78072aa7..ea5aad85c 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/delete_definition_handler.rs @@ -9,7 +9,14 @@ pub(super) async fn delete_definition( State(binding): State, ) -> Result { match binding.supervisor.stop(vec![name.clone()]).await { - Ok(_) => Ok(format!("Deleted definition {}", name.clone())), + Ok(_) => { + binding + .metrics + .running_definition_metrics + .unregister_callback(name.clone(), binding.log) + .await; + Ok(format!("Deleted definition {}", name)) + } Err(e) => match e.errors.into_iter().next().unwrap() { StopDefinitionError::DoesNotExist(e) => { not_found(binding.log, format!("Definition with name '{}' doesn't exist", name), e) diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs index 0c1eaa805..4140f9c19 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/mod.rs @@ -5,9 +5,11 @@ use std::time::Duration; use axum::http::StatusCode; use axum::routing::{delete, get, post, put}; use axum::Router; +use axum_otel_metrics::HttpMetricsLayer; use slog::{debug, info, Logger}; use crate::definition::DefinitionsSupervisor; +use crate::metrics::MSDMetrics; use crate::server_handlers::add_boundary_node_to_definition_handler::add_boundary_node; use crate::server_handlers::add_definition_handler::add_definition; use crate::server_handlers::delete_definition_handler::delete_definition; @@ -62,6 +64,7 @@ pub(crate) struct Server { poll_interval: Duration, registry_query_timeout: Duration, registry_path: PathBuf, + pub metrics: MSDMetrics, } impl Server { @@ -71,6 +74,7 @@ impl Server { poll_interval: Duration, registry_query_timeout: Duration, registry_path: PathBuf, + metrics: MSDMetrics, ) -> Self { Self { log, @@ -78,10 +82,12 @@ impl Server { poll_interval, registry_query_timeout, registry_path, + metrics, } } - pub(crate) async fn run(self, recv: tokio::sync::oneshot::Receiver<()>) { + pub(crate) async fn run(self, recv: tokio::sync::oneshot::Receiver<()>, metrics_layer: HttpMetricsLayer) { let app = Router::new() + .merge(metrics_layer.routes()) .route("/", post(add_definition)) .route("/", put(replace_definitions)) .route("/", get(get_definitions)) @@ -89,6 +95,7 @@ impl Server { .route("/prom/targets", get(export_prometheus_config)) .route("/targets", get(export_targets)) .route("/add_boundary_node", post(add_boundary_node)) + .layer(metrics_layer) .with_state(self.clone()); let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap(); diff --git a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs index f9ec6e33b..feeba2584 100644 --- a/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs +++ b/rs/ic-observability/multiservice-discovery/src/server_handlers/replace_definitions_handler.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use axum::extract::State; use axum::http::StatusCode; use axum::Json; @@ -12,6 +14,8 @@ pub(super) async fn replace_definitions( State(binding): State, Json(definitions): Json>, ) -> WebResult { + // Cache old names if we need to remove them from metrics + let old_names = binding.supervisor.definition_names().await; let dnames = definitions .iter() .map(|d| d.name.clone()) @@ -48,13 +52,29 @@ pub(super) async fn replace_definitions( let new_definitions: Vec<_> = new_definitions.into_iter().map(Result::unwrap).collect(); match binding .supervisor - .start(new_definitions, StartMode::ReplaceExistingDefinitions) + .start( + new_definitions.clone(), + StartMode::ReplaceExistingDefinitions, + binding.metrics.running_definition_metrics.clone(), + ) .await { - Ok(_) => ok( - binding.log, - format!("Added new definitions {} to existing ones", dnames), - ), + Ok(_) => { + let old_set: HashSet = old_names.iter().cloned().collect(); + let new_set: HashSet = new_definitions.iter().cloned().map(|d| d.name).collect(); + let difference = old_set.difference(&new_set); + for network in difference { + binding + .metrics + .running_definition_metrics + .unregister_callback(network.clone(), binding.log.clone()) + .await + } + ok( + binding.log, + format!("Added new definitions {} to existing ones", dnames), + ) + } Err(e) => bad_request(binding.log, format!(":\n{}", e), e), } } diff --git a/rs/ic-observability/multiservice-discovery/tests/tests.rs b/rs/ic-observability/multiservice-discovery/tests/tests.rs index fd79da455..06d6a4b40 100644 --- a/rs/ic-observability/multiservice-discovery/tests/tests.rs +++ b/rs/ic-observability/multiservice-discovery/tests/tests.rs @@ -3,23 +3,26 @@ mod tests { use anyhow::anyhow; use assert_cmd::cargo::CommandCargoExt; use multiservice_discovery_shared::builders::prometheus_config_structure::{ - PrometheusStaticConfig, IC_NAME, IC_NODE, IC_SUBNET, JOB + PrometheusStaticConfig, IC_NAME, IC_NODE, IC_SUBNET, JOB, }; use reqwest::IntoUrl; use serde_json::Value; - use tempfile::tempdir; use std::collections::{BTreeMap, BTreeSet}; use std::io::Cursor; use std::path::Path; use std::process::Command; - use std::time::Duration; use std::thread; + use std::time::Duration; + use tempfile::tempdir; const BAZEL_SD_BIN: &str = "rs/ic-observability/multiservice-discovery/multiservice-discovery"; const CARGO_SD_BIN: &str = "multiservice-discovery"; const API_NODES_URL: &str = "https://ic-api.internetcomputer.org/api/v3/nodes"; - async fn reqwest_retry(url: T, timeout: Duration) -> anyhow::Result { + async fn reqwest_retry( + url: T, + timeout: Duration, + ) -> anyhow::Result { let client = reqwest::Client::builder() .timeout(Duration::from_secs(15)) .build() @@ -55,30 +58,29 @@ mod tests { } impl TestData { fn from_prom(targets: Vec) -> Self { - let labels_set = targets - .iter() - .cloned() - .fold(BTreeMap::new(), |mut acc: BTreeMap>, v| { - for (key, value) in v.labels { - if let Some(grouped_set) = acc.get_mut(&key) { - grouped_set.insert(value); - } else { - let mut new_set = BTreeSet::new(); - new_set.insert(value); - acc.insert(key, new_set); + let labels_set = + targets + .iter() + .cloned() + .fold(BTreeMap::new(), |mut acc: BTreeMap>, v| { + for (key, value) in v.labels { + if let Some(grouped_set) = acc.get_mut(&key) { + grouped_set.insert(value); + } else { + let mut new_set = BTreeSet::new(); + new_set.insert(value); + acc.insert(key, new_set); + } } - } - acc - }); + acc + }); Self { keys: labels_set.keys().cloned().collect::>(), ic_name: labels_set.get(IC_NAME).unwrap().iter().cloned().collect::>(), jobs: labels_set.get(JOB).unwrap().iter().cloned().collect::>(), - nodes: labels_set.get(IC_NODE).unwrap() - .iter().cloned().collect::>(), - subnets: labels_set.get(IC_SUBNET).unwrap() - .iter().cloned().collect::>(), + nodes: labels_set.get(IC_NODE).unwrap().iter().cloned().collect::>(), + subnets: labels_set.get(IC_SUBNET).unwrap().iter().cloned().collect::>(), } } @@ -86,7 +88,10 @@ mod tests { Self { nodes, subnets, - keys: vec!["ic", "ic_node", "ic_subnet", "job"].into_iter().map(String::from).collect(), + keys: vec!["ic", "ic_node", "ic_subnet", "job"] + .into_iter() + .map(String::from) + .collect(), ic_name: vec!["mercury"].into_iter().map(String::from).collect(), jobs: vec![ "guest_metrics_proxy", @@ -94,38 +99,35 @@ mod tests { "host_node_exporter", "node_exporter", "orchestrator", - "replica" - ].into_iter().map(String::from).collect(), + "replica", + ] + .into_iter() + .map(String::from) + .collect(), } } } - pub struct SDRunner{ - command: Command + pub struct SDRunner { + command: Command, } impl SDRunner { - async fn fetch_targets(&mut self) -> anyhow::Result> { + async fn fetch_targets(&mut self) -> anyhow::Result> { let registry_dir = tempdir().unwrap(); const TARGETS_URL: &str = "http://localhost:8000/prom/targets"; const REQWEST_TIMEOUT: Duration = Duration::from_secs(240); - let args = vec![ - "--targets-dir", - registry_dir.path().to_str().unwrap(), - ]; - let mut sd_server = self.command - .args(args) - .spawn() - .unwrap(); - let targets: Vec = reqwest_retry(TARGETS_URL, REQWEST_TIMEOUT).await?.json().await?; + let args = vec!["--targets-dir", registry_dir.path().to_str().unwrap()]; + let mut sd_server = self.command.args(args).spawn().unwrap(); + let targets: Vec = + reqwest_retry(TARGETS_URL, REQWEST_TIMEOUT).await?.json().await?; sd_server.kill().unwrap(); return Ok(targets); } pub fn from_local_bin() -> Self { Self { - command: Command::cargo_bin(CARGO_SD_BIN) - .unwrap_or(Command::new(BAZEL_SD_BIN)) + command: Command::cargo_bin(CARGO_SD_BIN).unwrap_or(Command::new(BAZEL_SD_BIN)), } } @@ -134,10 +136,9 @@ mod tests { let sd_bin_path = sd_dir.path().join("multiservice-discovery"); download_and_extract(sd_url, sd_bin_path.as_path()).await.unwrap(); Self { - command: Command::new(sd_bin_path) + command: Command::new(sd_bin_path), } } - } pub struct ExpectedDataFetcher; @@ -146,30 +147,39 @@ mod tests { const REQWEST_TIMEOUT: Duration = Duration::from_secs(15); let response: Value = reqwest_retry(API_NODES_URL, REQWEST_TIMEOUT).await?.json().await?; let mut subnets = BTreeSet::new(); - - let nodes = response["nodes"].as_array().unwrap() + + let nodes = response["nodes"] + .as_array() + .unwrap() .iter() .map(|val| { if let Some(sub) = val["subnet_id"].as_str() { subnets.insert(String::from(sub)); } - + String::from(val["node_id"].as_str().unwrap()) - }).collect::>(); + }) + .collect::>(); - Ok(TestData::from_expected(nodes, subnets.iter().cloned().collect::>())) + Ok(TestData::from_expected( + nodes, + subnets.iter().cloned().collect::>(), + )) } async fn from_main_sd(&self) -> anyhow::Result { const MAIN_SD_URL: &str = ""; - let targets: Vec = SDRunner::from_remote_bin(MAIN_SD_URL).await.fetch_targets().await?; + let targets: Vec = + SDRunner::from_remote_bin(MAIN_SD_URL).await.fetch_targets().await?; Ok(TestData::from_prom(targets)) } pub async fn get_expected_data(&self) -> anyhow::Result { // TODO: Add support for getting TestData from main MSD bin - self.from_public_dashboard_api().await.or(Err(anyhow!("Expected data not found"))) + self.from_public_dashboard_api() + .await + .or(Err(anyhow!("Expected data not found"))) } } @@ -180,9 +190,6 @@ mod tests { let test_data = TestData::from_prom(targets); - assert_eq!( - test_data, - expected_data - ); + assert_eq!(test_data, expected_data); } } diff --git a/rs/ic-observability/node-status-updater/src/canister_updater_loop.rs b/rs/ic-observability/node-status-updater/src/canister_updater_loop.rs index 2e4fc7bf9..0df1f0591 100644 --- a/rs/ic-observability/node-status-updater/src/canister_updater_loop.rs +++ b/rs/ic-observability/node-status-updater/src/canister_updater_loop.rs @@ -18,38 +18,32 @@ pub fn canister_updater_loop( prom_client: Client, ) -> impl FnMut() { move || loop { - let prom_response: Vec = match rt.block_on( - prom_client - .query("up{ ic_node=~\".+\", job=\"node_exporter\" }") - .get(), - ) { - Ok(response) => response - .data() - .as_vector() - .unwrap_or_default() - .iter() - .map(|entry| NodeStatus { - node_id: Principal::from_text(entry.metric().get("ic_node").unwrap()).unwrap(), - status: entry.sample().value() > 0.0, - subnet_id: entry - .metric() - .get("ic_subnet") - .map(|subnet_id| Principal::from_text(subnet_id).unwrap()), - }) - .collect(), - Err(err) => { - warn!(log, "Failed to query Prometheus: {:?}", err); - Vec::new() - } - }; + let prom_response: Vec = + match rt.block_on(prom_client.query("up{ ic_node=~\".+\", job=\"node_exporter\" }").get()) { + Ok(response) => response + .data() + .as_vector() + .unwrap_or_default() + .iter() + .map(|entry| NodeStatus { + node_id: Principal::from_text(entry.metric().get("ic_node").unwrap()).unwrap(), + status: entry.sample().value() > 0.0, + subnet_id: entry + .metric() + .get("ic_subnet") + .map(|subnet_id| Principal::from_text(subnet_id).unwrap()), + }) + .collect(), + Err(err) => { + warn!(log, "Failed to query Prometheus: {:?}", err); + Vec::new() + } + }; let present_nodes = match rt.block_on(canister.get_node_status(false)) { Ok(node_status) => node_status, Err(err) => { - warn!( - log, - "Failed to query node statuses from canister: {:?}", err - ); + warn!(log, "Failed to query node statuses from canister: {:?}", err); vec![] } }; diff --git a/rs/ic-observability/obs-canister-clients/src/node_status_canister_client.rs b/rs/ic-observability/obs-canister-clients/src/node_status_canister_client.rs index 53d9b739c..6e9dfa128 100644 --- a/rs/ic-observability/obs-canister-clients/src/node_status_canister_client.rs +++ b/rs/ic-observability/obs-canister-clients/src/node_status_canister_client.rs @@ -71,20 +71,14 @@ impl NodeStatusCanister { agent } - pub async fn get_node_status( - &self, - format_for_frontend: bool, - ) -> Result, NodeStatusCanisterError> { + pub async fn get_node_status(&self, format_for_frontend: bool) -> Result, NodeStatusCanisterError> { match self .choose_random_agent() .await .query(&self.canister_id, "get_node_status") .with_effective_canister_id(self.canister_id) .with_arg(Encode! { &format_for_frontend }.map_err(|err| { - NodeStatusCanisterError::Encoding(format!( - "Error encoding argument for get_node_status: {}", - err - )) + NodeStatusCanisterError::Encoding(format!("Error encoding argument for get_node_status: {}", err)) })?) .call() .await @@ -103,20 +97,14 @@ impl NodeStatusCanister { } } - pub async fn update_node_statuses( - &self, - statuses: Vec, - ) -> Result { + pub async fn update_node_statuses(&self, statuses: Vec) -> Result { let request_id = match self .choose_random_agent() .await .update(&self.canister_id, "update_node_status") .with_effective_canister_id(self.canister_id) .with_arg(Encode! { &statuses }.map_err(|err| { - NodeStatusCanisterError::Encoding(format!( - "Error encoding argument for update_node_status: {}", - err - )) + NodeStatusCanisterError::Encoding(format!("Error encoding argument for update_node_status: {}", err)) })?) .call() .await diff --git a/rs/ic-observability/prometheus-config-updater/src/custom_filters.rs b/rs/ic-observability/prometheus-config-updater/src/custom_filters.rs index aaaa3dbb8..018d312d4 100644 --- a/rs/ic-observability/prometheus-config-updater/src/custom_filters.rs +++ b/rs/ic-observability/prometheus-config-updater/src/custom_filters.rs @@ -13,8 +13,7 @@ impl TargetGroupFilter for OldMachinesFilter { .iter() // Maps addresses to true if they are new .map(|sockaddr: &SocketAddr| { - sockaddr.port() != 9100 - || !matches!(sockaddr.ip(), IpAddr::V6(a) if a.segments()[4] == 0x5000) + sockaddr.port() != 9100 || !matches!(sockaddr.ip(), IpAddr::V6(a) if a.segments()[4] == 0x5000) }) .all(|is_new| is_new) } @@ -32,9 +31,7 @@ mod tests { fn create_dummy_target_group(ipv6: &str) -> TargetGroup { let mut targets = BTreeSet::new(); - targets.insert(std::net::SocketAddr::V6( - SocketAddrV6::from_str(ipv6).unwrap(), - )); + targets.insert(std::net::SocketAddr::V6(SocketAddrV6::from_str(ipv6).unwrap())); TargetGroup { node_id: NodeId::from(PrincipalId::new_anonymous()), ic_name: "mercury".into(), @@ -50,12 +47,10 @@ mod tests { fn old_machine_filter_test() { let filter = OldMachinesFilter {}; - let new_orchestrator_tg = - create_dummy_target_group("[2a02:800:2:2003:6801:f6ff:fec4:4c86]:9091"); + let new_orchestrator_tg = create_dummy_target_group("[2a02:800:2:2003:6801:f6ff:fec4:4c86]:9091"); assert!(TargetGroupFilter::filter(&filter, new_orchestrator_tg)); - let old_orchestrator_tg = - create_dummy_target_group("[2a02:800:2:2003:5000:f6ff:fec4:4c86]:9091"); + let old_orchestrator_tg = create_dummy_target_group("[2a02:800:2:2003:5000:f6ff:fec4:4c86]:9091"); assert!(TargetGroupFilter::filter(&filter, old_orchestrator_tg)); let old_host_tg = create_dummy_target_group("[2a02:800:2:2003:5000:f6ff:fec4:4c86]:9100"); diff --git a/rs/ic-observability/service-discovery/src/file_sd.rs b/rs/ic-observability/service-discovery/src/file_sd.rs index fad26f39f..b357499e1 100644 --- a/rs/ic-observability/service-discovery/src/file_sd.rs +++ b/rs/ic-observability/service-discovery/src/file_sd.rs @@ -31,11 +31,7 @@ impl FileSd { /// The assumption is that no external process manipulates or deletes the written files. /// FileSd will memoize the calls. Thus, calling this method twice with the /// same arguments will have no effect. - pub fn write_sd_config( - &self, - job: JobType, - p8s_target_groups: BTreeSet, - ) -> std::io::Result<()> { + pub fn write_sd_config(&self, job: JobType, p8s_target_groups: BTreeSet) -> std::io::Result<()> { let mut last_targets = self.last_targets.write().unwrap(); let last_job_targets = last_targets.entry(job.to_string()).or_default(); if last_job_targets == &p8s_target_groups { @@ -53,12 +49,8 @@ impl FileSd { .map(ServiceDiscoveryRecord::from) .collect(); ic_utils::fs::write_atomically(target_path.as_path(), |f| { - serde_json::to_writer_pretty(f, &targets).map_err(|e| { - std::io::Error::new( - std::io::ErrorKind::Other, - format!("Serialization error: {:?}", e), - ) - }) + serde_json::to_writer_pretty(f, &targets) + .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, format!("Serialization error: {:?}", e))) })?; last_targets.insert(job.to_string(), p8s_target_groups); Ok(()) diff --git a/rs/ic-observability/service-discovery/src/mainnet_registry.rs b/rs/ic-observability/service-discovery/src/mainnet_registry.rs index 87241299b..6a628e65d 100644 --- a/rs/ic-observability/service-discovery/src/mainnet_registry.rs +++ b/rs/ic-observability/service-discovery/src/mainnet_registry.rs @@ -14,10 +14,7 @@ pub fn get_mainnet_delta_6d_c1() -> Changelog { /// in this particular case, as this method is only used on startup and the /// missing versions will be fetched through subsequent updates of the local /// store. -pub fn create_local_store_from_changelog>( - path: P, - changelog: Changelog, -) -> LocalStoreImpl { +pub fn create_local_store_from_changelog>(path: P, changelog: Changelog) -> LocalStoreImpl { let store = LocalStoreImpl::new(path.as_ref()); for (v, changelog_entry) in changelog.into_iter().enumerate() { store diff --git a/rs/ic-observability/service-discovery/src/poll_loop.rs b/rs/ic-observability/service-discovery/src/poll_loop.rs index d01e56782..f45aab270 100644 --- a/rs/ic-observability/service-discovery/src/poll_loop.rs +++ b/rs/ic-observability/service-discovery/src/poll_loop.rs @@ -38,14 +38,8 @@ pub fn make_poll_loop( info!(log, "Update registries"); let timer = metrics.registries_update_latency_seconds.start_timer(); if let Err(e) = rt.block_on(ic_discovery.update_registries()) { - warn!( - log, - "Failed to sync registry @ interval {:?}: {:?}", tick, e - ); - metrics - .poll_error_count - .with_label_values(&["update_registries"]) - .inc(); + warn!(log, "Failed to sync registry @ interval {:?}: {:?}", tick, e); + metrics.poll_error_count.with_label_values(&["update_registries"]).inc(); err = true; } if let Some(sender) = &update_notifier { diff --git a/rs/ic-observability/service-discovery/src/rest_api.rs b/rs/ic-observability/service-discovery/src/rest_api.rs index 951197662..47f87ad59 100644 --- a/rs/ic-observability/service-discovery/src/rest_api.rs +++ b/rs/ic-observability/service-discovery/src/rest_api.rs @@ -73,11 +73,8 @@ impl RestApi { &self, target_groups: Result, IcServiceDiscoveryError>, ) -> Result, hyper::http::Error> { - let groups = target_groups.map(|l| -> Vec<_> { - l.into_iter() - .map(ServiceDiscoveryRecord::from) - .collect::>() - }); + let groups = + target_groups.map(|l| -> Vec<_> { l.into_iter().map(ServiceDiscoveryRecord::from).collect::>() }); match groups { Ok(groups) => { let response = serde_json::to_vec(&groups).unwrap(); diff --git a/rs/ic-observability/sns-downloader/src/main.rs b/rs/ic-observability/sns-downloader/src/main.rs index 565e2a731..b4bc14c20 100644 --- a/rs/ic-observability/sns-downloader/src/main.rs +++ b/rs/ic-observability/sns-downloader/src/main.rs @@ -21,11 +21,7 @@ fn main() { info!(logger, "Starting downloader loop"; "cli_args" => ?cli_args); - let downloader_handle = rt.spawn(run_downloader_loop( - logger.clone(), - cli_args, - stop_signal_rcv, - )); + let downloader_handle = rt.spawn(run_downloader_loop(logger.clone(), cli_args, stop_signal_rcv)); rt.block_on(shutdown_signal); info!(logger, "Received shutdown signal, shutting down ...");