diff --git a/prdoc/pr_6647.prdoc b/prdoc/pr_6647.prdoc new file mode 100644 index 000000000000..47af9924ef1c --- /dev/null +++ b/prdoc/pr_6647.prdoc @@ -0,0 +1,8 @@ +title: '`fatxpool`: proper handling of priorities when mempool is full' +doc: +- audience: Node Dev + description: |- + Higher-priority transactions can now replace lower-priority transactions even when the internal _tx_mem_pool_ is full. +crates: +- name: sc-transaction-pool + bump: minor diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs index d69aa37c94a1..bf61558b00b0 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/dropped_watcher.rs @@ -53,11 +53,13 @@ pub struct DroppedTransaction { } impl DroppedTransaction { - fn new_usurped(tx_hash: Hash, by: Hash) -> Self { + /// Creates a new instance with reason set to `DroppedReason::Usurped(by)`. + pub fn new_usurped(tx_hash: Hash, by: Hash) -> Self { Self { reason: DroppedReason::Usurped(by), tx_hash } } - fn new_enforced_by_limts(tx_hash: Hash) -> Self { + /// Creates a new instance with reason set to `DroppedReason::LimitsEnforced`. + pub fn new_enforced_by_limts(tx_hash: Hash) -> Self { Self { reason: DroppedReason::LimitsEnforced, tx_hash } } } @@ -256,11 +258,13 @@ where self.future_transaction_views.entry(tx_hash).or_default().insert(block_hash); }, TransactionStatus::Ready | TransactionStatus::InBlock(..) => { - // note: if future transaction was once seens as the ready we may want to treat it - // as ready transactions. Unreferenced future transactions are more likely to be - // removed when the last referencing view is removed then ready transactions. - // Transcaction seen as ready is likely quite close to be included in some - // future fork. + // note: if future transaction was once seen as the ready we may want to treat it + // as ready transaction. The rationale behind this is as follows: we want to remove + // unreferenced future transactions when the last referencing view is removed (to + // avoid clogging mempool). For ready transactions we prefer to keep them in mempool + // even if no view is currently referencing them. Future transcaction once seen as + // ready is likely quite close to be included in some future fork (it is close to be + // ready, so we make exception and treat such transaction as ready). if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) { views.insert(block_hash); self.ready_transaction_views.insert(tx_hash, views); diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs index e57256943ccf..766045718252 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/fork_aware_txpool.rs @@ -31,7 +31,10 @@ use crate::{ api::FullChainApi, common::log_xt::log_xt_trace, enactment_state::{EnactmentAction, EnactmentState}, - fork_aware_txpool::{dropped_watcher::DroppedReason, revalidation_worker}, + fork_aware_txpool::{ + dropped_watcher::{DroppedReason, DroppedTransaction}, + revalidation_worker, + }, graph::{ self, base_pool::{TimedTransactionSource, Transaction}, @@ -49,14 +52,16 @@ use futures::{ use parking_lot::Mutex; use prometheus_endpoint::Registry as PrometheusRegistry; use sc_transaction_pool_api::{ - ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolStatus, TransactionFor, - TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash, + error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream, + MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionPriority, + TransactionSource, TransactionStatusStreamFor, TxHash, }; use sp_blockchain::{HashAndNumber, TreeRoute}; use sp_core::traits::SpawnEssentialNamed; use sp_runtime::{ generic::BlockId, traits::{Block as BlockT, NumberFor}, + transaction_validity::{TransactionValidityError, ValidTransaction}, }; use std::{ collections::{HashMap, HashSet}, @@ -287,7 +292,7 @@ where DroppedReason::LimitsEnforced => {}, }; - mempool.remove_dropped_transaction(&dropped_tx_hash).await; + mempool.remove_transaction(&dropped_tx_hash); view_store.listener.transaction_dropped(dropped); import_notification_sink.clean_notified_items(&[dropped_tx_hash]); } @@ -598,7 +603,7 @@ where /// out: /// [ Ok(xth0), Ok(xth1), Err ] /// ``` -fn reduce_multiview_result(input: HashMap>>) -> Vec> { +fn reduce_multiview_result(input: HashMap>>) -> Vec> { let mut values = input.values(); let Some(first) = values.next() else { return Default::default(); @@ -650,9 +655,28 @@ where let mempool_results = self.mempool.extend_unwatched(source, &xts); if view_store.is_empty() { - return Ok(mempool_results.into_iter().map(|r| r.map(|r| r.hash)).collect::>()) + return Ok(mempool_results + .into_iter() + .map(|r| r.map(|r| r.hash).map_err(Into::into)) + .collect::>()) } + // Submit all the transactions to the mempool + let retries = mempool_results + .into_iter() + .zip(xts.clone()) + .map(|(result, xt)| async move { + match result { + Err(TxPoolApiError::ImmediatelyDropped) => + self.attempt_transaction_replacement(source, false, xt).await, + _ => result, + } + }) + .collect::>(); + + let mempool_results = futures::future::join_all(retries).await; + + // Collect transactions that were successfully submitted to the mempool... let to_be_submitted = mempool_results .iter() .zip(xts) @@ -664,22 +688,47 @@ where self.metrics .report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _)); + // ... and submit them to the view_store. Please note that transactions rejected by mempool + // are not sent here. let mempool = self.mempool.clone(); let results_map = view_store.submit(to_be_submitted.into_iter()).await; let mut submission_results = reduce_multiview_result(results_map).into_iter(); + // Note for composing final result: + // + // For each failed insertion into the mempool, the mempool result should be placed into + // the returned vector. + // + // For each successful insertion into the mempool, the corresponding + // view_store submission result needs to be examined: + // - If there is an error during view_store submission, the transaction is removed from + // the mempool, and the final result recorded in the vector for this transaction is the + // view_store submission error. + // + // - If the view_store submission is successful, the transaction priority is updated in the + // mempool. + // + // Finally, it collects the hashes of updated transactions or submission errors (either + // from the mempool or view_store) into a returned vector. Ok(mempool_results .into_iter() .map(|result| { - result.and_then(|insertion| { - submission_results - .next() - .expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.") - .inspect_err(|_| - mempool.remove(insertion.hash) - ) + result + .map_err(Into::into) + .and_then(|insertion| { + submission_results + .next() + .expect("The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.") + .inspect_err(|_|{ + mempool.remove_transaction(&insertion.hash); + }) }) + }) + .map(|r| r.map(|r| { + mempool.update_transaction_priority(&r); + r.hash() + })) .collect::>()) } @@ -712,10 +761,13 @@ where ) -> Result>>, Self::Error> { log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count()); let xt = Arc::from(xt); - let InsertionInfo { hash: xt_hash, source: timed_source } = + + let InsertionInfo { hash: xt_hash, source: timed_source, .. } = match self.mempool.push_watched(source, xt.clone()) { Ok(result) => result, - Err(e) => return Err(e), + Err(TxPoolApiError::ImmediatelyDropped) => + self.attempt_transaction_replacement(source, true, xt.clone()).await?, + Err(e) => return Err(e.into()), }; self.metrics.report(|metrics| metrics.submitted_transactions.inc()); @@ -723,7 +775,13 @@ where self.view_store .submit_and_watch(at, timed_source, xt) .await - .inspect_err(|_| self.mempool.remove(xt_hash)) + .inspect_err(|_| { + self.mempool.remove_transaction(&xt_hash); + }) + .map(|mut outcome| { + self.mempool.update_transaction_priority(&outcome); + outcome.expect_watcher() + }) } /// Intended to remove transactions identified by the given hashes, and any dependent @@ -828,22 +886,16 @@ where } } -impl sc_transaction_pool_api::LocalTransactionPool - for ForkAwareTxPool, Block> +impl sc_transaction_pool_api::LocalTransactionPool + for ForkAwareTxPool where Block: BlockT, + ChainApi: 'static + graph::ChainApi, ::Hash: Unpin, - Client: sp_api::ProvideRuntimeApi - + sc_client_api::BlockBackend - + sc_client_api::blockchain::HeaderBackend - + sp_runtime::traits::BlockIdTo - + sp_blockchain::HeaderMetadata, - Client: Send + Sync + 'static, - Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue, { type Block = Block; - type Hash = ExtrinsicHash>; - type Error = as graph::ChainApi>::Error; + type Hash = ExtrinsicHash; + type Error = ChainApi::Error; fn submit_local( &self, @@ -852,12 +904,29 @@ where ) -> Result { log::debug!(target: LOG_TARGET, "fatp::submit_local views:{}", self.active_views_count()); let xt = Arc::from(xt); - let InsertionInfo { hash: xt_hash, .. } = self - .mempool - .extend_unwatched(TransactionSource::Local, &[xt.clone()]) - .remove(0)?; - self.view_store.submit_local(xt).or_else(|_| Ok(xt_hash)) + let result = + self.mempool.extend_unwatched(TransactionSource::Local, &[xt.clone()]).remove(0); + + let insertion = match result { + Err(TxPoolApiError::ImmediatelyDropped) => self.attempt_transaction_replacement_sync( + TransactionSource::Local, + false, + xt.clone(), + ), + _ => result, + }?; + + self.view_store + .submit_local(xt) + .inspect_err(|_| { + self.mempool.remove_transaction(&insertion.hash); + }) + .map(|outcome| { + self.mempool.update_transaction_priority(&outcome); + outcome.hash() + }) + .or_else(|_| Ok(insertion.hash)) } } @@ -1109,7 +1178,11 @@ where .await .into_iter() .zip(hashes) - .map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash))) + .map(|(result, tx_hash)| { + result + .map(|outcome| self.mempool.update_transaction_priority(&outcome.into())) + .or_else(|_| Err(tx_hash)) + }) .collect::>(); let submitted_count = watched_results.len(); @@ -1131,7 +1204,7 @@ where for result in watched_results { if let Err(tx_hash) = result { self.view_store.listener.invalidate_transactions(&[tx_hash]); - self.mempool.remove(tx_hash); + self.mempool.remove_transaction(&tx_hash); } } } @@ -1263,6 +1336,101 @@ where fn tx_hash(&self, xt: &TransactionFor) -> TxHash { self.api.hash_and_length(xt).0 } + + /// Attempts to find and replace a lower-priority transaction in the transaction pool with a new + /// one. + /// + /// This asynchronous function verifies the new transaction against the most recent view. If a + /// transaction with a lower priority exists in the transaction pool, it is replaced with the + /// new transaction. + /// + /// If no lower-priority transaction is found, the function returns an error indicating the + /// transaction was dropped immediately. + async fn attempt_transaction_replacement( + &self, + source: TransactionSource, + watched: bool, + xt: ExtrinsicFor, + ) -> Result>, TxPoolApiError> { + let at = self + .view_store + .most_recent_view + .read() + .ok_or(TxPoolApiError::ImmediatelyDropped)?; + + let (best_view, _) = self + .view_store + .get_view_at(at, false) + .ok_or(TxPoolApiError::ImmediatelyDropped)?; + + let (xt_hash, validated_tx) = best_view + .pool + .verify_one( + best_view.at.hash, + best_view.at.number, + TimedTransactionSource::from_transaction_source(source, false), + xt.clone(), + crate::graph::CheckBannedBeforeVerify::Yes, + ) + .await; + + let Some(priority) = validated_tx.priority() else { + return Err(TxPoolApiError::ImmediatelyDropped) + }; + + self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched) + } + + /// Sync version of [`Self::attempt_transaction_replacement`]. + fn attempt_transaction_replacement_sync( + &self, + source: TransactionSource, + watched: bool, + xt: ExtrinsicFor, + ) -> Result>, TxPoolApiError> { + let at = self + .view_store + .most_recent_view + .read() + .ok_or(TxPoolApiError::ImmediatelyDropped)?; + + let ValidTransaction { priority, .. } = self + .api + .validate_transaction_blocking(at, TransactionSource::Local, Arc::from(xt.clone())) + .map_err(|_| TxPoolApiError::ImmediatelyDropped)? + .map_err(|e| match e { + TransactionValidityError::Invalid(i) => TxPoolApiError::InvalidTransaction(i), + TransactionValidityError::Unknown(u) => TxPoolApiError::UnknownTransaction(u), + })?; + let xt_hash = self.hash_of(&xt); + self.attempt_transaction_replacement_inner(xt, xt_hash, priority, source, watched) + } + + fn attempt_transaction_replacement_inner( + &self, + xt: ExtrinsicFor, + tx_hash: ExtrinsicHash, + priority: TransactionPriority, + source: TransactionSource, + watched: bool, + ) -> Result>, TxPoolApiError> { + let insertion_info = + self.mempool.try_insert_with_replacement(xt, priority, source, watched)?; + + for worst_hash in &insertion_info.removed { + log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}"); + self.view_store + .listener + .transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash)); + + self.view_store + .remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| { + listener.limits_enforced(&removed_tx_hash); + }); + } + + return Ok(insertion_info) + } } #[async_trait] @@ -1410,7 +1578,7 @@ mod reduce_multiview_result_tests { fn empty() { sp_tracing::try_init_simple(); let input = HashMap::default(); - let r = reduce_multiview_result::(input); + let r = reduce_multiview_result::(input); assert!(r.is_empty()); } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs index 989ae4425dc4..c8a4d0c72dd3 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/tx_mem_pool.rs @@ -26,7 +26,10 @@ //! it), while on other forks tx can be valid. Depending on which view is chosen to be cloned, //! such transaction could not be present in the newly created view. -use super::{metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener}; +use super::{ + metrics::MetricsLink as PrometheusMetrics, multi_view_listener::MultiViewListener, + view_store::ViewStoreSubmitOutcome, +}; use crate::{ common::log_xt::log_xt_trace, graph, @@ -35,15 +38,20 @@ use crate::{ }; use futures::FutureExt; use itertools::Itertools; -use sc_transaction_pool_api::TransactionSource; +use parking_lot::RwLock; +use sc_transaction_pool_api::{TransactionPriority, TransactionSource}; use sp_blockchain::HashAndNumber; use sp_runtime::{ traits::Block as BlockT, transaction_validity::{InvalidTransaction, TransactionValidityError}, }; use std::{ + cmp::Ordering, collections::HashMap, - sync::{atomic, atomic::AtomicU64, Arc}, + sync::{ + atomic::{self, AtomicU64}, + Arc, + }, time::Instant, }; @@ -77,6 +85,9 @@ where source: TimedTransactionSource, /// When the transaction was revalidated, used to periodically revalidate the mem pool buffer. validated_at: AtomicU64, + /// Priority of transaction at some block. It is assumed it will not be changed often. None if + /// not known. + priority: RwLock>, //todo: we need to add future / ready status at finalized block. //If future transactions are stuck in tx_mem_pool (due to limits being hit), we need a means // to replace them somehow with newly coming transactions. @@ -101,23 +112,50 @@ where /// Creates a new instance of wrapper for unwatched transaction. fn new_unwatched(source: TransactionSource, tx: ExtrinsicFor, bytes: usize) -> Self { - Self { - watched: false, - tx, - source: TimedTransactionSource::from_transaction_source(source, true), - validated_at: AtomicU64::new(0), - bytes, - } + Self::new(false, source, tx, bytes) } /// Creates a new instance of wrapper for watched transaction. fn new_watched(source: TransactionSource, tx: ExtrinsicFor, bytes: usize) -> Self { + Self::new(true, source, tx, bytes) + } + + /// Creates a new instance of wrapper for a transaction with no priority. + fn new( + watched: bool, + source: TransactionSource, + tx: ExtrinsicFor, + bytes: usize, + ) -> Self { + Self::new_with_optional_priority(watched, source, tx, bytes, None) + } + + /// Creates a new instance of wrapper for a transaction with given priority. + fn new_with_priority( + watched: bool, + source: TransactionSource, + tx: ExtrinsicFor, + bytes: usize, + priority: TransactionPriority, + ) -> Self { + Self::new_with_optional_priority(watched, source, tx, bytes, Some(priority)) + } + + /// Creates a new instance of wrapper for a transaction with optional priority. + fn new_with_optional_priority( + watched: bool, + source: TransactionSource, + tx: ExtrinsicFor, + bytes: usize, + priority: Option, + ) -> Self { Self { - watched: true, + watched, tx, source: TimedTransactionSource::from_transaction_source(source, true), validated_at: AtomicU64::new(0), bytes, + priority: priority.into(), } } @@ -132,6 +170,11 @@ where pub(crate) fn source(&self) -> TimedTransactionSource { self.source.clone() } + + /// Returns the priority of the transaction. + pub(crate) fn priority(&self) -> Option { + *self.priority.read() + } } impl Size for Arc> @@ -191,11 +234,15 @@ where pub(super) struct InsertionInfo { pub(super) hash: Hash, pub(super) source: TimedTransactionSource, + pub(super) removed: Vec, } impl InsertionInfo { fn new(hash: Hash, source: TimedTransactionSource) -> Self { - Self { hash, source } + Self::new_with_removed(hash, source, Default::default()) + } + fn new_with_removed(hash: Hash, source: TimedTransactionSource, removed: Vec) -> Self { + Self { hash, source, removed } } } @@ -279,27 +326,109 @@ where &self, hash: ExtrinsicHash, tx: TxInMemPool, - ) -> Result>, ChainApi::Error> { - let bytes = self.transactions.bytes(); + ) -> Result>, sc_transaction_pool_api::error::Error> { let mut transactions = self.transactions.write(); + + let bytes = self.transactions.bytes(); + let result = match ( - !self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes), + self.is_limit_exceeded(transactions.len() + 1, bytes + tx.bytes), transactions.contains_key(&hash), ) { - (true, false) => { + (false, false) => { let source = tx.source(); transactions.insert(hash, Arc::from(tx)); Ok(InsertionInfo::new(hash, source)) }, (_, true) => - Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash)).into()), - (false, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped.into()), + Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))), + (true, _) => Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped), }; log::trace!(target: LOG_TARGET, "[{:?}] mempool::try_insert: {:?}", hash, result.as_ref().map(|r| r.hash)); result } + /// Attempts to insert a new transaction in the memory pool and drop some worse existing + /// transactions. + /// + /// A "worse" transaction means transaction with lower priority, or older transaction with the + /// same prio. + /// + /// This operation will not overflow the limit of the mempool. It means that cumulative + /// size of removed transactions will be equal (or greated) then size of newly inserted + /// transaction. + /// + /// Returns a `Result` containing `InsertionInfo` if the new transaction is successfully + /// inserted; otherwise, returns an appropriate error indicating the failure. + pub(super) fn try_insert_with_replacement( + &self, + new_tx: ExtrinsicFor, + priority: TransactionPriority, + source: TransactionSource, + watched: bool, + ) -> Result>, sc_transaction_pool_api::error::Error> { + let (hash, length) = self.api.hash_and_length(&new_tx); + let new_tx = TxInMemPool::new_with_priority(watched, source, new_tx, length, priority); + if new_tx.bytes > self.max_transactions_total_bytes { + return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped); + } + + let mut transactions = self.transactions.write(); + + if transactions.contains_key(&hash) { + return Err(sc_transaction_pool_api::error::Error::AlreadyImported(Box::new(hash))); + } + + let mut sorted = transactions + .iter() + .filter_map(|(h, v)| v.priority().map(|_| (*h, v.clone()))) + .collect::>(); + + // When pushing higher prio transaction, we need to find a number of lower prio txs, such + // that the sum of their bytes is ge then size of new tx. Otherwise we could overflow size + // limits. Naive way to do it - rev-sort by priority and eat the tail. + + // reverse (oldest, lowest prio last) + sorted.sort_by(|(_, a), (_, b)| match b.priority().cmp(&a.priority()) { + Ordering::Equal => match (a.source.timestamp, b.source.timestamp) { + (Some(a), Some(b)) => b.cmp(&a), + _ => Ordering::Equal, + }, + ordering => ordering, + }); + + let mut total_size_removed = 0usize; + let mut to_be_removed = vec![]; + let free_bytes = self.max_transactions_total_bytes - self.transactions.bytes(); + + loop { + let Some((worst_hash, worst_tx)) = sorted.pop() else { + return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped); + }; + + if worst_tx.priority() >= new_tx.priority() { + return Err(sc_transaction_pool_api::error::Error::ImmediatelyDropped); + } + + total_size_removed += worst_tx.bytes; + to_be_removed.push(worst_hash); + + if free_bytes + total_size_removed >= new_tx.bytes { + break; + } + } + + let source = new_tx.source(); + transactions.insert(hash, Arc::from(new_tx)); + for worst_hash in &to_be_removed { + transactions.remove(worst_hash); + } + debug_assert!(!self.is_limit_exceeded(transactions.len(), self.transactions.bytes())); + + Ok(InsertionInfo::new_with_removed(hash, source, to_be_removed)) + } + /// Adds a new unwatched transactions to the internal buffer not exceeding the limit. /// /// Returns the vector of results for each transaction, the order corresponds to the input @@ -308,7 +437,8 @@ where &self, source: TransactionSource, xts: &[ExtrinsicFor], - ) -> Vec>, ChainApi::Error>> { + ) -> Vec>, sc_transaction_pool_api::error::Error>> + { let result = xts .iter() .map(|xt| { @@ -325,20 +455,11 @@ where &self, source: TransactionSource, xt: ExtrinsicFor, - ) -> Result>, ChainApi::Error> { + ) -> Result>, sc_transaction_pool_api::error::Error> { let (hash, length) = self.api.hash_and_length(&xt); self.try_insert(hash, TxInMemPool::new_watched(source, xt.clone(), length)) } - /// Removes transaction from the memory pool which are specified by the given list of hashes. - pub(super) async fn remove_dropped_transaction( - &self, - dropped: &ExtrinsicHash, - ) -> Option>> { - log::debug!(target: LOG_TARGET, "[{:?}] mempool::remove_dropped_transaction", dropped); - self.transactions.write().remove(dropped) - } - /// Clones and returns a `HashMap` of references to all unwatched transactions in the memory /// pool. pub(super) fn clone_unwatched( @@ -362,9 +483,13 @@ where .collect::>() } - /// Removes a transaction from the memory pool based on a given hash. - pub(super) fn remove(&self, hash: ExtrinsicHash) { - let _ = self.transactions.write().remove(&hash); + /// Removes a transaction with given hash from the memory pool. + pub(super) fn remove_transaction( + &self, + hash: &ExtrinsicHash, + ) -> Option>> { + log::debug!(target: LOG_TARGET, "[{hash:?}] mempool::remove_transaction"); + self.transactions.write().remove(hash) } /// Revalidates a batch of transactions against the provided finalized block. @@ -462,6 +587,17 @@ where }); self.listener.invalidate_transactions(&invalid_hashes); } + + /// Updates the priority of transaction stored in mempool using provided view_store submission + /// outcome. + pub(super) fn update_transaction_priority(&self, outcome: &ViewStoreSubmitOutcome) { + outcome.priority().map(|priority| { + self.transactions + .write() + .get_mut(&outcome.hash()) + .map(|p| *p.priority.write() = Some(priority)) + }); + } } #[cfg(test)] @@ -583,6 +719,9 @@ mod tx_mem_pool_tests { assert_eq!(mempool.unwatched_and_watched_count(), (10, 5)); } + /// size of large extrinsic + const LARGE_XT_SIZE: usize = 1129; + fn large_uxt(x: usize) -> Extrinsic { ExtrinsicBuilder::new_include_data(vec![x as u8; 1024]).build() } @@ -592,8 +731,7 @@ mod tx_mem_pool_tests { sp_tracing::try_init_simple(); let max = 10; let api = Arc::from(TestApi::default()); - //size of large extrinsic is: 1129 - let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * 1129); + let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE); let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::>(); @@ -617,4 +755,200 @@ mod tx_mem_pool_tests { sc_transaction_pool_api::error::Error::ImmediatelyDropped )); } + + #[test] + fn replacing_txs_works_for_same_tx_size() { + sp_tracing::try_init_simple(); + let max = 10; + let api = Arc::from(TestApi::default()); + let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE); + + let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::>(); + + let low_prio = 0u64; + let hi_prio = u64::MAX; + + let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1); + let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts + .iter() + .map(|t| { + let h = api.hash_and_length(t).0; + (ViewStoreSubmitOutcome::new(h, Some(low_prio)), h) + }) + .unzip(); + + let results = mempool.extend_unwatched(TransactionSource::External, &xts); + assert!(results.iter().all(Result::is_ok)); + assert_eq!(mempool.bytes(), total_xts_bytes); + + submit_outcomes + .into_iter() + .for_each(|o| mempool.update_transaction_priority(&o)); + + let xt = Arc::from(large_uxt(98)); + let hash = api.hash_and_length(&xt).0; + let result = mempool + .try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false) + .unwrap(); + + assert_eq!(result.hash, hash); + assert_eq!(result.removed, hashes[0..1]); + } + + #[test] + fn replacing_txs_removes_proper_size_of_txs() { + sp_tracing::try_init_simple(); + let max = 10; + let api = Arc::from(TestApi::default()); + let mempool = TxMemPool::new_test(api.clone(), usize::MAX, max * LARGE_XT_SIZE); + + let xts = (0..max).map(|x| Arc::from(large_uxt(x))).collect::>(); + + let low_prio = 0u64; + let hi_prio = u64::MAX; + + let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1); + let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts + .iter() + .map(|t| { + let h = api.hash_and_length(t).0; + (ViewStoreSubmitOutcome::new(h, Some(low_prio)), h) + }) + .unzip(); + + let results = mempool.extend_unwatched(TransactionSource::External, &xts); + assert!(results.iter().all(Result::is_ok)); + assert_eq!(mempool.bytes(), total_xts_bytes); + assert_eq!(total_xts_bytes, max * LARGE_XT_SIZE); + + submit_outcomes + .into_iter() + .for_each(|o| mempool.update_transaction_priority(&o)); + + //this one should drop 2 xts (size: 1130): + let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 1025]).build()); + let (hash, length) = api.hash_and_length(&xt); + assert_eq!(length, 1130); + let result = mempool + .try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false) + .unwrap(); + + assert_eq!(result.hash, hash); + assert_eq!(result.removed, hashes[0..2]); + } + + #[test] + fn replacing_txs_removes_proper_size_and_prios() { + sp_tracing::try_init_simple(); + const COUNT: usize = 10; + let api = Arc::from(TestApi::default()); + let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE); + + let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::>(); + + let hi_prio = u64::MAX; + + let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1); + let (submit_outcomes, hashes): (Vec<_>, Vec<_>) = xts + .iter() + .enumerate() + .map(|(prio, t)| { + let h = api.hash_and_length(t).0; + (ViewStoreSubmitOutcome::new(h, Some((COUNT - prio).try_into().unwrap())), h) + }) + .unzip(); + + let results = mempool.extend_unwatched(TransactionSource::External, &xts); + assert!(results.iter().all(Result::is_ok)); + assert_eq!(mempool.bytes(), total_xts_bytes); + + submit_outcomes + .into_iter() + .for_each(|o| mempool.update_transaction_priority(&o)); + + //this one should drop 3 xts (each of size 1129) + let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build()); + let (hash, length) = api.hash_and_length(&xt); + // overhead is 105, thus length: 105 + 2154 + assert_eq!(length, 2 * LARGE_XT_SIZE + 1); + let result = mempool + .try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false) + .unwrap(); + + assert_eq!(result.hash, hash); + assert!(result.removed.iter().eq(hashes[COUNT - 3..COUNT].iter().rev())); + } + + #[test] + fn replacing_txs_skips_lower_prio_tx() { + sp_tracing::try_init_simple(); + const COUNT: usize = 10; + let api = Arc::from(TestApi::default()); + let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE); + + let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::>(); + + let hi_prio = 100u64; + let low_prio = 10u64; + + let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1); + let submit_outcomes = xts + .iter() + .map(|t| { + let h = api.hash_and_length(t).0; + ViewStoreSubmitOutcome::new(h, Some(hi_prio)) + }) + .collect::>(); + + let results = mempool.extend_unwatched(TransactionSource::External, &xts); + assert!(results.iter().all(Result::is_ok)); + assert_eq!(mempool.bytes(), total_xts_bytes); + + submit_outcomes + .into_iter() + .for_each(|o| mempool.update_transaction_priority(&o)); + + let xt = Arc::from(large_uxt(98)); + let result = + mempool.try_insert_with_replacement(xt, low_prio, TransactionSource::External, false); + + // lower prio tx is rejected immediately + assert!(matches!( + result.unwrap_err(), + sc_transaction_pool_api::error::Error::ImmediatelyDropped + )); + } + + #[test] + fn replacing_txs_is_skipped_if_prios_are_not_set() { + sp_tracing::try_init_simple(); + const COUNT: usize = 10; + let api = Arc::from(TestApi::default()); + let mempool = TxMemPool::new_test(api.clone(), usize::MAX, COUNT * LARGE_XT_SIZE); + + let xts = (0..COUNT).map(|x| Arc::from(large_uxt(x))).collect::>(); + + let hi_prio = u64::MAX; + + let total_xts_bytes = xts.iter().fold(0, |r, x| r + api.hash_and_length(&x).1); + + let results = mempool.extend_unwatched(TransactionSource::External, &xts); + assert!(results.iter().all(Result::is_ok)); + assert_eq!(mempool.bytes(), total_xts_bytes); + + //this one could drop 3 xts (each of size 1129) + let xt = Arc::from(ExtrinsicBuilder::new_include_data(vec![98 as u8; 2154]).build()); + let length = api.hash_and_length(&xt).1; + // overhead is 105, thus length: 105 + 2154 + assert_eq!(length, 2 * LARGE_XT_SIZE + 1); + + let result = + mempool.try_insert_with_replacement(xt, hi_prio, TransactionSource::External, false); + + // we did not update priorities (update_transaction_priority was not called): + assert!(matches!( + result.unwrap_err(), + sc_transaction_pool_api::error::Error::ImmediatelyDropped + )); + } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs index 3cbb8fa4871d..a35d68120a3a 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view.rs @@ -28,7 +28,7 @@ use crate::{ common::log_xt::log_xt_trace, graph::{ self, base_pool::TimedTransactionSource, watcher::Watcher, ExtrinsicFor, ExtrinsicHash, - IsValidator, ValidatedTransaction, ValidatedTransactionFor, + IsValidator, ValidatedPoolSubmitOutcome, ValidatedTransaction, ValidatedTransactionFor, }, LOG_TARGET, }; @@ -158,7 +158,7 @@ where pub(super) async fn submit_many( &self, xts: impl IntoIterator)>, - ) -> Vec, ChainApi::Error>> { + ) -> Vec, ChainApi::Error>> { if log::log_enabled!(target: LOG_TARGET, log::Level::Trace) { let xts = xts.into_iter().collect::>(); log_xt_trace!(target: LOG_TARGET, xts.iter().map(|(_,xt)| self.pool.validated_pool().api().hash_and_length(xt).0), "[{:?}] view::submit_many at:{}", self.at.hash); @@ -173,7 +173,7 @@ where &self, source: TimedTransactionSource, xt: ExtrinsicFor, - ) -> Result, ExtrinsicHash>, ChainApi::Error> { + ) -> Result, ChainApi::Error> { log::trace!(target: LOG_TARGET, "[{:?}] view::submit_and_watch at:{}", self.pool.validated_pool().api().hash_and_length(&xt).0, self.at.hash); self.pool.submit_and_watch(&self.at, source, xt).await } @@ -182,7 +182,7 @@ where pub(super) fn submit_local( &self, xt: ExtrinsicFor, - ) -> Result, ChainApi::Error> { + ) -> Result, ChainApi::Error> { let (hash, length) = self.pool.validated_pool().api().hash_and_length(&xt); log::trace!(target: LOG_TARGET, "[{:?}] view::submit_local at:{}", hash, self.at.hash); @@ -460,4 +460,18 @@ where const IGNORE_BANNED: bool = false; self.pool.validated_pool().check_is_known(tx_hash, IGNORE_BANNED).is_err() } + + /// Removes the whole transaction subtree from the inner pool. + /// + /// Refer to [`crate::graph::ValidatedPool::remove_subtree`] for more details. + pub fn remove_subtree( + &self, + tx_hash: ExtrinsicHash, + listener_action: F, + ) -> Vec> + where + F: Fn(&mut crate::graph::Listener, ExtrinsicHash), + { + self.pool.validated_pool().remove_subtree(tx_hash, listener_action) + } } diff --git a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs index a06c051f0a7e..43ed5bbf8869 100644 --- a/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs +++ b/substrate/client/transaction-pool/src/fork_aware_txpool/view_store.rs @@ -27,7 +27,7 @@ use crate::{ graph::{ self, base_pool::{TimedTransactionSource, Transaction}, - ExtrinsicFor, ExtrinsicHash, TransactionFor, + BaseSubmitOutcome, ExtrinsicFor, ExtrinsicHash, TransactionFor, ValidatedPoolSubmitOutcome, }, ReadyIteratorFor, LOG_TARGET, }; @@ -38,20 +38,18 @@ use sc_transaction_pool_api::{error::Error as PoolError, PoolStatus}; use sp_blockchain::TreeRoute; use sp_runtime::{generic::BlockId, traits::Block as BlockT}; use std::{ - collections::{hash_map::Entry, HashMap}, + collections::{hash_map::Entry, HashMap, HashSet}, sync::Arc, time::Instant, }; -/// Helper struct to keep the context for transaction replacements. +/// Helper struct to maintain the context for pending transaction submission, executed for +/// newly inserted views. #[derive(Clone)] -struct PendingTxReplacement +struct PendingTxSubmission where ChainApi: graph::ChainApi, { - /// Indicates if the new transaction was already submitted to all the views in the view_store. - /// If true, it can be removed after inserting any new view. - processed: bool, /// New transaction replacing the old one. xt: ExtrinsicFor, /// Source of the transaction. @@ -60,13 +58,84 @@ where watched: bool, } -impl PendingTxReplacement +/// Helper type representing the callback allowing to trigger per-transaction events on +/// `ValidatedPool`'s listener. +type RemovalListener = + Arc, ExtrinsicHash) + Send + Sync>; + +/// Helper struct to maintain the context for pending transaction removal, executed for +/// newly inserted views. +struct PendingTxRemoval +where + ChainApi: graph::ChainApi, +{ + /// Hash of the transaction that will be removed, + xt_hash: ExtrinsicHash, + /// Action that shall be executed on underlying `ValidatedPool`'s listener. + listener_action: RemovalListener, +} + +/// This enum represents an action that should be executed on the newly built +/// view before this view is inserted into the view store. +enum PreInsertAction +where + ChainApi: graph::ChainApi, +{ + /// Represents the action of submitting a new transaction. Intended to use to handle usurped + /// transactions. + SubmitTx(PendingTxSubmission), + + /// Represents the action of removing a subtree of transactions. + RemoveSubtree(PendingTxRemoval), +} + +/// Represents a task awaiting execution, to be performed immediately prior to the view insertion +/// into the view store. +struct PendingPreInsertTask +where + ChainApi: graph::ChainApi, +{ + /// The action to be applied when inserting a new view. + action: PreInsertAction, + /// Indicates if the action was already applied to all the views in the view_store. + /// If true, it can be removed after inserting any new view. + processed: bool, +} + +impl PendingPreInsertTask where ChainApi: graph::ChainApi, { - /// Creates new unprocessed instance of pending transaction replacement. - fn new(xt: ExtrinsicFor, source: TimedTransactionSource, watched: bool) -> Self { - Self { processed: false, xt, source, watched } + /// Creates new unprocessed instance of pending transaction submission. + fn new_submission_action( + xt: ExtrinsicFor, + source: TimedTransactionSource, + watched: bool, + ) -> Self { + Self { + processed: false, + action: PreInsertAction::SubmitTx(PendingTxSubmission { xt, source, watched }), + } + } + + /// Creates new unprocessed instance of pending transaction removal. + fn new_removal_action( + xt_hash: ExtrinsicHash, + listener: RemovalListener, + ) -> Self { + Self { + processed: false, + action: PreInsertAction::RemoveSubtree(PendingTxRemoval { + xt_hash, + listener_action: listener, + }), + } + } + + /// Marks a task as done for every view present in view store. Basically means that can be + /// removed on new view insertion. + fn mark_processed(&mut self) { + self.processed = true; } } @@ -100,9 +169,20 @@ where /// notifcication threads. It is meant to assure that replaced transaction is also removed from /// newly built views in maintain process. /// - /// The map's key is hash of replaced extrinsic. - pending_txs_replacements: - RwLock, PendingTxReplacement>>, + /// The map's key is hash of actionable extrinsic (to avoid duplicated entries). + pending_txs_tasks: RwLock, PendingPreInsertTask>>, +} + +/// Type alias to outcome of submission to `ViewStore`. +pub(super) type ViewStoreSubmitOutcome = + BaseSubmitOutcome>; + +impl From> + for ViewStoreSubmitOutcome +{ + fn from(value: ValidatedPoolSubmitOutcome) -> Self { + Self::new(value.hash(), value.priority()) + } } impl ViewStore @@ -124,7 +204,7 @@ where listener, most_recent_view: RwLock::from(None), dropped_stream_controller, - pending_txs_replacements: Default::default(), + pending_txs_tasks: Default::default(), } } @@ -132,7 +212,7 @@ where pub(super) async fn submit( &self, xts: impl IntoIterator)> + Clone, - ) -> HashMap, ChainApi::Error>>> { + ) -> HashMap, ChainApi::Error>>> { let submit_futures = { let active_views = self.active_views.read(); active_views @@ -140,7 +220,16 @@ where .map(|(_, view)| { let view = view.clone(); let xts = xts.clone(); - async move { (view.at.hash, view.submit_many(xts).await) } + async move { + ( + view.at.hash, + view.submit_many(xts) + .await + .into_iter() + .map(|r| r.map(Into::into)) + .collect::>(), + ) + } }) .collect::>() }; @@ -153,7 +242,7 @@ where pub(super) fn submit_local( &self, xt: ExtrinsicFor, - ) -> Result, ChainApi::Error> { + ) -> Result, ChainApi::Error> { let active_views = self .active_views .read() @@ -168,12 +257,14 @@ where .map(|view| view.submit_local(xt.clone())) .find_or_first(Result::is_ok); - if let Some(Err(err)) = result { - log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err); - return Err(err) - }; - - Ok(tx_hash) + match result { + Some(Err(err)) => { + log::trace!(target: LOG_TARGET, "[{:?}] submit_local: err: {}", tx_hash, err); + Err(err) + }, + None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None)), + Some(Ok(r)) => Ok(r.into()), + } } /// Import a single extrinsic and starts to watch its progress in the pool. @@ -188,7 +279,7 @@ where _at: Block::Hash, source: TimedTransactionSource, xt: ExtrinsicFor, - ) -> Result, ChainApi::Error> { + ) -> Result, ChainApi::Error> { let tx_hash = self.api.hash_and_length(&xt).0; let Some(external_watcher) = self.listener.create_external_watcher_for_tx(tx_hash) else { return Err(PoolError::AlreadyImported(Box::new(tx_hash)).into()) @@ -203,13 +294,13 @@ where let source = source.clone(); async move { match view.submit_and_watch(source, xt).await { - Ok(watcher) => { + Ok(mut result) => { self.listener.add_view_watcher_for_tx( tx_hash, view.at.hash, - watcher.into_stream().boxed(), + result.expect_watcher().into_stream().boxed(), ); - Ok(()) + Ok(result) }, Err(e) => Err(e), } @@ -217,17 +308,20 @@ where }) .collect::>() }; - let maybe_error = futures::future::join_all(submit_and_watch_futures) + let result = futures::future::join_all(submit_and_watch_futures) .await .into_iter() .find_or_first(Result::is_ok); - if let Some(Err(err)) = maybe_error { - log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err); - return Err(err); - }; - - Ok(external_watcher) + match result { + Some(Err(err)) => { + log::trace!(target: LOG_TARGET, "[{:?}] submit_and_watch: err: {}", tx_hash, err); + return Err(err); + }, + Some(Ok(result)) => + Ok(ViewStoreSubmitOutcome::from(result).with_watcher(external_watcher)), + None => Ok(ViewStoreSubmitOutcome::new(tx_hash, None).with_watcher(external_watcher)), + } } /// Returns the pool status for every active view. @@ -575,8 +669,12 @@ where replaced: ExtrinsicHash, watched: bool, ) { - if let Entry::Vacant(entry) = self.pending_txs_replacements.write().entry(replaced) { - entry.insert(PendingTxReplacement::new(xt.clone(), source.clone(), watched)); + if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(replaced) { + entry.insert(PendingPreInsertTask::new_submission_action( + xt.clone(), + source.clone(), + watched, + )); } else { return }; @@ -586,8 +684,8 @@ where self.replace_transaction_in_views(source, xt, xt_hash, replaced, watched).await; - if let Some(replacement) = self.pending_txs_replacements.write().get_mut(&replaced) { - replacement.processed = true; + if let Some(replacement) = self.pending_txs_tasks.write().get_mut(&replaced) { + replacement.mark_processed(); } } @@ -596,18 +694,25 @@ where /// After application, all already processed replacements are removed. async fn apply_pending_tx_replacements(&self, view: Arc>) { let mut futures = vec![]; - for replacement in self.pending_txs_replacements.read().values() { - let xt_hash = self.api.hash_and_length(&replacement.xt).0; - futures.push(self.replace_transaction_in_view( - view.clone(), - replacement.source.clone(), - replacement.xt.clone(), - xt_hash, - replacement.watched, - )); + for replacement in self.pending_txs_tasks.read().values() { + match replacement.action { + PreInsertAction::SubmitTx(ref submission) => { + let xt_hash = self.api.hash_and_length(&submission.xt).0; + futures.push(self.replace_transaction_in_view( + view.clone(), + submission.source.clone(), + submission.xt.clone(), + xt_hash, + submission.watched, + )); + }, + PreInsertAction::RemoveSubtree(ref removal) => { + view.remove_subtree(removal.xt_hash, &*removal.listener_action); + }, + } } let _results = futures::future::join_all(futures).await; - self.pending_txs_replacements.write().retain(|_, r| r.processed); + self.pending_txs_tasks.write().retain(|_, r| r.processed); } /// Submits `xt` to the given view. @@ -623,11 +728,11 @@ where ) { if watched { match view.submit_and_watch(source, xt).await { - Ok(watcher) => { + Ok(mut result) => { self.listener.add_view_watcher_for_tx( xt_hash, view.at.hash, - watcher.into_stream().boxed(), + result.expect_watcher().into_stream().boxed(), ); }, Err(e) => { @@ -690,4 +795,58 @@ where }; let _results = futures::future::join_all(submit_futures).await; } + + /// Removes a transaction subtree from every view in the view_store, starting from the given + /// transaction hash. + /// + /// This function traverses the dependency graph of transactions and removes the specified + /// transaction along with all its descendant transactions from every view. + /// + /// A `listener_action` callback function is invoked for every transaction that is removed, + /// providing a reference to the pool's listener and the hash of the removed transaction. This + /// allows to trigger the required events. Note that listener may be called multiple times for + /// the same hash. + /// + /// Function will also schedule view pre-insertion actions to ensure that transactions will be + /// removed from newly created view. + /// + /// Returns a vector containing the hashes of all removed transactions, including the root + /// transaction specified by `tx_hash`. Vector contains only unique hashes. + pub(super) fn remove_transaction_subtree( + &self, + xt_hash: ExtrinsicHash, + listener_action: F, + ) -> Vec> + where + F: Fn(&mut crate::graph::Listener, ExtrinsicHash) + + Clone + + Send + + Sync + + 'static, + { + if let Entry::Vacant(entry) = self.pending_txs_tasks.write().entry(xt_hash) { + entry.insert(PendingPreInsertTask::new_removal_action( + xt_hash, + Arc::from(listener_action.clone()), + )); + }; + + let mut seen = HashSet::new(); + + let removed = self + .active_views + .read() + .iter() + .chain(self.inactive_views.read().iter()) + .filter(|(_, view)| view.is_imported(&xt_hash)) + .flat_map(|(_, view)| view.remove_subtree(xt_hash, &listener_action)) + .filter(|xt_hash| seen.insert(*xt_hash)) + .collect(); + + if let Some(removal_action) = self.pending_txs_tasks.write().get_mut(&xt_hash) { + removal_action.mark_processed(); + } + + removed + } } diff --git a/substrate/client/transaction-pool/src/graph/base_pool.rs b/substrate/client/transaction-pool/src/graph/base_pool.rs index 04eaa998f42e..3b4afc88b789 100644 --- a/substrate/client/transaction-pool/src/graph/base_pool.rs +++ b/substrate/client/transaction-pool/src/graph/base_pool.rs @@ -453,27 +453,29 @@ impl BasePool, _>(|worst, current| { - let transaction = ¤t.transaction; - worst - .map(|worst| { - // Here we don't use `TransactionRef`'s ordering implementation because - // while it prefers priority like need here, it also prefers older - // transactions for inclusion purposes and limit enforcement needs to prefer - // newer transactions instead and drop the older ones. - match worst.transaction.priority.cmp(&transaction.transaction.priority) { - Ordering::Less => worst, - Ordering::Equal => - if worst.insertion_id > transaction.insertion_id { - transaction.clone() - } else { - worst - }, - Ordering::Greater => transaction.clone(), - } - }) - .or_else(|| Some(transaction.clone())) - }); + let worst = + self.ready.fold::>, _>(None, |worst, current| { + let transaction = ¤t.transaction; + worst + .map(|worst| { + // Here we don't use `TransactionRef`'s ordering implementation because + // while it prefers priority like need here, it also prefers older + // transactions for inclusion purposes and limit enforcement needs to + // prefer newer transactions instead and drop the older ones. + match worst.transaction.priority.cmp(&transaction.transaction.priority) + { + Ordering::Less => worst, + Ordering::Equal => + if worst.insertion_id > transaction.insertion_id { + transaction.clone() + } else { + worst + }, + Ordering::Greater => transaction.clone(), + } + }) + .or_else(|| Some(transaction.clone())) + }); if let Some(worst) = worst { removed.append(&mut self.remove_subtree(&[worst.transaction.hash.clone()])) diff --git a/substrate/client/transaction-pool/src/graph/listener.rs b/substrate/client/transaction-pool/src/graph/listener.rs index 41daf5491f70..7b09ee4c6409 100644 --- a/substrate/client/transaction-pool/src/graph/listener.rs +++ b/substrate/client/transaction-pool/src/graph/listener.rs @@ -126,8 +126,8 @@ impl Listener Pool { &self, at: &HashAndNumber, xts: impl IntoIterator)>, - ) -> Vec, B::Error>> { + ) -> Vec, B::Error>> { let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::Yes).await; self.validated_pool.submit(validated_transactions.into_values()) } @@ -216,7 +216,7 @@ impl Pool { &self, at: &HashAndNumber, xts: impl IntoIterator)>, - ) -> Vec, B::Error>> { + ) -> Vec, B::Error>> { let validated_transactions = self.verify(at, xts, CheckBannedBeforeVerify::No).await; self.validated_pool.submit(validated_transactions.into_values()) } @@ -227,7 +227,7 @@ impl Pool { at: &HashAndNumber, source: base::TimedTransactionSource, xt: ExtrinsicFor, - ) -> Result, B::Error> { + ) -> Result, B::Error> { let res = self.submit_at(at, std::iter::once((source, xt))).await.pop(); res.expect("One extrinsic passed; one result returned; qed") } @@ -238,7 +238,7 @@ impl Pool { at: &HashAndNumber, source: base::TimedTransactionSource, xt: ExtrinsicFor, - ) -> Result, ExtrinsicHash>, B::Error> { + ) -> Result, B::Error> { let (_, tx) = self .verify_one(at.hash, at.number, source, xt, CheckBannedBeforeVerify::Yes) .await; @@ -432,7 +432,7 @@ impl Pool { } /// Returns future that validates single transaction at given block. - async fn verify_one( + pub(crate) async fn verify_one( &self, block_hash: ::Hash, block_number: NumberFor, @@ -539,6 +539,7 @@ mod tests { .into(), ), ) + .map(|outcome| outcome.hash()) .unwrap(); // then @@ -567,7 +568,10 @@ mod tests { // when let txs = txs.into_iter().map(|x| (SOURCE, Arc::from(x))).collect::>(); - let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), txs)); + let hashes = block_on(pool.submit_at(&api.expect_hash_and_number(0), txs)) + .into_iter() + .map(|r| r.map(|o| o.hash())) + .collect::>(); log::debug!("--> {hashes:#?}"); // then @@ -591,7 +595,8 @@ mod tests { // when pool.validated_pool.ban(&Instant::now(), vec![pool.hash_of(&uxt)]); - let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into())); + let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into())) + .map(|o| o.hash()); assert_eq!(pool.validated_pool().status().ready, 0); assert_eq!(pool.validated_pool().status().future, 0); @@ -614,7 +619,8 @@ mod tests { let uxt = ExtrinsicBuilder::new_include_data(vec![42]).build(); // when - let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into())); + let res = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, uxt.into())) + .map(|o| o.hash()); // then assert_matches!(res.unwrap_err(), error::Error::Unactionable); @@ -642,7 +648,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .hash(); let hash1 = block_on( pool.submit_one( &han_of_block0, @@ -656,7 +663,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .hash(); // future doesn't count let _hash = block_on( pool.submit_one( @@ -671,7 +679,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .hash(); assert_eq!(pool.validated_pool().status().ready, 2); assert_eq!(pool.validated_pool().status().future, 1); @@ -704,7 +713,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .hash(); let hash2 = block_on( pool.submit_one( &han_of_block0, @@ -718,7 +728,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .hash(); let hash3 = block_on( pool.submit_one( &han_of_block0, @@ -732,7 +743,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .hash(); // when pool.validated_pool.clear_stale(&api.expect_hash_and_number(5)); @@ -764,7 +776,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .hash(); // when block_on(pool.prune_tags(&api.expect_hash_and_number(1), vec![vec![0]], vec![hash1])); @@ -792,8 +805,9 @@ mod tests { let api = Arc::new(TestApi::default()); let pool = Pool::new_with_staticly_sized_rotator(options, true.into(), api.clone()); - let hash1 = - block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())).unwrap(); + let hash1 = block_on(pool.submit_one(&api.expect_hash_and_number(0), SOURCE, xt.into())) + .unwrap() + .hash(); assert_eq!(pool.validated_pool().status().future, 1); // when @@ -810,7 +824,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .hash(); // then assert_eq!(pool.validated_pool().status().future, 1); @@ -842,6 +857,7 @@ mod tests { .into(), ), ) + .map(|o| o.hash()) .unwrap_err(); // then @@ -868,6 +884,7 @@ mod tests { .into(), ), ) + .map(|o| o.hash()) .unwrap_err(); // then @@ -896,7 +913,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .expect_watcher(); assert_eq!(pool.validated_pool().status().ready, 1); assert_eq!(pool.validated_pool().status().future, 0); @@ -933,7 +951,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .expect_watcher(); assert_eq!(pool.validated_pool().status().ready, 1); assert_eq!(pool.validated_pool().status().future, 0); @@ -972,7 +991,8 @@ mod tests { .into(), ), ) - .unwrap(); + .unwrap() + .expect_watcher(); assert_eq!(pool.validated_pool().status().ready, 0); assert_eq!(pool.validated_pool().status().future, 1); @@ -1011,7 +1031,8 @@ mod tests { }); let watcher = block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into())) - .unwrap(); + .unwrap() + .expect_watcher(); assert_eq!(pool.validated_pool().status().ready, 1); // when @@ -1036,7 +1057,8 @@ mod tests { }); let watcher = block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, uxt.into())) - .unwrap(); + .unwrap() + .expect_watcher(); assert_eq!(pool.validated_pool().status().ready, 1); // when @@ -1069,7 +1091,8 @@ mod tests { }); let watcher = block_on(pool.submit_and_watch(&api.expect_hash_and_number(0), SOURCE, xt.into())) - .unwrap(); + .unwrap() + .expect_watcher(); assert_eq!(pool.validated_pool().status().ready, 1); // when @@ -1136,7 +1159,9 @@ mod tests { // after validation `IncludeData` will have priority set to 9001 // (validate_transaction mock) let xt = ExtrinsicBuilder::new_include_data(Vec::new()).build(); - block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap(); + block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())) + .unwrap() + .expect_watcher(); assert_eq!(pool.validated_pool().status().ready, 1); // after validation `Transfer` will have priority set to 4 (validate_transaction @@ -1147,8 +1172,9 @@ mod tests { amount: 5, nonce: 0, }); - let watcher = - block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())).unwrap(); + let watcher = block_on(pool.submit_and_watch(&han_of_block0, SOURCE, xt.into())) + .unwrap() + .expect_watcher(); assert_eq!(pool.validated_pool().status().ready, 2); // when diff --git a/substrate/client/transaction-pool/src/graph/ready.rs b/substrate/client/transaction-pool/src/graph/ready.rs index 9061d0e25581..b8aef99e638d 100644 --- a/substrate/client/transaction-pool/src/graph/ready.rs +++ b/substrate/client/transaction-pool/src/graph/ready.rs @@ -232,12 +232,10 @@ impl ReadyTransactions { Ok(replaced) } - /// Fold a list of ready transactions to compute a single value. - pub fn fold, &ReadyTx) -> Option>( - &mut self, - f: F, - ) -> Option { - self.ready.read().values().fold(None, f) + /// Fold a list of ready transactions to compute a single value using initial value of + /// accumulator. + pub fn fold) -> R>(&self, init: R, f: F) -> R { + self.ready.read().values().fold(init, f) } /// Returns true if given transaction is part of the queue. diff --git a/substrate/client/transaction-pool/src/graph/tracked_map.rs b/substrate/client/transaction-pool/src/graph/tracked_map.rs index 6c3bbbf34b55..fe15c6eca308 100644 --- a/substrate/client/transaction-pool/src/graph/tracked_map.rs +++ b/substrate/client/transaction-pool/src/graph/tracked_map.rs @@ -173,6 +173,11 @@ where pub fn len(&mut self) -> usize { self.inner_guard.len() } + + /// Returns an iterator over all key-value pairs. + pub fn iter(&self) -> Iter<'_, K, V> { + self.inner_guard.iter() + } } #[cfg(test)] diff --git a/substrate/client/transaction-pool/src/graph/validated_pool.rs b/substrate/client/transaction-pool/src/graph/validated_pool.rs index 3f7bf4773de7..bc2b07896dba 100644 --- a/substrate/client/transaction-pool/src/graph/validated_pool.rs +++ b/substrate/client/transaction-pool/src/graph/validated_pool.rs @@ -18,25 +18,22 @@ use std::{ collections::{HashMap, HashSet}, - hash, sync::Arc, }; use crate::{common::log_xt::log_xt_trace, LOG_TARGET}; use futures::channel::mpsc::{channel, Sender}; use parking_lot::{Mutex, RwLock}; -use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions}; -use serde::Serialize; +use sc_transaction_pool_api::{error, PoolStatus, ReadyTransactions, TransactionPriority}; use sp_blockchain::HashAndNumber; use sp_runtime::{ - traits::{self, SaturatedConversion}, + traits::SaturatedConversion, transaction_validity::{TransactionTag as Tag, ValidTransaction}, }; use std::time::Instant; use super::{ base_pool::{self as base, PruneStatus}, - listener::Listener, pool::{ BlockHash, ChainApi, EventStream, ExtrinsicFor, ExtrinsicHash, Options, TransactionFor, }, @@ -79,12 +76,23 @@ impl ValidatedTransaction { valid_till: at.saturated_into::().saturating_add(validity.longevity), }) } + + /// Returns priority for valid transaction, None if transaction is not valid. + pub fn priority(&self) -> Option { + match self { + ValidatedTransaction::Valid(base::Transaction { priority, .. }) => Some(*priority), + _ => None, + } + } } -/// A type of validated transaction stored in the pool. +/// A type of validated transaction stored in the validated pool. pub type ValidatedTransactionFor = ValidatedTransaction, ExtrinsicFor, ::Error>; +/// A type alias representing ValidatedPool listener for given ChainApi type. +pub type Listener = super::listener::Listener, B>; + /// A closure that returns true if the local node is a validator that can author blocks. #[derive(Clone)] pub struct IsValidator(Arc bool + Send + Sync>>); @@ -101,12 +109,56 @@ impl From bool + Send + Sync>> for IsValidator { } } +/// Represents the result of `submit` or `submit_and_watch` operations. +pub struct BaseSubmitOutcome { + /// The hash of the submitted transaction. + hash: ExtrinsicHash, + /// A transaction watcher. This is `Some` for `submit_and_watch` and `None` for `submit`. + watcher: Option, + + /// The priority of the transaction. Defaults to None if unknown. + priority: Option, +} + +/// Type alias to outcome of submission to `ValidatedPool`. +pub type ValidatedPoolSubmitOutcome = + BaseSubmitOutcome, ExtrinsicHash>>; + +impl BaseSubmitOutcome { + /// Creates a new instance with given hash and priority. + pub fn new(hash: ExtrinsicHash, priority: Option) -> Self { + Self { hash, priority, watcher: None } + } + + /// Sets the transaction watcher. + pub fn with_watcher(mut self, watcher: W) -> Self { + self.watcher = Some(watcher); + self + } + + /// Provides priority of submitted transaction. + pub fn priority(&self) -> Option { + self.priority + } + + /// Provides hash of submitted transaction. + pub fn hash(&self) -> ExtrinsicHash { + self.hash + } + + /// Provides a watcher. Should only be called on outcomes of `submit_and_watch`. Otherwise will + /// panic (that would mean logical error in program). + pub fn expect_watcher(&mut self) -> W { + self.watcher.take().expect("watcher was set in submit_and_watch. qed") + } +} + /// Pool that deals with validated transactions. pub struct ValidatedPool { api: Arc, is_validator: IsValidator, options: Options, - listener: RwLock, B>>, + listener: RwLock>, pub(crate) pool: RwLock, ExtrinsicFor>>, import_notification_sinks: Mutex>>>, rotator: PoolRotator>, @@ -200,7 +252,7 @@ impl ValidatedPool { pub fn submit( &self, txs: impl IntoIterator>, - ) -> Vec, B::Error>> { + ) -> Vec, B::Error>> { let results = txs .into_iter() .map(|validated_tx| self.submit_one(validated_tx)) @@ -216,7 +268,7 @@ impl ValidatedPool { results .into_iter() .map(|res| match res { - Ok(ref hash) if removed.contains(hash) => + Ok(outcome) if removed.contains(&outcome.hash) => Err(error::Error::ImmediatelyDropped.into()), other => other, }) @@ -224,9 +276,13 @@ impl ValidatedPool { } /// Submit single pre-validated transaction to the pool. - fn submit_one(&self, tx: ValidatedTransactionFor) -> Result, B::Error> { + fn submit_one( + &self, + tx: ValidatedTransactionFor, + ) -> Result, B::Error> { match tx { ValidatedTransaction::Valid(tx) => { + let priority = tx.priority; log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one", tx.hash); if !tx.propagate && !(self.is_validator.0)() { return Err(error::Error::Unactionable.into()) @@ -254,7 +310,7 @@ impl ValidatedPool { let mut listener = self.listener.write(); fire_events(&mut *listener, &imported); - Ok(*imported.hash()) + Ok(ValidatedPoolSubmitOutcome::new(*imported.hash(), Some(priority))) }, ValidatedTransaction::Invalid(hash, err) => { log::trace!(target: LOG_TARGET, "[{:?}] ValidatedPool::submit_one invalid: {:?}", hash, err); @@ -305,7 +361,7 @@ impl ValidatedPool { // run notifications let mut listener = self.listener.write(); for h in &removed { - listener.limit_enforced(h); + listener.limits_enforced(h); } removed @@ -318,7 +374,7 @@ impl ValidatedPool { pub fn submit_and_watch( &self, tx: ValidatedTransactionFor, - ) -> Result, ExtrinsicHash>, B::Error> { + ) -> Result, B::Error> { match tx { ValidatedTransaction::Valid(tx) => { let hash = self.api.hash_and_length(&tx.data).0; @@ -326,7 +382,7 @@ impl ValidatedPool { self.submit(std::iter::once(ValidatedTransaction::Valid(tx))) .pop() .expect("One extrinsic passed; one result returned; qed") - .map(|_| watcher) + .map(|outcome| outcome.with_watcher(watcher)) }, ValidatedTransaction::Invalid(hash, err) => { self.rotator.ban(&Instant::now(), std::iter::once(hash)); @@ -711,11 +767,42 @@ impl ValidatedPool { listener.future(&f.hash); }); } + + /// Removes a transaction subtree from the pool, starting from the given transaction hash. + /// + /// This function traverses the dependency graph of transactions and removes the specified + /// transaction along with all its descendant transactions from the pool. + /// + /// A `listener_action` callback function is invoked for every transaction that is removed, + /// providing a reference to the pool's listener and the hash of the removed transaction. This + /// allows to trigger the required events. + /// + /// Returns a vector containing the hashes of all removed transactions, including the root + /// transaction specified by `tx_hash`. + pub fn remove_subtree( + &self, + tx_hash: ExtrinsicHash, + listener_action: F, + ) -> Vec> + where + F: Fn(&mut Listener, ExtrinsicHash), + { + self.pool + .write() + .remove_subtree(&[tx_hash]) + .into_iter() + .map(|tx| { + let removed_tx_hash = tx.hash; + let mut listener = self.listener.write(); + listener_action(&mut *listener, removed_tx_hash); + removed_tx_hash + }) + .collect::>() + } } -fn fire_events(listener: &mut Listener, imported: &base::Imported) +fn fire_events(listener: &mut Listener, imported: &base::Imported, Ex>) where - H: hash::Hash + Eq + traits::Member + Serialize, B: ChainApi, { match *imported { diff --git a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs index caa09585b28b..2a691ae35eaf 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/revalidation.rs @@ -405,7 +405,8 @@ mod tests { TimedTransactionSource::new_external(false), uxt.clone().into(), )) - .expect("Should be valid"); + .expect("Should be valid") + .hash(); block_on(queue.revalidate_later(han_of_block0.hash, vec![uxt_hash])); @@ -448,7 +449,7 @@ mod tests { vec![(source.clone(), uxt0.into()), (source, uxt1.into())], )) .into_iter() - .map(|r| r.expect("Should be valid")) + .map(|r| r.expect("Should be valid").hash()) .collect::>(); assert_eq!(api.validation_requests().len(), 2); diff --git a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs index 2b32704945c7..3598f9dbc2af 100644 --- a/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs +++ b/substrate/client/transaction-pool/src/single_state_txpool/single_state_txpool.rs @@ -274,7 +274,12 @@ where let number = self.api.resolve_block_number(at); let at = HashAndNumber { hash: at, number: number? }; - Ok(pool.submit_at(&at, xts).await) + Ok(pool + .submit_at(&at, xts) + .await + .into_iter() + .map(|result| result.map(|outcome| outcome.hash())) + .collect()) } async fn submit_one( @@ -292,6 +297,7 @@ where let at = HashAndNumber { hash: at, number: number? }; pool.submit_one(&at, TimedTransactionSource::from_transaction_source(source, false), xt) .await + .map(|outcome| outcome.hash()) } async fn submit_and_watch( @@ -308,15 +314,13 @@ where let number = self.api.resolve_block_number(at); let at = HashAndNumber { hash: at, number: number? }; - let watcher = pool - .submit_and_watch( - &at, - TimedTransactionSource::from_transaction_source(source, false), - xt, - ) - .await?; - - Ok(watcher.into_stream().boxed()) + pool.submit_and_watch( + &at, + TimedTransactionSource::from_transaction_source(source, false), + xt, + ) + .await + .map(|mut outcome| outcome.expect_watcher().into_stream().boxed()) } fn remove_invalid(&self, hashes: &[TxHash]) -> Vec> { @@ -484,7 +488,11 @@ where validity, ); - self.pool.validated_pool().submit(vec![validated]).remove(0) + self.pool + .validated_pool() + .submit(vec![validated]) + .remove(0) + .map(|outcome| outcome.hash()) } } diff --git a/substrate/client/transaction-pool/tests/fatp_common/mod.rs b/substrate/client/transaction-pool/tests/fatp_common/mod.rs index aaffebc0db0a..530c25caf88e 100644 --- a/substrate/client/transaction-pool/tests/fatp_common/mod.rs +++ b/substrate/client/transaction-pool/tests/fatp_common/mod.rs @@ -192,12 +192,9 @@ macro_rules! assert_ready_iterator { let output: Vec<_> = ready_iterator.collect(); log::debug!(target:LOG_TARGET, "expected: {:#?}", expected); log::debug!(target:LOG_TARGET, "output: {:#?}", output); + let output = output.into_iter().map(|t|t.hash).collect::>(); assert_eq!(expected.len(), output.len()); - assert!( - output.iter().zip(expected.iter()).all(|(o,e)| { - o.hash == *e - }) - ); + assert_eq!(output,expected); }}; } @@ -215,6 +212,18 @@ macro_rules! assert_future_iterator { }}; } +#[macro_export] +macro_rules! assert_watcher_stream { + ($stream:ident, [$( $event:expr ),*]) => {{ + let expected = vec![ $($event),*]; + log::debug!(target:LOG_TARGET, "expected: {:#?} {}, block now:", expected, expected.len()); + let output = futures::executor::block_on_stream($stream).take(expected.len()).collect::>(); + log::debug!(target:LOG_TARGET, "output: {:#?}", output); + assert_eq!(expected.len(), output.len()); + assert_eq!(output, expected); + }}; +} + pub const SOURCE: TransactionSource = TransactionSource::External; #[cfg(test)] diff --git a/substrate/client/transaction-pool/tests/fatp_prios.rs b/substrate/client/transaction-pool/tests/fatp_prios.rs index 4ed9b4503861..af5e7e8c5a6a 100644 --- a/substrate/client/transaction-pool/tests/fatp_prios.rs +++ b/substrate/client/transaction-pool/tests/fatp_prios.rs @@ -20,13 +20,15 @@ pub mod fatp_common; -use fatp_common::{new_best_block_event, TestPoolBuilder, LOG_TARGET, SOURCE}; +use fatp_common::{invalid_hash, new_best_block_event, TestPoolBuilder, LOG_TARGET, SOURCE}; use futures::{executor::block_on, FutureExt}; use sc_transaction_pool::ChainApi; -use sc_transaction_pool_api::{MaintainedTransactionPool, TransactionPool, TransactionStatus}; +use sc_transaction_pool_api::{ + error::Error as TxPoolError, LocalTransactionPool, MaintainedTransactionPool, TransactionPool, + TransactionStatus, +}; use substrate_test_runtime_client::Sr25519Keyring::*; use substrate_test_runtime_transaction_pool::uxt; - #[test] fn fatp_prio_ready_higher_evicts_lower() { sp_tracing::try_init_simple(); @@ -247,3 +249,312 @@ fn fatp_prio_watcher_future_lower_prio_gets_dropped_from_all_views() { assert_ready_iterator!(header01.hash(), pool, [xt2, xt1]); assert_ready_iterator!(header02.hash(), pool, [xt2, xt1]); } + +#[test] +fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + api.set_nonce(api.genesis_hash(), Eve.into(), 600); + api.set_nonce(api.genesis_hash(), Ferdie.into(), 700); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + + let xt3 = uxt(Dave, 500); + + let xt4 = uxt(Eve, 600); + let xt5 = uxt(Ferdie, 700); + + api.set_priority(&xt0, 1); + api.set_priority(&xt1, 2); + api.set_priority(&xt2, 3); + api.set_priority(&xt3, 4); + + api.set_priority(&xt4, 5); + api.set_priority(&xt5, 6); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().1, 2); + + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash()))); + + let _xt2_watcher = + block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let _xt3_watcher = + block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_pool_status!(header02.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().1, 4); + + let header03 = api.push_block_with_parent(header02.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header02.hash()), header03.hash()))); + + let _xt4_watcher = + block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap(); + let _xt5_watcher = + block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).unwrap(); + + assert_pool_status!(header03.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().1, 4); + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + + assert_ready_iterator!(header01.hash(), pool, []); + assert_ready_iterator!(header02.hash(), pool, [xt3, xt2]); + assert_ready_iterator!(header03.hash(), pool, [xt5, xt4]); +} + +#[test] +fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted_with_subtree() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(4).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Alice, 201); + let xt2 = uxt(Alice, 202); + let xt3 = uxt(Bob, 300); + let xt4 = uxt(Charlie, 400); + + api.set_priority(&xt0, 1); + api.set_priority(&xt1, 3); + api.set_priority(&xt2, 3); + api.set_priority(&xt3, 2); + api.set_priority(&xt4, 2); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_ready_iterator!(header01.hash(), pool, [xt3, xt0, xt1, xt2]); + assert_pool_status!(header01.hash(), &pool, 4, 0); + assert_eq!(pool.mempool_len().1, 4); + + let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap(); + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_ready_iterator!(header01.hash(), pool, [xt3, xt4]); + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready]); +} + +#[test] +fn fatp_prios_watcher_full_mempool_higher_prio_is_accepted_with_subtree2() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(4).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Alice, 201); + let xt2 = uxt(Alice, 202); + let xt3 = uxt(Bob, 300); + let xt4 = uxt(Charlie, 400); + + api.set_priority(&xt0, 1); + api.set_priority(&xt1, 3); + api.set_priority(&xt2, 3); + api.set_priority(&xt3, 2); + api.set_priority(&xt4, 2); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_ready_iterator!(header01.hash(), pool, [xt3, xt0, xt1, xt2]); + assert_pool_status!(header01.hash(), &pool, 4, 0); + assert_eq!(pool.mempool_len().1, 4); + + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash()))); + + let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap(); + assert_ready_iterator!(header01.hash(), pool, [xt3]); + assert_pool_status!(header02.hash(), &pool, 2, 0); + assert_ready_iterator!(header02.hash(), pool, [xt3, xt4]); + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt4_watcher, [TransactionStatus::Ready]); +} + +#[test] +fn fatp_prios_watcher_full_mempool_lower_prio_gets_rejected() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(2).with_ready_count(2).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + let xt3 = uxt(Dave, 500); + + api.set_priority(&xt0, 2); + api.set_priority(&xt1, 2); + api.set_priority(&xt2, 2); + api.set_priority(&xt3, 1); + + let _xt0_watcher = + block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let _xt1_watcher = + block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().1, 2); + + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash()))); + + assert_pool_status!(header02.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().1, 2); + + assert_ready_iterator!(header01.hash(), pool, [xt0, xt1]); + assert_ready_iterator!(header02.hash(), pool, [xt0, xt1]); + + let result2 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).map(|_| ()); + assert!(matches!(result2.as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped)); + let result3 = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).map(|_| ()); + assert!(matches!(result3.as_ref().unwrap_err().0, TxPoolError::ImmediatelyDropped)); +} + +#[test] +fn fatp_prios_watcher_full_mempool_does_not_keep_dropped_transaction() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + let xt3 = uxt(Dave, 500); + + api.set_priority(&xt0, 2); + api.set_priority(&xt1, 2); + api.set_priority(&xt2, 2); + api.set_priority(&xt3, 2); + + let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap(); + let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap(); + let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap(); + let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_ready_iterator!(header01.hash(), pool, [xt2, xt3]); + + assert_watcher_stream!(xt0_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt1_watcher, [TransactionStatus::Ready, TransactionStatus::Dropped]); + assert_watcher_stream!(xt2_watcher, [TransactionStatus::Ready]); + assert_watcher_stream!(xt3_watcher, [TransactionStatus::Ready]); +} + +#[test] +fn fatp_prios_submit_local_full_mempool_higher_prio_is_accepted() { + sp_tracing::try_init_simple(); + + let builder = TestPoolBuilder::new(); + let (pool, api, _) = builder.with_mempool_count_limit(4).with_ready_count(2).build(); + api.set_nonce(api.genesis_hash(), Bob.into(), 300); + api.set_nonce(api.genesis_hash(), Charlie.into(), 400); + api.set_nonce(api.genesis_hash(), Dave.into(), 500); + api.set_nonce(api.genesis_hash(), Eve.into(), 600); + api.set_nonce(api.genesis_hash(), Ferdie.into(), 700); + + let header01 = api.push_block(1, vec![], true); + let event = new_best_block_event(&pool, None, header01.hash()); + block_on(pool.maintain(event)); + + let xt0 = uxt(Alice, 200); + let xt1 = uxt(Bob, 300); + let xt2 = uxt(Charlie, 400); + + let xt3 = uxt(Dave, 500); + + let xt4 = uxt(Eve, 600); + let xt5 = uxt(Ferdie, 700); + + api.set_priority(&xt0, 1); + api.set_priority(&xt1, 2); + api.set_priority(&xt2, 3); + api.set_priority(&xt3, 4); + + api.set_priority(&xt4, 5); + api.set_priority(&xt5, 6); + pool.submit_local(invalid_hash(), xt0.clone()).unwrap(); + pool.submit_local(invalid_hash(), xt1.clone()).unwrap(); + + assert_pool_status!(header01.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().0, 2); + + let header02 = api.push_block_with_parent(header01.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash()))); + + pool.submit_local(invalid_hash(), xt2.clone()).unwrap(); + pool.submit_local(invalid_hash(), xt3.clone()).unwrap(); + + assert_pool_status!(header02.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().0, 4); + + let header03 = api.push_block_with_parent(header02.hash(), vec![], true); + block_on(pool.maintain(new_best_block_event(&pool, Some(header02.hash()), header03.hash()))); + + pool.submit_local(invalid_hash(), xt4.clone()).unwrap(); + pool.submit_local(invalid_hash(), xt5.clone()).unwrap(); + + assert_pool_status!(header03.hash(), &pool, 2, 0); + assert_eq!(pool.mempool_len().0, 4); + + assert_ready_iterator!(header01.hash(), pool, []); + assert_ready_iterator!(header02.hash(), pool, [xt3, xt2]); + assert_ready_iterator!(header03.hash(), pool, [xt5, xt4]); +} diff --git a/substrate/client/transaction-pool/tests/pool.rs b/substrate/client/transaction-pool/tests/pool.rs index de35726435f0..c70f45483314 100644 --- a/substrate/client/transaction-pool/tests/pool.rs +++ b/substrate/client/transaction-pool/tests/pool.rs @@ -158,6 +158,7 @@ fn prune_tags_should_work() { let (pool, api) = pool(); let hash209 = block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt(Alice, 209).into())) + .map(|o| o.hash()) .unwrap(); block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt(Alice, 210).into())) .unwrap(); @@ -184,10 +185,13 @@ fn prune_tags_should_work() { fn should_ban_invalid_transactions() { let (pool, api) = pool(); let uxt = Arc::from(uxt(Alice, 209)); - let hash = - block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap(); + let hash = block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())) + .unwrap() + .hash(); pool.validated_pool().remove_invalid(&[hash]); - block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap_err(); + block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())) + .map(|_| ()) + .unwrap_err(); // when let pending: Vec<_> = pool @@ -198,7 +202,9 @@ fn should_ban_invalid_transactions() { assert_eq!(pending, Vec::::new()); // then - block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())).unwrap_err(); + block_on(pool.submit_one(&api.expect_hash_and_number(0), TSOURCE, uxt.clone())) + .map(|_| ()) + .unwrap_err(); } #[test] diff --git a/substrate/test-utils/runtime/transaction-pool/src/lib.rs b/substrate/test-utils/runtime/transaction-pool/src/lib.rs index 93e5855eefc6..f88694fb1071 100644 --- a/substrate/test-utils/runtime/transaction-pool/src/lib.rs +++ b/substrate/test-utils/runtime/transaction-pool/src/lib.rs @@ -352,9 +352,18 @@ impl ChainApi for TestApi { fn validate_transaction( &self, at: ::Hash, - _source: TransactionSource, + source: TransactionSource, uxt: Arc<::Extrinsic>, ) -> Self::ValidationFuture { + ready(self.validate_transaction_blocking(at, source, uxt)) + } + + fn validate_transaction_blocking( + &self, + at: ::Hash, + _source: TransactionSource, + uxt: Arc<::Extrinsic>, + ) -> Result { let uxt = (*uxt).clone(); self.validation_requests.write().push(uxt.clone()); let block_number; @@ -374,16 +383,12 @@ impl ChainApi for TestApi { // the transaction. (This is not required for this test function, but in real // environment it would fail because of this). if !found_best { - return ready(Ok(Err(TransactionValidityError::Invalid( - InvalidTransaction::Custom(1), - )))) + return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(1)))) } }, Ok(None) => - return ready(Ok(Err(TransactionValidityError::Invalid( - InvalidTransaction::Custom(2), - )))), - Err(e) => return ready(Err(e)), + return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(2)))), + Err(e) => return Err(e), } let (requires, provides) = if let Ok(transfer) = TransferData::try_from(&uxt) { @@ -423,7 +428,7 @@ impl ChainApi for TestApi { if self.enable_stale_check && transfer.nonce < chain_nonce { log::info!("test_api::validate_transaction: invalid_transaction(stale)...."); - return ready(Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Stale)))) + return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Stale))) } (requires, provides) @@ -433,7 +438,7 @@ impl ChainApi for TestApi { if self.chain.read().invalid_hashes.contains(&self.hash_and_length(&uxt).0) { log::info!("test_api::validate_transaction: invalid_transaction...."); - return ready(Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0))))) + return Ok(Err(TransactionValidityError::Invalid(InvalidTransaction::Custom(0)))) } let priority = self.chain.read().priorities.get(&self.hash_and_length(&uxt).0).cloned(); @@ -447,16 +452,7 @@ impl ChainApi for TestApi { (self.valid_modifier.read())(&mut validity); - ready(Ok(Ok(validity))) - } - - fn validate_transaction_blocking( - &self, - _at: ::Hash, - _source: TransactionSource, - _uxt: Arc<::Extrinsic>, - ) -> Result { - unimplemented!(); + Ok(Ok(validity)) } fn block_id_to_number(