Skip to content

Commit

Permalink
runtime: prefetch account keys and data for chunk finalization (#10935)
Browse files Browse the repository at this point in the history
This significantly reduces the cost of finalize, especially for shard 3,
where the finalize durations were the largest:


![before](https://github.com/near/nearcore/assets/679122/c5b3fb26-3582-43f5-8935-56b8da98acad)

![after](https://github.com/near/nearcore/assets/679122/d3d8ed3c-3423-4a9e-86db-5f8538843a2f)

The effect on the overall apply duration is not as high, however:


![image](https://github.com/near/nearcore/assets/679122/4844ad73-b11e-41f0-b1ea-765b1c475b3d)

![image](https://github.com/near/nearcore/assets/679122/548da67d-c761-4adc-99b4-a4a8cc6e408f)

This is understandable: ultimately, all of the data that we were
seeing fetched in `finalize` still needs to be retrieved. The important
part, however, is that the duration of this step is now much more
consistent and less jumpy.

cc @saketh-are — keep an eye on whether we need to prefetch receipt
data for yield-resume receipts.

---------

Co-authored-by: Bowen Wang <bowen@near.org>
Co-authored-by: Bowen Wang <bowenwang1996@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 5, 2024
1 parent f42345a commit 75f801d
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 12 deletions.
8 changes: 6 additions & 2 deletions core/store/src/trie/prefetching_trie_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,8 +494,12 @@ impl PrefetchApi {
///
/// Queued up work will not be finished. But trie keys that are already
/// being fetched will finish.
pub fn clear_queue(&self) {
while let Ok(_dropped) = self.work_queue_rx.try_recv() {}
pub fn clear_queue(&self) -> usize {
let mut count = 0;
while let Ok(_dropped) = self.work_queue_rx.try_recv() {
count += 1;
}
count
}

/// Clear prefetched staging area from data that has not been picked up by the main thread.
Expand Down
22 changes: 14 additions & 8 deletions runtime/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1466,7 +1466,6 @@ impl Runtime {

// We first process local receipts. They contain staking, local contract calls, etc.
if let Some(prefetcher) = &mut prefetcher {
prefetcher.clear();
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&local_receipts);
}
Expand Down Expand Up @@ -1501,7 +1500,6 @@ impl Runtime {
})?;

if let Some(prefetcher) = &mut prefetcher {
prefetcher.clear();
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(std::slice::from_ref(&receipt));
}
Expand Down Expand Up @@ -1529,7 +1527,6 @@ impl Runtime {

// And then we process the new incoming receipts. These are receipts from other shards.
if let Some(prefetcher) = &mut prefetcher {
prefetcher.clear();
// Prefetcher is allowed to fail
_ = prefetcher.prefetch_receipts_data(&incoming_receipts);
}
Expand All @@ -1553,11 +1550,6 @@ impl Runtime {
}
metrics.incoming_receipts_done(total.gas, total.compute);

// No more receipts are executed on this trie, stop any pending prefetches on it.
if let Some(prefetcher) = &prefetcher {
prefetcher.clear();
}

// Resolve timed-out PromiseYield receipts
let mut promise_yield_indices: PromiseYieldIndices =
get(&state_update, &TrieKey::PromiseYieldIndices)?.unwrap_or_default();
Expand Down Expand Up @@ -1655,6 +1647,20 @@ impl Runtime {
state_update.commit(StateChangeCause::UpdatedDelayedReceipts);
self.apply_state_patch(&mut state_update, state_patch);
let (trie, trie_changes, state_changes) = state_update.finalize()?;
if let Some(prefetcher) = &prefetcher {
// Only clear the prefetcher queue after finalize is done because as part of receipt
// processing we also prefetch account data and access keys that are accessed in
// finalize. This data can take a very long time otherwise if not prefetched.
//
// (This probably results in more data being accessed than strictly necessary and
// prefetcher may touch data that is no longer relevant as a result but...)
//
// In the future it may make sense to have prefetcher have a mode where it has two
// queues: one for data that is going to be required soon, and the other that it would
// only work when otherwise idle.
let discarded_prefetch_requests = prefetcher.clear();
tracing::debug!(target: "runtime", discarded_prefetch_requests);
}

// Dedup proposals from the same account.
// The order is deterministically changed.
Expand Down
48 changes: 46 additions & 2 deletions runtime/runtime/src/prefetch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ impl TriePrefetcher {
receipts: &[Receipt],
) -> Result<(), PrefetchError> {
for receipt in receipts.iter() {
let is_refund = receipt.predecessor_id.is_system();
match &receipt.receipt {
ReceiptEnum::Action(action_receipt) | ReceiptEnum::PromiseYield(action_receipt) => {
let account_id = receipt.receiver_id.clone();
Expand All @@ -99,6 +100,45 @@ impl TriePrefetcher {
if self.prefetch_api.enable_receipt_prefetching {
let trie_key = TrieKey::Account { account_id: account_id.clone() };
self.prefetch_trie_key(trie_key)?;
if is_refund {
let trie_key = TrieKey::AccessKey {
account_id: account_id.clone(),
public_key: action_receipt.signer_public_key.clone(),
};
self.prefetch_trie_key(trie_key)?;
}
for action in &action_receipt.actions {
match action {
Action::Delegate(delegate_action) => {
let trie_key = TrieKey::AccessKey {
account_id: delegate_action
.delegate_action
.sender_id
.clone(),
public_key: delegate_action
.delegate_action
.public_key
.clone(),
};
self.prefetch_trie_key(trie_key)?;
}
Action::AddKey(add_key_action) => {
let trie_key = TrieKey::AccessKey {
account_id: account_id.clone(),
public_key: add_key_action.public_key.clone(),
};
self.prefetch_trie_key(trie_key)?;
}
Action::DeleteKey(delete_key_action) => {
let trie_key = TrieKey::AccessKey {
account_id: account_id.clone(),
public_key: delete_key_action.public_key.clone(),
};
self.prefetch_trie_key(trie_key)?;
}
_ => {}
}
}
}

// SWEAT specific argument prefetcher
Expand Down Expand Up @@ -164,9 +204,13 @@ impl TriePrefetcher {
/// at the same time. They share a prefetcher, so they will clean each others
/// data. Handling this is a bit more involved. Failing to do so makes prefetching
/// less effective in those cases but crucially nothing breaks.
pub(crate) fn clear(&self) {
self.prefetch_api.clear_queue();
///
/// Returns the number of prefetch requests that have been removed from the prefetch queue.
/// If this number is large, the prefetches aren't actually getting executed before cancelling.
pub(crate) fn clear(&self) -> usize {
let ret = self.prefetch_api.clear_queue();
self.prefetch_api.clear_data();
ret
}

fn prefetch_trie_key(&self, trie_key: TrieKey) -> Result<(), PrefetchError> {
Expand Down

0 comments on commit 75f801d

Please sign in to comment.