Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Add metrics for the events in the network output channels (#5597)
Browse files Browse the repository at this point in the history
* Add metrics for the events in the network output channels

* Documentation fixes

* A couple fixes

* Fix panic at destruction

* Rework for direct Prometheus integration

* Don't lock on the Receiver

* Another review address

* Address review

* Update client/network/src/service/out_events.rs

Co-Authored-By: Max Inden <mail@max-inden.de>

* Fix bad event name

* Fix descriptions

* Fix names

* client/network/service/out_events: Apply remaining suggestions

Co-authored-by: Max Inden <mail@max-inden.de>
  • Loading branch information
tomaka and mxinden authored Apr 9, 2020
1 parent cb3e628 commit 7f12a9f
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 11 deletions.
17 changes: 6 additions & 11 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use std::{
task::Poll,
};

mod out_events;
#[cfg(test)]
mod tests;

Expand Down Expand Up @@ -386,7 +387,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
import_queue: params.import_queue,
from_worker,
light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()),
event_streams: Vec::new(),
event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
metrics,
boot_node_ids,
})
Expand Down Expand Up @@ -576,7 +577,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkService<B, H> {
/// The stream never ends (unless the `NetworkWorker` gets shut down).
pub fn event_stream(&self) -> impl Stream<Item = Event> {
// Note: when transitioning to stable futures, remove the `Error` entirely
let (tx, rx) = tracing_unbounded("mpsc_network_event_stream");
let (tx, rx) = out_events::channel();
let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx));
rx
}
Expand Down Expand Up @@ -796,7 +797,7 @@ enum ServiceToWorkerMsg<B: BlockT, H: ExHashT> {
PutValue(record::Key, Vec<u8>),
AddKnownAddress(PeerId, Multiaddr),
SyncFork(Vec<PeerId>, B::Hash, NumberFor<B>),
EventStream(TracingUnboundedSender<Event>),
EventStream(out_events::Sender),
WriteNotification {
message: Vec<u8>,
engine_id: ConsensusEngineId,
Expand Down Expand Up @@ -831,7 +832,7 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
/// Receiver for queries from the light client that must be processed.
light_client_rqs: Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>,
/// Senders for events that happen on the network.
event_streams: Vec<TracingUnboundedSender<Event>>,
event_streams: out_events::OutChannels,
/// Prometheus network metrics.
metrics: Option<Metrics>,
/// The `PeerId`'s of all boot nodes.
Expand All @@ -855,7 +856,6 @@ struct Metrics {
network_per_sec_bytes: GaugeVec<U64>,
notifications_queues_size: HistogramVec,
notifications_sizes: HistogramVec,
num_event_stream_channels: Gauge<U64>,
opened_notification_streams: GaugeVec<U64>,
peers_count: Gauge<U64>,
peerset_num_discovered: Gauge<U64>,
Expand Down Expand Up @@ -948,10 +948,6 @@ impl Metrics {
},
&["direction", "protocol"]
)?, registry)?,
num_event_stream_channels: register(Gauge::new(
"sub_libp2p_num_event_stream_channels",
"Number of internal active channels that broadcast network events",
)?, registry)?,
opened_notification_streams: register(GaugeVec::new(
Opts::new(
"sub_libp2p_opened_notification_streams",
Expand Down Expand Up @@ -1105,10 +1101,10 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
}
},
Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => {
this.event_streams.retain(|sender| sender.unbounded_send(ev.clone()).is_ok());
if let Some(metrics) = this.metrics.as_ref() {
metrics.update_with_network_event(&ev);
}
this.event_streams.send(ev);
},
Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. }) => {
trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id);
Expand Down Expand Up @@ -1249,7 +1245,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec());
metrics.is_major_syncing.set(is_major_syncing as u64);
metrics.kbuckets_num_nodes.set(this.network_service.num_kbuckets_entries() as u64);
metrics.num_event_stream_channels.set(this.event_streams.len() as u64);
metrics.peers_count.set(num_connected_peers as u64);
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
Expand Down
269 changes: 269 additions & 0 deletions client/network/src/service/out_events.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
// Copyright 2017-2020 Parity Technologies (UK) Ltd.
// This file is part of Substrate.

// Substrate is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Substrate is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.

//! Registering event streams.
//!
//! This code holds the logic that is used for the network service to inform other parts of
//! Substrate about what is happening.
//!
//! # Usage
//!
//! - Create an instance of [`OutChannels`].
//! - Create channels using the [`channel`] function. The receiving side implements the `Stream`
//! trait.
//! - You cannot directly send an event on a sender. Instead, you have to call
//! [`OutChannels::push`] to put the sender within an [`OutChannels`].
//! - Send events by calling [`OutChannels::send`]. Events are cloned for each sender in the
//! collection.
//!

use crate::Event;
use super::engine_id_to_string;

use futures::{prelude::*, channel::mpsc, ready};
use parking_lot::Mutex;
use prometheus_endpoint::{register, CounterVec, Gauge, Opts, PrometheusError, Registry, U64};
use std::{
convert::TryFrom as _,
fmt, pin::Pin, sync::Arc,
task::{Context, Poll}
};

/// Builds a fresh [`Sender`]/[`Receiver`] pair backed by an unbounded channel.
///
/// Both halves share one metrics slot, which starts out empty (`None`). The slot is filled
/// when the [`Sender`] is handed to [`OutChannels::push`].
pub fn channel() -> (Sender, Receiver) {
	let (inner_tx, inner_rx) = mpsc::unbounded();
	// One slot shared by both halves, so that filling it through the sender is visible to the
	// receiver as well.
	let shared_slot = Arc::new(Mutex::new(None));
	(
		Sender { inner: inner_tx, metrics: Arc::clone(&shared_slot) },
		Receiver { inner: inner_rx, metrics: shared_slot },
	)
}

/// Sending side of a channel.
///
/// Must be associated with an [`OutChannels`] (through [`OutChannels::push`]) before anything
/// can be sent on it.
///
/// > **Note**: Contrary to regular channels, this `Sender` is purposefully designed to not
/// implement the `Clone` trait, e.g. in order to not complicate the logic keeping the metrics in
/// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**.
pub struct Sender {
/// Underlying unbounded channel on which [`OutChannels::send`] pushes the events.
inner: mpsc::UnboundedSender<Event>,
/// Clone of [`Receiver::metrics`].
///
/// The outer `Option` is `None` until [`OutChannels::push`] fills it. The inner `Option` is
/// `None` when the [`OutChannels`] was created without a Prometheus registry.
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}

// Opaque `Debug` output: only the type name is printed, none of the fields.
impl fmt::Debug for Sender {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		let mut builder = f.debug_tuple("Sender");
		builder.finish()
	}
}

// Keeps the `num_channels` gauge in sync: a sender that was associated with an `OutChannels`
// having metrics decrements the gauge when it goes away.
impl Drop for Sender {
	fn drop(&mut self) {
		let slot = self.metrics.lock();
		// Outer `Option`: has this sender been pushed into an `OutChannels` yet?
		if let Some(shared) = &*slot {
			// Inner `Option`: were metrics registered at all?
			if let Some(metrics) = &**shared {
				metrics.num_channels.dec();
			}
		}
	}
}

/// Receiving side of a channel.
///
/// Implements `Stream<Item = Event>`. Each event pulled out of the stream is reported to the
/// metrics (if any) as "received".
pub struct Receiver {
/// Underlying unbounded channel from which the events are read.
inner: mpsc::UnboundedReceiver<Event>,
/// Initially contains `None`, and will be set to a value once the corresponding [`Sender`]
/// is assigned to an instance of [`OutChannels`].
///
/// The inner `Option<Metrics>` is `None` when that [`OutChannels`] was created without a
/// Prometheus registry.
metrics: Arc<Mutex<Option<Arc<Option<Metrics>>>>>,
}

impl Stream for Receiver {
	type Item = Event;

	/// Polls the underlying channel and, for every event obtained, records it as "received"
	/// in the metrics.
	///
	/// Returns `Poll::Ready(None)` once the underlying channel has ended, and propagates
	/// `Poll::Pending` from it otherwise.
	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Event>> {
		if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) {
			// Clone the inner `Arc` out of the mutex so that the lock is released before the
			// metrics are updated.
			let metrics = self.metrics.lock().clone();
			match metrics.as_ref().map(|m| &**m) {
				// Associated with an `OutChannels` that has metrics: account for the event.
				Some(Some(metrics)) => metrics.event_out(&ev),
				// Associated with an `OutChannels` created without a registry: this is a
				// perfectly valid state, there is simply nothing to record.
				Some(None) => {},
				// Events can only be sent through `OutChannels::send`, which requires the
				// sender to have been pushed first; getting here indicates a logic error.
				None => log::warn!("Inconsistency in out_events: event happened before sender associated"),
			}
			Poll::Ready(Some(ev))
		} else {
			Poll::Ready(None)
		}
	}
}

// Opaque `Debug` output: only the type name is printed, none of the fields.
impl fmt::Debug for Receiver {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		let mut builder = f.debug_tuple("Receiver");
		builder.finish()
	}
}

impl Drop for Receiver {
	fn drop(&mut self) {
		// Pull out every event that is immediately available, so that each of them goes
		// through `poll_next` and is therefore accounted for in the metrics. Stop as soon as
		// the stream is either pending or finished.
		loop {
			match self.next().now_or_never() {
				Some(Some(_)) => continue,
				_ => break,
			}
		}
	}
}

/// Collection of senders.
///
/// Events passed to [`OutChannels::send`] are cloned and sent on every [`Sender`] that has been
/// added with [`OutChannels::push`].
pub struct OutChannels {
/// Senders that have been added with [`OutChannels::push`] and on which sending has not
/// failed so far.
event_streams: Vec<Sender>,
/// The metrics we collect. A clone of this is sent to each [`Receiver`] associated with this
/// object.
///
/// Contains `None` when no Prometheus registry was passed to [`OutChannels::new`].
metrics: Arc<Option<Metrics>>,
}

impl OutChannels {
	/// Builds an empty collection of senders.
	///
	/// When a `registry` is provided, the metrics of this module are registered against it
	/// and any registration error is returned. Without a registry no metrics are collected.
	pub fn new(registry: Option<&Registry>) -> Result<Self, PrometheusError> {
		let metrics = match registry {
			Some(registry) => Some(Metrics::register(registry)?),
			None => None,
		};

		Ok(OutChannels {
			event_streams: Vec::new(),
			metrics: Arc::new(metrics),
		})
	}

	/// Associates a [`Sender`] with this collection and starts broadcasting events to it.
	///
	/// The sender must not have been pushed before (checked with a `debug_assert!`).
	pub fn push(&mut self, sender: Sender) {
		{
			// Fill the slot shared with the `Receiver`; the lock is released at the end of
			// this scope, before the sender is moved into the list.
			let mut slot = sender.metrics.lock();
			debug_assert!(slot.is_none());
			*slot = Some(Arc::clone(&self.metrics));
		}
		self.event_streams.push(sender);

		if let Some(metrics) = &*self.metrics {
			metrics.num_channels.inc();
		}
	}

	/// Broadcasts `event` to every sender of the collection.
	///
	/// The event is cloned once per sender. Senders on which sending fails are removed from
	/// the collection. The metrics are then updated with the number of remaining senders.
	pub fn send(&mut self, event: Event) {
		self.event_streams
			.retain(|sender| sender.inner.unbounded_send(event.clone()).is_ok());

		let delivered = self.event_streams.len() as u64;
		if let Some(metrics) = &*self.metrics {
			metrics.event_in(&event, delivered);
		}
	}
}

// `Debug` output only exposes how many senders are currently held.
impl fmt::Debug for OutChannels {
	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
		let num_channels = self.event_streams.len();
		let mut builder = f.debug_struct("OutChannels");
		builder.field("num_channels", &num_channels);
		builder.finish()
	}
}

/// Prometheus metrics about the events going through the channels of an [`OutChannels`].
struct Metrics {
// This list is ordered alphabetically
/// Number of events, labelled by `event_name` and `action` (`"sent"` or `"received"`).
events_total: CounterVec<U64>,
/// Cumulated size of the notifications, labelled by `protocol` and `action` (`"sent"` or
/// `"received"`).
notifications_sizes: CounterVec<U64>,
/// Incremented by [`OutChannels::push`] and decremented when an associated [`Sender`] is
/// dropped.
num_channels: Gauge<U64>,
}

impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
events_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_out_events_events_total",
"Number of broadcast network events that have been sent or received across all \
channels"
),
&["event_name", "action"]
)?, registry)?,
notifications_sizes: register(CounterVec::new(
Opts::new(
"sub_libp2p_out_events_notifications_sizes",
"Size of notification events that have been sent or received across all \
channels"
),
&["protocol", "action"]
)?, registry)?,
num_channels: register(Gauge::new(
"sub_libp2p_out_events_num_channels",
"Number of internal active channels that broadcast network events",
)?, registry)?,
})
}

fn event_in(&self, event: &Event, num: u64) {
match event {
Event::Dht(_) => {
self.events_total
.with_label_values(&["dht", "sent"])
.inc_by(num);
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "sent"])
.inc_by(num);
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "sent"])
.inc_by(num);
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "sent"])
.inc_by(num);
self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "sent"])
.inc_by(num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::max_value())));
}
},
}
}

fn event_out(&self, event: &Event) {
match event {
Event::Dht(_) => {
self.events_total
.with_label_values(&["dht", "received"])
.inc();
}
Event::NotificationStreamOpened { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-open-{:?}", engine_id), "received"])
.inc();
},
Event::NotificationStreamClosed { engine_id, .. } => {
self.events_total
.with_label_values(&[&format!("notif-closed-{:?}", engine_id), "received"])
.inc();
},
Event::NotificationsReceived { messages, .. } => {
for (engine_id, message) in messages {
self.events_total
.with_label_values(&[&format!("notif-{:?}", engine_id), "received"])
.inc();
self.notifications_sizes
.with_label_values(&[&engine_id_to_string(engine_id), "received"])
.inc_by(u64::try_from(message.len()).unwrap_or(u64::max_value()));
}
},
}
}
}

0 comments on commit 7f12a9f

Please sign in to comment.