diff --git a/Cargo.lock b/Cargo.lock index 6448c77d2452..054b03578f45 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23819,6 +23819,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tracing", ] [[package]] diff --git a/prdoc/pr_6897.prdoc b/prdoc/pr_6897.prdoc new file mode 100644 index 000000000000..38fd9417f48a --- /dev/null +++ b/prdoc/pr_6897.prdoc @@ -0,0 +1,7 @@ +title: 'Tracing Log for fork-aware transaction pool' +doc: +- audience: Node Dev + description: Replacement of log crate with tracing crate for better logging. +crates: +- name: sc-transaction-pool + bump: minor \ No newline at end of file diff --git a/substrate/client/transaction-pool/Cargo.toml b/substrate/client/transaction-pool/Cargo.toml index 72586b984920..26bbf58f1522 100644 --- a/substrate/client/transaction-pool/Cargo.toml +++ b/substrate/client/transaction-pool/Cargo.toml @@ -40,6 +40,7 @@ sp-transaction-pool = { workspace = true, default-features = true } thiserror = { workspace = true } tokio = { workspace = true, default-features = true, features = ["macros", "time"] } tokio-stream = { workspace = true } +tracing = { workspace = true, default-features = true } [dev-dependencies] array-bytes = { workspace = true, default-features = true } diff --git a/substrate/client/transaction-pool/src/common/mod.rs b/substrate/client/transaction-pool/src/common/mod.rs index fb280e8780ad..446a5c2ec022 100644 --- a/substrate/client/transaction-pool/src/common/mod.rs +++ b/substrate/client/transaction-pool/src/common/mod.rs @@ -25,6 +25,7 @@ pub(crate) mod log_xt; pub(crate) mod metrics; #[cfg(test)] pub(crate) mod tests; +pub(crate) mod tracing_log_xt; use futures::StreamExt; use std::sync::Arc; diff --git a/substrate/client/transaction-pool/src/common/tracing_log_xt.rs b/substrate/client/transaction-pool/src/common/tracing_log_xt.rs new file mode 100644 index 000000000000..4d1c5d09cc7a --- /dev/null +++ b/substrate/client/transaction-pool/src/common/tracing_log_xt.rs @@ -0,0 +1,69 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! Utility for logging transaction collections with tracing crate. + +/// Logs every transaction from given `tx_collection` with given level. +macro_rules! log_xt { + (data: hash, target: $target:expr, $level:expr, $tx_collection:expr, $text_with_format:expr) => { + for tx in $tx_collection { + tracing::event!( + $level, + target = $target, + tx_hash = format!("{:?}", tx), + $text_with_format, + ); + } + }; + (data: hash, target: $target:expr, $level:expr, $tx_collection:expr, $text_with_format:expr, $($arg:expr),*) => { + for tx in $tx_collection { + tracing::event!( + $level, + target = $target, + tx_hash = format!("{:?}", tx), + $text_with_format, + $($arg),* + ); + } + }; + (data: tuple, target: $target:expr, $level:expr, $tx_collection:expr, $text_with_format:expr) => { + for tx in $tx_collection { + tracing::event!( + $level, + target = $target, + tx_hash = format!("{:?}", tx.0), + $text_with_format, + tx.1 + ); + } + }; +} +macro_rules! log_xt_trace { + (data: $datatype:ident, target: $target:expr, $($arg:tt)+) => { + $crate::common::tracing_log_xt::log_xt!(data: $datatype, target: $target, tracing::Level::TRACE, $($arg)+); + }; + (target: $target:expr, $tx_collection:expr, $text_with_format:expr) => { + $crate::common::tracing_log_xt::log_xt!(data: hash, target: $target, tracing::Level::TRACE, $tx_collection, $text_with_format); + }; + (target: $target:expr, $tx_collection:expr, $text_with_format:expr, $($arg:expr)*) => { + $crate::common::tracing_log_xt::log_xt!(data: hash, target: $target, tracing::Level::TRACE, $tx_collection, $text_with_format, $($arg)*); + }; +} + +pub(crate) use log_xt; +pub(crate) use log_xt_trace; diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs index bf61558b00b0..3588645344ba 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs @@ -22,13 +22,12 @@ //! by any view are detected and properly notified. use crate::{ - common::log_xt::log_xt_trace, + common::tracing_log_xt::log_xt_trace, fork_aware_txpool::stream_map_util::next_event, graph::{self, BlockHash, ExtrinsicHash}, LOG_TARGET, }; use futures::stream::StreamExt; -use log::{debug, trace}; use sc_transaction_pool_api::TransactionStatus; use sc_utils::mpsc; use sp_runtime::traits::Block as BlockT; @@ -41,6 +40,7 @@ use std::{ pin::Pin, }; use tokio_stream::StreamMap; +use tracing::{debug, trace}; /// Represents a transaction that was removed from the transaction pool, including the reason of its /// removal. @@ -225,7 +225,7 @@ where log_xt_trace!( target: LOG_TARGET, xts.clone(), - "[{:?}] dropped_watcher: finalized xt removed" + "dropped_watcher: finalized xt removed" ); xts.iter().for_each(|xt| { self.ready_transaction_views.remove(xt); @@ -279,7 +279,7 @@ where return Some(DroppedTransaction::new_enforced_by_limts(tx_hash)) } } else { - debug!("[{:?}] dropped_watcher: removing (non-tracked) tx", tx_hash); + debug!(target: LOG_TARGET, ?tx_hash, "dropped_watcher: removing (non-tracked) tx"); return Some(DroppedTransaction::new_enforced_by_limts(tx_hash)) } }, diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index 766045718252..c609ee2da22e 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -29,7 +29,7 @@ use super::{ }; use crate::{ api::FullChainApi, - common::log_xt::log_xt_trace, + common::tracing_log_xt::log_xt_trace, enactment_state::{EnactmentAction, EnactmentState}, fork_aware_txpool::{ dropped_watcher::{DroppedReason, DroppedTransaction}, @@ -70,6 +70,7 @@ use std::{ time::Instant, }; use tokio::select; +use tracing::{debug, info, trace, warn}; /// Fork aware transaction pool task, that needs to be polled. pub type ForkAwareTxPoolTask = Pin + Send>>; @@ -105,10 +106,10 @@ where /// /// `ready_iterator` is a closure that generates the result data to be sent to the pollers. fn trigger(&mut self, at: Block::Hash, ready_iterator: impl Fn() -> T) { - log::trace!(target: LOG_TARGET, "fatp::trigger {at:?} pending keys: {:?}", self.pollers.keys()); + trace!(target: LOG_TARGET, ?at, keys = ?self.pollers.keys(), "fatp::trigger"); let Some(pollers) = self.pollers.remove(&at) else { return }; pollers.into_iter().for_each(|p| { - log::debug!(target: LOG_TARGET, "trigger ready signal at block {}", at); + debug!(target: LOG_TARGET, "trigger ready signal at block {}", at); let _ = p.send(ready_iterator()); }); } @@ -265,11 +266,16 @@ where ) { loop { let Some(dropped) = dropped_stream.next().await else { - log::debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated..."); + debug!(target: LOG_TARGET, "fatp::dropped_monitor_task: terminated..."); break; }; - let dropped_tx_hash = dropped.tx_hash; - log::trace!(target: LOG_TARGET, "[{:?}] fatp::dropped notification {:?}, removing", dropped_tx_hash,dropped.reason); + let tx_hash = dropped.tx_hash; + trace!( + target: LOG_TARGET, + ?tx_hash, + reason = ?dropped.reason, + "fatp::dropped notification, removing" + ); match dropped.reason { DroppedReason::Usurped(new_tx_hash) => { if let Some(new_tx) = mempool.get_by_hash(new_tx_hash) { @@ -277,24 +283,24 @@ where .replace_transaction( new_tx.source(), new_tx.tx(), - dropped_tx_hash, + tx_hash, new_tx.is_watched(), ) .await; } else { - log::trace!( - target:LOG_TARGET, - "error: dropped_monitor_task: no entry in mempool for new transaction {:?}", - new_tx_hash, + trace!( + target: LOG_TARGET, + tx_hash = ?new_tx_hash, + "error: dropped_monitor_task: no entry in mempool for new transaction" ); } }, DroppedReason::LimitsEnforced => {}, }; - mempool.remove_transaction(&dropped_tx_hash); + mempool.remove_transaction(&tx_hash); view_store.listener.transaction_dropped(dropped); - import_notification_sink.clean_notified_items(&[dropped_tx_hash]); + import_notification_sink.clean_notified_items(&[tx_hash]); } } @@ -433,7 +439,11 @@ where pub async fn ready_at_light(&self, at: Block::Hash) -> ReadyIteratorFor { let start = Instant::now(); let api = self.api.clone(); - log::trace!(target: LOG_TARGET, "fatp::ready_at_light {:?}", at); + trace!( + target: LOG_TARGET, + ?at, + "fatp::ready_at_light" + ); let Ok(block_number) = self.api.resolve_block_number(at) else { return Box::new(std::iter::empty()) @@ -465,8 +475,12 @@ where let extrinsics = api .block_body(h.hash) .await - .unwrap_or_else(|e| { - log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e); + .unwrap_or_else(|error| { + warn!( + target: LOG_TARGET, + %error, + "Compute ready light transactions: error request" + ); None }) .unwrap_or_default() @@ -487,19 +501,25 @@ where let _ = tmp_view.pool.validated_pool().prune_tags(tags); let after_count = tmp_view.pool.validated_pool().status().ready; - log::debug!(target: LOG_TARGET, - "fatp::ready_at_light {} from {} before: {} to be removed: {} after: {} took:{:?}", - at, - best_view.at.hash, + debug!( + target: LOG_TARGET, + ?at, + best_view_hash = ?best_view.at.hash, before_count, - all_extrinsics.len(), + to_be_removed = all_extrinsics.len(), after_count, - start.elapsed() + duration = ?start.elapsed(), + "fatp::ready_at_light" ); Box::new(tmp_view.pool.validated_pool().ready()) } else { let empty: ReadyIteratorFor = Box::new(std::iter::empty()); - log::debug!(target: LOG_TARGET, "fatp::ready_at_light {} -> empty, took:{:?}", at, start.elapsed()); + debug!( + target: LOG_TARGET, + ?at, + duration = ?start.elapsed(), + "fatp::ready_at_light -> empty" + ); empty } } @@ -519,8 +539,12 @@ where at: Block::Hash, timeout: std::time::Duration, ) -> ReadyIteratorFor { - log::debug!(target: LOG_TARGET, "fatp::ready_at_with_timeout at {:?} allowed delay: {:?}", at, timeout); - + debug!( + target: LOG_TARGET, + ?at, + ?timeout, + "fatp::ready_at_with_timeout" + ); let timeout = futures_timer::Delay::new(timeout); let (view_already_exists, ready_at) = self.ready_at_internal(at); @@ -532,10 +556,10 @@ where select! { ready = ready_at => Some(ready), _ = timeout => { - log::warn!(target: LOG_TARGET, - "Timeout fired waiting for transaction pool at block: ({:?}). \ - Proceeding with production.", - at, + warn!( + target: LOG_TARGET, + ?at, + "Timeout fired waiting for transaction pool at block. Proceeding with production." ); None } @@ -555,7 +579,12 @@ where let mut ready_poll = self.ready_poll.lock(); if let Some((view, inactive)) = self.view_store.get_view_at(at, true) { - log::debug!(target: LOG_TARGET, "fatp::ready_at_internal {at:?} (inactive:{inactive:?})"); + debug!( + target: LOG_TARGET, + ?at, + ?inactive, + "fatp::ready_at_internal" + ); let iterator: ReadyIteratorFor = Box::new(view.pool.validated_pool().ready()); return (true, async move { iterator }.boxed()); } @@ -563,15 +592,21 @@ where let pending = ready_poll .add(at) .map(|received| { - received.unwrap_or_else(|e| { - log::warn!(target: LOG_TARGET, "Error receiving ready-set iterator: {:?}", e); + received.unwrap_or_else(|error| { + warn!( + target: LOG_TARGET, + %error, + "Error receiving ready-set iterator" + ); Box::new(std::iter::empty()) }) }) .boxed(); - log::debug!(target: LOG_TARGET, - "fatp::ready_at_internal {at:?} pending keys: {:?}", - ready_poll.pollers.keys() + debug!( + target: LOG_TARGET, + ?at, + pending_keys = ?ready_poll.pollers.keys(), + "fatp::ready_at_internal" ); (false, pending) } @@ -649,8 +684,13 @@ where xts: Vec>, ) -> Result, Self::Error>>, Self::Error> { let view_store = self.view_store.clone(); - log::debug!(target: LOG_TARGET, "fatp::submit_at count:{} views:{}", xts.len(), self.active_views_count()); - log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "[{:?}] fatp::submit_at"); + debug!( + target: LOG_TARGET, + count = xts.len(), + active_views_count = self.active_views_count(), + "fatp::submit_at" + ); + log_xt_trace!(target: LOG_TARGET, xts.iter().map(|xt| self.tx_hash(xt)), "fatp::submit_at"); let xts = xts.into_iter().map(Arc::from).collect::>(); let mempool_results = self.mempool.extend_unwatched(source, &xts); @@ -741,7 +781,12 @@ where source: TransactionSource, xt: TransactionFor, ) -> Result, Self::Error> { - log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_one views:{}", self.tx_hash(&xt), self.active_views_count()); + trace!( + target: LOG_TARGET, + tx_hash = ?self.tx_hash(&xt), + active_views_count = self.active_views_count(), + "fatp::submit_one" + ); match self.submit_at(_at, source, vec![xt]).await { Ok(mut v) => v.pop().expect("There is exactly one element in result of submit_at. qed."), @@ -759,7 +804,12 @@ where source: TransactionSource, xt: TransactionFor, ) -> Result>>, Self::Error> { - log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count()); + trace!( + target: LOG_TARGET, + tx_hash = ?self.tx_hash(&xt), + views = self.active_views_count(), + "fatp::submit_and_watch" + ); let xt = Arc::from(xt); let InsertionInfo { hash: xt_hash, source: timed_source, .. } = @@ -791,8 +841,7 @@ where // useful for verification for debugging purposes). fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { if !hashes.is_empty() { - log::debug!(target: LOG_TARGET, "fatp::remove_invalid {}", hashes.len()); - log_xt_trace!(target:LOG_TARGET, hashes, "[{:?}] fatp::remove_invalid"); + log_xt_trace!(target:LOG_TARGET, hashes, "fatp::remove_invalid"); self.metrics .report(|metrics| metrics.removed_invalid_txs.inc_by(hashes.len() as _)); } @@ -842,11 +891,12 @@ where let result = most_recent_view .map(|block_hash| self.view_store.ready_transaction(block_hash, tx_hash)) .flatten(); - log::trace!( + trace!( target: LOG_TARGET, - "[{tx_hash:?}] ready_transaction: {} {:?}", - result.is_some(), - most_recent_view + ?tx_hash, + is_ready = result.is_some(), + ?most_recent_view, + "ready_transaction" ); result } @@ -902,7 +952,11 @@ where _at: Block::Hash, xt: sc_transaction_pool_api::LocalTransactionFor, ) -> Result { - log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count()); + debug!( + target: LOG_TARGET, + active_views_count = self.active_views_count(), + "fatp::submit_local" + ); let xt = Arc::from(xt); let result = @@ -947,20 +1001,20 @@ where let hash_and_number = match tree_route.last() { Some(hash_and_number) => hash_and_number, None => { - log::warn!( + warn!( target: LOG_TARGET, - "Skipping ChainEvent - no last block in tree route {:?}", - tree_route, + ?tree_route, + "Skipping ChainEvent - no last block in tree route" ); return }, }; if self.has_view(&hash_and_number.hash) { - log::trace!( + trace!( target: LOG_TARGET, - "view already exists for block: {:?}", - hash_and_number, + ?hash_and_number, + "view already exists for block" ); return } @@ -995,12 +1049,12 @@ where at: &HashAndNumber, tree_route: &TreeRoute, ) -> Option>> { - log::debug!( + debug!( target: LOG_TARGET, - "build_new_view: for: {:?} from: {:?} tree_route: {:?}", - at, - origin_view.as_ref().map(|v| v.at.clone()), - tree_route + ?at, + origin_view_at = ?origin_view.as_ref().map(|v| v.at.clone()), + ?tree_route, + "build_new_view" ); let mut view = if let Some(origin_view) = origin_view { let mut view = View::new_from_other(&origin_view, at); @@ -1009,7 +1063,11 @@ where } view } else { - log::debug!(target: LOG_TARGET, "creating non-cloned view: for: {at:?}"); + debug!( + target: LOG_TARGET, + ?at, + "creating non-cloned view" + ); View::new( self.api.clone(), at.clone(), @@ -1037,21 +1095,35 @@ where // sync the transactions statuses and referencing views in all the listeners with newly // cloned view. view.pool.validated_pool().retrigger_notifications(); - log::debug!(target: LOG_TARGET, "register_listeners: at {at:?} took {duration:?}"); + debug!( + target: LOG_TARGET, + ?at, + ?duration, + "register_listeners" + ); // 2. Handle transactions from the tree route. Pruning transactions from the view first // will make some space for mempool transactions in case we are at the view's limits. let start = Instant::now(); self.update_view_with_fork(&view, tree_route, at.clone()).await; let duration = start.elapsed(); - log::debug!(target: LOG_TARGET, "update_view_with_fork: at {at:?} took {duration:?}"); + debug!( + target: LOG_TARGET, + ?at, + ?duration, + "update_view_with_fork" + ); // 3. Finally, submit transactions from the mempool. let start = Instant::now(); self.update_view_with_mempool(&mut view, watched_xts).await; let duration = start.elapsed(); - log::debug!(target: LOG_TARGET, "update_view_with_mempool: at {at:?} took {duration:?}"); - + debug!( + target: LOG_TARGET, + ?at, + ?duration, + "update_view_with_mempool" + ); let view = Arc::from(view); self.view_store.insert_new_view(view.clone(), tree_route).await; Some(view) @@ -1074,8 +1146,12 @@ where for h in tree_route.enacted().iter().rev() { api.block_body(h.hash) .await - .unwrap_or_else(|e| { - log::warn!(target: LOG_TARGET, "Compute ready light transactions: error request: {}", e); + .unwrap_or_else(|error| { + warn!( + target: LOG_TARGET, + %error, + "Compute ready light transactions: error request" + ); None }) .unwrap_or_default() @@ -1086,12 +1162,13 @@ where }); } - log::debug!(target: LOG_TARGET, - "fatp::extrinsics_included_since_finalized {} from {} count: {} took:{:?}", - at, - recent_finalized_block, - all_extrinsics.len(), - start.elapsed() + debug!( + target: LOG_TARGET, + ?at, + ?recent_finalized_block, + extrinsics_count = all_extrinsics.len(), + duration = ?start.elapsed(), + "fatp::extrinsics_included_since_finalized" ); all_extrinsics } @@ -1106,12 +1183,12 @@ where &self, view: &View, ) -> Vec<(ExtrinsicHash, Arc>)> { - log::debug!( + debug!( target: LOG_TARGET, - "register_listeners: {:?} xts:{:?} v:{}", - view.at, - self.mempool.unwatched_and_watched_count(), - self.active_views_count() + view_at = ?view.at, + xts_count = ?self.mempool.unwatched_and_watched_count(), + active_views_count = self.active_views_count(), + "register_listeners" ); //todo [#5495]: maybe we don't need to register listener in view? We could use @@ -1124,7 +1201,12 @@ where let watcher = view.create_watcher(tx_hash); let at = view.at.clone(); async move { - log::trace!(target: LOG_TARGET, "[{:?}] adding watcher {:?}", tx_hash, at.hash); + trace!( + target: LOG_TARGET, + ?tx_hash, + at = ?at.hash, + "adding watcher" + ); self.view_store.listener.add_view_watcher_for_tx( tx_hash, at.hash, @@ -1156,12 +1238,12 @@ where view: &View, watched_xts: Vec<(ExtrinsicHash, Arc>)>, ) { - log::debug!( + debug!( target: LOG_TARGET, - "update_view_with_mempool: {:?} xts:{:?} v:{}", - view.at, - self.mempool.unwatched_and_watched_count(), - self.active_views_count() + view_at = ?view.at, + xts_count = ?self.mempool.unwatched_and_watched_count(), + active_views_count = self.active_views_count(), + "update_view_with_mempool" ); let included_xts = self.extrinsics_included_since_finalized(view.at.hash).await; @@ -1187,12 +1269,12 @@ where let submitted_count = watched_results.len(); - log::debug!( + debug!( target: LOG_TARGET, - "update_view_with_mempool: at {:?} submitted {}/{}", - view.at.hash, + view_at_hash = ?view.at.hash, submitted_count, - self.mempool.len() + mempool_len = self.mempool.len(), + "update_view_with_mempool" ); self.metrics @@ -1220,7 +1302,12 @@ where tree_route: &TreeRoute, hash_and_number: HashAndNumber, ) { - log::debug!(target: LOG_TARGET, "update_view_with_fork tree_route: {:?} {tree_route:?}", view.at); + debug!( + target: LOG_TARGET, + ?tree_route, + at = ?view.at, + "update_view_with_fork" + ); let api = self.api.clone(); // We keep track of everything we prune so that later we won't add @@ -1249,8 +1336,12 @@ where let block_transactions = api .block_body(hash) .await - .unwrap_or_else(|e| { - log::warn!(target: LOG_TARGET, "Failed to fetch block body: {}", e); + .unwrap_or_else(|error| { + warn!( + target: LOG_TARGET, + %error, + "Failed to fetch block body" + ); None }) .unwrap_or_default() @@ -1269,11 +1360,11 @@ where resubmitted_to_report += 1; if !contains { - log::trace!( + trace!( target: LOG_TARGET, - "[{:?}]: Resubmitting from retracted block {:?}", - tx_hash, - hash, + ?tx_hash, + ?hash, + "Resubmitting from retracted block" ); } !contains @@ -1307,8 +1398,13 @@ where /// - purging finalized transactions from the mempool and triggering mempool revalidation, async fn handle_finalized(&self, finalized_hash: Block::Hash, tree_route: &[Block::Hash]) { let finalized_number = self.api.block_id_to_number(&BlockId::Hash(finalized_hash)); - log::debug!(target: LOG_TARGET, "handle_finalized {finalized_number:?} tree_route: {tree_route:?} views_count:{}", self.active_views_count()); - + debug!( + target: LOG_TARGET, + ?finalized_number, + ?tree_route, + active_views_count = self.active_views_count(), + "handle_finalized" + ); let finalized_xts = self.view_store.handle_finalized(finalized_hash, tree_route).await; self.mempool.purge_finalized_transactions(&finalized_xts).await; @@ -1325,11 +1421,19 @@ where ) .await; } else { - log::trace!(target: LOG_TARGET, "purge_transactions_later skipped, cannot find block number {finalized_number:?}"); + trace!( + target: LOG_TARGET, + ?finalized_number, + "purge_transactions_later skipped, cannot find block number" + ); } self.ready_poll.lock().remove_cancelled(); - log::trace!(target: LOG_TARGET, "handle_finalized after views_count:{:?}", self.active_views_count()); + trace!( + target: LOG_TARGET, + active_views_count = self.active_views_count(), + "handle_finalized after" + ); } /// Computes a hash of the provided transaction @@ -1443,7 +1547,11 @@ where /// Executes the maintainance for the given chain event. async fn maintain(&self, event: ChainEvent) { let start = Instant::now(); - log::debug!(target: LOG_TARGET, "processing event: {event:?}"); + debug!( + target: LOG_TARGET, + ?event, + "processing event" + ); self.view_store.finish_background_revalidations().await; @@ -1467,8 +1575,12 @@ where .update(&event, &compute_tree_route, &block_id_to_number); match result { - Err(msg) => { - log::trace!(target: LOG_TARGET, "enactment_state::update error: {msg}"); + Err(error) => { + trace!( + target: LOG_TARGET, + %error, + "enactment_state::update error" + ); self.enactment_state.lock().force_update(&event); }, Ok(EnactmentAction::Skip) => return, @@ -1494,23 +1606,25 @@ where ChainEvent::Finalized { hash, ref tree_route } => { self.handle_finalized(hash, tree_route).await; - log::trace!( + trace!( target: LOG_TARGET, - "on-finalized enacted: {tree_route:?}, previously finalized: \ - {prev_finalized_block:?}", + ?tree_route, + ?prev_finalized_block, + "on-finalized enacted" ); }, } - let maintain_duration = start.elapsed(); + let duration = start.elapsed(); - log::info!( + info!( target: LOG_TARGET, - "maintain: txs:{:?} views:[{};{:?}] event:{event:?} took:{:?}", - self.mempool_len(), - self.active_views_count(), - self.views_stats(), - maintain_duration + mempool_len = format!("{:?}", self.mempool_len()), + active_views_count = self.active_views_count(), + views_stats = ?self.views_stats(), + ?event, + ?duration, + "maintain" ); self.metrics.report(|metrics| { @@ -1521,7 +1635,7 @@ where watched.try_into().map(|v| metrics.watched_txs.set(v)), unwatched.try_into().map(|v| metrics.unwatched_txs.set(v)), ); - metrics.maintain_duration.observe(maintain_duration.as_secs_f64()); + metrics.maintain_duration.observe(duration.as_secs_f64()); }); } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs index f9a41673bb8f..1ca287fa2371 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/import_notification_sink.rs @@ -27,7 +27,6 @@ use futures::{ stream::StreamExt, Future, FutureExt, }; -use log::trace; use parking_lot::RwLock; use sc_utils::mpsc; use std::{ @@ -38,6 +37,7 @@ use std::{ sync::Arc, }; use tokio_stream::StreamMap; +use tracing::trace; /// A type alias for a pinned, boxed stream of items of type `I`. /// This alias is particularly useful for defining the types of the incoming streams from various @@ -109,14 +109,22 @@ where cmd = ctx.command_receiver.next() => { match cmd? { Command::AddView(key,stream) => { - trace!(target: LOG_TARGET,"Command::AddView {key:?}"); + trace!( + target: LOG_TARGET, + ?key, + "Command::AddView" + ); ctx.stream_map.insert(key,stream); }, } }, Some(event) = next_event(&mut ctx.stream_map) => { - trace!(target: LOG_TARGET, "import_notification_sink: select_next_some -> {:?}", event); + trace!( + target: LOG_TARGET, + ?event, + "import_notification_sink: select_next_some" + ); return Some((event.1, ctx)); } } @@ -179,9 +187,17 @@ where async move { if already_notified_items.write().insert(event.clone()) { external_sinks.write().retain_mut(|sink| { - trace!(target: LOG_TARGET, "[{:?}] import_sink_worker sending out imported", event); - if let Err(e) = sink.try_send(event.clone()) { - trace!(target: LOG_TARGET, "import_sink_worker sending message failed: {e}"); + trace!( + target: LOG_TARGET, + ?event, + "import_sink_worker sending out imported" + ); + if let Err(error) = sink.try_send(event.clone()) { + trace!( + target: LOG_TARGET, + %error, + "import_sink_worker sending message failed" + ); false } else { true @@ -199,12 +215,17 @@ where /// The new view's stream is added to the internal aggregated stream context by sending command /// to its `command_receiver`. pub fn add_view(&self, key: K, view: StreamOf) { - let _ = self - .controller - .unbounded_send(Command::AddView(key.clone(), view)) - .map_err(|e| { - trace!(target: LOG_TARGET, "add_view {key:?} send message failed: {e}"); - }); + let _ = + self.controller + .unbounded_send(Command::AddView(key.clone(), view)) + .map_err(|error| { + trace!( + target: LOG_TARGET, + ?key, + %error, + "add_view send message failed" + ); + }); } /// Creates and returns a new external stream of ready transactions hashes notifications. diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs index a00234a99808..a513559a7cd5 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/multi_view_listener.rs @@ -26,7 +26,6 @@ use crate::{ LOG_TARGET, }; use futures::StreamExt; -use log::{debug, trace}; use sc_transaction_pool_api::{TransactionStatus, TransactionStatusStream, TxIndex}; use sc_utils::mpsc; use sp_runtime::traits::Block as BlockT; @@ -35,6 +34,7 @@ use std::{ pin::Pin, }; use tokio_stream::StreamMap; +use tracing::{debug, trace}; use super::dropped_watcher::{DroppedReason, DroppedTransaction}; @@ -182,9 +182,14 @@ where hash: BlockHash, ) -> Option, BlockHash>> { trace!( - target: LOG_TARGET, "[{:?}] mvl handle event from {hash:?}: {status:?} views:{:?}", self.tx_hash, - self.status_stream_map.keys().collect::>() + target: LOG_TARGET, + tx_hash = ?self.tx_hash, + ?hash, + ?status, + views = ?self.status_stream_map.keys().collect::>(), + "mvl handle event" ); + match status { TransactionStatus::Future => { self.views_keeping_tx_valid.insert(hash); @@ -238,8 +243,9 @@ where ); trace!( target: LOG_TARGET, - "[{:?}] got invalidate_transaction: views:{:?}", self.tx_hash, - self.status_stream_map.keys().collect::>() + tx_hash = ?self.tx_hash, + views = ?self.status_stream_map.keys().collect::>(), + "got invalidate_transaction" ); if self.views_keeping_tx_valid.is_disjoint(&keys) { self.terminate = true; @@ -261,7 +267,13 @@ where /// the stream map. fn add_stream(&mut self, block_hash: BlockHash, stream: TxStatusStream) { self.status_stream_map.insert(block_hash, stream); - trace!(target: LOG_TARGET, "[{:?}] AddView view: {:?} views:{:?}", self.tx_hash, block_hash, self.status_stream_map.keys().collect::>()); + trace!( + target: LOG_TARGET, + tx_hash = ?self.tx_hash, + ?block_hash, + views = ?self.status_stream_map.keys().collect::>(), + "AddView view" + ); } /// Removes an existing transaction status stream. @@ -271,7 +283,13 @@ where fn remove_view(&mut self, block_hash: BlockHash) { self.status_stream_map.remove(&block_hash); self.views_keeping_tx_valid.remove(&block_hash); - trace!(target: LOG_TARGET, "[{:?}] RemoveView view: {:?} views:{:?}", self.tx_hash, block_hash, self.status_stream_map.keys().collect::>()); + trace!( + target: LOG_TARGET, + tx_hash = ?self.tx_hash, + ?block_hash, + views = ?self.status_stream_map.keys().collect::>(), + "RemoveView view" + ); } } @@ -306,8 +324,11 @@ where return None } - trace!(target: LOG_TARGET, "[{:?}] create_external_watcher_for_tx", tx_hash); - + trace!( + target: LOG_TARGET, + ?tx_hash, + "create_external_watcher_for_tx" + ); let (tx, rx) = mpsc::tracing_unbounded("txpool-multi-view-listener", 32); controllers.insert(tx_hash, tx); @@ -323,14 +344,21 @@ where biased; Some((view_hash, status)) = next_event(&mut ctx.status_stream_map) => { if let Some(new_status) = ctx.handle(status, view_hash) { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: {new_status:?}", ctx.tx_hash); - return Some((new_status, ctx)) + trace!( + target: LOG_TARGET, + tx_hash = ?ctx.tx_hash, + ?new_status, + "mvl sending out" + ); + return Some((new_status, ctx)) } }, cmd = ctx.command_receiver.next() => { - log::trace!(target: LOG_TARGET, "[{:?}] select::rx views:{:?}", - ctx.tx_hash, - ctx.status_stream_map.keys().collect::>() + trace!( + target: LOG_TARGET, + tx_hash = ?ctx.tx_hash, + views = ?ctx.status_stream_map.keys().collect::>(), + "select::rx" ); match cmd? { ControllerCommand::AddViewStream(h,stream) => { @@ -341,26 +369,52 @@ where }, ControllerCommand::TransactionInvalidated => { if ctx.handle_invalidate_transaction() { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Invalid", ctx.tx_hash); + trace!( + target: LOG_TARGET, + tx_hash = ?ctx.tx_hash, + status = "Invalid", + "mvl sending out" + ); return Some((TransactionStatus::Invalid, ctx)) } }, ControllerCommand::FinalizeTransaction(block, index) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Finalized", ctx.tx_hash); + trace!( + target: LOG_TARGET, + tx_hash = ?ctx.tx_hash, + status = "Finalized", + "mvl sending out" + ); ctx.terminate = true; return Some((TransactionStatus::Finalized((block, index)), ctx)) }, ControllerCommand::TransactionBroadcasted(peers) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Broadcasted", ctx.tx_hash); + trace!( + target: LOG_TARGET, + tx_hash = ?ctx.tx_hash, + status = "Broadcasted", + "mvl sending out" + ); return Some((TransactionStatus::Broadcast(peers), ctx)) }, ControllerCommand::TransactionDropped(DroppedReason::LimitsEnforced) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Dropped", ctx.tx_hash); + trace!( + target: LOG_TARGET, + tx_hash = ?ctx.tx_hash, + status = "Dropped", + "mvl sending out" + ); ctx.terminate = true; return Some((TransactionStatus::Dropped, ctx)) }, ControllerCommand::TransactionDropped(DroppedReason::Usurped(by)) => { - log::trace!(target: LOG_TARGET, "[{:?}] mvl sending out: Usurped({:?})", ctx.tx_hash, by); + trace!( + target: LOG_TARGET, + tx_hash = ?ctx.tx_hash, + status = "Usurped", + ?by, + "mvl sending out" + ); ctx.terminate = true; return Some((TransactionStatus::Usurped(by), ctx)) }, @@ -386,11 +440,16 @@ where let mut controllers = self.controllers.write(); if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) { - if let Err(e) = tx + if let Err(error) = tx .get_mut() .unbounded_send(ControllerCommand::AddViewStream(block_hash, stream)) { - trace!(target: LOG_TARGET, "[{:?}] add_view_watcher_for_tx: send message failed: {:?}", tx_hash, e); + trace!( + target: LOG_TARGET, + ?tx_hash, + %error, + "add_view_watcher_for_tx: send message failed" + ); tx.remove(); } } @@ -404,9 +463,14 @@ where self.controllers.write().retain(|tx_hash, sender| { sender .unbounded_send(ControllerCommand::RemoveViewStream(block_hash)) - .map_err(|e| { - log::trace!(target: LOG_TARGET, "[{:?}] remove_view: send message failed: {:?}", tx_hash, e); - e + .map_err(|error| { + trace!( + target: LOG_TARGET, + ?tx_hash, + %error, + "remove_view: send message failed" + ); + error }) .is_ok() }); @@ -423,11 +487,20 @@ where let mut controllers = self.controllers.write(); invalid_hashes.iter().for_each(|tx_hash| { if let Entry::Occupied(mut tx) = controllers.entry(*tx_hash) { - trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction", tx_hash); - if let Err(e) = + trace!( + target: LOG_TARGET, + ?tx_hash, + "invalidate_transaction" + ); + if let Err(error) = tx.get_mut().unbounded_send(ControllerCommand::TransactionInvalidated) { - trace!(target: LOG_TARGET, "[{:?}] invalidate_transaction: send message failed: {:?}", tx_hash, e); + trace!( + target: LOG_TARGET, + ?tx_hash, + %error, + "invalidate_transaction: send message failed" + ); tx.remove(); } } @@ -445,9 +518,20 @@ where let mut controllers = self.controllers.write(); propagated.into_iter().for_each(|(tx_hash, peers)| { if let Entry::Occupied(mut tx) = controllers.entry(tx_hash) { - trace!(target: LOG_TARGET, "[{:?}] transaction_broadcasted", tx_hash); - if let Err(e) = tx.get_mut().unbounded_send(ControllerCommand::TransactionBroadcasted(peers)) { - trace!(target: LOG_TARGET, "[{:?}] transactions_broadcasted: send message failed: {:?}", tx_hash, e); + trace!( + target: LOG_TARGET, + ?tx_hash, + "transaction_broadcasted" + ); + if let Err(error) = + tx.get_mut().unbounded_send(ControllerCommand::TransactionBroadcasted(peers)) + { + trace!( + target: LOG_TARGET, + ?tx_hash, + %error, + "transactions_broadcasted: send message failed" + ); tx.remove(); } } @@ -460,12 +544,25 @@ where /// transaction prompting and external `Broadcasted` event. pub(crate) fn transaction_dropped(&self, dropped: DroppedTransaction>) { let mut controllers = self.controllers.write(); - debug!(target: LOG_TARGET, "mvl::transaction_dropped: {:?}", dropped); + debug!( + target: LOG_TARGET, + ?dropped, + "mvl::transaction_dropped" + ); if let Some(tx) = controllers.remove(&dropped.tx_hash) { let DroppedTransaction { tx_hash, reason } = dropped; - debug!(target: LOG_TARGET, "[{:?}] transaction_dropped", tx_hash); - if let Err(e) = tx.unbounded_send(ControllerCommand::TransactionDropped(reason)) { - trace!(target: LOG_TARGET, "[{:?}] transaction_dropped: send message failed: {:?}", tx_hash, e); + debug!( + target: LOG_TARGET, + ?tx_hash, + "transaction_dropped" + ); + if let Err(error) = tx.unbounded_send(ControllerCommand::TransactionDropped(reason)) { + trace!( + target: LOG_TARGET, + ?tx_hash, + %error, + "transaction_dropped: send message failed" + ); }; } } @@ -481,9 +578,20 @@ where ) { let mut controllers = self.controllers.write(); if let Some(tx) = controllers.remove(&tx_hash) { - trace!(target: LOG_TARGET, "[{:?}] finalize_transaction", tx_hash); - if let Err(e) = tx.unbounded_send(ControllerCommand::FinalizeTransaction(block, idx)) { - trace!(target: LOG_TARGET, "[{:?}] finalize_transaction: send message failed: {:?}", tx_hash, e); + trace!( + target: LOG_TARGET, + ?tx_hash, + "finalize_transaction" + ); + if let Err(error) = + tx.unbounded_send(ControllerCommand::FinalizeTransaction(block, idx)) + { + trace!( + target: LOG_TARGET, + ?tx_hash, + %error, + "finalize_transaction: send message failed" + ); } }; } @@ -525,7 +633,7 @@ mod tests { let out = handle.await.unwrap(); assert_eq!(out, events); - log::debug!("out: {:#?}", out); + debug!("out: {:#?}", out); } #[tokio::test] @@ -560,7 +668,7 @@ mod tests { let out = handle.await.unwrap(); - log::debug!("out: {:#?}", out); + debug!("out: {:#?}", out); assert!(out.iter().all(|v| vec![ TransactionStatus::Future, TransactionStatus::Ready, @@ -600,7 +708,7 @@ mod tests { listener.invalidate_transactions(&[tx_hash]); let out = handle.await.unwrap(); - log::debug!("out: {:#?}", out); + debug!("out: {:#?}", out); assert!(out.iter().all(|v| vec![ TransactionStatus::Future, TransactionStatus::Ready, @@ -654,8 +762,8 @@ mod tests { let out_tx0 = handle0.await.unwrap(); let out_tx1 = handle1.await.unwrap(); - log::debug!("out_tx0: {:#?}", out_tx0); - log::debug!("out_tx1: {:#?}", out_tx1); + debug!("out_tx0: {:#?}", out_tx0); + debug!("out_tx1: {:#?}", out_tx1); assert!(out_tx0.iter().all(|v| vec![ TransactionStatus::Future, TransactionStatus::Ready, @@ -707,7 +815,7 @@ mod tests { listener.invalidate_transactions(&[tx_hash]); let out = handle.await.unwrap(); - log::debug!("out: {:#?}", out); + debug!("out: {:#?}", out); // invalid shall not be sent assert!(out.iter().all(|v| vec![ @@ -740,7 +848,7 @@ mod tests { listener.add_view_watcher_for_tx(tx_hash, block_hash0, view_stream0.boxed()); let out = handle.await.unwrap(); - log::debug!("out: {:#?}", out); + debug!("out: {:#?}", out); assert!(out.iter().all(|v| vec![TransactionStatus::Invalid].contains(v))); assert_eq!(out.len(), 1); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs index e1c65a08a70b..0025d3e9f2d4 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/revalidation_worker.rs @@ -30,6 +30,7 @@ use sp_runtime::traits::Block as BlockT; use super::tx_mem_pool::TxMemPool; use futures::prelude::*; +use tracing::{trace, warn}; use super::view::{FinishRevalidationWorkerChannels, View}; @@ -131,18 +132,22 @@ where view: Arc>, finish_revalidation_worker_channels: FinishRevalidationWorkerChannels, ) { - log::trace!( + trace!( target: LOG_TARGET, - "revalidation_queue::revalidate_view: Sending view to revalidation queue at {}", - view.at.hash + view_at_hash = ?view.at.hash, + "revalidation_queue::revalidate_view: Sending view to revalidation queue" ); if let Some(ref to_worker) = self.background { - if let Err(e) = to_worker.unbounded_send(WorkerPayload::RevalidateView( + if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateView( view, finish_revalidation_worker_channels, )) { - log::warn!(target: LOG_TARGET, "revalidation_queue::revalidate_view: Failed to update background worker: {:?}", e); + warn!( + target: LOG_TARGET, + ?error, + "revalidation_queue::revalidate_view: Failed to update background worker" + ); } } else { view.revalidate(finish_revalidation_worker_channels).await @@ -161,17 +166,21 @@ where mempool: Arc>, finalized_hash: HashAndNumber, ) { - log::trace!( + trace!( target: LOG_TARGET, - "Sent mempool to revalidation queue at hash: {:?}", - finalized_hash + ?finalized_hash, + "Sent mempool to revalidation queue" ); if let Some(ref to_worker) = self.background { - if let Err(e) = + if let Err(error) = to_worker.unbounded_send(WorkerPayload::RevalidateMempool(mempool, finalized_hash)) { - log::warn!(target: LOG_TARGET, "Failed to update background worker: {:?}", e); + warn!( + target: LOG_TARGET, + ?error, + "Failed to update background worker" + ); } } else { mempool.revalidate(finalized_hash).await diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index c8a4d0c72dd3..440e77313d3e 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -26,33 +26,38 @@ //! it), while on other forks tx can be valid. Depending on which view is chosen to be cloned, //! such transaction could not be present in the newly created view. -use super::{ - metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener, - view_store::ViewStoreSubmitOutcome, -}; -use crate::{ - common::log_xt::log_xt_trace, - graph, - graph::{base_pool::TimedTransactionSource, tracked_map::Size, ExtrinsicFor, ExtrinsicHash}, - LOG_TARGET, +use std::{ + cmp::Ordering, + collections::HashMap, + sync::{ + atomic::{self, AtomicU64}, + Arc, + }, + time::Instant, }; + use futures::FutureExt; use itertools::Itertools; use parking_lot::RwLock; +use tracing::{debug, trace}; + use sc_transaction_pool_api::{TransactionPriority, TransactionSource}; use sp_blockchain::HashAndNumber; use sp_runtime::{ traits::Block as BlockT, transaction_validity::{InvalidTransaction, TransactionValidityError}, }; -use std::{ - cmp::Ordering, - collections::HashMap, - sync::{ - atomic::{self, AtomicU64}, - Arc, - }, - time::Instant, + +use crate::{ + common::tracing_log_xt::log_xt_trace, + graph, + graph::{base_pool::TimedTransactionSource, tracked_map::Size, ExtrinsicFor, ExtrinsicHash}, + LOG_TARGET, +}; + +use super::{ + metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener, + view_store::ViewStoreSubmitOutcome, }; /// The minimum interval between single transaction revalidations. Given in blocks. @@ -324,7 +329,7 @@ where /// exceed the maximum allowed transaction count. fn try_insert( &self, - hash: ExtrinsicHash, + tx_hash: ExtrinsicHash, tx: TxInMemPool, ) -> Result>, sc_transaction_pool_api::error::Error> { let mut transactions = self.transactions.write(); @@ -333,19 +338,23 @@ where let result = match ( self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes), - transactions.contains_key(&hash), + transactions.contains_key(&tx_hash), ) { (false, false) => { let source = tx.source(); - transactions.insert(hash, Arc::from(tx)); - Ok(InsertionInfo::new(hash, source)) + transactions.insert(tx_hash, Arc::from(tx)); + Ok(InsertionInfo::new(tx_hash, source)) }, (_, true) => - Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))), + Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(tx_hash))), (true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped), }; - log::trace!(target: LOG_TARGET, "[{:?}] mempool::try_insert: {:?}", hash, result.as_ref().map(|r| r.hash)); - + trace!( + target: LOG_TARGET, + ?tx_hash, + result_hash = ?result.as_ref().map(|r| r.hash), + "mempool::try_insert" + ); result } @@ -486,17 +495,21 @@ where /// Removes a transaction with given hash from the memory pool. pub(super) fn remove_transaction( &self, - hash: &ExtrinsicHash, + tx_hash: &ExtrinsicHash, ) -> Option>> { - log::debug!(target: LOG_TARGET, "[{hash:?}] mempool::remove_transaction"); - self.transactions.write().remove(hash) + debug!(target: LOG_TARGET, ?tx_hash, "mempool::remove_transaction"); + self.transactions.write().remove(tx_hash) } /// Revalidates a batch of transactions against the provided finalized block. /// /// Returns a vector of invalid transaction hashes. async fn revalidate_inner(&self, finalized_block: HashAndNumber) -> Vec { - log::trace!(target: LOG_TARGET, "mempool::revalidate at:{finalized_block:?}"); + trace!( + target: LOG_TARGET, + ?finalized_block, + "mempool::revalidate" + ); let start = Instant::now(); let (count, input) = { @@ -533,26 +546,31 @@ where let invalid_hashes = validation_results .into_iter() - .filter_map(|(xt_hash, validation_result)| match validation_result { + .filter_map(|(tx_hash, validation_result)| match validation_result { Ok(Ok(_)) | Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Future))) => None, Err(_) | Ok(Err(TransactionValidityError::Unknown(_))) | Ok(Err(TransactionValidityError::Invalid(_))) => { - log::trace!( + trace!( target: LOG_TARGET, - "[{:?}]: Purging: invalid: {:?}", - xt_hash, - validation_result, + ?tx_hash, + ?validation_result, + "Purging: invalid" ); - Some(xt_hash) + Some(tx_hash) }, }) .collect::>(); - log::debug!( + debug!( target: LOG_TARGET, - "mempool::revalidate: at {finalized_block:?} count:{input_len}/{count} invalid_hashes:{} took {duration:?}", invalid_hashes.len(), + ?finalized_block, + input_len, + count, + invalid_hashes = invalid_hashes.len(), + ?duration, + "mempool::revalidate" ); invalid_hashes @@ -563,8 +581,12 @@ where &self, finalized_xts: &Vec>, ) { - log::debug!(target: LOG_TARGET, "purge_finalized_transactions count:{:?}", finalized_xts.len()); - log_xt_trace!(target: LOG_TARGET, finalized_xts, "[{:?}] purged finalized transactions"); + debug!( + target: LOG_TARGET, + count = finalized_xts.len(), + "purge_finalized_transactions" + ); + log_xt_trace!(target: LOG_TARGET, finalized_xts, "purged finalized transactions"); let mut transactions = self.transactions.write(); finalized_xts.iter().for_each(|t| { transactions.remove(t); @@ -574,7 +596,11 @@ where /// Revalidates transactions in the memory pool against a given finalized block and removes /// invalid ones. pub(super) async fn revalidate(&self, finalized_block: HashAndNumber) { - log::trace!(target: LOG_TARGET, "purge_transactions at:{:?}", finalized_block); + trace!( + target: LOG_TARGET, + ?finalized_block, + "purge_transactions" + ); let invalid_hashes = self.revalidate_inner(finalized_block.clone()).await; self.metrics.report(|metrics| { @@ -602,10 +628,13 @@ where #[cfg(test)] mod tx_mem_pool_tests { - use super::*; - use crate::{common::tests::TestApi, graph::ChainApi}; use substrate_test_runtime::{AccountId, Extrinsic, ExtrinsicBuilder, Transfer, H256}; use substrate_test_runtime_client::Sr25519Keyring::*; + + use crate::{common::tests::TestApi, graph::ChainApi}; + + use super::*; + fn uxt(nonce: u64) -> Extrinsic { crate::common::tests::uxt(Transfer { from: Alice.into(), diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs index a35d68120a3a..6324997da67b 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs @@ -25,7 +25,7 @@ use super::metrics::MetricsLink as PrometheusMetrics; use crate::{ - common::log_xt::log_xt_trace, + common::tracing_log_xt::log_xt_trace, graph::{ self, base_pool::TimedTransactionSource, watcher::Watcher, ExtrinsicFor, ExtrinsicHash, IsValidator, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor, @@ -40,6 +40,7 @@ use sp_runtime::{ SaturatedConversion, }; use std::{collections::HashMap, sync::Arc, time::Instant}; +use tracing::{debug, trace}; pub(super) struct RevalidationResult { revalidated: HashMap, ValidatedTransactionFor>, @@ -159,9 +160,9 @@ where &self, xts: impl IntoIterator)>, ) -> Vec, ChainApi::Error>> { - if log::log_enabled!(target: LOG_TARGET, log::Level::Trace) { + if tracing::enabled!(target: LOG_TARGET, tracing::Level::TRACE) { let xts = xts.into_iter().collect::>(); - log_xt_trace!(target: LOG_TARGET, xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), "[{:?}] view::submit_many at:{}", self.at.hash); + log_xt_trace!(target: LOG_TARGET, xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), "view::submit_many at:{}", self.at.hash); self.pool.submit_at(&self.at, xts).await } else { self.pool.submit_at(&self.at, xts).await @@ -174,7 +175,12 @@ where source: TimedTransactionSource, xt: ExtrinsicFor, ) -> Result, ChainApi::Error> { - log::trace!(target: LOG_TARGET, "[{:?}] view::submit_and_watch at:{}", self.pool.validated_pool().api().hash_and_length(&xt).0, self.at.hash); + trace!( + target: LOG_TARGET, + tx_hash = ?self.pool.validated_pool().api().hash_and_length(&xt).0, + view_at_hash = ?self.at.hash, + "view::submit_and_watch" + ); self.pool.submit_and_watch(&self.at, source, xt).await } @@ -183,9 +189,13 @@ where &self, xt: ExtrinsicFor, ) -> Result, ChainApi::Error> { - let (hash, length) = self.pool.validated_pool().api().hash_and_length(&xt); - log::trace!(target: LOG_TARGET, "[{:?}] view::submit_local at:{}", hash, self.at.hash); - + let (tx_hash, length) = self.pool.validated_pool().api().hash_and_length(&xt); + trace!( + target: LOG_TARGET, + ?tx_hash, + view_at_hash = ?self.at.hash, + "view::submit_local" + ); let validity = self .pool .validated_pool() @@ -212,7 +222,7 @@ where let validated = ValidatedTransaction::valid_at( block_number.saturated_into::(), - hash, + tx_hash, TimedTransactionSource::new_local(true), Arc::from(xt), length, @@ -258,7 +268,11 @@ where revalidation_result_tx, } = finish_revalidation_worker_channels; - log::trace!(target:LOG_TARGET, "view::revalidate: at {} starting", self.at.hash); + trace!( + target: LOG_TARGET, + at_hash = ?self.at.hash, + "view::revalidate: at starting" + ); let start = Instant::now(); let validated_pool = self.pool.validated_pool(); let api = validated_pool.api(); @@ -279,7 +293,11 @@ where let mut should_break = false; tokio::select! { _ = finish_revalidation_request_rx.recv() => { - log::trace!(target: LOG_TARGET, "view::revalidate: finish revalidation request received at {}.", self.at.hash); + trace!( + target: LOG_TARGET, + at_hash = ?self.at.hash, + "view::revalidate: finish revalidation request received" + ); break } _ = async { @@ -302,16 +320,15 @@ where self.metrics.report(|metrics| { metrics.view_revalidation_duration.observe(revalidation_duration.as_secs_f64()); }); - log::debug!( - target:LOG_TARGET, - "view::revalidate: at {:?} count: {}/{} took {:?}", - self.at.hash, - validation_results.len(), + debug!( + target: LOG_TARGET, + at_hash = ?self.at.hash, + count = validation_results.len(), batch_len, - revalidation_duration + duration = ?revalidation_duration, + "view::revalidate" ); - log_xt_trace!(data:tuple, target:LOG_TARGET, validation_results.iter().map(|x| (x.1, &x.0)), "[{:?}] view::revalidateresult: {:?}"); - + log_xt_trace!(data:tuple, target:LOG_TARGET, validation_results.iter().map(|x| (x.1, &x.0)), "view::revalidate result: {:?}"); for (validation_result, tx_hash, tx) in validation_results { match validation_result { Ok(Err(TransactionValidityError::Invalid(_))) => { @@ -330,33 +347,42 @@ where ), ); }, - Ok(Err(TransactionValidityError::Unknown(e))) => { - log::trace!( + Ok(Err(TransactionValidityError::Unknown(error))) => { + trace!( target: LOG_TARGET, - "[{:?}]: Removing. Cannot determine transaction validity: {:?}", - tx_hash, - e + ?tx_hash, + ?error, + "Removing. Cannot determine transaction validity" ); invalid_hashes.push(tx_hash); }, - Err(validation_err) => { - log::trace!( + Err(error) => { + trace!( target: LOG_TARGET, - "[{:?}]: Removing due to error during revalidation: {}", - tx_hash, - validation_err + ?tx_hash, + %error, + "Removing due to error during revalidation" ); invalid_hashes.push(tx_hash); }, } } - log::trace!(target:LOG_TARGET, "view::revalidate: sending revalidation result at {}", self.at.hash); - if let Err(e) = revalidation_result_tx + trace!( + target: LOG_TARGET, + at_hash = ?self.at.hash, + "view::revalidate: sending revalidation result" + ); + if let Err(error) = revalidation_result_tx .send(RevalidationResult { invalid_hashes, revalidated }) .await { - log::trace!(target:LOG_TARGET, "view::revalidate: sending revalidation_result at {} failed {:?}", self.at.hash, e); + trace!( + target: LOG_TARGET, + at_hash = ?self.at.hash, + ?error, + "view::revalidate: sending revalidation_result failed" + ); } } @@ -374,7 +400,11 @@ where super::revalidation_worker::RevalidationQueue, >, ) { - log::trace!(target:LOG_TARGET,"view::start_background_revalidation: at {}", view.at.hash); + trace!( + target: LOG_TARGET, + at_hash = ?view.at.hash, + "view::start_background_revalidation" + ); let (finish_revalidation_request_tx, finish_revalidation_request_rx) = tokio::sync::mpsc::channel(1); let (revalidation_result_tx, revalidation_result_rx) = tokio::sync::mpsc::channel(1); @@ -404,10 +434,14 @@ where /// /// Refer to [*View revalidation*](../index.html#view-revalidation) for more details. pub(super) async fn finish_revalidation(&self) { - log::trace!(target:LOG_TARGET,"view::finish_revalidation: at {}", self.at.hash); + trace!( + target: LOG_TARGET, + at_hash = ?self.at.hash, + "view::finish_revalidation" + ); let Some(revalidation_worker_channels) = self.revalidation_worker_channels.lock().take() else { - log::trace!(target:LOG_TARGET, "view::finish_revalidation: no finish_revalidation_request_tx"); + trace!(target:LOG_TARGET, "view::finish_revalidation: no finish_revalidation_request_tx"); return }; @@ -417,8 +451,13 @@ where } = revalidation_worker_channels; if let Some(finish_revalidation_request_tx) = finish_revalidation_request_tx { - if let Err(e) = finish_revalidation_request_tx.send(()).await { - log::trace!(target:LOG_TARGET, "view::finish_revalidation: sending cancellation request at {} failed {:?}", self.at.hash, e); + if let Err(error) = finish_revalidation_request_tx.send(()).await { + trace!( + target: LOG_TARGET, + at_hash = ?self.at.hash, + %error, + "view::finish_revalidation: sending cancellation request failed" + ); } } @@ -444,13 +483,13 @@ where ); }); - log::debug!( - target:LOG_TARGET, - "view::finish_revalidation: applying revalidation result invalid: {} revalidated: {} at {:?} took {:?}", - revalidation_result.invalid_hashes.len(), - revalidated_len, - self.at.hash, - start.elapsed() + debug!( + target: LOG_TARGET, + invalid = revalidation_result.invalid_hashes.len(), + revalidated = revalidated_len, + at_hash = ?self.at.hash, + duration = ?start.elapsed(), + "view::finish_revalidation: applying revalidation result" ); } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index 43ed5bbf8869..c4209a7d7f41 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -42,6 +42,7 @@ use std::{ sync::Arc, time::Instant, }; +use tracing::{trace, warn}; /// Helper struct to maintain the context for pending transaction submission, executed for /// newly inserted views. @@ -258,9 +259,14 @@ where .find_or_first(Result::is_ok); match result { - Some(Err(err)) => { - log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err); - Err(err) + Some(Err(error)) => { + trace!( + target: LOG_TARGET, + ?tx_hash, + %error, + "submit_local: err" + ); + Err(error) }, None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)), Some(Ok(r)) => Ok(r.into()), @@ -314,9 +320,14 @@ where .find_or_first(Result::is_ok); match result { - Some(Err(err)) => { - log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err); - return Err(err); + Some(Err(error)) => { + trace!( + target: LOG_TARGET, + ?tx_hash, + %error, + "submit_and_watch: err" + ); + return Err(error); }, Some(Ok(result)) => Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)), @@ -422,8 +433,12 @@ where finalized_hash: Block::Hash, tree_route: &[Block::Hash], ) -> Vec> { - log::trace!(target: LOG_TARGET, "finalize_route finalized_hash:{finalized_hash:?} tree_route: {tree_route:?}"); - + trace!( + target: LOG_TARGET, + ?finalized_hash, + ?tree_route, + "finalize_route" + ); let mut finalized_transactions = Vec::new(); for block in tree_route.iter().chain(std::iter::once(&finalized_hash)) { @@ -431,8 +446,12 @@ where .api .block_body(*block) .await - .unwrap_or_else(|e| { - log::warn!(target: LOG_TARGET, "Finalize route: error request: {}", e); + .unwrap_or_else(|error| { + warn!( + target: LOG_TARGET, + %error, + "Finalize route: error request" + ); None }) .unwrap_or_default() @@ -500,7 +519,11 @@ where active_views.insert(view.at.hash, view.clone()); most_recent_view_lock.replace(view.at.hash); }; - log::trace!(target:LOG_TARGET,"insert_new_view: inactive_views: {:?}", self.inactive_views.read().keys()); + trace!( + target: LOG_TARGET, + inactive_views = ?self.inactive_views.read().keys(), + "insert_new_view" + ); } /// Returns an optional reference to the view at given hash. @@ -557,8 +580,11 @@ where .for_each(drop); } - log::trace!(target:LOG_TARGET,"handle_pre_finalized: removed_views: {:?}", removed_views); - + trace!( + target: LOG_TARGET, + ?removed_views, + "handle_pre_finalized" + ); removed_views.iter().for_each(|view| { self.dropped_stream_controller.remove_view(*view); }); @@ -613,10 +639,18 @@ where retain }); - log::trace!(target:LOG_TARGET,"handle_finalized: inactive_views: {:?}", inactive_views.keys()); + trace!( + target: LOG_TARGET, + inactive_views = ?inactive_views.keys(), + "handle_finalized" + ); } - log::trace!(target:LOG_TARGET,"handle_finalized: dropped_views: {:?}", dropped_views); + trace!( + target: LOG_TARGET, + ?dropped_views, + "handle_finalized" + ); self.listener.remove_stale_controllers(); self.dropped_stream_controller.remove_finalized_txs(finalized_xts.clone()); @@ -647,7 +681,11 @@ where .collect::>() }; futures::future::join_all(finish_revalidation_futures).await; - log::trace!(target:LOG_TARGET,"finish_background_revalidations took {:?}", start.elapsed()); + trace!( + target: LOG_TARGET, + duration = ?start.elapsed(), + "finish_background_revalidations" + ); } /// Replaces an existing transaction in the view_store with a new one. @@ -679,10 +717,16 @@ where return }; - let xt_hash = self.api.hash_and_length(&xt).0; - log::trace!(target:LOG_TARGET,"[{replaced:?}] replace_transaction wtih {xt_hash:?}, w:{watched}"); + let tx_hash = self.api.hash_and_length(&xt).0; + trace!( + target: LOG_TARGET, + ?replaced, + ?tx_hash, + watched, + "replace_transaction" + ); - self.replace_transaction_in_views(source, xt, xt_hash, replaced, watched).await; + self.replace_transaction_in_views(source, xt, tx_hash, replaced, watched).await; if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) { replacement.mark_processed(); @@ -723,32 +767,36 @@ where view: Arc>, source: TimedTransactionSource, xt: ExtrinsicFor, - xt_hash: ExtrinsicHash, + tx_hash: ExtrinsicHash, watched: bool, ) { if watched { match view.submit_and_watch(source, xt).await { Ok(mut result) => { self.listener.add_view_watcher_for_tx( - xt_hash, + tx_hash, view.at.hash, result.expect_watcher().into_stream().boxed(), ); }, - Err(e) => { - log::trace!( - target:LOG_TARGET, - "[{:?}] replace_transaction: submit_and_watch to {} failed {}", - xt_hash, view.at.hash, e + Err(error) => { + trace!( + target: LOG_TARGET, + ?tx_hash, + at_hash = ?view.at.hash, + %error, + "replace_transaction: submit_and_watch failed" ); }, } } else { - if let Some(Err(e)) = view.submit_many(std::iter::once((source, xt))).await.pop() { - log::trace!( - target:LOG_TARGET, - "[{:?}] replace_transaction: submit to {} failed {}", - xt_hash, view.at.hash, e + if let Some(Err(error)) = view.submit_many(std::iter::once((source, xt))).await.pop() { + trace!( + target: LOG_TARGET, + ?tx_hash, + at_hash = ?view.at.hash, + %error, + "replace_transaction: submit failed" ); } } @@ -762,15 +810,15 @@ where &self, source: TimedTransactionSource, xt: ExtrinsicFor, - xt_hash: ExtrinsicHash, + tx_hash: ExtrinsicHash, replaced: ExtrinsicHash, watched: bool, ) { - if watched && !self.listener.contains_tx(&xt_hash) { - log::trace!( - target:LOG_TARGET, - "error: replace_transaction_in_views: no listener for watched transaction {:?}", - xt_hash, + if watched && !self.listener.contains_tx(&tx_hash) { + trace!( + target: LOG_TARGET, + ?tx_hash, + "error: replace_transaction_in_views: no listener for watched transaction" ); return; } @@ -787,7 +835,7 @@ where view.clone(), source.clone(), xt.clone(), - xt_hash, + tx_hash, watched, ) }) diff --git a/substrate/client/transaction-pool/src/graph/future.rs b/substrate/client/transaction-pool/src/graph/future.rs index 2c1e64c04b7f..848893b026c5 100644 --- a/substrate/client/transaction-pool/src/graph/future.rs +++ b/substrate/client/transaction-pool/src/graph/future.rs @@ -27,7 +27,7 @@ use sp_runtime::transaction_validity::TransactionTag as Tag; use std::time::Instant; use super::base_pool::Transaction; -use crate::{common::log_xt::log_xt_trace, LOG_TARGET}; +use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET}; /// Transaction with partially satisfied dependencies. pub struct WaitingTransaction { @@ -184,7 +184,7 @@ impl }) .collect::>(); - log_xt_trace!(target: LOG_TARGET, &pruned, "[{:?}] FutureTransactions: removed while pruning tags."); + log_xt_trace!(target: LOG_TARGET, &pruned, "FutureTransactions: removed while pruning tags."); self.remove(&pruned) } diff --git a/substrate/client/transaction-pool/src/graph/pool.rs b/substrate/client/transaction-pool/src/graph/pool.rs index 403712662ada..52b12e3fabae 100644 --- a/substrate/client/transaction-pool/src/graph/pool.rs +++ b/substrate/client/transaction-pool/src/graph/pool.rs @@ -16,7 +16,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . -use crate::{common::log_xt::log_xt_trace, LOG_TARGET}; +use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET}; use futures::{channel::mpsc::Receiver, Future}; use indexmap::IndexMap; use sc_transaction_pool_api::error; @@ -395,7 +395,7 @@ impl Pool { let pruned_hashes = reverified_transactions.keys().map(Clone::clone).collect::>(); log::debug!(target: LOG_TARGET, "Pruning at {:?}. Resubmitting transactions: {}, reverification took: {:?}", &at, reverified_transactions.len(), now.elapsed()); - log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "[{:?}] Resubmitting transaction: {:?}"); + log_xt_trace!(data: tuple, target: LOG_TARGET, &reverified_transactions, "Resubmitting transaction: {:?}"); // And finally - submit reverified transactions back to the pool self.validated_pool.resubmit_pruned( diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index bc2b07896dba..bbfcb9b40aca 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -21,7 +21,7 @@ use std::{ sync::Arc, }; -use crate::{common::log_xt::log_xt_trace, LOG_TARGET}; +use crate::{common::tracing_log_xt::log_xt_trace, LOG_TARGET}; use futures::channel::mpsc::{channel, Sender}; use parking_lot::{Mutex, RwLock}; use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority}; @@ -706,7 +706,7 @@ impl ValidatedPool { let invalid = self.pool.write().remove_subtree(hashes); log::trace!(target: LOG_TARGET, "Removed invalid transactions: {:?}", invalid.len()); - log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "{:?} Removed invalid transaction"); + log_xt_trace!(target: LOG_TARGET, invalid.iter().map(|t| t.hash), "Removed invalid transaction"); let mut listener = self.listener.write(); for tx in &invalid {