Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Revalidate transactions only on latest best block #6824

Merged
merged 5 commits into from
Aug 7, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ mod tests {
use sp_consensus::ImportedAux;
use sp_inherents::InherentDataProviders;
use sc_basic_authorship::ProposerFactory;
use sc_client_api::BlockBackend;

fn api() -> Arc<TestApi> {
Arc::new(TestApi::empty())
Expand Down Expand Up @@ -415,8 +416,8 @@ mod tests {
}
}
);
// assert that there's a new block in the db.
assert!(client.header(&BlockId::Number(0)).unwrap().is_some());
let block = client.block(&BlockId::Number(1)).unwrap().unwrap().block;
pool_api.add_block(block, true);
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 1)).await.is_ok());

let header = client.header(&BlockId::Number(1)).expect("db error").expect("imported above");
Expand All @@ -438,10 +439,11 @@ mod tests {
rx1.await.expect("should be no error receiving"),
Ok(_)
);
assert!(client.header(&BlockId::Number(1)).unwrap().is_some());
let block = client.block(&BlockId::Number(2)).unwrap().unwrap().block;
pool_api.add_block(block, true);
pool_api.increment_nonce(Alice.into());

assert!(pool.submit_one(&BlockId::Number(2), SOURCE, uxt(Alice, 2)).await.is_ok());
assert!(pool.submit_one(&BlockId::Number(1), SOURCE, uxt(Alice, 2)).await.is_ok());
let (tx2, rx2) = futures::channel::oneshot::channel();
assert!(sink.send(EngineCommand::SealNewBlock {
parent_hash: Some(created_block.hash),
Expand Down
15 changes: 11 additions & 4 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,10 +465,11 @@ impl<N: Clone + Copy + AtLeast32Bit> RevalidationStrategy<N> {
block: N,
revalidate_time_period: Option<std::time::Duration>,
revalidate_block_period: Option<N>,
is_best_block: bool,
) -> RevalidationAction {
match self {
Self::Light(status) => RevalidationAction {
revalidate: status.next_required(
revalidate: is_best_block && status.next_required(
block,
revalidate_time_period,
revalidate_block_period,
Expand Down Expand Up @@ -570,11 +571,17 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
block_number,
Some(std::time::Duration::from_secs(60)),
Some(20.into()),
is_new_best,
tomusdrw marked this conversation as resolved.
Show resolved Hide resolved
);
let revalidation_strategy = self.revalidation_strategy.clone();
let revalidation_queue = self.revalidation_queue.clone();
let ready_poll = self.ready_poll.clone();
let metrics = self.metrics.clone();
let best_block = if is_new_best {
Some(id)
} else {
None
};

async move {
// We keep track of everything we prune so that later we won't add
Expand Down Expand Up @@ -689,10 +696,10 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
.ready()
.map(|tx| tx.hash.clone())
.collect();
revalidation_queue.revalidate_later(block_number, hashes).await;
}
revalidation_queue.revalidate_later(block_number, best_block, hashes).await;

revalidation_strategy.lock().clear();
revalidation_strategy.lock().clear();
}
}.boxed()
}
ChainEvent::Finalized { hash } => {
Expand Down
59 changes: 40 additions & 19 deletions client/transaction-pool/src/revalidation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ const MIN_BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;

/// Payload from queue to worker.
struct WorkerPayload<Api: ChainApi> {
/// The last known best_block.
best_block: Option<BlockId<Api::Block>>,
tomusdrw marked this conversation as resolved.
Show resolved Hide resolved
at: NumberFor<Api>,
transactions: Vec<ExtrinsicHash<Api>>,
}
Expand All @@ -48,7 +50,7 @@ struct WorkerPayload<Api: ChainApi> {
struct RevalidationWorker<Api: ChainApi> {
api: Arc<Api>,
pool: Arc<Pool<Api>>,
best_block: NumberFor<Api>,
best_block: BlockId<Api::Block>,
block_ordered: BTreeMap<NumberFor<Api>, HashSet<ExtrinsicHash<Api>>>,
members: HashMap<ExtrinsicHash<Api>, NumberFor<Api>>,
}
Expand All @@ -62,16 +64,29 @@ impl<Api: ChainApi> Unpin for RevalidationWorker<Api> {}
async fn batch_revalidate<Api: ChainApi>(
pool: Arc<Pool<Api>>,
api: Arc<Api>,
at: NumberFor<Api>,
at: BlockId<Api::Block>,
batch: impl IntoIterator<Item=ExtrinsicHash<Api>>,
) {
let mut invalid_hashes = Vec::new();
let mut revalidated = HashMap::new();

let block_number = match api.block_id_to_number(&at) {
Ok(Some(n)) => n,
_ => {
log::warn!(
target: "txpool",
"Failed to get block number of `{:?}`, aborting revalidation.",
at,
);

return;
}
};

let validation_results = futures::future::join_all(
batch.into_iter().filter_map(|ext_hash| {
pool.validated_pool().ready_by_hash(&ext_hash).map(|ext| {
api.validate_transaction(&BlockId::Number(at), ext.source, ext.data.clone())
api.validate_transaction(&at, ext.source, ext.data.clone())
.map(move |validation_result| (validation_result, ext_hash, ext))
})
})
Expand All @@ -92,7 +107,7 @@ async fn batch_revalidate<Api: ChainApi>(
revalidated.insert(
ext_hash.clone(),
ValidatedTransaction::valid_at(
at.saturated_into::<u64>(),
block_number.saturated_into::<u64>(),
ext_hash,
ext.source,
ext.data.clone(),
Expand Down Expand Up @@ -129,7 +144,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
pool,
block_ordered: Default::default(),
members: Default::default(),
best_block: Zero::zero(),
best_block: BlockId::Number(Zero::zero()),
}
}

Expand Down Expand Up @@ -211,8 +226,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
mut self,
from_queue: TracingUnboundedReceiver<WorkerPayload<Api>>,
interval: R,
) where R: Send, R::Guard: Send
{
) where R: Send, R::Guard: Send {
let interval = interval.into_stream().fuse();
let from_queue = from_queue.fuse();
futures::pin_mut!(interval, from_queue);
Expand Down Expand Up @@ -247,13 +261,15 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
workload = from_queue.next() => {
match workload {
Some(worker_payload) => {
this.best_block = worker_payload.at;
if let Some(at) = worker_payload.best_block {
this.best_block = at;
}
this.push(worker_payload);

if this.members.len() > 0 {
log::debug!(
target: "txpool",
"Updated revalidation queue at {}. Transactions: {:?}",
"Updated revalidation queue at {:?}. Transactions: {:?}",
this.best_block,
this.members,
);
Expand Down Expand Up @@ -298,9 +314,7 @@ where
api: Arc<Api>,
pool: Arc<Pool<Api>>,
interval: R,
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>)
where R: Send + 'static, R::Guard: Send
{
) -> (Self, Pin<Box<dyn Future<Output=()> + Send>>) where R: Send + 'static, R::Guard: Send {
let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue");

let worker = RevalidationWorker::new(api.clone(), pool.clone());
Expand Down Expand Up @@ -338,20 +352,27 @@ where
/// If queue configured with background worker, this will return immediately.
/// If queue configured without background worker, this will resolve after
/// revalidation is actually done.
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExtrinsicHash<Api>>) {
pub async fn revalidate_later(
&self,
at: NumberFor<Api>,
best_block: Option<BlockId<Api::Block>>,
transactions: Vec<ExtrinsicHash<Api>>,
) {
if transactions.len() > 0 {
log::debug!(target: "txpool", "Sent {} transactions to revalidation queue", transactions.len());
log::debug!(
target: "txpool", "Sent {} transactions to revalidation queue",
transactions.len(),
);
}

if let Some(ref to_worker) = self.background {
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions, best_block }) {
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
}
return;
} else {
} else if let Some(best_block) = best_block {
let pool = self.pool.clone();
let api = self.api.clone();
batch_revalidate(pool, api, at, transactions).await
batch_revalidate(pool, api, best_block, transactions).await
}
}
}
Expand Down Expand Up @@ -382,7 +403,7 @@ mod tests {
pool.submit_one(&BlockId::number(0), TransactionSource::External, uxt.clone())
).expect("Should be valid");

block_on(queue.revalidate_later(0, vec![uxt_hash]));
block_on(queue.revalidate_later(0, Some(BlockId::Number(0)), vec![uxt_hash]));

// revalidated in sync offload 2nd time
assert_eq!(api.validation_requests().len(), 2);
Expand Down
Loading