From cd12121439e20516b6bf81fc8e898018cf1bb06e Mon Sep 17 00:00:00 2001 From: nikurt <86772482+nikurt@users.noreply.github.com> Date: Thu, 9 Mar 2023 17:30:11 +0100 Subject: [PATCH] feat: Dump and apply state changes for a range of blocks. (#8628) This simulates shard shadowing. In order to fully correctly replay state changes and get matching state roots, all kinds of state changes need to be applied, including the ones which currently are filtered out. To achieve this I add a compilation feature. Furthermore, `DelayedReceipt{,Indices}` state changes have no information about the shard they are coming from, which leads to two issues: 1) If two shards have a `DelayedReceipt` change, they will override one another, and the StateChanges column will contain only one of them. 2) We don't know to which shard to apply these state changes. The solution is to encode `ShardUId` in the row key of the `StateChanges` column for these two kids of state changes. The other kinds of state changes are matched to shards by their `account_id`s or `receiver_id`s. Combination of these two solutions lets us have a backwards-compatible solution. --- core/primitives/src/trie_key.rs | 83 ++++++++- core/primitives/src/types.rs | 22 +++ core/store/Cargo.toml | 2 + core/store/src/trie/shard_tries.rs | 193 ++++++++++++++++++--- nearcore/Cargo.toml | 2 + neard/Cargo.toml | 1 + tools/state-viewer/Cargo.toml | 1 + tools/state-viewer/src/cli.rs | 27 ++- tools/state-viewer/src/lib.rs | 1 + tools/state-viewer/src/state_changes.rs | 220 ++++++++++++++++++++++++ 10 files changed, 520 insertions(+), 32 deletions(-) create mode 100644 tools/state-viewer/src/state_changes.rs diff --git a/core/primitives/src/trie_key.rs b/core/primitives/src/trie_key.rs index 3af418ce526..cced62860ff 100644 --- a/core/primitives/src/trie_key.rs +++ b/core/primitives/src/trie_key.rs @@ -1,8 +1,11 @@ -use crate::hash::CryptoHash; -use crate::types::AccountId; +use std::mem::size_of; + use borsh::{BorshDeserialize, BorshSerialize}; + use near_crypto::PublicKey; -use std::mem::size_of; + +use crate::hash::CryptoHash; +use crate::types::AccountId; pub(crate) const ACCOUNT_DATA_SEPARATOR: u8 = b','; @@ -208,6 +211,22 @@ impl TrieKey { self.append_into(&mut buf); buf } + + /// Extracts account id from a TrieKey if available. + pub fn get_account_id(&self) -> Option { + match self { + TrieKey::Account { account_id, .. } => Some(account_id.clone()), + TrieKey::ContractCode { account_id, .. } => Some(account_id.clone()), + TrieKey::AccessKey { account_id, .. } => Some(account_id.clone()), + TrieKey::ReceivedData { receiver_id, .. } => Some(receiver_id.clone()), + TrieKey::PostponedReceiptId { receiver_id, .. } => Some(receiver_id.clone()), + TrieKey::PendingDataCount { receiver_id, .. } => Some(receiver_id.clone()), + TrieKey::PostponedReceipt { receiver_id, .. } => Some(receiver_id.clone()), + TrieKey::DelayedReceiptIndices => None, + TrieKey::DelayedReceipt { .. } => None, + TrieKey::ContractData { account_id, .. } => Some(account_id.clone()), + } + } } // TODO: Remove once we switch to non-raw keys everywhere. @@ -624,4 +643,62 @@ mod tests { let raw_key = key.to_vec(); assert!(trie_key_parsers::parse_account_id_from_raw_key(&raw_key).unwrap().is_none()); } + + #[test] + fn test_account_id_from_trie_key() { + let account_id = OK_ACCOUNT_IDS[0].parse::().unwrap(); + + assert_eq!( + TrieKey::Account { account_id: account_id.clone() }.get_account_id(), + Some(account_id.clone()) + ); + assert_eq!( + TrieKey::ContractCode { account_id: account_id.clone() }.get_account_id(), + Some(account_id.clone()) + ); + assert_eq!( + TrieKey::AccessKey { + account_id: account_id.clone(), + public_key: PublicKey::empty(KeyType::ED25519) + } + .get_account_id(), + Some(account_id.clone()) + ); + assert_eq!( + TrieKey::ReceivedData { receiver_id: account_id.clone(), data_id: Default::default() } + .get_account_id(), + Some(account_id.clone()) + ); + assert_eq!( + TrieKey::PostponedReceiptId { + receiver_id: account_id.clone(), + data_id: Default::default() + } + .get_account_id(), + Some(account_id.clone()) + ); + assert_eq!( + TrieKey::PendingDataCount { + receiver_id: account_id.clone(), + receipt_id: Default::default() + } + .get_account_id(), + Some(account_id.clone()) + ); + assert_eq!( + TrieKey::PostponedReceipt { + receiver_id: account_id.clone(), + receipt_id: Default::default() + } + .get_account_id(), + Some(account_id.clone()) + ); + assert_eq!(TrieKey::DelayedReceipt { index: Default::default() }.get_account_id(), None); + assert_eq!(TrieKey::DelayedReceiptIndices.get_account_id(), None); + assert_eq!( + TrieKey::ContractData { account_id: account_id.clone(), key: Default::default() } + .get_account_id(), + Some(account_id.clone()) + ); + } } diff --git a/core/primitives/src/types.rs b/core/primitives/src/types.rs index a817ba01ce7..93145d16e30 100644 --- a/core/primitives/src/types.rs +++ b/core/primitives/src/types.rs @@ -975,3 +975,25 @@ impl TrieNodesCount { }) } } + +/// State changes for a range of blocks. +/// Expects that a block is present at most once in the list. +#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)] +pub struct StateChangesForBlockRange { + pub blocks: Vec, +} + +/// State changes for a single block. +/// Expects that a shard is present at most once in the list of state changes. +#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)] +pub struct StateChangesForBlock { + pub block_hash: CryptoHash, + pub state_changes: Vec, +} + +/// Key and value of a StateChanges column. +#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)] +pub struct StateChangesForShard { + pub shard_id: ShardId, + pub state_changes: Vec, +} diff --git a/core/store/Cargo.toml b/core/store/Cargo.toml index f65f0c862e2..dea8222f297 100644 --- a/core/store/Cargo.toml +++ b/core/store/Cargo.toml @@ -40,6 +40,7 @@ assert_matches.workspace = true bencher.workspace = true insta.workspace = true rand.workspace = true +thiserror.workspace = true [[bench]] name = "trie_bench" @@ -56,6 +57,7 @@ no_cache = [] single_thread_rocksdb = [] # Deactivate RocksDB IO background threads test_features = [] protocol_feature_flat_state = [] +serialize_all_state_changes = [] nightly_protocol = [] nightly = [ diff --git a/core/store/src/trie/shard_tries.rs b/core/store/src/trie/shard_tries.rs index 65e296dbad4..23c2eb195ff 100644 --- a/core/store/src/trie/shard_tries.rs +++ b/core/store/src/trie/shard_tries.rs @@ -1,4 +1,3 @@ -use std::io; use std::rc::Rc; use std::sync::{Arc, RwLock}; @@ -383,18 +382,42 @@ impl WrappedTrieChanges { "Resharding changes must never be finalized." ); - // Filtering trie keys for user facing RPC reporting. - // NOTE: If the trie key is not one of the account specific, it may cause key conflict - // when the node tracks multiple shards. See #2563. - match &change_with_trie_key.trie_key { - TrieKey::Account { .. } - | TrieKey::ContractCode { .. } - | TrieKey::AccessKey { .. } - | TrieKey::ContractData { .. } => {} - _ => continue, + let storage_key = if cfg!(feature = "serialize_all_state_changes") { + // Serialize all kinds of state changes without any filtering. + // Without this it's not possible to replay state changes to get an identical state root. + + // This branch will become the default in the near future. + + match change_with_trie_key.trie_key.get_account_id() { + // If a TrieKey itself doesn't identify the Shard, then we need to add shard id to the row key. + None => KeyForStateChanges::delayed_receipt_key_from_trie_key( + &self.block_hash, + &change_with_trie_key.trie_key, + &self.shard_uid, + ), + // TrieKey has enough information to identify the shard it comes from. + _ => KeyForStateChanges::from_trie_key( + &self.block_hash, + &change_with_trie_key.trie_key, + ), + } + } else { + // This branch is the current neard behavior. + // Only a subset of state changes get serialized. + + // Filtering trie keys for user facing RPC reporting. + // NOTE: If the trie key is not one of the account specific, it may cause key conflict + // when the node tracks multiple shards. See #2563. + match &change_with_trie_key.trie_key { + TrieKey::Account { .. } + | TrieKey::ContractCode { .. } + | TrieKey::AccessKey { .. } + | TrieKey::ContractData { .. } => {} + _ => continue, + }; + KeyForStateChanges::from_trie_key(&self.block_hash, &change_with_trie_key.trie_key) }; - let storage_key = - KeyForStateChanges::from_trie_key(&self.block_hash, &change_with_trie_key.trie_key); + store_update.set( DBCol::StateChanges, storage_key.as_ref(), @@ -403,7 +426,7 @@ impl WrappedTrieChanges { } } - pub fn trie_changes_into(&mut self, store_update: &mut StoreUpdate) -> io::Result<()> { + pub fn trie_changes_into(&mut self, store_update: &mut StoreUpdate) -> std::io::Result<()> { store_update.set_ser( DBCol::TrieChanges, &shard_layout::get_block_shard_uid(&self.block_hash, &self.shard_uid), @@ -412,6 +435,12 @@ impl WrappedTrieChanges { } } +#[derive(thiserror::Error, Debug)] +pub enum KeyForStateChangesError { + #[error("Row key of StateChange of kind DelayedReceipt or DelayedReceiptIndices doesn't contain ShardUId: row_key: {0:?} ; trie_key: {1:?}")] + DelayedReceiptRowKeyError(Vec, TrieKey), +} + #[derive(derive_more::AsRef, derive_more::Into)] pub struct KeyForStateChanges(Vec); @@ -443,22 +472,46 @@ impl KeyForStateChanges { key } + /// Without changing the existing TrieKey format, encodes ShardUId into the row key. + /// See `delayed_receipt_key_from_trie_key` for decoding this row key. + pub fn delayed_receipt_key_from_trie_key( + block_hash: &CryptoHash, + trie_key: &TrieKey, + shard_uid: &ShardUId, + ) -> Self { + let mut key = Self::new(block_hash, trie_key.len() + std::mem::size_of::()); + trie_key.append_into(&mut key.0); + key.0.extend(shard_uid.to_bytes()); + key + } + + /// Extracts ShardUId from row key which contains ShardUId encoded. + /// See `delayed_receipt_key_from_trie_key` for encoding ShardUId into the row key. + pub fn delayed_receipt_key_decode_shard_uid( + row_key: &[u8], + block_hash: &CryptoHash, + trie_key: &TrieKey, + ) -> Result { + let prefix = KeyForStateChanges::from_trie_key(block_hash, trie_key); + let prefix = prefix.as_ref(); + + let suffix = &row_key[prefix.len()..]; + let shard_uid = ShardUId::try_from(suffix).map_err(|_err| { + KeyForStateChangesError::DelayedReceiptRowKeyError(row_key.to_vec(), trie_key.clone()) + })?; + Ok(shard_uid) + } + + /// Iterates over deserialized row values where row key matches `self`. pub fn find_iter<'a>( &'a self, store: &'a Store, ) -> impl Iterator> + 'a { - let prefix_len = Self::estimate_prefix_len(); - debug_assert!(self.0.len() >= prefix_len); - store.iter_prefix_ser::(DBCol::StateChanges, &self.0).map( - move |change| { - // Split off the irrelevant part of the key, so only the original trie_key is left. - let (key, state_changes) = change?; - debug_assert!(key.starts_with(&self.0)); - Ok(state_changes) - }, - ) + // Split off the irrelevant part of the key, so only the original trie_key is left. + self.find_rows_iter(store).map(|row| row.map(|kv| kv.1)) } + /// Iterates over deserialized row values where the row key matches `self` exactly. pub fn find_exact_iter<'a>( &'a self, store: &'a Store, @@ -480,4 +533,98 @@ impl KeyForStateChanges { } }) } + + /// Iterates over pairs of `(row key, deserialized row value)` where row key matches `self`. + pub fn find_rows_iter<'a>( + &'a self, + store: &'a Store, + ) -> impl Iterator, RawStateChangesWithTrieKey), std::io::Error>> + 'a + { + let prefix_len = Self::estimate_prefix_len(); + debug_assert!( + self.0.len() >= prefix_len, + "Key length: {}, prefix length: {}, key: {:?}", + self.0.len(), + prefix_len, + self.0 + ); + store.iter_prefix_ser::(DBCol::StateChanges, &self.0).map( + move |change| { + // Split off the irrelevant part of the key, so only the original trie_key is left. + let (key, state_changes) = change?; + debug_assert!(key.starts_with(&self.0), "Key: {:?}, row key: {:?}", self.0, key); + Ok((key, state_changes)) + }, + ) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::str::FromStr; + + #[test] + fn test_delayed_receipt_row_key() { + let trie_key1 = TrieKey::DelayedReceipt { index: 1 }; + let trie_key2 = TrieKey::DelayedReceiptIndices {}; + let shard_uid = ShardUId { version: 10, shard_id: 5 }; + + // Random value. + let block_hash = + CryptoHash::from_str("32222222222233333333334444444444445555555777").unwrap(); + + for trie_key in [trie_key1.clone(), trie_key2.clone()] { + let row_key = KeyForStateChanges::delayed_receipt_key_from_trie_key( + &block_hash, + &trie_key, + &shard_uid, + ); + + let got_shard_uid = KeyForStateChanges::delayed_receipt_key_decode_shard_uid( + row_key.as_ref(), + &block_hash, + &trie_key, + ) + .unwrap(); + assert_eq!(shard_uid, got_shard_uid); + } + + // Forget to add a ShardUId to the key, fail to extra a ShardUId from the key. + let row_key_without_shard_uid = KeyForStateChanges::from_trie_key(&block_hash, &trie_key1); + assert!(KeyForStateChanges::delayed_receipt_key_decode_shard_uid( + row_key_without_shard_uid.as_ref(), + &block_hash, + &trie_key1 + ) + .is_err()); + + // Add an extra byte to the key, fail to extra a ShardUId from the key. + let mut row_key_extra_bytes = KeyForStateChanges::delayed_receipt_key_from_trie_key( + &block_hash, + &trie_key1, + &shard_uid, + ); + row_key_extra_bytes.0.extend([8u8]); + assert!(KeyForStateChanges::delayed_receipt_key_decode_shard_uid( + row_key_extra_bytes.as_ref(), + &block_hash, + &trie_key1 + ) + .is_err()); + + // This is the internal detail of how delayed_receipt_key_from_trie_key() works. + let mut row_key_with_single_shard_uid = + KeyForStateChanges::from_trie_key(&block_hash, &trie_key1); + row_key_with_single_shard_uid.0.extend(shard_uid.to_bytes()); + assert_eq!( + KeyForStateChanges::delayed_receipt_key_decode_shard_uid( + row_key_with_single_shard_uid.as_ref(), + &block_hash, + &trie_key1 + ) + .unwrap(), + shard_uid + ); + } } diff --git a/nearcore/Cargo.toml b/nearcore/Cargo.toml index a657e7be5b5..9ed1f0c5887 100644 --- a/nearcore/Cargo.toml +++ b/nearcore/Cargo.toml @@ -108,6 +108,7 @@ protocol_feature_fix_contract_loading_cost = [ "near-vm-runner/protocol_feature_fix_contract_loading_cost", ] protocol_feature_flat_state = ["near-store/protocol_feature_flat_state", "near-chain/protocol_feature_flat_state", "node-runtime/protocol_feature_flat_state"] +serialize_all_state_changes = ["near-store/serialize_all_state_changes"] nightly = [ "nightly_protocol", @@ -119,6 +120,7 @@ nightly = [ "protocol_feature_fix_staking_threshold", "protocol_feature_fix_contract_loading_cost", "protocol_feature_flat_state", + "serialize_all_state_changes", ] nightly_protocol = [ "near-primitives/nightly_protocol", diff --git a/neard/Cargo.toml b/neard/Cargo.toml index bd1ff803c12..fcfb6137969 100644 --- a/neard/Cargo.toml +++ b/neard/Cargo.toml @@ -65,6 +65,7 @@ delay_detector = ["nearcore/delay_detector"] rosetta_rpc = ["nearcore/rosetta_rpc"] json_rpc = ["nearcore/json_rpc"] protocol_feature_fix_staking_threshold = ["nearcore/protocol_feature_fix_staking_threshold"] +serialize_all_state_changes = ["nearcore/serialize_all_state_changes"] protocol_feature_flat_state = ["nearcore/protocol_feature_flat_state", "near-flat-storage/protocol_feature_flat_state"] nightly = [ diff --git a/tools/state-viewer/Cargo.toml b/tools/state-viewer/Cargo.toml index 95e14ead367..a0ca108a5da 100644 --- a/tools/state-viewer/Cargo.toml +++ b/tools/state-viewer/Cargo.toml @@ -25,6 +25,7 @@ tracing.workspace = true near-chain = { path = "../../chain/chain" } near-chain-configs = { path = "../../core/chain-configs" } +near-client = { path = "../../chain/client" } near-crypto = { path = "../../core/crypto" } near-epoch-manager = { path = "../../chain/epoch-manager" } near-network = { path = "../../chain/network" } diff --git a/tools/state-viewer/src/cli.rs b/tools/state-viewer/src/cli.rs index 3d134671850..aef577a7326 100644 --- a/tools/state-viewer/src/cli.rs +++ b/tools/state-viewer/src/cli.rs @@ -2,7 +2,6 @@ use crate::commands::*; use crate::contract_accounts::ContractAccountFilter; use crate::rocksdb_stats::get_rocksdb_stats; use crate::state_parts::{apply_state_parts, dump_state_parts}; -use crate::{epoch_info, state_parts}; use near_chain_configs::{GenesisChangeConfig, GenesisValidationMode}; use near_primitives::account::id::AccountId; use near_primitives::hash::CryptoHash; @@ -76,6 +75,9 @@ pub enum StateViewerSubCommand { RocksDBStats(RocksDBStatsCmd), /// Iterates over a trie and prints the StateRecords. State, + /// Dumps or applies StateChanges. + /// Experimental tool for shard shadowing development. + StateChanges(StateChangesCmd), /// View head of the storage. #[clap(alias = "view_chain")] ViewChain(ViewChainCmd), @@ -133,6 +135,7 @@ impl StateViewerSubCommand { StateViewerSubCommand::Replay(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::RocksDBStats(cmd) => cmd.run(store_opener.path()), StateViewerSubCommand::State => state(home_dir, near_config, store), + StateViewerSubCommand::StateChanges(cmd) => cmd.run(home_dir, near_config, store), StateViewerSubCommand::ViewChain(cmd) => cmd.run(near_config, store), StateViewerSubCommand::ViewTrie(cmd) => cmd.run(store), } @@ -220,7 +223,7 @@ impl ApplyReceiptCmd { pub struct ApplyStatePartsCmd { /// Selects an epoch. The dump will be of the state at the beginning of this epoch. #[clap(subcommand)] - epoch_selection: state_parts::EpochSelection, + epoch_selection: crate::state_parts::EpochSelection, /// Shard id. #[clap(long)] shard_id: ShardId, @@ -247,7 +250,7 @@ impl ApplyStatePartsCmd { home_dir, near_config, store, - state_parts::Location::new(self.root_dir, (self.s3_bucket, self.s3_region)), + crate::state_parts::Location::new(self.root_dir, (self.s3_bucket, self.s3_region)), ); } } @@ -403,7 +406,7 @@ impl DumpStateCmd { pub struct DumpStatePartsCmd { /// Selects an epoch. The dump will be of the state at the beginning of this epoch. #[clap(subcommand)] - epoch_selection: state_parts::EpochSelection, + epoch_selection: crate::state_parts::EpochSelection, /// Shard id. #[clap(long)] shard_id: ShardId, @@ -437,7 +440,7 @@ impl DumpStatePartsCmd { home_dir, near_config, store, - state_parts::Location::new(self.root_dir, (self.s3_bucket, self.s3_region)), + crate::state_parts::Location::new(self.root_dir, (self.s3_bucket, self.s3_region)), ); } } @@ -490,7 +493,7 @@ impl DumpTxCmd { #[derive(clap::Args)] pub struct EpochInfoCmd { #[clap(subcommand)] - epoch_selection: epoch_info::EpochSelection, + epoch_selection: crate::epoch_info::EpochSelection, /// Displays kickouts of the given validator and expected and missed blocks and chunks produced. #[clap(long)] validator_account_id: Option, @@ -561,6 +564,18 @@ impl RocksDBStatsCmd { } } +#[derive(clap::Parser)] +pub struct StateChangesCmd { + #[clap(subcommand)] + command: crate::state_changes::StateChangesSubCommand, +} + +impl StateChangesCmd { + pub fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { + self.command.run(home_dir, near_config, store) + } +} + #[derive(clap::Parser)] pub struct ViewChainCmd { #[clap(long)] diff --git a/tools/state-viewer/src/lib.rs b/tools/state-viewer/src/lib.rs index 612dcb95b4b..eaf40440b2c 100644 --- a/tools/state-viewer/src/lib.rs +++ b/tools/state-viewer/src/lib.rs @@ -7,6 +7,7 @@ mod commands; mod contract_accounts; mod epoch_info; mod rocksdb_stats; +mod state_changes; mod state_dump; mod state_parts; mod tx_dump; diff --git a/tools/state-viewer/src/state_changes.rs b/tools/state-viewer/src/state_changes.rs new file mode 100644 index 00000000000..86f3166a18f --- /dev/null +++ b/tools/state-viewer/src/state_changes.rs @@ -0,0 +1,220 @@ +use borsh::{BorshDeserialize, BorshSerialize}; +use near_chain::types::RuntimeAdapter; +use near_chain::{ChainStore, ChainStoreAccess}; +use near_epoch_manager::EpochManagerAdapter; +use near_primitives::hash::CryptoHash; +use near_primitives::trie_key::TrieKey; +use near_primitives::types::{ + EpochId, ShardId, StateChangesForBlock, StateChangesForBlockRange, StateChangesForShard, + StateRoot, +}; +use near_primitives_core::types::BlockHeight; +use near_store::{KeyForStateChanges, Store, WrappedTrieChanges}; +use nearcore::{NearConfig, NightshadeRuntime}; +use std::path::{Path, PathBuf}; + +#[derive(clap::Subcommand, Debug, Clone)] +pub(crate) enum StateChangesSubCommand { + /// Applies StateChanges from a file. + /// Needs state roots because chunks state roots may not be known. + /// Needs BlockHeaders available for the corresponding blocks. + Apply { + /// Location of the input file. + /// The file must be generated by the Dump subcommand. + #[clap(parse(from_os_str))] + file: PathBuf, + /// Id of the shard to apply state changes to. + shard_id: ShardId, + /// State root of the shard at the height of the first block with state changes. + state_root: StateRoot, + }, + /// Dumps state changes for a range of blocks from --height-from to --height-to inclusive. + /// The dump will include all available state changes, i.e. of shards tracked at the time. + Dump { + /// --height-from defines the lower (inclusive) bound of a range of block heights to dump. + height_from: BlockHeight, + /// --height-to defines the upper (inclusive) bound of a range of block heights to dump. + height_to: BlockHeight, + /// Location of the output file. + #[clap(parse(from_os_str))] + file: PathBuf, + }, +} + +impl StateChangesSubCommand { + pub(crate) fn run(self, home_dir: &Path, near_config: NearConfig, store: Store) { + match self { + StateChangesSubCommand::Apply { file, shard_id, state_root } => { + apply_state_changes(file, shard_id, state_root, home_dir, near_config, store) + } + StateChangesSubCommand::Dump { height_from, height_to, file } => { + dump_state_changes(height_from, height_to, file, home_dir, near_config, store) + } + } + } +} + +/// Reads StateChanges from the DB for the specified range of blocks. +/// Writes these state changes to the specified file. +/// The data written is borsh-serialized StateChangesForBlockRange. +/// State changes in that file are expected to be ordered in the increasing order of height. +/// Changes are grouped by (block_hash, shard_id). +/// If a block or a shard has no changes, they can be omitted from the serialization. +/// Row key of the StateChanges column are really important because the logic relies that state +/// changes of kinds DelayedReceipt and DelayedReceiptIndices have shard id encoded in the row key. +fn dump_state_changes( + height_from: BlockHeight, + height_to: BlockHeight, + file: PathBuf, + home_dir: &Path, + near_config: NearConfig, + store: Store, +) { + assert!(height_from <= height_to, "--height-from must be less than or equal to --height-to"); + + let runtime: NightshadeRuntime = + NightshadeRuntime::from_config(home_dir, store.clone(), &near_config); + let chain_store = ChainStore::new( + store.clone(), + near_config.genesis.config.genesis_height, + near_config.client_config.save_trie_changes, + ); + + let blocks = (height_from..=height_to).filter_map(|block_height| { + let block_header = chain_store.get_block_header_by_height(block_height).unwrap(); + let block_hash = block_header.hash(); + let epoch_id = block_header.epoch_id(); + let key = KeyForStateChanges::for_block(block_header.hash()); + let mut state_changes_per_shard = vec![vec![];runtime.num_shards(epoch_id).unwrap() as usize]; + + for row in key.find_rows_iter(&store) { + let (key, value) = row.unwrap(); + let shard_id = get_state_change_shard_id(key.as_ref(), &value.trie_key, block_hash, epoch_id, &runtime).unwrap(); + state_changes_per_shard[shard_id as usize].push(value); + } + + tracing::info!(target: "state-changes", block_height = block_header.height(), num_state_changes_per_shard = ?state_changes_per_shard.iter().map(|v|v.len()).collect::>()); + let state_changes : Vec = state_changes_per_shard.into_iter().enumerate().filter_map(|(shard_id,state_changes)|{ + if state_changes.is_empty() { + // Skip serializing state changes for a shard if no state changes were found for this shard in this block. + None + } else { + Some(StateChangesForShard{shard_id:shard_id as ShardId, state_changes}) + } + }).collect(); + + if state_changes.is_empty() { + // Skip serializing state changes for a block if no state changes were found for this block. + None + } else { + Some(StateChangesForBlock { block_hash: *block_hash, state_changes }) + } + }).collect(); + + let state_changes_for_block_range = StateChangesForBlockRange { blocks }; + + tracing::info!(target: "state-changes", ?file, "Writing state changes to a file"); + let data: Vec = state_changes_for_block_range.try_to_vec().unwrap(); + std::fs::write(&file, &data).unwrap(); +} + +/// Reads StateChanges from a file. Applies StateChanges in the order of increasing block height. +/// +/// The file is assumed to be created by `dump_state_changes`. Same assumptions apply. +/// Row key of the StateChanges column are really important because the logic relies that state +/// changes of kinds DelayedReceipt and DelayedReceiptIndices have shard id encoded in the row key. +/// +/// The operation needs state viewer to be run in read-write mode, use `--readwrite` flag. +/// +/// In case the DB contains state roots of chunks, the process compares resulting StateRoots with those known StateRoots. +fn apply_state_changes( + file: PathBuf, + shard_id: ShardId, + mut state_root: StateRoot, + home_dir: &Path, + near_config: NearConfig, + store: Store, +) { + let runtime: NightshadeRuntime = + NightshadeRuntime::from_config(home_dir, store.clone(), &near_config); + let mut chain_store = ChainStore::new( + store, + near_config.genesis.config.genesis_height, + near_config.client_config.save_trie_changes, + ); + + let data = std::fs::read(&file).unwrap(); + let state_changes_for_block_range = StateChangesForBlockRange::try_from_slice(&data).unwrap(); + + for StateChangesForBlock { block_hash, state_changes } in state_changes_for_block_range.blocks { + let block_header = chain_store.get_block_header(&block_hash).unwrap(); + let block_hash = block_header.hash(); + let block_height = block_header.height(); + let epoch_id = block_header.epoch_id(); + let shard_uid = runtime.shard_id_to_uid(shard_id, epoch_id).unwrap(); + + for StateChangesForShard { shard_id: state_change_shard_id, state_changes } in state_changes + { + if state_change_shard_id != shard_id { + continue; + } + + if let Ok(block) = chain_store.get_block(block_hash) { + let known_state_root = block.chunks()[shard_id as usize].prev_state_root(); + assert_eq!(known_state_root, state_root); + tracing::debug!(target: "state-changes", block_height, ?state_root, "Known StateRoot matches"); + } + + tracing::info!(target: "state-changes", block_height, ?block_hash, ?shard_uid, ?state_root, num_changes = state_changes.len(), "Applying state changes"); + let trie = + runtime.get_trie_for_shard(shard_id, &block_hash, state_root, false).unwrap(); + + let trie_update = trie + .update(state_changes.iter().map(|raw_state_changes_with_trie_key| { + tracing::debug!(target: "state-changes", ?raw_state_changes_with_trie_key); + let raw_key = raw_state_changes_with_trie_key.trie_key.to_vec(); + let data = raw_state_changes_with_trie_key.changes.last().unwrap().data.clone(); + (raw_key, data) + })) + .unwrap(); + + tracing::info!(target: "state-change", block_height, ?block_hash, ?shard_uid, old_state_root = ?trie_update.old_root, new_state_root = ?trie_update.new_root, "Applied state changes"); + state_root = trie_update.new_root; + + let wrapped_trie_changes = WrappedTrieChanges::new( + runtime.get_tries(), + shard_uid, + trie_update, + state_changes, + *block_hash, + ); + let mut store_update = chain_store.store_update(); + store_update.save_trie_changes(wrapped_trie_changes); + store_update.commit().unwrap(); + } + } + + tracing::info!(target: "state-changes", ?file, ?shard_id, ?state_root, "Done applying changes"); +} + +/// Determines the shard id which produced the StateChange based the row key, +/// part of the value (TrieKey) and the block that resulted in this state change. +pub fn get_state_change_shard_id( + row_key: &[u8], + trie_key: &TrieKey, + block_hash: &CryptoHash, + epoch_id: &EpochId, + epoch_manager: &dyn EpochManagerAdapter, +) -> Result { + if let Some(account_id) = trie_key.get_account_id() { + let shard_id = epoch_manager.account_id_to_shard_id(&account_id, epoch_id)?; + Ok(shard_id) + } else { + let shard_uid = + KeyForStateChanges::delayed_receipt_key_decode_shard_uid(row_key, block_hash, trie_key) + .map_err(|err| { + near_chain::near_chain_primitives::error::Error::Other(err.to_string()) + })?; + Ok(shard_uid.shard_id as ShardId) + } +}