diff --git a/node/core/parachains-inherent/src/lib.rs b/node/core/parachains-inherent/src/lib.rs index e8ffb573658c..23bd250a2f3c 100644 --- a/node/core/parachains-inherent/src/lib.rs +++ b/node/core/parachains-inherent/src/lib.rs @@ -57,9 +57,12 @@ impl ParachainsInherentDataProvider { receiver.await.map_err(|_| Error::ClosedChannelAwaitingActivation)?.map_err(Error::Subsystem)?; let (sender, receiver) = futures::channel::oneshot::channel(); - overseer.send_msg(AllMessages::Provisioner( - ProvisionerMessage::RequestInherentData(parent, sender), - )).await; + overseer.send_msg( + AllMessages::Provisioner( + ProvisionerMessage::RequestInherentData(parent, sender), + ), + std::any::type_name::(), + ).await; receiver.await.map_err(|_| Error::ClosedChannelAwaitingInherentData) }; diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 7e6f831dd70a..1a019b9c77dc 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -401,7 +401,10 @@ impl From> for BlockInfo { enum Event { BlockImported(BlockInfo), BlockFinalized(BlockInfo), - MsgToSubsystem(AllMessages), + MsgToSubsystem { + msg: AllMessages, + origin: &'static str, + }, ExternalRequest(ExternalRequest), Stop, } @@ -452,8 +455,16 @@ impl OverseerHandler { } /// Send some message to one of the `Subsystem`s. - pub async fn send_msg(&mut self, msg: impl Into) { - self.send_and_log_error(Event::MsgToSubsystem(msg.into())).await + pub async fn send_msg(&mut self, msg: impl Into, origin: &'static str) { + self.send_and_log_error(Event::MsgToSubsystem { + msg: msg.into(), + origin, + }).await + } + + /// Same as `send_msg`, but with no origin. Used for tests. + pub async fn send_msg_anon(&mut self, msg: impl Into) { + self.send_msg(msg, "").await } /// Inform the `Overseer` that some block was finalized. @@ -801,7 +812,8 @@ pub struct OverseerSubsystemSender { #[async_trait::async_trait] impl SubsystemSender for OverseerSubsystemSender { async fn send_message(&mut self, msg: AllMessages) { - self.channels.send_and_log_error(self.signals_received.load(), msg).await; + let needed_signals = self.signals_received.load(); + self.channels.send_and_log_error(needed_signals, msg).await; } async fn send_messages(&mut self, msgs: T) @@ -891,12 +903,18 @@ impl SubsystemContext for OverseerSubsystemContext { loop { // If we have a message pending an overseer signal, we only poll for signals // in the meantime. + let signals_received = self.signals_received.load(); if let Some((needs_signals_received, msg)) = self.pending_incoming.take() { - if needs_signals_received <= self.signals_received.load() { + if needs_signals_received <= signals_received { return Ok(FromOverseer::Communication { msg }); } else { self.pending_incoming = Some((needs_signals_received, msg)); - + tracing::debug!( + target: LOG_TARGET, + subsystem = std::any::type_name::(), + diff = needs_signals_received - signals_received, + "waiting for a signal", + ); // wait for next signal. let signal = self.signals.next().await .ok_or(SubsystemError::Context( @@ -911,7 +929,6 @@ impl SubsystemContext for OverseerSubsystemContext { let mut await_message = self.messages.next().fuse(); let mut await_signal = self.signals.next().fuse(); - let signals_received = self.signals_received.load(); let pending_incoming = &mut self.pending_incoming; // Otherwise, wait for the next signal or incoming message. @@ -989,7 +1006,7 @@ impl OverseenSubsystem { /// Send a message to the wrapped subsystem. /// /// If the inner `instance` is `None`, nothing is happening. - async fn send_message(&mut self, msg: M) -> SubsystemResult<()> { + async fn send_message(&mut self, msg: M, origin: &'static str) -> SubsystemResult<()> { const MESSAGE_TIMEOUT: Duration = Duration::from_secs(10); if let Some(ref mut instance) = self.instance { @@ -999,7 +1016,12 @@ impl OverseenSubsystem { }).timeout(MESSAGE_TIMEOUT).await { None => { - tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); + tracing::error!( + target: LOG_TARGET, + %origin, + "Subsystem {} appears unresponsive.", + instance.name, + ); Err(SubsystemError::SubsystemStalled(instance.name)) } Some(res) => res.map_err(Into::into), @@ -1016,9 +1038,15 @@ impl OverseenSubsystem { const SIGNAL_TIMEOUT: Duration = Duration::from_secs(10); if let Some(ref mut instance) = self.instance { - match instance.tx_signal.send(signal).timeout(SIGNAL_TIMEOUT).await { + match instance.tx_signal.send(signal.clone()).timeout(SIGNAL_TIMEOUT).await { None => { - tracing::error!(target: LOG_TARGET, "Subsystem {} appears unresponsive.", instance.name); + tracing::error!( + target: LOG_TARGET, + ?signal, + received = instance.signals_received, + "Subsystem {} appears unresponsive.", + instance.name, + ); Err(SubsystemError::SubsystemStalled(instance.name)) } Some(res) => { @@ -1909,8 +1937,8 @@ where }; match msg { - Event::MsgToSubsystem(msg) => { - self.route_message(msg.into()).await?; + Event::MsgToSubsystem { msg, origin } => { + self.route_message(msg.into(), origin).await?; } Event::Stop => { self.stop().await; @@ -2045,59 +2073,63 @@ where Ok(()) } - async fn route_message(&mut self, msg: AllMessages) -> SubsystemResult<()> { + async fn route_message( + &mut self, + msg: AllMessages, + origin: &'static str, + ) -> SubsystemResult<()> { self.metrics.on_message_relayed(); match msg { AllMessages::CandidateValidation(msg) => { - self.subsystems.candidate_validation.send_message(msg).await?; + self.subsystems.candidate_validation.send_message(msg, origin).await?; }, AllMessages::CandidateBacking(msg) => { - self.subsystems.candidate_backing.send_message(msg).await?; + self.subsystems.candidate_backing.send_message(msg, origin).await?; }, AllMessages::StatementDistribution(msg) => { - self.subsystems.statement_distribution.send_message(msg).await?; + self.subsystems.statement_distribution.send_message(msg, origin).await?; }, AllMessages::AvailabilityDistribution(msg) => { - self.subsystems.availability_distribution.send_message(msg).await?; + self.subsystems.availability_distribution.send_message(msg, origin).await?; }, AllMessages::AvailabilityRecovery(msg) => { - self.subsystems.availability_recovery.send_message(msg).await?; + self.subsystems.availability_recovery.send_message(msg, origin).await?; }, AllMessages::BitfieldDistribution(msg) => { - self.subsystems.bitfield_distribution.send_message(msg).await?; + self.subsystems.bitfield_distribution.send_message(msg, origin).await?; }, AllMessages::BitfieldSigning(msg) => { - self.subsystems.bitfield_signing.send_message(msg).await?; + self.subsystems.bitfield_signing.send_message(msg, origin).await?; }, AllMessages::Provisioner(msg) => { - self.subsystems.provisioner.send_message(msg).await?; + self.subsystems.provisioner.send_message(msg, origin).await?; }, AllMessages::RuntimeApi(msg) => { - self.subsystems.runtime_api.send_message(msg).await?; + self.subsystems.runtime_api.send_message(msg, origin).await?; }, AllMessages::AvailabilityStore(msg) => { - self.subsystems.availability_store.send_message(msg).await?; + self.subsystems.availability_store.send_message(msg, origin).await?; }, AllMessages::NetworkBridge(msg) => { - self.subsystems.network_bridge.send_message(msg).await?; + self.subsystems.network_bridge.send_message(msg, origin).await?; }, AllMessages::ChainApi(msg) => { - self.subsystems.chain_api.send_message(msg).await?; + self.subsystems.chain_api.send_message(msg, origin).await?; }, AllMessages::CollationGeneration(msg) => { - self.subsystems.collation_generation.send_message(msg).await?; + self.subsystems.collation_generation.send_message(msg, origin).await?; }, AllMessages::CollatorProtocol(msg) => { - self.subsystems.collator_protocol.send_message(msg).await?; + self.subsystems.collator_protocol.send_message(msg, origin).await?; }, AllMessages::ApprovalDistribution(msg) => { - self.subsystems.approval_distribution.send_message(msg).await?; + self.subsystems.approval_distribution.send_message(msg, origin).await?; }, AllMessages::ApprovalVoting(msg) => { - self.subsystems.approval_voting.send_message(msg).await?; + self.subsystems.approval_voting.send_message(msg, origin).await?; }, AllMessages::GossipSupport(msg) => { - self.subsystems.gossip_support.send_message(msg).await?; + self.subsystems.gossip_support.send_message(msg, origin).await?; }, AllMessages::DisputeCoordinator(_) => {} AllMessages::DisputeParticipation(_) => {} diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index c33a01dabfec..888ebea0e127 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -228,7 +228,7 @@ fn overseer_metrics_work() { handler.block_imported(second_block).await; handler.block_imported(third_block).await; - handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; + handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; handler.stop().await; select! { @@ -984,22 +984,22 @@ fn overseer_all_subsystems_receive_signals_and_messages() { // send a msg to each subsystem // except for BitfieldSigning and GossipSupport as the messages are not instantiable - handler.send_msg(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; - handler.send_msg(AllMessages::CandidateBacking(test_candidate_backing_msg())).await; - handler.send_msg(AllMessages::CollationGeneration(test_collator_generation_msg())).await; - handler.send_msg(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await; - handler.send_msg(AllMessages::StatementDistribution(test_statement_distribution_msg())).await; - handler.send_msg(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await; - // handler.send_msg(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await; - // handler.send_msg(AllMessages::GossipSupport(test_bitfield_signing_msg())).await; - handler.send_msg(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await; - handler.send_msg(AllMessages::Provisioner(test_provisioner_msg())).await; - handler.send_msg(AllMessages::RuntimeApi(test_runtime_api_msg())).await; - handler.send_msg(AllMessages::AvailabilityStore(test_availability_store_msg())).await; - handler.send_msg(AllMessages::NetworkBridge(test_network_bridge_msg())).await; - handler.send_msg(AllMessages::ChainApi(test_chain_api_msg())).await; - handler.send_msg(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await; - handler.send_msg(AllMessages::ApprovalVoting(test_approval_voting_msg())).await; + handler.send_msg_anon(AllMessages::CandidateValidation(test_candidate_validation_msg())).await; + handler.send_msg_anon(AllMessages::CandidateBacking(test_candidate_backing_msg())).await; + handler.send_msg_anon(AllMessages::CollationGeneration(test_collator_generation_msg())).await; + handler.send_msg_anon(AllMessages::CollatorProtocol(test_collator_protocol_msg())).await; + handler.send_msg_anon(AllMessages::StatementDistribution(test_statement_distribution_msg())).await; + handler.send_msg_anon(AllMessages::AvailabilityRecovery(test_availability_recovery_msg())).await; + // handler.send_msg_anon(AllMessages::BitfieldSigning(test_bitfield_signing_msg())).await; + // handler.send_msg_anon(AllMessages::GossipSupport(test_bitfield_signing_msg())).await; + handler.send_msg_anon(AllMessages::BitfieldDistribution(test_bitfield_distribution_msg())).await; + handler.send_msg_anon(AllMessages::Provisioner(test_provisioner_msg())).await; + handler.send_msg_anon(AllMessages::RuntimeApi(test_runtime_api_msg())).await; + handler.send_msg_anon(AllMessages::AvailabilityStore(test_availability_store_msg())).await; + handler.send_msg_anon(AllMessages::NetworkBridge(test_network_bridge_msg())).await; + handler.send_msg_anon(AllMessages::ChainApi(test_chain_api_msg())).await; + handler.send_msg_anon(AllMessages::ApprovalDistribution(test_approval_distribution_msg())).await; + handler.send_msg_anon(AllMessages::ApprovalVoting(test_approval_voting_msg())).await; // Wait until all subsystems have received. Otherwise the messages might race against // the conclude signal. diff --git a/node/service/src/grandpa_support.rs b/node/service/src/grandpa_support.rs index bb45709ebbc7..407bedf671c0 100644 --- a/node/service/src/grandpa_support.rs +++ b/node/service/src/grandpa_support.rs @@ -130,11 +130,14 @@ impl grandpa::VotingRule for ApprovalCheckingVotingRule Box::pin(async move { let (tx, rx) = oneshot::channel(); let approval_checking_subsystem_vote = { - overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor( - best_hash, - base_number, - tx, - )).await; + overseer.send_msg( + ApprovalVotingMessage::ApprovedAncestor( + best_hash, + base_number, + tx, + ), + std::any::type_name::(), + ).await; rx.await.ok().and_then(|v| v) }; diff --git a/node/service/src/relay_chain_selection.rs b/node/service/src/relay_chain_selection.rs index 1ca1c5c5f29c..cca90e6ae1a9 100644 --- a/node/service/src/relay_chain_selection.rs +++ b/node/service/src/relay_chain_selection.rs @@ -216,7 +216,10 @@ impl SelectChain for SelectRelayChain self.overseer .clone() - .send_msg(ChainSelectionMessage::Leaves(tx)).await; + .send_msg( + ChainSelectionMessage::Leaves(tx), + std::any::type_name::(), + ).await; rx.await .map_err(Error::OverseerDisconnected) @@ -264,7 +267,10 @@ impl SelectChain for SelectRelayChain let subchain_head = { let (tx, rx) = oneshot::channel(); - overseer.send_msg(ChainSelectionMessage::BestLeafContaining(target_hash, tx)).await; + overseer.send_msg( + ChainSelectionMessage::BestLeafContaining(target_hash, tx), + std::any::type_name::(), + ).await; let best = rx.await .map_err(Error::OverseerDisconnected) @@ -318,11 +324,14 @@ impl SelectChain for SelectRelayChain let (subchain_head, subchain_number) = { let (tx, rx) = oneshot::channel(); - overseer.send_msg(ApprovalVotingMessage::ApprovedAncestor( - subchain_head, - target_number, - tx, - )).await; + overseer.send_msg( + ApprovalVotingMessage::ApprovedAncestor( + subchain_head, + target_number, + tx, + ), + std::any::type_name::(), + ).await; match rx.await .map_err(Error::OverseerDisconnected) diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs index c09d0ed000af..cd3550678c11 100644 --- a/node/subsystem-test-helpers/src/lib.rs +++ b/node/subsystem-test-helpers/src/lib.rs @@ -385,7 +385,7 @@ mod tests { spawner.spawn("overseer", overseer.run().then(|_| async { () }).boxed()); - block_on(handler.send_msg(CollatorProtocolMessage::CollateOn(Default::default()))); + block_on(handler.send_msg_anon(CollatorProtocolMessage::CollateOn(Default::default()))); assert!(matches!(block_on(rx.into_future()).0.unwrap(), CollatorProtocolMessage::CollateOn(_))); } } diff --git a/node/test/service/src/lib.rs b/node/test/service/src/lib.rs index d7425cde3d41..121b7306623b 100644 --- a/node/test/service/src/lib.rs +++ b/node/test/service/src/lib.rs @@ -348,11 +348,11 @@ impl PolkadotTestNode { }; self.overseer_handler - .send_msg(CollationGenerationMessage::Initialize(config)) + .send_msg(CollationGenerationMessage::Initialize(config), "Collator") .await; self.overseer_handler - .send_msg(CollatorProtocolMessage::CollateOn(para_id)) + .send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator") .await; } } diff --git a/parachain/test-parachains/adder/collator/src/main.rs b/parachain/test-parachains/adder/collator/src/main.rs index 2b6b219f0d3c..83a65c447cbd 100644 --- a/parachain/test-parachains/adder/collator/src/main.rs +++ b/parachain/test-parachains/adder/collator/src/main.rs @@ -88,11 +88,11 @@ fn main() -> Result<()> { para_id, }; overseer_handler - .send_msg(CollationGenerationMessage::Initialize(config)) + .send_msg(CollationGenerationMessage::Initialize(config), "Collator") .await; overseer_handler - .send_msg(CollatorProtocolMessage::CollateOn(para_id)) + .send_msg(CollatorProtocolMessage::CollateOn(para_id), "Collator") .await; Ok(full_node.task_manager)