[resharding] Use state sync snapshots to get flat storage iterator at a precise block #9598

Merged: 4 commits, Oct 2, 2023
108 changes: 74 additions & 34 deletions chain/chain/src/resharding.rs
@@ -1,14 +1,15 @@
use crate::metrics::{
ReshardingStatus, RESHARDING_BATCH_COUNT, RESHARDING_BATCH_SIZE, RESHARDING_STATUS,
};
/// Implementation for all resharding logic.
/// StateSplitRequest and StateSplitResponse are exchanged between the client_actor and the SyncJobsActor.
/// build_state_for_split_shards_preprocessing and build_state_for_split_shards_postprocessing are handled
/// by the client_actor, while the heavy resharding work in build_state_for_split_shards is done by the SyncJobsActor
/// so as to not affect the client.
use crate::metrics::{
ReshardingStatus, RESHARDING_BATCH_COUNT, RESHARDING_BATCH_SIZE, RESHARDING_STATUS,
};
use crate::Chain;
use itertools::Itertools;
use near_chain_primitives::error::Error;
use near_primitives::errors::StorageError::StorageInconsistentState;
use near_primitives::hash::CryptoHash;
use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout};
use near_primitives::state::FlatStateValue;
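
As an editorial aside for readers new to this code path: the doc comment above describes a request/response hand-off where the client_actor schedules the heavy work on the SyncJobsActor so the client stays responsive. Below is a minimal, hedged sketch of that offloading pattern using plain std threads and channels rather than the actual actix actors; SplitRequest and SplitResponse are hypothetical stand-ins, not the real types.

use std::sync::mpsc;
use std::thread;

// Hypothetical stand-ins for StateSplitRequest / StateSplitResponse.
struct SplitRequest {
    shard_id: u64,
}
struct SplitResponse {
    shard_id: u64,
    new_state_roots: Vec<u64>,
}

fn main() {
    let (req_tx, req_rx) = mpsc::channel::<SplitRequest>();
    let (resp_tx, resp_rx) = mpsc::channel::<SplitResponse>();

    // Worker thread standing in for the SyncJobsActor: does the heavy work off the client thread.
    let worker = thread::spawn(move || {
        for req in req_rx {
            // ... the expensive build_state_for_split_shards-style work would happen here ...
            let response = SplitResponse { shard_id: req.shard_id, new_state_roots: vec![1, 2] };
            resp_tx.send(response).unwrap();
        }
    });

    // Client side: schedule the job (preprocessing), then later pick up the result (postprocessing).
    req_tx.send(SplitRequest { shard_id: 3 }).unwrap();
    drop(req_tx); // closing the channel lets the worker loop terminate
    let resp = resp_rx.recv().unwrap();
    println!("shard {} split into {} children", resp.shard_id, resp.new_state_roots.len());
    worker.join().unwrap();
}

The real code differs in the actor framework and message contents, but the division of labour is the same: pre- and post-processing stay with the client, the expensive build happens elsewhere.
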
@@ -36,8 +37,15 @@ const RESHARDING_BATCH_MEMORY_LIMIT: bytesize::ByteSize = bytesize::ByteSize(300
#[rtype(result = "()")]
pub struct StateSplitRequest {
pub tries: Arc<ShardTries>,
// The block hash of the first block of the epoch.
pub sync_hash: CryptoHash,
// The prev hash of the sync_hash. We want the state at that block hash.
pub prev_hash: CryptoHash,
// The prev prev hash of the sync_hash. The state snapshot should be saved at that block hash.
pub prev_prev_hash: CryptoHash,
// Parent shardUId to be split into child shards.
pub shard_uid: ShardUId,
// State root of the parent shardUId. This is different from the state root at block sync_hash.
pub state_root: StateRoot,
pub next_epoch_shard_layout: ShardLayout,
}
@@ -49,6 +57,8 @@ impl Debug for StateSplitRequest {
f.debug_struct("StateSplitRequest")
.field("tries", &"<not shown>")
.field("sync_hash", &self.sync_hash)
.field("prev_hash", &self.prev_hash)
.field("prev_prev_hash", &self.prev_prev_hash)
.field("shard_uid", &self.shard_uid)
.field("state_root", &self.state_root)
.field("next_epoch_shard_layout", &self.next_epoch_shard_layout)
@@ -86,28 +96,6 @@ fn get_checked_account_id_to_shard_uid_fn(
}
}

// Returns an iterator over flat storage yielding key, value pairs. Used later in the get_trie_update_batch function.
// TODO(#9436): This isn't completely correct. We need to get the flat storage iterator based off a
// particular block, specifically, the last block of the previous epoch.
fn get_flat_storage_iter<'a>(
store: &'a Store,
shard_uid: ShardUId,
) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
store_helper::iter_flat_state_entries(shard_uid, &store, None, None).map(
move |entry| -> (Vec<u8>, Vec<u8>) {
let (key, value) = entry.unwrap();
let value = match value {
FlatStateValue::Ref(ref_value) => {
trie_storage.retrieve_raw_bytes(&ref_value.hash).unwrap().to_vec()
}
FlatStateValue::Inlined(inline_value) => inline_value,
};
(key, value)
},
)
}

// Format of the trie key, value pair that is used in tries.add_values_to_split_states() function
type TrieEntry = (Vec<u8>, Option<Vec<u8>>);

@@ -119,13 +107,13 @@ struct TrieUpdateBatch {
// Function to return batches of trie key, value pairs from flat storage iter. We return None at the end of iter.
// The batch size is roughly RESHARDING_BATCH_MEMORY_LIMIT (300 MB)
fn get_trie_update_batch(
iter: &mut impl Iterator<Item = (Vec<u8>, Vec<u8>)>,
iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
) -> Option<TrieUpdateBatch> {
let mut size: u64 = 0;
let mut entries = Vec::new();
while let Some((key, value)) = iter.next() {
size += key.len() as u64 + value.len() as u64;
entries.push((key, Some(value)));
size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
entries.push((key, value));
if size > RESHARDING_BATCH_MEMORY_LIMIT.as_u64() {
break;
}
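
For context on the batching above, here is a self-contained sketch of cutting a (key, optional value) iterator into batches by accumulated byte size; Batch, next_batch and BATCH_MEMORY_LIMIT are illustrative stand-ins mirroring TrieUpdateBatch, get_trie_update_batch and RESHARDING_BATCH_MEMORY_LIMIT, including the return-None-at-end behaviour the comment mentions.

// Illustrative batch-by-size helper over a generic (key, optional value) iterator.
const BATCH_MEMORY_LIMIT: u64 = 300 * 1024 * 1024; // roughly 300 MB, as in the real constant

struct Batch {
    entries: Vec<(Vec<u8>, Option<Vec<u8>>)>,
    size: u64,
}

// Returns None once the underlying iterator is exhausted.
fn next_batch(iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>) -> Option<Batch> {
    let mut size: u64 = 0;
    let mut entries = Vec::new();
    while let Some((key, value)) = iter.next() {
        size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
        entries.push((key, value));
        if size > BATCH_MEMORY_LIMIT {
            break; // batches are only "roughly" the limit: the last entry may overshoot it
        }
    }
    if entries.is_empty() {
        None
    } else {
        Some(Batch { entries, size })
    }
}

fn main() {
    let mut iter = (0u8..5).map(|i| (vec![i], Some(vec![0u8; 10])));
    while let Some(batch) = next_batch(&mut iter) {
        println!("batch of {} entries, {} bytes", batch.entries.len(), batch.size);
    }
}
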
@@ -198,11 +186,15 @@ impl Chain {

let shard_uid = ShardUId::from_shard_id_and_layout(shard_id, &shard_layout);
let prev_hash = block_header.prev_hash();
let prev_block_header = self.get_block_header(prev_hash)?;
let prev_prev_hash = prev_block_header.prev_hash();
let state_root = *self.get_chunk_extra(&prev_hash, &shard_uid)?.state_root();

state_split_scheduler(StateSplitRequest {
tries: Arc::new(self.runtime_adapter.get_tries()),
sync_hash: *sync_hash,
prev_hash: *prev_hash,
prev_prev_hash: *prev_prev_hash,
shard_uid,
state_root,
next_epoch_shard_layout,
@@ -269,27 +261,75 @@ impl Chain {
fn build_state_for_split_shards_impl_v2(
state_split_request: StateSplitRequest,
) -> Result<HashMap<ShardUId, StateRoot>, Error> {
let StateSplitRequest { tries, shard_uid, state_root, next_epoch_shard_layout, .. } =
state_split_request;
let store = tries.get_store();

let shard_id = shard_uid.shard_id();
let StateSplitRequest {
tries,
prev_hash,
prev_prev_hash,
shard_uid,
state_root,
next_epoch_shard_layout,
..
} = state_split_request;

RESHARDING_STATUS
.with_label_values(&[&shard_uid.to_string()])
.set(ReshardingStatus::BuildingState.into());

let shard_id = shard_uid.shard_id();
let new_shards = next_epoch_shard_layout
.get_split_shard_uids(shard_id)
.ok_or(Error::InvalidShardId(shard_id))?;
let mut state_roots: HashMap<_, _> =
new_shards.iter().map(|shard_uid| (*shard_uid, Trie::EMPTY_ROOT)).collect();

Review comment (Contributor): nit: Can you refactor this to a separate method?
Reply: hehe gotcha ;)

// Build the required iterator from flat storage and delta changes. Note that we are
// working with iterators as we don't want to have all the state in memory at once.
//
// Iterator is built by chaining the following:
// 1. Flat storage iterator from the snapshot state as of `prev_prev_hash`.
// 2. Delta changes iterator from the snapshot state as of `prev_hash`.
//
// The snapshot, when created, has its flat head at `prev_prev_hash`, i.e. the hash of
// the second to last block of the previous epoch. Hence we need to append the delta
// changes on top of it.
let (snapshot_store, flat_storage_manager) = tries.get_state_snapshot(&prev_prev_hash)?;
let flat_storage_chunk_view =
flat_storage_manager.chunk_view(shard_uid, prev_prev_hash).ok_or_else(|| {
StorageInconsistentState("Chunk view missing for snapshot flat storage".to_string())
})?;
let flat_storage_iter =
flat_storage_chunk_view.iter_flat_state_entries(None, None).map(|entry| {
let (key, value) = entry.unwrap();
Review comment (Contributor): I missed it yesterday but we should properly propagate that error.
(key, Some(value))
});

let delta = store_helper::get_delta_changes(&snapshot_store, shard_uid, prev_hash)
.map_err(|e| StorageInconsistentState(e.to_string()))?
.ok_or_else(|| {
StorageInconsistentState("Delta missing for snapshot flat storage".to_string())
})?;
let delta_iter = delta.0.into_iter();

let trie_storage = TrieDBStorage::new(tries.get_store(), shard_uid);
let flat_state_value_to_trie_value_fn = |value: FlatStateValue| -> Vec<u8> {
match value {
FlatStateValue::Ref(ref_value) => {
trie_storage.retrieve_raw_bytes(&ref_value.hash).unwrap().to_vec()
}
FlatStateValue::Inlined(inline_value) => inline_value,
}
};
let mut iter = flat_storage_iter.chain(delta_iter).map(
move |(key, value)| -> (Vec<u8>, Option<Vec<u8>>) {
(key, value.map(flat_state_value_to_trie_value_fn))
},
);

// function to map account id to shard uid in range of child shards
let checked_account_id_to_shard_uid =
get_checked_account_id_to_shard_uid_fn(shard_uid, new_shards, next_epoch_shard_layout);

let mut iter = get_flat_storage_iter(&store, shard_uid);
// Once we build the iterator, we break it into batches using the get_trie_update_batch function.
while let Some(batch) = get_trie_update_batch(&mut iter) {
let TrieUpdateBatch { entries, size } = batch;
// TODO(#9435): This is highly inefficient as for each key in the batch, we are parsing the account_id
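
To make the iterator construction in build_state_for_split_shards_impl_v2 easier to follow in isolation, here is a dependency-free sketch of the chaining idea: snapshot entries (state as of prev_prev_hash) come first, delta entries (changes in prev_hash) are appended after them, and deletions are modelled as None values. The in-memory vectors and the skipped FlatStateValue resolution are simplifications, not the real near-store API.

fn main() {
    // State as of prev_prev_hash (from the snapshot's flat storage).
    let snapshot: Vec<(Vec<u8>, Vec<u8>)> =
        vec![(b"alice".to_vec(), b"10".to_vec()), (b"bob".to_vec(), b"7".to_vec())];
    // Changes made in block prev_hash (None means the key was deleted).
    let deltas: Vec<(Vec<u8>, Option<Vec<u8>>)> =
        vec![(b"bob".to_vec(), None), (b"carol".to_vec(), Some(b"3".to_vec()))];

    // Snapshot values are always present, so wrap them in Some to get a uniform item type.
    let snapshot_iter = snapshot.into_iter().map(|(key, value)| (key, Some(value)));
    // Delta entries are appended after the snapshot entries.
    let delta_iter = deltas.into_iter();

    // This combined stream is what the batching helper consumes.
    for (key, value) in snapshot_iter.chain(delta_iter) {
        println!(
            "{:?} -> {:?}",
            String::from_utf8_lossy(&key),
            value.map(|v| String::from_utf8_lossy(&v).into_owned())
        );
    }
}

The intent, per the code comment above, is that delta entries applied later override the snapshot values, so the combined stream behaves like the state as of prev_hash.
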
60 changes: 29 additions & 31 deletions core/store/src/trie/shard_tries.rs
@@ -248,43 +248,41 @@ impl ShardTries {
self.get_trie_for_shard_internal(shard_uid, state_root, false, None)
}

pub fn get_trie_with_block_hash_for_shard_from_snapshot(
pub fn get_state_snapshot(
&self,
shard_uid: ShardUId,
state_root: StateRoot,
block_hash: &CryptoHash,
) -> Result<Trie, StorageError> {
let (store, flat_storage_manager) = {
// Taking this lock can last up to 10 seconds, if the snapshot happens to be re-created.
match self.0.state_snapshot.try_read() {
Ok(guard) => {
if let Some(data) = guard.as_ref() {
if &data.prev_block_hash != block_hash {
return Err(StorageInconsistentState(format!(
"Wrong state snapshot. Requested: {:?}, Available: {:?}",
block_hash, data.prev_block_hash
)));
}
(data.store.clone(), data.flat_storage_manager.clone())
} else {
return Err(StorageInconsistentState(
"No state snapshot available".to_string(),
));
) -> Result<(Store, FlatStorageManager), StorageError> {
// Taking this lock can last up to 10 seconds, if the snapshot happens to be re-created.
match self.0.state_snapshot.try_read() {
Ok(guard) => {
if let Some(data) = guard.as_ref() {
if &data.prev_block_hash != block_hash {
return Err(StorageInconsistentState(format!(
"Wrong state snapshot. Requested: {:?}, Available: {:?}",
block_hash, data.prev_block_hash
)));
}
}
Err(TryLockError::WouldBlock) => {
return Err(StorageInconsistentState(
"Accessing state snapshot would block. Retry in a few seconds.".to_string(),
));
}
Err(err) => {
return Err(StorageInconsistentState(format!(
"Can't access state snapshot: {err:?}"
)));
Ok((data.store.clone(), data.flat_storage_manager.clone()))
} else {
Err(StorageInconsistentState("No state snapshot available".to_string()))
}
}
};
Err(TryLockError::WouldBlock) => Err(StorageInconsistentState(
"Accessing state snapshot would block. Retry in a few seconds.".to_string(),
)),
Err(err) => {
Err(StorageInconsistentState(format!("Can't access state snapshot: {err:?}")))
}
}
}

pub fn get_trie_with_block_hash_for_shard_from_snapshot(
&self,
shard_uid: ShardUId,
state_root: StateRoot,
block_hash: &CryptoHash,
) -> Result<Trie, StorageError> {
let (store, flat_storage_manager) = self.get_state_snapshot(block_hash)?;
let cache = {
let mut caches = self.0.view_caches.write().expect(POISONED_LOCK_ERR);
caches
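
Finally, a standalone sketch of the locking pattern used in get_state_snapshot above: try_read instead of read, so a caller never blocks behind a long snapshot re-creation, with WouldBlock surfaced as a retryable error. The Snapshot struct and StorageError enum below are illustrative stand-ins, not the near-store types.

use std::sync::{RwLock, TryLockError};

// Illustrative stand-ins for the snapshot data and the storage error type.
struct Snapshot {
    prev_block_hash: [u8; 32],
}
#[derive(Debug)]
enum StorageError {
    Inconsistent(String),
}

fn get_snapshot(
    slot: &RwLock<Option<Snapshot>>,
    block_hash: &[u8; 32],
) -> Result<[u8; 32], StorageError> {
    // try_read instead of read: if the snapshot is being (re)created the lock may be
    // held for a long time, and we prefer to bail out and let the caller retry.
    match slot.try_read() {
        Ok(guard) => match guard.as_ref() {
            Some(data) if &data.prev_block_hash == block_hash => Ok(data.prev_block_hash),
            Some(data) => Err(StorageError::Inconsistent(format!(
                "Wrong state snapshot. Requested: {:?}, Available: {:?}",
                block_hash, data.prev_block_hash
            ))),
            None => Err(StorageError::Inconsistent("No state snapshot available".to_string())),
        },
        Err(TryLockError::WouldBlock) => Err(StorageError::Inconsistent(
            "Accessing state snapshot would block. Retry in a few seconds.".to_string(),
        )),
        Err(err) => {
            Err(StorageError::Inconsistent(format!("Can't access state snapshot: {err:?}")))
        }
    }
}

fn main() {
    let slot = RwLock::new(Some(Snapshot { prev_block_hash: [1; 32] }));
    println!("{:?}", get_snapshot(&slot, &[1; 32]));
}
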