[resharding] Use state sync snapshots to get flat storage iterator at a precise block #9598
Merged
@@ -1,14 +1,15 @@
-use crate::metrics::{
-    ReshardingStatus, RESHARDING_BATCH_COUNT, RESHARDING_BATCH_SIZE, RESHARDING_STATUS,
-};
 /// Implementation for all resharding logic.
 /// StateSplitRequest and StateSplitResponse are exchanged across the client_actor and SyncJobsActor.
 /// build_state_for_split_shards_preprocessing and build_state_for_split_shards_postprocessing are handled
 /// by the client_actor while the heavy resharding build_state_for_split_shards is done by SyncJobsActor
 /// so as to not affect client.
+use crate::metrics::{
+    ReshardingStatus, RESHARDING_BATCH_COUNT, RESHARDING_BATCH_SIZE, RESHARDING_STATUS,
+};
 use crate::Chain;
 use itertools::Itertools;
 use near_chain_primitives::error::Error;
+use near_primitives::errors::StorageError::StorageInconsistentState;
 use near_primitives::hash::CryptoHash;
 use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout};
 use near_primitives::state::FlatStateValue;
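The module comment above describes the actor split: light preprocessing and postprocessing run on the client_actor while the heavy build runs on the SyncJobsActor so the client is not blocked. A minimal sketch of that offloading pattern, using plain std threads and channels rather than the actual actix actors; all names here are illustrative, not nearcore APIs:

```rust
use std::sync::mpsc;
use std::thread;

struct SplitRequest {
    shard_id: u64,
}

struct SplitResponse {
    shard_id: u64,
    ok: bool,
}

fn main() {
    let (req_tx, req_rx) = mpsc::channel::<SplitRequest>();
    let (resp_tx, resp_rx) = mpsc::channel::<SplitResponse>();

    // Worker standing in for SyncJobsActor: performs the heavy build.
    let worker = thread::spawn(move || {
        for req in req_rx {
            // ... the heavy build_state_for_split_shards work would run here ...
            resp_tx.send(SplitResponse { shard_id: req.shard_id, ok: true }).unwrap();
        }
    });

    // Client side: preprocessing happens here, then the request is scheduled.
    req_tx.send(SplitRequest { shard_id: 3 }).unwrap();
    drop(req_tx); // close the channel so the worker loop terminates

    // Postprocessing on the response, back on the client side.
    let resp = resp_rx.recv().unwrap();
    assert!(resp.ok && resp.shard_id == 3);
    worker.join().unwrap();
}
```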
@@ -36,8 +37,15 @@ const RESHARDING_BATCH_MEMORY_LIMIT: bytesize::ByteSize = bytesize::ByteSize(300
 #[rtype(result = "()")]
 pub struct StateSplitRequest {
     pub tries: Arc<ShardTries>,
+    // The block hash of the first block of the epoch.
     pub sync_hash: CryptoHash,
+    // The prev hash of the sync_hash. We want the state at that block hash.
+    pub prev_hash: CryptoHash,
+    // The prev prev hash of the sync_hash. The state snapshot should be saved at that block hash.
+    pub prev_prev_hash: CryptoHash,
+    // Parent shardUId to be split into child shards.
     pub shard_uid: ShardUId,
+    // state root of the parent shardUId. This is different from block sync_hash
     pub state_root: StateRoot,
     pub next_epoch_shard_layout: ShardLayout,
 }
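The three hashes carried by StateSplitRequest sit on consecutive blocks around the epoch boundary: `sync_hash` is the first block of the new epoch, `prev_hash` the last block of the old epoch (whose post-state we want to split), and `prev_prev_hash` the block the state snapshot is saved at. A sketch of the relationship; `lookup_header` is a hypothetical stand-in for `Chain::get_block_header`:

```rust
use near_primitives::block_header::BlockHeader;
use near_primitives::hash::CryptoHash;

// Walk back two blocks from sync_hash to recover the other two hashes,
// mirroring what the preprocessing hunk later in the diff does with real
// chain lookups.
fn related_hashes(
    sync_hash: CryptoHash,
    lookup_header: impl Fn(&CryptoHash) -> BlockHeader,
) -> (CryptoHash, CryptoHash) {
    // Last block of the previous epoch: the state we want.
    let prev_hash = *lookup_header(&sync_hash).prev_hash();
    // Second last block of the previous epoch: where the snapshot lives.
    let prev_prev_hash = *lookup_header(&prev_hash).prev_hash();
    (prev_hash, prev_prev_hash)
}
```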
@@ -49,6 +57,8 @@ impl Debug for StateSplitRequest {
         f.debug_struct("StateSplitRequest")
             .field("tries", &"<not shown>")
             .field("sync_hash", &self.sync_hash)
+            .field("prev_hash", &self.prev_hash)
+            .field("prev_prev_hash", &self.prev_prev_hash)
             .field("shard_uid", &self.shard_uid)
             .field("state_root", &self.state_root)
             .field("next_epoch_shard_layout", &self.next_epoch_shard_layout)
@@ -86,28 +96,6 @@ fn get_checked_account_id_to_shard_uid_fn(
 }
 }
 
-// Return iterate over flat storage to get key, value. Used later in the get_trie_update_batch function.
-// TODO(#9436): This isn't completely correct. We need to get the flat storage iterator based off a
-// particular block, specifically, the last block of the previous epoch.
-fn get_flat_storage_iter<'a>(
-    store: &'a Store,
-    shard_uid: ShardUId,
-) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
-    let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
-    store_helper::iter_flat_state_entries(shard_uid, &store, None, None).map(
-        move |entry| -> (Vec<u8>, Vec<u8>) {
-            let (key, value) = entry.unwrap();
-            let value = match value {
-                FlatStateValue::Ref(ref_value) => {
-                    trie_storage.retrieve_raw_bytes(&ref_value.hash).unwrap().to_vec()
-                }
-                FlatStateValue::Inlined(inline_value) => inline_value,
-            };
-            (key, value)
-        },
-    )
-}
-
 // Format of the trie key, value pair that is used in tries.add_values_to_split_states() function
 type TrieEntry = (Vec<u8>, Option<Vec<u8>>);
 
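Both the helper removed above and its replacement later in the diff resolve a `FlatStateValue` the same way: small values are stored inline in the flat state entry, large ones are stored separately and referenced by hash. A self-contained sketch with simplified stand-ins for the `near_primitives` types (`fetch_by_hash` stands in for `TrieDBStorage::retrieve_raw_bytes`):

```rust
// Simplified stand-ins for near_primitives::state::FlatStateValue.
struct ValueRef {
    hash: [u8; 32],
}

enum FlatStateValue {
    Ref(ValueRef),
    Inlined(Vec<u8>),
}

// Resolve a flat state value to the raw trie value bytes.
fn resolve(value: FlatStateValue, fetch_by_hash: impl Fn(&[u8; 32]) -> Vec<u8>) -> Vec<u8> {
    match value {
        // Large values are referenced by hash and fetched from trie storage.
        FlatStateValue::Ref(ref_value) => fetch_by_hash(&ref_value.hash),
        // Small values are stored inline in the flat state entry.
        FlatStateValue::Inlined(inline_value) => inline_value,
    }
}
```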
@@ -119,13 +107,13 @@ struct TrieUpdateBatch {
 // Function to return batches of trie key, value pairs from flat storage iter. We return None at the end of iter.
 // The batch size is roughly RESHARDING_BATCH_MEMORY_LIMIT (300 MB)
 fn get_trie_update_batch(
-    iter: &mut impl Iterator<Item = (Vec<u8>, Vec<u8>)>,
+    iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
 ) -> Option<TrieUpdateBatch> {
     let mut size: u64 = 0;
     let mut entries = Vec::new();
     while let Some((key, value)) = iter.next() {
-        size += key.len() as u64 + value.len() as u64;
-        entries.push((key, Some(value)));
+        size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
+        entries.push((key, value));
         if size > RESHARDING_BATCH_MEMORY_LIMIT.as_u64() {
             break;
         }
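A self-contained sketch of the batching logic above, using the same size accounting as the new code; `BATCH_MEMORY_LIMIT` mirrors the 300 MB `RESHARDING_BATCH_MEMORY_LIMIT` and all names are illustrative:

```rust
const BATCH_MEMORY_LIMIT: u64 = 300 * 1024 * 1024;

type Entry = (Vec<u8>, Option<Vec<u8>>);

// Pull entries off the iterator until the accumulated key/value bytes exceed
// the limit; a None value (a deletion from the delta changes) only counts its
// key's size.
fn next_batch(iter: &mut impl Iterator<Item = Entry>) -> Option<Vec<Entry>> {
    let mut size: u64 = 0;
    let mut entries = Vec::new();
    while let Some((key, value)) = iter.next() {
        size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
        entries.push((key, value));
        if size > BATCH_MEMORY_LIMIT {
            break;
        }
    }
    // An empty batch means the iterator is exhausted.
    if entries.is_empty() {
        None
    } else {
        Some(entries)
    }
}
```

The switch of the item type from `Vec<u8>` to `Option<Vec<u8>>` is what lets delta-change deletions flow through the same batching path as ordinary values.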
@@ -198,11 +186,15 @@ impl Chain {
 
         let shard_uid = ShardUId::from_shard_id_and_layout(shard_id, &shard_layout);
         let prev_hash = block_header.prev_hash();
+        let prev_block_header = self.get_block_header(prev_hash)?;
+        let prev_prev_hash = prev_block_header.prev_hash();
         let state_root = *self.get_chunk_extra(&prev_hash, &shard_uid)?.state_root();
 
         state_split_scheduler(StateSplitRequest {
             tries: Arc::new(self.runtime_adapter.get_tries()),
             sync_hash: *sync_hash,
+            prev_hash: *prev_hash,
+            prev_prev_hash: *prev_prev_hash,
             shard_uid,
             state_root,
             next_epoch_shard_layout,
@@ -269,27 +261,75 @@ impl Chain {
     fn build_state_for_split_shards_impl_v2(
         state_split_request: StateSplitRequest,
     ) -> Result<HashMap<ShardUId, StateRoot>, Error> {
-        let StateSplitRequest { tries, shard_uid, state_root, next_epoch_shard_layout, .. } =
-            state_split_request;
-        let store = tries.get_store();
-
-        let shard_id = shard_uid.shard_id();
+        let StateSplitRequest {
+            tries,
+            prev_hash,
+            prev_prev_hash,
+            shard_uid,
+            state_root,
+            next_epoch_shard_layout,
+            ..
+        } = state_split_request;
+
         RESHARDING_STATUS
             .with_label_values(&[&shard_uid.to_string()])
             .set(ReshardingStatus::BuildingState.into());
 
+        let shard_id = shard_uid.shard_id();
         let new_shards = next_epoch_shard_layout
             .get_split_shard_uids(shard_id)
             .ok_or(Error::InvalidShardId(shard_id))?;
         let mut state_roots: HashMap<_, _> =
             new_shards.iter().map(|shard_uid| (*shard_uid, Trie::EMPTY_ROOT)).collect();
 
+        // Build the required iterator from flat storage and delta changes. Note that we are
+        // working with iterators as we don't want to have all the state in memory at once.
+        //
+        // Iterator is built by chaining the following:
+        // 1. Flat storage iterator from the snapshot state as of `prev_prev_hash`.
+        // 2. Delta changes iterator from the snapshot state as of `prev_hash`.
+        //
+        // The snapshot when created has the flat head as of `prev_prev_hash`, i.e. the hash as
+        // of the second last block of the previous epoch. Hence we need to append the delta
+        // changes on top of it.
+        let (snapshot_store, flat_storage_manager) = tries.get_state_snapshot(&prev_prev_hash)?;
+        let flat_storage_chunk_view =
+            flat_storage_manager.chunk_view(shard_uid, prev_prev_hash).ok_or_else(|| {
+                StorageInconsistentState("Chunk view missing for snapshot flat storage".to_string())
+            })?;
+        let flat_storage_iter =
+            flat_storage_chunk_view.iter_flat_state_entries(None, None).map(|entry| {
+                let (key, value) = entry.unwrap();
+                (key, Some(value))
+            });
+
+        let delta = store_helper::get_delta_changes(&snapshot_store, shard_uid, prev_hash)
+            .map_err(|e| StorageInconsistentState(e.to_string()))?
+            .ok_or_else(|| {
+                StorageInconsistentState("Delta missing for snapshot flat storage".to_string())
+            })?;
+        let delta_iter = delta.0.into_iter();
+
+        let trie_storage = TrieDBStorage::new(tries.get_store(), shard_uid);
+        let flat_state_value_to_trie_value_fn = |value: FlatStateValue| -> Vec<u8> {
+            match value {
+                FlatStateValue::Ref(ref_value) => {
+                    trie_storage.retrieve_raw_bytes(&ref_value.hash).unwrap().to_vec()
+                }
+                FlatStateValue::Inlined(inline_value) => inline_value,
+            }
+        };
+        let mut iter = flat_storage_iter.chain(delta_iter).map(
+            move |(key, value)| -> (Vec<u8>, Option<Vec<u8>>) {
+                (key, value.map(flat_state_value_to_trie_value_fn))
+            },
+        );
+
         // function to map account id to shard uid in range of child shards
         let checked_account_id_to_shard_uid =
             get_checked_account_id_to_shard_uid_fn(shard_uid, new_shards, next_epoch_shard_layout);
 
-        let mut iter = get_flat_storage_iter(&store, shard_uid);
         // Once we build the iterator, we break it into batches using the get_trie_update_batch function.
         while let Some(batch) = get_trie_update_batch(&mut iter) {
             let TrieUpdateBatch { entries, size } = batch;
             // TODO(#9435): This is highly inefficient as for each key in the batch, we are parsing the account_id
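The heart of the new approach is the chained iterator: snapshot entries as of `prev_prev_hash` come first, then the delta for `prev_hash` is appended, so a consumer applying entries in order ends up with the state as of `prev_hash`. A generic sketch of that shape with illustrative types; `None` in the delta marks a key deleted in that block:

```rust
// Chain a base snapshot iterator with a delta iterator. Base entries are
// wrapped in Some; delta entries already carry Option. Because the consumer
// applies entries sequentially, a delta entry for a key overrides the
// snapshot entry for the same key.
fn chain_snapshot_and_delta<'a>(
    snapshot: impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a,
    delta: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)> + 'a,
) -> impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)> + 'a {
    snapshot.map(|(key, value)| (key, Some(value))).chain(delta)
}
```

Note this only works because later entries win when the batches are applied in order; the chained iterator itself does no deduplication.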
Review comments:

On `let (key, value) = entry.unwrap();` in the new flat storage iterator: I missed it yesterday but we should properly propagate that error.

nit: Can you refactor this to a separate method?

hehe gotcha ;)
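On the error-propagation comment above: one way to avoid the `unwrap` inside the map closure is to keep the `Result` in the iterator's item type and let the consumer short-circuit. A sketch under simplified, assumed types:

```rust
// Yield Result items instead of unwrapping in the closure, so the first
// storage error reaches whoever consumes the iterator.
fn entries_propagating<E>(
    raw: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), E>>,
) -> impl Iterator<Item = Result<(Vec<u8>, Option<Vec<u8>>), E>> {
    raw.map(|entry| entry.map(|(key, value)| (key, Some(value))))
}

// A consumer can then stop at the first error, for example:
// let all: Result<Vec<_>, _> = entries_propagating(raw).collect();
```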