This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

pallet-mmr: fix offchain db for sync from zero #12498

Merged
86 changes: 60 additions & 26 deletions frame/merkle-mountain-range/src/lib.rs
@@ -57,9 +57,9 @@
#![cfg_attr(not(feature = "std"), no_std)]

use codec::Encode;
use frame_support::weights::Weight;
use frame_support::{traits::Get, weights::Weight};
use sp_runtime::{
traits::{self, CheckedSub, One, Saturating},
traits::{self, CheckedSub, One, Saturating, UniqueSaturatedInto},
SaturatedConversion,
};

@@ -103,6 +103,15 @@ pub trait WeightInfo {
fn on_initialize(peaks: NodeIndex) -> Weight;
}

/// A MMR specific to the pallet.
type ModuleMmr<StorageType, T, I> = mmr::Mmr<StorageType, T, I, LeafOf<T, I>>;

/// Leaf data.
type LeafOf<T, I> = <<T as Config<I>>::LeafData as primitives::LeafDataProvider>::LeafData;

/// Hashing used for the pallet.
pub(crate) type HashingOf<T, I> = <T as Config<I>>::Hashing;

#[frame_support::pallet]
pub mod pallet {
use super::*;
@@ -214,10 +223,19 @@ pub mod pallet {
let data = T::LeafData::leaf_data();
// append new leaf to MMR
let mut mmr: ModuleMmr<mmr::storage::RuntimeStorage, T, I> = mmr::Mmr::new(leaves);
mmr.push(data).expect("MMR push never fails.");

// update the size
let (leaves, root) = mmr.finalize().expect("MMR finalize never fails.");
// MMR push never fails.
let _ = mmr.push(data);

// Update the size.
let (leaves, root) = match mmr.finalize() {
Ok((leaves, root)) => (leaves, root),
Err(e) => {
frame_support::log::error!(
target: "runtime::mmr", "Could not finalize MMR for new block: {:?}", e,
);
return T::WeightInfo::on_initialize(peaks_before)
},
};
<T::OnNewRoot as primitives::OnNewRoot<_>>::on_new_root(&root);

<NumberOfLeaves<T, I>>::put(leaves);
@@ -230,37 +248,42 @@

fn offchain_worker(n: T::BlockNumber) {
use mmr::storage::{OffchainStorage, Storage};
// MMR pallet uses offchain storage to hold full MMR and leaves.
// The leaves are saved under fork-unique keys `(parent_hash, pos)`.
// MMR Runtime depends on `frame_system::block_hash(block_num)` mappings to find
// parent hashes for particular nodes or leaves.
// This MMR offchain worker function moves a rolling window of the same size
// as `frame_system::block_hash` map, where nodes/leaves added by blocks that are just
// The MMR nodes can be found in offchain db under either:
// - fork-unique keys `(prefix, parent_hash, pos)`, or,
// - "canonical" keys `(prefix, pos)`,
// depending on how many blocks in the past the node at position `pos` was
// added to the MMR.
//
// For the fork-unique keys, the MMR pallet depends on
// `frame_system::block_hash(parent_num)` mappings to find the relevant parent block
// hashes, so it is limited by `frame_system::BlockHashCount` in terms of how many
// historical forks it can track. Nodes added to MMR by block `N` can be found in
// offchain db at:
// - fork-unique keys `(prefix, parent_hash, pos)` when (`N` >= `latest_block` -
// `frame_system::BlockHashCount`);
// - "canonical" keys `(prefix, pos)` when (`N` < `latest_block` -
// `frame_system::BlockHashCount`);
//
// The offchain worker is responsible for maintaining the nodes' positions in
// offchain db as the chain progresses by moving a rolling window of the same size as
// `frame_system::block_hash` map, where nodes/leaves added by blocks that are just
// about to exit the window are "canonicalized" so that their offchain key no longer
// depends on `parent_hash` therefore on access to `frame_system::block_hash`.
// depends on `parent_hash`.
//
// This approach works to eliminate fork-induced leaf collisions in offchain db,
// under the assumption that no fork will be deeper than `frame_system::BlockHashCount`
// blocks (2400 blocks on Polkadot, Kusama, Rococo, etc):
// entries pertaining to block `N` where `N < current-2400` are moved to a key based
// solely on block number. The only way to have collisions is if two competing forks
// are deeper than 2400 blocks and they both "canonicalize" their view of block `N`.
// blocks:
// entries pertaining to block `N` where `N < current-BlockHashCount` are moved to a
// key based solely on block number. The only way to have collisions is if two
// competing forks are deeper than `frame_system::BlockHashCount` blocks and they
// both "canonicalize" their view of block `N`
// Once a block is canonicalized, all MMR entries pertaining to sibling blocks from
// other forks are pruned from offchain db.
Storage::<OffchainStorage, T, I, LeafOf<T, I>>::canonicalize_and_prune(n);
}
}
}

/// A MMR specific to the pallet.
type ModuleMmr<StorageType, T, I> = mmr::Mmr<StorageType, T, I, LeafOf<T, I>>;

/// Leaf data.
type LeafOf<T, I> = <<T as Config<I>>::LeafData as primitives::LeafDataProvider>::LeafData;

/// Hashing used for the pallet.
pub(crate) type HashingOf<T, I> = <T as Config<I>>::Hashing;

/// Stateless MMR proof verification for batch of leaves.
///
/// This function can be used to verify received MMR [primitives::BatchProof] (`proof`)
@@ -303,6 +326,17 @@ impl<T: Config<I>, I: 'static> Pallet<T, I> {
(T::INDEXING_PREFIX, pos).encode()
}

/// Return size of rolling window of leaves saved in offchain under fork-unique keys.
///
/// Leaves outside this window are canonicalized.
/// Window size is `frame_system::BlockHashCount - 1` to make sure fork-unique keys
/// can be built using `frame_system::block_hash` map.
fn offchain_canonicalization_window() -> LeafIndex {
let window_size: LeafIndex =
<T as frame_system::Config>::BlockHashCount::get().unique_saturated_into();
window_size.saturating_sub(1)
}

/// Provide the parent number for the block that added `leaf_index` to the MMR.
fn leaf_index_to_parent_block_num(
leaf_index: LeafIndex,
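Before moving on to the storage changes, here is a minimal sketch of the two offchain key schemes and the rolling window described in the comments above. It assumes the tuple shapes shown in this diff (`(prefix, parent_hash, pos)` and `(prefix, pos)`, SCALE-encoded, matching the `(T::INDEXING_PREFIX, pos).encode()` form above); the free functions and the `INDEXING_PREFIX` value are illustrative stand-ins, not the pallet's actual API surface:

```rust
use codec::Encode; // parity-scale-codec

// Illustrative prefix; the real pallet uses `T::INDEXING_PREFIX`.
const INDEXING_PREFIX: &[u8] = b"mmr";

/// Fork-unique key `(prefix, parent_hash, pos)`: used for nodes added
/// within the last `BlockHashCount` blocks, where forks are still possible.
fn node_offchain_key(parent_hash: &[u8; 32], pos: u64) -> Vec<u8> {
    (INDEXING_PREFIX, parent_hash, pos).encode()
}

/// Canonical key `(prefix, pos)`: used once a node falls out of the
/// rolling window and has been "canonicalized" by the offchain worker.
fn node_canon_offchain_key(pos: u64) -> Vec<u8> {
    (INDEXING_PREFIX, pos).encode()
}

/// Rolling window size: `BlockHashCount - 1`, so that fork-unique keys can
/// always be rebuilt from the `frame_system::block_hash` map.
fn offchain_canonicalization_window(block_hash_count: u64) -> u64 {
    block_hash_count.saturating_sub(1)
}
```

With Polkadot's `BlockHashCount` of 2400 (the figure quoted in the old comment above), the window is 2399 leaves: anything older should only be reachable under its canonical key.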
91 changes: 52 additions & 39 deletions frame/merkle-mountain-range/src/mmr/storage.rs
@@ -18,11 +18,10 @@
//! MMR storage implementations.

use codec::Encode;
use frame_support::traits::Get;
use frame_support::log::{debug, error, trace};
use mmr_lib::helper;
use sp_core::offchain::StorageKind;
use sp_io::{offchain, offchain_index};
use sp_runtime::traits::UniqueSaturatedInto;
use sp_std::iter::Peekable;
#[cfg(not(feature = "std"))]
use sp_std::prelude::*;
@@ -133,23 +132,22 @@ where
// Effectively move a rolling window of fork-unique leaves. Once out of the window, leaves
// are "canonicalized" in offchain by moving them under `Pallet::node_canon_offchain_key`.
let leaves = NumberOfLeaves::<T, I>::get();
let window_size =
<T as frame_system::Config>::BlockHashCount::get().unique_saturated_into();
let window_size = Pallet::<T, I>::offchain_canonicalization_window();
if leaves >= window_size {
// Move the rolling window towards the end of `block_num->hash` mappings available
// in the runtime: we "canonicalize" the leaf at the end,
let to_canon_leaf = leaves.saturating_sub(window_size);
// and all the nodes added by that leaf.
let to_canon_nodes = NodesUtils::right_branch_ending_in_leaf(to_canon_leaf);
frame_support::log::debug!(
debug!(
target: "runtime::mmr::offchain", "Nodes to canon for leaf {}: {:?}",
to_canon_leaf, to_canon_nodes
);
// For this block number there may be node entries saved from multiple forks.
let to_canon_block_num =
Pallet::<T, I>::leaf_index_to_parent_block_num(to_canon_leaf, leaves);
// Only entries under this hash (retrieved from state on current canon fork) are to be
// persisted. All other entries added by same block number will be cleared.
// persisted. All entries added by same block number on other forks will be cleared.
let to_canon_hash = <frame_system::Pallet<T>>::block_hash(to_canon_block_num);

Self::canonicalize_nodes_for_hash(&to_canon_nodes, to_canon_hash);
@@ -159,7 +157,7 @@ where
Self::prune_nodes_for_forks(&to_canon_nodes, forks);
})
.unwrap_or_else(|| {
frame_support::log::error!(
error!(
target: "runtime::mmr::offchain",
"Offchain: could not prune: no entry in pruning map for block {:?}",
to_canon_block_num
@@ -172,7 +170,7 @@
for hash in forks {
for pos in nodes {
let key = Pallet::<T, I>::node_offchain_key(hash, *pos);
frame_support::log::debug!(
debug!(
target: "runtime::mmr::offchain",
"Clear elem at pos {} with key {:?}",
pos, key
@@ -193,13 +191,13 @@ where
let canon_key = Pallet::<T, I>::node_canon_offchain_key(*pos);
// Add under new canon key.
offchain::local_storage_set(StorageKind::PERSISTENT, &canon_key, &elem);
frame_support::log::debug!(
debug!(
target: "runtime::mmr::offchain",
"Moved elem at pos {} from key {:?} to canon key {:?}",
pos, key, canon_key
);
} else {
frame_support::log::error!(
error!(
target: "runtime::mmr::offchain",
"Could not canonicalize elem at pos {} using key {:?}",
pos, key
@@ -220,21 +218,18 @@ where
// Find out which leaf added node `pos` in the MMR.
let ancestor_leaf_idx = NodesUtils::leaf_index_that_added_node(pos);

let window_size =
<T as frame_system::Config>::BlockHashCount::get().unique_saturated_into();
let window_size = Pallet::<T, I>::offchain_canonicalization_window();
// Leaves older than this window should have been canonicalized.
if leaves.saturating_sub(ancestor_leaf_idx) > window_size {
let key = Pallet::<T, I>::node_canon_offchain_key(pos);
frame_support::log::debug!(
debug!(
target: "runtime::mmr::offchain", "offchain db get {}: leaf idx {:?}, key {:?}",
pos, ancestor_leaf_idx, key
);
// Just for safety, to easily handle runtime upgrades where any of the window params
// change and we might mess up the storage migration:
// return _only if_ the node is found (under normal conditions it is always found),
if let Some(elem) =
sp_io::offchain::local_storage_get(sp_core::offchain::StorageKind::PERSISTENT, &key)
{
if let Some(elem) = sp_io::offchain::local_storage_get(StorageKind::PERSISTENT, &key) {
return Ok(codec::Decode::decode(&mut &*elem).ok())
}
// BUT if we DID MESS UP, fall through to searching node using fork-specific key.
@@ -245,19 +240,19 @@ where
Pallet::<T, I>::leaf_index_to_parent_block_num(ancestor_leaf_idx, leaves);
let ancestor_parent_hash = <frame_system::Pallet<T>>::block_hash(ancestor_parent_block_num);
let key = Pallet::<T, I>::node_offchain_key(ancestor_parent_hash, pos);
frame_support::log::debug!(
debug!(
target: "runtime::mmr::offchain", "offchain db get {}: leaf idx {:?}, hash {:?}, key {:?}",
pos, ancestor_leaf_idx, ancestor_parent_hash, key
);
// Retrieve the element from Off-chain DB.
Ok(sp_io::offchain::local_storage_get(sp_core::offchain::StorageKind::PERSISTENT, &key)
Ok(sp_io::offchain::local_storage_get(StorageKind::PERSISTENT, &key)
.or_else(|| {
// Again, this is just us being extra paranoid.
// We get here only if we messed up a storage migration for a runtime upgrade where,
// say, the window was increased, and for a little while following the upgrade there
// are leaves inside the new 'window' that had already been canonicalized before it.
let key = Pallet::<T, I>::node_canon_offchain_key(pos);
sp_io::offchain::local_storage_get(sp_core::offchain::StorageKind::PERSISTENT, &key)
sp_io::offchain::local_storage_get(StorageKind::PERSISTENT, &key)
})
.and_then(|v| codec::Decode::decode(&mut &*v).ok()))
}
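An aside on the lookup order in `get` above: it picks which key scheme to try first based on how far behind the MMR tip the node's ancestor leaf sits, keeping the other scheme as a paranoid fallback. A self-contained sketch of that decision (pure illustration; `lookup_order` and its string results are hypothetical, and `window` corresponds to `offchain_canonicalization_window()`):

```rust
/// Sketch of the key-selection logic in `Storage::<OffchainStorage, _, _, _>::get`.
fn lookup_order(leaves: u64, ancestor_leaf_idx: u64, window: u64) -> &'static str {
    if leaves.saturating_sub(ancestor_leaf_idx) > window {
        // Old node: should already be canonicalized, so try `(prefix, pos)`
        // first; the fork-unique key is only a fallback for botched migrations.
        "canonical key first, fork-unique fallback"
    } else {
        // Recent node: still inside the rolling window, so try the fork-unique
        // `(prefix, parent_hash, pos)` key first; the canonical key is the
        // fallback (e.g. right after a window-size change).
        "fork-unique key first, canonical fallback"
    }
}

fn main() {
    // With Polkadot's `BlockHashCount` of 2400, the window is 2399 leaves:
    // a node added by leaf 7_000 in an MMR holding 10_000 leaves lags the
    // tip by 3_000 > 2_399 leaves, so it should already be canonical.
    assert_eq!(
        lookup_order(10_000, 7_000, 2_399),
        "canonical key first, fork-unique fallback"
    );
}
```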
@@ -282,9 +277,8 @@ where
return Ok(())
}

frame_support::log::trace!(
target: "runtime::mmr",
"elems: {:?}",
trace!(
target: "runtime::mmr", "elems: {:?}",
elems.iter().map(|elem| elem.hash()).collect::<Vec<_>>()
);

@@ -309,25 +303,12 @@ where
// in offchain DB to avoid DB collisions and overwrites in case of forks.
let parent_hash = <frame_system::Pallet<T>>::parent_hash();
for elem in elems {
// For now we store this leaf offchain keyed by `(parent_hash, node_index)`
// to make it fork-resistant.
// Offchain worker task will "canonicalize" it `frame_system::BlockHashCount` blocks
// later when we are not worried about forks anymore (highly unlikely to have a fork
// in the chain that deep).
// "Canonicalization" in this case means moving this leaf under a new key based
// only on the leaf's `node_index`.
let key = Pallet::<T, I>::node_offchain_key(parent_hash, node_index);
frame_support::log::debug!(
target: "runtime::mmr::offchain", "offchain db set: pos {} parent_hash {:?} key {:?}",
node_index, parent_hash, key
);
// Indexing API is used to store the full node content (both leaf and inner).
elem.using_encoded(|elem| offchain_index::set(&key, elem));

// On-chain we are going to only store new peaks.
if peaks_to_store.next_if_eq(&node_index).is_some() {
<Nodes<T, I>>::insert(node_index, elem.hash());
}
// We are storing full node off-chain (using indexing API).
Self::store_to_offchain(node_index, parent_hash, &elem);

// Increase the indices.
if let Node::Data(..) = elem {
Expand All @@ -348,6 +329,38 @@ where
}
}

impl<T, I, L> Storage<RuntimeStorage, T, I, L>
where
T: Config<I>,
I: 'static,
L: primitives::FullLeaf,
{
fn store_to_offchain(
pos: NodeIndex,
parent_hash: <T as frame_system::Config>::Hash,
elem: &NodeOf<T, I, L>,
) {
let encoded_node = elem.encode();
// We store this leaf offchain keyed by `(parent_hash, node_index)` to make it
// fork-resistant. Offchain worker task will "canonicalize" it
// `frame_system::BlockHashCount` blocks later, when we are not worried about forks anymore
// (multi-era-deep forks should not happen).
let key = Pallet::<T, I>::node_offchain_key(parent_hash, pos);
debug!(
target: "runtime::mmr::offchain", "offchain db set: pos {} parent_hash {:?} key {:?}",
pos, parent_hash, key
);
// Indexing API is used to store the full node content.
offchain_index::set(&key, &encoded_node);
// We also directly save the full node under the "canonical" key.
// This is superfluous for the normal case - this entry will possibly be overwritten
// by forks, and will also be overwritten by "offchain_worker canonicalization".
// But it is required for blocks imported during initial sync where none of the above apply
// (`offchain_worker` doesn't run for initial sync blocks).
offchain_index::set(&Pallet::<T, I>::node_canon_offchain_key(pos), &encoded_node);
}
}

fn peaks_to_prune_and_store(
old_size: NodeIndex,
new_size: NodeIndex,
@@ -356,8 +369,8 @@ fn peaks_to_prune_and_store(
// both collections may share a common prefix.
let peaks_before = if old_size == 0 { vec![] } else { helper::get_peaks(old_size) };
let peaks_after = helper::get_peaks(new_size);
frame_support::log::trace!(target: "runtime::mmr", "peaks_before: {:?}", peaks_before);
frame_support::log::trace!(target: "runtime::mmr", "peaks_after: {:?}", peaks_after);
trace!(target: "runtime::mmr", "peaks_before: {:?}", peaks_before);
trace!(target: "runtime::mmr", "peaks_after: {:?}", peaks_after);
let mut peaks_before = peaks_before.into_iter().peekable();
let mut peaks_after = peaks_after.into_iter().peekable();
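Zooming out, the heart of this PR's fix is the dual write in `store_to_offchain` above: each node is written under both key schemes at block-import time, so nodes on a chain synced from zero, where `offchain_worker` never ran and thus never canonicalized anything, still end up reachable under the canonical key. A minimal self-contained sketch of that write path (the closure parameter stands in for `sp_io::offchain_index::set`; prefix and key shapes as in the earlier sketch):

```rust
use codec::Encode; // parity-scale-codec

const INDEXING_PREFIX: &[u8] = b"mmr"; // illustrative; the pallet uses `T::INDEXING_PREFIX`

/// Sketch of the dual write performed by `store_to_offchain`.
fn store_node(
    pos: u64,
    parent_hash: [u8; 32],
    encoded_node: &[u8],
    offchain_index_set: impl Fn(&[u8], &[u8]),
) {
    // Fork-unique entry: collision-safe across competing forks; the offchain
    // worker moves it under the canonical key once the block leaves the window.
    offchain_index_set(&(INDEXING_PREFIX, &parent_hash, pos).encode(), encoded_node);
    // Canonical entry: redundant on a live chain (canonicalization overwrites
    // it anyway), but it is the only copy written during initial sync, where
    // the offchain worker does not run.
    offchain_index_set(&(INDEXING_PREFIX, pos).encode(), encoded_node);
}
```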
