-
Notifications
You must be signed in to change notification settings - Fork 618
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(resharding): state-stats improvements #10119
Merged
Merged
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,8 @@ use crate::tx_dump::dump_tx_from_block; | |
use crate::{apply_chunk, epoch_info}; | ||
use ansi_term::Color::Red; | ||
use bytesize::ByteSize; | ||
use itertools::GroupBy; | ||
use itertools::Itertools; | ||
use near_chain::chain::collect_receipts_from_response; | ||
use near_chain::migrations::check_if_block_is_first_with_chunk_of_version; | ||
use near_chain::types::ApplyTransactionResult; | ||
|
@@ -27,9 +29,12 @@ use near_primitives::sharding::ChunkHash; | |
use near_primitives::state::FlatStateValue; | ||
use near_primitives::state_record::state_record_to_account_id; | ||
use near_primitives::state_record::StateRecord; | ||
use near_primitives::trie_key::col::NON_DELAYED_RECEIPT_COLUMNS; | ||
use near_primitives::trie_key::TrieKey; | ||
use near_primitives::types::{chunk_extra::ChunkExtra, BlockHeight, ShardId, StateRoot}; | ||
use near_primitives_core::types::Gas; | ||
use near_store::flat::FlatStorageChunkView; | ||
use near_store::flat::FlatStorageManager; | ||
use near_store::test_utils::create_test_store; | ||
use near_store::TrieStorage; | ||
use near_store::{DBCol, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig, TrieDBStorage}; | ||
|
@@ -1062,52 +1067,150 @@ pub(crate) fn clear_cache(store: Store) { | |
store_update.commit().unwrap(); | ||
} | ||
|
||
#[derive(PartialEq, Eq)] | ||
pub struct StateStatsStateRecord { | ||
key: Vec<u8>, | ||
value: Vec<u8>, | ||
} | ||
/// Prints the state statistics for all shards. Please note that it relies on | ||
/// the live flat storage and may break if the node is not stopped. | ||
pub(crate) fn print_state_stats(home_dir: &Path, store: Store, near_config: NearConfig) { | ||
let (epoch_manager, runtime, _, block_header) = | ||
load_trie(store.clone(), home_dir, &near_config); | ||
|
||
impl StateStatsStateRecord { | ||
pub fn size(&self) -> ByteSize { | ||
ByteSize::b(self.key.len() as u64 + self.value.len() as u64) | ||
let block_hash = *block_header.hash(); | ||
let shard_layout = epoch_manager.get_shard_layout_from_prev_block(&block_hash).unwrap(); | ||
|
||
let flat_storage_manager = runtime.get_flat_storage_manager(); | ||
for shard_uid in shard_layout.get_shard_uids() { | ||
print_state_stats_for_shard_uid(&store, &flat_storage_manager, block_hash, shard_uid); | ||
} | ||
} | ||
|
||
impl Ord for StateStatsStateRecord { | ||
fn cmp(&self, other: &Self) -> std::cmp::Ordering { | ||
self.size().cmp(&other.size()).reverse() | ||
/// Prints the state statistics for a single shard. | ||
fn print_state_stats_for_shard_uid( | ||
store: &Store, | ||
flat_storage_manager: &FlatStorageManager, | ||
block_hash: CryptoHash, | ||
shard_uid: ShardUId, | ||
) { | ||
flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); | ||
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid); | ||
let chunk_view = flat_storage_manager.chunk_view(shard_uid, block_hash).unwrap(); | ||
|
||
let mut state_stats = StateStats::default(); | ||
|
||
// iteratate for the first time to get the size statistics | ||
let group_by = get_state_stats_group_by(&chunk_view, &trie_storage); | ||
let iter = get_state_stats_account_iter(&group_by); | ||
for state_stats_account in iter { | ||
state_stats.push(state_stats_account); | ||
} | ||
} | ||
|
||
impl PartialOrd for StateStatsStateRecord { | ||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | ||
Some(self.cmp(other)) | ||
// iterate for the second time to find the middle account | ||
let group_by = get_state_stats_group_by(&chunk_view, &trie_storage); | ||
let iter = get_state_stats_account_iter(&group_by); | ||
let mut current_size = ByteSize::default(); | ||
for state_stats_account in iter { | ||
current_size += state_stats_account.size; | ||
if 2 * current_size.as_u64() > state_stats.total_size.as_u64() { | ||
state_stats.middle_state_record = Some(state_stats_account); | ||
break; | ||
} | ||
} | ||
|
||
tracing::info!(target: "state_viewer", "{shard_uid:?}"); | ||
tracing::info!(target: "state_viewer", "{state_stats:#?}"); | ||
} | ||
|
||
impl std::fmt::Debug for StateStatsStateRecord { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
let state_record = StateRecord::from_raw_key_value(self.key.clone(), self.value.clone()); | ||
/// Gets the flat state iterator from the chunk view, rearranges it to be sorted | ||
/// by the account id, rather than type, account id and finally groups the | ||
/// records by account id while collecting aggregate statistics. | ||
fn get_state_stats_group_by<'a>( | ||
chunk_view: &'a FlatStorageChunkView, | ||
trie_storage: &'a TrieDBStorage, | ||
) -> GroupBy< | ||
AccountId, | ||
impl Iterator<Item = StateStatsStateRecord> + 'a, | ||
impl FnMut(&StateStatsStateRecord) -> AccountId, | ||
> { | ||
// The flat state iterator is sorted by type, account id. In order to | ||
// rearrange it we get the iterators for each type and later merge them by | ||
// the account id. | ||
let type_iters = NON_DELAYED_RECEIPT_COLUMNS | ||
.iter() | ||
.map(|(type_byte, _)| { | ||
chunk_view.iter_flat_state_entries(Some(&[*type_byte]), Some(&[*type_byte + 1])) | ||
}) | ||
.into_iter(); | ||
|
||
// Filter out any errors. | ||
let type_iters = type_iters.map(|type_iter| type_iter.filter_map(|item| item.ok())).into_iter(); | ||
|
||
// Read the values from and convert items to StateStatsStateRecord. | ||
let type_iters = type_iters | ||
.map(move |type_iter| { | ||
type_iter.filter_map(move |(key, value)| { | ||
let value = read_flat_state_value(&trie_storage, value); | ||
let key_size = key.len() as u64; | ||
let value_size = value.len() as u64; | ||
let size = ByteSize::b(key_size + value_size); | ||
let state_record = StateRecord::from_raw_key_value(key, value); | ||
state_record.map(|state_record| StateStatsStateRecord { | ||
account_id: state_record_to_account_id(&state_record).clone(), | ||
state_record, | ||
size, | ||
}) | ||
}) | ||
}) | ||
.into_iter(); | ||
|
||
let Some(state_record) = state_record else { return None::<StateRecord>.fmt(f) }; | ||
// Merge the iterators for different types. The StateStatsStateRecord | ||
// implements the Ord and PartialOrd traits that compare items by their | ||
// account ids. | ||
let iter = type_iters.kmerge().into_iter(); | ||
|
||
f.debug_struct("StateStatsStateRecord") | ||
.field("account_id", &state_record_to_account_id(&state_record).as_str()) | ||
.field("type", &state_record.get_type_string()) | ||
.field("size", &self.size()) | ||
.finish() | ||
// Finally, group by the account id. | ||
iter.group_by(|state_record| state_record.account_id.clone()) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would group_by |
||
} | ||
|
||
/// Given the StateStatsStateRecords grouped by the account id returns an | ||
/// iterator of StateStatsAccount. | ||
fn get_state_stats_account_iter<'a>( | ||
group_by: &'a GroupBy< | ||
AccountId, | ||
impl Iterator<Item = StateStatsStateRecord> + 'a, | ||
impl FnMut(&StateStatsStateRecord) -> AccountId, | ||
>, | ||
) -> impl Iterator<Item = StateStatsAccount> + 'a { | ||
// aggregate size for each account id group | ||
group_by | ||
.into_iter() | ||
.map(|(account_id, group)| { | ||
let mut size = ByteSize::b(0); | ||
for state_stats_state_record in group { | ||
size += state_stats_state_record.size; | ||
} | ||
StateStatsAccount { account_id, size } | ||
}) | ||
.into_iter() | ||
} | ||
|
||
/// Helper function to read the value from flat storage. | ||
/// It either returns the inlined value or reads ref value from the storage. | ||
fn read_flat_state_value( | ||
trie_storage: &TrieDBStorage, | ||
flat_state_value: FlatStateValue, | ||
) -> Vec<u8> { | ||
match flat_state_value { | ||
FlatStateValue::Ref(val) => trie_storage.retrieve_raw_bytes(&val.hash).unwrap().to_vec(), | ||
FlatStateValue::Inlined(val) => val, | ||
} | ||
} | ||
|
||
/// StateStats is used for storing the state statistics of a single trie. | ||
#[derive(Default)] | ||
pub struct StateStats { | ||
pub total_size: ByteSize, | ||
pub total_count: usize, | ||
|
||
pub top_state_records: BinaryHeap<StateStatsStateRecord>, | ||
|
||
pub middle_state_record: Option<StateStatsStateRecord>, | ||
pub middle_state_record: Option<StateStatsAccount>, | ||
pub top_accounts: BinaryHeap<StateStatsAccount>, | ||
} | ||
|
||
impl core::fmt::Debug for StateStats { | ||
|
@@ -1123,84 +1226,70 @@ impl core::fmt::Debug for StateStats { | |
.field("total_count", &self.total_count) | ||
.field("average_size", &average_size) | ||
.field("middle_state_record", &self.middle_state_record.as_ref().unwrap()) | ||
.field("top_state_records", &self.top_state_records) | ||
.field("top_accounts", &self.top_accounts) | ||
.finish() | ||
} | ||
} | ||
|
||
impl StateStats { | ||
pub fn new() -> Self { | ||
Self::default() | ||
} | ||
|
||
pub fn push(&mut self, key: Vec<u8>, value: Vec<u8>) -> () { | ||
self.total_size += key.len() as u64 + value.len() as u64; | ||
pub fn push(&mut self, state_stats_account: StateStatsAccount) { | ||
self.total_size += state_stats_account.size; | ||
self.total_count += 1; | ||
|
||
self.top_state_records.push(StateStatsStateRecord { key, value }); | ||
if self.top_state_records.len() > 5 { | ||
self.top_state_records.pop(); | ||
self.top_accounts.push(state_stats_account); | ||
if self.top_accounts.len() > 5 { | ||
self.top_accounts.pop(); | ||
} | ||
} | ||
} | ||
|
||
fn read_flat_state_value( | ||
trie_storage: &TrieDBStorage, | ||
flat_state_value: FlatStateValue, | ||
) -> Vec<u8> { | ||
match flat_state_value { | ||
FlatStateValue::Ref(val) => trie_storage.retrieve_raw_bytes(&val.hash).unwrap().to_vec(), | ||
FlatStateValue::Inlined(val) => val, | ||
} | ||
/// StateStatsStateRecord stores the state record and associated information. | ||
/// It's used as a helper struct for merging state records from different record types. | ||
#[derive(Eq, PartialEq)] | ||
struct StateStatsStateRecord { | ||
state_record: StateRecord, | ||
account_id: AccountId, | ||
size: ByteSize, | ||
} | ||
|
||
pub(crate) fn print_state_stats(home_dir: &Path, store: Store, near_config: NearConfig) { | ||
let (epoch_manager, runtime, _, block_header) = | ||
load_trie(store.clone(), home_dir, &near_config); | ||
|
||
let block_hash = *block_header.hash(); | ||
let shard_layout = epoch_manager.get_shard_layout_from_prev_block(&block_hash).unwrap(); | ||
|
||
for shard_uid in shard_layout.get_shard_uids() { | ||
let flat_storage_manager = runtime.get_flat_storage_manager(); | ||
flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap(); | ||
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid); | ||
let chunk_view = flat_storage_manager.chunk_view(shard_uid, block_hash).unwrap(); | ||
|
||
let mut state_stats = StateStats::new(); | ||
|
||
let iter = chunk_view.iter_flat_state_entries(None, None); | ||
for item in iter { | ||
let Ok((key, value)) = item else { | ||
continue; | ||
}; | ||
|
||
let value = read_flat_state_value(&trie_storage, value); | ||
|
||
state_stats.push(key, value); | ||
} | ||
impl Ord for StateStatsStateRecord { | ||
fn cmp(&self, other: &Self) -> std::cmp::Ordering { | ||
self.account_id.cmp(&other.account_id) | ||
} | ||
} | ||
|
||
let middle_size = state_stats.total_size.as_u64() / 2; | ||
impl PartialOrd for StateStatsStateRecord { | ||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | ||
Some(self.cmp(other)) | ||
} | ||
} | ||
|
||
let mut current_size = 0; | ||
let iter = chunk_view.iter_flat_state_entries(None, None); | ||
for item in iter { | ||
let Ok((key, value)) = item else { | ||
tracing::warn!(target: "state_viewer", ?item, "error in iter item"); | ||
continue; | ||
}; | ||
/// StateStatsAccount stores aggregated information about an account. | ||
/// It is the result of the grouping of state records belonging to the same account. | ||
#[derive(PartialEq, Eq)] | ||
pub struct StateStatsAccount { | ||
pub account_id: AccountId, | ||
pub size: ByteSize, | ||
} | ||
|
||
let value = read_flat_state_value(&trie_storage, value); | ||
impl Ord for StateStatsAccount { | ||
fn cmp(&self, other: &Self) -> std::cmp::Ordering { | ||
self.size.cmp(&other.size).reverse() | ||
} | ||
} | ||
|
||
current_size += key.len() + value.len(); | ||
if middle_size <= current_size as u64 { | ||
state_stats.middle_state_record = Some(StateStatsStateRecord { key, value }); | ||
break; | ||
} | ||
} | ||
impl PartialOrd for StateStatsAccount { | ||
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { | ||
Some(self.cmp(other)) | ||
} | ||
} | ||
|
||
tracing::info!(target: "state_viewer", "{shard_uid:?}"); | ||
tracing::info!(target: "state_viewer", "{state_stats:#?}"); | ||
impl std::fmt::Debug for StateStatsAccount { | ||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
f.debug_struct("StateStatsStateRecord") | ||
.field("account_id", &self.account_id.as_str()) | ||
.field("size", &self.size) | ||
.finish() | ||
} | ||
} | ||
|
||
|
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aah this is very clever! Took me some time to understand what's going on, had to read up on the kmerge and group_by functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It took me real long to get the types, lifespans and whatnots in order but yeah it's pretty now :)