Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fork-network] Optimize amend-access-keys using in-memory tries. #10176

Merged
merged 1 commit into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions tools/fork-network/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ actix.workspace = true
anyhow.workspace = true
chrono.workspace = true
clap.workspace = true
hex.workspace = true
rayon.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
70 changes: 48 additions & 22 deletions tools/fork-network/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::single_shard_storage_mutator::SingleShardStorageMutator;
use crate::storage_mutator::StorageMutator;
use near_chain::types::Tip;
use near_chain::types::{RuntimeAdapter, Tip};
use near_chain::{ChainStore, ChainStoreAccess};
use near_chain_configs::{Genesis, GenesisConfig, GenesisValidationMode};
use near_crypto::PublicKey;
Expand Down Expand Up @@ -87,7 +87,7 @@ struct FinalizeCmd;

#[derive(clap::Parser)]
struct AmendAccessKeysCmd {
#[arg(short, long, default_value = "100000")]
#[arg(short, long, default_value = "2000000")]
batch_size: u64,
}

Expand Down Expand Up @@ -121,7 +121,7 @@ struct Validator {
}

type MakeSingleShardStorageMutatorFn =
Arc<dyn Fn(ShardId, StateRoot) -> anyhow::Result<SingleShardStorageMutator> + Send + Sync>;
Arc<dyn Fn(StateRoot) -> anyhow::Result<SingleShardStorageMutator> + Send + Sync>;

impl ForkNetworkCommand {
pub fn run(
Expand Down Expand Up @@ -306,10 +306,11 @@ impl ForkNetworkCommand {
home_dir: &Path,
) -> anyhow::Result<Vec<StateRoot>> {
// Open storage with migration
near_config.config.store.load_mem_tries_for_all_shards = true;
let storage = open_storage(&home_dir, near_config).unwrap();
let store = storage.get_hot_store();

let (prev_state_roots, prev_hash, epoch_id, _block_height) =
let (prev_state_roots, prev_hash, epoch_id, block_height) =
self.get_state_roots_and_hash(store.clone())?;
tracing::info!(?prev_state_roots, ?epoch_id, ?prev_hash);

Expand All @@ -321,22 +322,19 @@ impl ForkNetworkCommand {
.collect();
let runtime =
NightshadeRuntime::from_config(home_dir, store.clone(), &near_config, epoch_manager);
runtime.load_mem_tries_on_startup(&all_shard_uids).unwrap();

let make_storage_mutator: MakeSingleShardStorageMutatorFn =
Arc::new(move |shard_id, prev_state_root| {
SingleShardStorageMutator::new(
shard_id,
&runtime.clone(),
prev_hash,
prev_state_root,
)
Arc::new(move |prev_state_root| {
SingleShardStorageMutator::new(&runtime.clone(), prev_state_root)
});

let new_state_roots = self.prepare_state(
batch_size,
&all_shard_uids,
store,
&prev_state_roots,
block_height,
make_storage_mutator.clone(),
)?;
Ok(new_state_roots)
Expand All @@ -358,7 +356,7 @@ impl ForkNetworkCommand {
let storage = open_storage(&home_dir, near_config).unwrap();
let store = storage.get_hot_store();

let (prev_state_roots, prev_hash, epoch_id, block_height) =
let (prev_state_roots, _prev_hash, epoch_id, block_height) =
self.get_state_roots_and_hash(store.clone())?;

let epoch_manager =
Expand All @@ -374,7 +372,6 @@ impl ForkNetworkCommand {
epoch_manager.clone(),
&runtime,
epoch_id.clone(),
prev_hash,
prev_state_roots,
)?;
let (new_state_roots, new_validator_accounts) =
Expand Down Expand Up @@ -477,13 +474,12 @@ impl ForkNetworkCommand {
shard_uid: ShardUId,
store: Store,
prev_state_root: StateRoot,
block_height: BlockHeight,
make_storage_mutator: MakeSingleShardStorageMutatorFn,
) -> anyhow::Result<StateRoot> {
// Doesn't support secrets.
tracing::info!(?shard_uid);
let shard_id = shard_uid.shard_id as ShardId;
let mut storage_mutator: SingleShardStorageMutator =
make_storage_mutator(shard_id, prev_state_root)?;
let mut storage_mutator: SingleShardStorageMutator = make_storage_mutator(prev_state_root)?;

// Keeps track of accounts that have a full access key.
let mut has_full_key = HashSet::new();
Expand All @@ -492,6 +488,7 @@ impl ForkNetworkCommand {

// Iterate over the whole flat storage and do the necessary changes to have access to all accounts.
let mut index_delayed_receipt = 0;
let mut ref_keys_retrieved = 0;
let mut records_not_parsed = 0;
let mut records_parsed = 0;
let mut access_keys_updated = 0;
Expand All @@ -501,9 +498,11 @@ impl ForkNetworkCommand {
let mut postponed_receipts_updated = 0;
let mut delayed_receipts_updated = 0;
let mut received_data_updated = 0;
let mut fake_block_height = block_height + 1;
for item in store_helper::iter_flat_state_entries(shard_uid, &store, None, None) {
let (key, value) = match item {
Ok((key, FlatStateValue::Ref(ref_value))) => {
ref_keys_retrieved += 1;
(key, trie_storage.retrieve_raw_bytes(&ref_value.hash)?.to_vec())
}
Ok((key, FlatStateValue::Inlined(value))) => (key, value),
Expand Down Expand Up @@ -609,13 +608,27 @@ impl ForkNetworkCommand {
records_not_parsed += 1;
}
if storage_mutator.should_commit(batch_size) {
let state_root = storage_mutator.commit(&shard_uid)?;
storage_mutator = make_storage_mutator(shard_id, state_root)?;
tracing::info!(
?shard_uid,
ref_keys_retrieved,
records_parsed,
updated = access_keys_updated
+ accounts_implicit_updated
+ contract_data_updated
+ contract_code_updated
+ postponed_receipts_updated
+ delayed_receipts_updated
+ received_data_updated,
);
let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?;
fake_block_height += 1;
storage_mutator = make_storage_mutator(state_root)?;
}
}

tracing::info!(
?shard_uid,
ref_keys_retrieved,
records_parsed,
records_not_parsed,
accounts_implicit_updated,
Expand All @@ -638,7 +651,17 @@ impl ForkNetworkCommand {
if let Ok((key, _)) = item {
if key[0] == col::ACCOUNT {
num_accounts += 1;
let account_id = parse_account_id_from_account_key(&key).unwrap();
let account_id = match parse_account_id_from_account_key(&key) {
Ok(account_id) => account_id,
Err(err) => {
tracing::error!(
?err,
"Failed to parse account id {}",
hex::encode(&key)
);
continue;
}
};
if has_full_key.contains(&account_id) {
continue;
}
Expand All @@ -649,15 +672,16 @@ impl ForkNetworkCommand {
)?;
num_added += 1;
if storage_mutator.should_commit(batch_size) {
let state_root = storage_mutator.commit(&shard_uid)?;
storage_mutator = make_storage_mutator(shard_id, state_root)?;
let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?;
fake_block_height += 1;
storage_mutator = make_storage_mutator(state_root)?;
}
}
}
}
tracing::info!(?shard_uid, num_accounts, num_added, "Pass 2 done");

let state_root = storage_mutator.commit(&shard_uid)?;
let state_root = storage_mutator.commit(&shard_uid, fake_block_height)?;

tracing::info!(?shard_uid, "Commit done");
Ok(state_root)
Expand All @@ -669,6 +693,7 @@ impl ForkNetworkCommand {
all_shard_uids: &[ShardUId],
store: Store,
prev_state_roots: &[StateRoot],
block_height: BlockHeight,
make_storage_mutator: MakeSingleShardStorageMutatorFn,
) -> anyhow::Result<Vec<StateRoot>> {
let state_roots = all_shard_uids
Expand All @@ -680,6 +705,7 @@ impl ForkNetworkCommand {
*shard_uid,
store.clone(),
prev_state_roots[shard_uid.shard_id as usize],
block_height,
make_storage_mutator.clone(),
)
.unwrap();
Expand Down
Loading
Loading