fatxpool: proper handling of priorities when mempool is full #6647

Open
wants to merge 31 commits into base: master

Commits (31)
018f9d4
dropped_watcher: new fns are public
michalkucharczyk Nov 26, 2024
4aa24d3
dropped_watcher: improvement
michalkucharczyk Nov 26, 2024
9fe88a6
fatp: handling higher prio with full mempool
michalkucharczyk Nov 26, 2024
54ae11c
graph: fold_ready improved
michalkucharczyk Nov 26, 2024
a6882eb
graph: make some staff public
michalkucharczyk Nov 26, 2024
8d3dffe
tests added
michalkucharczyk Nov 26, 2024
70fd186
type removed
michalkucharczyk Nov 26, 2024
d3a1a7b
improvements
michalkucharczyk Nov 28, 2024
4246dac
make use of your brain
michalkucharczyk Nov 28, 2024
c203d72
fatp: pending actions now support removals
michalkucharczyk Nov 28, 2024
edb1257
validated_pool: SubmitOutcome
michalkucharczyk Nov 29, 2024
f778176
view/view_store: SubmitOutcome
michalkucharczyk Nov 29, 2024
a72b3f9
mempool: update_transaction stub
michalkucharczyk Nov 29, 2024
c411bb4
fatp: SubmitOutcome
michalkucharczyk Nov 29, 2024
7b461bf
fatp: todo added
michalkucharczyk Nov 29, 2024
8765d2c
single-state txpool: SubmitOutcome integration
michalkucharczyk Nov 29, 2024
e8ccd44
tests: SubmitOutcome fixes
michalkucharczyk Nov 29, 2024
6cca272
mempool: sizes fix
michalkucharczyk Nov 29, 2024
3b17a16
dropping transaction - size limit is properly obeyed now
michalkucharczyk Dec 3, 2024
4f767e5
merge / rebase fixes
michalkucharczyk Dec 4, 2024
6ba133e
mempool: prio is now locked option
michalkucharczyk Dec 4, 2024
46fa1fd
tests added + dead code cleanup
michalkucharczyk Dec 4, 2024
2221d7a
comments cleanup
michalkucharczyk Dec 4, 2024
0244ba0
tweaks
michalkucharczyk Jan 7, 2025
037e016
Merge remote-tracking branch 'origin/master' into mku-fatxpool-mempoo…
michalkucharczyk Jan 7, 2025
5d0283e
review comments
michalkucharczyk Jan 8, 2025
caca2e1
clippy
michalkucharczyk Jan 8, 2025
b86ef05
clean up
michalkucharczyk Jan 8, 2025
736d698
Merge branch 'master' into mku-fatxpool-mempool-priorities-at-limits
michalkucharczyk Jan 8, 2025
4294261
Update from michalkucharczyk running command 'prdoc --bump minor --au…
Jan 9, 2025
b4290cd
Update prdoc/pr_6647.prdoc
michalkucharczyk Jan 9, 2025
16 changes: 16 additions & 0 deletions prdoc/pr_6647.prdoc
@@ -0,0 +1,16 @@
title: '`fatxpool`: proper handling of priorities when mempool is full'
doc:
- audience: Node Dev
description: |-
Higher-priority transactions can now replace lower-priority transactions even when the internal _tx_mem_pool_ is full.

**Notes for reviewers:**
- The _tx_mem_pool_ now maintains information about transaction priority. Although _tx_mem_pool_ itself is stateless, transaction priority is updated after submission to the view. An alternative approach could involve validating transactions at the `at` block, but this is computationally expensive. To avoid additional validation overhead, I opted to use the priority obtained from the runtime during submission to the view. This is the rationale behind introducing the `SubmitOutcome` struct, which synchronously communicates transaction priority from the view to the pool. This results in a very brief window during which the transaction priority remains unknown; such transactions are not taken into consideration while dropping takes place. In the future, if needed, we could update transaction priority using view revalidation results to keep this information fully up to date (as the priority of a transaction may change as the chain state evolves).
- When _tx_mem_pool_ becomes full (an event anticipated to be rare), transaction priority must be known to perform priority-based removal. In such cases, the most recent known block is used for validation. I think that speculative submission to the view and re-using the priority from that submission would be an unnecessary complication.
- Once the priority is determined, lower-priority transactions whose cumulative size meets or exceeds the size of the new transaction are collected, to ensure the pool size limit is not exceeded.
- A transaction removed from _tx_mem_pool_ also needs to be removed from all the views, with an appropriate event (which is done by `remove_transaction_subtree`). To ensure complete removal, the `PendingTxReplacement` struct was refactored into the more generic `PendingPreInsertTask` (introduced in #6405), which covers both removal and submission of a transaction in a view that may be created in the background. This ensures that a removed transaction will not re-enter a newly created view.

Closes: #5809
crates:
- name: sc-transaction-pool
bump: minor
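
To make the replacement rule above concrete, here is a minimal, self-contained sketch of priority-based eviction when the mempool is full. It is not the PR's implementation (the real logic sits behind `try_replace_transaction` and works on the pool's own types); `Mempool`, `MempoolEntry`, `try_replace` and the `u64` hash/priority types are illustrative assumptions, and the new transaction's priority is assumed to already be known from validation at the most recent view (the value carried back via `SubmitOutcome`).

```rust
use std::collections::HashMap;

// Illustrative types only; NOT the actual sc-transaction-pool types.
#[derive(Clone)]
struct MempoolEntry {
    hash: u64,
    priority: u64,
    size: usize,
}

struct Mempool {
    max_size: usize,
    used_size: usize,
    entries: HashMap<u64, MempoolEntry>,
}

impl Mempool {
    /// Try to make room for `new_tx` by evicting strictly lower-priority entries whose
    /// cumulative size meets or exceeds the new transaction's size.
    /// Returns the evicted hashes, or `None` if replacement is not possible.
    fn try_replace(&mut self, new_tx: MempoolEntry) -> Option<Vec<u64>> {
        if self.used_size + new_tx.size <= self.max_size {
            // Pool is not actually full: plain insertion, nothing is evicted.
            self.used_size += new_tx.size;
            self.entries.insert(new_tx.hash, new_tx);
            return Some(Vec::new());
        }

        // Collect lower-priority candidates, worst (lowest priority) first.
        let mut candidates: Vec<MempoolEntry> = self
            .entries
            .values()
            .filter(|e| e.priority < new_tx.priority)
            .cloned()
            .collect();
        candidates.sort_by_key(|e| e.priority);

        // Accumulate candidates until their total size covers the new transaction.
        let mut freed = 0usize;
        let mut evicted = Vec::new();
        for e in candidates {
            if freed >= new_tx.size {
                break;
            }
            freed += e.size;
            evicted.push(e.hash);
        }

        // Not enough lower-priority weight to make room: reject the new transaction
        // (this corresponds to the `ImmediatelyDropped` outcome).
        if freed < new_tx.size {
            return None;
        }

        for h in &evicted {
            if let Some(e) = self.entries.remove(h) {
                self.used_size -= e.size;
            }
        }
        self.used_size += new_tx.size;
        self.entries.insert(new_tx.hash, new_tx);
        Some(evicted)
    }
}

fn main() {
    let mut pool = Mempool { max_size: 100, used_size: 100, entries: HashMap::new() };
    pool.entries.insert(1, MempoolEntry { hash: 1, priority: 5, size: 60 });
    pool.entries.insert(2, MempoolEntry { hash: 2, priority: 50, size: 40 });
    // A higher-priority transaction of size 50 evicts the priority-5 entry (size 60 >= 50).
    let evicted = pool.try_replace(MempoolEntry { hash: 3, priority: 100, size: 50 });
    assert_eq!(evicted, Some(vec![1]));
}
```

Under this sketch a new transaction is rejected outright when the lower-priority entries cannot free enough space, which matches the behaviour described in the notes above.
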
@@ -53,11 +53,13 @@ pub struct DroppedTransaction<Hash> {
}

impl<Hash> DroppedTransaction<Hash> {
fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
/// Creates a new instance with reason set to `DroppedReason::Usurped(by)`.
pub fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
Self { reason: DroppedReason::Usurped(by), tx_hash }
}

fn new_enforced_by_limts(tx_hash: Hash) -> Self {
/// Creates a new instance with reason set to `DroppedReason::LimitsEnforced`.
pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
Self { reason: DroppedReason::LimitsEnforced, tx_hash }
}
}
@@ -257,10 +259,12 @@
},
TransactionStatus::Ready | TransactionStatus::InBlock(..) => {
// note: if a future transaction was once seen as ready we may want to treat it
// as ready transactions. Unreferenced future transactions are more likely to be
// removed when the last referencing view is removed then ready transactions.
// Transcaction seen as ready is likely quite close to be included in some
// future fork.
// as a ready transaction. The rationale behind this is as follows: we want to remove
// unreferenced future transactions when the last referencing view is removed (to
// avoid clogging the mempool). For ready transactions we prefer to keep them in the
// mempool even if no view is currently referencing them. A future transaction once
// seen as ready is likely quite close to being included in some future fork (it is
// close to being ready, so we make an exception and treat such a transaction as ready).
if let Some(mut views) = self.future_transaction_views.remove(&tx_hash) {
views.insert(block_hash);
self.ready_transaction_views.insert(tx_hash, views);
@@ -329,14 +333,14 @@
let stream_map = futures::stream::unfold(ctx, |mut ctx| async move {
loop {
if let Some(dropped) = ctx.get_pending_dropped_transaction() {
debug!("dropped_watcher: sending out (pending): {dropped:?}");
trace!("dropped_watcher: sending out (pending): {dropped:?}");
return Some((dropped, ctx));
}
tokio::select! {
biased;
Some(event) = next_event(&mut ctx.stream_map) => {
if let Some(dropped) = ctx.handle_event(event.0, event.1) {
debug!("dropped_watcher: sending out: {dropped:?}");
trace!("dropped_watcher: sending out: {dropped:?}");
return Some((dropped, ctx));
}
},
@@ -31,7 +31,10 @@ use crate::{
api::FullChainApi,
common::log_xt::log_xt_trace,
enactment_state::{EnactmentAction, EnactmentState},
fork_aware_txpool::{dropped_watcher::DroppedReason, revalidation_worker},
fork_aware_txpool::{
dropped_watcher::{DroppedReason, DroppedTransaction},
revalidation_worker,
},
graph::{
self,
base_pool::{TimedTransactionSource, Transaction},
@@ -49,8 +52,9 @@ use futures::{
use parking_lot::Mutex;
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_transaction_pool_api::{
ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolStatus, TransactionFor,
TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
TransactionStatusStreamFor, TxHash,
};
use sp_blockchain::{HashAndNumber, TreeRoute};
use sp_core::traits::SpawnEssentialNamed;
@@ -287,7 +291,7 @@
DroppedReason::LimitsEnforced => {},
};

mempool.remove_dropped_transaction(&dropped_tx_hash).await;
mempool.remove_transaction(&dropped_tx_hash);
view_store.listener.transaction_dropped(dropped);
import_notification_sink.clean_notified_items(&[dropped_tx_hash]);
}
@@ -598,7 +602,7 @@
/// out:
/// [ Ok(xth0), Ok(xth1), Err ]
/// ```
fn reduce_multiview_result<H, E>(input: HashMap<H, Vec<Result<H, E>>>) -> Vec<Result<H, E>> {
fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
let mut values = input.values();
let Some(first) = values.next() else {
return Default::default();
@@ -650,9 +654,28 @@
let mempool_results = self.mempool.extend_unwatched(source, &xts);

if view_store.is_empty() {
return Ok(mempool_results.into_iter().map(|r| r.map(|r| r.hash)).collect::<Vec<_>>())
return Ok(mempool_results
.into_iter()
.map(|r| r.map(|r| r.hash).map_err(Into::into))
.collect::<Vec<_>>())
}

// For transactions that were immediately dropped by the full mempool, attempt replacement of
// lower-priority transactions already in the pool.
let retries = mempool_results
.into_iter()
.zip(xts.clone())
.map(|(result, xt)| async move {
match result {
Err(TxPoolApiError::ImmediatelyDropped) =>
self.attempt_transaction_replacement(source, false, xt).await,
result @ _ => result,
}
})
.collect::<Vec<_>>();

let mempool_results = futures::future::join_all(retries).await;

// Collect transactions that were successfully submitted to the mempool...
let to_be_submitted = mempool_results
.iter()
.zip(xts)
@@ -664,22 +687,47 @@
self.metrics
.report(|metrics| metrics.submitted_transactions.inc_by(to_be_submitted.len() as _));

// ... and submit them to the view_store. Please note that transactions rejected by the mempool
// are not sent here.
let mempool = self.mempool.clone();
let results_map = view_store.submit(to_be_submitted.into_iter()).await;
let mut submission_results = reduce_multiview_result(results_map).into_iter();

// Note for composing final result:
//
// For each failed insertion into the mempool, the mempool result should be placed into
// the returned vector.
//
// For each successful insertion into the mempool, the corresponding
// view_store submission result needs to be examined:
// - If there is an error during view_store submission, the transaction is removed from
// the mempool, and the final result recorded in the vector for this transaction is the
// view_store submission error.
//
// - If the view_store submission is successful, the transaction priority is updated in the
// mempool.
//
// Finally, it collects the hashes of updated transactions or submission errors (either
// from the mempool or view_store) into a returned vector.
Ok(mempool_results
.into_iter()
.map(|result| {
result.and_then(|insertion| {
submission_results
.next()
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
.inspect_err(|_|
mempool.remove(insertion.hash)
)
result
.map_err(Into::into)
.and_then(|insertion| {
submission_results
.next()
.expect("The number of Ok results in mempool is exactly the same as the size of view_store submission result. qed.")
.inspect_err(|_|{
mempool.remove_transaction(&insertion.hash);
})
})

})
.map(|r| r.map(|r| {
mempool.update_transaction_priority(&r);
r.hash()
}))
.collect::<Vec<_>>())
}

@@ -712,18 +760,27 @@
) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
let xt = Arc::from(xt);
let InsertionInfo { hash: xt_hash, source: timed_source } =

let InsertionInfo { hash: xt_hash, source: timed_source, .. } =
match self.mempool.push_watched(source, xt.clone()) {
Ok(result) => result,
Err(e) => return Err(e),
Err(TxPoolApiError::ImmediatelyDropped) =>
self.attempt_transaction_replacement(source, true, xt.clone()).await?,
Err(e) => return Err(e.into()),
};

self.metrics.report(|metrics| metrics.submitted_transactions.inc());

self.view_store
.submit_and_watch(at, timed_source, xt)
.await
.inspect_err(|_| self.mempool.remove(xt_hash))
.inspect_err(|_| {
self.mempool.remove_transaction(&xt_hash);
})
.map(|mut outcome| {
self.mempool.update_transaction_priority(&outcome);
outcome.expect_watcher()
})
}

/// Intended to remove transactions identified by the given hashes, and any dependent
@@ -857,7 +914,13 @@
.extend_unwatched(TransactionSource::Local, &[xt.clone()])
.remove(0)?;

self.view_store.submit_local(xt).or_else(|_| Ok(xt_hash))
self.view_store
.submit_local(xt)
.map(|outcome| {
self.mempool.update_transaction_priority(&outcome);
outcome.hash()
})
.or_else(|_| Ok(xt_hash))
}
}

@@ -1109,7 +1172,11 @@
.await
.into_iter()
.zip(hashes)
.map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash)))
.map(|(result, tx_hash)| {
result
.map(|outcome| self.mempool.update_transaction_priority(&outcome.into()))
.or_else(|_| Err(tx_hash))
})
.collect::<Vec<_>>();

let submitted_count = watched_results.len();
@@ -1131,7 +1198,7 @@
for result in watched_results {
if let Err(tx_hash) = result {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.mempool.remove(tx_hash);
self.mempool.remove_transaction(&tx_hash);
}
}
}
@@ -1263,6 +1330,64 @@
fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
self.api.hash_and_length(xt).0
}

/// Attempts to find and replace a lower-priority transaction in the transaction pool with a new
/// one.
///
/// This asynchronous function verifies the new transaction against the most recent view. If a
/// transaction with a lower priority exists in the transaction pool, it is replaced with the
/// new transaction.
///
/// If no lower-priority transaction is found, the function returns an error indicating the
/// transaction was dropped immediately.
async fn attempt_transaction_replacement(
&self,
source: TransactionSource,
watched: bool,
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
let at = self
.view_store
.most_recent_view
.read()
.ok_or(TxPoolApiError::ImmediatelyDropped)?;

let (best_view, _) = self
.view_store
.get_view_at(at, false)
.ok_or(TxPoolApiError::ImmediatelyDropped)?;

let (tx_hash, validated_tx) = best_view
.pool
.verify_one(
best_view.at.hash,
best_view.at.number,
TimedTransactionSource::from_transaction_source(source, false),
xt.clone(),
crate::graph::CheckBannedBeforeVerify::Yes,
)
.await;

let Some(priority) = validated_tx.priority() else {
return Err(TxPoolApiError::ImmediatelyDropped)
};

let insertion_info = self.mempool.try_replace_transaction(xt, priority, source, watched)?;

for worst_hash in &insertion_info.removed {
log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}");
self.view_store
.listener
.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));

self.view_store
.remove_transaction_subtree(*worst_hash, |listener, removed_tx_hash| {
listener.limits_enforced(&removed_tx_hash);
});
}

return Ok(insertion_info)
}
}

#[async_trait]
@@ -1410,7 +1535,7 @@ mod reduce_multiview_result_tests {
fn empty() {
sp_tracing::try_init_simple();
let input = HashMap::default();
let r = reduce_multiview_result::<H256, Error>(input);
let r = reduce_multiview_result::<H256, H256, Error>(input);
assert!(r.is_empty());
}
