Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Dump and apply state changes for a range of blocks. #8628

Merged
merged 9 commits into from
Mar 9, 2023
Merged
83 changes: 80 additions & 3 deletions core/primitives/src/trie_key.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::hash::CryptoHash;
use crate::types::AccountId;
use std::mem::size_of;

use borsh::{BorshDeserialize, BorshSerialize};

use near_crypto::PublicKey;
use std::mem::size_of;

use crate::hash::CryptoHash;
use crate::types::AccountId;

pub(crate) const ACCOUNT_DATA_SEPARATOR: u8 = b',';

Expand Down Expand Up @@ -208,6 +211,22 @@ impl TrieKey {
self.append_into(&mut buf);
buf
}

/// Returns the account id this key is associated with, if there is one.
///
/// Account-scoped variants report their `account_id`, receipt-related variants report
/// their `receiver_id`, and the delayed-receipt variants yield `None`.
pub fn get_account_id(&self) -> Option<AccountId> {
    match self {
        // Keys stored under the owning account.
        TrieKey::Account { account_id, .. }
        | TrieKey::ContractCode { account_id, .. }
        | TrieKey::AccessKey { account_id, .. }
        | TrieKey::ContractData { account_id, .. } => Some(account_id.clone()),
        // Keys stored under the receiver of a receipt or of received data.
        TrieKey::ReceivedData { receiver_id, .. }
        | TrieKey::PostponedReceiptId { receiver_id, .. }
        | TrieKey::PendingDataCount { receiver_id, .. }
        | TrieKey::PostponedReceipt { receiver_id, .. } => Some(receiver_id.clone()),
        // Delayed receipts carry no account id in the key.
        TrieKey::DelayedReceiptIndices | TrieKey::DelayedReceipt { .. } => None,
    }
}
}

// TODO: Remove once we switch to non-raw keys everywhere.
Expand Down Expand Up @@ -624,4 +643,62 @@ mod tests {
let raw_key = key.to_vec();
assert!(trie_key_parsers::parse_account_id_from_raw_key(&raw_key).unwrap().is_none());
}

#[test]
fn test_account_id_from_trie_key() {
    let account_id = OK_ACCOUNT_IDS[0].parse::<AccountId>().unwrap();

    // Every variant that carries an account id or a receiver id must report it.
    let keys_with_account = [
        TrieKey::Account { account_id: account_id.clone() },
        TrieKey::ContractCode { account_id: account_id.clone() },
        TrieKey::AccessKey {
            account_id: account_id.clone(),
            public_key: PublicKey::empty(KeyType::ED25519),
        },
        TrieKey::ReceivedData { receiver_id: account_id.clone(), data_id: Default::default() },
        TrieKey::PostponedReceiptId {
            receiver_id: account_id.clone(),
            data_id: Default::default(),
        },
        TrieKey::PendingDataCount {
            receiver_id: account_id.clone(),
            receipt_id: Default::default(),
        },
        TrieKey::PostponedReceipt {
            receiver_id: account_id.clone(),
            receipt_id: Default::default(),
        },
        TrieKey::ContractData { account_id: account_id.clone(), key: Default::default() },
    ];
    for trie_key in &keys_with_account {
        assert_eq!(trie_key.get_account_id(), Some(account_id.clone()));
    }

    // The delayed-receipt variants are expected to report no account id.
    let keys_without_account =
        [TrieKey::DelayedReceipt { index: Default::default() }, TrieKey::DelayedReceiptIndices];
    for trie_key in &keys_without_account {
        assert_eq!(trie_key.get_account_id(), None);
    }
}
}
22 changes: 22 additions & 0 deletions core/primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,3 +975,25 @@ impl TrieNodesCount {
})
}
}

/// State changes for a range of blocks.
/// Expects that a block is present at most once in the list.
/// Serializable and deserializable with borsh.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForBlockRange {
/// One entry per block; a given block is expected to appear at most once.
pub blocks: Vec<StateChangesForBlock>,
}

/// State changes for a single block.
/// Expects that a shard is present at most once in the list of state changes.
/// Serializable and deserializable with borsh.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForBlock {
/// Hash identifying the block these state changes belong to.
pub block_hash: CryptoHash,
/// One entry per shard; a given shard is expected to appear at most once.
pub state_changes: Vec<StateChangesForShard>,
}

/// State changes of a single shard.
/// Each entry is a key and value of a StateChanges column.
/// Serializable and deserializable with borsh.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForShard {
/// Id of the shard the changes below belong to.
pub shard_id: ShardId,
/// The state changes of this shard, each paired with its trie key.
pub state_changes: Vec<RawStateChangesWithTrieKey>,
}
2 changes: 2 additions & 0 deletions core/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ assert_matches.workspace = true
bencher.workspace = true
insta.workspace = true
rand.workspace = true
thiserror.workspace = true

[[bench]]
name = "trie_bench"
Expand All @@ -56,6 +57,7 @@ no_cache = []
single_thread_rocksdb = [] # Deactivate RocksDB IO background threads
test_features = []
protocol_feature_flat_state = []
serialize_all_state_changes = []

nightly_protocol = []
nightly = [
Expand Down
193 changes: 170 additions & 23 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::io;
use std::rc::Rc;
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -383,18 +382,42 @@ impl WrappedTrieChanges {
"Resharding changes must never be finalized."
);

// Filtering trie keys for user facing RPC reporting.
// NOTE: If the trie key is not one of the account specific, it may cause key conflict
// when the node tracks multiple shards. See #2563.
match &change_with_trie_key.trie_key {
TrieKey::Account { .. }
| TrieKey::ContractCode { .. }
| TrieKey::AccessKey { .. }
| TrieKey::ContractData { .. } => {}
_ => continue,
let storage_key = if cfg!(feature = "serialize_all_state_changes") {
nikurt marked this conversation as resolved.
Show resolved Hide resolved
// Serialize all kinds of state changes without any filtering.
// Without this it's not possible to replay state changes to get an identical state root.

// This branch will become the default in the near future.

match change_with_trie_key.trie_key.get_account_id() {
// If a TrieKey itself doesn't identify the Shard, then we need to add shard id to the row key.
None => KeyForStateChanges::delayed_receipt_key_from_trie_key(
&self.block_hash,
&change_with_trie_key.trie_key,
&self.shard_uid,
),
// TrieKey has enough information to identify the shard it comes from.
_ => KeyForStateChanges::from_trie_key(
&self.block_hash,
&change_with_trie_key.trie_key,
),
}
} else {
// This branch is the current neard behavior.
// Only a subset of state changes get serialized.

// Filtering trie keys for user facing RPC reporting.
// NOTE: If the trie key is not account-specific, it may cause a key conflict
// when the node tracks multiple shards. See #2563.
match &change_with_trie_key.trie_key {
TrieKey::Account { .. }
| TrieKey::ContractCode { .. }
| TrieKey::AccessKey { .. }
| TrieKey::ContractData { .. } => {}
_ => continue,
};
KeyForStateChanges::from_trie_key(&self.block_hash, &change_with_trie_key.trie_key)
};
let storage_key =
KeyForStateChanges::from_trie_key(&self.block_hash, &change_with_trie_key.trie_key);

store_update.set(
DBCol::StateChanges,
storage_key.as_ref(),
Expand All @@ -403,7 +426,7 @@ impl WrappedTrieChanges {
}
}

pub fn trie_changes_into(&mut self, store_update: &mut StoreUpdate) -> io::Result<()> {
pub fn trie_changes_into(&mut self, store_update: &mut StoreUpdate) -> std::io::Result<()> {
store_update.set_ser(
DBCol::TrieChanges,
&shard_layout::get_block_shard_uid(&self.block_hash, &self.shard_uid),
Expand All @@ -412,6 +435,12 @@ impl WrappedTrieChanges {
}
}

/// Errors returned when decoding a `KeyForStateChanges` row key.
#[derive(thiserror::Error, Debug)]
pub enum KeyForStateChangesError {
/// A ShardUId could not be decoded from the row key.
/// Carries the offending row key and the trie key it was decoded against.
#[error("Row key of StateChange of kind DelayedReceipt or DelayedReceiptIndices doesn't contain ShardUId: row_key: {0:?} ; trie_key: {1:?}")]
DelayedReceiptRowKeyError(Vec<u8>, TrieKey),
}

/// Row key for `DBCol::StateChanges`, stored as raw bytes.
#[derive(derive_more::AsRef, derive_more::Into)]
pub struct KeyForStateChanges(Vec<u8>);

Expand Down Expand Up @@ -443,22 +472,46 @@ impl KeyForStateChanges {
key
}

/// Without changing the existing TrieKey format, encodes ShardUId into the row key.
/// The bytes of `shard_uid` are appended after the serialized `trie_key`.
/// See `delayed_receipt_key_decode_shard_uid` for decoding this row key.
pub fn delayed_receipt_key_from_trie_key(
block_hash: &CryptoHash,
trie_key: &TrieKey,
shard_uid: &ShardUId,
) -> Self {
// The second argument covers the trie key plus the ShardUId suffix
// (presumably a capacity hint; confirm against `Self::new`).
let mut key = Self::new(block_hash, trie_key.len() + std::mem::size_of::<ShardUId>());
trie_key.append_into(&mut key.0);
// The ShardUId goes last so it can be read back as the suffix of the row key.
key.0.extend(shard_uid.to_bytes());
key
}

/// Extracts ShardUId from row key which contains ShardUId encoded.
/// See `delayed_receipt_key_from_trie_key` for encoding ShardUId into the row key.
///
/// `row_key` must consist of the key built by `from_trie_key(block_hash, trie_key)`
/// followed by the bytes of a ShardUId and nothing else.
///
/// # Errors
///
/// Returns `KeyForStateChangesError::DelayedReceiptRowKeyError` when `row_key` does not
/// start with the key built from `block_hash` and `trie_key` (this includes row keys that
/// are too short), or when the remaining bytes cannot be converted into a ShardUId.
pub fn delayed_receipt_key_decode_shard_uid(
    row_key: &[u8],
    block_hash: &CryptoHash,
    trie_key: &TrieKey,
) -> Result<ShardUId, KeyForStateChangesError> {
    let decode_error = || {
        KeyForStateChangesError::DelayedReceiptRowKeyError(row_key.to_vec(), trie_key.clone())
    };

    let prefix = KeyForStateChanges::from_trie_key(block_hash, trie_key);
    // `strip_prefix` returns `None` instead of panicking when `row_key` is shorter than
    // the prefix, and also rejects row keys that belong to a different block or trie key.
    let suffix = row_key.strip_prefix(prefix.0.as_slice()).ok_or_else(decode_error)?;
    ShardUId::try_from(suffix).map_err(|_err| decode_error())
}

/// Iterates over deserialized row values where row key matches `self`.
pub fn find_iter<'a>(
&'a self,
store: &'a Store,
) -> impl Iterator<Item = Result<RawStateChangesWithTrieKey, std::io::Error>> + 'a {
let prefix_len = Self::estimate_prefix_len();
debug_assert!(self.0.len() >= prefix_len);
store.iter_prefix_ser::<RawStateChangesWithTrieKey>(DBCol::StateChanges, &self.0).map(
move |change| {
// Split off the irrelevant part of the key, so only the original trie_key is left.
let (key, state_changes) = change?;
debug_assert!(key.starts_with(&self.0));
Ok(state_changes)
},
)
// Split off the irrelevant part of the key, so only the original trie_key is left.
self.find_rows_iter(store).map(|row| row.map(|kv| kv.1))
}

/// Iterates over deserialized row values where the row key matches `self` exactly.
pub fn find_exact_iter<'a>(
&'a self,
store: &'a Store,
Expand All @@ -480,4 +533,98 @@ impl KeyForStateChanges {
}
})
}

/// Iterates over pairs of `(row key, deserialized row value)` where row key matches `self`.
pub fn find_rows_iter<'a>(
nikurt marked this conversation as resolved.
Show resolved Hide resolved
&'a self,
store: &'a Store,
) -> impl Iterator<Item = Result<(Box<[u8]>, RawStateChangesWithTrieKey), std::io::Error>> + 'a
{
let prefix_len = Self::estimate_prefix_len();
debug_assert!(
self.0.len() >= prefix_len,
"Key length: {}, prefix length: {}, key: {:?}",
self.0.len(),
prefix_len,
self.0
);
store.iter_prefix_ser::<RawStateChangesWithTrieKey>(DBCol::StateChanges, &self.0).map(
move |change| {
// Split off the irrelevant part of the key, so only the original trie_key is left.
let (key, state_changes) = change?;
debug_assert!(key.starts_with(&self.0), "Key: {:?}, row key: {:?}", self.0, key);
Ok((key, state_changes))
},
)
}
}

#[cfg(test)]
mod test {
use super::*;
use std::str::FromStr;

/// Checks that a ShardUId encoded into a delayed-receipt row key can be decoded back,
/// and that row keys with a missing or over-long suffix are rejected.
#[test]
fn test_delayed_receipt_row_key() {
let trie_key1 = TrieKey::DelayedReceipt { index: 1 };
let trie_key2 = TrieKey::DelayedReceiptIndices {};
let shard_uid = ShardUId { version: 10, shard_id: 5 };

// Random value.
let block_hash =
CryptoHash::from_str("32222222222233333333334444444444445555555777").unwrap();

// Round trip: encode the ShardUId into the row key, then decode it back.
for trie_key in [trie_key1.clone(), trie_key2.clone()] {
let row_key = KeyForStateChanges::delayed_receipt_key_from_trie_key(
&block_hash,
&trie_key,
&shard_uid,
);

let got_shard_uid = KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
row_key.as_ref(),
&block_hash,
&trie_key,
)
.unwrap();
assert_eq!(shard_uid, got_shard_uid);
}

// Forget to add a ShardUId to the key, fail to extract a ShardUId from the key.
let row_key_without_shard_uid = KeyForStateChanges::from_trie_key(&block_hash, &trie_key1);
assert!(KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
row_key_without_shard_uid.as_ref(),
&block_hash,
&trie_key1
)
.is_err());

// Add an extra byte to the key, fail to extract a ShardUId from the key.
let mut row_key_extra_bytes = KeyForStateChanges::delayed_receipt_key_from_trie_key(
&block_hash,
&trie_key1,
&shard_uid,
);
row_key_extra_bytes.0.extend([8u8]);
assert!(KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
row_key_extra_bytes.as_ref(),
&block_hash,
&trie_key1
)
.is_err());

// This is the internal detail of how delayed_receipt_key_from_trie_key() works.
// Appending the ShardUId bytes by hand to the plain row key must decode the same way.
let mut row_key_with_single_shard_uid =
KeyForStateChanges::from_trie_key(&block_hash, &trie_key1);
row_key_with_single_shard_uid.0.extend(shard_uid.to_bytes());
assert_eq!(
KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
row_key_with_single_shard_uid.as_ref(),
&block_hash,
&trie_key1
)
.unwrap(),
shard_uid
);
}
}
2 changes: 2 additions & 0 deletions nearcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ protocol_feature_fix_contract_loading_cost = [
"near-vm-runner/protocol_feature_fix_contract_loading_cost",
]
protocol_feature_flat_state = ["near-store/protocol_feature_flat_state", "near-chain/protocol_feature_flat_state", "node-runtime/protocol_feature_flat_state"]
serialize_all_state_changes = ["near-store/serialize_all_state_changes"]

nightly = [
"nightly_protocol",
Expand All @@ -119,6 +120,7 @@ nightly = [
"protocol_feature_fix_staking_threshold",
"protocol_feature_fix_contract_loading_cost",
"protocol_feature_flat_state",
"serialize_all_state_changes",
]
nightly_protocol = [
"near-primitives/nightly_protocol",
Expand Down
Loading