diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index ce92114c53011..eae20c9031ec1 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -32,7 +32,7 @@ use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollPa use log::debug; use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification}; -use std::{borrow::Cow, iter, task::Context, task::Poll}; +use std::{borrow::Cow, iter, task::{Context, Poll}, time::Duration}; use void; /// General behaviour of the network. Combines all protocols together. @@ -67,8 +67,39 @@ pub enum BehaviourOut { BlockImport(BlockOrigin, Vec>), JustificationImport(Origin, B::Hash, NumberFor, Justification), FinalityProofImport(Origin, B::Hash, NumberFor, Vec), - /// Started a random Kademlia discovery query. + + /// Started a random iterative Kademlia discovery query. RandomKademliaStarted(ProtocolId), + + /// We have received a request from a peer and answered it. + AnsweredRequest { + /// Peer which sent us a request. + peer: PeerId, + /// Protocol name of the request. + protocol: Vec, + /// Time it took to build the response. + build_time: Duration, + }, + /// Started a new request with the given node. + RequestStarted { + peer: PeerId, + /// Protocol name of the request. + protocol: Vec, + }, + /// Finished, successfully or not, a previously-started request. + RequestFinished { + /// Who we were requesting. + peer: PeerId, + /// Protocol name of the request. + protocol: Vec, + /// How long before the response came or the request got cancelled. + request_duration: Duration, + }, + + /// Any event represented by the [`Event`] enum. + /// + /// > **Note**: The [`Event`] enum contains the events that are available through the public + /// > API of the library. Event(Event), } @@ -220,7 +251,27 @@ Behaviour { CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)), CustomMessageOutcome::BlockRequest { target, request } => { - self.block_requests.send_request(&target, request); + match self.block_requests.send_request(&target, request) { + block_requests::SendRequestOutcome::Ok => { + self.events.push(BehaviourOut::RequestStarted { + peer: target, + protocol: self.block_requests.protocol_name().to_vec(), + }); + }, + block_requests::SendRequestOutcome::Replaced { request_duration, .. } => { + self.events.push(BehaviourOut::RequestFinished { + peer: target.clone(), + protocol: self.block_requests.protocol_name().to_vec(), + request_duration, + }); + self.events.push(BehaviourOut::RequestStarted { + peer: target, + protocol: self.block_requests.protocol_name().to_vec(), + }); + } + block_requests::SendRequestOutcome::NotConnected | + block_requests::SendRequestOutcome::EncodeError(_) => {}, + } }, CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => { self.finality_proof_requests.send_request(&target, block_hash, request); @@ -257,18 +308,40 @@ Behaviour { impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: block_requests::Event) { match event { - block_requests::Event::Response { peer, original_request, response } => { + block_requests::Event::AnsweredRequest { peer, response_build_time } => { + self.events.push(BehaviourOut::AnsweredRequest { + peer, + protocol: self.block_requests.protocol_name().to_vec(), + build_time: response_build_time, + }); + }, + block_requests::Event::Response { peer, original_request, response, request_duration } => { + self.events.push(BehaviourOut::RequestFinished { + peer: peer.clone(), + protocol: self.block_requests.protocol_name().to_vec(), + request_duration, + }); let ev = self.substrate.on_block_response(peer, original_request, response); self.inject_event(ev); } - block_requests::Event::RequestCancelled { .. } => { + block_requests::Event::RequestCancelled { peer, request_duration, .. } => { // There doesn't exist any mechanism to report cancellations yet. // We would normally disconnect the node, but this event happens as the result of // a disconnect, so there's nothing more to do. + self.events.push(BehaviourOut::RequestFinished { + peer, + protocol: self.block_requests.protocol_name().to_vec(), + request_duration, + }); } - block_requests::Event::RequestTimeout { peer, .. } => { + block_requests::Event::RequestTimeout { peer, request_duration, .. } => { // There doesn't exist any mechanism to report timeouts yet, so we process them by // disconnecting the node. + self.events.push(BehaviourOut::RequestFinished { + peer: peer.clone(), + protocol: self.block_requests.protocol_name().to_vec(), + request_duration, + }); self.substrate.disconnect_peer(&peer); } } diff --git a/client/network/src/protocol/block_requests.rs b/client/network/src/protocol/block_requests.rs index 3c511538d9999..920d3f0e23f41 100644 --- a/client/network/src/protocol/block_requests.rs +++ b/client/network/src/protocol/block_requests.rs @@ -65,6 +65,7 @@ use std::{ task::{Context, Poll} }; use void::{Void, unreachable}; +use wasm_timer::Instant; // Type alias for convenience. pub type Error = Box; @@ -72,25 +73,44 @@ pub type Error = Box; /// Event generated by the block requests behaviour. #[derive(Debug)] pub enum Event { + /// A request came and we answered it. + AnsweredRequest { + /// Peer which has emitted the request. + peer: PeerId, + /// Time it took to compute the response. + response_build_time: Duration, + }, + /// A response to a block request has arrived. Response { peer: PeerId, /// The original request passed to `send_request`. original_request: message::BlockRequest, response: message::BlockResponse, + /// Time elapsed between the start of the request and the response. + request_duration: Duration, }, + /// A request has been cancelled because the peer has disconnected. /// Disconnects can also happen as a result of violating the network protocol. + /// + /// > **Note**: This event is NOT emitted if a request is overridden by calling `send_request`. + /// > For that, you must check the value returned by `send_request`. RequestCancelled { peer: PeerId, /// The original request passed to `send_request`. original_request: message::BlockRequest, + /// Time elapsed between the start of the request and the cancellation. + request_duration: Duration, }, + /// A request has timed out. RequestTimeout { peer: PeerId, /// The original request passed to `send_request`. original_request: message::BlockRequest, + /// Time elapsed between the start of the request and the timeout. + request_duration: Duration, } } @@ -184,10 +204,32 @@ struct Connection { #[derive(Debug)] struct OngoingRequest { + /// `Instant` when the request has been emitted. Used for diagnostic purposes. + emitted: Instant, request: message::BlockRequest, timeout: Delay, } +/// Outcome of calling `send_request`. +#[derive(Debug)] +#[must_use] +pub enum SendRequestOutcome { + /// Request has been emitted. + Ok, + /// The request has been emitted and has replaced an existing request. + Replaced { + /// The previously-emitted request. + previous: message::BlockRequest, + /// Time that had elapsed since `previous` has been emitted. + request_duration: Duration, + }, + /// Didn't start a request because we have no connection to this node. + /// If `send_request` returns that, it is as if the function had never been called. + NotConnected, + /// Error while serializing the request. + EncodeError(prost::EncodeError), +} + impl BlockRequests where B: Block, @@ -202,13 +244,18 @@ where } } + /// Returns the libp2p protocol name used on the wire (e.g. `/foo/sync/2`). + pub fn protocol_name(&self) -> &[u8] { + &self.config.protocol + } + /// Issue a new block request. /// /// Cancels any existing request targeting the same `PeerId`. /// /// If the response doesn't arrive in time, or if the remote answers improperly, the target /// will be disconnected. - pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest) { + pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest) -> SendRequestOutcome { // Determine which connection to send the request to. let connection = if let Some(peer) = self.peers.get_mut(target) { // We don't want to have multiple requests for any given node, so in priority try to @@ -222,10 +269,10 @@ where target: "sync", "State inconsistency: empty list of peer connections" ); - return; + return SendRequestOutcome::NotConnected; } } else { - return; + return SendRequestOutcome::NotConnected; }; let protobuf_rq = api::v1::BlockRequest { @@ -252,17 +299,12 @@ where protobuf_rq, err ); - return; + return SendRequestOutcome::EncodeError(err); } - if let Some(rq) = &connection.ongoing_request { - log::debug!( - target: "sync", - "Replacing existing block request on connection {:?}", - connection.id - ); - } + let previous_request = connection.ongoing_request.take(); connection.ongoing_request = Some(OngoingRequest { + emitted: Instant::now(), request: req.clone(), timeout: Delay::new(self.config.request_timeout), }); @@ -278,6 +320,20 @@ where protocol: self.config.protocol.clone(), }, }); + + if let Some(previous_request) = previous_request { + log::debug!( + target: "sync", + "Replacing existing block request on connection {:?}", + connection.id + ); + SendRequestOutcome::Replaced { + previous: previous_request.request, + request_duration: previous_request.emitted.elapsed(), + } + } else { + SendRequestOutcome::Ok + } } /// Callback, invoked when a new block request has been received from remote. @@ -445,6 +501,7 @@ where let ev = Event::RequestCancelled { peer: peer_id.clone(), original_request: ongoing_request.request.clone(), + request_duration: ongoing_request.emitted.elapsed(), }; self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); } @@ -476,6 +533,8 @@ where ) { match node_event { NodeEvent::Request(request, mut stream) => { + let before_answer_build = Instant::now(); + match self.on_block_request(&peer, &request) { Ok(res) => { log::trace!( @@ -508,6 +567,12 @@ where "Error handling block request from peer {}: {}", peer, e ) } + + let ev = Event::AnsweredRequest { + peer: peer.clone(), + response_build_time: before_answer_build.elapsed(), + }; + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); } NodeEvent::Response(original_request, response) => { log::trace!( @@ -515,11 +580,13 @@ where "Received block response from peer {} with {} blocks", peer, response.blocks.len() ); - if let Some(connections) = self.peers.get_mut(&peer) { + let request_duration = if let Some(connections) = self.peers.get_mut(&peer) { if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) { if let Some(ongoing_request) = &mut connection.ongoing_request { if ongoing_request.request == original_request { + let request_duration = ongoing_request.emitted.elapsed(); connection.ongoing_request = None; + request_duration } else { // We're no longer interested in that request. log::debug!( @@ -550,7 +617,7 @@ where peer ); return; - } + }; let blocks = response.blocks.into_iter().map(|block_data| { Ok(message::BlockData:: { @@ -594,6 +661,7 @@ where peer, original_request, response: message::BlockResponse:: { id, blocks }, + request_duration, }; self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); } @@ -625,6 +693,7 @@ where if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) { let original_request = ongoing_request.request.clone(); + let request_duration = ongoing_request.emitted.elapsed(); connection.ongoing_request = None; log::debug!( target: "sync", @@ -634,6 +703,7 @@ where let ev = Event::RequestTimeout { peer: peer.clone(), original_request, + request_duration, }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index f811266a13445..3fbfd2dd14252 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -817,6 +817,9 @@ struct Metrics { peerset_num_requested: Gauge, pending_connections: Gauge, pending_connections_errors_total: CounterVec, + requests_in_total: HistogramVec, + requests_out_finished: HistogramVec, + requests_out_started_total: CounterVec, } impl Metrics { @@ -965,6 +968,35 @@ impl Metrics { ), &["reason"] )?, registry)?, + requests_in_total: register(HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "sub_libp2p_requests_in_total", + "Total number of requests received and answered" + ), + buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) + .expect("parameters are always valid values; qed"), + }, + &["protocol"] + )?, registry)?, + requests_out_finished: register(HistogramVec::new( + HistogramOpts { + common_opts: Opts::new( + "sub_libp2p_requests_out_finished", + "Time between a request's start and finish (successful or not)" + ), + buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) + .expect("parameters are always valid values; qed"), + }, + &["protocol"] + )?, registry)?, + requests_out_started_total: register(CounterVec::new( + Opts::new( + "sub_libp2p_requests_out_started_total", + "Total number of requests emitted" + ), + &["protocol"] + )?, registry)?, }) } @@ -1084,6 +1116,27 @@ impl Future for NetworkWorker { } this.import_queue.import_finality_proof(origin, hash, nb, proof); }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::AnsweredRequest { protocol, build_time, .. })) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.requests_in_total + .with_label_values(&[&maybe_utf8_bytes_to_string(&protocol)]) + .observe(build_time.as_secs_f64()); + } + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestStarted { protocol, .. })) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.requests_out_started_total + .with_label_values(&[&maybe_utf8_bytes_to_string(&protocol)]) + .inc(); + } + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { protocol, request_duration, .. })) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.requests_out_finished + .with_label_values(&[&maybe_utf8_bytes_to_string(&protocol)]) + .observe(request_duration.as_secs_f64()); + } + }, Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(protocol))) => { if let Some(metrics) = this.metrics.as_ref() { metrics.kademlia_random_queries_total