From aac780b4ca2945d976d479da9aac3b258468bd78 Mon Sep 17 00:00:00 2001 From: robin-near Date: Tue, 14 Nov 2023 10:26:25 -0800 Subject: [PATCH] [fork-network] Optimize amend-access-keys using in-memory tries. --- Cargo.lock | 1 + tools/fork-network/Cargo.toml | 1 + tools/fork-network/src/cli.rs | 70 ++++++---- .../src/single_shard_storage_mutator.rs | 120 +++++++++--------- tools/fork-network/src/storage_mutator.rs | 15 +-- 5 files changed, 115 insertions(+), 92 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8bb01fb394e..b50052585dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3763,6 +3763,7 @@ dependencies = [ "anyhow", "chrono", "clap 4.2.4", + "hex", "near-chain", "near-chain-configs", "near-crypto", diff --git a/tools/fork-network/Cargo.toml b/tools/fork-network/Cargo.toml index c69b48c6294..cbb535d0dae 100644 --- a/tools/fork-network/Cargo.toml +++ b/tools/fork-network/Cargo.toml @@ -13,6 +13,7 @@ actix.workspace = true anyhow.workspace = true chrono.workspace = true clap.workspace = true +hex.workspace = true rayon.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/tools/fork-network/src/cli.rs b/tools/fork-network/src/cli.rs index 3217f94575c..0ea52130688 100644 --- a/tools/fork-network/src/cli.rs +++ b/tools/fork-network/src/cli.rs @@ -1,6 +1,6 @@ use crate::single_shard_storage_mutator::SingleShardStorageMutator; use crate::storage_mutator::StorageMutator; -use near_chain::types::Tip; +use near_chain::types::{RuntimeAdapter, Tip}; use near_chain::{ChainStore, ChainStoreAccess}; use near_chain_configs::{Genesis, GenesisConfig, GenesisValidationMode}; use near_crypto::PublicKey; @@ -87,7 +87,7 @@ struct FinalizeCmd; #[derive(clap::Parser)] struct AmendAccessKeysCmd { - #[arg(short, long, default_value = "100000")] + #[arg(short, long, default_value = "2000000")] batch_size: u64, } @@ -121,7 +121,7 @@ struct Validator { } type MakeSingleShardStorageMutatorFn = - Arc anyhow::Result + Send + Sync>; + Arc anyhow::Result + Send + Sync>; impl ForkNetworkCommand { pub fn run( @@ -306,10 +306,11 @@ impl ForkNetworkCommand { home_dir: &Path, ) -> anyhow::Result> { // Open storage with migration + near_config.config.store.load_mem_tries_for_all_shards = true; let storage = open_storage(&home_dir, near_config).unwrap(); let store = storage.get_hot_store(); - let (prev_state_roots, prev_hash, epoch_id, _block_height) = + let (prev_state_roots, prev_hash, epoch_id, block_height) = self.get_state_roots_and_hash(store.clone())?; tracing::info!(?prev_state_roots, ?epoch_id, ?prev_hash); @@ -321,15 +322,11 @@ impl ForkNetworkCommand { .collect(); let runtime = NightshadeRuntime::from_config(home_dir, store.clone(), &near_config, epoch_manager); + runtime.load_mem_tries_on_startup(&all_shard_uids).unwrap(); let make_storage_mutator: MakeSingleShardStorageMutatorFn = - Arc::new(move |shard_id, prev_state_root| { - SingleShardStorageMutator::new( - shard_id, - &runtime.clone(), - prev_hash, - prev_state_root, - ) + Arc::new(move |prev_state_root| { + SingleShardStorageMutator::new(&runtime.clone(), prev_state_root) }); let new_state_roots = self.prepare_state( @@ -337,6 +334,7 @@ impl ForkNetworkCommand { &all_shard_uids, store, &prev_state_roots, + block_height, make_storage_mutator.clone(), )?; Ok(new_state_roots) @@ -358,7 +356,7 @@ impl ForkNetworkCommand { let storage = open_storage(&home_dir, near_config).unwrap(); let store = storage.get_hot_store(); - let (prev_state_roots, prev_hash, epoch_id, block_height) = + let (prev_state_roots, _prev_hash, epoch_id, block_height) = self.get_state_roots_and_hash(store.clone())?; let epoch_manager = @@ -374,7 +372,6 @@ impl ForkNetworkCommand { epoch_manager.clone(), &runtime, epoch_id.clone(), - prev_hash, prev_state_roots, )?; let (new_state_roots, new_validator_accounts) = @@ -477,13 +474,12 @@ impl ForkNetworkCommand { shard_uid: ShardUId, store: Store, prev_state_root: StateRoot, + block_height: BlockHeight, make_storage_mutator: MakeSingleShardStorageMutatorFn, ) -> anyhow::Result { // Doesn't support secrets. tracing::info!(?shard_uid); - let shard_id = shard_uid.shard_id as ShardId; - let mut storage_mutator: SingleShardStorageMutator = - make_storage_mutator(shard_id, prev_state_root)?; + let mut storage_mutator: SingleShardStorageMutator = make_storage_mutator(prev_state_root)?; // Keeps track of accounts that have a full access key. let mut has_full_key = HashSet::new(); @@ -492,6 +488,7 @@ impl ForkNetworkCommand { // Iterate over the whole flat storage and do the necessary changes to have access to all accounts. let mut index_delayed_receipt = 0; + let mut ref_keys_retrieved = 0; let mut records_not_parsed = 0; let mut records_parsed = 0; let mut access_keys_updated = 0; @@ -501,9 +498,11 @@ impl ForkNetworkCommand { let mut postponed_receipts_updated = 0; let mut delayed_receipts_updated = 0; let mut received_data_updated = 0; + let mut fake_block_height = block_height + 1; for item in store_helper::iter_flat_state_entries(shard_uid, &store, None, None) { let (key, value) = match item { Ok((key, FlatStateValue::Ref(ref_value))) => { + ref_keys_retrieved += 1; (key, trie_storage.retrieve_raw_bytes(&ref_value.hash)?.to_vec()) } Ok((key, FlatStateValue::Inlined(value))) => (key, value), @@ -609,13 +608,27 @@ impl ForkNetworkCommand { records_not_parsed += 1; } if storage_mutator.should_commit(batch_size) { - let state_root = storage_mutator.commit(&shard_uid)?; - storage_mutator = make_storage_mutator(shard_id, state_root)?; + tracing::info!( + ?shard_uid, + ref_keys_retrieved, + records_parsed, + updated = access_keys_updated + + accounts_implicit_updated + + contract_data_updated + + contract_code_updated + + postponed_receipts_updated + + delayed_receipts_updated + + received_data_updated, + ); + let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?; + fake_block_height += 1; + storage_mutator = make_storage_mutator(state_root)?; } } tracing::info!( ?shard_uid, + ref_keys_retrieved, records_parsed, records_not_parsed, accounts_implicit_updated, @@ -638,7 +651,17 @@ impl ForkNetworkCommand { if let Ok((key, _)) = item { if key[0] == col::ACCOUNT { num_accounts += 1; - let account_id = parse_account_id_from_account_key(&key).unwrap(); + let account_id = match parse_account_id_from_account_key(&key) { + Ok(account_id) => account_id, + Err(err) => { + tracing::error!( + ?err, + "Failed to parse account id {}", + hex::encode(&key) + ); + continue; + } + }; if has_full_key.contains(&account_id) { continue; } @@ -649,15 +672,16 @@ impl ForkNetworkCommand { )?; num_added += 1; if storage_mutator.should_commit(batch_size) { - let state_root = storage_mutator.commit(&shard_uid)?; - storage_mutator = make_storage_mutator(shard_id, state_root)?; + let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?; + fake_block_height += 1; + storage_mutator = make_storage_mutator(state_root)?; } } } } tracing::info!(?shard_uid, num_accounts, num_added, "Pass 2 done"); - let state_root = storage_mutator.commit(&shard_uid)?; + let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?; tracing::info!(?shard_uid, "Commit done"); Ok(state_root) @@ -669,6 +693,7 @@ impl ForkNetworkCommand { all_shard_uids: &[ShardUId], store: Store, prev_state_roots: &[StateRoot], + block_height: BlockHeight, make_storage_mutator: MakeSingleShardStorageMutatorFn, ) -> anyhow::Result> { let state_roots = all_shard_uids @@ -680,6 +705,7 @@ impl ForkNetworkCommand { *shard_uid, store.clone(), prev_state_roots[shard_uid.shard_id as usize], + block_height, make_storage_mutator.clone(), ) .unwrap(); diff --git a/tools/fork-network/src/single_shard_storage_mutator.rs b/tools/fork-network/src/single_shard_storage_mutator.rs index 01b1752a084..9c2368b3237 100644 --- a/tools/fork-network/src/single_shard_storage_mutator.rs +++ b/tools/fork-network/src/single_shard_storage_mutator.rs @@ -7,33 +7,31 @@ use near_primitives::receipt::Receipt; use near_primitives::shard_layout::ShardUId; use near_primitives::trie_key::TrieKey; use near_primitives::types::{AccountId, StateRoot}; -use near_primitives::types::{ShardId, StoreKey, StoreValue}; -use near_store::{flat::FlatStateChanges, DBCol, ShardTries, TrieUpdate}; +use near_primitives::types::{StoreKey, StoreValue}; +use near_store::{flat::FlatStateChanges, DBCol, ShardTries}; use nearcore::NightshadeRuntime; /// Object that updates the existing state. Combines all changes, commits them /// and returns new state roots. pub(crate) struct SingleShardStorageMutator { - trie_update: TrieUpdate, + updates: Vec<(Vec, Option>)>, + state_root: StateRoot, shard_tries: ShardTries, - num_changes: u64, } impl SingleShardStorageMutator { - pub(crate) fn new( - shard_id: ShardId, - runtime: &NightshadeRuntime, - prev_block_hash: CryptoHash, - state_root: StateRoot, - ) -> anyhow::Result { - let trie = runtime.get_trie_for_shard(shard_id, &prev_block_hash, state_root, false)?; - let trie_update = TrieUpdate::new(trie); - Ok(Self { trie_update, shard_tries: runtime.get_tries(), num_changes: 0 }) + pub(crate) fn new(runtime: &NightshadeRuntime, state_root: StateRoot) -> anyhow::Result { + Ok(Self { updates: Vec::new(), state_root, shard_tries: runtime.get_tries() }) } - fn trie_update(&mut self) -> &mut TrieUpdate { - self.num_changes += 1; - &mut self.trie_update + fn set(&mut self, key: TrieKey, value: Vec) -> anyhow::Result<()> { + self.updates.push((key.to_vec(), Some(value))); + Ok(()) + } + + fn remove(&mut self, key: TrieKey) -> anyhow::Result<()> { + self.updates.push((key.to_vec(), None)); + Ok(()) } pub(crate) fn set_account( @@ -41,13 +39,11 @@ impl SingleShardStorageMutator { account_id: AccountId, value: Account, ) -> anyhow::Result<()> { - self.trie_update().set(TrieKey::Account { account_id }, borsh::to_vec(&value)?); - Ok(()) + self.set(TrieKey::Account { account_id }, borsh::to_vec(&value)?) } pub(crate) fn delete_account(&mut self, account_id: AccountId) -> anyhow::Result<()> { - self.trie_update().remove(TrieKey::Account { account_id }); - Ok(()) + self.remove(TrieKey::Account { account_id }) } pub(crate) fn set_access_key( @@ -56,9 +52,7 @@ impl SingleShardStorageMutator { public_key: PublicKey, access_key: AccessKey, ) -> anyhow::Result<()> { - self.trie_update() - .set(TrieKey::AccessKey { account_id, public_key }, borsh::to_vec(&access_key)?); - Ok(()) + self.set(TrieKey::AccessKey { account_id, public_key }, borsh::to_vec(&access_key)?) } pub(crate) fn delete_access_key( @@ -66,8 +60,7 @@ impl SingleShardStorageMutator { account_id: AccountId, public_key: PublicKey, ) -> anyhow::Result<()> { - self.trie_update().remove(TrieKey::AccessKey { account_id, public_key }); - Ok(()) + self.remove(TrieKey::AccessKey { account_id, public_key }) } pub(crate) fn set_data( @@ -76,11 +69,10 @@ impl SingleShardStorageMutator { data_key: &StoreKey, value: StoreValue, ) -> anyhow::Result<()> { - self.trie_update().set( + self.set( TrieKey::ContractData { account_id, key: data_key.to_vec() }, borsh::to_vec(&value)?, - ); - Ok(()) + ) } pub(crate) fn delete_data( @@ -88,37 +80,32 @@ impl SingleShardStorageMutator { account_id: AccountId, data_key: &StoreKey, ) -> anyhow::Result<()> { - self.trie_update().remove(TrieKey::ContractData { account_id, key: data_key.to_vec() }); - Ok(()) + self.remove(TrieKey::ContractData { account_id, key: data_key.to_vec() }) } pub(crate) fn set_code(&mut self, account_id: AccountId, value: Vec) -> anyhow::Result<()> { - self.trie_update().set(TrieKey::ContractCode { account_id }, value); - Ok(()) + self.set(TrieKey::ContractCode { account_id }, value) } pub(crate) fn delete_code(&mut self, account_id: AccountId) -> anyhow::Result<()> { - self.trie_update().remove(TrieKey::ContractCode { account_id }); - Ok(()) + self.remove(TrieKey::ContractCode { account_id }) } pub(crate) fn set_postponed_receipt(&mut self, receipt: &Receipt) -> anyhow::Result<()> { - self.trie_update().set( + self.set( TrieKey::PostponedReceipt { receiver_id: receipt.receiver_id.clone(), receipt_id: receipt.receipt_id, }, borsh::to_vec(&receipt)?, - ); - Ok(()) + ) } pub(crate) fn delete_postponed_receipt(&mut self, receipt: Box) -> anyhow::Result<()> { - self.trie_update().remove(TrieKey::PostponedReceipt { + self.remove(TrieKey::PostponedReceipt { receiver_id: receipt.receiver_id, receipt_id: receipt.receipt_id, - }); - Ok(()) + }) } pub(crate) fn set_received_data( @@ -127,9 +114,7 @@ impl SingleShardStorageMutator { data_id: CryptoHash, data: &Option>, ) -> anyhow::Result<()> { - self.trie_update() - .set(TrieKey::ReceivedData { receiver_id: account_id, data_id }, borsh::to_vec(data)?); - Ok(()) + self.set(TrieKey::ReceivedData { receiver_id: account_id, data_id }, borsh::to_vec(data)?) } pub(crate) fn delete_received_data( @@ -137,8 +122,7 @@ impl SingleShardStorageMutator { account_id: AccountId, data_id: CryptoHash, ) -> anyhow::Result<()> { - self.trie_update().remove(TrieKey::ReceivedData { receiver_id: account_id, data_id }); - Ok(()) + self.remove(TrieKey::ReceivedData { receiver_id: account_id, data_id }) } pub(crate) fn set_delayed_receipt( @@ -146,28 +130,46 @@ impl SingleShardStorageMutator { index: u64, receipt: &Receipt, ) -> anyhow::Result<()> { - self.trie_update().set(TrieKey::DelayedReceipt { index }, borsh::to_vec(receipt)?); - Ok(()) + self.set(TrieKey::DelayedReceipt { index }, borsh::to_vec(receipt)?) } pub(crate) fn delete_delayed_receipt(&mut self, index: u64) -> anyhow::Result<()> { - self.trie_update().remove(TrieKey::DelayedReceipt { index }); - Ok(()) + self.remove(TrieKey::DelayedReceipt { index }) } pub(crate) fn should_commit(&self, batch_size: u64) -> bool { - self.num_changes >= batch_size - } - - pub(crate) fn commit(mut self, shard_uid: &ShardUId) -> anyhow::Result { - tracing::info!(?shard_uid, num_changes = ?self.num_changes, "commit"); + self.updates.len() >= batch_size as usize + } + + /// The fake block height is used to allow memtries to garbage collect. + /// Otherwise it would take significantly more memory holding old nodes. + pub(crate) fn commit( + self, + shard_uid: &ShardUId, + fake_block_height: u64, + ) -> anyhow::Result { + let num_updates = self.updates.len(); + tracing::info!(?shard_uid, num_updates, "commit"); let mut update = self.shard_tries.store_update(); - self.trie_update.commit(near_primitives::types::StateChangeCause::Migration); - let (_, trie_updates, raw_changes) = self.trie_update.finalize()?; - let state_root = self.shard_tries.apply_all(&trie_updates, *shard_uid, &mut update); - let flat_state_changes = FlatStateChanges::from_state_changes(&raw_changes); + let flat_state_changes = FlatStateChanges::from_raw_key_value(&self.updates); flat_state_changes.apply_to_flat_state(&mut update, *shard_uid); - tracing::info!(?shard_uid, num_changes = ?self.num_changes, "committing"); + + let trie_changes = self + .shard_tries + .get_trie_for_shard(*shard_uid, self.state_root) + .update(self.updates)?; + tracing::info!( + ?shard_uid, + num_trie_node_insertions = trie_changes.insertions().len(), + num_trie_node_deletions = trie_changes.deletions().len() + ); + let state_root = self.shard_tries.apply_all(&trie_changes, *shard_uid, &mut update); + self.shard_tries.apply_memtrie_changes(&trie_changes, *shard_uid, fake_block_height); + // We may not have loaded memtries (some commands don't need to), so check. + if let Some(mem_tries) = self.shard_tries.get_mem_tries(*shard_uid) { + mem_tries.write().unwrap().delete_until_height(fake_block_height - 1); + } + tracing::info!(?shard_uid, num_updates, "committing"); update.set_ser( DBCol::Misc, format!("FORK_TOOL_SHARD_ID:{}", shard_uid.shard_id).as_bytes(), diff --git a/tools/fork-network/src/storage_mutator.rs b/tools/fork-network/src/storage_mutator.rs index 743aad4aa63..00f292bf8dc 100644 --- a/tools/fork-network/src/storage_mutator.rs +++ b/tools/fork-network/src/storage_mutator.rs @@ -2,8 +2,7 @@ use crate::single_shard_storage_mutator::SingleShardStorageMutator; use near_crypto::PublicKey; use near_epoch_manager::EpochManagerAdapter; use near_primitives::account::{AccessKey, Account}; -use near_primitives::hash::CryptoHash; -use near_primitives::types::{AccountId, EpochId, ShardId, StateRoot}; +use near_primitives::types::{AccountId, EpochId, StateRoot}; use nearcore::NightshadeRuntime; use std::sync::Arc; @@ -20,20 +19,14 @@ impl StorageMutator { epoch_manager: Arc, runtime: &NightshadeRuntime, epoch_id: EpochId, - prev_block_hash: CryptoHash, state_roots: Vec, ) -> anyhow::Result { let shard_layout = epoch_manager.get_shard_layout(&epoch_id)?; assert_eq!(shard_layout.num_shards(), state_roots.len() as u64); let mut mutators = vec![]; - for (shard_id, state_root) in state_roots.iter().enumerate() { - mutators.push(SingleShardStorageMutator::new( - shard_id as ShardId, - runtime, - prev_block_hash, - *state_root, - )?); + for state_root in state_roots { + mutators.push(SingleShardStorageMutator::new(runtime, state_root)?); } Ok(Self { epoch_manager, epoch_id, mutators }) } @@ -68,7 +61,7 @@ impl StorageMutator { let all_shard_uids = shard_layout.get_shard_uids(); let mut state_roots = vec![]; for (mutator, shard_uid) in self.mutators.into_iter().zip(all_shard_uids.into_iter()) { - let state_root = mutator.commit(&shard_uid)?; + let state_root = mutator.commit(&shard_uid, 0)?; state_roots.push(state_root); } Ok(state_roots)