Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fix a race conditon in the pool when transactions are imported during…
Browse files Browse the repository at this point in the history
… pruning. (#2136)

* Store recently pruned tags to avoid re-importing transactions.

* Update core/transaction-pool/graph/src/base_pool.rs

* Update core/transaction-pool/graph/src/base_pool.rs

* Update core/transaction-pool/graph/src/base_pool.rs

* Update base_pool.rs
  • Loading branch information
tomusdrw authored and gavofyork committed Mar 28, 2019
1 parent fd15825 commit c767d10
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 17 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/transaction-pool/graph/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@ sr-primitives = { path = "../../sr-primitives" }

[dev-dependencies]
assert_matches = "1.1"
env_logger = "0.6"
parity-codec = "3.2"
test_runtime = { package = "substrate-test-runtime", path = "../../test-runtime" }
28 changes: 23 additions & 5 deletions core/transaction-pool/graph/src/base_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! For a more full-featured pool, have a look at the `pool` module.
use std::{
collections::HashSet,
fmt,
hash,
sync::Arc,
Expand Down Expand Up @@ -134,6 +135,9 @@ impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
}
}

/// Store last pruned tags for given number of invocations.
const RECENTLY_PRUNED_TAGS: usize = 2;

/// Transaction pool.
///
/// Builds a dependency graph for all transactions in the pool and returns
Expand All @@ -148,13 +152,21 @@ impl<Hash, Extrinsic> fmt::Debug for Transaction<Hash, Extrinsic> where
pub struct BasePool<Hash: hash::Hash + Eq, Ex> {
future: FutureTransactions<Hash, Ex>,
ready: ReadyTransactions<Hash, Ex>,
/// Store recently pruned tags (for last two invocations).
///
/// This is used to make sure we don't accidentally put
/// transactions to future in case they were just stuck in verification.
recently_pruned: [HashSet<Tag>; RECENTLY_PRUNED_TAGS],
recently_pruned_index: usize,
}

impl<Hash: hash::Hash + Eq, Ex> Default for BasePool<Hash, Ex> {
fn default() -> Self {
BasePool {
future: Default::default(),
ready: Default::default(),
recently_pruned: Default::default(),
recently_pruned_index: 0,
}
}
}
Expand All @@ -175,7 +187,11 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
bail!(error::ErrorKind::AlreadyImported(Box::new(tx.hash.clone())))
}

let tx = WaitingTransaction::new(tx, self.ready.provided_tags());
let tx = WaitingTransaction::new(
tx,
self.ready.provided_tags(),
&self.recently_pruned,
);
trace!(target: "txpool", "[{:?}] {:?}", tx.transaction.hash, tx);
debug!(target: "txpool", "[{:?}] Importing to {}", tx.transaction.hash, if tx.is_ready() { "ready" } else { "future" });

Expand Down Expand Up @@ -354,12 +370,17 @@ impl<Hash: hash::Hash + Member + Serialize, Ex: ::std::fmt::Debug> BasePool<Hash
pub fn prune_tags(&mut self, tags: impl IntoIterator<Item=Tag>) -> PruneStatus<Hash, Ex> {
let mut to_import = vec![];
let mut pruned = vec![];
let recently_pruned = &mut self.recently_pruned[self.recently_pruned_index];
self.recently_pruned_index = (self.recently_pruned_index + 1) % RECENTLY_PRUNED_TAGS;
recently_pruned.clear();

for tag in tags {
// make sure to promote any future transactions that could be unlocked
to_import.append(&mut self.future.satisfy_tags(::std::iter::once(&tag)));
// and actually prune transactions in ready queue
pruned.append(&mut self.ready.prune_tags(tag));
pruned.append(&mut self.ready.prune_tags(tag.clone()));
// store the tags for next submission
recently_pruned.insert(tag);
}

let mut promoted = vec![];
Expand Down Expand Up @@ -663,7 +684,6 @@ mod tests {
assert_eq!(pool.future.len(), 0);
}


#[test]
fn should_handle_a_cycle_with_low_priority() {
// given
Expand Down Expand Up @@ -798,7 +818,6 @@ mod tests {
assert_eq!(pool.future.len(), 0);
}


#[test]
fn should_prune_ready_transactions() {
// given
Expand Down Expand Up @@ -887,5 +906,4 @@ mod tests {
r#"Transaction { hash: 4, priority: 1000, valid_till: 64, bytes: 1, requires: [03,02], provides: [04], data: [4]}"#.to_owned()
);
}

}
13 changes: 11 additions & 2 deletions core/transaction-pool/graph/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,19 @@ impl<Hash, Ex> WaitingTransaction<Hash, Ex> {
///
/// Computes the set of missing tags based on the requirements and tags that
/// are provided by all transactions in the ready queue.
pub fn new(transaction: Transaction<Hash, Ex>, provided: &HashMap<Tag, Hash>) -> Self {
pub fn new(
transaction: Transaction<Hash, Ex>,
provided: &HashMap<Tag, Hash>,
recently_pruned: &[HashSet<Tag>],
) -> Self {
let missing_tags = transaction.requires
.iter()
.filter(|tag| !provided.contains_key(&**tag))
.filter(|tag| {
// is true if the tag is already satisfied either via transaction in the pool
// or one that was recently included.
let is_provided = provided.contains_key(&**tag) || recently_pruned.iter().any(|x| x.contains(&**tag));
!is_provided
})
.cloned()
.collect();

Expand Down
69 changes: 68 additions & 1 deletion core/transaction-pool/graph/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,9 @@ mod tests {
use crate::watcher;

#[derive(Debug, Default)]
struct TestApi;
struct TestApi {
delay: Mutex<Option<std::sync::mpsc::Receiver<()>>>,
}

impl ChainApi for TestApi {
type Block = Block;
Expand All @@ -469,9 +471,20 @@ mod tests {

/// Verify extrinsic at given block.
fn validate_transaction(&self, at: &BlockId<Self::Block>, uxt: ExtrinsicFor<Self>) -> Result<TransactionValidity, Self::Error> {

let block_number = self.block_id_to_number(at)?.unwrap();
let nonce = uxt.transfer().nonce;

// This is used to control the test flow.
if nonce > 0 {
let opt = self.delay.lock().take();
if let Some(delay) = opt {
if delay.recv().is_err() {
println!("Error waiting for delay!");
}
}
}

if nonce < block_number {
Ok(TransactionValidity::Invalid(0))
} else {
Expand Down Expand Up @@ -878,5 +891,59 @@ mod tests {
assert_eq!(stream.next(), Some(Ok(watcher::Status::Ready)));
assert_eq!(stream.next(), Some(Ok(watcher::Status::Dropped)));
}

#[test]
fn should_handle_pruning_in_the_middle_of_import() {
let _ = env_logger::try_init();
// given
let (ready, is_ready) = std::sync::mpsc::sync_channel(0);
let (tx, rx) = std::sync::mpsc::sync_channel(1);
let mut api = TestApi::default();
api.delay = Mutex::new(rx.into());
let pool = Arc::new(Pool::new(Default::default(), api));

// when
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 5,
nonce: 1,
});

// This transaction should go to future, since we use `nonce: 1`
let pool2 = pool.clone();
std::thread::spawn(move || {
pool2.submit_one(&BlockId::Number(0), xt).unwrap();
ready.send(()).unwrap();
});

// But now before the previous one is imported we import
// the one that it depends on.
let xt = uxt(Transfer {
from: AccountId::from_h256(H256::from_low_u64_be(1)),
to: AccountId::from_h256(H256::from_low_u64_be(2)),
amount: 4,
nonce: 0,
});
// The tag the above transaction provides (TestApi is using just nonce as u8)
let provides = vec![0_u8];
pool.submit_one(&BlockId::Number(0), xt).unwrap();
assert_eq!(pool.status().ready, 1);

// Now block import happens before the second transaction is able to finish verification.
pool.prune_tags(&BlockId::Number(1), vec![provides], vec![]).unwrap();
assert_eq!(pool.status().ready, 0);


// so when we release the verification of the previous one it will have
// something in `requires`, but should go to ready directly, since the previous transaction was imported
// correctly.
tx.send(()).unwrap();

// then
is_ready.recv().unwrap(); // wait for finish
assert_eq!(pool.status().ready, 1);
assert_eq!(pool.status().future, 0);
}
}
}
18 changes: 9 additions & 9 deletions core/transaction-pool/graph/src/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,18 +517,18 @@ mod tests {
tx3.provides = vec![vec![4]];

// when
let x = WaitingTransaction::new(tx2, &ready.provided_tags());
let x = WaitingTransaction::new(tx2, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx3, &ready.provided_tags());
let x = WaitingTransaction::new(tx3, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
assert_eq!(ready.get().count(), 2);

// too low priority
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags());
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags(), &[]);
ready.import(x).unwrap_err();

tx1.priority = 10;
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags());
let x = WaitingTransaction::new(tx1.clone(), &ready.provided_tags(), &[]);
ready.import(x).unwrap();

// then
Expand Down Expand Up @@ -562,15 +562,15 @@ mod tests {
};

// when
let x = WaitingTransaction::new(tx1, &ready.provided_tags());
let x = WaitingTransaction::new(tx1, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx2, &ready.provided_tags());
let x = WaitingTransaction::new(tx2, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx3, &ready.provided_tags());
let x = WaitingTransaction::new(tx3, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx4, &ready.provided_tags());
let x = WaitingTransaction::new(tx4, &ready.provided_tags(), &[]);
ready.import(x).unwrap();
let x = WaitingTransaction::new(tx5, &ready.provided_tags());
let x = WaitingTransaction::new(tx5, &ready.provided_tags(), &[]);
ready.import(x).unwrap();

// then
Expand Down

0 comments on commit c767d10

Please sign in to comment.