Skip to content

Commit

Permalink
feat: Dump and apply state changes for a range of blocks. (#8628)
Browse files Browse the repository at this point in the history
This simulates shard shadowing.
In order to fully correctly replay state changes and get matching state roots, all kinds of state changes need to be applied, including the ones which currently are filtered out. To achieve this I add a compilation feature.
Furthermore, `DelayedReceipt{,Indices}` state changes have no information about the shard they are coming from, which leads to two issues:
1) If two shards have a `DelayedReceipt` change, they will override one another, and the StateChanges column will contain only one of them.
2) We don't know to which shard to apply these state changes.

The solution is to encode `ShardUId` in the row key of the `StateChanges` column for these two kids of state changes.
The other kinds of state changes are matched to shards by their `account_id`s or `receiver_id`s. Combination of these two solutions lets us have a backwards-compatible solution.
  • Loading branch information
nikurt authored Mar 9, 2023
1 parent 4b66a6c commit cd12121
Show file tree
Hide file tree
Showing 10 changed files with 520 additions and 32 deletions.
83 changes: 80 additions & 3 deletions core/primitives/src/trie_key.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::hash::CryptoHash;
use crate::types::AccountId;
use std::mem::size_of;

use borsh::{BorshDeserialize, BorshSerialize};

use near_crypto::PublicKey;
use std::mem::size_of;

use crate::hash::CryptoHash;
use crate::types::AccountId;

pub(crate) const ACCOUNT_DATA_SEPARATOR: u8 = b',';

Expand Down Expand Up @@ -208,6 +211,22 @@ impl TrieKey {
self.append_into(&mut buf);
buf
}

/// Extracts account id from a TrieKey if available.
pub fn get_account_id(&self) -> Option<AccountId> {
match self {
TrieKey::Account { account_id, .. } => Some(account_id.clone()),
TrieKey::ContractCode { account_id, .. } => Some(account_id.clone()),
TrieKey::AccessKey { account_id, .. } => Some(account_id.clone()),
TrieKey::ReceivedData { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::PostponedReceiptId { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::PendingDataCount { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::PostponedReceipt { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::DelayedReceiptIndices => None,
TrieKey::DelayedReceipt { .. } => None,
TrieKey::ContractData { account_id, .. } => Some(account_id.clone()),
}
}
}

// TODO: Remove once we switch to non-raw keys everywhere.
Expand Down Expand Up @@ -624,4 +643,62 @@ mod tests {
let raw_key = key.to_vec();
assert!(trie_key_parsers::parse_account_id_from_raw_key(&raw_key).unwrap().is_none());
}

#[test]
fn test_account_id_from_trie_key() {
let account_id = OK_ACCOUNT_IDS[0].parse::<AccountId>().unwrap();

assert_eq!(
TrieKey::Account { account_id: account_id.clone() }.get_account_id(),
Some(account_id.clone())
);
assert_eq!(
TrieKey::ContractCode { account_id: account_id.clone() }.get_account_id(),
Some(account_id.clone())
);
assert_eq!(
TrieKey::AccessKey {
account_id: account_id.clone(),
public_key: PublicKey::empty(KeyType::ED25519)
}
.get_account_id(),
Some(account_id.clone())
);
assert_eq!(
TrieKey::ReceivedData { receiver_id: account_id.clone(), data_id: Default::default() }
.get_account_id(),
Some(account_id.clone())
);
assert_eq!(
TrieKey::PostponedReceiptId {
receiver_id: account_id.clone(),
data_id: Default::default()
}
.get_account_id(),
Some(account_id.clone())
);
assert_eq!(
TrieKey::PendingDataCount {
receiver_id: account_id.clone(),
receipt_id: Default::default()
}
.get_account_id(),
Some(account_id.clone())
);
assert_eq!(
TrieKey::PostponedReceipt {
receiver_id: account_id.clone(),
receipt_id: Default::default()
}
.get_account_id(),
Some(account_id.clone())
);
assert_eq!(TrieKey::DelayedReceipt { index: Default::default() }.get_account_id(), None);
assert_eq!(TrieKey::DelayedReceiptIndices.get_account_id(), None);
assert_eq!(
TrieKey::ContractData { account_id: account_id.clone(), key: Default::default() }
.get_account_id(),
Some(account_id.clone())
);
}
}
22 changes: 22 additions & 0 deletions core/primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,3 +975,25 @@ impl TrieNodesCount {
})
}
}

/// State changes for a range of blocks.
/// Expects that a block is present at most once in the list.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForBlockRange {
pub blocks: Vec<StateChangesForBlock>,
}

/// State changes for a single block.
/// Expects that a shard is present at most once in the list of state changes.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForBlock {
pub block_hash: CryptoHash,
pub state_changes: Vec<StateChangesForShard>,
}

/// Key and value of a StateChanges column.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForShard {
pub shard_id: ShardId,
pub state_changes: Vec<RawStateChangesWithTrieKey>,
}
2 changes: 2 additions & 0 deletions core/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ assert_matches.workspace = true
bencher.workspace = true
insta.workspace = true
rand.workspace = true
thiserror.workspace = true

[[bench]]
name = "trie_bench"
Expand All @@ -56,6 +57,7 @@ no_cache = []
single_thread_rocksdb = [] # Deactivate RocksDB IO background threads
test_features = []
protocol_feature_flat_state = []
serialize_all_state_changes = []

nightly_protocol = []
nightly = [
Expand Down
193 changes: 170 additions & 23 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::io;
use std::rc::Rc;
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -383,18 +382,42 @@ impl WrappedTrieChanges {
"Resharding changes must never be finalized."
);

// Filtering trie keys for user facing RPC reporting.
// NOTE: If the trie key is not one of the account specific, it may cause key conflict
// when the node tracks multiple shards. See #2563.
match &change_with_trie_key.trie_key {
TrieKey::Account { .. }
| TrieKey::ContractCode { .. }
| TrieKey::AccessKey { .. }
| TrieKey::ContractData { .. } => {}
_ => continue,
let storage_key = if cfg!(feature = "serialize_all_state_changes") {
// Serialize all kinds of state changes without any filtering.
// Without this it's not possible to replay state changes to get an identical state root.

// This branch will become the default in the near future.

match change_with_trie_key.trie_key.get_account_id() {
// If a TrieKey itself doesn't identify the Shard, then we need to add shard id to the row key.
None => KeyForStateChanges::delayed_receipt_key_from_trie_key(
&self.block_hash,
&change_with_trie_key.trie_key,
&self.shard_uid,
),
// TrieKey has enough information to identify the shard it comes from.
_ => KeyForStateChanges::from_trie_key(
&self.block_hash,
&change_with_trie_key.trie_key,
),
}
} else {
// This branch is the current neard behavior.
// Only a subset of state changes get serialized.

// Filtering trie keys for user facing RPC reporting.
// NOTE: If the trie key is not one of the account specific, it may cause key conflict
// when the node tracks multiple shards. See #2563.
match &change_with_trie_key.trie_key {
TrieKey::Account { .. }
| TrieKey::ContractCode { .. }
| TrieKey::AccessKey { .. }
| TrieKey::ContractData { .. } => {}
_ => continue,
};
KeyForStateChanges::from_trie_key(&self.block_hash, &change_with_trie_key.trie_key)
};
let storage_key =
KeyForStateChanges::from_trie_key(&self.block_hash, &change_with_trie_key.trie_key);

store_update.set(
DBCol::StateChanges,
storage_key.as_ref(),
Expand All @@ -403,7 +426,7 @@ impl WrappedTrieChanges {
}
}

pub fn trie_changes_into(&mut self, store_update: &mut StoreUpdate) -> io::Result<()> {
pub fn trie_changes_into(&mut self, store_update: &mut StoreUpdate) -> std::io::Result<()> {
store_update.set_ser(
DBCol::TrieChanges,
&shard_layout::get_block_shard_uid(&self.block_hash, &self.shard_uid),
Expand All @@ -412,6 +435,12 @@ impl WrappedTrieChanges {
}
}

#[derive(thiserror::Error, Debug)]
pub enum KeyForStateChangesError {
#[error("Row key of StateChange of kind DelayedReceipt or DelayedReceiptIndices doesn't contain ShardUId: row_key: {0:?} ; trie_key: {1:?}")]
DelayedReceiptRowKeyError(Vec<u8>, TrieKey),
}

#[derive(derive_more::AsRef, derive_more::Into)]
pub struct KeyForStateChanges(Vec<u8>);

Expand Down Expand Up @@ -443,22 +472,46 @@ impl KeyForStateChanges {
key
}

/// Without changing the existing TrieKey format, encodes ShardUId into the row key.
/// See `delayed_receipt_key_from_trie_key` for decoding this row key.
pub fn delayed_receipt_key_from_trie_key(
block_hash: &CryptoHash,
trie_key: &TrieKey,
shard_uid: &ShardUId,
) -> Self {
let mut key = Self::new(block_hash, trie_key.len() + std::mem::size_of::<ShardUId>());
trie_key.append_into(&mut key.0);
key.0.extend(shard_uid.to_bytes());
key
}

/// Extracts ShardUId from row key which contains ShardUId encoded.
/// See `delayed_receipt_key_from_trie_key` for encoding ShardUId into the row key.
pub fn delayed_receipt_key_decode_shard_uid(
row_key: &[u8],
block_hash: &CryptoHash,
trie_key: &TrieKey,
) -> Result<ShardUId, KeyForStateChangesError> {
let prefix = KeyForStateChanges::from_trie_key(block_hash, trie_key);
let prefix = prefix.as_ref();

let suffix = &row_key[prefix.len()..];
let shard_uid = ShardUId::try_from(suffix).map_err(|_err| {
KeyForStateChangesError::DelayedReceiptRowKeyError(row_key.to_vec(), trie_key.clone())
})?;
Ok(shard_uid)
}

/// Iterates over deserialized row values where row key matches `self`.
pub fn find_iter<'a>(
&'a self,
store: &'a Store,
) -> impl Iterator<Item = Result<RawStateChangesWithTrieKey, std::io::Error>> + 'a {
let prefix_len = Self::estimate_prefix_len();
debug_assert!(self.0.len() >= prefix_len);
store.iter_prefix_ser::<RawStateChangesWithTrieKey>(DBCol::StateChanges, &self.0).map(
move |change| {
// Split off the irrelevant part of the key, so only the original trie_key is left.
let (key, state_changes) = change?;
debug_assert!(key.starts_with(&self.0));
Ok(state_changes)
},
)
// Split off the irrelevant part of the key, so only the original trie_key is left.
self.find_rows_iter(store).map(|row| row.map(|kv| kv.1))
}

/// Iterates over deserialized row values where the row key matches `self` exactly.
pub fn find_exact_iter<'a>(
&'a self,
store: &'a Store,
Expand All @@ -480,4 +533,98 @@ impl KeyForStateChanges {
}
})
}

/// Iterates over pairs of `(row key, deserialized row value)` where row key matches `self`.
pub fn find_rows_iter<'a>(
&'a self,
store: &'a Store,
) -> impl Iterator<Item = Result<(Box<[u8]>, RawStateChangesWithTrieKey), std::io::Error>> + 'a
{
let prefix_len = Self::estimate_prefix_len();
debug_assert!(
self.0.len() >= prefix_len,
"Key length: {}, prefix length: {}, key: {:?}",
self.0.len(),
prefix_len,
self.0
);
store.iter_prefix_ser::<RawStateChangesWithTrieKey>(DBCol::StateChanges, &self.0).map(
move |change| {
// Split off the irrelevant part of the key, so only the original trie_key is left.
let (key, state_changes) = change?;
debug_assert!(key.starts_with(&self.0), "Key: {:?}, row key: {:?}", self.0, key);
Ok((key, state_changes))
},
)
}
}

#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;

#[test]
fn test_delayed_receipt_row_key() {
let trie_key1 = TrieKey::DelayedReceipt { index: 1 };
let trie_key2 = TrieKey::DelayedReceiptIndices {};
let shard_uid = ShardUId { version: 10, shard_id: 5 };

// Random value.
let block_hash =
CryptoHash::from_str("32222222222233333333334444444444445555555777").unwrap();

for trie_key in [trie_key1.clone(), trie_key2.clone()] {
let row_key = KeyForStateChanges::delayed_receipt_key_from_trie_key(
&block_hash,
&trie_key,
&shard_uid,
);

let got_shard_uid = KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
row_key.as_ref(),
&block_hash,
&trie_key,
)
.unwrap();
assert_eq!(shard_uid, got_shard_uid);
}

// Forget to add a ShardUId to the key, fail to extra a ShardUId from the key.
let row_key_without_shard_uid = KeyForStateChanges::from_trie_key(&block_hash, &trie_key1);
assert!(KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
row_key_without_shard_uid.as_ref(),
&block_hash,
&trie_key1
)
.is_err());

// Add an extra byte to the key, fail to extra a ShardUId from the key.
let mut row_key_extra_bytes = KeyForStateChanges::delayed_receipt_key_from_trie_key(
&block_hash,
&trie_key1,
&shard_uid,
);
row_key_extra_bytes.0.extend([8u8]);
assert!(KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
row_key_extra_bytes.as_ref(),
&block_hash,
&trie_key1
)
.is_err());

// This is the internal detail of how delayed_receipt_key_from_trie_key() works.
let mut row_key_with_single_shard_uid =
KeyForStateChanges::from_trie_key(&block_hash, &trie_key1);
row_key_with_single_shard_uid.0.extend(shard_uid.to_bytes());
assert_eq!(
KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
row_key_with_single_shard_uid.as_ref(),
&block_hash,
&trie_key1
)
.unwrap(),
shard_uid
);
}
}
2 changes: 2 additions & 0 deletions nearcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ protocol_feature_fix_contract_loading_cost = [
"near-vm-runner/protocol_feature_fix_contract_loading_cost",
]
protocol_feature_flat_state = ["near-store/protocol_feature_flat_state", "near-chain/protocol_feature_flat_state", "node-runtime/protocol_feature_flat_state"]
serialize_all_state_changes = ["near-store/serialize_all_state_changes"]

nightly = [
"nightly_protocol",
Expand All @@ -119,6 +120,7 @@ nightly = [
"protocol_feature_fix_staking_threshold",
"protocol_feature_fix_contract_loading_cost",
"protocol_feature_flat_state",
"serialize_all_state_changes",
]
nightly_protocol = [
"near-primitives/nightly_protocol",
Expand Down
Loading

0 comments on commit cd12121

Please sign in to comment.