diff --git a/client/authority-discovery/src/worker/tests.rs b/client/authority-discovery/src/worker/tests.rs index ce55728a1bfa6..7f3d113a83448 100644 --- a/client/authority-discovery/src/worker/tests.rs +++ b/client/authority-discovery/src/worker/tests.rs @@ -184,6 +184,10 @@ impl NetworkStateInfo for TestNetwork { fn external_addresses(&self) -> Vec { self.external_addresses.clone() } + + fn listen_addresses(&self) -> Vec { + self.external_addresses.clone() + } } struct TestSigner<'a> { diff --git a/client/network/common/src/service.rs b/client/network/common/src/service.rs index 54d254eac384f..f0f307800202f 100644 --- a/client/network/common/src/service.rs +++ b/client/network/common/src/service.rs @@ -180,13 +180,13 @@ pub trait NetworkPeers { /// purposes. fn deny_unreserved_peers(&self); - /// Adds a `PeerId` and its `Multiaddr` as reserved. + /// Adds a `PeerId` and its `Multiaddr` as reserved for a sync protocol (default peer set). /// /// Returns an `Err` if the given string is not a valid multiaddress /// or contains an invalid peer ID (which includes the local peer ID). fn add_reserved_peer(&self, peer: MultiaddrWithPeerId) -> Result<(), String>; - /// Removes a `PeerId` from the list of reserved peers. + /// Removes a `PeerId` from the list of reserved peers for a sync protocol (default peer set). fn remove_reserved_peer(&self, peer_id: PeerId); /// Sets the reserved set of a protocol to the given set of peers. @@ -359,6 +359,9 @@ pub trait NetworkStateInfo { /// Returns the local external addresses. fn external_addresses(&self) -> Vec; + /// Returns the listening addresses (without trailing `/p2p/` with our `PeerId`). + fn listen_addresses(&self) -> Vec; + /// Returns the local Peer ID. fn local_peer_id(&self) -> PeerId; } @@ -372,6 +375,10 @@ where T::external_addresses(self) } + fn listen_addresses(&self) -> Vec { + T::listen_addresses(self) + } + fn local_peer_id(&self) -> PeerId { T::local_peer_id(self) } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index dbe148c899395..3c8856eaf8eff 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -19,13 +19,13 @@ //! Main entry point of the sc-network crate. //! //! There are two main structs in this module: [`NetworkWorker`] and [`NetworkService`]. -//! The [`NetworkWorker`] *is* the network and implements the `Future` trait. It must be polled in -//! order for the network to advance. +//! The [`NetworkWorker`] *is* the network. Network is driven by [`NetworkWorker::run`] future that +//! terminates only when all instances of the control handles [`NetworkService`] were dropped. //! The [`NetworkService`] is merely a shared version of the [`NetworkWorker`]. You can obtain an //! `Arc` by calling [`NetworkWorker::service`]. //! //! The methods of the [`NetworkService`] are implemented by sending a message over a channel, -//! which is then processed by [`NetworkWorker::poll`]. +//! which is then processed by [`NetworkWorker::next_action`]. use crate::{ behaviour::{self, Behaviour, BehaviourOut}, @@ -46,8 +46,9 @@ use libp2p::{ multiaddr, ping::Failure as PingFailure, swarm::{ - AddressScore, ConnectionError, ConnectionLimits, DialError, Executor, NetworkBehaviour, - PendingConnectionError, Swarm, SwarmBuilder, SwarmEvent, + AddressScore, ConnectionError, ConnectionHandler, ConnectionLimits, DialError, Executor, + IntoConnectionHandler, NetworkBehaviour, PendingConnectionError, Swarm, SwarmBuilder, + SwarmEvent, }, Multiaddr, PeerId, }; @@ -87,7 +88,6 @@ use std::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, }, - task::Poll, }; pub use behaviour::{InboundFailure, OutboundFailure, ResponseFailure}; @@ -100,12 +100,20 @@ mod tests; pub use libp2p::identity::{error::DecodingError, Keypair, PublicKey}; use sc_network_common::service::{NetworkBlock, NetworkRequest}; +/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`]. +/// Used as a template parameter of [`SwarmEvent`] below. +type ConnectionHandlerErr = + <<::ConnectionHandler as IntoConnectionHandler> + ::Handler as ConnectionHandler>::Error; + /// Substrate network service. Handles network IO and manages connectivity. pub struct NetworkService { /// Number of peers we're connected to. num_connected: Arc, /// The local external addresses. external_addresses: Arc>>, + /// Listen addresses. Do **NOT** include a trailing `/p2p/` with our `PeerId`. + listen_addresses: Arc>>, /// Are we actively catching up with the chain? is_major_syncing: Arc, /// Local copy of the `PeerId` of the local node. @@ -434,11 +442,13 @@ where } let external_addresses = Arc::new(Mutex::new(Vec::new())); + let listen_addresses = Arc::new(Mutex::new(Vec::new())); let peers_notifications_sinks = Arc::new(Mutex::new(HashMap::new())); let service = Arc::new(NetworkService { bandwidth, external_addresses: external_addresses.clone(), + listen_addresses: listen_addresses.clone(), num_connected: num_connected.clone(), is_major_syncing: is_major_syncing.clone(), peerset: peerset_handle, @@ -455,6 +465,7 @@ where Ok(NetworkWorker { external_addresses, + listen_addresses, num_connected, is_major_syncing, network_service: swarm, @@ -711,6 +722,34 @@ impl NetworkService { } } + /// Get connected peers debug information. + /// + /// Returns an error if the `NetworkWorker` is no longer running. + pub async fn peers_debug_info(&self) -> Result)>, ()> { + let (tx, rx) = oneshot::channel(); + + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::PeersDebugInfo { pending_response: tx }); + + // The channel can only be closed if the network worker no longer exists. + rx.await.map_err(|_| ()) + } + + /// Get the list of reserved peers. + /// + /// Returns an error if the `NetworkWorker` is no longer running. + pub async fn reserved_peers(&self) -> Result, ()> { + let (tx, rx) = oneshot::channel(); + + let _ = self + .to_worker + .unbounded_send(ServiceToWorkerMsg::ReservedPeers { pending_response: tx }); + + // The channel can only be closed if the network worker no longer exists. + rx.await.map_err(|_| ()) + } + /// Utility function to extract `PeerId` from each `Multiaddr` for peer set updates. /// /// Returns an `Err` if one of the given addresses is invalid or contains an @@ -774,6 +813,11 @@ where self.external_addresses.lock().clone() } + /// Returns the listener addresses (without trailing `/p2p/` with our `PeerId`). + fn listen_addresses(&self) -> Vec { + self.listen_addresses.lock().clone() + } + /// Returns the local Peer ID. fn local_peer_id(&self) -> PeerId { self.local_peer_id @@ -1243,6 +1287,12 @@ enum ServiceToWorkerMsg { }, DisconnectPeer(PeerId, ProtocolName), NewBestBlockImported(B::Hash, NumberFor), + PeersDebugInfo { + pending_response: oneshot::Sender)>>, + }, + ReservedPeers { + pending_response: oneshot::Sender>, + }, } /// Main network worker. Must be polled in order for the network to advance. @@ -1258,6 +1308,8 @@ where /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. external_addresses: Arc>>, /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. + listen_addresses: Arc>>, + /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. num_connected: Arc, /// Updated by the `NetworkWorker` and loaded by the `NetworkService`. is_major_syncing: Arc, @@ -1281,637 +1333,595 @@ where _marker: PhantomData, } -impl Future for NetworkWorker +impl NetworkWorker where B: BlockT + 'static, H: ExHashT, Client: HeaderBackend + 'static, { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll { - let this = &mut *self; - - // At the time of writing of this comment, due to a high volume of messages, the network - // worker sometimes takes a long time to process the loop below. When that happens, the - // rest of the polling is frozen. In order to avoid negative side-effects caused by this - // freeze, a limit to the number of iterations is enforced below. If the limit is reached, - // the task is interrupted then scheduled again. - // - // This allows for a more even distribution in the time taken by each sub-part of the - // polling. - let mut num_iterations = 0; - loop { - num_iterations += 1; - if num_iterations >= 100 { - cx.waker().wake_by_ref(); - break - } + /// Run the network. + pub async fn run(mut self) { + while self.next_action().await {} + } - // Process the next message coming from the `NetworkService`. - let msg = match this.from_service.poll_next_unpin(cx) { - Poll::Ready(Some(msg)) => msg, - Poll::Ready(None) => return Poll::Ready(()), - Poll::Pending => break, - }; - match msg { - ServiceToWorkerMsg::AnnounceBlock(hash, data) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .announce_block(hash, data), - ServiceToWorkerMsg::GetValue(key) => - this.network_service.behaviour_mut().get_value(key), - ServiceToWorkerMsg::PutValue(key, value) => - this.network_service.behaviour_mut().put_value(key, value), - ServiceToWorkerMsg::SetReservedOnly(reserved_only) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .set_reserved_only(reserved_only), - ServiceToWorkerMsg::SetReserved(peers) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .set_reserved_peers(peers), - ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .set_reserved_peerset_peers(protocol, peers), - ServiceToWorkerMsg::AddReserved(peer_id) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .add_reserved_peer(peer_id), - ServiceToWorkerMsg::RemoveReserved(peer_id) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .remove_reserved_peer(peer_id), - ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .add_set_reserved_peer(protocol, peer_id), - ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .remove_set_reserved_peer(protocol, peer_id), - ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => - this.network_service.behaviour_mut().add_known_address(peer_id, addr), - ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .add_to_peers_set(protocol, peer_id), - ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .remove_from_peers_set(protocol, peer_id), - ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender), - ServiceToWorkerMsg::Request { - target, - protocol, - request, - pending_response, - connect, - } => { - this.network_service.behaviour_mut().send_request( - &target, - &protocol, - request, - pending_response, - connect, - ); - }, - ServiceToWorkerMsg::NetworkStatus { pending_response } => { - let _ = pending_response.send(Ok(this.status())); - }, - ServiceToWorkerMsg::NetworkState { pending_response } => { - let _ = pending_response.send(Ok(this.network_state())); - }, - ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .disconnect_peer(&who, protocol_name), - ServiceToWorkerMsg::NewBestBlockImported(hash, number) => this - .network_service - .behaviour_mut() - .user_protocol_mut() - .new_best_block_imported(hash, number), - } + /// Perform one action on the network. + /// + /// Returns `false` when the worker should be shutdown. + /// Use in tests only. + pub async fn next_action(&mut self) -> bool { + futures::select! { + // Next message from the service. + msg = self.from_service.next() => { + if let Some(msg) = msg { + self.handle_worker_message(msg); + } else { + return false + } + }, + // Next event from `Swarm` (the stream guaranteed to never terminate). + event = self.network_service.select_next_some() => { + self.handle_swarm_event(event); + }, + }; + + let num_connected_peers = + self.network_service.behaviour_mut().user_protocol_mut().num_connected_peers(); + + // Update the variables shared with the `NetworkService`. + self.num_connected.store(num_connected_peers, Ordering::Relaxed); + { + let external_addresses = + self.network_service.external_addresses().map(|r| &r.addr).cloned().collect(); + *self.external_addresses.lock() = external_addresses; + + let listen_addresses = + self.network_service.listeners().map(ToOwned::to_owned).collect(); + *self.listen_addresses.lock() = listen_addresses; } - // `num_iterations` serves the same purpose as in the previous loop. - // See the previous loop for explanations. - let mut num_iterations = 0; - loop { - num_iterations += 1; - if num_iterations >= 1000 { - cx.waker().wake_by_ref(); - break + let is_major_syncing = self + .network_service + .behaviour_mut() + .user_protocol_mut() + .sync_state() + .state + .is_major_syncing(); + + self.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); + + if let Some(metrics) = self.metrics.as_ref() { + if let Some(buckets) = self.network_service.behaviour_mut().num_entries_per_kbucket() { + for (lower_ilog2_bucket_bound, num_entries) in buckets { + metrics + .kbuckets_num_nodes + .with_label_values(&[&lower_ilog2_bucket_bound.to_string()]) + .set(num_entries as u64); + } + } + if let Some(num_entries) = self.network_service.behaviour_mut().num_kademlia_records() { + metrics.kademlia_records_count.set(num_entries as u64); + } + if let Some(num_entries) = + self.network_service.behaviour_mut().kademlia_records_total_size() + { + metrics.kademlia_records_sizes_total.set(num_entries as u64); } + metrics + .peerset_num_discovered + .set(self.network_service.behaviour_mut().user_protocol().num_discovered_peers() + as u64); + metrics.pending_connections.set( + Swarm::network_info(&self.network_service).connection_counters().num_pending() + as u64, + ); + } - // Process the next action coming from the network. - let next_event = this.network_service.select_next_some(); - futures::pin_mut!(next_event); - let poll_value = next_event.poll_unpin(cx); + true + } - match poll_value { - Poll::Pending => break, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { - protocol, - result, - .. - })) => { - if let Some(metrics) = this.metrics.as_ref() { - match result { - Ok(serve_time) => { - metrics - .requests_in_success_total - .with_label_values(&[&protocol]) - .observe(serve_time.as_secs_f64()); - }, - Err(err) => { - let reason = match err { - ResponseFailure::Network(InboundFailure::Timeout) => "timeout", - ResponseFailure::Network( - InboundFailure::UnsupportedProtocols, - ) => - // `UnsupportedProtocols` is reported for every single - // inbound request whenever a request with an unsupported - // protocol is received. This is not reported in order to - // avoid confusions. - continue, - ResponseFailure::Network(InboundFailure::ResponseOmission) => - "busy-omitted", - ResponseFailure::Network(InboundFailure::ConnectionClosed) => - "connection-closed", - }; + /// Process the next message coming from the `NetworkService`. + fn handle_worker_message(&mut self, msg: ServiceToWorkerMsg) { + match msg { + ServiceToWorkerMsg::AnnounceBlock(hash, data) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .announce_block(hash, data), + ServiceToWorkerMsg::GetValue(key) => + self.network_service.behaviour_mut().get_value(key), + ServiceToWorkerMsg::PutValue(key, value) => + self.network_service.behaviour_mut().put_value(key, value), + ServiceToWorkerMsg::SetReservedOnly(reserved_only) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .set_reserved_only(reserved_only), + ServiceToWorkerMsg::SetReserved(peers) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .set_reserved_peers(peers), + ServiceToWorkerMsg::SetPeersetReserved(protocol, peers) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .set_reserved_peerset_peers(protocol, peers), + ServiceToWorkerMsg::AddReserved(peer_id) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .add_reserved_peer(peer_id), + ServiceToWorkerMsg::RemoveReserved(peer_id) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .remove_reserved_peer(peer_id), + ServiceToWorkerMsg::AddSetReserved(protocol, peer_id) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .add_set_reserved_peer(protocol, peer_id), + ServiceToWorkerMsg::RemoveSetReserved(protocol, peer_id) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .remove_set_reserved_peer(protocol, peer_id), + ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => + self.network_service.behaviour_mut().add_known_address(peer_id, addr), + ServiceToWorkerMsg::AddToPeersSet(protocol, peer_id) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .add_to_peers_set(protocol, peer_id), + ServiceToWorkerMsg::RemoveFromPeersSet(protocol, peer_id) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .remove_from_peers_set(protocol, peer_id), + ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender), + ServiceToWorkerMsg::Request { + target, + protocol, + request, + pending_response, + connect, + } => { + self.network_service.behaviour_mut().send_request( + &target, + &protocol, + request, + pending_response, + connect, + ); + }, + ServiceToWorkerMsg::NetworkStatus { pending_response } => { + let _ = pending_response.send(Ok(self.status())); + }, + ServiceToWorkerMsg::NetworkState { pending_response } => { + let _ = pending_response.send(Ok(self.network_state())); + }, + ServiceToWorkerMsg::DisconnectPeer(who, protocol_name) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .disconnect_peer(&who, protocol_name), + ServiceToWorkerMsg::NewBestBlockImported(hash, number) => self + .network_service + .behaviour_mut() + .user_protocol_mut() + .new_best_block_imported(hash, number), + ServiceToWorkerMsg::PeersDebugInfo { pending_response } => { + let _ = pending_response.send(self.peers_debug_info()); + }, + ServiceToWorkerMsg::ReservedPeers { pending_response } => { + let _ = + pending_response.send(self.reserved_peers().map(ToOwned::to_owned).collect()); + }, + } + } + /// Process the next event coming from `Swarm`. + fn handle_swarm_event( + &mut self, + event: SwarmEvent>>, + ) { + match event { + SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. }) => { + if let Some(metrics) = self.metrics.as_ref() { + match result { + Ok(serve_time) => { + metrics + .requests_in_success_total + .with_label_values(&[&protocol]) + .observe(serve_time.as_secs_f64()); + }, + Err(err) => { + let reason = match err { + ResponseFailure::Network(InboundFailure::Timeout) => + Some("timeout"), + ResponseFailure::Network(InboundFailure::UnsupportedProtocols) => + // `UnsupportedProtocols` is reported for every single + // inbound request whenever a request with an unsupported + // protocol is received. This is not reported in order to + // avoid confusions. + None, + ResponseFailure::Network(InboundFailure::ResponseOmission) => + Some("busy-omitted"), + ResponseFailure::Network(InboundFailure::ConnectionClosed) => + Some("connection-closed"), + }; + + if let Some(reason) = reason { metrics .requests_in_failure_total .with_label_values(&[&protocol, reason]) .inc(); - }, - } + } + }, } - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { - protocol, - duration, - result, - .. - })) => - if let Some(metrics) = this.metrics.as_ref() { - match result { - Ok(_) => { - metrics - .requests_out_success_total - .with_label_values(&[&protocol]) - .observe(duration.as_secs_f64()); - }, - Err(err) => { - let reason = match err { - RequestFailure::NotConnected => "not-connected", - RequestFailure::UnknownProtocol => "unknown-protocol", - RequestFailure::Refused => "refused", - RequestFailure::Obsolete => "obsolete", - RequestFailure::Network(OutboundFailure::DialFailure) => - "dial-failure", - RequestFailure::Network(OutboundFailure::Timeout) => "timeout", - RequestFailure::Network(OutboundFailure::ConnectionClosed) => - "connection-closed", - RequestFailure::Network( - OutboundFailure::UnsupportedProtocols, - ) => "unsupported", - }; + } + }, + SwarmEvent::Behaviour(BehaviourOut::RequestFinished { + protocol, + duration, + result, + .. + }) => + if let Some(metrics) = self.metrics.as_ref() { + match result { + Ok(_) => { + metrics + .requests_out_success_total + .with_label_values(&[&protocol]) + .observe(duration.as_secs_f64()); + }, + Err(err) => { + let reason = match err { + RequestFailure::NotConnected => "not-connected", + RequestFailure::UnknownProtocol => "unknown-protocol", + RequestFailure::Refused => "refused", + RequestFailure::Obsolete => "obsolete", + RequestFailure::Network(OutboundFailure::DialFailure) => + "dial-failure", + RequestFailure::Network(OutboundFailure::Timeout) => "timeout", + RequestFailure::Network(OutboundFailure::ConnectionClosed) => + "connection-closed", + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => + "unsupported", + }; - metrics - .requests_out_failure_total - .with_label_values(&[&protocol, reason]) - .inc(); - }, - } - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { - peer, - changes, - })) => - for change in changes { - this.network_service.behaviour().user_protocol().report_peer(peer, change); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::PeerIdentify { - peer_id, - info: - IdentifyInfo { - protocol_version, - agent_version, - mut listen_addrs, - protocols, - .. + metrics + .requests_out_failure_total + .with_label_values(&[&protocol, reason]) + .inc(); }, - })) => { - if listen_addrs.len() > 30 { - debug!( - target: "sub-libp2p", - "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}", - peer_id, protocol_version, agent_version - ); - listen_addrs.truncate(30); } - for addr in listen_addrs { - this.network_service - .behaviour_mut() - .add_self_reported_address_to_dht(&peer_id, &protocols, addr); - } - this.network_service - .behaviour_mut() - .user_protocol_mut() - .add_default_set_discovered_nodes(iter::once(peer_id)); }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id))) => { - this.network_service - .behaviour_mut() - .user_protocol_mut() - .add_default_set_discovered_nodes(iter::once(peer_id)); + SwarmEvent::Behaviour(BehaviourOut::ReputationChanges { peer, changes }) => + for change in changes { + self.network_service.behaviour().user_protocol().report_peer(peer, change); }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted)) => - if let Some(metrics) = this.metrics.as_ref() { - metrics.kademlia_random_queries_total.inc(); + SwarmEvent::Behaviour(BehaviourOut::PeerIdentify { + peer_id, + info: + IdentifyInfo { + protocol_version, agent_version, mut listen_addrs, protocols, .. }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { + }) => { + if listen_addrs.len() > 30 { + debug!( + target: "sub-libp2p", + "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}", + peer_id, protocol_version, agent_version + ); + listen_addrs.truncate(30); + } + for addr in listen_addrs { + self.network_service + .behaviour_mut() + .add_self_reported_address_to_dht(&peer_id, &protocols, addr); + } + self.network_service + .behaviour_mut() + .user_protocol_mut() + .add_default_set_discovered_nodes(iter::once(peer_id)); + }, + SwarmEvent::Behaviour(BehaviourOut::Discovered(peer_id)) => { + self.network_service + .behaviour_mut() + .user_protocol_mut() + .add_default_set_discovered_nodes(iter::once(peer_id)); + }, + SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted) => + if let Some(metrics) = self.metrics.as_ref() { + metrics.kademlia_random_queries_total.inc(); + }, + SwarmEvent::Behaviour(BehaviourOut::NotificationStreamOpened { + remote, + protocol, + negotiated_fallback, + notifications_sink, + role, + }) => { + if let Some(metrics) = self.metrics.as_ref() { + metrics + .notifications_streams_opened_total + .with_label_values(&[&protocol]) + .inc(); + } + { + let mut peers_notifications_sinks = self.peers_notifications_sinks.lock(); + let _previous_value = peers_notifications_sinks + .insert((remote, protocol.clone()), notifications_sink); + debug_assert!(_previous_value.is_none()); + } + self.event_streams.send(Event::NotificationStreamOpened { remote, protocol, negotiated_fallback, - notifications_sink, role, - })) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics - .notifications_streams_opened_total - .with_label_values(&[&protocol]) - .inc(); - } - { - let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); - let _previous_value = peers_notifications_sinks - .insert((remote, protocol.clone()), notifications_sink); - debug_assert!(_previous_value.is_none()); - } - this.event_streams.send(Event::NotificationStreamOpened { - remote, - protocol, - negotiated_fallback, - role, - }); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced { - remote, - protocol, - notifications_sink, - })) => { - let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); - if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) { - *s = notifications_sink; - } else { - error!( - target: "sub-libp2p", - "NotificationStreamReplaced for non-existing substream" - ); - debug_assert!(false); - } + }); + }, + SwarmEvent::Behaviour(BehaviourOut::NotificationStreamReplaced { + remote, + protocol, + notifications_sink, + }) => { + let mut peers_notifications_sinks = self.peers_notifications_sinks.lock(); + if let Some(s) = peers_notifications_sinks.get_mut(&(remote, protocol)) { + *s = notifications_sink; + } else { + error!( + target: "sub-libp2p", + "NotificationStreamReplaced for non-existing substream" + ); + debug_assert!(false); + } - // TODO: Notifications might have been lost as a result of the previous - // connection being dropped, and as a result it would be preferable to notify - // the users of this fact by simulating the substream being closed then - // reopened. - // The code below doesn't compile because `role` is unknown. Propagating the - // handshake of the secondary connections is quite an invasive change and - // would conflict with https://github.com/paritytech/substrate/issues/6403. - // Considering that dropping notifications is generally regarded as - // acceptable, this bug is at the moment intentionally left there and is - // intended to be fixed at the same time as - // https://github.com/paritytech/substrate/issues/6403. - // this.event_streams.send(Event::NotificationStreamClosed { - // remote, - // protocol, - // }); - // this.event_streams.send(Event::NotificationStreamOpened { - // remote, - // protocol, - // role, - // }); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { - remote, - protocol, - })) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics - .notifications_streams_closed_total - .with_label_values(&[&protocol[..]]) - .inc(); - } - this.event_streams.send(Event::NotificationStreamClosed { - remote, - protocol: protocol.clone(), - }); - { - let mut peers_notifications_sinks = this.peers_notifications_sinks.lock(); - let _previous_value = peers_notifications_sinks.remove(&(remote, protocol)); - debug_assert!(_previous_value.is_some()); - } - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { - remote, - messages, - })) => { - if let Some(metrics) = this.metrics.as_ref() { - for (protocol, message) in &messages { - metrics - .notifications_sizes - .with_label_values(&["in", protocol]) - .observe(message.len() as f64); - } - } - this.event_streams.send(Event::NotificationsReceived { remote, messages }); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::SyncConnected(remote))) => { - this.event_streams.send(Event::SyncConnected { remote }); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::SyncDisconnected(remote))) => { - this.event_streams.send(Event::SyncDisconnected { remote }); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration))) => { - if let Some(metrics) = this.metrics.as_ref() { - let query_type = match event { - DhtEvent::ValueFound(_) => "value-found", - DhtEvent::ValueNotFound(_) => "value-not-found", - DhtEvent::ValuePut(_) => "value-put", - DhtEvent::ValuePutFailed(_) => "value-put-failed", - }; + // TODO: Notifications might have been lost as a result of the previous + // connection being dropped, and as a result it would be preferable to notify + // the users of this fact by simulating the substream being closed then + // reopened. + // The code below doesn't compile because `role` is unknown. Propagating the + // handshake of the secondary connections is quite an invasive change and + // would conflict with https://github.com/paritytech/substrate/issues/6403. + // Considering that dropping notifications is generally regarded as + // acceptable, this bug is at the moment intentionally left there and is + // intended to be fixed at the same time as + // https://github.com/paritytech/substrate/issues/6403. + // self.event_streams.send(Event::NotificationStreamClosed { + // remote, + // protocol, + // }); + // self.event_streams.send(Event::NotificationStreamOpened { + // remote, + // protocol, + // role, + // }); + }, + SwarmEvent::Behaviour(BehaviourOut::NotificationStreamClosed { remote, protocol }) => { + if let Some(metrics) = self.metrics.as_ref() { + metrics + .notifications_streams_closed_total + .with_label_values(&[&protocol[..]]) + .inc(); + } + self.event_streams + .send(Event::NotificationStreamClosed { remote, protocol: protocol.clone() }); + { + let mut peers_notifications_sinks = self.peers_notifications_sinks.lock(); + let _previous_value = peers_notifications_sinks.remove(&(remote, protocol)); + debug_assert!(_previous_value.is_some()); + } + }, + SwarmEvent::Behaviour(BehaviourOut::NotificationsReceived { remote, messages }) => { + if let Some(metrics) = self.metrics.as_ref() { + for (protocol, message) in &messages { metrics - .kademlia_query_duration - .with_label_values(&[query_type]) - .observe(duration.as_secs_f64()); + .notifications_sizes + .with_label_values(&["in", protocol]) + .observe(message.len() as f64); } + } + self.event_streams.send(Event::NotificationsReceived { remote, messages }); + }, + SwarmEvent::Behaviour(BehaviourOut::SyncConnected(remote)) => { + self.event_streams.send(Event::SyncConnected { remote }); + }, + SwarmEvent::Behaviour(BehaviourOut::SyncDisconnected(remote)) => { + self.event_streams.send(Event::SyncDisconnected { remote }); + }, + SwarmEvent::Behaviour(BehaviourOut::Dht(event, duration)) => { + if let Some(metrics) = self.metrics.as_ref() { + let query_type = match event { + DhtEvent::ValueFound(_) => "value-found", + DhtEvent::ValueNotFound(_) => "value-not-found", + DhtEvent::ValuePut(_) => "value-put", + DhtEvent::ValuePutFailed(_) => "value-put-failed", + }; + metrics + .kademlia_query_duration + .with_label_values(&[query_type]) + .observe(duration.as_secs_f64()); + } - this.event_streams.send(Event::Dht(event)); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::None)) => { - // Ignored event from lower layers. - }, - Poll::Ready(SwarmEvent::ConnectionEstablished { - peer_id, - endpoint, - num_established, - concurrent_dial_errors, - }) => { - if let Some(errors) = concurrent_dial_errors { - debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors); - } else { - debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); - } + self.event_streams.send(Event::Dht(event)); + }, + SwarmEvent::Behaviour(BehaviourOut::None) => { + // Ignored event from lower layers. + }, + SwarmEvent::ConnectionEstablished { + peer_id, + endpoint, + num_established, + concurrent_dial_errors, + } => { + if let Some(errors) = concurrent_dial_errors { + debug!(target: "sub-libp2p", "Libp2p => Connected({:?}) with errors: {:?}", peer_id, errors); + } else { + debug!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); + } - if let Some(metrics) = this.metrics.as_ref() { - let direction = match endpoint { - ConnectedPoint::Dialer { .. } => "out", - ConnectedPoint::Listener { .. } => "in", - }; - metrics.connections_opened_total.with_label_values(&[direction]).inc(); + if let Some(metrics) = self.metrics.as_ref() { + let direction = match endpoint { + ConnectedPoint::Dialer { .. } => "out", + ConnectedPoint::Listener { .. } => "in", + }; + metrics.connections_opened_total.with_label_values(&[direction]).inc(); - if num_established.get() == 1 { - metrics.distinct_peers_connections_opened_total.inc(); - } + if num_established.get() == 1 { + metrics.distinct_peers_connections_opened_total.inc(); } - }, - Poll::Ready(SwarmEvent::ConnectionClosed { - peer_id, - cause, - endpoint, - num_established, - }) => { - debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause); - if let Some(metrics) = this.metrics.as_ref() { - let direction = match endpoint { - ConnectedPoint::Dialer { .. } => "out", - ConnectedPoint::Listener { .. } => "in", - }; - let reason = match cause { - Some(ConnectionError::IO(_)) => "transport-error", - Some(ConnectionError::Handler(EitherError::A(EitherError::A( - EitherError::B(EitherError::A(PingFailure::Timeout)), - )))) => "ping-timeout", - Some(ConnectionError::Handler(EitherError::A(EitherError::A( - EitherError::A(NotifsHandlerError::SyncNotificationsClogged), - )))) => "sync-notifications-clogged", - Some(ConnectionError::Handler(_)) => "protocol-error", - Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout", - None => "actively-closed", - }; - metrics - .connections_closed_total - .with_label_values(&[direction, reason]) - .inc(); + } + }, + SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, num_established } => { + debug!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause); + if let Some(metrics) = self.metrics.as_ref() { + let direction = match endpoint { + ConnectedPoint::Dialer { .. } => "out", + ConnectedPoint::Listener { .. } => "in", + }; + let reason = match cause { + Some(ConnectionError::IO(_)) => "transport-error", + Some(ConnectionError::Handler(EitherError::A(EitherError::A( + EitherError::B(EitherError::A(PingFailure::Timeout)), + )))) => "ping-timeout", + Some(ConnectionError::Handler(EitherError::A(EitherError::A( + EitherError::A(NotifsHandlerError::SyncNotificationsClogged), + )))) => "sync-notifications-clogged", + Some(ConnectionError::Handler(_)) => "protocol-error", + Some(ConnectionError::KeepAliveTimeout) => "keep-alive-timeout", + None => "actively-closed", + }; + metrics.connections_closed_total.with_label_values(&[direction, reason]).inc(); - // `num_established` represents the number of *remaining* connections. - if num_established == 0 { - metrics.distinct_peers_connections_closed_total.inc(); - } - } - }, - Poll::Ready(SwarmEvent::NewListenAddr { address, .. }) => { - trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address); - if let Some(metrics) = this.metrics.as_ref() { - metrics.listeners_local_addresses.inc(); - } - }, - Poll::Ready(SwarmEvent::ExpiredListenAddr { address, .. }) => { - info!(target: "sub-libp2p", "📪 No longer listening on {}", address); - if let Some(metrics) = this.metrics.as_ref() { - metrics.listeners_local_addresses.dec(); + // `num_established` represents the number of *remaining* connections. + if num_established == 0 { + metrics.distinct_peers_connections_closed_total.inc(); } - }, - Poll::Ready(SwarmEvent::OutgoingConnectionError { peer_id, error }) => { - if let Some(peer_id) = peer_id { - trace!( - target: "sub-libp2p", - "Libp2p => Failed to reach {:?}: {}", - peer_id, error, - ); - - if this.boot_node_ids.contains(&peer_id) { - if let DialError::WrongPeerId { obtained, endpoint } = &error { - if let ConnectedPoint::Dialer { address, role_override: _ } = - endpoint - { - warn!( - "💔 The bootnode you want to connect to at `{}` provided a different peer ID `{}` than the one you expect `{}`.", - address, - obtained, - peer_id, - ); - } + } + }, + SwarmEvent::NewListenAddr { address, .. } => { + trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", address); + if let Some(metrics) = self.metrics.as_ref() { + metrics.listeners_local_addresses.inc(); + } + }, + SwarmEvent::ExpiredListenAddr { address, .. } => { + info!(target: "sub-libp2p", "📪 No longer listening on {}", address); + if let Some(metrics) = self.metrics.as_ref() { + metrics.listeners_local_addresses.dec(); + } + }, + SwarmEvent::OutgoingConnectionError { peer_id, error } => { + if let Some(peer_id) = peer_id { + trace!( + target: "sub-libp2p", + "Libp2p => Failed to reach {:?}: {}", + peer_id, error, + ); + + if self.boot_node_ids.contains(&peer_id) { + if let DialError::WrongPeerId { obtained, endpoint } = &error { + if let ConnectedPoint::Dialer { address, role_override: _ } = endpoint { + warn!( + "💔 The bootnode you want to connect to at `{}` provided a different peer ID `{}` than the one you expect `{}`.", + address, + obtained, + peer_id, + ); } } } + } - if let Some(metrics) = this.metrics.as_ref() { - let reason = match error { - DialError::ConnectionLimit(_) => Some("limit-reached"), - DialError::InvalidPeerId(_) => Some("invalid-peer-id"), - DialError::Transport(_) | DialError::ConnectionIo(_) => - Some("transport-error"), - DialError::Banned | - DialError::LocalPeerId | - DialError::NoAddresses | - DialError::DialPeerConditionFalse(_) | - DialError::WrongPeerId { .. } | - DialError::Aborted => None, // ignore them - }; - if let Some(reason) = reason { - metrics - .pending_connections_errors_total - .with_label_values(&[reason]) - .inc(); - } - } - }, - Poll::Ready(SwarmEvent::Dialing(peer_id)) => { - trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id) - }, - Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) => { - trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))", - local_addr, send_back_addr); - if let Some(metrics) = this.metrics.as_ref() { - metrics.incoming_connections_total.inc(); - } - }, - Poll::Ready(SwarmEvent::IncomingConnectionError { - local_addr, - send_back_addr, - error, - }) => { - debug!( - target: "sub-libp2p", - "Libp2p => IncomingConnectionError({},{}): {}", - local_addr, send_back_addr, error, - ); - if let Some(metrics) = this.metrics.as_ref() { - let reason = match error { - PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"), - PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"), - PendingConnectionError::Transport(_) | - PendingConnectionError::IO(_) => Some("transport-error"), - PendingConnectionError::Aborted => None, // ignore it - }; - - if let Some(reason) = reason { - metrics - .incoming_connections_errors_total - .with_label_values(&[reason]) - .inc(); - } + if let Some(metrics) = self.metrics.as_ref() { + let reason = match error { + DialError::ConnectionLimit(_) => Some("limit-reached"), + DialError::InvalidPeerId(_) => Some("invalid-peer-id"), + DialError::Transport(_) | DialError::ConnectionIo(_) => + Some("transport-error"), + DialError::Banned | + DialError::LocalPeerId | + DialError::NoAddresses | + DialError::DialPeerConditionFalse(_) | + DialError::WrongPeerId { .. } | + DialError::Aborted => None, // ignore them + }; + if let Some(reason) = reason { + metrics.pending_connections_errors_total.with_label_values(&[reason]).inc(); } - }, - Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => { - debug!( - target: "sub-libp2p", - "Libp2p => BannedPeer({}). Connected via {:?}.", - peer_id, endpoint, - ); - if let Some(metrics) = this.metrics.as_ref() { + } + }, + SwarmEvent::Dialing(peer_id) => { + trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id) + }, + SwarmEvent::IncomingConnection { local_addr, send_back_addr } => { + trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))", + local_addr, send_back_addr); + if let Some(metrics) = self.metrics.as_ref() { + metrics.incoming_connections_total.inc(); + } + }, + SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error } => { + debug!( + target: "sub-libp2p", + "Libp2p => IncomingConnectionError({},{}): {}", + local_addr, send_back_addr, error, + ); + if let Some(metrics) = self.metrics.as_ref() { + let reason = match error { + PendingConnectionError::ConnectionLimit(_) => Some("limit-reached"), + PendingConnectionError::WrongPeerId { .. } => Some("invalid-peer-id"), + PendingConnectionError::Transport(_) | PendingConnectionError::IO(_) => + Some("transport-error"), + PendingConnectionError::Aborted => None, // ignore it + }; + + if let Some(reason) = reason { metrics .incoming_connections_errors_total - .with_label_values(&["banned"]) + .with_label_values(&[reason]) .inc(); } - }, - Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses, .. }) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.listeners_local_addresses.sub(addresses.len() as u64); - } - let addrs = - addresses.into_iter().map(|a| a.to_string()).collect::>().join(", "); - match reason { - Ok(()) => error!( - target: "sub-libp2p", - "📪 Libp2p listener ({}) closed gracefully", - addrs - ), - Err(e) => error!( - target: "sub-libp2p", - "📪 Libp2p listener ({}) closed: {}", - addrs, e - ), - } - }, - Poll::Ready(SwarmEvent::ListenerError { error, .. }) => { - debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error); - if let Some(metrics) = this.metrics.as_ref() { - metrics.listeners_errors_total.inc(); - } - }, - }; - } - - let num_connected_peers = - this.network_service.behaviour_mut().user_protocol_mut().num_connected_peers(); - - // Update the variables shared with the `NetworkService`. - this.num_connected.store(num_connected_peers, Ordering::Relaxed); - { - let external_addresses = - Swarm::>::external_addresses(&this.network_service) - .map(|r| &r.addr) - .cloned() - .collect(); - *this.external_addresses.lock() = external_addresses; - } - - let is_major_syncing = this - .network_service - .behaviour_mut() - .user_protocol_mut() - .sync_state() - .state - .is_major_syncing(); - - this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed); - - if let Some(metrics) = this.metrics.as_ref() { - if let Some(buckets) = this.network_service.behaviour_mut().num_entries_per_kbucket() { - for (lower_ilog2_bucket_bound, num_entries) in buckets { - metrics - .kbuckets_num_nodes - .with_label_values(&[&lower_ilog2_bucket_bound.to_string()]) - .set(num_entries as u64); } - } - if let Some(num_entries) = this.network_service.behaviour_mut().num_kademlia_records() { - metrics.kademlia_records_count.set(num_entries as u64); - } - if let Some(num_entries) = - this.network_service.behaviour_mut().kademlia_records_total_size() - { - metrics.kademlia_records_sizes_total.set(num_entries as u64); - } - metrics - .peerset_num_discovered - .set(this.network_service.behaviour_mut().user_protocol().num_discovered_peers() - as u64); - metrics.pending_connections.set( - Swarm::network_info(&this.network_service).connection_counters().num_pending() - as u64, - ); + }, + SwarmEvent::BannedPeer { peer_id, endpoint } => { + debug!( + target: "sub-libp2p", + "Libp2p => BannedPeer({}). Connected via {:?}.", + peer_id, endpoint, + ); + if let Some(metrics) = self.metrics.as_ref() { + metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc(); + } + }, + SwarmEvent::ListenerClosed { reason, addresses, .. } => { + if let Some(metrics) = self.metrics.as_ref() { + metrics.listeners_local_addresses.sub(addresses.len() as u64); + } + let addrs = + addresses.into_iter().map(|a| a.to_string()).collect::>().join(", "); + match reason { + Ok(()) => error!( + target: "sub-libp2p", + "📪 Libp2p listener ({}) closed gracefully", + addrs + ), + Err(e) => error!( + target: "sub-libp2p", + "📪 Libp2p listener ({}) closed: {}", + addrs, e + ), + } + }, + SwarmEvent::ListenerError { error, .. } => { + debug!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error); + if let Some(metrics) = self.metrics.as_ref() { + metrics.listeners_errors_total.inc(); + } + }, } - - Poll::Pending } } diff --git a/client/network/src/service/tests/chain_sync.rs b/client/network/src/service/tests/chain_sync.rs index 9d8463ff190a8..1ae432fd4c235 100644 --- a/client/network/src/service/tests/chain_sync.rs +++ b/client/network/src/service/tests/chain_sync.rs @@ -75,12 +75,8 @@ async fn normal_network_poll_no_peers() { .with_chain_sync((chain_sync, chain_sync_service)) .build(); - // poll the network once - futures::future::poll_fn(|cx| { - let _ = network.network().poll_unpin(cx); - Poll::Ready(()) - }) - .await; + // perform one action on network + let _ = network.network().next_action().await; } #[tokio::test] @@ -110,11 +106,8 @@ async fn request_justification() { // send "request justifiction" message and poll the network network.service().request_justification(&hash, number); - futures::future::poll_fn(|cx| { - let _ = network.network().poll_unpin(cx); - Poll::Ready(()) - }) - .await; + // perform one action on network + let _ = network.network().next_action().await; } #[tokio::test] @@ -141,11 +134,8 @@ async fn clear_justification_requests() { // send "request justifiction" message and poll the network network.service().clear_justification_requests(); - futures::future::poll_fn(|cx| { - let _ = network.network().poll_unpin(cx); - Poll::Ready(()) - }) - .await; + // perform one action on network + let _ = network.network().next_action().await; } #[tokio::test] @@ -180,11 +170,8 @@ async fn set_sync_fork_request() { // send "set sync fork request" message and poll the network network.service().set_sync_fork_request(copy_peers, hash, number); - futures::future::poll_fn(|cx| { - let _ = network.network().poll_unpin(cx); - Poll::Ready(()) - }) - .await; + // perform one action on network + let _ = network.network().next_action().await; } #[tokio::test] @@ -225,11 +212,8 @@ async fn on_block_finalized() { // send "set sync fork request" message and poll the network network.network().on_block_finalized(hash, header); - futures::future::poll_fn(|cx| { - let _ = network.network().poll_unpin(cx); - Poll::Ready(()) - }) - .await; + // perform one action on network + let _ = network.network().next_action().await; } // report from mock import queue that importing a justification was not successful diff --git a/client/network/src/service/tests/mod.rs b/client/network/src/service/tests/mod.rs index 9c97a7f73837d..3ce139ff03a33 100644 --- a/client/network/src/service/tests/mod.rs +++ b/client/network/src/service/tests/mod.rs @@ -80,10 +80,7 @@ impl TestNetwork { let service = worker.service().clone(); let event_stream = service.event_stream("test"); - tokio::spawn(async move { - futures::pin_mut!(worker); - let _ = worker.await; - }); + tokio::spawn(worker.run()); (service, event_stream) } diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index ffc0edaf31e72..f56ef13fc9065 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1358,6 +1358,12 @@ where ); } }, + ToServiceCommand::BlockFinalized(hash, number) => { + self.on_block_finalized(&hash, number); + }, + ToServiceCommand::Status { pending_response } => { + let _ = pending_response.send(self.status()); + }, } } diff --git a/client/network/sync/src/service/chain_sync.rs b/client/network/sync/src/service/chain_sync.rs index 50ded5b643dea..824303ec09d70 100644 --- a/client/network/sync/src/service/chain_sync.rs +++ b/client/network/sync/src/service/chain_sync.rs @@ -16,9 +16,10 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use futures::channel::oneshot; use libp2p::PeerId; use sc_consensus::{BlockImportError, BlockImportStatus, JustificationSyncLink, Link}; -use sc_network_common::service::NetworkSyncForkRequest; +use sc_network_common::{service::NetworkSyncForkRequest, sync::SyncStatus}; use sc_utils::mpsc::TracingUnboundedSender; use sp_runtime::traits::{Block as BlockT, NumberFor}; @@ -34,6 +35,10 @@ pub enum ToServiceCommand { Vec<(Result>, BlockImportError>, B::Hash)>, ), JustificationImported(PeerId, B::Hash, NumberFor, bool), + BlockFinalized(B::Hash, NumberFor), + Status { + pending_response: oneshot::Sender>, + }, } /// Handle for communicating with `ChainSync` asynchronously @@ -47,6 +52,21 @@ impl ChainSyncInterfaceHandle { pub fn new(tx: TracingUnboundedSender>) -> Self { Self { tx } } + + /// Notify ChainSync about finalized block + pub fn on_block_finalized(&self, hash: B::Hash, number: NumberFor) { + let _ = self.tx.unbounded_send(ToServiceCommand::BlockFinalized(hash, number)); + } + + /// Get sync status + /// + /// Returns an error if `ChainSync` has terminated. + pub async fn status(&self) -> Result, ()> { + let (tx, rx) = oneshot::channel(); + let _ = self.tx.unbounded_send(ToServiceCommand::Status { pending_response: tx }); + + rx.await.map_err(|_| ()) + } } impl NetworkSyncForkRequest> diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index ccaebc976135b..c47e3c86f5c0e 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -31,7 +31,7 @@ use std::{ time::Duration, }; -use futures::{channel::oneshot, future::BoxFuture, prelude::*}; +use futures::{channel::oneshot, future::BoxFuture, pin_mut, prelude::*}; use libp2p::{build_multiaddr, PeerId}; use log::trace; use parking_lot::Mutex; @@ -83,7 +83,7 @@ use sp_runtime::{ }; use substrate_test_runtime_client::AccountKeyring; pub use substrate_test_runtime_client::{ - runtime::{Block, Extrinsic, Hash, Transfer}, + runtime::{Block, Extrinsic, Hash, Header, Transfer}, TestClient, TestClientBuilder, TestClientBuilderExt, }; use tokio::time::timeout; @@ -1078,8 +1078,17 @@ where self.mut_peers(|peers| { for (i, peer) in peers.iter_mut().enumerate() { trace!(target: "sync", "-- Polling {}: {}", i, peer.id()); - if let Poll::Ready(()) = peer.network.poll_unpin(cx) { - panic!("NetworkWorker has terminated unexpectedly.") + loop { + // The code below is not quite correct, because we are polling a different + // instance of the future every time. But as long as + // `NetworkWorker::next_action()` contains just streams polling not interleaved + // with other `.await`s, dropping the future and recreating it works the same as + // polling a single instance. + let net_poll_future = peer.network.next_action(); + pin_mut!(net_poll_future); + if let Poll::Pending = net_poll_future.poll(cx) { + break + } } trace!(target: "sync", "-- Polling complete {}: {}", i, peer.id()); diff --git a/client/offchain/src/api.rs b/client/offchain/src/api.rs index 1301ce9fd9627..cd9f5c8f6feb9 100644 --- a/client/offchain/src/api.rs +++ b/client/offchain/src/api.rs @@ -419,6 +419,10 @@ mod tests { fn local_peer_id(&self) -> PeerId { PeerId::random() } + + fn listen_addresses(&self) -> Vec { + Vec::new() + } } fn offchain_api() -> (Api, AsyncApi) { diff --git a/client/offchain/src/lib.rs b/client/offchain/src/lib.rs index 6b28d3f8a48e9..6cf5838a46bed 100644 --- a/client/offchain/src/lib.rs +++ b/client/offchain/src/lib.rs @@ -270,6 +270,10 @@ mod tests { fn local_peer_id(&self) -> PeerId { PeerId::random() } + + fn listen_addresses(&self) -> Vec { + Vec::new() + } } impl NetworkPeers for TestNetwork { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index a737601f71b83..fb80753b4cbf6 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -17,7 +17,7 @@ // along with this program. If not, see . use crate::{ - build_network_future, + build_network_future, build_system_rpc_future, client::{Client, ClientConfig}, config::{Configuration, KeystoreConfig, PrometheusConfig}, error::Error, @@ -963,19 +963,29 @@ where Some("networking"), chain_sync_network_provider.run(network.clone()), ); - spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service))); + spawn_handle.spawn( + "import-queue", + None, + import_queue.run(Box::new(chain_sync_service.clone())), + ); let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); - - let future = build_network_future( - config.role.clone(), - network_mut, - client, - system_rpc_rx, - has_bootnodes, - config.announce_block, + spawn_handle.spawn( + "system-rpc-handler", + Some("networking"), + build_system_rpc_future( + config.role.clone(), + network_mut.service().clone(), + chain_sync_service.clone(), + client.clone(), + system_rpc_rx, + has_bootnodes, + ), ); + let future = + build_network_future(network_mut, client, chain_sync_service, config.announce_block); + // TODO: Normally, one is supposed to pass a list of notifications protocols supported by the // node through the `NetworkConfiguration` struct. But because this function doesn't know in // advance which components, such as GrandPa or Polkadot, will be plugged on top of the diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index e3dcf012892c3..253479abc3bca 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -37,12 +37,16 @@ mod task_manager; use std::{collections::HashMap, net::SocketAddr}; use codec::{Decode, Encode}; -use futures::{channel::mpsc, FutureExt, StreamExt}; +use futures::{channel::mpsc, pin_mut, FutureExt, StreamExt}; use jsonrpsee::{core::Error as JsonRpseeError, RpcModule}; use log::{debug, error, warn}; use sc_client_api::{blockchain::HeaderBackend, BlockBackend, BlockchainEvents, ProofProvider}; -use sc_network::PeerId; -use sc_network_common::{config::MultiaddrWithPeerId, service::NetworkBlock}; +use sc_network::{NetworkStateInfo, PeerId}; +use sc_network_common::{ + config::MultiaddrWithPeerId, + service::{NetworkBlock, NetworkPeers}, +}; +use sc_network_sync::service::chain_sync::ChainSyncInterfaceHandle; use sc_utils::mpsc::TracingUnboundedReceiver; use sp_blockchain::HeaderMetadata; use sp_consensus::SyncOracle; @@ -138,9 +142,7 @@ pub struct PartialComponents @@ -153,21 +155,21 @@ async fn build_network_future< + 'static, H: sc_network_common::ExHashT, >( - role: Role, - mut network: sc_network::NetworkWorker, + network: sc_network::NetworkWorker, client: Arc, - mut rpc_rx: TracingUnboundedReceiver>, - should_have_peers: bool, + chain_sync_service: ChainSyncInterfaceHandle, announce_imported_blocks: bool, ) { let mut imported_blocks_stream = client.import_notification_stream().fuse(); - // Current best block at initialization, to report to the RPC layer. - let starting_block = client.info().best_number; - // Stream of finalized blocks reported by the client. let mut finality_notification_stream = client.finality_notification_stream().fuse(); + let network_service = network.service().clone(); + + let network_run = network.run().fuse(); + pin_mut!(network_run); + loop { futures::select! { // List of blocks that the client has imported. @@ -176,15 +178,18 @@ async fn build_network_future< Some(n) => n, // If this stream is shut down, that means the client has shut down, and the // most appropriate thing to do for the network future is to shut down too. - None => return, + None => { + debug!("Block import stream has terminated, shutting down the network future."); + return + }, }; if announce_imported_blocks { - network.service().announce_block(notification.hash, None); + network_service.announce_block(notification.hash, None); } if notification.is_new_best { - network.service().new_best_block_imported( + network_service.new_best_block_imported( notification.hash, *notification.header.number(), ); @@ -193,106 +198,160 @@ async fn build_network_future< // List of blocks that the client has finalized. notification = finality_notification_stream.select_next_some() => { - network.on_block_finalized(notification.hash, notification.header); + chain_sync_service.on_block_finalized(notification.hash, *notification.header.number()); } - // Answer incoming RPC requests. - request = rpc_rx.select_next_some() => { - match request { - sc_rpc::system::Request::Health(sender) => { - let _ = sender.send(sc_rpc::system::Health { - peers: network.peers_debug_info().len(), - is_syncing: network.service().is_major_syncing(), - should_have_peers, - }); - }, - sc_rpc::system::Request::LocalPeerId(sender) => { - let _ = sender.send(network.local_peer_id().to_base58()); - }, - sc_rpc::system::Request::LocalListenAddresses(sender) => { - let peer_id = (*network.local_peer_id()).into(); - let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); - let addresses = network.listen_addresses() - .map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string()) - .collect(); - let _ = sender.send(addresses); - }, - sc_rpc::system::Request::Peers(sender) => { - let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)| - sc_rpc::system::PeerInfo { + // Drive the network. Shut down the network future if `NetworkWorker` has terminated. + _ = network_run => { + debug!("`NetworkWorker` has terminated, shutting down the network future."); + return + } + } + } +} + +/// Builds a future that processes system RPC requests. +async fn build_system_rpc_future< + B: BlockT, + C: BlockchainEvents + + HeaderBackend + + BlockBackend + + HeaderMetadata + + ProofProvider + + Send + + Sync + + 'static, + H: sc_network_common::ExHashT, +>( + role: Role, + network_service: Arc>, + chain_sync_service: ChainSyncInterfaceHandle, + client: Arc, + mut rpc_rx: TracingUnboundedReceiver>, + should_have_peers: bool, +) { + // Current best block at initialization, to report to the RPC layer. + let starting_block = client.info().best_number; + + loop { + // Answer incoming RPC requests. + let Some(req) = rpc_rx.next().await else { + debug!("RPC requests stream has terminated, shutting down the system RPC future."); + return; + }; + + match req { + sc_rpc::system::Request::Health(sender) => { + let peers = network_service.peers_debug_info().await; + if let Ok(peers) = peers { + let _ = sender.send(sc_rpc::system::Health { + peers: peers.len(), + is_syncing: network_service.is_major_syncing(), + should_have_peers, + }); + } else { + break + } + }, + sc_rpc::system::Request::LocalPeerId(sender) => { + let _ = sender.send(network_service.local_peer_id().to_base58()); + }, + sc_rpc::system::Request::LocalListenAddresses(sender) => { + let peer_id = network_service.local_peer_id().into(); + let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id); + let addresses = network_service + .listen_addresses() + .iter() + .map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string()) + .collect(); + let _ = sender.send(addresses); + }, + sc_rpc::system::Request::Peers(sender) => { + let peers = network_service.peers_debug_info().await; + if let Ok(peers) = peers { + let _ = sender.send( + peers + .into_iter() + .map(|(peer_id, p)| sc_rpc::system::PeerInfo { peer_id: peer_id.to_base58(), roles: format!("{:?}", p.roles), best_hash: p.best_hash, best_number: p.best_number, - } - ).collect()); - } - sc_rpc::system::Request::NetworkState(sender) => { - if let Ok(network_state) = serde_json::to_value(&network.network_state()) { - let _ = sender.send(network_state); - } - } - sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => { - let result = match MultiaddrWithPeerId::try_from(peer_addr) { - Ok(peer) => { - network.add_reserved_peer(peer) - }, - Err(err) => { - Err(err.to_string()) - }, - }; - let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg); - let _ = sender.send(x); - } - sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => { - let _ = match peer_id.parse::() { - Ok(peer_id) => { - network.remove_reserved_peer(peer_id); - sender.send(Ok(())) - } - Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg( - e.to_string(), - ))), - }; - } - sc_rpc::system::Request::NetworkReservedPeers(sender) => { - let reserved_peers = network.reserved_peers(); - let reserved_peers = reserved_peers - .map(|peer_id| peer_id.to_base58()) - .collect(); - - let _ = sender.send(reserved_peers); + }) + .collect(), + ); + } else { + break + } + }, + sc_rpc::system::Request::NetworkState(sender) => { + let network_state = network_service.network_state().await; + if let Ok(network_state) = network_state { + if let Ok(network_state) = serde_json::to_value(network_state) { + let _ = sender.send(network_state); } - sc_rpc::system::Request::NodeRoles(sender) => { - use sc_rpc::system::NodeRole; + } else { + break + } + }, + sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => { + let result = match MultiaddrWithPeerId::try_from(peer_addr) { + Ok(peer) => network_service.add_reserved_peer(peer), + Err(err) => Err(err.to_string()), + }; + let x = result.map_err(sc_rpc::system::error::Error::MalformattedPeerArg); + let _ = sender.send(x); + }, + sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => { + let _ = match peer_id.parse::() { + Ok(peer_id) => { + network_service.remove_reserved_peer(peer_id); + sender.send(Ok(())) + }, + Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg( + e.to_string(), + ))), + }; + }, + sc_rpc::system::Request::NetworkReservedPeers(sender) => { + let reserved_peers = network_service.reserved_peers().await; + if let Ok(reserved_peers) = reserved_peers { + let reserved_peers = + reserved_peers.iter().map(|peer_id| peer_id.to_base58()).collect(); + let _ = sender.send(reserved_peers); + } else { + break + } + }, + sc_rpc::system::Request::NodeRoles(sender) => { + use sc_rpc::system::NodeRole; - let node_role = match role { - Role::Authority { .. } => NodeRole::Authority, - Role::Full => NodeRole::Full, - }; + let node_role = match role { + Role::Authority { .. } => NodeRole::Authority, + Role::Full => NodeRole::Full, + }; - let _ = sender.send(vec![node_role]); - } - sc_rpc::system::Request::SyncState(sender) => { - use sc_rpc::system::SyncState; + let _ = sender.send(vec![node_role]); + }, + sc_rpc::system::Request::SyncState(sender) => { + use sc_rpc::system::SyncState; - let best_number = client.info().best_number; + let best_number = client.info().best_number; - let _ = sender.send(SyncState { - starting_block, - current_block: best_number, - highest_block: network.best_seen_block().unwrap_or(best_number), - }); - } - } - } + let Ok(status) = chain_sync_service.status().await else { + debug!("`ChainSync` has terminated, shutting down the system RPC future."); + return + }; - // The network worker has done something. Nothing special to do, but could be - // used in the future to perform actions in response of things that happened on - // the network. - _ = (&mut network).fuse() => {} + let _ = sender.send(SyncState { + starting_block, + current_block: best_number, + highest_block: status.best_seen_block.unwrap_or(best_number), + }); + }, } } + debug!("`NetworkWorker` has terminated, shutting down the system RPC future."); } // Wrapper for HTTP and WS servers that makes sure they are properly shut down.