Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(resharding): state-stats improvements #10119

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions chain/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub const EPOCH_START_INFO_BLOCKS: u64 = 500;

/// Defines whether in case of adversarial block production invalid blocks can
/// be produced.
#[cfg(feature = "test_features")]
#[derive(PartialEq, Eq)]
pub enum AdvProduceBlocksMode {
All,
Expand Down
4 changes: 3 additions & 1 deletion chain/client/src/client_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use crate::adapter::{
BlockApproval, BlockHeadersResponse, BlockResponse, ProcessTxRequest, ProcessTxResponse,
RecvChallenge, SetNetworkInfo, StateResponse,
};
use crate::client::{AdvProduceBlocksMode, Client, EPOCH_START_INFO_BLOCKS};
#[cfg(feature = "test_features")]
use crate::client::AdvProduceBlocksMode;
use crate::client::{Client, EPOCH_START_INFO_BLOCKS};
use crate::config_updater::ConfigUpdater;
use crate::debug::new_network_info_view;
use crate::info::{display_sync_status, InfoHelper};
Expand Down
2 changes: 1 addition & 1 deletion core/primitives/src/state_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use std::fmt::{Display, Formatter};

/// Record in the state storage.
#[serde_as]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)]
pub enum StateRecord {
/// Account information.
Account { account_id: AccountId, account: Account },
Expand Down
263 changes: 176 additions & 87 deletions tools/state-viewer/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use crate::tx_dump::dump_tx_from_block;
use crate::{apply_chunk, epoch_info};
use ansi_term::Color::Red;
use bytesize::ByteSize;
use itertools::GroupBy;
use itertools::Itertools;
use near_chain::chain::collect_receipts_from_response;
use near_chain::migrations::check_if_block_is_first_with_chunk_of_version;
use near_chain::types::ApplyTransactionResult;
Expand All @@ -27,9 +29,12 @@ use near_primitives::sharding::ChunkHash;
use near_primitives::state::FlatStateValue;
use near_primitives::state_record::state_record_to_account_id;
use near_primitives::state_record::StateRecord;
use near_primitives::trie_key::col::NON_DELAYED_RECEIPT_COLUMNS;
use near_primitives::trie_key::TrieKey;
use near_primitives::types::{chunk_extra::ChunkExtra, BlockHeight, ShardId, StateRoot};
use near_primitives_core::types::Gas;
use near_store::flat::FlatStorageChunkView;
use near_store::flat::FlatStorageManager;
use near_store::test_utils::create_test_store;
use near_store::TrieStorage;
use near_store::{DBCol, Store, Trie, TrieCache, TrieCachingStorage, TrieConfig, TrieDBStorage};
Expand Down Expand Up @@ -1062,52 +1067,150 @@ pub(crate) fn clear_cache(store: Store) {
store_update.commit().unwrap();
}

#[derive(PartialEq, Eq)]
pub struct StateStatsStateRecord {
key: Vec<u8>,
value: Vec<u8>,
}
/// Prints the state statistics for all shards. Please note that it relies on
/// the live flat storage and may break if the node is not stopped.
pub(crate) fn print_state_stats(home_dir: &Path, store: Store, near_config: NearConfig) {
let (epoch_manager, runtime, _, block_header) =
load_trie(store.clone(), home_dir, &near_config);

impl StateStatsStateRecord {
pub fn size(&self) -> ByteSize {
ByteSize::b(self.key.len() as u64 + self.value.len() as u64)
let block_hash = *block_header.hash();
let shard_layout = epoch_manager.get_shard_layout_from_prev_block(&block_hash).unwrap();

let flat_storage_manager = runtime.get_flat_storage_manager();
for shard_uid in shard_layout.get_shard_uids() {
print_state_stats_for_shard_uid(&store, &flat_storage_manager, block_hash, shard_uid);
}
}

impl Ord for StateStatsStateRecord {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.size().cmp(&other.size()).reverse()
/// Prints the state statistics for a single shard.
fn print_state_stats_for_shard_uid(
store: &Store,
flat_storage_manager: &FlatStorageManager,
block_hash: CryptoHash,
shard_uid: ShardUId,
) {
flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap();
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let chunk_view = flat_storage_manager.chunk_view(shard_uid, block_hash).unwrap();

let mut state_stats = StateStats::default();

// iterate for the first time to get the size statistics
let group_by = get_state_stats_group_by(&chunk_view, &trie_storage);
let iter = get_state_stats_account_iter(&group_by);
for state_stats_account in iter {
state_stats.push(state_stats_account);
}
}

impl PartialOrd for StateStatsStateRecord {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
// iterate for the second time to find the middle account
let group_by = get_state_stats_group_by(&chunk_view, &trie_storage);
let iter = get_state_stats_account_iter(&group_by);
let mut current_size = ByteSize::default();
for state_stats_account in iter {
current_size += state_stats_account.size;
if 2 * current_size.as_u64() > state_stats.total_size.as_u64() {
state_stats.middle_state_record = Some(state_stats_account);
break;
}
}

tracing::info!(target: "state_viewer", "{shard_uid:?}");
tracing::info!(target: "state_viewer", "{state_stats:#?}");
}

impl std::fmt::Debug for StateStatsStateRecord {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let state_record = StateRecord::from_raw_key_value(self.key.clone(), self.value.clone());
/// Gets the flat state iterator from the chunk view, rearranges it to be sorted
/// by the account id (rather than by type and then account id), and finally
/// groups the records by account id while collecting aggregate statistics.
fn get_state_stats_group_by<'a>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aah this is very clever! Took me some time to understand what's going on, had to read up on the kmerge and group_by functions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It took me real long to get the types, lifespans and whatnots in order but yeah it's pretty now :)

chunk_view: &'a FlatStorageChunkView,
trie_storage: &'a TrieDBStorage,
) -> GroupBy<
AccountId,
impl Iterator<Item = StateStatsStateRecord> + 'a,
impl FnMut(&StateStatsStateRecord) -> AccountId,
> {
// The flat state iterator is sorted by type, account id. In order to
// rearrange it we get the iterators for each type and later merge them by
// the account id.
let type_iters = NON_DELAYED_RECEIPT_COLUMNS
.iter()
.map(|(type_byte, _)| {
chunk_view.iter_flat_state_entries(Some(&[*type_byte]), Some(&[*type_byte + 1]))
})
.into_iter();

// Filter out any errors.
let type_iters = type_iters.map(|type_iter| type_iter.filter_map(|item| item.ok())).into_iter();

// Read the values from the storage and convert the items to StateStatsStateRecord.
let type_iters = type_iters
.map(move |type_iter| {
type_iter.filter_map(move |(key, value)| {
let value = read_flat_state_value(&trie_storage, value);
let key_size = key.len() as u64;
let value_size = value.len() as u64;
let size = ByteSize::b(key_size + value_size);
let state_record = StateRecord::from_raw_key_value(key, value);
state_record.map(|state_record| StateStatsStateRecord {
account_id: state_record_to_account_id(&state_record).clone(),
state_record,
size,
})
})
})
.into_iter();

let Some(state_record) = state_record else { return None::<StateRecord>.fmt(f) };
// Merge the iterators for different types. The StateStatsStateRecord
// implements the Ord and PartialOrd traits that compare items by their
// account ids.
let iter = type_iters.kmerge().into_iter();

f.debug_struct("StateStatsStateRecord")
.field("account_id", &state_record_to_account_id(&state_record).as_str())
.field("type", &state_record.get_type_string())
.field("size", &self.size())
.finish()
// Finally, group by the account id.
iter.group_by(|state_record| state_record.account_id.clone())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would group_by &state_record.account_id work here?

}

/// Turns the per-account groups of `StateStatsStateRecord`s into an iterator
/// of `StateStatsAccount`, yielding one item per group.
///
/// For every `(account_id, group)` pair produced by `group_by`, the sizes of
/// all the records in the group are added up and reported as the size of that
/// account. The returned iterator is lazy and borrows `group_by`.
fn get_state_stats_account_iter<'a>(
    group_by: &'a GroupBy<
        AccountId,
        impl Iterator<Item = StateStatsStateRecord> + 'a,
        impl FnMut(&StateStatsStateRecord) -> AccountId,
    >,
) -> impl Iterator<Item = StateStatsAccount> + 'a {
    // `into_iter` is called on the `&GroupBy` reference to obtain the groups.
    // The resulting `map` adapter is already an iterator, so it is returned
    // directly without any further conversion.
    group_by.into_iter().map(|(account_id, group)| {
        // Aggregate the size of every record that belongs to this account.
        let size = group.fold(ByteSize::b(0), |mut total, state_stats_state_record| {
            total += state_stats_state_record.size;
            total
        });
        StateStatsAccount { account_id, size }
    })
}

/// Resolves a flat storage value into its raw bytes.
///
/// An inlined value is returned as is, while a reference is resolved by
/// reading the raw bytes stored under its hash from `trie_storage`.
///
/// # Panics
///
/// Panics if the referenced value cannot be retrieved from the storage.
fn read_flat_state_value(
    trie_storage: &TrieDBStorage,
    flat_state_value: FlatStateValue,
) -> Vec<u8> {
    match flat_state_value {
        FlatStateValue::Inlined(bytes) => bytes,
        FlatStateValue::Ref(value_ref) => {
            let raw_bytes = trie_storage.retrieve_raw_bytes(&value_ref.hash).unwrap();
            raw_bytes.to_vec()
        }
    }
}

/// StateStats is used for storing the state statistics of a single trie.
#[derive(Default)]
pub struct StateStats {
pub total_size: ByteSize,
pub total_count: usize,

pub top_state_records: BinaryHeap<StateStatsStateRecord>,

pub middle_state_record: Option<StateStatsStateRecord>,
pub middle_state_record: Option<StateStatsAccount>,
pub top_accounts: BinaryHeap<StateStatsAccount>,
}

impl core::fmt::Debug for StateStats {
Expand All @@ -1123,84 +1226,70 @@ impl core::fmt::Debug for StateStats {
.field("total_count", &self.total_count)
.field("average_size", &average_size)
.field("middle_state_record", &self.middle_state_record.as_ref().unwrap())
.field("top_state_records", &self.top_state_records)
.field("top_accounts", &self.top_accounts)
.finish()
}
}

impl StateStats {
pub fn new() -> Self {
Self::default()
}

pub fn push(&mut self, key: Vec<u8>, value: Vec<u8>) -> () {
self.total_size += key.len() as u64 + value.len() as u64;
pub fn push(&mut self, state_stats_account: StateStatsAccount) {
self.total_size += state_stats_account.size;
self.total_count += 1;

self.top_state_records.push(StateStatsStateRecord { key, value });
if self.top_state_records.len() > 5 {
self.top_state_records.pop();
self.top_accounts.push(state_stats_account);
if self.top_accounts.len() > 5 {
self.top_accounts.pop();
}
}
}

fn read_flat_state_value(
trie_storage: &TrieDBStorage,
flat_state_value: FlatStateValue,
) -> Vec<u8> {
match flat_state_value {
FlatStateValue::Ref(val) => trie_storage.retrieve_raw_bytes(&val.hash).unwrap().to_vec(),
FlatStateValue::Inlined(val) => val,
}
/// StateStatsStateRecord stores the state record and associated information.
/// It's used as a helper struct for merging state records from different record types.
///
/// NOTE(review): the derived `Eq` / `PartialEq` compare all fields, whereas the
/// hand-written `Ord` / `PartialOrd` impls in this file compare `account_id`
/// only, so two records may compare as `Ordering::Equal` without being `==`.
/// Confirm that no caller relies on the two notions agreeing.
#[derive(Eq, PartialEq)]
struct StateStatsStateRecord {
// The parsed state record.
state_record: StateRecord,
// The id of the account that the state record belongs to.
account_id: AccountId,
// The size of the record; in this file it is computed as the key length
// plus the value length, in bytes.
size: ByteSize,
}

pub(crate) fn print_state_stats(home_dir: &Path, store: Store, near_config: NearConfig) {
let (epoch_manager, runtime, _, block_header) =
load_trie(store.clone(), home_dir, &near_config);

let block_hash = *block_header.hash();
let shard_layout = epoch_manager.get_shard_layout_from_prev_block(&block_hash).unwrap();

for shard_uid in shard_layout.get_shard_uids() {
let flat_storage_manager = runtime.get_flat_storage_manager();
flat_storage_manager.create_flat_storage_for_shard(shard_uid).unwrap();
let trie_storage = TrieDBStorage::new(store.clone(), shard_uid);
let chunk_view = flat_storage_manager.chunk_view(shard_uid, block_hash).unwrap();

let mut state_stats = StateStats::new();

let iter = chunk_view.iter_flat_state_entries(None, None);
for item in iter {
let Ok((key, value)) = item else {
continue;
};

let value = read_flat_state_value(&trie_storage, value);

state_stats.push(key, value);
}
/// Orders the records by their account id only; the remaining fields do not
/// take part in the comparison.
impl Ord for StateStatsStateRecord {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (lhs, rhs) = (&self.account_id, &other.account_id);
        lhs.cmp(rhs)
    }
}

let middle_size = state_stats.total_size.as_u64() / 2;
/// The partial order is total and always agrees with the `Ord` implementation.
impl PartialOrd for StateStatsStateRecord {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}

let mut current_size = 0;
let iter = chunk_view.iter_flat_state_entries(None, None);
for item in iter {
let Ok((key, value)) = item else {
tracing::warn!(target: "state_viewer", ?item, "error in iter item");
continue;
};
/// StateStatsAccount stores aggregated information about an account.
/// It is the result of the grouping of state records belonging to the same account.
#[derive(PartialEq, Eq)]
pub struct StateStatsAccount {
/// The id of the account that the statistics were aggregated for.
pub account_id: AccountId,
/// The sum of the sizes of all the state records grouped under this account.
pub size: ByteSize,
}

let value = read_flat_state_value(&trie_storage, value);
/// Orders the accounts by size in descending order, i.e. the account with the
/// larger size compares as `Less`.
///
/// With this ordering the greatest element of a `BinaryHeap<StateStatsAccount>`
/// is the account with the smallest size, so popping from the heap discards the
/// smallest account first.
impl Ord for StateStatsAccount {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Swapping the operands is the same as reversing the natural ordering.
        other.size.cmp(&self.size)
    }
}

current_size += key.len() + value.len();
if middle_size <= current_size as u64 {
state_stats.middle_state_record = Some(StateStatsStateRecord { key, value });
break;
}
}
/// The partial order is total and always agrees with the `Ord` implementation.
impl PartialOrd for StateStatsAccount {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let ordering = Ord::cmp(self, other);
        Some(ordering)
    }
}

tracing::info!(target: "state_viewer", "{shard_uid:?}");
tracing::info!(target: "state_viewer", "{state_stats:#?}");
/// Formats the account statistics as a debug struct with the account id
/// rendered as a plain string and the aggregated size.
impl std::fmt::Debug for StateStatsAccount {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The struct is labelled with its own name; the previous label,
        // "StateStatsStateRecord", was left over from the type that this impl
        // was copied from and made the output misleading.
        f.debug_struct("StateStatsAccount")
            .field("account_id", &self.account_id.as_str())
            .field("size", &self.size)
            .finish()
    }
}

Expand Down
Loading