[resharding] Use state sync snapshots to get flat storage iterator at a precise block (#9598)

With the current flat storage iterator implementation, we cannot control the block at which the iterator is created. A solution is to reuse the snapshot mechanism from state sync.

We can assume that the snapshot is created on all nodes as of the last block of an epoch.

The way we get the correct state for resharding is to take the snapshot as of `prev_prev_hash` and append the delta from `prev_hash`, so that the state of the child tries/shards matches that of the last block of the previous epoch.
Shreyan Gupta authored Oct 2, 2023
1 parent 9ed6866 commit 50e5ef4
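
To make the snapshot-plus-delta idea concrete, here is a minimal, self-contained Rust sketch. The BTreeMap stand-ins and all names are illustrative only; the real implementation (in the diff below) chains lazy iterators over flat storage instead of materializing maps.

use std::collections::BTreeMap;

type Key = Vec<u8>;
type Value = Vec<u8>;

/// Reconstruct the state at `prev_hash` from a snapshot whose flat head is
/// `prev_prev_hash`, plus the delta recorded for the `prev_hash` block.
fn state_at_prev_hash(
    snapshot_at_prev_prev: &BTreeMap<Key, Value>,
    delta_for_prev: &BTreeMap<Key, Option<Value>>,
) -> BTreeMap<Key, Value> {
    let mut state = snapshot_at_prev_prev.clone();
    for (key, change) in delta_for_prev {
        match change {
            // Some(value): the key was written in the prev_hash block.
            Some(value) => {
                state.insert(key.clone(), value.clone());
            }
            // None: the key was deleted in the prev_hash block.
            None => {
                state.remove(key);
            }
        }
    }
    state
}

fn main() {
    let snapshot = BTreeMap::from([(b"a".to_vec(), b"1".to_vec()), (b"b".to_vec(), b"2".to_vec())]);
    let delta = BTreeMap::from([(b"b".to_vec(), None), (b"c".to_vec(), Some(b"3".to_vec()))]);
    let state = state_at_prev_hash(&snapshot, &delta);
    assert_eq!(state.get(b"a".as_slice()), Some(&b"1".to_vec())); // untouched by the delta
    assert_eq!(state.get(b"b".as_slice()), None); // deleted by the delta
    assert_eq!(state.get(b"c".as_slice()), Some(&b"3".to_vec())); // written by the delta
}

The diff achieves the same effect lazily: the delta entries are chained after the snapshot entries, so the delta values take effect last when the batches are applied to the child tries.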
Showing 2 changed files with 103 additions and 65 deletions.
108 changes: 74 additions & 34 deletions chain/chain/src/resharding.rs
@@ -1,14 +1,15 @@
-use crate::metrics::{
-    ReshardingStatus, RESHARDING_BATCH_COUNT, RESHARDING_BATCH_SIZE, RESHARDING_STATUS,
-};
+/// Implementation for all resharding logic.
+/// StateSplitRequest and StateSplitResponse are exchanged across the client_actor and SyncJobsActor.
+/// build_state_for_split_shards_preprocessing and build_state_for_split_shards_postprocessing are handled
+/// by the client_actor while the heavy resharding build_state_for_split_shards is done by SyncJobsActor
+/// so as to not affect client.
+use crate::metrics::{
+    ReshardingStatus, RESHARDING_BATCH_COUNT, RESHARDING_BATCH_SIZE, RESHARDING_STATUS,
+};
 use crate::Chain;
 use itertools::Itertools;
 use near_chain_primitives::error::Error;
+use near_primitives::errors::StorageError::StorageInconsistentState;
 use near_primitives::hash::CryptoHash;
 use near_primitives::shard_layout::{account_id_to_shard_uid, ShardLayout};
 use near_primitives::state::FlatStateValue;
@@ -36,8 +37,15 @@ const RESHARDING_BATCH_MEMORY_LIMIT: bytesize::ByteSize = bytesize::ByteSize(300
 #[rtype(result = "()")]
 pub struct StateSplitRequest {
     pub tries: Arc<ShardTries>,
+    // The block hash of the first block of the epoch.
     pub sync_hash: CryptoHash,
+    // The prev hash of the sync_hash. We want the state at that block hash.
+    pub prev_hash: CryptoHash,
+    // The prev prev hash of the sync_hash. The state snapshot should be saved at that block hash.
+    pub prev_prev_hash: CryptoHash,
+    // Parent shardUId to be split into child shards.
     pub shard_uid: ShardUId,
+    // State root of the parent shardUId. This is the state root as of prev_hash, not sync_hash.
    pub state_root: StateRoot,
     pub next_epoch_shard_layout: ShardLayout,
 }
@@ -49,6 +57,8 @@ impl Debug for StateSplitRequest {
         f.debug_struct("StateSplitRequest")
             .field("tries", &"<not shown>")
             .field("sync_hash", &self.sync_hash)
+            .field("prev_hash", &self.prev_hash)
+            .field("prev_prev_hash", &self.prev_prev_hash)
             .field("shard_uid", &self.shard_uid)
             .field("state_root", &self.state_root)
             .field("next_epoch_shard_layout", &self.next_epoch_shard_layout)
@@ -86,28 +96,6 @@ fn get_checked_account_id_to_shard_uid_fn(
     }
 }
 
-// Return iterate over flat storage to get key, value. Used later in the get_trie_update_batch function.
-// TODO(#9436): This isn't completely correct. We need to get the flat storage iterator based off a
-// particular block, specifically, the last block of the previous epoch.
-fn get_flat_storage_iter<'a>(
-    store: &'a Store,
-    shard_uid: ShardUId,
-) -> impl Iterator<Item = (Vec<u8>, Vec<u8>)> + 'a {
-    let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
-    store_helper::iter_flat_state_entries(shard_uid, &store, None, None).map(
-        move |entry| -> (Vec<u8>, Vec<u8>) {
-            let (key, value) = entry.unwrap();
-            let value = match value {
-                FlatStateValue::Ref(ref_value) => {
-                    trie_storage.retrieve_raw_bytes(&ref_value.hash).unwrap().to_vec()
-                }
-                FlatStateValue::Inlined(inline_value) => inline_value,
-            };
-            (key, value)
-        },
-    )
-}
 
 // Format of the trie key, value pair that is used in tries.add_values_to_split_states() function
 type TrieEntry = (Vec<u8>, Option<Vec<u8>>);

@@ -119,13 +107,13 @@ struct TrieUpdateBatch {
 // Function to return batches of trie key, value pairs from flat storage iter. We return None at the end of iter.
 // The batch size is roughly RESHARDING_BATCH_MEMORY_LIMIT (300 MB)
 fn get_trie_update_batch(
-    iter: &mut impl Iterator<Item = (Vec<u8>, Vec<u8>)>,
+    iter: &mut impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
 ) -> Option<TrieUpdateBatch> {
     let mut size: u64 = 0;
     let mut entries = Vec::new();
     while let Some((key, value)) = iter.next() {
-        size += key.len() as u64 + value.len() as u64;
-        entries.push((key, Some(value)));
+        size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
+        entries.push((key, value));
         if size > RESHARDING_BATCH_MEMORY_LIMIT.as_u64() {
             break;
         }
@@ -198,11 +186,15 @@ impl Chain {
 
         let shard_uid = ShardUId::from_shard_id_and_layout(shard_id, &shard_layout);
         let prev_hash = block_header.prev_hash();
+        let prev_block_header = self.get_block_header(prev_hash)?;
+        let prev_prev_hash = prev_block_header.prev_hash();
         let state_root = *self.get_chunk_extra(&prev_hash, &shard_uid)?.state_root();
 
         state_split_scheduler(StateSplitRequest {
             tries: Arc::new(self.runtime_adapter.get_tries()),
             sync_hash: *sync_hash,
+            prev_hash: *prev_hash,
+            prev_prev_hash: *prev_prev_hash,
             shard_uid,
             state_root,
             next_epoch_shard_layout,
@@ -269,27 +261,75 @@ impl Chain {
     fn build_state_for_split_shards_impl_v2(
         state_split_request: StateSplitRequest,
     ) -> Result<HashMap<ShardUId, StateRoot>, Error> {
-        let StateSplitRequest { tries, shard_uid, state_root, next_epoch_shard_layout, .. } =
-            state_split_request;
-        let store = tries.get_store();
-
-        let shard_id = shard_uid.shard_id();
+        let StateSplitRequest {
+            tries,
+            prev_hash,
+            prev_prev_hash,
+            shard_uid,
+            state_root,
+            next_epoch_shard_layout,
+            ..
+        } = state_split_request;
 
         RESHARDING_STATUS
             .with_label_values(&[&shard_uid.to_string()])
             .set(ReshardingStatus::BuildingState.into());
 
+        let shard_id = shard_uid.shard_id();
         let new_shards = next_epoch_shard_layout
             .get_split_shard_uids(shard_id)
             .ok_or(Error::InvalidShardId(shard_id))?;
         let mut state_roots: HashMap<_, _> =
             new_shards.iter().map(|shard_uid| (*shard_uid, Trie::EMPTY_ROOT)).collect();
 
+        // Build the required iterator from flat storage and delta changes. Note that we are
+        // working with iterators as we don't want to have all the state in memory at once.
+        //
+        // The iterator is built by chaining the following:
+        // 1. Flat storage iterator from the snapshot state as of `prev_prev_hash`.
+        // 2. Delta changes iterator from the snapshot state as of `prev_hash`.
+        //
+        // The snapshot, when created, has the flat head as of `prev_prev_hash`, i.e. the hash
+        // of the second-to-last block of the previous epoch. Hence we need to append the delta
+        // changes on top of it.
+        let (snapshot_store, flat_storage_manager) = tries.get_state_snapshot(&prev_prev_hash)?;
+        let flat_storage_chunk_view =
+            flat_storage_manager.chunk_view(shard_uid, prev_prev_hash).ok_or_else(|| {
+                StorageInconsistentState("Chunk view missing for snapshot flat storage".to_string())
+            })?;
+        let flat_storage_iter =
+            flat_storage_chunk_view.iter_flat_state_entries(None, None).map(|entry| {
+                let (key, value) = entry.unwrap();
+                (key, Some(value))
+            });
+
+        let delta = store_helper::get_delta_changes(&snapshot_store, shard_uid, prev_hash)
+            .map_err(|e| StorageInconsistentState(e.to_string()))?
+            .ok_or_else(|| {
+                StorageInconsistentState("Delta missing for snapshot flat storage".to_string())
+            })?;
+        let delta_iter = delta.0.into_iter();
+
+        let trie_storage = TrieDBStorage::new(tries.get_store(), shard_uid);
+        let flat_state_value_to_trie_value_fn = |value: FlatStateValue| -> Vec<u8> {
+            match value {
+                FlatStateValue::Ref(ref_value) => {
+                    trie_storage.retrieve_raw_bytes(&ref_value.hash).unwrap().to_vec()
+                }
+                FlatStateValue::Inlined(inline_value) => inline_value,
+            }
+        };
+        let mut iter = flat_storage_iter.chain(delta_iter).map(
+            move |(key, value)| -> (Vec<u8>, Option<Vec<u8>>) {
+                (key, value.map(flat_state_value_to_trie_value_fn))
+            },
+        );
+
         // function to map account id to shard uid in range of child shards
         let checked_account_id_to_shard_uid =
             get_checked_account_id_to_shard_uid_fn(shard_uid, new_shards, next_epoch_shard_layout);
 
-        let mut iter = get_flat_storage_iter(&store, shard_uid);
+        // Once we build the iterator, we break it into batches using the get_trie_update_batch function.
         while let Some(batch) = get_trie_update_batch(&mut iter) {
             let TrieUpdateBatch { entries, size } = batch;
             // TODO(#9435): This is highly inefficient as for each key in the batch, we are parsing the account_id
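
As a usage-level illustration of the batching loop above, the following self-contained sketch mirrors get_trie_update_batch: it drains a (key, optional value) iterator into batches, cutting a batch once the accumulated byte size crosses the memory limit, with None values (deletions coming from the delta) contributing only their key length. The names and the limit constant are simplified stand-ins, not the nearcore API.

/// Hypothetical stand-in for RESHARDING_BATCH_MEMORY_LIMIT (300 MB in the diff).
const BATCH_MEMORY_LIMIT: u64 = 300 * 1024 * 1024;

type TrieEntry = (Vec<u8>, Option<Vec<u8>>); // None marks a key deleted by the delta

fn next_batch(iter: &mut impl Iterator<Item = TrieEntry>) -> Option<Vec<TrieEntry>> {
    let mut size: u64 = 0;
    let mut entries = Vec::new();
    while let Some((key, value)) = iter.next() {
        // Deletions have no value bytes, so they only add the key length.
        size += key.len() as u64 + value.as_ref().map_or(0, |v| v.len() as u64);
        entries.push((key, value));
        if size > BATCH_MEMORY_LIMIT {
            break;
        }
    }
    // An empty batch signals that the iterator is exhausted.
    if entries.is_empty() {
        None
    } else {
        Some(entries)
    }
}

fn main() {
    // Two writes and one deletion; with a 300 MB limit they fit in one batch.
    let mut iter = vec![
        (b"alice".to_vec(), Some(vec![0u8; 16])),
        (b"bob".to_vec(), None),
        (b"carol".to_vec(), Some(vec![0u8; 16])),
    ]
    .into_iter();
    let mut batches = 0;
    while let Some(batch) = next_batch(&mut iter) {
        batches += 1;
        assert!(batch.iter().all(|(key, _)| !key.is_empty()));
    }
    assert_eq!(batches, 1);
}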
60 changes: 29 additions & 31 deletions core/store/src/trie/shard_tries.rs
@@ -248,43 +248,41 @@ impl ShardTries {
         self.get_trie_for_shard_internal(shard_uid, state_root, false, None)
     }
 
-    pub fn get_trie_with_block_hash_for_shard_from_snapshot(
+    pub fn get_state_snapshot(
         &self,
-        shard_uid: ShardUId,
-        state_root: StateRoot,
         block_hash: &CryptoHash,
-    ) -> Result<Trie, StorageError> {
-        let (store, flat_storage_manager) = {
-            // Taking this lock can last up to 10 seconds, if the snapshot happens to be re-created.
-            match self.0.state_snapshot.try_read() {
-                Ok(guard) => {
-                    if let Some(data) = guard.as_ref() {
-                        if &data.prev_block_hash != block_hash {
-                            return Err(StorageInconsistentState(format!(
-                                "Wrong state snapshot. Requested: {:?}, Available: {:?}",
-                                block_hash, data.prev_block_hash
-                            )));
-                        }
-                        (data.store.clone(), data.flat_storage_manager.clone())
-                    } else {
-                        return Err(StorageInconsistentState(
-                            "No state snapshot available".to_string(),
-                        ));
-                    }
-                }
-                Err(TryLockError::WouldBlock) => {
-                    return Err(StorageInconsistentState(
-                        "Accessing state snapshot would block. Retry in a few seconds.".to_string(),
-                    ));
-                }
-                Err(err) => {
-                    return Err(StorageInconsistentState(format!(
-                        "Can't access state snapshot: {err:?}"
-                    )));
-                }
-            }
-        };
+    ) -> Result<(Store, FlatStorageManager), StorageError> {
+        // Taking this lock can last up to 10 seconds, if the snapshot happens to be re-created.
+        match self.0.state_snapshot.try_read() {
+            Ok(guard) => {
+                if let Some(data) = guard.as_ref() {
+                    if &data.prev_block_hash != block_hash {
+                        return Err(StorageInconsistentState(format!(
+                            "Wrong state snapshot. Requested: {:?}, Available: {:?}",
+                            block_hash, data.prev_block_hash
+                        )));
+                    }
+                    Ok((data.store.clone(), data.flat_storage_manager.clone()))
+                } else {
+                    Err(StorageInconsistentState("No state snapshot available".to_string()))
+                }
+            }
+            Err(TryLockError::WouldBlock) => Err(StorageInconsistentState(
+                "Accessing state snapshot would block. Retry in a few seconds.".to_string(),
+            )),
+            Err(err) => {
+                Err(StorageInconsistentState(format!("Can't access state snapshot: {err:?}")))
+            }
+        }
+    }
+
+    pub fn get_trie_with_block_hash_for_shard_from_snapshot(
+        &self,
+        shard_uid: ShardUId,
+        state_root: StateRoot,
+        block_hash: &CryptoHash,
+    ) -> Result<Trie, StorageError> {
+        let (store, flat_storage_manager) = self.get_state_snapshot(block_hash)?;
         let cache = {
             let mut caches = self.0.view_caches.write().expect(POISONED_LOCK_ERR);
             caches
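
For readers unfamiliar with the locking pattern in the extracted get_state_snapshot, here is a self-contained model of its control flow. The struct, error type, and hash type are stand-ins; only the try_read handling mirrors the code above: a non-blocking read that distinguishes a missing snapshot, a snapshot at the wrong block, a would-block contention case, and a poisoned lock.

use std::sync::{RwLock, TryLockError};

type BlockHash = [u8; 32]; // stand-in for CryptoHash

struct SnapshotData {
    prev_block_hash: BlockHash,
    // ... store and flat storage manager handles would live here ...
}

struct Tries {
    state_snapshot: RwLock<Option<SnapshotData>>,
}

impl Tries {
    // Returns unit in place of the real (Store, FlatStorageManager) pair.
    fn get_state_snapshot(&self, block_hash: &BlockHash) -> Result<(), String> {
        // try_read rather than read: snapshot re-creation can hold the write
        // lock for seconds, and callers prefer a retryable error to blocking.
        match self.state_snapshot.try_read() {
            Ok(guard) => match guard.as_ref() {
                Some(data) if &data.prev_block_hash == block_hash => Ok(()),
                Some(data) => Err(format!(
                    "Wrong state snapshot. Requested: {:?}, Available: {:?}",
                    block_hash, data.prev_block_hash
                )),
                None => Err("No state snapshot available".to_string()),
            },
            Err(TryLockError::WouldBlock) => {
                Err("Accessing state snapshot would block. Retry in a few seconds.".to_string())
            }
            Err(err) => Err(format!("Can't access state snapshot: {err:?}")),
        }
    }
}

fn main() {
    let tries =
        Tries { state_snapshot: RwLock::new(Some(SnapshotData { prev_block_hash: [7; 32] })) };
    assert!(tries.get_state_snapshot(&[7; 32]).is_ok());
    assert!(tries.get_state_snapshot(&[8; 32]).is_err()); // wrong block hash
}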
