Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Dump and apply state changes for a range of blocks. #8628

Merged
merged 9 commits into from
Mar 9, 2023
1 change: 1 addition & 0 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub mod debug;
mod info;
mod metrics;
mod rocksdb_metrics;
pub mod state_changes;
pub mod sync;
pub mod test_utils;
#[cfg(test)]
Expand Down
39 changes: 39 additions & 0 deletions chain/client/src/state_changes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::hash::CryptoHash;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{AccountId, EpochId, ShardId};
use near_store::KeyForStateChanges;

pub fn get_state_change_shard_id(
nikurt marked this conversation as resolved.
Show resolved Hide resolved
row_key: &[u8],
trie_key: &TrieKey,
block_hash: &CryptoHash,
epoch_id: &EpochId,
epoch_manager: &dyn EpochManagerAdapter,
) -> Result<ShardId, near_chain_primitives::error::Error> {
tracing::info!(target: "get_state_change_shard_id", ?trie_key);
nikurt marked this conversation as resolved.
Show resolved Hide resolved
if let Some(account_id) = get_account_id_from_trie_key(trie_key) {
let shard_id = epoch_manager.account_id_to_shard_id(&account_id, epoch_id)?;
Ok(shard_id)
} else {
let shard_uid =
KeyForStateChanges::delayed_receipt_key_decode_shard_uid(row_key, block_hash, trie_key)
.map_err(|err| near_chain_primitives::error::Error::Other(err.to_string()))?;
Ok(shard_uid.shard_id as ShardId)
}
}

fn get_account_id_from_trie_key(trie_key: &TrieKey) -> Option<AccountId> {
nikurt marked this conversation as resolved.
Show resolved Hide resolved
match trie_key {
TrieKey::Account { account_id, .. } => Some(account_id.clone()),
TrieKey::ContractCode { account_id, .. } => Some(account_id.clone()),
TrieKey::AccessKey { account_id, .. } => Some(account_id.clone()),
TrieKey::ReceivedData { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::PostponedReceiptId { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::PendingDataCount { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::PostponedReceipt { receiver_id, .. } => Some(receiver_id.clone()),
TrieKey::DelayedReceiptIndices => None,
TrieKey::DelayedReceipt { .. } => None,
TrieKey::ContractData { account_id, .. } => Some(account_id.clone()),
}
}
22 changes: 22 additions & 0 deletions core/primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -975,3 +975,25 @@ impl TrieNodesCount {
})
}
}

/// State changes for a range of blocks.
/// Expects that a block is present at most once in the list.
///
/// Serialized with Borsh, which writes fields in declaration order, so the field layout is
/// part of the serialized format.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForBlockRange {
    /// One entry per block in the range.
    pub blocks: Vec<StateChangesForBlock>,
}

/// State changes for a single block.
/// Expects that a shard is present at most once in the list of state changes.
///
/// Serialized with Borsh, which writes fields in declaration order, so the field layout is
/// part of the serialized format.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForBlock {
    /// Hash of the block these state changes belong to.
    pub block_hash: CryptoHash,
    /// State changes of the block, grouped by shard.
    pub state_changes: Vec<StateChangesForShard>,
}

/// State changes of a single shard.
///
/// NOTE(review): each entry of `state_changes` presumably corresponds to one key/value row
/// of the `StateChanges` column - confirm against the code that fills this struct.
///
/// Serialized with Borsh, which writes fields in declaration order, so the field layout is
/// part of the serialized format.
#[derive(borsh::BorshDeserialize, borsh::BorshSerialize)]
pub struct StateChangesForShard {
    /// Shard the changes belong to.
    pub shard_id: ShardId,
    /// The changes themselves, each one together with its trie key.
    pub state_changes: Vec<RawStateChangesWithTrieKey>,
}
2 changes: 2 additions & 0 deletions core/store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ assert_matches.workspace = true
bencher.workspace = true
insta.workspace = true
rand.workspace = true
thiserror.workspace = true

[[bench]]
name = "trie_bench"
Expand All @@ -56,6 +57,7 @@ no_cache = []
single_thread_rocksdb = [] # Deactivate RocksDB IO background threads
test_features = []
protocol_feature_flat_state = []
serialize_all_state_changes = []

nightly_protocol = []
nightly = [
Expand Down
173 changes: 150 additions & 23 deletions core/store/src/trie/shard_tries.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use std::io;
use std::rc::Rc;
use std::sync::{Arc, RwLock};

Expand Down Expand Up @@ -382,18 +381,34 @@ impl WrappedTrieChanges {
"Resharding changes must never be finalized."
);

// Filtering trie keys for user facing RPC reporting.
// NOTE: If the trie key is not one of the account specific, it may cause key conflict
// when the node tracks multiple shards. See #2563.
match &change_with_trie_key.trie_key {
TrieKey::Account { .. }
| TrieKey::ContractCode { .. }
| TrieKey::AccessKey { .. }
| TrieKey::ContractData { .. } => {}
_ => continue,
let storage_key = if cfg!(feature = "serialize_all_state_changes") {
nikurt marked this conversation as resolved.
Show resolved Hide resolved
match &change_with_trie_key.trie_key {
nikurt marked this conversation as resolved.
Show resolved Hide resolved
TrieKey::DelayedReceiptIndices | TrieKey::DelayedReceipt { .. } => {
KeyForStateChanges::delayed_receipt_key_from_trie_key(
&self.block_hash,
&change_with_trie_key.trie_key,
&self.shard_uid,
)
}
_ => KeyForStateChanges::from_trie_key(
&self.block_hash,
&change_with_trie_key.trie_key,
),
}
} else {
// Filtering trie keys for user facing RPC reporting.
// NOTE: If the trie key is not one of the account specific, it may cause key conflict
// when the node tracks multiple shards. See #2563.
match &change_with_trie_key.trie_key {
TrieKey::Account { .. }
| TrieKey::ContractCode { .. }
| TrieKey::AccessKey { .. }
| TrieKey::ContractData { .. } => {}
_ => continue,
};
KeyForStateChanges::from_trie_key(&self.block_hash, &change_with_trie_key.trie_key)
};
let storage_key =
KeyForStateChanges::from_trie_key(&self.block_hash, &change_with_trie_key.trie_key);

store_update.set(
DBCol::StateChanges,
storage_key.as_ref(),
Expand All @@ -402,7 +417,7 @@ impl WrappedTrieChanges {
}
}

pub fn trie_changes_into(&mut self, store_update: &mut StoreUpdate) -> io::Result<()> {
pub fn trie_changes_into(&mut self, store_update: &mut StoreUpdate) -> std::io::Result<()> {
store_update.set_ser(
DBCol::TrieChanges,
&shard_layout::get_block_shard_uid(&self.block_hash, &self.shard_uid),
Expand All @@ -411,6 +426,12 @@ impl WrappedTrieChanges {
}
}

#[derive(thiserror::Error, Debug)]
pub enum KeyForStateChangesError {
#[error("Row key of StateChange of kind DelayedReceipt or DelayedReceiptIndices doesn't contain ShardUId: row_key: {0:?} ; trie_key: {1:?}")]
DelayedReceiptRowKeyError(Vec<u8>, TrieKey),
}

#[derive(derive_more::AsRef, derive_more::Into)]
pub struct KeyForStateChanges(Vec<u8>);

Expand Down Expand Up @@ -442,20 +463,65 @@ impl KeyForStateChanges {
key
}

/// Builds the row key of a delayed-receipt state change with the `ShardUId` encoded into it.
///
/// The existing `TrieKey` format is left unchanged: the shard information is appended after
/// the serialized trie key, `version` first and `shard_id` second, both as little-endian
/// bytes.
/// See `delayed_receipt_key_decode_shard_uid` for decoding this row key.
pub fn delayed_receipt_key_from_trie_key(
    block_hash: &CryptoHash,
    trie_key: &TrieKey,
    shard_uid: &ShardUId,
) -> Self {
    // Include the ShardUId suffix in the requested capacity so that appending it does not
    // force the buffer to grow a second time.
    // NOTE(review): assumes the second argument of `Self::new` is a capacity hint for the
    // bytes following the block hash - confirm against its definition.
    let suffix_len =
        std::mem::size_of_val(&shard_uid.version) + std::mem::size_of_val(&shard_uid.shard_id);
    let mut key = Self::new(block_hash, trie_key.len() + suffix_len);
    trie_key.append_into(&mut key.0);
    key.0.extend(shard_uid.version.to_le_bytes());
    key.0.extend(shard_uid.shard_id.to_le_bytes());
    key
}

/// Extracts ShardUId from row key which contains ShardIOd encoded.
nikurt marked this conversation as resolved.
Show resolved Hide resolved
/// See `delayed_receipt_key_from_trie_key` for encoding ShardUId into the row key.
pub fn delayed_receipt_key_decode_shard_uid(
row_key: &[u8],
block_hash: &CryptoHash,
trie_key: &TrieKey,
) -> Result<ShardUId, KeyForStateChangesError> {
let prefix = KeyForStateChanges::from_trie_key(block_hash, trie_key);
let prefix = prefix.as_ref();

let suffix = &row_key[prefix.len()..];
if suffix.is_empty() {
return Err(KeyForStateChangesError::DelayedReceiptRowKeyError(
row_key.to_vec(),
trie_key.clone(),
));
}
let version_chunk = &suffix[..std::mem::size_of::<ShardVersion>()];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this code (505-523) should be put in a method of ShardUId ('deserialize_from_key'?)

And similarly - line 483 + 484 should be moved to ShardUid method too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, as that code is already available as to_bytes() and try_from

let version = ShardVersion::from_le_bytes(version_chunk.try_into().map_err(|err| {
tracing::debug!(target: "state-changes", ?err, "Can't read `version` from row key of Delayed Receipt state change");
KeyForStateChangesError::DelayedReceiptRowKeyError(row_key.to_vec(),trie_key.clone())
})?);

let suffix = &suffix[std::mem::size_of::<ShardVersion>()..];
if suffix.is_empty() {
return Err(KeyForStateChangesError::DelayedReceiptRowKeyError(
row_key.to_vec(),
trie_key.clone(),
));
}
let shard_id_chunk = &suffix[..std::mem::size_of::<u32>()];
let shard_id = u32::from_le_bytes(shard_id_chunk.try_into().map_err(|err| {
tracing::debug!(target: "state-changes", ?err, "Can't read `shard_id` from row key of Delayed Receipt state change");
KeyForStateChangesError::DelayedReceiptRowKeyError(row_key.to_vec(),trie_key.clone())
})?);

Ok(ShardUId { shard_id, version })
}

pub fn find_iter<'a>(
&'a self,
store: &'a Store,
) -> impl Iterator<Item = Result<RawStateChangesWithTrieKey, std::io::Error>> + 'a {
let prefix_len = Self::estimate_prefix_len();
debug_assert!(self.0.len() >= prefix_len);
store.iter_prefix_ser::<RawStateChangesWithTrieKey>(DBCol::StateChanges, &self.0).map(
move |change| {
// Split off the irrelevant part of the key, so only the original trie_key is left.
let (key, state_changes) = change?;
debug_assert!(key.starts_with(&self.0));
Ok(state_changes)
},
)
// Split off the irrelevant part of the key, so only the original trie_key is left.
self.find_rows_iter(store).map(|row| row.map(|kv| kv.1))
}

pub fn find_exact_iter<'a>(
Expand All @@ -479,4 +545,65 @@ impl KeyForStateChanges {
}
})
}

pub fn find_rows_iter<'a>(
nikurt marked this conversation as resolved.
Show resolved Hide resolved
&'a self,
store: &'a Store,
) -> impl Iterator<Item = Result<(Box<[u8]>, RawStateChangesWithTrieKey), std::io::Error>> + 'a
{
let prefix_len = Self::estimate_prefix_len();
debug_assert!(self.0.len() >= prefix_len);
nikurt marked this conversation as resolved.
Show resolved Hide resolved
store.iter_prefix_ser::<RawStateChangesWithTrieKey>(DBCol::StateChanges, &self.0).map(
move |change| {
// Split off the irrelevant part of the key, so only the original trie_key is left.
let (key, state_changes) = change?;
debug_assert!(key.starts_with(&self.0));
nikurt marked this conversation as resolved.
Show resolved Hide resolved
Ok((key, state_changes))
},
)
}
}

#[cfg(test)]
mod test {
    use super::*;
    use std::str::FromStr;

    /// A `ShardUId` encoded into a delayed-receipt row key must decode back to the same
    /// value, for both kinds of delayed-receipt trie keys.
    #[test]
    fn test_delayed_receipt_row_key() {
        let shard_uid = ShardUId { version: 10, shard_id: 5 };

        // Random value.
        let block_hash =
            CryptoHash::from_str("32222222222233333333334444444444445555555777").unwrap();

        let trie_keys = [TrieKey::DelayedReceipt { index: 1 }, TrieKey::DelayedReceiptIndices {}];
        for trie_key in &trie_keys {
            let row_key = KeyForStateChanges::delayed_receipt_key_from_trie_key(
                &block_hash,
                trie_key,
                &shard_uid,
            );
            let got_shard_uid = KeyForStateChanges::delayed_receipt_key_decode_shard_uid(
                row_key.as_ref(),
                &block_hash,
                trie_key,
            )
            .unwrap();

            assert_eq!(shard_uid, got_shard_uid);
        }
    }
}
2 changes: 2 additions & 0 deletions nearcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ protocol_feature_fix_contract_loading_cost = [
"near-vm-runner/protocol_feature_fix_contract_loading_cost",
]
protocol_feature_flat_state = ["near-store/protocol_feature_flat_state", "near-chain/protocol_feature_flat_state", "node-runtime/protocol_feature_flat_state"]
serialize_all_state_changes = ["near-store/serialize_all_state_changes"]

nightly = [
"nightly_protocol",
Expand All @@ -119,6 +120,7 @@ nightly = [
"protocol_feature_fix_staking_threshold",
"protocol_feature_fix_contract_loading_cost",
"protocol_feature_flat_state",
"serialize_all_state_changes",
]
nightly_protocol = [
"near-primitives/nightly_protocol",
Expand Down
1 change: 1 addition & 0 deletions neard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ delay_detector = ["nearcore/delay_detector"]
rosetta_rpc = ["nearcore/rosetta_rpc"]
json_rpc = ["nearcore/json_rpc"]
protocol_feature_fix_staking_threshold = ["nearcore/protocol_feature_fix_staking_threshold"]
serialize_all_state_changes = ["nearcore/serialize_all_state_changes"]
protocol_feature_flat_state = ["nearcore/protocol_feature_flat_state", "near-flat-storage/protocol_feature_flat_state"]

nightly = [
Expand Down
1 change: 1 addition & 0 deletions tools/state-viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tracing.workspace = true

near-chain = { path = "../../chain/chain" }
near-chain-configs = { path = "../../core/chain-configs" }
near-client = { path = "../../chain/client" }
near-crypto = { path = "../../core/crypto" }
near-epoch-manager = { path = "../../chain/epoch-manager" }
near-network = { path = "../../chain/network" }
Expand Down
Loading