Skip to content

Commit

Permalink
[fork-network] Optimize amend-access-keys using in-memory tries.
Browse files Browse the repository at this point in the history
  • Loading branch information
robin-near committed Nov 15, 2023
1 parent d782f03 commit aac780b
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 92 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/fork-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ actix.workspace = true
anyhow.workspace = true
chrono.workspace = true
clap.workspace = true
hex.workspace = true
rayon.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
70 changes: 48 additions & 22 deletions tools/fork-network/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::single_shard_storage_mutator::SingleShardStorageMutator;
use crate::storage_mutator::StorageMutator;
use near_chain::types::Tip;
use near_chain::types::{RuntimeAdapter, Tip};
use near_chain::{ChainStore, ChainStoreAccess};
use near_chain_configs::{Genesis, GenesisConfig, GenesisValidationMode};
use near_crypto::PublicKey;
Expand Down Expand Up @@ -87,7 +87,7 @@ struct FinalizeCmd;

#[derive(clap::Parser)]
struct AmendAccessKeysCmd {
#[arg(short, long, default_value = "100000")]
#[arg(short, long, default_value = "2000000")]
batch_size: u64,
}

Expand Down Expand Up @@ -121,7 +121,7 @@ struct Validator {
}

type MakeSingleShardStorageMutatorFn =
Arc<dyn Fn(ShardId, StateRoot) -> anyhow::Result<SingleShardStorageMutator> + Send + Sync>;
Arc<dyn Fn(StateRoot) -> anyhow::Result<SingleShardStorageMutator> + Send + Sync>;

impl ForkNetworkCommand {
pub fn run(
Expand Down Expand Up @@ -306,10 +306,11 @@ impl ForkNetworkCommand {
home_dir: &Path,
) -> anyhow::Result<Vec<StateRoot>> {
// Open storage with migration
near_config.config.store.load_mem_tries_for_all_shards = true;
let storage = open_storage(&home_dir, near_config).unwrap();
let store = storage.get_hot_store();

let (prev_state_roots, prev_hash, epoch_id, _block_height) =
let (prev_state_roots, prev_hash, epoch_id, block_height) =
self.get_state_roots_and_hash(store.clone())?;
tracing::info!(?prev_state_roots, ?epoch_id, ?prev_hash);

Expand All @@ -321,22 +322,19 @@ impl ForkNetworkCommand {
.collect();
let runtime =
NightshadeRuntime::from_config(home_dir, store.clone(), &near_config, epoch_manager);
runtime.load_mem_tries_on_startup(&all_shard_uids).unwrap();

let make_storage_mutator: MakeSingleShardStorageMutatorFn =
Arc::new(move |shard_id, prev_state_root| {
SingleShardStorageMutator::new(
shard_id,
&runtime.clone(),
prev_hash,
prev_state_root,
)
Arc::new(move |prev_state_root| {
SingleShardStorageMutator::new(&runtime.clone(), prev_state_root)
});

let new_state_roots = self.prepare_state(
batch_size,
&all_shard_uids,
store,
&prev_state_roots,
block_height,
make_storage_mutator.clone(),
)?;
Ok(new_state_roots)
Expand All @@ -358,7 +356,7 @@ impl ForkNetworkCommand {
let storage = open_storage(&home_dir, near_config).unwrap();
let store = storage.get_hot_store();

let (prev_state_roots, prev_hash, epoch_id, block_height) =
let (prev_state_roots, _prev_hash, epoch_id, block_height) =
self.get_state_roots_and_hash(store.clone())?;

let epoch_manager =
Expand All @@ -374,7 +372,6 @@ impl ForkNetworkCommand {
epoch_manager.clone(),
&runtime,
epoch_id.clone(),
prev_hash,
prev_state_roots,
)?;
let (new_state_roots, new_validator_accounts) =
Expand Down Expand Up @@ -477,13 +474,12 @@ impl ForkNetworkCommand {
shard_uid: ShardUId,
store: Store,
prev_state_root: StateRoot,
block_height: BlockHeight,
make_storage_mutator: MakeSingleShardStorageMutatorFn,
) -> anyhow::Result<StateRoot> {
// Doesn't support secrets.
tracing::info!(?shard_uid);
let shard_id = shard_uid.shard_id as ShardId;
let mut storage_mutator: SingleShardStorageMutator =
make_storage_mutator(shard_id, prev_state_root)?;
let mut storage_mutator: SingleShardStorageMutator = make_storage_mutator(prev_state_root)?;

// Keeps track of accounts that have a full access key.
let mut has_full_key = HashSet::new();
Expand All @@ -492,6 +488,7 @@ impl ForkNetworkCommand {

// Iterate over the whole flat storage and do the necessary changes to have access to all accounts.
let mut index_delayed_receipt = 0;
let mut ref_keys_retrieved = 0;
let mut records_not_parsed = 0;
let mut records_parsed = 0;
let mut access_keys_updated = 0;
Expand All @@ -501,9 +498,11 @@ impl ForkNetworkCommand {
let mut postponed_receipts_updated = 0;
let mut delayed_receipts_updated = 0;
let mut received_data_updated = 0;
let mut fake_block_height = block_height + 1;
for item in store_helper::iter_flat_state_entries(shard_uid, &store, None, None) {
let (key, value) = match item {
Ok((key, FlatStateValue::Ref(ref_value))) => {
ref_keys_retrieved += 1;
(key, trie_storage.retrieve_raw_bytes(&ref_value.hash)?.to_vec())
}
Ok((key, FlatStateValue::Inlined(value))) => (key, value),
Expand Down Expand Up @@ -609,13 +608,27 @@ impl ForkNetworkCommand {
records_not_parsed += 1;
}
if storage_mutator.should_commit(batch_size) {
let state_root = storage_mutator.commit(&shard_uid)?;
storage_mutator = make_storage_mutator(shard_id, state_root)?;
tracing::info!(
?shard_uid,
ref_keys_retrieved,
records_parsed,
updated = access_keys_updated
+ accounts_implicit_updated
+ contract_data_updated
+ contract_code_updated
+ postponed_receipts_updated
+ delayed_receipts_updated
+ received_data_updated,
);
let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?;
fake_block_height += 1;
storage_mutator = make_storage_mutator(state_root)?;
}
}

tracing::info!(
?shard_uid,
ref_keys_retrieved,
records_parsed,
records_not_parsed,
accounts_implicit_updated,
Expand All @@ -638,7 +651,17 @@ impl ForkNetworkCommand {
if let Ok((key, _)) = item {
if key[0] == col::ACCOUNT {
num_accounts += 1;
let account_id = parse_account_id_from_account_key(&key).unwrap();
let account_id = match parse_account_id_from_account_key(&key) {
Ok(account_id) => account_id,
Err(err) => {
tracing::error!(
?err,
"Failed to parse account id {}",
hex::encode(&key)
);
continue;
}
};
if has_full_key.contains(&account_id) {
continue;
}
Expand All @@ -649,15 +672,16 @@ impl ForkNetworkCommand {
)?;
num_added += 1;
if storage_mutator.should_commit(batch_size) {
let state_root = storage_mutator.commit(&shard_uid)?;
storage_mutator = make_storage_mutator(shard_id, state_root)?;
let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?;
fake_block_height += 1;
storage_mutator = make_storage_mutator(state_root)?;
}
}
}
}
tracing::info!(?shard_uid, num_accounts, num_added, "Pass 2 done");

let state_root = storage_mutator.commit(&shard_uid)?;
let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?;

tracing::info!(?shard_uid, "Commit done");
Ok(state_root)
Expand All @@ -669,6 +693,7 @@ impl ForkNetworkCommand {
all_shard_uids: &[ShardUId],
store: Store,
prev_state_roots: &[StateRoot],
block_height: BlockHeight,
make_storage_mutator: MakeSingleShardStorageMutatorFn,
) -> anyhow::Result<Vec<StateRoot>> {
let state_roots = all_shard_uids
Expand All @@ -680,6 +705,7 @@ impl ForkNetworkCommand {
*shard_uid,
store.clone(),
prev_state_roots[shard_uid.shard_id as usize],
block_height,
make_storage_mutator.clone(),
)
.unwrap();
Expand Down
Loading

0 comments on commit aac780b

Please sign in to comment.