Skip to content

Commit

Permalink
tests: future dropping test added
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkucharczyk committed Nov 12, 2024
1 parent 5a83cf5 commit c566dad
Showing 1 changed file with 235 additions and 1 deletion.
236 changes: 235 additions & 1 deletion substrate/client/transaction-pool/tests/fatp_limits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use sc_transaction_pool::ChainApi;
use sc_transaction_pool_api::{
error::Error as TxPoolError, MaintainedTransactionPool, TransactionPool, TransactionStatus,
};
use std::thread::sleep;
use std::{collections::HashSet, thread::sleep};
use substrate_test_runtime_client::AccountKeyring::*;
use substrate_test_runtime_transaction_pool::uxt;

Expand Down Expand Up @@ -641,3 +641,237 @@ fn fatp_limits_future_size_works() {
assert_pool_status!(header01.hash(), &pool, 0, 3);
assert_eq!(pool.mempool_len().0, 3);
}

// this test is bad by design: xt0, xt1 cannot be dropped by the pool after finalizing 02 - there
// can be new fork where xt0, xt1 will be valid.
// todo: do we need this test?
#[test]
#[ignore]
fn fatp_limits_watcher_xxx() {
sp_tracing::try_init_simple();

let builder = TestPoolBuilder::new();
let (pool, api, _) = builder.with_mempool_count_limit(6).with_ready_count(2).build();
api.set_nonce(api.genesis_hash(), Bob.into(), 300);
api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
api.set_nonce(api.genesis_hash(), Dave.into(), 500);
api.set_nonce(api.genesis_hash(), Eve.into(), 600);
api.set_nonce(api.genesis_hash(), Ferdie.into(), 700);

let header01 = api.push_block(1, vec![], true);
let event = new_best_block_event(&pool, None, header01.hash());
block_on(pool.maintain(event));

let xt0 = uxt(Alice, 200);
let xt1 = uxt(Bob, 300);
let xt2 = uxt(Charlie, 400);

let xt3 = uxt(Dave, 500);
let xt4 = uxt(Eve, 600);
let xt5 = uxt(Ferdie, 700);

let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();

assert_pool_status!(header01.hash(), &pool, 2, 0);
assert_eq!(pool.mempool_len().1, 2);

let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));

let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();

assert_pool_status!(header02.hash(), &pool, 2, 0);
assert_eq!(pool.mempool_len().1, 4);

let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
block_on(pool.maintain(new_best_block_event(&pool, Some(header02.hash()), header03.hash())));

let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
let xt5_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).unwrap();

assert_pool_status!(header03.hash(), &pool, 2, 0);
assert_eq!(pool.mempool_len().1, 6);

let header04 =
api.push_block_with_parent(header03.hash(), vec![xt4.clone(), xt5.clone()], true);
api.set_nonce(header04.hash(), Alice.into(), 201);
api.set_nonce(header04.hash(), Bob.into(), 301);
api.set_nonce(header04.hash(), Charlie.into(), 401);
api.set_nonce(header04.hash(), Dave.into(), 501);
api.set_nonce(header04.hash(), Eve.into(), 601);
api.set_nonce(header04.hash(), Ferdie.into(), 701);
block_on(pool.maintain(new_best_block_event(&pool, Some(header03.hash()), header04.hash())));

assert_ready_iterator!(header01.hash(), pool, [xt0, xt1]);
assert_ready_iterator!(header02.hash(), pool, [xt2, xt3]);
assert_ready_iterator!(header03.hash(), pool, [xt4, xt5]);
assert_ready_iterator!(header04.hash(), pool, []);

block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header01.hash())));
assert!(!pool.status_all().contains_key(&header01.hash()));

block_on(pool.maintain(finalized_block_event(&pool, header01.hash(), header02.hash())));
assert!(!pool.status_all().contains_key(&header02.hash()));
assert!(pool.ready_at(header01.hash()).now_or_never().is_none());
//todo: can we do better? We don't have API to check if event was processed internally.
let mut counter = 0;
while pool.mempool_len().1 != 4 {
sleep(std::time::Duration::from_millis(1));
counter = counter + 1;
if counter > 20 {
assert!(false, "timeout");
}
}
assert_eq!(pool.mempool_len().1, 4);

block_on(pool.maintain(finalized_block_event(&pool, header02.hash(), header03.hash())));
log::info!("status: {:?}", pool.status_all());
assert!(!pool.status_all().contains_key(&header03.hash()));
assert!(pool.ready_at(header02.hash()).now_or_never().is_none());

//
//
//
//

// assert_pool_status!(header02e.hash(), &pool, 0, 0);
//
// let header02f = api.push_block_with_parent(header01.hash(), vec![], true);
// block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()),
// header02f.hash()))); assert_pool_status!(header02f.hash(), &pool, 2, 0);
// assert_ready_iterator!(header02f.hash(), pool, [xt1, xt2]);
//
// let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE,
// xt3.clone())).unwrap(); let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(),
// SOURCE, xt4.clone())).unwrap(); let result5 = block_on(pool.submit_and_watch(invalid_hash(),
// SOURCE, xt5.clone())).map(|_| ());
//
// //xt5 hits internal mempool limit
// assert!(matches!(result5.unwrap_err().0, TxPoolError::ImmediatelyDropped));
//
// assert_pool_status!(header02e.hash(), &pool, 2, 0);
// assert_ready_iterator!(header02e.hash(), pool, [xt3, xt4]);
// assert_eq!(pool.mempool_len().1, 4);
//
// let xt1_status = futures::executor::block_on_stream(xt1_watcher).take(2).collect::<Vec<_>>();
// assert_eq!(
// xt1_status,
// vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 1))]
// );
//
// let xt2_status = futures::executor::block_on_stream(xt2_watcher).take(2).collect::<Vec<_>>();
// assert_eq!(
// xt2_status,
// vec![TransactionStatus::Ready, TransactionStatus::InBlock((header02e.hash(), 2))]
// );
//
// let xt3_status = futures::executor::block_on_stream(xt3_watcher).take(1).collect::<Vec<_>>();
// assert_eq!(xt3_status, vec![TransactionStatus::Ready]);
// let xt4_status = futures::executor::block_on_stream(xt4_watcher).take(1).collect::<Vec<_>>();
// assert_eq!(xt4_status, vec![TransactionStatus::Ready]);
}

macro_rules! assert_future_iterator {
($pool:expr, [$( $xt:expr ),*]) => {{
let futures = $pool.futures();
let expected = vec![ $($pool.api().hash_and_length(&$xt).0),*];
log::debug!(target:LOG_TARGET, "expected: {:#?}", futures);
log::debug!(target:LOG_TARGET, "output: {:#?}", expected);
assert_eq!(expected.len(), futures.len());
let hsf = futures.iter().map(|a| a.hash).collect::<HashSet<_>>();
let hse = expected.into_iter().collect::<HashSet<_>>();
assert_eq!(hse,hsf);
}};
}

#[test]
fn fatp_limits_watcher_future_transactions_are_droped_when_view_is_dropped() {
sp_tracing::try_init_simple();

let builder = TestPoolBuilder::new();
let (pool, api, _) = builder.with_mempool_count_limit(6).with_future_count(2).build();
api.set_nonce(api.genesis_hash(), Bob.into(), 300);
api.set_nonce(api.genesis_hash(), Charlie.into(), 400);
api.set_nonce(api.genesis_hash(), Dave.into(), 500);
api.set_nonce(api.genesis_hash(), Eve.into(), 600);
api.set_nonce(api.genesis_hash(), Ferdie.into(), 700);

let header01 = api.push_block(1, vec![], true);
let event = new_best_block_event(&pool, None, header01.hash());
block_on(pool.maintain(event));

let xt0 = uxt(Alice, 201);
let xt1 = uxt(Bob, 301);
let xt2 = uxt(Charlie, 401);

let xt3 = uxt(Dave, 501);
let xt4 = uxt(Eve, 601);
let xt5 = uxt(Ferdie, 701);

let xt0_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt0.clone())).unwrap();
let xt1_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt1.clone())).unwrap();

assert_pool_status!(header01.hash(), &pool, 0, 2);
assert_eq!(pool.mempool_len().1, 2);
assert_future_iterator!(pool, [xt0, xt1]);

let header02 = api.push_block_with_parent(header01.hash(), vec![], true);
block_on(pool.maintain(new_best_block_event(&pool, Some(header01.hash()), header02.hash())));

let xt2_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt2.clone())).unwrap();
let xt3_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt3.clone())).unwrap();

assert_pool_status!(header02.hash(), &pool, 0, 2);
assert_eq!(pool.mempool_len().1, 4);
assert_future_iterator!(pool, [xt2, xt3]);

let header03 = api.push_block_with_parent(header02.hash(), vec![], true);
block_on(pool.maintain(new_best_block_event(&pool, Some(header02.hash()), header03.hash())));

let xt4_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt4.clone())).unwrap();
let xt5_watcher = block_on(pool.submit_and_watch(invalid_hash(), SOURCE, xt5.clone())).unwrap();

assert_pool_status!(header03.hash(), &pool, 0, 2);
assert_eq!(pool.mempool_len().1, 6);
assert_future_iterator!(pool, [xt4, xt5]);

let header04 = api.push_block_with_parent(header03.hash(), vec![], true);
block_on(pool.maintain(new_best_block_event(&pool, Some(header03.hash()), header04.hash())));

assert_pool_status!(header04.hash(), &pool, 0, 2);
assert_eq!(pool.futures().len(), 2);

block_on(pool.maintain(finalized_block_event(&pool, api.genesis_hash(), header04.hash())));
assert_eq!(pool.active_views_count(), 1);
assert_eq!(pool.inactive_views_count(), 0);
//todo: can we do better? We don't have API to check if event was processed internally.
let mut counter = 0;
while pool.mempool_len().1 != 2 {
sleep(std::time::Duration::from_millis(1));
counter = counter + 1;
if counter > 20 {
assert!(false, "timeout {}", pool.mempool_len().1);
}
}
assert_eq!(pool.mempool_len().1, 2);
assert_pool_status!(header04.hash(), &pool, 0, 2);
assert_eq!(pool.futures().len(), 2);

let mut to_be_checked = vec![
(xt0, xt0_watcher),
(xt1, xt1_watcher),
(xt2, xt2_watcher),
(xt3, xt3_watcher),
(xt4, xt4_watcher),
(xt5, xt5_watcher),
];
let future_hashes = pool.futures().iter().map(|t| t.hash).collect::<Vec<_>>();
to_be_checked.retain(|x| !future_hashes.contains(&api.hash_and_length(&x.0).0));

for x in to_be_checked {
let x_status = futures::executor::block_on_stream(x.1).take(2).collect::<Vec<_>>();
assert_eq!(x_status, vec![TransactionStatus::Future, TransactionStatus::Dropped]);
}
}

0 comments on commit c566dad

Please sign in to comment.