Skip to content

Commit

Permalink
fix: record nodes for writes in memtrie (#10841)
Browse files Browse the repository at this point in the history
Fixes #10769.

## UPD: New motivation

While all said below is true, the main motivation is to account for
nodes impacted at `squash_nodes` which can be touched without much pain
only at `Trie::update`.

## Old motivation

The root cause of errors "node not found" on reads from recorded storage
is that nodes touched only at `Trie::update` - KV updates in the end of
chunk applictaion - were never recorded on memtrie.
It happens automatically on disk tries. For memtries we retroactively
account for trie node accesses, but this wasn't done in similar way for
memtrie updates.
While infrastructure indeed needs to be improved so these issues won't
surprise us anymore, the easiest way is to account for all accesses in
the same way as `move_node_to_mutable` does - on memtrie paths like
`ensure_updated` which move node out of storage to memory, which means
that node has to be recorded, so trie can be validated. Unfortunately
refcount changes are not enough to cover this, so I record trie accesses
separately. Also, memtrie doesn't have access to values, so I have to
retrieve values and record them in the `Trie::update`.

### Testing

* New logic is covered by trie_recording tests. I also discovered an
issue there - `destructively_delete_in_memory_state_from_disk` was
removing all keys because key decoding was incorrect. Now these tests
fully cover trie access scenario: first we have get&get_ref accesses,
and in the very end we have update call for some KV pairs. I confirmed
that for incorrect impl some tests fail because of the same "node not
found" error, and also that new sanity check on `Trie::update` works.
* test_memtrie_discrepancy is renamed and works now

---------

Co-authored-by: Longarithm <the.aleksandr.logunov@gmail.com>
  • Loading branch information
Longarithm and Longarithm authored Mar 25, 2024
1 parent 254698a commit 73b8827
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 45 deletions.
4 changes: 2 additions & 2 deletions core/store/src/trie/mem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl MemTries {
pub fn update(
&self,
root: CryptoHash,
track_disk_changes: bool,
track_trie_changes: bool,
) -> Result<MemTrieUpdate, StorageError> {
let root_id = if root == CryptoHash::default() {
None
Expand All @@ -163,7 +163,7 @@ impl MemTries {
root_id,
&self.arena.memory(),
self.shard_uid.to_string(),
track_disk_changes,
track_trie_changes,
))
}
}
Expand Down
112 changes: 77 additions & 35 deletions core/store/src/trie/mem/updating.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use near_primitives::hash::{hash, CryptoHash};
use near_primitives::state::FlatStateValue;
use near_primitives::types::BlockHeight;
use std::collections::HashMap;
use std::sync::Arc;

/// An old node means a node in the current in-memory trie. An updated node means a
/// node we're going to store in the in-memory trie but have not constructed there yet.
Expand Down Expand Up @@ -43,6 +44,28 @@ pub enum UpdatedMemTrieNode {
},
}

/// Keeps values and internal nodes accessed on updating memtrie.
pub(crate) struct TrieAccesses {
/// Hashes and encoded trie nodes.
pub nodes: HashMap<CryptoHash, Arc<[u8]>>,
/// Hashes of accessed values - because values themselves are not
/// necessarily present in memtrie.
pub values: HashMap<CryptoHash, FlatStateValue>,
}

/// Tracks intermediate trie changes, final version of which is to be committed
/// to disk after finishing trie update.
struct TrieChangesTracker {
/// Changes of reference count on disk for each impacted node.
refcount_changes: TrieRefcountDeltaMap,
/// All observed values and internal nodes.
/// Needed to prepare recorded storage.
/// Note that negative `refcount_changes` does not fully cover it, as node
/// or value of the same hash can be removed and inserted for the same
/// update in different parts of trie!
accesses: TrieAccesses,
}

/// Structure to build an update to the in-memory trie.
pub struct MemTrieUpdate<'a> {
/// The original root before updates. It is None iff the original trie had no keys.
Expand All @@ -53,8 +76,9 @@ pub struct MemTrieUpdate<'a> {
/// (1) temporarily we take out the node from the slot to process it and put it back
/// later; or (2) the node is deleted afterwards.
pub updated_nodes: Vec<Option<UpdatedMemTrieNode>>,
/// Refcount changes to on-disk trie nodes.
pub trie_refcount_changes: Option<TrieRefcountDeltaMap>,
/// Tracks trie changes necessary to make on-disk updates and recorded
/// storage.
tracked_trie_changes: Option<TrieChangesTracker>,
}

impl UpdatedMemTrieNode {
Expand Down Expand Up @@ -97,15 +121,18 @@ impl<'a> MemTrieUpdate<'a> {
root: Option<MemTrieNodeId>,
arena: &'a ArenaMemory,
shard_uid: String,
track_disk_changes: bool,
track_trie_changes: bool,
) -> Self {
let mut trie_update = Self {
root,
arena,
shard_uid,
updated_nodes: vec![],
trie_refcount_changes: if track_disk_changes {
Some(TrieRefcountDeltaMap::new())
tracked_trie_changes: if track_trie_changes {
Some(TrieChangesTracker {
refcount_changes: TrieRefcountDeltaMap::new(),
accesses: TrieAccesses { nodes: HashMap::new(), values: HashMap::new() },
})
} else {
None
},
Expand Down Expand Up @@ -145,8 +172,16 @@ impl<'a> MemTrieUpdate<'a> {
match node {
None => self.new_updated_node(UpdatedMemTrieNode::Empty),
Some(node) => {
if let Some(trie_refcount_changes) = self.trie_refcount_changes.as_mut() {
trie_refcount_changes.subtract(node.as_ptr(self.arena).view().node_hash(), 1);
if let Some(tracked_trie_changes) = self.tracked_trie_changes.as_mut() {
let node_view = node.as_ptr(self.arena).view();
let node_hash = node_view.node_hash();
let raw_node_serialized =
borsh::to_vec(&node_view.to_raw_trie_node_with_size()).unwrap();
tracked_trie_changes
.accesses
.nodes
.insert(node_hash, raw_node_serialized.into());
tracked_trie_changes.refcount_changes.subtract(node_hash, 1);
}
self.new_updated_node(UpdatedMemTrieNode::from_existing_node_view(
node.as_ptr(self.arena).view(),
Expand All @@ -164,14 +199,16 @@ impl<'a> MemTrieUpdate<'a> {
}

fn add_refcount_to_value(&mut self, hash: CryptoHash, value: Option<Vec<u8>>) {
if let Some(trie_refcount_changes) = self.trie_refcount_changes.as_mut() {
trie_refcount_changes.add(hash, value.unwrap(), 1);
if let Some(tracked_node_changes) = self.tracked_trie_changes.as_mut() {
tracked_node_changes.refcount_changes.add(hash, value.unwrap(), 1);
}
}

fn subtract_refcount_for_value(&mut self, hash: CryptoHash) {
if let Some(trie_refcount_changes) = self.trie_refcount_changes.as_mut() {
trie_refcount_changes.subtract(hash, 1);
fn subtract_refcount_for_value(&mut self, value: FlatStateValue) {
if let Some(tracked_node_changes) = self.tracked_trie_changes.as_mut() {
let hash = value.to_value_ref().hash;
tracked_node_changes.accesses.values.insert(hash, value);
tracked_node_changes.refcount_changes.subtract(hash, 1);
}
}

Expand Down Expand Up @@ -219,7 +256,7 @@ impl<'a> MemTrieUpdate<'a> {
if partial.is_empty() {
// This branch node is exactly where the value should be added.
if let Some(value) = old_value {
self.subtract_refcount_for_value(value.to_value_ref().hash);
self.subtract_refcount_for_value(value);
}
self.place_node(
node_id,
Expand Down Expand Up @@ -250,7 +287,7 @@ impl<'a> MemTrieUpdate<'a> {
let common_prefix = partial.common_prefix(&existing_key);
if common_prefix == existing_key.len() && common_prefix == partial.len() {
// We're at the exact leaf. Rewrite the value at this leaf.
self.subtract_refcount_for_value(old_value.to_value_ref().hash);
self.subtract_refcount_for_value(old_value);
self.place_node(
node_id,
UpdatedMemTrieNode::Leaf { extension, value: flat_value },
Expand Down Expand Up @@ -389,7 +426,7 @@ impl<'a> MemTrieUpdate<'a> {
}
UpdatedMemTrieNode::Leaf { extension, value } => {
if NibbleSlice::from_encoded(&extension).0 == partial {
self.subtract_refcount_for_value(value.to_value_ref().hash);
self.subtract_refcount_for_value(value);
self.place_node(node_id, UpdatedMemTrieNode::Empty);
break;
} else {
Expand All @@ -408,7 +445,7 @@ impl<'a> MemTrieUpdate<'a> {
);
return;
};
self.subtract_refcount_for_value(value.unwrap().to_value_ref().hash);
self.subtract_refcount_for_value(value.unwrap());
self.place_node(
node_id,
UpdatedMemTrieNode::Branch { children: old_children, value: None },
Expand Down Expand Up @@ -779,31 +816,36 @@ impl<'a> MemTrieUpdate<'a> {
}

/// Converts the updates to trie changes as well as memtrie changes.
pub fn to_trie_changes(self) -> TrieChanges {
let Self { root, arena, shard_uid, trie_refcount_changes, updated_nodes } = self;
let mut trie_refcount_changes =
trie_refcount_changes.expect("Cannot to_trie_changes for memtrie changes only");
pub(crate) fn to_trie_changes(self) -> (TrieChanges, TrieAccesses) {
let Self { root, arena, shard_uid, tracked_trie_changes, updated_nodes } = self;
let TrieChangesTracker { mut refcount_changes, accesses } =
tracked_trie_changes.expect("Cannot to_trie_changes for memtrie changes only");
let (mem_trie_changes, hashes_and_serialized) =
Self::to_mem_trie_changes_internal(shard_uid, arena, updated_nodes);

// We've accounted for the dereferenced nodes, as well as value addition/subtractions.
// The only thing left is to increment refcount for all new nodes.
for (node_hash, node_serialized) in hashes_and_serialized {
trie_refcount_changes.add(node_hash, node_serialized, 1);
}
let (insertions, deletions) = trie_refcount_changes.into_changes();

TrieChanges {
old_root: root.map(|root| root.as_ptr(arena).view().node_hash()).unwrap_or_default(),
new_root: mem_trie_changes
.node_ids_with_hashes
.last()
.map(|(_, hash)| *hash)
.unwrap_or_default(),
insertions,
deletions,
mem_trie_changes: Some(mem_trie_changes),
refcount_changes.add(node_hash, node_serialized, 1);
}
let (insertions, deletions) = refcount_changes.into_changes();

(
TrieChanges {
old_root: root
.map(|root| root.as_ptr(arena).view().node_hash())
.unwrap_or_default(),
new_root: mem_trie_changes
.node_ids_with_hashes
.last()
.map(|(_, hash)| *hash)
.unwrap_or_default(),
insertions,
deletions,
mem_trie_changes: Some(mem_trie_changes),
},
accesses,
)
}
}

Expand Down Expand Up @@ -917,7 +959,7 @@ mod tests {
update.delete(&key);
}
}
update.to_trie_changes()
update.to_trie_changes().0
}

fn make_memtrie_changes_only(
Expand Down
39 changes: 38 additions & 1 deletion core/store/src/trie/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1498,7 +1498,44 @@ impl Trie {
None => trie_update.delete(&key),
}
}
Ok(trie_update.to_trie_changes())
let (trie_changes, trie_accesses) = trie_update.to_trie_changes();

// Sanity check for tests: all modified trie items must be
// present in ever accessed trie items.
#[cfg(test)]
{
for t in trie_changes.deletions.iter() {
let hash = t.trie_node_or_value_hash;
assert!(
trie_accesses.values.contains_key(&hash)
|| trie_accesses.nodes.contains_key(&hash),
"Hash {} is not present in trie accesses",
hash
);
}
}

// Retroactively record all accessed trie items which are
// required to process trie update but were not recorded at
// processing lookups.
// The main case is a branch with two children, one of which
// got removed, so we need to read another one and squash it
// together with parent.
if let Some(recorder) = &self.recorder {
for (node_hash, serialized_node) in trie_accesses.nodes {
recorder.borrow_mut().record(&node_hash, serialized_node);
}
for (value_hash, value) in trie_accesses.values {
let value = match value {
FlatStateValue::Ref(_) => {
self.storage.retrieve_raw_bytes(&value_hash)?
}
FlatStateValue::Inlined(value) => value.into(),
};
recorder.borrow_mut().record(&value_hash, value);
}
}
Ok(trie_changes)
}
None => {
let mut memory = NodesStorage::new();
Expand Down
Loading

0 comments on commit 73b8827

Please sign in to comment.