From c5f590a1f7705259a93a3b38a992952c06d704a7 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Mon, 20 Apr 2020 15:25:10 +0200
Subject: [PATCH 1/2] Turn NetworkWorker and build_network_future into async/await

---
 client/network/src/lib.rs                  |   1 +
 client/network/src/service.rs              | 555 ++++++++++-----------
 client/network/src/service/import_queue.rs | 181 +++++++
 client/network/src/service/tests.rs        |   7 +-
 client/network/test/src/lib.rs             |   8 +-
 client/service/src/builder.rs              |   6 +-
 client/service/src/lib.rs                  | 224 +++++----
 client/service/src/status_sinks.rs         | 148 ++++--
 8 files changed, 691 insertions(+), 439 deletions(-)
 create mode 100644 client/network/src/service/import_queue.rs

diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs
index d8afa1f1530ef..7d6e91b6c69a2 100644
--- a/client/network/src/lib.rs
+++ b/client/network/src/lib.rs
@@ -16,6 +16,7 @@
 
 #![warn(unused_extern_crates)]
 #![warn(missing_docs)]
+#![recursion_limit = "2048"]
 
 //! Substrate-specific P2P networking.
 //!
diff --git a/client/network/src/service.rs b/client/network/src/service.rs
index 091c75d63560d..daf1ca997243e 100644
--- a/client/network/src/service.rs
+++ b/client/network/src/service.rs
@@ -49,7 +49,6 @@ use prometheus_endpoint::{
 	register, Counter, CounterVec, Gauge, GaugeVec, HistogramOpts, HistogramVec, Opts,
 	PrometheusError, Registry, U64,
 };
 use sc_peerset::PeersetHandle;
-use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link};
 use sp_runtime::{
 	traits::{Block as BlockT, NumberFor},
 	ConsensusEngineId,
@@ -58,7 +57,7 @@ use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnbound
 use std::{
 	borrow::Cow,
 	collections::{HashMap, HashSet},
-	fs, io,
+	fs,
 	marker::PhantomData,
 	pin::Pin,
 	str,
@@ -66,9 +65,9 @@ use std::{
 		atomic::{AtomicBool, AtomicUsize, Ordering},
 		Arc,
 	},
-	task::Poll,
 };
 
+mod import_queue;
 mod out_events;
 #[cfg(test)]
 mod tests;
@@ -182,7 +181,7 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 	/// for the network processing to advance. From it, you can extract a `NetworkService` using
 	/// `worker.service()`. The `NetworkService` can be shared through the codebase.
 	pub fn new(params: Params<B, H>) -> Result<NetworkWorker<B, H>, Error> {
-		let (to_worker, from_worker) = tracing_unbounded("mpsc_network_worker");
+		let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker");
 
 		if let Some(path) = params.network_config.net_config_path {
 			fs::create_dir_all(&path)?;
@@ -400,8 +399,8 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
 			is_major_syncing,
 			network_service: swarm,
 			service,
-			import_queue: params.import_queue,
-			from_worker,
+			import_queue: From::from(params.import_queue),
+			from_service,
 			light_client_rqs: params.on_demand.and_then(|od| od.extract_receiver()),
 			event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?,
 			metrics,
@@ -858,9 +857,9 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
 	/// The *actual* network.
 	network_service: Swarm<B, H>,
 	/// The import queue that was passed as initialization.
-	import_queue: Box<dyn ImportQueue<B>>,
+	import_queue: import_queue::SmartImportQueue<B>,
 	/// Messages from the `NetworkService` and that must be processed.
-	from_worker: TracingUnboundedReceiver<ServiceToWorkerMsg<B, H>>,
+	from_service: TracingUnboundedReceiver<ServiceToWorkerMsg<B, H>>,
 	/// Receiver for queries from the light client that must be processed.
 	light_client_rqs: Option<TracingUnboundedReceiver<light_client_handler::Request<B>>>,
 	/// Senders for events that happen on the network.
@@ -1071,282 +1070,314 @@ impl Metrics {
 	}
 }
 
-impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
-	type Output = Result<(), io::Error>;
-
-	fn poll(mut self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
-		let this = &mut *self;
-
-		// Poll the import queue for actions to perform.
-		this.import_queue.poll_actions(cx, &mut NetworkLink {
-			protocol: &mut this.network_service,
-		});
-
+impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
+	/// Performs one action on the network, then returns.
+	pub async fn next_action(&mut self) {
 		// Check for new incoming light client requests.
-		if let Some(light_client_rqs) = this.light_client_rqs.as_mut() {
-			while let Poll::Ready(Some(rq)) = light_client_rqs.poll_next_unpin(cx) {
-				// This can error if there are too many queued requests already.
-				if this.network_service.light_client_request(rq).is_err() {
-					log::warn!("Couldn't start light client request: too many pending requests");
-				}
-				if let Some(metrics) = this.metrics.as_ref() {
-					metrics.issued_light_requests.inc();
-				}
-			}
-		}
-
-		loop {
-			// Process the next message coming from the `NetworkService`.
-			let msg = match this.from_worker.poll_next_unpin(cx) {
-				Poll::Ready(Some(msg)) => msg,
-				Poll::Ready(None) => return Poll::Ready(Ok(())),
-				Poll::Pending => break,
-			};
+		let next_light_client_rq = if let Some(light_client_rqs) = self.light_client_rqs.as_mut() {
+			light_client_rqs.next().left_future()
+		} else {
+			future::pending().right_future()
+		};
 
-			match msg {
-				ServiceToWorkerMsg::AnnounceBlock(hash, data) =>
-					this.network_service.user_protocol_mut().announce_block(hash, data),
-				ServiceToWorkerMsg::RequestJustification(hash, number) =>
-					this.network_service.user_protocol_mut().request_justification(&hash, number),
-				ServiceToWorkerMsg::PropagateExtrinsic(hash) =>
-					this.network_service.user_protocol_mut().propagate_extrinsic(&hash),
-				ServiceToWorkerMsg::PropagateExtrinsics =>
-					this.network_service.user_protocol_mut().propagate_extrinsics(),
-				ServiceToWorkerMsg::GetValue(key) =>
-					this.network_service.get_value(&key),
-				ServiceToWorkerMsg::PutValue(key, value) =>
-					this.network_service.put_value(key, value),
-				ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) =>
-					this.network_service.add_known_address(peer_id, addr),
-				ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) =>
-					this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number),
-				ServiceToWorkerMsg::EventStream(sender) =>
-					this.event_streams.push(sender),
-				ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => {
-					if let Some(metrics) = this.metrics.as_ref() {
-						metrics.notifications_sizes
-							.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)])
-							.observe(message.len() as f64);
+		// Next message from the service, or an "infinite loop" if the service is closed.
+		let next_worker_msg = {
+			let from_service = &mut self.from_service;
+			async move {
+				if let Some(msg) = from_service.next().await {
+					msg
+				} else {
+					loop {
+						futures::pending!()
 					}
-					this.network_service.user_protocol_mut().write_notification(target, engine_id, message)
-				},
-				ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => {
-					this.network_service
-						.register_notifications_protocol(engine_id, protocol_name);
-				},
-				ServiceToWorkerMsg::DisconnectPeer(who) =>
-					this.network_service.user_protocol_mut().disconnect_peer(&who),
+				}
 			}
-		}
+		};
 
-		loop {
-			// Process the next action coming from the network.
- let next_event = this.network_service.next_event(); - futures::pin_mut!(next_event); - let poll_value = next_event.poll_unpin(cx); - - match poll_value { - Poll::Pending => break, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks))) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.import_queue_blocks_submitted.inc(); - } - this.import_queue.import_blocks(origin, blocks); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification))) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.import_queue_justifications_submitted.inc(); - } - this.import_queue.import_justification(origin, hash, nb, justification); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof))) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.import_queue_finality_proofs_submitted.inc(); - } - this.import_queue.import_finality_proof(origin, hash, nb, proof); - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(protocol))) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.kademlia_random_queries_total - .with_label_values(&[&maybe_utf8_bytes_to_string(protocol.as_bytes())]) - .inc(); - } - }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::Event(ev))) => { - if let Some(metrics) = this.metrics.as_ref() { - metrics.update_with_network_event(&ev); - } - this.event_streams.send(ev); - }, - Poll::Ready(SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. }) => { - trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); - if let Some(metrics) = this.metrics.as_ref() { - match endpoint { - ConnectedPoint::Dialer { .. } => - metrics.connections_opened_total.with_label_values(&["out"]).inc(), - ConnectedPoint::Listener { .. 
} => - metrics.connections_opened_total.with_label_values(&["in"]).inc(), + futures::select!{ + msg = next_worker_msg.fuse() => { + match msg { + ServiceToWorkerMsg::AnnounceBlock(hash, data) => + self.network_service.user_protocol_mut().announce_block(hash, data), + ServiceToWorkerMsg::RequestJustification(hash, number) => + self.network_service.user_protocol_mut().request_justification(&hash, number), + ServiceToWorkerMsg::PropagateExtrinsic(hash) => + self.network_service.user_protocol_mut().propagate_extrinsic(&hash), + ServiceToWorkerMsg::PropagateExtrinsics => + self.network_service.user_protocol_mut().propagate_extrinsics(), + ServiceToWorkerMsg::GetValue(key) => + self.network_service.get_value(&key), + ServiceToWorkerMsg::PutValue(key, value) => + self.network_service.put_value(key, value), + ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => + self.network_service.add_known_address(peer_id, addr), + ServiceToWorkerMsg::SyncFork(peer_ids, hash, number) => + self.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number), + ServiceToWorkerMsg::EventStream(sender) => + self.event_streams.push(sender), + ServiceToWorkerMsg::WriteNotification { message, engine_id, target } => { + if let Some(metrics) = self.metrics.as_ref() { + metrics.notifications_sizes + .with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)]) + .observe(message.len() as f64); } + self.network_service.user_protocol_mut().write_notification(target, engine_id, message) + }, + ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => { + self.network_service + .register_notifications_protocol(engine_id, protocol_name); + }, + ServiceToWorkerMsg::DisconnectPeer(who) => + self.network_service.user_protocol_mut().disconnect_peer(&who), + } + }, + + action = self.import_queue.next_action().fuse() => { + match action { + import_queue::ImportQueueAction::BlocksProcessed { imported, count, results } => { + self.network_service.user_protocol_mut().blocks_processed(imported, count, results) } - }, - Poll::Ready(SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, .. }) => { - trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause); - if let Some(metrics) = this.metrics.as_ref() { - let dir = match endpoint { - ConnectedPoint::Dialer { .. } => "out", - ConnectedPoint::Listener { .. 
} => "in", - }; - - match cause { - ConnectionError::IO(_) => - metrics.connections_closed_total.with_label_values(&[dir, "transport-error"]).inc(), - ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::B(EitherError::A(PingFailure::Timeout))))))) => - metrics.connections_closed_total.with_label_values(&[dir, "ping-timeout"]).inc(), - ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A(EitherError::B(LegacyConnectionKillError))))))) => - metrics.connections_closed_total.with_label_values(&[dir, "force-closed"]).inc(), - ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) => - metrics.connections_closed_total.with_label_values(&[dir, "protocol-error"]).inc(), - ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout) => - metrics.connections_closed_total.with_label_values(&[dir, "keep-alive-timeout"]).inc(), + import_queue::ImportQueueAction::JustificationImported { who, hash, number, success } => { + self.network_service.user_protocol_mut().justification_import_result(hash.clone(), number, success); + if !success { + info!("💔 Invalid justification provided by {} for #{}", who, hash); + self.network_service.user_protocol_mut().disconnect_peer(&who); + self.network_service.user_protocol_mut().report_peer(who, ReputationChange::new_fatal("Invalid justification")); } } - }, - Poll::Ready(SwarmEvent::NewListenAddr(addr)) => { - trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr); - if let Some(metrics) = this.metrics.as_ref() { - metrics.listeners_local_addresses.inc(); - } - }, - Poll::Ready(SwarmEvent::ExpiredListenAddr(addr)) => { - trace!(target: "sub-libp2p", "Libp2p => ExpiredListenAddr({})", addr); - if let Some(metrics) = this.metrics.as_ref() { - metrics.listeners_local_addresses.dec(); + import_queue::ImportQueueAction::RequestJustification { hash, number } => { + self.network_service.user_protocol_mut().request_justification(&hash, number) } - }, - Poll::Ready(SwarmEvent::UnreachableAddr { peer_id, address, error, .. 
}) => { - trace!( - target: "sub-libp2p", "Libp2p => Failed to reach {:?} through {:?}: {}", - peer_id, - address, - error, - ); - - if this.boot_node_ids.contains(&peer_id) { - if let PendingConnectionError::InvalidPeerId = error { - error!( - "💔 Invalid peer ID from bootnode, expected `{}` at address `{}`.", - peer_id, - address, - ); - } + import_queue::ImportQueueAction::RequestFinalityProof { hash, number } => { + self.network_service.user_protocol_mut().request_finality_proof(&hash, number) } - - if let Some(metrics) = this.metrics.as_ref() { - match error { - PendingConnectionError::ConnectionLimit(_) => - metrics.pending_connections_errors_total.with_label_values(&["limit-reached"]).inc(), - PendingConnectionError::InvalidPeerId => - metrics.pending_connections_errors_total.with_label_values(&["invalid-peer-id"]).inc(), - PendingConnectionError::Transport(_) | PendingConnectionError::IO(_) => - metrics.pending_connections_errors_total.with_label_values(&["transport-error"]).inc(), + import_queue::ImportQueueAction::FinalityProofImported { who, request_block, finalization_result } => { + let success = finalization_result.is_ok(); + self.network_service.user_protocol_mut().finality_proof_import_result(request_block, finalization_result); + if !success { + info!("💔 Invalid finality proof provided by {} for #{}", who, request_block.0); + self.network_service.user_protocol_mut().disconnect_peer(&who); + self.network_service.user_protocol_mut().report_peer(who, ReputationChange::new_fatal("Invalid finality proof")); } } } - Poll::Ready(SwarmEvent::Dialing(peer_id)) => - trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id), - Poll::Ready(SwarmEvent::IncomingConnection { local_addr, send_back_addr }) => { - trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))", - local_addr, send_back_addr); - if let Some(metrics) = this.metrics.as_ref() { - metrics.incoming_connections_total.inc(); - } - }, - Poll::Ready(SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error }) => { - trace!(target: "sub-libp2p", "Libp2p => IncomingConnectionError({},{}): {}", - local_addr, send_back_addr, error); - if let Some(metrics) = this.metrics.as_ref() { - let reason = match error { - PendingConnectionError::ConnectionLimit(_) => "limit-reached", - PendingConnectionError::InvalidPeerId => "invalid-peer-id", - PendingConnectionError::Transport(_) | - PendingConnectionError::IO(_) => "transport-error", - }; - - metrics.incoming_connections_errors_total.with_label_values(&[reason]).inc(); + }, + + rq = next_light_client_rq.fuse() => { + if let Some(rq) = rq { + // This can error if there are too many queued requests already. + if self.network_service.light_client_request(rq).is_err() { + log::warn!("Couldn't start light client request: too many pending requests"); } - }, - Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint }) => { - trace!(target: "sub-libp2p", "Libp2p => BannedPeer({}). 
Connected via {:?}.", - peer_id, endpoint); - if let Some(metrics) = this.metrics.as_ref() { - metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc(); + if let Some(metrics) = self.metrics.as_ref() { + metrics.issued_light_requests.inc(); } - }, - Poll::Ready(SwarmEvent::UnknownPeerUnreachableAddr { address, error }) => - trace!(target: "sub-libp2p", "Libp2p => UnknownPeerUnreachableAddr({}): {}", - address, error), - Poll::Ready(SwarmEvent::ListenerClosed { reason, addresses }) => { - warn!(target: "sub-libp2p", "Libp2p => ListenerClosed: {:?}", reason); - if let Some(metrics) = this.metrics.as_ref() { - metrics.listeners_local_addresses.sub(addresses.len() as u64); - } - }, - Poll::Ready(SwarmEvent::ListenerError { error }) => { - trace!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error); - if let Some(metrics) = this.metrics.as_ref() { - metrics.listeners_errors_total.inc(); + } + } + + next_event = self.network_service.next_event().fuse() => { + match next_event { + SwarmEvent::Behaviour(BehaviourOut::BlockImport(origin, blocks)) => { + if let Some(metrics) = self.metrics.as_ref() { + metrics.import_queue_blocks_submitted.inc(); + } + self.import_queue.import_blocks(origin, blocks); + }, + SwarmEvent::Behaviour(BehaviourOut::JustificationImport(origin, hash, nb, justification)) => { + if let Some(metrics) = self.metrics.as_ref() { + metrics.import_queue_justifications_submitted.inc(); + } + self.import_queue.import_justification(origin, hash, nb, justification); + }, + SwarmEvent::Behaviour(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)) => { + if let Some(metrics) = self.metrics.as_ref() { + metrics.import_queue_finality_proofs_submitted.inc(); + } + self.import_queue.import_finality_proof(origin, hash, nb, proof); + }, + SwarmEvent::Behaviour(BehaviourOut::RandomKademliaStarted(protocol)) => { + if let Some(metrics) = self.metrics.as_ref() { + metrics.kademlia_random_queries_total + .with_label_values(&[&maybe_utf8_bytes_to_string(protocol.as_bytes())]) + .inc(); + } + }, + SwarmEvent::Behaviour(BehaviourOut::Event(ev)) => { + if let Some(metrics) = self.metrics.as_ref() { + metrics.update_with_network_event(&ev); + } + self.event_streams.send(ev); + }, + SwarmEvent::ConnectionEstablished { peer_id, endpoint, .. } => { + trace!(target: "sub-libp2p", "Libp2p => Connected({:?})", peer_id); + if let Some(metrics) = self.metrics.as_ref() { + match endpoint { + ConnectedPoint::Dialer { .. } => + metrics.connections_opened_total.with_label_values(&["out"]).inc(), + ConnectedPoint::Listener { .. } => + metrics.connections_opened_total.with_label_values(&["in"]).inc(), + } + } + }, + SwarmEvent::ConnectionClosed { peer_id, cause, endpoint, .. } => { + trace!(target: "sub-libp2p", "Libp2p => Disconnected({:?}, {:?})", peer_id, cause); + if let Some(metrics) = self.metrics.as_ref() { + let dir = match endpoint { + ConnectedPoint::Dialer { .. } => "out", + ConnectedPoint::Listener { .. 
} => "in", + }; + + match cause { + ConnectionError::IO(_) => + metrics.connections_closed_total.with_label_values(&[dir, "transport-error"]).inc(), + ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( + EitherError::A(EitherError::B(EitherError::A(PingFailure::Timeout))))))) => + metrics.connections_closed_total.with_label_values(&[dir, "ping-timeout"]).inc(), + ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( + EitherError::A(EitherError::A(EitherError::B(LegacyConnectionKillError))))))) => + metrics.connections_closed_total.with_label_values(&[dir, "force-closed"]).inc(), + ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) => + metrics.connections_closed_total.with_label_values(&[dir, "protocol-error"]).inc(), + ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout) => + metrics.connections_closed_total.with_label_values(&[dir, "keep-alive-timeout"]).inc(), + } + } + }, + SwarmEvent::NewListenAddr(addr) => { + trace!(target: "sub-libp2p", "Libp2p => NewListenAddr({})", addr); + if let Some(metrics) = self.metrics.as_ref() { + metrics.listeners_local_addresses.inc(); + } + }, + SwarmEvent::ExpiredListenAddr(addr) => { + trace!(target: "sub-libp2p", "Libp2p => ExpiredListenAddr({})", addr); + if let Some(metrics) = self.metrics.as_ref() { + metrics.listeners_local_addresses.dec(); + } + }, + SwarmEvent::UnreachableAddr { peer_id, address, error, .. } => { + trace!( + target: "sub-libp2p", "Libp2p => Failed to reach {:?} through {:?}: {}", + peer_id, + address, + error, + ); + + if self.boot_node_ids.contains(&peer_id) { + if let PendingConnectionError::InvalidPeerId = error { + error!( + "💔 Invalid peer ID from bootnode, expected `{}` at address `{}`.", + peer_id, + address, + ); + } + } + + if let Some(metrics) = self.metrics.as_ref() { + match error { + PendingConnectionError::ConnectionLimit(_) => + metrics.pending_connections_errors_total + .with_label_values(&["limit-reached"]).inc(), + PendingConnectionError::InvalidPeerId => + metrics.pending_connections_errors_total + .with_label_values(&["invalid-peer-id"]).inc(), + PendingConnectionError::Transport(_) | PendingConnectionError::IO(_) => + metrics.pending_connections_errors_total + .with_label_values(&["transport-error"]).inc(), + } + } } - }, - }; + SwarmEvent::Dialing(peer_id) => + trace!(target: "sub-libp2p", "Libp2p => Dialing({:?})", peer_id), + SwarmEvent::IncomingConnection { local_addr, send_back_addr } => { + trace!(target: "sub-libp2p", "Libp2p => IncomingConnection({},{}))", + local_addr, send_back_addr); + if let Some(metrics) = self.metrics.as_ref() { + metrics.incoming_connections_total.inc(); + } + }, + SwarmEvent::IncomingConnectionError { local_addr, send_back_addr, error } => { + trace!(target: "sub-libp2p", "Libp2p => IncomingConnectionError({},{}): {}", + local_addr, send_back_addr, error); + if let Some(metrics) = self.metrics.as_ref() { + let reason = match error { + PendingConnectionError::ConnectionLimit(_) => "limit-reached", + PendingConnectionError::InvalidPeerId => "invalid-peer-id", + PendingConnectionError::Transport(_) | + PendingConnectionError::IO(_) => "transport-error", + }; + + metrics.incoming_connections_errors_total.with_label_values(&[reason]).inc(); + } + }, + SwarmEvent::BannedPeer { peer_id, endpoint } => { + trace!(target: "sub-libp2p", "Libp2p => BannedPeer({}). 
Connected via {:?}.",
+						peer_id, endpoint);
+					if let Some(metrics) = self.metrics.as_ref() {
+						metrics.incoming_connections_errors_total.with_label_values(&["banned"]).inc();
+					}
+				},
+				SwarmEvent::UnknownPeerUnreachableAddr { address, error } =>
+					trace!(target: "sub-libp2p", "Libp2p => UnknownPeerUnreachableAddr({}): {}",
+						address, error),
+				SwarmEvent::ListenerClosed { reason, addresses } => {
+					warn!(target: "sub-libp2p", "Libp2p => ListenerClosed: {:?}", reason);
+					if let Some(metrics) = self.metrics.as_ref() {
+						metrics.listeners_local_addresses.sub(addresses.len() as u64);
+					}
+				},
+				SwarmEvent::ListenerError { error } => {
+					trace!(target: "sub-libp2p", "Libp2p => ListenerError: {}", error);
+					if let Some(metrics) = self.metrics.as_ref() {
+						metrics.listeners_errors_total.inc();
+					}
+				},
+				};
+			}
 		}
 
-		let num_connected_peers = this.network_service.user_protocol_mut().num_connected_peers();
+		let num_connected_peers = self.network_service.user_protocol_mut().num_connected_peers();
 
 		// Update the variables shared with the `NetworkService`.
-		this.num_connected.store(num_connected_peers, Ordering::Relaxed);
+		self.num_connected.store(num_connected_peers, Ordering::Relaxed);
 		{
-			let external_addresses = Swarm::<B, H>::external_addresses(&this.network_service).cloned().collect();
-			*this.external_addresses.lock() = external_addresses;
+			let external_addresses = Swarm::<B, H>::external_addresses(&self.network_service).cloned().collect();
+			*self.external_addresses.lock() = external_addresses;
 		}
 
-		let is_major_syncing = match this.network_service.user_protocol_mut().sync_state() {
+		let is_major_syncing = match self.network_service.user_protocol_mut().sync_state() {
 			SyncState::Idle => false,
 			SyncState::Downloading => true,
 		};
 
-		this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
+		self.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
 
-		if let Some(metrics) = this.metrics.as_ref() {
-			metrics.network_per_sec_bytes.with_label_values(&["in"]).set(this.service.bandwidth.average_download_per_sec());
-			metrics.network_per_sec_bytes.with_label_values(&["out"]).set(this.service.bandwidth.average_upload_per_sec());
+		if let Some(metrics) = self.metrics.as_ref() {
+			metrics.network_per_sec_bytes.with_label_values(&["in"])
+				.set(self.service.bandwidth.average_download_per_sec());
+			metrics.network_per_sec_bytes.with_label_values(&["out"])
+				.set(self.service.bandwidth.average_upload_per_sec());
 			metrics.is_major_syncing.set(is_major_syncing as u64);
-			for (proto, num_entries) in this.network_service.num_kbuckets_entries() {
+			for (proto, num_entries) in self.network_service.num_kbuckets_entries() {
 				let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
 				metrics.kbuckets_num_nodes.with_label_values(&[&proto]).set(num_entries as u64);
 			}
-			for (proto, num_entries) in this.network_service.num_kademlia_records() {
+			for (proto, num_entries) in self.network_service.num_kademlia_records() {
 				let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
 				metrics.kademlia_records_count.with_label_values(&[&proto]).set(num_entries as u64);
 			}
-			for (proto, num_entries) in this.network_service.kademlia_records_total_size() {
+			for (proto, num_entries) in self.network_service.kademlia_records_total_size() {
 				let proto = maybe_utf8_bytes_to_string(proto.as_bytes());
 				metrics.kademlia_records_sizes_total.with_label_values(&[&proto]).set(num_entries as u64);
 			}
 			metrics.peers_count.set(num_connected_peers as u64);
-			metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
-			metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
-			metrics.pending_connections.set(Swarm::network_info(&this.network_service).num_connections_pending as u64);
+			metrics.peerset_num_discovered.set(self.network_service.user_protocol().num_discovered_peers() as u64);
+			metrics.peerset_num_requested.set(self.network_service.user_protocol().requested_peers().count() as u64);
+			metrics.pending_connections.set(Swarm::network_info(&self.network_service).num_connections_pending as u64);
 		}
-
-		Poll::Pending
 	}
 }
 
-impl<B: BlockT + 'static, H: ExHashT> Unpin for NetworkWorker<B, H> {
-}
-
 /// Turns bytes that are potentially UTF-8 into a reasonable representable string.
 ///
 /// Meant to be used only for debugging or metrics-reporting purposes.
@@ -1360,47 +1391,3 @@ fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow<str> {
 
 /// The libp2p swarm, customized for our needs.
 type Swarm<B, H> = libp2p::swarm::Swarm<Behaviour<B, H>>;
-
-// Implementation of `import_queue::Link` trait using the available local variables.
-struct NetworkLink<'a, B: BlockT, H: ExHashT> {
-	protocol: &'a mut Swarm<B, H>,
-}
-
-impl<'a, B: BlockT, H: ExHashT> Link<B> for NetworkLink<'a, B, H> {
-	fn blocks_processed(
-		&mut self,
-		imported: usize,
-		count: usize,
-		results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
-	) {
-		self.protocol.user_protocol_mut().blocks_processed(imported, count, results)
-	}
-	fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
-		self.protocol.user_protocol_mut().justification_import_result(hash.clone(), number, success);
-		if !success {
-			info!("💔 Invalid justification provided by {} for #{}", who, hash);
-			self.protocol.user_protocol_mut().disconnect_peer(&who);
-			self.protocol.user_protocol_mut().report_peer(who, ReputationChange::new_fatal("Invalid justification"));
-		}
-	}
-	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
-		self.protocol.user_protocol_mut().request_justification(hash, number)
-	}
-	fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
-		self.protocol.user_protocol_mut().request_finality_proof(hash, number)
-	}
-	fn finality_proof_imported(
-		&mut self,
-		who: PeerId,
-		request_block: (B::Hash, NumberFor<B>),
-		finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
-	) {
-		let success = finalization_result.is_ok();
-		self.protocol.user_protocol_mut().finality_proof_import_result(request_block, finalization_result);
-		if !success {
-			info!("💔 Invalid finality proof provided by {} for #{}", who, request_block.0);
-			self.protocol.user_protocol_mut().disconnect_peer(&who);
-			self.protocol.user_protocol_mut().report_peer(who, ReputationChange::new_fatal("Invalid finality proof"));
-		}
-	}
-}
diff --git a/client/network/src/service/import_queue.rs b/client/network/src/service/import_queue.rs
new file mode 100644
index 0000000000000..e04a80433ae51
--- /dev/null
+++ b/client/network/src/service/import_queue.rs
@@ -0,0 +1,181 @@
+// Copyright 2017-2020 Parity Technologies (UK) Ltd.
+// This file is part of Substrate.
+
+// Substrate is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Substrate is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
+
+//! Wraps around a `Box<dyn ImportQueue<B>>` and gives it a better future-oriented API.
+
+use crate::PeerId;
+
+use futures::prelude::*;
+use sp_consensus::BlockOrigin;
+use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link, Origin, IncomingBlock};
+use sp_runtime::{Justification, traits::{Block as BlockT, NumberFor}};
+use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
+use std::task::Poll;
+
+/// Wraps around a `Box<dyn ImportQueue<B>>`.
+pub struct SmartImportQueue<B: BlockT> {
+	queue: Box<dyn ImportQueue<B>>,
+	pending_actions_tx: TracingUnboundedSender<ImportQueueAction<B>>,
+	pending_actions_rx: TracingUnboundedReceiver<ImportQueueAction<B>>,
+}
+
+impl<B: BlockT> SmartImportQueue<B> {
+	/// Imports a bunch of blocks.
+	pub fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
+		self.queue.import_blocks(origin, blocks);
+	}
+
+	/// Imports a block justification.
+	pub fn import_justification(
+		&mut self,
+		who: Origin,
+		hash: B::Hash,
+		number: NumberFor<B>,
+		justification: Justification
+	) {
+		self.queue.import_justification(who, hash, number, justification);
+	}
+
+	/// Imports a block finality proof.
+	pub fn import_finality_proof(
+		&mut self,
+		who: Origin,
+		hash: B::Hash,
+		number: NumberFor<B>,
+		finality_proof: Vec<u8>
+	) {
+		self.queue.import_finality_proof(who, hash, number, finality_proof);
+	}
+
+	/// Returns the next action reported by the import queue.
+	pub fn next_action<'a>(&'a mut self) -> impl Future<Output = ImportQueueAction<B>> + 'a {
+		future::poll_fn(move |cx| {
+			// Try to empty the receiver first, so that the unbounded queue doesn't get filled up
+			// if `next_action` isn't called quickly enough.
+			if let Poll::Ready(Some(action)) = self.pending_actions_rx.poll_next_unpin(cx) {
+				return Poll::Ready(action)
+			}
+
+			// If the receiver is empty, ask the import queue to push things on it.
+			self.queue.poll_actions(cx, &mut NetworkLink {
+				out: &mut self.pending_actions_tx,
+			});
+
+			if let Poll::Ready(Some(action)) = self.pending_actions_rx.poll_next_unpin(cx) {
+				Poll::Ready(action)
+			} else {
+				Poll::Pending
+			}
+		})
+	}
+}
+
+impl<B: BlockT> From<Box<dyn ImportQueue<B>>> for SmartImportQueue<B> {
+	fn from(queue: Box<dyn ImportQueue<B>>) -> Self {
+		let (pending_actions_tx, pending_actions_rx) = tracing_unbounded("smart-import-queue");
+		SmartImportQueue {
+			queue,
+			pending_actions_tx,
+			pending_actions_rx,
+		}
+	}
+}
+
+/// Action that the import queue has performed.
+///
+/// This enum mimics the functions of the `Link` trait.
+pub enum ImportQueueAction<B: BlockT> {
+	BlocksProcessed {
+		imported: usize,
+		count: usize,
+		results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>,
+	},
+	JustificationImported {
+		who: PeerId,
+		hash: B::Hash,
+		number: NumberFor<B>,
+		success: bool,
+	},
+	RequestJustification {
+		hash: B::Hash,
+		number: NumberFor<B>,
+	},
+	RequestFinalityProof {
+		hash: B::Hash,
+		number: NumberFor<B>,
+	},
+	FinalityProofImported {
+		who: PeerId,
+		request_block: (B::Hash, NumberFor<B>),
+		finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
+	},
+}
+
+// Implementation of the `import_queue::Link` trait that sends actions on the sender inside of it.
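+// Each `Link` callback below is translated into one `ImportQueueAction` variant and
+// pushed on the unbounded sender; `SmartImportQueue::next_action` above then pops the
+// actions one by one, draining the receiver before polling the queue again so that the
+// buffer cannot grow without bound.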
+struct NetworkLink<'a, B: BlockT> {
+	out: &'a mut TracingUnboundedSender<ImportQueueAction<B>>,
+}
+
+impl<'a, B: BlockT> Link<B> for NetworkLink<'a, B> {
+	fn blocks_processed(
+		&mut self,
+		imported: usize,
+		count: usize,
+		results: Vec<(Result<BlockImportResult<NumberFor<B>>, BlockImportError>, B::Hash)>
+	) {
+		let _ = self.out.unbounded_send(ImportQueueAction::BlocksProcessed {
+			imported,
+			count,
+			results,
+		});
+	}
+
+	fn justification_imported(&mut self, who: PeerId, hash: &B::Hash, number: NumberFor<B>, success: bool) {
+		let _ = self.out.unbounded_send(ImportQueueAction::JustificationImported {
+			who,
+			hash: hash.clone(),
+			number,
+			success,
+		});
+	}
+
+	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
+		let _ = self.out.unbounded_send(ImportQueueAction::RequestJustification {
+			hash: hash.clone(),
+			number,
+		});
+	}
+
+	fn request_finality_proof(&mut self, hash: &B::Hash, number: NumberFor<B>) {
+		let _ = self.out.unbounded_send(ImportQueueAction::RequestFinalityProof {
+			hash: hash.clone(),
+			number,
+		});
+	}
+
+	fn finality_proof_imported(
+		&mut self,
+		who: PeerId,
+		request_block: (B::Hash, NumberFor<B>),
+		finalization_result: Result<(B::Hash, NumberFor<B>), ()>,
+	) {
+		let _ = self.out.unbounded_send(ImportQueueAction::FinalityProofImported {
+			who,
+			request_block,
+			finalization_result,
+		});
+	}
+}
diff --git a/client/network/src/service/tests.rs b/client/network/src/service/tests.rs
index a60b32efb414e..f131a53b69885 100644
--- a/client/network/src/service/tests.rs
+++ b/client/network/src/service/tests.rs
@@ -87,7 +87,7 @@ fn build_test_full_node(config: config::NetworkConfiguration)
 		None,
 	));
 
-	let worker = NetworkWorker::new(config::Params {
+	let mut worker = NetworkWorker::new(config::Params {
 		role: config::Role::Full,
 		executor: None,
 		network_config: config,
@@ -109,8 +109,9 @@ fn build_test_full_node(config: config::NetworkConfiguration)
 	let event_stream = service.event_stream("test");
 
 	async_std::task::spawn(async move {
-		futures::pin_mut!(worker);
-		let _ = worker.await;
+		loop {
+			worker.next_action().await;
+		}
 	});
 
 	(service, event_stream)
diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs
index 7b070f8041220..d276d82fc2d67 100644
--- a/client/network/test/src/lib.rs
+++ b/client/network/test/src/lib.rs
@@ -787,8 +787,12 @@ pub trait TestNetFactory: Sized {
 		self.mut_peers(|peers| {
 			for peer in peers {
 				trace!(target: "sync", "-- Polling {}", peer.id());
-				if let Poll::Ready(res) = Pin::new(&mut peer.network).poll(cx) {
-					res.unwrap();
+				loop {
+					let net_poll_future = peer.network.next_action();
+					futures::pin_mut!(net_poll_future);
+
+					if let Poll::Pending = net_poll_future.poll(cx) {
+						break;
+					}
 				}
 				trace!(target: "sync", "-- Polling complete {}", peer.id());
 
diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs
index 90e644481f76e..8d6c06cd9d6f0 100644
--- a/client/service/src/builder.rs
+++ b/client/service/src/builder.rs
@@ -830,7 +830,7 @@ ServiceBuilder<
 		let has_bootnodes = !network_params.network_config.boot_nodes.is_empty();
 		let network_mut = sc_network::NetworkWorker::new(network_params)?;
 		let network = network_mut.service().clone();
-		let network_status_sinks = Arc::new(Mutex::new(status_sinks::StatusSinks::new()));
+		let network_status_sinks = Arc::new(status_sinks::StatusSinks::new());
 
 		let offchain_storage = backend.offchain_storage();
 		let offchain_workers = match (config.offchain_worker, offchain_storage.clone()) {
@@ -967,7 +967,7 @@ ServiceBuilder<
 		let transaction_pool_ = transaction_pool.clone();
 		let client_ = client.clone();
 		let (state_tx, state_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat1");
-		network_status_sinks.lock().push(std::time::Duration::from_millis(5000), state_tx);
+		network_status_sinks.push(std::time::Duration::from_millis(5000), state_tx);
 		let tel_task = state_rx.for_each(move |(net_status, _)| {
 			let info = client_.usage_info();
 			metrics_service.tick(
@@ -985,7 +985,7 @@
 		// Periodically send the network state to the telemetry.
 		let (netstat_tx, netstat_rx) = tracing_unbounded::<(NetworkStatus<_>, NetworkState)>("mpsc_netstat2");
-		network_status_sinks.lock().push(std::time::Duration::from_secs(30), netstat_tx);
+		network_status_sinks.push(std::time::Duration::from_secs(30), netstat_tx);
 		let tel_task_2 = netstat_rx.for_each(move |(_, network_state)| {
 			telemetry!(
 				SUBSTRATE_INFO;
diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs
index 97481fcc251b3..a78a20373d94d 100644
--- a/client/service/src/lib.rs
+++ b/client/service/src/lib.rs
@@ -17,6 +17,7 @@
 //! Substrate service. Starts a thread that spins up the network, client, and extrinsic pool.
 //! Manages communication between them.
 
+#![recursion_limit = "1024"]
 #![warn(missing_docs)]
 
 pub mod config;
@@ -97,7 +98,7 @@ pub struct Service<TBl, TCl, TSc, TNetStatus, TNet, TExPool, TOc> {
 	network: Arc<TNet>,
 	/// Sinks to propagate network status updates.
 	/// For each element, every time the `Interval` fires we push an element on the sender.
-	network_status_sinks: Arc<Mutex<status_sinks::StatusSinks<(TNetStatus, NetworkState)>>>,
+	network_status_sinks: Arc<status_sinks::StatusSinks<(TNetStatus, NetworkState)>>,
 	transaction_pool: Arc<TExPool>,
 	/// Send a signal when a spawned essential task has concluded. The next time
 	/// the service future is polled it should complete with an error.
@@ -270,7 +271,7 @@ where
 	fn network_status(&self, interval: Duration) -> TracingUnboundedReceiver<(NetworkStatus<TBl>, NetworkState)> {
 		let (sink, stream) = tracing_unbounded("mpsc_network_status");
-		self.network_status_sinks.lock().push(interval, sink);
+		self.network_status_sinks.push(interval, sink);
 		stream
 	}
 
@@ -326,7 +327,7 @@ impl Spawn for
 /// Builds a never-ending future that continuously polls the network.
 ///
 /// The `status_sinks` contain a list of senders to send a periodic network status to.
-fn build_network_future<
+async fn build_network_future<
 	B: BlockT,
 	C: sc_client::BlockchainEvents<B>,
 	H: sc_network::ExHashT
@@ -334,124 +335,135 @@ fn build_network_future<
 	role: Role,
 	mut network: sc_network::NetworkWorker<B, H>,
 	client: Arc<C>,
-	status_sinks: Arc<Mutex<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>>,
+	status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>,
 	mut rpc_rx: TracingUnboundedReceiver<sc_rpc::system::Request<B>>,
 	should_have_peers: bool,
 	announce_imported_blocks: bool,
-) -> impl Future<Output = ()> {
+) {
 	let mut imported_blocks_stream = client.import_notification_stream().fuse();
 	let mut finality_notification_stream = client.finality_notification_stream().fuse();
 
-	futures::future::poll_fn(move |cx| {
+	loop {
 		let before_polling = Instant::now();
 
-		// We poll `imported_blocks_stream`.
-		while let Poll::Ready(Some(notification)) = Pin::new(&mut imported_blocks_stream).poll_next(cx) {
-			network.on_block_imported(notification.header, notification.is_new_best);
-
-			if announce_imported_blocks {
-				network.service().announce_block(notification.hash, Vec::new());
+		// This future does the same as `finality_notification_stream.next()`, except that if
+		// multiple events are ready on the stream, only the last one is returned.
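+		// Coalescing is deliberate here: when finality advances over several blocks at
+		// once, only the most recent notification still matters to the networking, so
+		// the older ones can be dropped rather than reported one by one.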
+		let last_finality_notification = futures::future::poll_fn(|cx| {
+			let mut last = None;
+			while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) {
+				last = Some(item);
 			}
-		}
-
-		// We poll `finality_notification_stream`, but we only take the last event.
-		let mut last = None;
-		while let Poll::Ready(Some(item)) = Pin::new(&mut finality_notification_stream).poll_next(cx) {
-			last = Some(item);
-		}
-		if let Some(notification) = last {
-			network.on_block_finalized(notification.hash, notification.header);
-		}
+			if let Some(last) = last {
+				Poll::Ready(last)
+			} else {
+				Poll::Pending
+			}
+		});
 
-		// Poll the RPC requests and answer them.
-		while let Poll::Ready(Some(request)) = Pin::new(&mut rpc_rx).poll_next(cx) {
-			match request {
-				sc_rpc::system::Request::Health(sender) => {
-					let _ = sender.send(sc_rpc::system::Health {
-						peers: network.peers_debug_info().len(),
-						is_syncing: network.service().is_major_syncing(),
-						should_have_peers,
-					});
-				},
-				sc_rpc::system::Request::LocalPeerId(sender) => {
-					let _ = sender.send(network.local_peer_id().to_base58());
-				},
-				sc_rpc::system::Request::LocalListenAddresses(sender) => {
-					let peer_id = network.local_peer_id().clone().into();
-					let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
-					let addresses = network.listen_addresses()
-						.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
-						.collect();
-					let _ = sender.send(addresses);
-				},
-				sc_rpc::system::Request::Peers(sender) => {
-					let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)|
-						sc_rpc::system::PeerInfo {
-							peer_id: peer_id.to_base58(),
-							roles: format!("{:?}", p.roles),
-							protocol_version: p.protocol_version,
-							best_hash: p.best_hash,
-							best_number: p.best_number,
-						}
-					).collect());
-				}
-				sc_rpc::system::Request::NetworkState(sender) => {
-					if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() {
-						let _ = sender.send(network_state);
+		futures::select!{
+			notification = imported_blocks_stream.next() => {
+				// Report blocks imported by the client to the network.
+				if let Some(notification) = notification {
+					network.on_block_imported(notification.header, notification.is_new_best);
+					if announce_imported_blocks {
+						network.service().announce_block(notification.hash, Vec::new());
 					}
 				}
-				}
-				sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender) => {
-					let x = network.add_reserved_peer(peer_addr)
-						.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
-					let _ = sender.send(x);
-				}
-				sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender) => {
-					let _ = match peer_id.parse::<PeerId>() {
-						Ok(peer_id) => {
-							network.remove_reserved_peer(peer_id);
-							sender.send(Ok(()))
+			},
+
+			notification = last_finality_notification.fuse() => {
+				network.on_block_finalized(notification.hash, notification.header);
+			},
+
+			request = rpc_rx.next() => {
+				// Answers incoming RPC requests.
+				match request {
+					None => {},	// RPC requests source is closed; continue running the network.
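+					// Every request below carries a sender on which one answer is sent
+					// back; the result of each `send` is deliberately ignored, as the
+					// requester may be gone by the time the answer is ready.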
+					Some(sc_rpc::system::Request::Health(sender)) => {
+						let _ = sender.send(sc_rpc::system::Health {
+							peers: network.peers_debug_info().len(),
+							is_syncing: network.service().is_major_syncing(),
+							should_have_peers,
+						});
+					},
+					Some(sc_rpc::system::Request::LocalPeerId(sender)) => {
+						let _ = sender.send(network.local_peer_id().to_base58());
+					},
+					Some(sc_rpc::system::Request::LocalListenAddresses(sender)) => {
+						let peer_id = network.local_peer_id().clone().into();
+						let p2p_proto_suffix = sc_network::multiaddr::Protocol::P2p(peer_id);
+						let addresses = network.listen_addresses()
+							.map(|addr| addr.clone().with(p2p_proto_suffix.clone()).to_string())
+							.collect();
+						let _ = sender.send(addresses);
+					},
+					Some(sc_rpc::system::Request::Peers(sender)) => {
+						let _ = sender.send(network.peers_debug_info().into_iter().map(|(peer_id, p)|
+							sc_rpc::system::PeerInfo {
+								peer_id: peer_id.to_base58(),
+								roles: format!("{:?}", p.roles),
+								protocol_version: p.protocol_version,
+								best_hash: p.best_hash,
+								best_number: p.best_number,
+							}
+						).collect());
+					}
+					Some(sc_rpc::system::Request::NetworkState(sender)) => {
+						if let Some(network_state) = serde_json::to_value(&network.network_state()).ok() {
+							let _ = sender.send(network_state);
 						}
+					}
+					Some(sc_rpc::system::Request::NetworkAddReservedPeer(peer_addr, sender)) => {
+						let x = network.add_reserved_peer(peer_addr)
+							.map_err(sc_rpc::system::error::Error::MalformattedPeerArg);
+						let _ = sender.send(x);
+					}
+					Some(sc_rpc::system::Request::NetworkRemoveReservedPeer(peer_id, sender)) => {
+						let _ = match peer_id.parse::<PeerId>() {
+							Ok(peer_id) => {
+								network.remove_reserved_peer(peer_id);
+								sender.send(Ok(()))
+							}
+							Err(e) => sender.send(Err(sc_rpc::system::error::Error::MalformattedPeerArg(
+								e.to_string(),
+							))),
+						};
+					}
+					Some(sc_rpc::system::Request::NodeRoles(sender)) => {
+						use sc_rpc::system::NodeRole;
-				let node_role = match role {
-					Role::Authority { .. } => NodeRole::Authority,
-					Role::Light => NodeRole::LightClient,
-					Role::Full => NodeRole::Full,
-					Role::Sentry { .. } => NodeRole::Sentry,
-				};
+						let node_role = match role {
+							Role::Authority { .. } => NodeRole::Authority,
+							Role::Light => NodeRole::LightClient,
+							Role::Full => NodeRole::Full,
+							Role::Sentry { .. } => NodeRole::Sentry,
+						};
 
-				let _ = sender.send(vec![node_role]);
-			}
-			};
-		}
+						let _ = sender.send(vec![node_role]);
+					}
+				};
+			},
 
-		// Interval report for the external API.
-		status_sinks.lock().poll(cx, || {
-			let status = NetworkStatus {
-				sync_state: network.sync_state(),
-				best_seen_block: network.best_seen_block(),
-				num_sync_peers: network.num_sync_peers(),
-				num_connected_peers: network.num_connected_peers(),
-				num_active_peers: network.num_active_peers(),
-				average_download_per_sec: network.average_download_per_sec(),
-				average_upload_per_sec: network.average_upload_per_sec(),
-			};
-			let state = network.network_state();
-			(status, state)
-		});
+			_ = network.next_action().fuse() => {
+				// The network worker has done something. Nothing special to do, but could be
+				// used in the future to perform actions in response of things that happened on
+				// the network.
+			}

-		// Main network polling.
-		if let Poll::Ready(Ok(())) = Pin::new(&mut network).poll(cx).map_err(|err| {
-			warn!(target: "service", "Error in network: {:?}", err);
-		}) {
-			return Poll::Ready(());
-		}
+			ready_sink = status_sinks.next().fuse() => {
+				let status = NetworkStatus {
+					sync_state: network.sync_state(),
+					best_seen_block: network.best_seen_block(),
+					num_sync_peers: network.num_sync_peers(),
+					num_connected_peers: network.num_connected_peers(),
+					num_active_peers: network.num_active_peers(),
+					average_download_per_sec: network.average_download_per_sec(),
+					average_upload_per_sec: network.average_upload_per_sec(),
+				};
+				let state = network.network_state();
+				ready_sink.send((status, state));
+			}
+		};

 		// Now some diagnostic for performances.
 		let polling_dur = before_polling.elapsed();
@@ -461,9 +473,7 @@ fn build_network_future<
 			"⚠️  Polling the network future took {:?}",
 			polling_dur
 		);
-
-		Poll::Pending
-	})
+	}
 }
 
 /// Overview status of the network.
diff --git a/client/service/src/status_sinks.rs b/client/service/src/status_sinks.rs
index 4b1dce52f9a31..509f62e83f751 100644
--- a/client/service/src/status_sinks.rs
+++ b/client/service/src/status_sinks.rs
@@ -14,19 +14,27 @@
 // You should have received a copy of the GNU General Public License
 // along with Substrate. If not, see <http://www.gnu.org/licenses/>.
 
-use futures::{Stream, stream::futures_unordered::FuturesUnordered};
-use std::time::Duration;
-use std::pin::Pin;
-use std::task::{Poll, Context};
+use futures::{prelude::*, lock::Mutex};
 use futures_timer::Delay;
-use sp_utils::mpsc::TracingUnboundedSender;
+use std::{pin::Pin, task::{Poll, Context}, time::Duration};
+use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
 
 /// Holds a list of `UnboundedSender`s, each associated with a certain time period. Every time the
 /// period elapses, we push an element on the sender.
 ///
 /// Senders are removed only when they are closed.
 pub struct StatusSinks<T> {
-	entries: FuturesUnordered<YieldAfter<T>>,
+	/// Should only be locked by `next`.
+	inner: Mutex<Inner<T>>,
+	/// Sending side of `Inner::entries_rx`.
+	entries_tx: TracingUnboundedSender<YieldAfter<T>>,
+}
+
+struct Inner<T> {
+	/// The actual entries of the list.
+	entries: stream::FuturesUnordered<YieldAfter<T>>,
+	/// Receives new entries and puts them in `entries`.
+	entries_rx: TracingUnboundedReceiver<YieldAfter<T>>,
 }
 
 struct YieldAfter<T> {
@@ -38,56 +46,114 @@ impl<T> StatusSinks<T> {
 	/// Builds a new empty collection.
 	pub fn new() -> StatusSinks<T> {
+		let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries");
+
 		StatusSinks {
-			entries: FuturesUnordered::new(),
+			inner: Mutex::new(Inner {
+				entries: stream::FuturesUnordered::new(),
+				entries_rx,
+			}),
+			entries_tx,
 		}
 	}
 
 	/// Adds a sender to the collection.
 	///
 	/// The `interval` is the time period between two pushes on the sender.
-	pub fn push(&mut self, interval: Duration, sender: TracingUnboundedSender<T>) {
-		self.entries.push(YieldAfter {
+	pub fn push(&self, interval: Duration, sender: TracingUnboundedSender<T>) {
+		let _ = self.entries_tx.unbounded_send(YieldAfter {
 			delay: Delay::new(interval),
 			interval,
 			sender: Some(sender),
-		})
+		});
 	}
 
-	/// Processes all the senders. If any sender is ready, calls the `status_grab` function and
-	/// pushes what it returns to the sender.
+	/// Waits until one of the sinks is ready, then returns an object that can be used to send
+	/// an element on said sink.
 	///
-	/// This function doesn't return anything, but it should be treated as if it implicitly
-	/// returns `Poll::Pending`. In particular, it should be called again when the task
-	/// is waken up.
-	///
-	/// # Panic
-	///
-	/// Panics if not called within the context of a task.
-	pub fn poll(&mut self, cx: &mut Context, mut status_grab: impl FnMut() -> T) {
+	/// If the object isn't used to send an element, the slot is skipped.
+	pub async fn next(&self) -> ReadySinkEvent<'_, T> {
+		// This is only ever locked by `next`, which means that one `next` at a time can run.
+		let mut inner = self.inner.lock().await;
+		let inner = &mut *inner;
+
 		loop {
-			match Pin::new(&mut self.entries).poll_next(cx) {
-				Poll::Ready(Some((sender, interval))) => {
-					let status = status_grab();
-					if sender.unbounded_send(status).is_ok() {
-						self.entries.push(YieldAfter {
-							// Note that since there's a small delay between the moment a task is
-							// waken up and the moment it is polled, the period is actually not
-							// `interval` but `interval + <delay>`. We ignore this problem in
-							// practice.
-							delay: Delay::new(interval),
-							interval,
-							sender: Some(sender),
-						});
+			// Future that produces the next ready entry in `entries`, or doesn't produce anything if
+			// the list is empty.
+			let next_ready_entry = {
+				let entries = &mut inner.entries;
+				async move {
+					if let Some(v) = entries.next().await {
+						v
+					} else {
+						loop {
+							futures::pending!()
+						}
+					}
+				}
+			};
+
+			futures::select!{
+				new_entry = inner.entries_rx.next() => {
+					if let Some(new_entry) = new_entry {
+						inner.entries.push(new_entry);
+					}
+				},
+				(sender, interval) = next_ready_entry.fuse() => {
+					return ReadySinkEvent {
+						sinks: self,
+						sender: Some(sender),
+						interval,
 					}
 				}
-				Poll::Ready(None) |
-				Poll::Pending => break,
 			}
 		}
 	}
 }
 
+/// One of the sinks is ready.
+#[must_use]
+pub struct ReadySinkEvent<'a, T> {
+	sinks: &'a StatusSinks<T>,
+	sender: Option<TracingUnboundedSender<T>>,
+	interval: Duration,
+}
+
+impl<'a, T> ReadySinkEvent<'a, T> {
+	/// Sends an element on the sender.
+	pub fn send(mut self, element: T) {
+		if let Some(sender) = self.sender.take() {
+			if sender.unbounded_send(element).is_ok() {
+				let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
+					// Note that since there's a small delay between the moment a task is
+					// waken up and the moment it is polled, the period is actually not
+					// `interval` but `interval + <delay>`. We ignore this problem in
+					// practice.
+					delay: Delay::new(self.interval),
+					interval: self.interval,
+					sender: Some(sender),
+				});
+			}
+		}
+	}
+}
+
+impl<'a, T> Drop for ReadySinkEvent<'a, T> {
+	fn drop(&mut self) {
+		if let Some(sender) = self.sender.take() {
+			if sender.is_closed() {
+				return;
+			}
+
+			let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
+				delay: Delay::new(self.interval),
+				interval: self.interval,
+				sender: Some(sender),
+			});
+		}
+	}
+}
+
 impl<T> futures::Future for YieldAfter<T> {
 	type Output = (TracingUnboundedSender<T>, Duration);
 
@@ -111,14 +177,13 @@ mod tests {
 	use futures::prelude::*;
 	use futures::channel::mpsc;
 	use std::time::Duration;
-	use std::task::Poll;
 
 	#[test]
 	fn works() {
 		// We're not testing that the `StatusSink` properly enforces an order in the intervals, as
 		// this easily causes test failures on busy CPUs.
-		let mut status_sinks = StatusSinks::new();
+		let status_sinks = StatusSinks::new();
 
 		let (tx, rx) = mpsc::unbounded();
 		status_sinks.push(Duration::from_millis(100), tx);
 
 		let mut val_order = 5;
 
 		futures::executor::block_on(futures::future::select(
-			futures::future::poll_fn(move |cx| {
-				status_sinks.poll(cx, || { val_order += 1; val_order });
-				Poll::<()>::Pending
+			Box::pin(async move {
+				loop {
+					let ev = status_sinks.next().await;
+					val_order += 1;
+					ev.send(val_order);
+				}
 			}),
 			Box::pin(async {
 				let items: Vec<i32> = rx.take(3).collect().await;

From 841aafe743e8564553713c4142bac07c33d0c458 Mon Sep 17 00:00:00 2001
From: Pierre Krieger
Date: Tue, 21 Apr 2020 15:28:08 +0200
Subject: [PATCH 2/2] Update client/service/src/status_sinks.rs

Co-Authored-By: Max Inden

---
 client/service/src/status_sinks.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/client/service/src/status_sinks.rs b/client/service/src/status_sinks.rs
index 509f62e83f751..9ab2b2cae5dbf 100644
--- a/client/service/src/status_sinks.rs
+++ b/client/service/src/status_sinks.rs
@@ -126,7 +126,7 @@ impl<'a, T> ReadySinkEvent<'a, T> {
 		if sender.unbounded_send(element).is_ok() {
 			let _ = self.sinks.entries_tx.unbounded_send(YieldAfter {
 				// Note that since there's a small delay between the moment a task is
-				// waken up and the moment it is polled, the period is actually not
+				// woken up and the moment it is polled, the period is actually not
 				// `interval` but `interval + <delay>`. We ignore this problem in
 				// practice.
 				delay: Delay::new(self.interval),