From 7fac8004ad1685b2d3a0602c6bc62ebc1ca338ed Mon Sep 17 00:00:00 2001 From: Andronik Ordian Date: Wed, 5 Aug 2020 20:53:18 +0200 Subject: [PATCH] metrics: it's ugly, but it works --- Cargo.lock | 2 +- node/core/candidate-validation/src/lib.rs | 94 +++++++++++++--- node/overseer/Cargo.toml | 1 - node/overseer/src/lib.rs | 129 ++++++++++++++-------- node/subsystem/Cargo.toml | 1 + node/subsystem/src/lib.rs | 18 +++ 6 files changed, 183 insertions(+), 62 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9daeef5d5315..6c05b0fe9817 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4736,6 +4736,7 @@ dependencies = [ "smallvec 1.4.1", "sp-core", "streamunordered", + "substrate-prometheus-endpoint", ] [[package]] @@ -4754,7 +4755,6 @@ dependencies = [ "sc-client-api", "sp-core", "streamunordered", - "substrate-prometheus-endpoint", ] [[package]] diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 8dcc0a574bba..618f8166644b 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -22,7 +22,7 @@ use polkadot_subsystem::{ Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemResult, - FromOverseer, OverseerSignal, + FromOverseer, OverseerSignal, prometheus, RegisterMetrics, }; use polkadot_subsystem::messages::{ AllMessages, CandidateValidationMessage, RuntimeApiMessage, ValidationFailed, RuntimeApiRequest, @@ -45,13 +45,74 @@ use futures::prelude::*; use std::sync::Arc; +const LOG_TARGET: &'static str = "candidate_validation"; + /// The candidate validation subsystem. -pub struct CandidateValidationSubsystem(S); +pub struct CandidateValidationSubsystem { + spawn: S, + metrics: MaybeMetrics, +} + +/// Candidate validation metrics. 
+struct Metrics { + validation_requests: prometheus::CounterVec, +} + +struct MaybeMetrics(Option); + +impl Default for MaybeMetrics { + fn default() -> Self { + Self(None) + } +} + +impl MaybeMetrics { + fn update(&self, event: &Result) { + if let Some(metrics) = &self.0 { + match event { + Ok(ValidationResult::Valid(_)) => { + metrics.validation_requests.with_label_values(&["valid"]).inc(); + }, + Ok(ValidationResult::Invalid(_)) => { + metrics.validation_requests.with_label_values(&["invalid"]).inc(); + }, + Err(_) => { + metrics.validation_requests.with_label_values(&["failed"]).inc(); + }, + } + } + } +} + +impl RegisterMetrics for MaybeMetrics { + fn try_register(&mut self, registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + let metrics = Metrics { + validation_requests: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "validation_request_events", + "Number of validation requests served.", + ), + &["validity"], + )?, + registry, + )?, + }; + self.0 = Some(metrics); + Ok(()) + } +} + +impl RegisterMetrics for CandidateValidationSubsystem { + fn try_register(&mut self, registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + self.metrics.try_register(registry) + } +} impl CandidateValidationSubsystem { /// Create a new `CandidateValidationSubsystem` with the given task spawner. 
pub fn new(spawn: S) -> Self { - CandidateValidationSubsystem(spawn) + CandidateValidationSubsystem { spawn, metrics: Default::default() } } } @@ -62,7 +123,7 @@ impl Subsystem for CandidateValidationSubsystem where fn start(self, ctx: C) -> SpawnedSubsystem { SpawnedSubsystem { name: "candidate-validation-subsystem", - future: run(ctx, self.0).map(|_| ()).boxed(), + future: run(ctx, self.spawn, self.metrics).map(|_| ()).boxed(), } } } @@ -70,6 +131,7 @@ impl Subsystem for CandidateValidationSubsystem where async fn run( mut ctx: impl SubsystemContext, spawn: impl SpawnNamed + Clone + 'static, + metrics: MaybeMetrics, ) -> SubsystemResult<()> { @@ -95,8 +157,11 @@ async fn run( ).await; match res { - Ok(x) => { let _ = response_sender.send(x); } - Err(e)=> return Err(e), + Ok(x) => { + metrics.update(&x); + let _ = response_sender.send(x); + } + Err(e) => return Err(e), } } CandidateValidationMessage::ValidateFromExhaustive( @@ -117,13 +182,16 @@ async fn run( ).await; match res { - Ok(x) => if let Err(_e) = response_sender.send(x) { - log::warn!( - target: "candidate_validation", - "Requester of candidate validation dropped", - ) + Ok(x) => { + metrics.update(&x); + if let Err(_e) = response_sender.send(x) { + log::warn!( + target: LOG_TARGET, + "Requester of candidate validation dropped", + ) + } }, - Err(e)=> return Err(e), + Err(e) => return Err(e), } } } @@ -237,7 +305,7 @@ async fn spawn_validate_from_chain_state( Ok(g) => g, Err(e) => { log::warn!( - target: "candidate_validation", + target: LOG_TARGET, "Error making runtime API request: {:?}", e, ); diff --git a/node/overseer/Cargo.toml b/node/overseer/Cargo.toml index 819fc020c3df..da19a26e0a92 100644 --- a/node/overseer/Cargo.toml +++ b/node/overseer/Cargo.toml @@ -13,7 +13,6 @@ polkadot-primitives = { path = "../../primitives" } client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-subsystem = { package = "polkadot-node-subsystem", path = 
"../subsystem" } polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" } -substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } async-trait = "0.1" [dev-dependencies] diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 175428a63595..9baadb61bf6f 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -83,10 +83,9 @@ use polkadot_subsystem::messages::{ }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, - SpawnedSubsystem, ActiveLeavesUpdate, + SpawnedSubsystem, ActiveLeavesUpdate, prometheus, RegisterMetrics, }; use polkadot_node_primitives::SpawnNamed; -use substrate_prometheus_endpoint as prometheus; // A capacity of bounded channels inside the overseer. @@ -435,8 +434,6 @@ pub struct AllSubsystems { /// Various prometheus metrics. struct Metrics { active_heads_count: prometheus::Gauge, - // TODO do these metrics live here or in subsystems? - validation_requests: prometheus::CounterVec, // Number of statements signed // Number of bitfields signed // Number of availability chunks received @@ -461,10 +458,9 @@ impl MaybeMetrics { } } -impl Metrics { - /// Try to register metrics in the prometheus registry. - fn try_register(registry: &prometheus::Registry) -> Result { - Ok(Self { +impl RegisterMetrics for MaybeMetrics { + fn try_register(&mut self, registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + let metrics = Metrics { active_heads_count: prometheus::register( prometheus::Gauge::new( "active_heads_count", @@ -472,17 +468,9 @@ impl Metrics { )?, // TODO: should this be `.expect(PROOF)`? 
registry, )?, - validation_requests: prometheus::register( - prometheus::CounterVec::new( - prometheus::Opts::new( - "validation_requests", - "Number of validation requests served.", - ), - &["succeeded", "failed"], - )?, - registry, - )?, - }) + }; + self.0 = Some(metrics); + Ok(()) } } @@ -589,24 +577,24 @@ where /// ``` pub fn new( leaves: impl IntoIterator, - all_subsystems: AllSubsystems, + mut all_subsystems: AllSubsystems, prometheus_registry: Option<&prometheus::Registry>, mut s: S, ) -> SubsystemResult<(Self, OverseerHandler)> where - CV: Subsystem> + Send, - CB: Subsystem> + Send, - CS: Subsystem> + Send, - SD: Subsystem> + Send, - AD: Subsystem> + Send, - BS: Subsystem> + Send, - BD: Subsystem> + Send, - P: Subsystem> + Send, - PoVD: Subsystem> + Send, - RA: Subsystem> + Send, - AS: Subsystem> + Send, - NB: Subsystem> + Send, - CA: Subsystem> + Send, + CV: Subsystem> + RegisterMetrics + Send, + CB: Subsystem> + RegisterMetrics + Send, + CS: Subsystem> + RegisterMetrics + Send, + SD: Subsystem> + RegisterMetrics + Send, + AD: Subsystem> + RegisterMetrics + Send, + BS: Subsystem> + RegisterMetrics + Send, + BD: Subsystem> + RegisterMetrics + Send, + P: Subsystem> + RegisterMetrics + Send, + PoVD: Subsystem> + RegisterMetrics + Send, + RA: Subsystem> + RegisterMetrics + Send, + AS: Subsystem> + RegisterMetrics + Send, + NB: Subsystem> + RegisterMetrics + Send, + CA: Subsystem> + RegisterMetrics + Send, { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); @@ -617,6 +605,35 @@ where let mut running_subsystems_rx = StreamUnordered::new(); let mut running_subsystems = FuturesUnordered::new(); + fn register_metrics(s: &mut S, registry: Option<&prometheus::Registry>) { + if let Some(registry) = registry { + match RegisterMetrics::try_register(s, registry) { + Err(e) => { + log::warn!(target: LOG_TARGET, "Failed to register metrics: {:?}", e); + }, + _ => {}, + } + } + }; + + // subsystem metrics + register_metrics(&mut 
all_subsystems.candidate_validation, prometheus_registry); + register_metrics(&mut all_subsystems.candidate_backing, prometheus_registry); + register_metrics(&mut all_subsystems.candidate_selection, prometheus_registry); + register_metrics(&mut all_subsystems.statement_distribution, prometheus_registry); + register_metrics(&mut all_subsystems.availability_distribution, prometheus_registry); + register_metrics(&mut all_subsystems.bitfield_signing, prometheus_registry); + register_metrics(&mut all_subsystems.bitfield_distribution, prometheus_registry); + register_metrics(&mut all_subsystems.pov_distribution, prometheus_registry); + register_metrics(&mut all_subsystems.runtime_api, prometheus_registry); + register_metrics(&mut all_subsystems.availability_store, prometheus_registry); + register_metrics(&mut all_subsystems.network_bridge, prometheus_registry); + register_metrics(&mut all_subsystems.chain_api, prometheus_registry); + + // overseer metrics + let mut metrics = MaybeMetrics(None); + register_metrics(&mut metrics, prometheus_registry); + let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, @@ -715,20 +732,6 @@ where let active_leaves = HashSet::new(); - let metrics = match prometheus_registry { - Some(registry) => { - match Metrics::try_register(registry) { - Ok(metrics) => Some(metrics), - Err(e) => { - log::warn!(target: LOG_TARGET, "Failed to register metrics: {:?}", e); - None - }, - } - }, - None => None, - }; - let metrics = MaybeMetrics(metrics); - let this = Self { candidate_validation_subsystem, candidate_backing_subsystem, @@ -1112,6 +1115,12 @@ mod tests { } } + impl RegisterMetrics for TestSubsystem1 { + fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + Ok(()) + } + } + struct TestSubsystem2(mpsc::Sender); impl Subsystem for TestSubsystem2 @@ -1158,6 +1167,12 @@ mod tests { } } + impl RegisterMetrics for TestSubsystem2 { + fn try_register(&mut self, _registry: 
&prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + Ok(()) + } + } + struct TestSubsystem4; impl Subsystem for TestSubsystem4 @@ -1173,6 +1188,13 @@ mod tests { } } + impl RegisterMetrics for TestSubsystem4 { + fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + Ok(()) + } + } + + // Checks that a minimal configuration of two jobs can run and exchange messages. #[test] fn overseer_works() { @@ -1307,6 +1329,13 @@ mod tests { } } + impl RegisterMetrics for TestSubsystem5 { + fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + Ok(()) + } + } + + struct TestSubsystem6(mpsc::Sender); impl Subsystem for TestSubsystem6 @@ -1336,6 +1365,12 @@ mod tests { } } + impl RegisterMetrics for TestSubsystem6 { + fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + Ok(()) + } + } + // Tests that starting with a defined set of leaves and receiving // notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats. #[test] diff --git a/node/subsystem/Cargo.toml b/node/subsystem/Cargo.toml index f924808182df..fc5dff1f3c4b 100644 --- a/node/subsystem/Cargo.toml +++ b/node/subsystem/Cargo.toml @@ -22,6 +22,7 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master smallvec = "1.4.1" sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } streamunordered = "0.5.1" +substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" } [dev-dependencies] assert_matches = "1.3.0" diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index 7e112f543911..cc778c79f249 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -19,6 +19,8 @@ //! Node-side logic for Polkadot is mostly comprised of Subsystems, which are discrete components //! 
that communicate via message-passing. They are coordinated by an overseer, provided by a //! separate crate. +//! +//! This crate also re-exports the `prometheus` crate and defines the `RegisterMetrics` trait, which subsystems are expected to implement. #![warn(missing_docs)] @@ -32,6 +34,8 @@ use polkadot_primitives::v1::Hash; use async_trait::async_trait; use smallvec::SmallVec; +pub use substrate_prometheus_endpoint as prometheus; + use crate::messages::AllMessages; pub mod errors; @@ -224,3 +228,17 @@ impl Subsystem for DummySubsystem { } } } + +/// A trait to register subsystem-specific metrics in the prometheus registry. +/// Will be called before the subsystem starts. +pub trait RegisterMetrics { + /// Try to register metrics in the prometheus registry. + fn try_register(&mut self, registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError>; +} + + +impl RegisterMetrics for DummySubsystem { + fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> { + Ok(()) + } +} \ No newline at end of file