fatxpool: proper handling of priorities when mempool is full #6647

Open · michalkucharczyk wants to merge 31 commits into base: master

Changes shown from 23 of 31 commits.

Commits
018f9d4
dropped_watcher: new fns are public
michalkucharczyk Nov 26, 2024
4aa24d3
dropped_watcher: improvement
michalkucharczyk Nov 26, 2024
9fe88a6
fatp: handling higher prio with full mempool
michalkucharczyk Nov 26, 2024
54ae11c
graph: fold_ready improved
michalkucharczyk Nov 26, 2024
a6882eb
graph: make some staff public
michalkucharczyk Nov 26, 2024
8d3dffe
tests added
michalkucharczyk Nov 26, 2024
70fd186
type removed
michalkucharczyk Nov 26, 2024
d3a1a7b
improvements
michalkucharczyk Nov 28, 2024
4246dac
make use of your brain
michalkucharczyk Nov 28, 2024
c203d72
fatp: pending actions now support removals
michalkucharczyk Nov 28, 2024
edb1257
validated_pool: SubmitOutcome
michalkucharczyk Nov 29, 2024
f778176
view/view_store: SubmitOutcome
michalkucharczyk Nov 29, 2024
a72b3f9
mempool: update_transaction stub
michalkucharczyk Nov 29, 2024
c411bb4
fatp: SubmitOutcome
michalkucharczyk Nov 29, 2024
7b461bf
fatp: todo added
michalkucharczyk Nov 29, 2024
8765d2c
single-state txpool: SubmitOutcome integration
michalkucharczyk Nov 29, 2024
e8ccd44
tests: SubmitOutcome fixes
michalkucharczyk Nov 29, 2024
6cca272
mempool: sizes fix
michalkucharczyk Nov 29, 2024
3b17a16
dropping transaction - size limit is properly obeyed now
michalkucharczyk Dec 3, 2024
4f767e5
merge / rebase fixes
michalkucharczyk Dec 4, 2024
6ba133e
mempool: prio is now locked option
michalkucharczyk Dec 4, 2024
46fa1fd
tests added + dead code cleanup
michalkucharczyk Dec 4, 2024
2221d7a
comments cleanup
michalkucharczyk Dec 4, 2024
0244ba0
tweaks
michalkucharczyk Jan 7, 2025
037e016
Merge remote-tracking branch 'origin/master' into mku-fatxpool-mempoo…
michalkucharczyk Jan 7, 2025
5d0283e
review comments
michalkucharczyk Jan 8, 2025
caca2e1
clippy
michalkucharczyk Jan 8, 2025
b86ef05
clean up
michalkucharczyk Jan 8, 2025
736d698
Merge branch 'master' into mku-fatxpool-mempool-priorities-at-limits
michalkucharczyk Jan 8, 2025
4294261
Update from michalkucharczyk running command 'prdoc --bump minor --au…
Jan 9, 2025
b4290cd
Update prdoc/pr_6647.prdoc
michalkucharczyk Jan 9, 2025
@@ -53,11 +53,13 @@ pub struct DroppedTransaction<Hash> {
}

impl<Hash> DroppedTransaction<Hash> {
fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
/// Creates a new instance with reason set to `DroppedReason::Usurped(by)`.
pub fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
Self { reason: DroppedReason::Usurped(by), tx_hash }
}

fn new_enforced_by_limts(tx_hash: Hash) -> Self {
/// Creates a new instance with reason set to `DroppedReason::LimitsEnforced`.
pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
Self { reason: DroppedReason::LimitsEnforced, tx_hash }
}
}
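
With the constructors promoted to `pub`, drop notifications can be built outside the `dropped_watcher` module — the replacement path added later in this PR calls `new_enforced_by_limts` directly. A minimal, self-contained usage sketch (mirroring the types in this hunk, with `u64` standing in for the generic `Hash`):

```rust
#[derive(Debug)]
enum DroppedReason<Hash> {
    Usurped(Hash),
    LimitsEnforced,
}

#[derive(Debug)]
struct DroppedTransaction<Hash> {
    reason: DroppedReason<Hash>,
    tx_hash: Hash,
}

impl<Hash> DroppedTransaction<Hash> {
    pub fn new_usurped(tx_hash: Hash, by: Hash) -> Self {
        Self { reason: DroppedReason::Usurped(by), tx_hash }
    }
    pub fn new_enforced_by_limts(tx_hash: Hash) -> Self {
        Self { reason: DroppedReason::LimitsEnforced, tx_hash }
    }
}

fn main() {
    // A notification consumer distinguishes the two drop reasons:
    for dropped in [
        DroppedTransaction::new_usurped(1u64, 9u64),
        DroppedTransaction::new_enforced_by_limts(2u64),
    ] {
        match dropped.reason {
            DroppedReason::Usurped(by) =>
                println!("tx {} replaced by higher-priority tx {}", dropped.tx_hash, by),
            DroppedReason::LimitsEnforced =>
                println!("tx {} evicted by pool limits", dropped.tx_hash),
        }
    }
}
```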
@@ -204,6 +206,10 @@ where
views
);
views.remove(&key);
//todo: merge heads up warning!
if views.is_empty() {
self.pending_dropped_transactions.push(*tx_hash);
}
});

self.future_transaction_views.iter_mut().for_each(|(tx_hash, views)| {
@@ -31,7 +31,10 @@ use crate::{
api::FullChainApi,
common::log_xt::log_xt_trace,
enactment_state::{EnactmentAction, EnactmentState},
fork_aware_txpool::{dropped_watcher::DroppedReason, revalidation_worker},
fork_aware_txpool::{
dropped_watcher::{DroppedReason, DroppedTransaction},
revalidation_worker,
},
graph::{
self,
base_pool::{TimedTransactionSource, Transaction},
@@ -49,8 +52,9 @@ use futures::{
use parking_lot::Mutex;
use prometheus_endpoint::Registry as PrometheusRegistry;
use sc_transaction_pool_api::{
ChainEvent, ImportNotificationStream, MaintainedTransactionPool, PoolStatus, TransactionFor,
TransactionPool, TransactionSource, TransactionStatusStreamFor, TxHash,
error::Error as TxPoolApiError, ChainEvent, ImportNotificationStream,
MaintainedTransactionPool, PoolStatus, TransactionFor, TransactionPool, TransactionSource,
TransactionStatusStreamFor, TxHash,
};
use sp_blockchain::{HashAndNumber, TreeRoute};
use sp_core::traits::SpawnEssentialNamed;
@@ -287,7 +291,7 @@ where
DroppedReason::LimitsEnforced => {},
};

mempool.remove_dropped_transaction(&dropped_tx_hash).await;
mempool.remove_transaction(&dropped_tx_hash);
view_store.listener.transaction_dropped(dropped);
import_notification_sink.clean_notified_items(&[dropped_tx_hash]);
}
@@ -598,7 +602,7 @@ where
/// out:
/// [ Ok(xth0), Ok(xth1), Err ]
/// ```
fn reduce_multiview_result<H, E>(input: HashMap<H, Vec<Result<H, E>>>) -> Vec<Result<H, E>> {
fn reduce_multiview_result<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
let mut values = input.values();
let Some(first) = values.next() else {
return Default::default();
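
The hunk above only shows the head of `reduce_multiview_result`, so for intuition here is a simplified, runnable model of the reduction its doc comment describes: per-view result vectors of equal length are folded into one vector, an `Ok` from any view wins for a given position, and if every view failed, one of the errors is kept. This is an illustrative sketch, not the pool's exact implementation:

```rust
use std::collections::HashMap;

// Fold per-view result vectors into a single result vector.
fn reduce<H, D, E>(input: HashMap<H, Vec<Result<D, E>>>) -> Vec<Result<D, E>> {
    input
        .into_values()
        .reduce(|mut acc, results| {
            acc.iter_mut().zip(results).for_each(|(a, r)| {
                // Keep an already-accepted result; otherwise take the newer one.
                if a.is_err() {
                    *a = r;
                }
            });
            acc
        })
        .unwrap_or_default()
}

fn main() {
    let mut input: HashMap<u8, Vec<Result<u32, &str>>> = HashMap::new();
    input.insert(0, vec![Ok(10), Err("future"), Err("dropped")]);
    input.insert(1, vec![Err("invalid"), Ok(11), Err("dropped")]);

    let out = reduce(input);
    // Positions 0 and 1 were accepted by some view; position 2 by none.
    assert_eq!(out[0], Ok(10));
    assert_eq!(out[1], Ok(11));
    assert!(out[2].is_err());
}
```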
@@ -639,7 +643,7 @@ where
/// are reduced to single result. Refer to `reduce_multiview_result` for more details.
async fn submit_at(
&self,
_: <Self::Block as BlockT>::Hash,
at: <Self::Block as BlockT>::Hash,
source: TransactionSource,
xts: Vec<TransactionFor<Self>>,
) -> Result<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
@@ -650,9 +654,27 @@
let mempool_results = self.mempool.extend_unwatched(source, &xts);

if view_store.is_empty() {
return Ok(mempool_results.into_iter().map(|r| r.map(|r| r.hash)).collect::<Vec<_>>())
return Ok(mempool_results
.into_iter()
.map(|r| r.map(|r| r.hash).map_err(Into::into))
.collect::<Vec<_>>())
}

//todo: review + test maybe?
let retries = mempool_results
.into_iter()
.zip(xts.clone())
.map(|(result, xt)| async move {
match result {
Err(TxPoolApiError::ImmediatelyDropped) =>
self.attempt_transaction_replacement(at, source, false, xt).await,
result => result,
}
})
.collect::<Vec<_>>();

let mempool_results = futures::future::join_all(retries).await;

let to_be_submitted = mempool_results
.iter()
.zip(xts)
@@ -671,15 +693,21 @@
Ok(mempool_results
.into_iter()
.map(|result| {
result.and_then(|insertion| {
result
.map_err(Into::into)
.and_then(|insertion| {
submission_results
.next()
.expect("The number of Ok results in mempool is exactly the same as the size of to-views-submission result. qed.")
.inspect_err(|_|
mempool.remove(insertion.hash)
)
.inspect_err(|_|{
mempool.remove_transaction(&insertion.hash);
})
})
})
.map(|r| r.map(|r| {
mempool.update_transaction(&r);
r.hash()
}))
.collect::<Vec<_>>())
}
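
The retry block in `submit_at` follows a small, reusable pattern: keep each first-pass mempool result unless it was `ImmediatelyDropped`, in which case funnel that transaction through the replacement path, then join the resulting futures. A runnable toy version of the pattern — the body of `attempt_replacement` is invented purely for illustration:

```rust
use futures::future::join_all;

#[derive(Debug, PartialEq)]
enum PoolError {
    ImmediatelyDropped,
    Invalid,
}

// Stand-in for `attempt_transaction_replacement`: the real pool revalidates
// the transaction and tries to evict a lower-priority one.
async fn attempt_replacement(xt: u32) -> Result<u32, PoolError> {
    // Pretend even-numbered transactions out-prioritize something droppable.
    if xt % 2 == 0 { Ok(xt) } else { Err(PoolError::ImmediatelyDropped) }
}

fn main() {
    let xts = vec![1u32, 2, 3, 4];
    // First-pass results, as `extend_unwatched` would return them when the
    // mempool is full.
    let first_pass = vec![
        Ok(1u32),
        Err(PoolError::ImmediatelyDropped),
        Err(PoolError::Invalid),
        Err(PoolError::ImmediatelyDropped),
    ];

    // Same shape as the diff: only `ImmediatelyDropped` failures are retried
    // through the replacement path; all other results pass through untouched.
    let retries = first_pass.into_iter().zip(xts).map(|(result, xt)| async move {
        match result {
            Err(PoolError::ImmediatelyDropped) => attempt_replacement(xt).await,
            other => other,
        }
    });

    let final_results = futures::executor::block_on(join_all(retries));
    assert_eq!(final_results, vec![Ok(1), Ok(2), Err(PoolError::Invalid), Ok(4)]);
}
```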

@@ -712,18 +740,27 @@
) -> Result<Pin<Box<TransactionStatusStreamFor<Self>>>, Self::Error> {
log::trace!(target: LOG_TARGET, "[{:?}] fatp::submit_and_watch views:{}", self.tx_hash(&xt), self.active_views_count());
let xt = Arc::from(xt);
let InsertionInfo { hash: xt_hash, source: timed_source } =

let InsertionInfo { hash: xt_hash, source: timed_source, .. } =
match self.mempool.push_watched(source, xt.clone()) {
Ok(result) => result,
Err(e) => return Err(e),
Err(TxPoolApiError::ImmediatelyDropped) =>
self.attempt_transaction_replacement(at, source, true, xt.clone()).await?,
Err(e) => return Err(e.into()),
};

self.metrics.report(|metrics| metrics.submitted_transactions.inc());

self.view_store
.submit_and_watch(at, timed_source, xt)
.await
.inspect_err(|_| self.mempool.remove(xt_hash))
.inspect_err(|_| {
self.mempool.remove_transaction(&xt_hash);
})
.map(|mut outcome| {
self.mempool.update_transaction(&outcome);
outcome.expect_watcher()
})
}

/// Intended to remove transactions identified by the given hashes, and any dependent
@@ -857,7 +894,13 @@ where
.extend_unwatched(TransactionSource::Local, &[xt.clone()])
.remove(0)?;

self.view_store.submit_local(xt).or_else(|_| Ok(xt_hash))
self.view_store
.submit_local(xt)
.map(|outcome| {
self.mempool.update_transaction(&outcome);
outcome.hash()
})
.or_else(|_| Ok(xt_hash))
}
}
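
Several call sites in this diff share a two-step shape: submit to the view store, then feed the returned outcome back into the mempool via `update_transaction` before surfacing the hash. A hypothetical, stripped-down model of that flow — the `SubmitOutcome` fields below are assumptions (the diff only exposes `hash()`, `expect_watcher()`, and the `update_transaction` call), consistent with the "mempool: prio is now locked option" commit storing the priority lazily:

```rust
use std::collections::HashMap;

// Assumed shape: the outcome of submitting to a view carries the hash and
// the priority learned during validation, which is only known after some
// view has actually validated the transaction.
struct SubmitOutcome {
    hash: u64,
    priority: Option<u64>,
}

impl SubmitOutcome {
    fn hash(&self) -> u64 {
        self.hash
    }
}

#[derive(Default)]
struct Mempool {
    // tx hash -> priority, if already learned from some view.
    priorities: HashMap<u64, Option<u64>>,
}

impl Mempool {
    fn update_transaction(&mut self, outcome: &SubmitOutcome) {
        // Record the validated priority so later "pool is full" decisions
        // can compare newcomers against it.
        if let Some(entry) = self.priorities.get_mut(&outcome.hash) {
            *entry = outcome.priority;
        }
    }
}

fn main() {
    let mut mempool = Mempool::default();
    mempool.priorities.insert(42, None); // inserted before validation

    let outcome = SubmitOutcome { hash: 42, priority: Some(1_000) };
    mempool.update_transaction(&outcome);

    assert_eq!(outcome.hash(), 42);
    assert_eq!(mempool.priorities[&42], Some(1_000));
}
```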

@@ -1109,7 +1152,13 @@ where
.await
.into_iter()
.zip(hashes)
.map(|(result, tx_hash)| result.or_else(|_| Err(tx_hash)))
.map(|(result, tx_hash)| {
//todo: we may need a bool flag here indicating if we need to update priority
//(premature optimization)
result
.map(|outcome| self.mempool.update_transaction(&outcome.into()))
.or_else(|_| Err(tx_hash))
})
.collect::<Vec<_>>();

let submitted_count = watched_results.len();
@@ -1131,7 +1180,7 @@
for result in watched_results {
if let Err(tx_hash) = result {
self.view_store.listener.invalidate_transactions(&[tx_hash]);
self.mempool.remove(tx_hash);
self.mempool.remove_transaction(&tx_hash);
}
}
}
@@ -1263,6 +1312,69 @@ where
fn tx_hash(&self, xt: &TransactionFor<Self>) -> TxHash<Self> {
self.api.hash_and_length(xt).0
}

/// Attempts to find and replace a lower-priority transaction in the transaction pool with a new
/// one.
///
/// This asynchronous function verifies the new transaction against the most recent view. If a
/// transaction with a lower priority exists in the transaction pool, it is replaced with the
/// new transaction.
///
/// If no lower-priority transaction is found, the function returns an error indicating the
/// transaction was dropped immediately.
async fn attempt_transaction_replacement(
&self,
_: Block::Hash,
source: TransactionSource,
watched: bool,
xt: ExtrinsicFor<ChainApi>,
) -> Result<InsertionInfo<ExtrinsicHash<ChainApi>>, TxPoolApiError> {
let at = self
.view_store
.most_recent_view
.read()
.ok_or(TxPoolApiError::ImmediatelyDropped)?;

let (best_view, _) = self
.view_store
.get_view_at(at, false)
.ok_or(TxPoolApiError::ImmediatelyDropped)?;

let (tx_hash, validated_tx) = best_view
.pool
.verify_one(
best_view.at.hash,
best_view.at.number,
TimedTransactionSource::from_transaction_source(source, false),
xt.clone(),
crate::graph::CheckBannedBeforeVerify::Yes,
)
.await;

if let Some(priority) = validated_tx.priority() {
let insertion_info =
self.mempool.try_replace_transaction(xt, priority, source, watched)?;

for worst_hash in &insertion_info.removed {
log::trace!(target: LOG_TARGET, "removed: {worst_hash:?} replaced by {tx_hash:?}");
self.view_store
.listener
.transaction_dropped(DroppedTransaction::new_enforced_by_limts(*worst_hash));

self.view_store.remove_transaction_subtree(
*worst_hash,
|listener, removed_tx_hash| {
listener.limits_enforced(&removed_tx_hash);
},
);
}

// todo: add to pending_replacements - make sure it will not sneak back via cloned view
return Ok(insertion_info)
}

Err(TxPoolApiError::ImmediatelyDropped)
}
}
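
The replacement rule driving `try_replace_transaction` is simple to state: the incoming transaction must strictly beat the priority of whatever it evicts, and enough low-priority transactions are removed to make room, with the evicted hashes reported back (as `insertion_info.removed` above). A single-threaded toy sketch of that eviction decision — capacity is by count here, while the real mempool also enforces byte-size limits ("dropping transaction - size limit is properly obeyed now"):

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum Error {
    ImmediatelyDropped,
}

struct Mempool {
    capacity: usize,
    txs: HashMap<u64, u64>, // tx hash -> priority
}

impl Mempool {
    // Toy version of `try_replace_transaction`: while the pool is full,
    // evict the lowest-priority transaction, but only when the newcomer
    // strictly beats it. Returns the evicted hashes, mirroring
    // `InsertionInfo::removed` in the diff.
    fn try_replace_transaction(
        &mut self,
        hash: u64,
        priority: u64,
    ) -> Result<Vec<u64>, Error> {
        let mut removed = Vec::new();
        while self.txs.len() >= self.capacity {
            let (worst_hash, worst_prio) = self
                .txs
                .iter()
                .min_by_key(|entry| *entry.1)
                .map(|(h, p)| (*h, *p))
                .ok_or(Error::ImmediatelyDropped)?;
            if worst_prio >= priority {
                // Nothing in the pool is cheaper than the newcomer.
                return Err(Error::ImmediatelyDropped);
            }
            self.txs.remove(&worst_hash);
            removed.push(worst_hash);
        }
        self.txs.insert(hash, priority);
        Ok(removed)
    }
}

fn main() {
    let mut pool = Mempool { capacity: 2, txs: HashMap::new() };
    pool.txs.insert(1, 10); // low priority
    pool.txs.insert(2, 50);

    // A higher-priority newcomer evicts the worst entry...
    assert_eq!(pool.try_replace_transaction(3, 100), Ok(vec![1]));
    // ...but a low-priority one is dropped immediately.
    assert_eq!(pool.try_replace_transaction(4, 5), Err(Error::ImmediatelyDropped));
}
```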

#[async_trait]
@@ -1410,7 +1522,7 @@ mod reduce_multiview_result_tests {
fn empty() {
sp_tracing::try_init_simple();
let input = HashMap::default();
let r = reduce_multiview_result::<H256, Error>(input);
let r = reduce_multiview_result::<H256, H256, Error>(input);
assert!(r.is_empty());
}
