Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
metrics: it's ugly, but it works
Browse files Browse the repository at this point in the history
  • Loading branch information
ordian committed Aug 5, 2020
1 parent e9a9e99 commit 7fac800
Show file tree
Hide file tree
Showing 6 changed files with 183 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

94 changes: 81 additions & 13 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

use polkadot_subsystem::{
Subsystem, SubsystemContext, SpawnedSubsystem, SubsystemResult,
FromOverseer, OverseerSignal,
FromOverseer, OverseerSignal, prometheus, RegisterMetrics,
};
use polkadot_subsystem::messages::{
AllMessages, CandidateValidationMessage, RuntimeApiMessage, ValidationFailed, RuntimeApiRequest,
Expand All @@ -45,13 +45,74 @@ use futures::prelude::*;

use std::sync::Arc;

const LOG_TARGET: &'static str = "candidate_validation";

/// The candidate validation subsystem.
pub struct CandidateValidationSubsystem<S>(S);
pub struct CandidateValidationSubsystem<S> {
spawn: S,
metrics: MaybeMetrics,
}

/// Candidate validation metrics.
struct Metrics {
validation_requests: prometheus::CounterVec<prometheus::U64>,
}

struct MaybeMetrics(Option<Metrics>);

impl Default for MaybeMetrics {
fn default() -> Self {
Self(None)
}
}

impl MaybeMetrics {
fn update(&self, event: &Result<ValidationResult, ValidationFailed>) {
if let Some(metrics) = &self.0 {
match event {
Ok(ValidationResult::Valid(_)) => {
metrics.validation_requests.with_label_values(&["valid"]).inc();
},
Ok(ValidationResult::Invalid(_)) => {
metrics.validation_requests.with_label_values(&["invalid"]).inc();
},
Err(_) => {
metrics.validation_requests.with_label_values(&["failed"]).inc();
},
}
}
}
}

impl RegisterMetrics for MaybeMetrics {
fn try_register(&mut self, registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
let metrics = Metrics {
validation_requests: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"validation_request_events",
"Number of validation requests served.",
),
&["valid", "invalid", "failed"],
)?,
registry,
)?,
};
self.0 = Some(metrics);
Ok(())
}
}

impl<S> RegisterMetrics for CandidateValidationSubsystem<S> {
fn try_register(&mut self, registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
self.metrics.try_register(registry)
}
}

impl<S> CandidateValidationSubsystem<S> {
/// Create a new `CandidateValidationSubsystem` with the given task spawner.
pub fn new(spawn: S) -> Self {
CandidateValidationSubsystem(spawn)
CandidateValidationSubsystem { spawn, metrics: Default::default() }
}
}

Expand All @@ -62,14 +123,15 @@ impl<S, C> Subsystem<C> for CandidateValidationSubsystem<S> where
fn start(self, ctx: C) -> SpawnedSubsystem {
SpawnedSubsystem {
name: "candidate-validation-subsystem",
future: run(ctx, self.0).map(|_| ()).boxed(),
future: run(ctx, self.spawn, self.metrics).map(|_| ()).boxed(),
}
}
}

async fn run(
mut ctx: impl SubsystemContext<Message = CandidateValidationMessage>,
spawn: impl SpawnNamed + Clone + 'static,
metrics: MaybeMetrics,
)
-> SubsystemResult<()>
{
Expand All @@ -95,8 +157,11 @@ async fn run(
).await;

match res {
Ok(x) => { let _ = response_sender.send(x); }
Err(e)=> return Err(e),
Ok(x) => {
metrics.update(&x);
let _ = response_sender.send(x);
}
Err(e) => return Err(e),
}
}
CandidateValidationMessage::ValidateFromExhaustive(
Expand All @@ -117,13 +182,16 @@ async fn run(
).await;

match res {
Ok(x) => if let Err(_e) = response_sender.send(x) {
log::warn!(
target: "candidate_validation",
"Requester of candidate validation dropped",
)
Ok(x) => {
metrics.update(&x);
if let Err(_e) = response_sender.send(x) {
log::warn!(
target: LOG_TARGET,
"Requester of candidate validation dropped",
)
}
},
Err(e)=> return Err(e),
Err(e) => return Err(e),
}
}
}
Expand Down Expand Up @@ -237,7 +305,7 @@ async fn spawn_validate_from_chain_state(
Ok(g) => g,
Err(e) => {
log::warn!(
target: "candidate_validation",
target: LOG_TARGET,
"Error making runtime API request: {:?}",
e,
);
Expand Down
1 change: 0 additions & 1 deletion node/overseer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ polkadot-primitives = { path = "../../primitives" }
client = { package = "sc-client-api", git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-subsystem = { package = "polkadot-node-subsystem", path = "../subsystem" }
polkadot-node-primitives = { package = "polkadot-node-primitives", path = "../primitives" }
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }
async-trait = "0.1"

[dev-dependencies]
Expand Down
129 changes: 82 additions & 47 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,9 @@ use polkadot_subsystem::messages::{
};
pub use polkadot_subsystem::{
Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult,
SpawnedSubsystem, ActiveLeavesUpdate,
SpawnedSubsystem, ActiveLeavesUpdate, prometheus, RegisterMetrics,
};
use polkadot_node_primitives::SpawnNamed;
use substrate_prometheus_endpoint as prometheus;


// A capacity of bounded channels inside the overseer.
Expand Down Expand Up @@ -435,8 +434,6 @@ pub struct AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA> {
/// Various prometheus metrics.
struct Metrics {
active_heads_count: prometheus::Gauge<prometheus::U64>,
// TODO do these metrics live here or in subsystems?
validation_requests: prometheus::CounterVec<prometheus::U64>,
// Number of statements signed
// Number of bitfields signed
// Number of availability chunks received
Expand All @@ -461,28 +458,19 @@ impl MaybeMetrics {
}
}

impl Metrics {
/// Try to register metrics in the prometheus registry.
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
Ok(Self {
impl RegisterMetrics for MaybeMetrics {
fn try_register(&mut self, registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
let metrics = Metrics {
active_heads_count: prometheus::register(
prometheus::Gauge::new(
"active_heads_count",
"Number of active heads."
)?, // TODO: should this be `.expect(PROOF)`?
registry,
)?,
validation_requests: prometheus::register(
prometheus::CounterVec::new(
prometheus::Opts::new(
"validation_requests",
"Number of validation requests served.",
),
&["succeeded", "failed"],
)?,
registry,
)?,
})
};
self.0 = Some(metrics);
Ok(())
}
}

Expand Down Expand Up @@ -589,24 +577,24 @@ where
/// ```
pub fn new<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>(
leaves: impl IntoIterator<Item = BlockInfo>,
all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>,
mut all_subsystems: AllSubsystems<CV, CB, CS, SD, AD, BS, BD, P, PoVD, RA, AS, NB, CA>,
prometheus_registry: Option<&prometheus::Registry>,
mut s: S,
) -> SubsystemResult<(Self, OverseerHandler)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + Send,
CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + Send,
CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + Send,
SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + Send,
AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + Send,
BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + Send,
BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>> + Send,
P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>> + Send,
PoVD: Subsystem<OverseerSubsystemContext<PoVDistributionMessage>> + Send,
RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>> + Send,
AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + Send,
NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + Send,
CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>> + Send,
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>> + RegisterMetrics + Send,
CB: Subsystem<OverseerSubsystemContext<CandidateBackingMessage>> + RegisterMetrics + Send,
CS: Subsystem<OverseerSubsystemContext<CandidateSelectionMessage>> + RegisterMetrics + Send,
SD: Subsystem<OverseerSubsystemContext<StatementDistributionMessage>> + RegisterMetrics + Send,
AD: Subsystem<OverseerSubsystemContext<AvailabilityDistributionMessage>> + RegisterMetrics + Send,
BS: Subsystem<OverseerSubsystemContext<BitfieldSigningMessage>> + RegisterMetrics + Send,
BD: Subsystem<OverseerSubsystemContext<BitfieldDistributionMessage>> + RegisterMetrics + Send,
P: Subsystem<OverseerSubsystemContext<ProvisionerMessage>> + RegisterMetrics + Send,
PoVD: Subsystem<OverseerSubsystemContext<PoVDistributionMessage>> + RegisterMetrics + Send,
RA: Subsystem<OverseerSubsystemContext<RuntimeApiMessage>> + RegisterMetrics + Send,
AS: Subsystem<OverseerSubsystemContext<AvailabilityStoreMessage>> + RegisterMetrics + Send,
NB: Subsystem<OverseerSubsystemContext<NetworkBridgeMessage>> + RegisterMetrics + Send,
CA: Subsystem<OverseerSubsystemContext<ChainApiMessage>> + RegisterMetrics + Send,
{
let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY);

Expand All @@ -617,6 +605,35 @@ where
let mut running_subsystems_rx = StreamUnordered::new();
let mut running_subsystems = FuturesUnordered::new();

fn register_metrics<S: RegisterMetrics>(s: &mut S, registry: Option<&prometheus::Registry>) {
if let Some(registry) = registry {
match RegisterMetrics::try_register(s, registry) {
Err(e) => {
log::warn!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
},
_ => {},
}
}
};

// subsystem metrics
register_metrics(&mut all_subsystems.candidate_validation, prometheus_registry);
register_metrics(&mut all_subsystems.candidate_backing, prometheus_registry);
register_metrics(&mut all_subsystems.candidate_selection, prometheus_registry);
register_metrics(&mut all_subsystems.statement_distribution, prometheus_registry);
register_metrics(&mut all_subsystems.availability_distribution, prometheus_registry);
register_metrics(&mut all_subsystems.bitfield_signing, prometheus_registry);
register_metrics(&mut all_subsystems.bitfield_distribution, prometheus_registry);
register_metrics(&mut all_subsystems.pov_distribution, prometheus_registry);
register_metrics(&mut all_subsystems.runtime_api, prometheus_registry);
register_metrics(&mut all_subsystems.availability_store, prometheus_registry);
register_metrics(&mut all_subsystems.network_bridge, prometheus_registry);
register_metrics(&mut all_subsystems.chain_api, prometheus_registry);

// overseer metrics
let mut metrics = MaybeMetrics(None);
register_metrics(&mut metrics, prometheus_registry);

let candidate_validation_subsystem = spawn(
&mut s,
&mut running_subsystems,
Expand Down Expand Up @@ -715,20 +732,6 @@ where

let active_leaves = HashSet::new();

let metrics = match prometheus_registry {
Some(registry) => {
match Metrics::try_register(registry) {
Ok(metrics) => Some(metrics),
Err(e) => {
log::warn!(target: LOG_TARGET, "Failed to register metrics: {:?}", e);
None
},
}
},
None => None,
};
let metrics = MaybeMetrics(metrics);

let this = Self {
candidate_validation_subsystem,
candidate_backing_subsystem,
Expand Down Expand Up @@ -1112,6 +1115,12 @@ mod tests {
}
}

impl RegisterMetrics for TestSubsystem1 {
fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
Ok(())
}
}

struct TestSubsystem2(mpsc::Sender<usize>);

impl<C> Subsystem<C> for TestSubsystem2
Expand Down Expand Up @@ -1158,6 +1167,12 @@ mod tests {
}
}

impl RegisterMetrics for TestSubsystem2 {
fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
Ok(())
}
}

struct TestSubsystem4;

impl<C> Subsystem<C> for TestSubsystem4
Expand All @@ -1173,6 +1188,13 @@ mod tests {
}
}

impl RegisterMetrics for TestSubsystem4 {
fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
Ok(())
}
}


// Checks that a minimal configuration of two jobs can run and exchange messages.
#[test]
fn overseer_works() {
Expand Down Expand Up @@ -1307,6 +1329,13 @@ mod tests {
}
}

impl RegisterMetrics for TestSubsystem5 {
fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
Ok(())
}
}


struct TestSubsystem6(mpsc::Sender<OverseerSignal>);

impl<C> Subsystem<C> for TestSubsystem6
Expand Down Expand Up @@ -1336,6 +1365,12 @@ mod tests {
}
}

impl RegisterMetrics for TestSubsystem6 {
fn try_register(&mut self, _registry: &prometheus::Registry) -> Result<(), prometheus::PrometheusError> {
Ok(())
}
}

// Tests that starting with a defined set of leaves and receiving
// notifications on imported blocks triggers expected `StartWork` and `StopWork` heartbeats.
#[test]
Expand Down
1 change: 1 addition & 0 deletions node/subsystem/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ sc-network = { git = "https://github.com/paritytech/substrate", branch = "master
smallvec = "1.4.1"
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
streamunordered = "0.5.1"
substrate-prometheus-endpoint = { git = "https://github.com/paritytech/substrate", branch = "master" }

[dev-dependencies]
assert_matches = "1.3.0"
Expand Down
Loading

0 comments on commit 7fac800

Please sign in to comment.