fix: record nodes for writes in memtrie #10841

Merged Mar 25, 2024 (7 commits)
4 changes: 2 additions & 2 deletions core/store/src/trie/mem/mod.rs
@@ -143,7 +143,7 @@ impl MemTries {
pub fn update(
&self,
root: CryptoHash,
track_disk_changes: bool,
track_trie_changes: bool,
) -> Result<MemTrieUpdate, StorageError> {
let root_id = if root == CryptoHash::default() {
None
@@ -163,7 +163,7 @@ impl MemTries {
root_id,
&self.arena.memory(),
self.shard_uid.to_string(),
track_disk_changes,
track_trie_changes,
))
}
}
112 changes: 77 additions & 35 deletions core/store/src/trie/mem/updating.rs
@@ -9,6 +9,7 @@ use near_primitives::hash::{hash, CryptoHash};
use near_primitives::state::FlatStateValue;
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::sync::Arc;

/// An old node means a node in the current in-memory trie. An updated node means a
/// node we're going to store in the in-memory trie but have not constructed there yet.
@@ -43,6 +44,28 @@ pub enum UpdatedMemTrieNode {
},
}

/// Keeps values and internal nodes accessed on updating memtrie.
pub(crate) struct TrieAccesses {
/// Hashes and encoded trie nodes.
pub nodes: HashMap<CryptoHash, Arc<[u8]>>,
/// Accessed values keyed by hash. They are kept as `FlatStateValue` because
/// the values themselves are not necessarily present in memtrie.
pub values: HashMap<CryptoHash, FlatStateValue>,
}

Contributor:

I don't fully understand which additional nodes this change records.

AFAIU the runtime updates the trie by calling storage_write, which always does a trie read before writing a value:

let evicted_ptr = self.ext.storage_get(&key, StorageGetMode::Trie)?;

This trie read records all nodes that were accessed to reach this value, even in the case of memtries:

if let Some(recorder) = &self.recorder {

Doesn't that record everything that is needed to prove execution of the contract? Why do we need an additional access log?

Contributor (@jancionear, Mar 21, 2024):

Ah I guess it doesn't record nodes that are created when adding new values...

Member (Author):

Also, this path is called only during contract execution. Outside of contract execution, the trie is updated through set<T: BorshSerialize>(state_update: &mut TrieUpdate, key: TrieKey, value: &T).
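
To illustrate why a read-before-write lookup may not touch every node an update needs, here is a minimal sketch of the case named in this PR's code comment: a branch with two children, one of which is deleted, so the other child has to be read and squashed into the parent. `ToyTrie`, `Node`, `NodeId`, and `two_leaf_trie` are hypothetical stand-ins, not nearcore types, and the toy ignores key extensions entirely.

```rust
use std::collections::{BTreeSet, HashMap};

// Hypothetical toy types for illustration only; not nearcore's memtrie.
type NodeId = u32;

#[derive(Clone)]
enum Node {
    Leaf { value: u8 },
    Branch { children: HashMap<u8, NodeId> },
}

struct ToyTrie {
    nodes: HashMap<NodeId, Node>,
    root: NodeId,
}

impl ToyTrie {
    /// Reads the leaf stored under a one-symbol key, logging every node read.
    fn lookup(&self, key: u8, log: &mut BTreeSet<NodeId>) -> Option<u8> {
        log.insert(self.root);
        let Node::Branch { children } = self.nodes.get(&self.root)? else { return None };
        let child = *children.get(&key)?;
        log.insert(child);
        match self.nodes.get(&child)? {
            Node::Leaf { value } => Some(*value),
            Node::Branch { .. } => None,
        }
    }

    /// Deletes the leaf under `key`, logging every node read. If the root
    /// branch is left with a single child, that sibling is read as well so it
    /// can take the parent's place.
    fn delete(&mut self, key: u8, log: &mut BTreeSet<NodeId>) {
        log.insert(self.root);
        let Some(Node::Branch { mut children }) = self.nodes.get(&self.root).cloned() else {
            return;
        };
        let Some(child) = children.remove(&key) else { return };
        log.insert(child);
        self.nodes.remove(&child);
        if children.len() == 1 {
            // The sibling was never on the lookup path for `key`.
            let sibling = *children.values().next().unwrap();
            log.insert(sibling);
            let sibling_node = self.nodes.remove(&sibling).unwrap();
            self.nodes.insert(self.root, sibling_node);
        } else {
            self.nodes.insert(self.root, Node::Branch { children });
        }
    }
}

/// Builds a root branch (id 0) with two leaves (ids 1 and 2).
fn two_leaf_trie() -> ToyTrie {
    let mut nodes = HashMap::new();
    nodes.insert(0, Node::Branch { children: HashMap::from([(0u8, 1), (1u8, 2)]) });
    nodes.insert(1, Node::Leaf { value: 10 });
    nodes.insert(2, Node::Leaf { value: 20 });
    ToyTrie { nodes, root: 0 }
}
```

In this toy, looking up key 0 reads only the root and leaf 1, while deleting key 0 also reads leaf 2. A recorder driven by lookups alone would miss that sibling, which is the kind of gap an access log kept during the update is meant to close.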


/// Tracks intermediate trie changes, final version of which is to be committed
/// to disk after finishing trie update.
struct TrieChangesTracker {
/// Changes of reference count on disk for each impacted node.
refcount_changes: TrieRefcountDeltaMap,
/// All observed values and internal nodes.
/// Needed to prepare recorded storage.
/// Note that negative `refcount_changes` does not fully cover it, as node
/// or value of the same hash can be removed and inserted for the same
/// update in different parts of trie!
accesses: TrieAccesses,
}
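
The note on `accesses` can be made concrete with a small sketch. It assumes a simplified refcount delta map keyed by `u64` hashes; `RefcountDeltas` and `remove_then_reinsert` are hypothetical names, not nearcore's `TrieRefcountDeltaMap` API. The point is only that a removal and an insertion of the same hash cancel out, so net-negative refcounts cannot serve as the list of accessed items.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical, simplified refcount delta map keyed by a `u64` hash.
#[derive(Default)]
struct RefcountDeltas {
    deltas: HashMap<u64, i32>,
}

impl RefcountDeltas {
    fn add(&mut self, hash: u64, n: i32) {
        *self.deltas.entry(hash).or_insert(0) += n;
    }

    fn subtract(&mut self, hash: u64, n: i32) {
        *self.deltas.entry(hash).or_insert(0) -= n;
    }

    /// Hashes whose net refcount change is negative.
    fn deletions(&self) -> HashSet<u64> {
        self.deltas.iter().filter(|(_, d)| **d < 0).map(|(h, _)| *h).collect()
    }
}

/// Removes a value in one part of the trie and inserts an identical value
/// elsewhere within the same update. Returns (net deletions, accessed hashes).
fn remove_then_reinsert() -> (HashSet<u64>, HashSet<u64>) {
    let mut refcounts = RefcountDeltas::default();
    let mut accessed = HashSet::new();

    let value_hash = 42;
    // Removal: the old value is read, so the access is logged separately.
    accessed.insert(value_hash);
    refcounts.subtract(value_hash, 1);
    // Insertion of the same bytes under another key cancels the delta.
    refcounts.add(value_hash, 1);

    (refcounts.deletions(), accessed)
}
```

Calling `remove_then_reinsert()` yields an empty deletion set even though hash 42 was read during the update, which is why the tracker keeps the access log next to the refcount changes.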

/// Structure to build an update to the in-memory trie.
pub struct MemTrieUpdate<'a> {
/// The original root before updates. It is None iff the original trie had no keys.
@@ -53,8 +76,9 @@ pub struct MemTrieUpdate<'a> {
/// (1) temporarily we take out the node from the slot to process it and put it back
/// later; or (2) the node is deleted afterwards.
pub updated_nodes: Vec<Option<UpdatedMemTrieNode>>,
/// Refcount changes to on-disk trie nodes.
pub trie_refcount_changes: Option<TrieRefcountDeltaMap>,
/// Tracks trie changes necessary to make on-disk updates and recorded
/// storage.
tracked_trie_changes: Option<TrieChangesTracker>,
}

impl UpdatedMemTrieNode {
@@ -97,15 +121,18 @@ impl<'a> MemTrieUpdate<'a> {
root: Option<MemTrieNodeId>,
arena: &'a ArenaMemory,
shard_uid: String,
track_disk_changes: bool,
track_trie_changes: bool,
) -> Self {
let mut trie_update = Self {
root,
arena,
shard_uid,
updated_nodes: vec![],
trie_refcount_changes: if track_disk_changes {
Some(TrieRefcountDeltaMap::new())
tracked_trie_changes: if track_trie_changes {
Some(TrieChangesTracker {
refcount_changes: TrieRefcountDeltaMap::new(),
accesses: TrieAccesses { nodes: HashMap::new(), values: HashMap::new() },
})
} else {
None
},
@@ -145,8 +172,16 @@ impl<'a> MemTrieUpdate<'a> {
match node {
None => self.new_updated_node(UpdatedMemTrieNode::Empty),
Some(node) => {
if let Some(trie_refcount_changes) = self.trie_refcount_changes.as_mut() {
trie_refcount_changes.subtract(node.as_ptr(self.arena).view().node_hash(), 1);
if let Some(tracked_trie_changes) = self.tracked_trie_changes.as_mut() {
let node_view = node.as_ptr(self.arena).view();
let node_hash = node_view.node_hash();
let raw_node_serialized =
borsh::to_vec(&node_view.to_raw_trie_node_with_size()).unwrap();
tracked_trie_changes
.accesses
.nodes
.insert(node_hash, raw_node_serialized.into());
tracked_trie_changes.refcount_changes.subtract(node_hash, 1);
}
self.new_updated_node(UpdatedMemTrieNode::from_existing_node_view(
node.as_ptr(self.arena).view(),
@@ -164,14 +199,16 @@ impl<'a> MemTrieUpdate<'a> {
}

fn add_refcount_to_value(&mut self, hash: CryptoHash, value: Option<Vec<u8>>) {
if let Some(trie_refcount_changes) = self.trie_refcount_changes.as_mut() {
trie_refcount_changes.add(hash, value.unwrap(), 1);
if let Some(tracked_node_changes) = self.tracked_trie_changes.as_mut() {
tracked_node_changes.refcount_changes.add(hash, value.unwrap(), 1);
}
}

fn subtract_refcount_for_value(&mut self, hash: CryptoHash) {
if let Some(trie_refcount_changes) = self.trie_refcount_changes.as_mut() {
trie_refcount_changes.subtract(hash, 1);
fn subtract_refcount_for_value(&mut self, value: FlatStateValue) {
if let Some(tracked_node_changes) = self.tracked_trie_changes.as_mut() {
let hash = value.to_value_ref().hash;
tracked_node_changes.accesses.values.insert(hash, value);
tracked_node_changes.refcount_changes.subtract(hash, 1);
}
}

@@ -219,7 +256,7 @@ impl<'a> MemTrieUpdate<'a> {
if partial.is_empty() {
// This branch node is exactly where the value should be added.
if let Some(value) = old_value {
self.subtract_refcount_for_value(value.to_value_ref().hash);
self.subtract_refcount_for_value(value);
}
self.place_node(
node_id,
@@ -250,7 +287,7 @@ impl<'a> MemTrieUpdate<'a> {
let common_prefix = partial.common_prefix(&existing_key);
if common_prefix == existing_key.len() && common_prefix == partial.len() {
// We're at the exact leaf. Rewrite the value at this leaf.
self.subtract_refcount_for_value(old_value.to_value_ref().hash);
self.subtract_refcount_for_value(old_value);
self.place_node(
node_id,
UpdatedMemTrieNode::Leaf { extension, value: flat_value },
@@ -389,7 +426,7 @@ impl<'a> MemTrieUpdate<'a> {
}
UpdatedMemTrieNode::Leaf { extension, value } => {
if NibbleSlice::from_encoded(&extension).0 == partial {
self.subtract_refcount_for_value(value.to_value_ref().hash);
self.subtract_refcount_for_value(value);
self.place_node(node_id, UpdatedMemTrieNode::Empty);
break;
} else {
@@ -408,7 +445,7 @@ impl<'a> MemTrieUpdate<'a> {
);
return;
};
self.subtract_refcount_for_value(value.unwrap().to_value_ref().hash);
self.subtract_refcount_for_value(value.unwrap());
self.place_node(
node_id,
UpdatedMemTrieNode::Branch { children: old_children, value: None },
@@ -779,31 +816,36 @@ impl<'a> MemTrieUpdate<'a> {
}

/// Converts the updates to trie changes as well as memtrie changes.
pub fn to_trie_changes(self) -> TrieChanges {
let Self { root, arena, shard_uid, trie_refcount_changes, updated_nodes } = self;
let mut trie_refcount_changes =
trie_refcount_changes.expect("Cannot to_trie_changes for memtrie changes only");
pub(crate) fn to_trie_changes(self) -> (TrieChanges, TrieAccesses) {
let Self { root, arena, shard_uid, tracked_trie_changes, updated_nodes } = self;
let TrieChangesTracker { mut refcount_changes, accesses } =
tracked_trie_changes.expect("Cannot to_trie_changes for memtrie changes only");
let (mem_trie_changes, hashes_and_serialized) =
Self::to_mem_trie_changes_internal(shard_uid, arena, updated_nodes);

// We've accounted for the dereferenced nodes, as well as value addition/subtractions.
// The only thing left is to increment refcount for all new nodes.
for (node_hash, node_serialized) in hashes_and_serialized {
trie_refcount_changes.add(node_hash, node_serialized, 1);
}
let (insertions, deletions) = trie_refcount_changes.into_changes();

TrieChanges {
old_root: root.map(|root| root.as_ptr(arena).view().node_hash()).unwrap_or_default(),
new_root: mem_trie_changes
.node_ids_with_hashes
.last()
.map(|(_, hash)| *hash)
.unwrap_or_default(),
insertions,
deletions,
mem_trie_changes: Some(mem_trie_changes),
refcount_changes.add(node_hash, node_serialized, 1);
}
let (insertions, deletions) = refcount_changes.into_changes();

(
TrieChanges {
old_root: root
.map(|root| root.as_ptr(arena).view().node_hash())
.unwrap_or_default(),
new_root: mem_trie_changes
.node_ids_with_hashes
.last()
.map(|(_, hash)| *hash)
.unwrap_or_default(),
insertions,
deletions,
mem_trie_changes: Some(mem_trie_changes),
},
accesses,
)
}
}

@@ -917,7 +959,7 @@ mod tests {
update.delete(&key);
}
}
update.to_trie_changes()
update.to_trie_changes().0
}

fn make_memtrie_changes_only(
39 changes: 38 additions & 1 deletion core/store/src/trie/mod.rs
@@ -1498,7 +1498,44 @@ impl Trie {
None => trie_update.delete(&key),
}
}
Ok(trie_update.to_trie_changes())
let (trie_changes, trie_accesses) = trie_update.to_trie_changes();

// Sanity check for tests: all modified trie items must be
// present among the trie items accessed during the update.
#[cfg(test)]
{
for t in trie_changes.deletions.iter() {
let hash = t.trie_node_or_value_hash;
assert!(
trie_accesses.values.contains_key(&hash)
|| trie_accesses.nodes.contains_key(&hash),
"Hash {} is not present in trie accesses",
hash
);
}
}

// Retroactively record all accessed trie items that are
// required to process the trie update but were not recorded
// while processing lookups.
// The main case is a branch with two children, one of which
// gets removed, so we need to read the other child and squash
// it together with the parent.
if let Some(recorder) = &self.recorder {
for (node_hash, serialized_node) in trie_accesses.nodes {
recorder.borrow_mut().record(&node_hash, serialized_node);
}
for (value_hash, value) in trie_accesses.values {
let value = match value {
FlatStateValue::Ref(_) => {
self.storage.retrieve_raw_bytes(&value_hash)?
}
FlatStateValue::Inlined(value) => value.into(),
};
recorder.borrow_mut().record(&value_hash, value);
}
}
Ok(trie_changes)
}
None => {
let mut memory = NodesStorage::new();
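
The retroactive recording step in this hunk can be sketched in isolation. This is a simplified model under stated assumptions: `Hash`, `ToyValue`, `ToyAccesses`, `ToyRecorder`, and `record_accesses` are hypothetical stand-ins for `CryptoHash`, `FlatStateValue`, `TrieAccesses`, and the trie recorder, and storage is modeled as a plain `HashMap`. It shows the two value cases: inlined bytes are recorded directly, while values held by reference are fetched from storage first.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for CryptoHash.
type Hash = u64;

/// Toy counterpart of a flat-state value: a reference by hash or inlined bytes.
enum ToyValue {
    Ref,
    Inlined(Vec<u8>),
}

/// Accesses gathered during an update (same shape as `TrieAccesses` in the PR).
struct ToyAccesses {
    nodes: HashMap<Hash, Vec<u8>>,
    values: HashMap<Hash, ToyValue>,
}

/// Toy recorder that stores the bytes seen for each hash.
#[derive(Default)]
struct ToyRecorder {
    recorded: HashMap<Hash, Vec<u8>>,
}

impl ToyRecorder {
    fn record(&mut self, hash: Hash, bytes: Vec<u8>) {
        self.recorded.insert(hash, bytes);
    }
}

/// Records every accessed node and value. Values held by reference are
/// looked up in `storage`; a missing entry is reported as an error.
fn record_accesses(
    accesses: ToyAccesses,
    storage: &HashMap<Hash, Vec<u8>>,
    recorder: &mut ToyRecorder,
) -> Result<(), String> {
    for (hash, bytes) in accesses.nodes {
        recorder.record(hash, bytes);
    }
    for (hash, value) in accesses.values {
        let bytes = match value {
            ToyValue::Ref => storage
                .get(&hash)
                .cloned()
                .ok_or_else(|| format!("value {hash} missing from storage"))?,
            ToyValue::Inlined(bytes) => bytes,
        };
        recorder.record(hash, bytes);
    }
    Ok(())
}
```

Recording by hash makes the step idempotent in this model: an item already recorded during a lookup is simply overwritten with the same bytes.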