Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Finalized block event triggers tx maintanance #12305

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
4995ead
finalized block event triggers tx maintanance
michalkucharczyk Sep 20, 2022
339636e
tx-pool: enactment helper introduced
michalkucharczyk Sep 22, 2022
cce685b
tx-pool: ChainApi: added tree_route method
michalkucharczyk Sep 26, 2022
a7c8035
enactment logic implemented + tests
michalkucharczyk Sep 26, 2022
cd27bc5
Some additional tests
michalkucharczyk Sep 27, 2022
9090201
minor improvements
michalkucharczyk Sep 27, 2022
8479b64
trigger CI job
michalkucharczyk Sep 27, 2022
e86d1bb
Merge remote-tracking branch 'origin/master' into mku-finalized-event…
michalkucharczyk Sep 27, 2022
deb7eab
fix compilation errors
michalkucharczyk Sep 28, 2022
cff7bd8
formatting
michalkucharczyk Sep 28, 2022
69040e9
trait removed
michalkucharczyk Sep 28, 2022
e367ffd
implementation slightly simplified
michalkucharczyk Sep 28, 2022
eb0a3b0
get rid of Arc<> in EnactmentState return value
michalkucharczyk Sep 28, 2022
c78f904
minor improvement
michalkucharczyk Sep 28, 2022
0c89000
Apply suggestions from code review
michalkucharczyk Sep 30, 2022
e75b180
Apply suggestions from code review
michalkucharczyk Sep 30, 2022
3a7f433
comment updated + formatting
michalkucharczyk Sep 30, 2022
a285f57
Apply suggestions from code review
michalkucharczyk Oct 1, 2022
26f8d96
formatting
michalkucharczyk Oct 1, 2022
4826e06
Merge remote-tracking branch 'origin/master' into mku-finalized-event…
michalkucharczyk Oct 3, 2022
7d59e8d
finalization notification bug fix
michalkucharczyk Oct 3, 2022
d0cc123
added error message on tree_route failure
michalkucharczyk Oct 3, 2022
7d8f162
Merge remote-tracking branch 'origin/master' into mku-finalized-event…
michalkucharczyk Oct 4, 2022
57b9192
Apply suggestions from code review
michalkucharczyk Oct 4, 2022
1e3af7e
use provided tree_route in Finalized event
michalkucharczyk Oct 4, 2022
cfd7f63
Option removed from ChainApi::tree_route
michalkucharczyk Oct 4, 2022
637a7ab
doc added, test and logs improved
michalkucharczyk Oct 4, 2022
6bfceca
handle_enactment aligned with original implementation
michalkucharczyk Oct 4, 2022
80f5216
use async-await
andresilva Oct 4, 2022
79c6739
Apply suggestions from code review
michalkucharczyk Oct 4, 2022
d7de511
Apply suggestions from code review
michalkucharczyk Oct 4, 2022
29e31b3
formatting + warn->debug
michalkucharczyk Oct 4, 2022
4bebf96
compilation error fix
michalkucharczyk Oct 4, 2022
0ef2a75
enactment_state initializers added
michalkucharczyk Oct 5, 2022
3793021
enactment_state: Option removed
michalkucharczyk Oct 5, 2022
d0d3d54
Merge remote-tracking branch 'origin/master' into mku-finalized-event…
michalkucharczyk Oct 5, 2022
8d26c23
manual-seal: compilation & tests fix
michalkucharczyk Oct 5, 2022
937daab
manual-seal: tests fixed
michalkucharczyk Oct 5, 2022
fef7e86
tests cleanup
michalkucharczyk Oct 5, 2022
49e4b4c
another compilation error fixed
michalkucharczyk Oct 5, 2022
3f038b5
TreeRoute::new added
michalkucharczyk Oct 7, 2022
0b6b28b
get rid of pub hack
michalkucharczyk Oct 7, 2022
36fb53b
one more test added
michalkucharczyk Oct 7, 2022
7d081b6
Merge remote-tracking branch 'origin/mku-finalized-tx-maintenance-imp…
michalkucharczyk Oct 7, 2022
65c41f5
formatting
michalkucharczyk Oct 7, 2022
7519a1b
Merge remote-tracking branch 'origin/master' into mku-finalized-event…
michalkucharczyk Oct 7, 2022
8e74825
Merge remote-tracking branch 'origin/master' into mku-finalized-event…
michalkucharczyk Oct 7, 2022
763c51c
TreeRoute::new doc added + formatting
michalkucharczyk Oct 7, 2022
1b49dd1
Apply suggestions from code review
michalkucharczyk Oct 7, 2022
cf98315
(bool,Option) simplified to Option
michalkucharczyk Oct 7, 2022
7f35983
log message improved
michalkucharczyk Oct 7, 2022
f46d5b6
Merge remote-tracking branch 'origin/master' into mku-finalized-event…
michalkucharczyk Oct 10, 2022
32a9522
yet another review suggestions applied
michalkucharczyk Oct 10, 2022
64a2ec0
get rid of hash in handle_enactment
michalkucharczyk Oct 10, 2022
61f9b92
Apply suggestions from code review
michalkucharczyk Oct 11, 2022
df3eec6
Update client/transaction-pool/src/lib.rs
michalkucharczyk Oct 11, 2022
d35c651
minor corrections
michalkucharczyk Oct 11, 2022
649bbf3
EnactmentState moved to new file
michalkucharczyk Oct 11, 2022
dab0fc3
File header corrected
michalkucharczyk Oct 11, 2022
63072de
error formatting aligned with codebase
michalkucharczyk Oct 11, 2022
e6ba411
Apply suggestions from code review
michalkucharczyk Oct 11, 2022
fdc6bd5
remove commented code
andresilva Oct 11, 2022
6ae40f6
small nits
andresilva Oct 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 150 additions & 124 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,153 +563,179 @@ async fn prune_known_txs_for_block<Block: BlockT, Api: graph::ChainApi<Block = B
hashes
}

impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
impl<PoolApi, Block> BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: 'static + graph::ChainApi<Block = Block>,
{
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
match event {
ChainEvent::NewBestBlock { hash, tree_route } => {
let pool = self.pool.clone();
let api = self.api.clone();

let id = BlockId::hash(hash);
let block_number = match api.block_id_to_number(&id) {
Ok(Some(number)) => number,
_ => {
log::trace!(
target: "txpool",
"Skipping chain event - no number for that block {:?}",
id,
);
return Box::pin(ready(()))
},
};
fn handle_blocks_reorg(
&self,
hash: Block::Hash,
enacted: Vec<Block::Hash>,
retracted: Vec<Block::Hash>,
) -> Pin<Box<dyn Future<Output = ()> + Send>> {
println!("xxx handle_blocks_reorg {hash:?} retracted: {retracted:?} enacted: {enacted:?}");

let next_action = self.revalidation_strategy.lock().next(
block_number,
Some(std::time::Duration::from_secs(60)),
Some(20u32.into()),
let pool = self.pool.clone();
let api = self.api.clone();

let id = BlockId::hash(hash);
let block_number = match api.block_id_to_number(&id) {
Ok(Some(number)) => number,
_ => {
log::trace!(
target: "txpool",
"Skipping chain event - no number for that block {:?}",
id,
);
let revalidation_strategy = self.revalidation_strategy.clone();
let revalidation_queue = self.revalidation_queue.clone();
let ready_poll = self.ready_poll.clone();
let metrics = self.metrics.clone();
return Box::pin(ready(()))
},
};

async move {
// We keep track of everything we prune so that later we won't add
// transactions with those hashes from the retracted blocks.
let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();

// If there is a tree route, we use this to prune known tx based on the enacted
// blocks. Before pruning enacted transactions, we inform the listeners about
// retracted blocks and their transactions. This order is important, because
// if we enact and retract the same transaction at the same time, we want to
// send first the retract and than the prune event.
if let Some(ref tree_route) = tree_route {
for retracted in tree_route.retracted() {
// notify txs awaiting finality that it has been retracted
pool.validated_pool().on_block_retracted(retracted.hash);
}
let next_action = self.revalidation_strategy.lock().next(
block_number,
Some(std::time::Duration::from_secs(60)),
Some(20u32.into()),
);
let revalidation_strategy = self.revalidation_strategy.clone();
let revalidation_queue = self.revalidation_queue.clone();
let ready_poll = self.ready_poll.clone();
let metrics = self.metrics.clone();
michalkucharczyk marked this conversation as resolved.
Show resolved Hide resolved

future::join_all(tree_route.enacted().iter().map(|h| {
prune_known_txs_for_block(BlockId::Hash(h.hash), &*api, &*pool)
}))
async move {
// We keep track of everything we prune so that later we won't add
// transactions with those hashes from the retracted blocks.
let mut pruned_log = HashSet::<ExtrinsicHash<PoolApi>>::new();

// If there is a tree route, we use this to prune known tx based on the enacted
// blocks. Before pruning enacted transactions, we inform the listeners about
// retracted blocks and their transactions. This order is important, because
// if we enact and retract the same transaction at the same time, we want to
// send first the retract and than the prune event.
for retracted_hash in retracted.iter() {
// notify txs awaiting finality that it has been retracted
pool.validated_pool().on_block_retracted(*retracted_hash);
}

future::join_all(
enacted
.iter()
.map(|hash| prune_known_txs_for_block(BlockId::Hash(*hash), &*api, &*pool)),
)
.await
.into_iter()
.for_each(|enacted_log| {
pruned_log.extend(enacted_log);
});

pruned_log.extend(prune_known_txs_for_block(id, &*api, &*pool).await);

metrics.report(|metrics| {
metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64)
});

// is Some(tree_route) == (enacted.len != 0 || retracted.len != 0)
if let true = next_action.resubmit {
michalkucharczyk marked this conversation as resolved.
Show resolved Hide resolved
let mut resubmit_transactions = Vec::new();

for hash in retracted.iter() {
let block_transactions = api
.block_body(&BlockId::hash(*hash))
.await
.into_iter()
.for_each(|enacted_log| {
pruned_log.extend(enacted_log);
.unwrap_or_else(|e| {
log::warn!("Failed to fetch block body: {}", e);
None
})
}
.unwrap_or_default()
.into_iter()
.filter(|tx| tx.is_signed().unwrap_or(true));

pruned_log.extend(prune_known_txs_for_block(id, &*api, &*pool).await);
let mut resubmitted_to_report = 0;

metrics.report(|metrics| {
metrics.block_transactions_pruned.inc_by(pruned_log.len() as u64)
});
resubmit_transactions.extend(block_transactions.into_iter().filter(|tx| {
let tx_hash = pool.hash_of(tx);
let contains = pruned_log.contains(&tx_hash);

if let (true, Some(tree_route)) = (next_action.resubmit, tree_route) {
let mut resubmit_transactions = Vec::new();

for retracted in tree_route.retracted() {
let hash = retracted.hash;

let block_transactions = api
.block_body(&BlockId::hash(hash))
.await
.unwrap_or_else(|e| {
log::warn!("Failed to fetch block body: {}", e);
None
})
.unwrap_or_default()
.into_iter()
.filter(|tx| tx.is_signed().unwrap_or(true));

let mut resubmitted_to_report = 0;

resubmit_transactions.extend(block_transactions.into_iter().filter(
|tx| {
let tx_hash = pool.hash_of(tx);
let contains = pruned_log.contains(&tx_hash);

// need to count all transactions, not just filtered, here
resubmitted_to_report += 1;

if !contains {
log::debug!(
target: "txpool",
"[{:?}]: Resubmitting from retracted block {:?}",
tx_hash,
hash,
);
}
!contains
},
));

metrics.report(|metrics| {
metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
});
}
// need to count all transactions, not just filtered, here
resubmitted_to_report += 1;

if let Err(e) = pool
.resubmit_at(
&id,
// These transactions are coming from retracted blocks, we should
// simply consider them external.
TransactionSource::External,
resubmit_transactions,
)
.await
{
if !contains {
log::debug!(
target: "txpool",
"[{:?}] Error re-submitting transactions: {}",
id,
e,
)
target: "txpool",
"[{:?}]: Resubmitting from retracted block {:?}",
tx_hash,
hash,
);
}
}
!contains
}));

let extra_pool = pool.clone();
// After #5200 lands, this arguably might be moved to the
// handler of "all blocks notification".
ready_poll.lock().trigger(block_number, move || {
Box::new(extra_pool.validated_pool().ready())
metrics.report(|metrics| {
metrics.block_transactions_resubmitted.inc_by(resubmitted_to_report)
});
}

if next_action.revalidate {
let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
revalidation_queue.revalidate_later(block_number, hashes).await;

revalidation_strategy.lock().clear();
}
if let Err(e) = pool
.resubmit_at(
&id,
// These transactions are coming from retracted blocks, we should
// simply consider them external.
TransactionSource::External,
resubmit_transactions,
)
.await
{
log::debug!(
target: "txpool",
"[{:?}] Error re-submitting transactions: {}",
id,
e,
)
}
.boxed()
}

let extra_pool = pool.clone();
// After #5200 lands, this arguably might be moved to the
// handler of "all blocks notification".
ready_poll
.lock()
.trigger(block_number, move || Box::new(extra_pool.validated_pool().ready()));

if next_action.revalidate {
let hashes = pool.validated_pool().ready().map(|tx| tx.hash).collect();
revalidation_queue.revalidate_later(block_number, hashes).await;

revalidation_strategy.lock().clear();
}
}
.boxed()
}
}

impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
where
Block: BlockT,
PoolApi: 'static + graph::ChainApi<Block = Block>,
{
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output = ()> + Send>> {
match event {
ChainEvent::NewBestBlock { hash, tree_route } => {
println!("xxx NewBestBlock {hash:?} tree_route: {tree_route:?}");
let (enacted, retracted) = if let Some(tree_route) = tree_route {
(
tree_route.enacted().iter().map(|block| block.hash).collect(),
tree_route.retracted().iter().map(|block| block.hash).collect(),
)
} else {
(vec![], vec![])
};

self.handle_blocks_reorg(hash, enacted, retracted)
},
ChainEvent::Finalized { hash, tree_route } => {
let pool = self.pool.clone();
println!("xxx Finalized {hash:?} {tree_route:?}");
self.handle_blocks_reorg(hash, tree_route.to_vec(), vec![]);
async move {
michalkucharczyk marked this conversation as resolved.
Show resolved Hide resolved
for hash in tree_route.iter().chain(&[hash]) {
if let Err(e) = pool.validated_pool().on_block_finalized(*hash).await {
Expand Down
40 changes: 40 additions & 0 deletions client/transaction-pool/tests/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ fn finalization() {

#[test]
fn fork_aware_finalization() {
sp_tracing::try_init_simple();
let api = TestApi::empty();
// starting block A1 (last finalized.)
api.push_block(1, vec![], true);
Expand Down Expand Up @@ -997,3 +998,42 @@ fn stale_transactions_are_pruned() {
assert_eq!(pool.status().future, 0);
assert_eq!(pool.status().ready, 0);
}

#[test]
fn playground() {
sp_tracing::try_init_simple();
println!("xxxx");
let xt = uxt(Alice, 209);

let (pool, api, _guard) = maintained_pool();

// block_on(pool.submit_one(&BlockId::number(0), SOURCE, xt.clone())).expect("1. Imported");
let watcher = block_on(pool.submit_and_watch(&BlockId::number(0), SOURCE, xt.clone()))
.expect("1. Imported");
assert_eq!(pool.status().ready, 1);

let header = api.push_block(1, vec![xt.clone()], true);

let ps = pool.pool().validated_pool().status();
println!("status: {ps:?}");
// block_on(pool.maintain(block_event(header.clone())));
// let ps = pool.pool().validated_pool().status();
// println!("status: {ps:?}");

let event =
ChainEvent::Finalized { hash: header.clone().hash(), tree_route: Arc::from(vec![]) };
block_on(pool.maintain(event));

let ps = pool.pool().validated_pool().status();
println!("status: {ps:?}");

assert_eq!(pool.status().ready, 0);

{
let mut stream = futures::executor::block_on_stream(watcher);
assert_eq!(stream.next(), Some(TransactionStatus::Ready));
assert_eq!(stream.next(), Some(TransactionStatus::InBlock(header.clone().hash())));
assert_eq!(stream.next(), Some(TransactionStatus::Finalized(header.hash())));
assert_eq!(stream.next(), None);
}
}