Skip to content

Commit

Permalink
[resharding] Include state_changes in ApplySplitStateResult for split…
Browse files Browse the repository at this point in the history
…_state (#9419)

For more context on the change, please look at #9420 and #9418
  • Loading branch information
Shreyan Gupta authored Aug 14, 2023
1 parent 22f49f0 commit 1a928e7
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 43 deletions.
13 changes: 7 additions & 6 deletions chain/chain/src/resharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ mod tests {
state_root = new_state_root;

// update split states
let trie_changes = tries
let mut trie_updates = tries
.apply_state_changes_to_split_states(
&split_state_roots,
StateChangesForSplitStates::from_raw_state_changes(
Expand All @@ -372,13 +372,14 @@ mod tests {
account_id_to_shard_id,
)
.unwrap();
split_state_roots = trie_changes
.iter()
.map(|(shard_uid, trie_changes)| {
split_state_roots = trie_updates
.drain()
.map(|(shard_uid, trie_update)| {
let mut state_update = tries.store_update();
let state_root = tries.apply_all(trie_changes, *shard_uid, &mut state_update);
let (_, trie_changes, _) = trie_update.finalize().unwrap();
let state_root = tries.apply_all(&trie_changes, shard_uid, &mut state_update);
state_update.commit().unwrap();
(*shard_uid, state_root)
(shard_uid, state_root)
})
.collect();

Expand Down
2 changes: 1 addition & 1 deletion core/primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl StateChangesKinds {
}

/// A structure used to index state changes due to transaction/receipt processing and other things.
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize)]
#[derive(Debug, Clone, BorshSerialize, BorshDeserialize, PartialEq)]
pub enum StateChangeCause {
/// A type of update that does not get finalized. Used for verification and execution of
/// immutable smart contract methods. Attempt fo finalize a `TrieUpdate` containing such
Expand Down
17 changes: 9 additions & 8 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ impl WrappedTrieChanges {
///
/// NOTE: the changes are drained from `self`.
pub fn state_changes_into(&mut self, store_update: &mut StoreUpdate) {
for change_with_trie_key in self.state_changes.drain(..) {
for mut change_with_trie_key in self.state_changes.drain(..) {
assert!(
!change_with_trie_key.changes.iter().any(|RawStateChange { cause, .. }| matches!(
cause,
Expand All @@ -777,13 +777,14 @@ impl WrappedTrieChanges {
"NotWritableToDisk changes must never be finalized."
);

assert!(
!change_with_trie_key.changes.iter().any(|RawStateChange { cause, .. }| matches!(
cause,
StateChangeCause::Resharding
)),
"Resharding changes must never be finalized."
);
// Resharding changes must not be finalized, however they may be introduced here when we are
// evaluating changes for split state in process_split_state function
change_with_trie_key
.changes
.retain(|change| change.cause != StateChangeCause::Resharding);
if change_with_trie_key.changes.is_empty() {
continue;
}

let storage_key = if cfg!(feature = "serialize_all_state_changes") {
// Serialize all kinds of state changes without any filtering.
Expand Down
24 changes: 12 additions & 12 deletions core/store/src/trie/split_state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::flat::FlatStateChanges;
use crate::trie::iterator::TrieItem;
use crate::{
get, get_delayed_receipt_indices, set, ShardTries, StoreUpdate, Trie, TrieChanges, TrieUpdate,
};
use crate::{get, get_delayed_receipt_indices, set, ShardTries, StoreUpdate, Trie, TrieUpdate};
use borsh::BorshDeserialize;
use bytesize::ByteSize;
use near_primitives::account::id::AccountId;
Expand Down Expand Up @@ -34,7 +32,7 @@ impl Trie {

impl ShardTries {
/// applies `changes` to split states
/// and returns the generated TrieChanges for all split states
/// and returns the generated TrieUpdate for all split states
/// Note that this function is different from the function `add_values_to_split_states`
/// This function is used for applying updates to split states when processing blocks
/// `add_values_to_split_states` are used to generate the initial states for shards split
Expand All @@ -44,7 +42,7 @@ impl ShardTries {
state_roots: &HashMap<ShardUId, StateRoot>,
changes: StateChangesForSplitStates,
account_id_to_shard_id: &dyn Fn(&AccountId) -> ShardUId,
) -> Result<HashMap<ShardUId, TrieChanges>, StorageError> {
) -> Result<HashMap<ShardUId, TrieUpdate>, StorageError> {
let mut trie_updates: HashMap<_, _> = self.get_trie_updates(state_roots);
let mut insert_receipts = Vec::new();
for ConsolidatedStateChange { trie_key, value } in changes.changes {
Expand Down Expand Up @@ -72,7 +70,8 @@ impl ShardTries {
| TrieKey::PostponedReceipt { receiver_id: account_id, .. }
| TrieKey::ContractData { account_id, .. } => {
let new_shard_uid = account_id_to_shard_id(account_id);
// we can safely unwrap here because the caller of this function guarantees trie_updates contains all shard_uids for the new shards
// we can safely unwrap here because the caller of this function guarantees trie_updates
// contains all shard_uids for the new shards
let trie_update = trie_updates.get_mut(&new_shard_uid).unwrap();
match value {
Some(value) => trie_update.set(trie_key, value),
Expand All @@ -82,6 +81,9 @@ impl ShardTries {
}
}
for (_, update) in trie_updates.iter_mut() {
// StateChangeCause should always be Resharding for processing split state.
// We do not want to commit the state_changes from resharding as they are already handled while
// processing parent shard
update.commit(StateChangeCause::Resharding);
}

Expand All @@ -97,12 +99,7 @@ impl ShardTries {
account_id_to_shard_id,
)?;

let mut trie_changes_map = HashMap::new();
for (shard_uid, update) in trie_updates {
let (_, trie_changes, _) = update.finalize()?;
trie_changes_map.insert(shard_uid, trie_changes);
}
Ok(trie_changes_map)
Ok(trie_updates)
}

/// add `values` (key-value pairs of items stored in states) to build states for new shards
Expand Down Expand Up @@ -275,6 +272,9 @@ fn apply_delayed_receipts_to_split_states_impl(
TrieKey::DelayedReceiptIndices,
delayed_receipts_indices_by_shard.get(shard_uid).unwrap(),
);
// StateChangeCause should always be Resharding for processing split state.
// We do not want to commit the state_changes from resharding as they are already handled while
// processing parent shard
trie_update.commit(StateChangeCause::Resharding);
}
Ok(())
Expand Down
44 changes: 28 additions & 16 deletions nearcore/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1076,28 +1076,40 @@ impl RuntimeAdapter for NightshadeRuntime {
block_hash: &CryptoHash,
state_roots: HashMap<ShardUId, StateRoot>,
next_epoch_shard_layout: &ShardLayout,
state_changes: StateChangesForSplitStates,
state_changes_for_split_states: StateChangesForSplitStates,
) -> Result<Vec<ApplySplitStateResult>, Error> {
let trie_changes = self.tries.apply_state_changes_to_split_states(
let trie_updates = self.tries.apply_state_changes_to_split_states(
&state_roots,
state_changes,
state_changes_for_split_states,
&|account_id| account_id_to_shard_uid(account_id, next_epoch_shard_layout),
)?;

Ok(trie_changes
.into_iter()
.map(|(shard_uid, trie_changes)| ApplySplitStateResult {
let mut applied_split_state_results: Vec<_> = vec![];
for (shard_uid, trie_update) in trie_updates {
let (_, trie_changes, state_changes) = trie_update.finalize()?;
// All state changes that are related to split state should have StateChangeCause as Resharding
// We do not want to commit the state_changes from resharding as they are already handled while
// processing parent shard
debug_assert!(state_changes.iter().all(|raw_state_changes| raw_state_changes
.changes
.iter()
.all(|state_change| state_change.cause == StateChangeCause::Resharding)));
let new_root = trie_changes.new_root;
let wrapped_trie_changes = WrappedTrieChanges::new(
self.get_tries(),
shard_uid,
trie_changes,
state_changes,
*block_hash,
);
applied_split_state_results.push(ApplySplitStateResult {
shard_uid,
new_root: trie_changes.new_root,
trie_changes: WrappedTrieChanges::new(
self.get_tries(),
shard_uid,
trie_changes,
vec![],
*block_hash,
),
})
.collect())
new_root,
trie_changes: wrapped_trie_changes,
});
}

Ok(applied_split_state_results)
}

fn apply_state_part(
Expand Down

0 comments on commit 1a928e7

Please sign in to comment.