From f10e5293012214245b454238ae29beded222e76d Mon Sep 17 00:00:00 2001 From: Bohdan Khorolets Date: Mon, 7 Feb 2022 21:32:34 +0200 Subject: [PATCH] refactor(indexer): Move `state_changes` from `StreamerMessage` root to `IndexerShard` instead --- Cargo.lock | 10 +++ Cargo.toml | 1 + chain/client-primitives/src/types.rs | 11 ++- chain/client/src/lib.rs | 5 +- chain/client/src/view_client.rs | 62 +++++++++++-- chain/indexer-primitives/Cargo.toml | 16 ++++ .../src/lib.rs} | 2 +- chain/indexer/CHANGELOG.md | 86 +++++++++++-------- chain/indexer/Cargo.toml | 1 + chain/indexer/src/streamer/fetchers.rs | 7 +- chain/indexer/src/streamer/mod.rs | 34 +++++--- chain/indexer/src/streamer/utils.rs | 2 +- 12 files changed, 176 insertions(+), 61 deletions(-) create mode 100644 chain/indexer-primitives/Cargo.toml rename chain/{indexer/src/streamer/types.rs => indexer-primitives/src/lib.rs} (100%) diff --git a/Cargo.lock b/Cargo.lock index 5d8c297f1b8..f9074fc83a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1987,6 +1987,15 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "indexer-primitives" +version = "0.0.0" +dependencies = [ + "near-primitives", + "serde", + "serde_json", +] + [[package]] name = "indexmap" version = "1.8.0" @@ -2696,6 +2705,7 @@ dependencies = [ "anyhow", "async-recursion", "futures", + "indexer-primitives", "near-chain-configs", "near-client", "near-crypto", diff --git a/Cargo.toml b/Cargo.toml index 63a5978d655..79a56719d5c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ members = [ "chain/client-primitives", "chain/network", "chain/indexer", + "chain/indexer-primitives", "chain/jsonrpc", "chain/jsonrpc/fuzz", "chain/jsonrpc/client", diff --git a/chain/client-primitives/src/types.rs b/chain/client-primitives/src/types.rs index c300eb610fa..941231925da 100644 --- a/chain/client-primitives/src/types.rs +++ b/chain/client-primitives/src/types.rs @@ -13,7 +13,7 @@ use near_primitives::hash::CryptoHash; use near_primitives::merkle::{MerklePath, PartialMerkleTree}; use near_primitives::sharding::ChunkHash; use near_primitives::types::{ - AccountId, BlockHeight, BlockReference, EpochReference, MaybeBlockId, ShardId, + AccountId, BlockHeight, BlockReference, EpochId, EpochReference, MaybeBlockId, ShardId, TransactionOrReceiptId, }; use near_primitives::utils::generate_random_string; @@ -619,6 +619,15 @@ impl Message for GetStateChangesWithCauseInBlock { type Result = Result; } +pub struct GetStateChangesWithCauseInBlockForTrackedShards { + pub block_hash: CryptoHash, + pub epoch_id: EpochId, +} + +impl Message for GetStateChangesWithCauseInBlockForTrackedShards { + type Result = Result, GetStateChangesError>; +} + pub struct GetExecutionOutcome { pub id: TransactionOrReceiptId, } diff --git a/chain/client/src/lib.rs b/chain/client/src/lib.rs index 5710706a924..839506d8d1d 100644 --- a/chain/client/src/lib.rs +++ b/chain/client/src/lib.rs @@ -2,8 +2,9 @@ pub use near_client_primitives::types::{ Error, GetBlock, GetBlockProof, GetBlockProofResponse, GetBlockWithMerkleTree, GetChunk, GetExecutionOutcome, GetExecutionOutcomeResponse, GetExecutionOutcomesForBlock, GetGasPrice, GetNetworkInfo, GetNextLightClientBlock, GetProtocolConfig, GetReceipt, GetStateChanges, - GetStateChangesInBlock, GetStateChangesWithCauseInBlock, GetValidatorInfo, GetValidatorOrdered, - Query, QueryError, Status, StatusResponse, SyncStatus, TxStatus, TxStatusError, + GetStateChangesInBlock, GetStateChangesWithCauseInBlock, + GetStateChangesWithCauseInBlockForTrackedShards, GetValidatorInfo, GetValidatorOrdered, Query, + QueryError, Status, StatusResponse, SyncStatus, TxStatus, TxStatusError, }; pub use crate::client::Client; diff --git a/chain/client/src/view_client.rs b/chain/client/src/view_client.rs index ad4cee01009..854b2e83765 100644 --- a/chain/client/src/view_client.rs +++ b/chain/client/src/view_client.rs @@ -22,8 +22,8 @@ use near_client_primitives::types::{ GetBlockWithMerkleTree, GetChunkError, GetExecutionOutcome, GetExecutionOutcomeError, GetExecutionOutcomesForBlock, GetGasPrice, GetGasPriceError, GetNextLightClientBlockError, GetProtocolConfig, GetProtocolConfigError, GetReceipt, GetReceiptError, GetStateChangesError, - GetStateChangesWithCauseInBlock, GetValidatorInfoError, Query, QueryError, TxStatus, - TxStatusError, + GetStateChangesWithCauseInBlock, GetStateChangesWithCauseInBlockForTrackedShards, + GetValidatorInfoError, Query, QueryError, TxStatus, TxStatusError, }; use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest}; #[cfg(feature = "test_features")] @@ -44,14 +44,14 @@ use near_primitives::syncing::{ }; use near_primitives::types::{ AccountId, BlockHeight, BlockId, BlockReference, EpochId, EpochReference, Finality, - MaybeBlockId, ShardId, TransactionOrReceiptId, + MaybeBlockId, ShardId, StateChangeValue, TransactionOrReceiptId, }; use near_primitives::views::validator_stake_view::ValidatorStakeView; use near_primitives::views::{ BlockView, ChunkView, EpochValidatorInfo, ExecutionOutcomeWithIdView, FinalExecutionOutcomeView, FinalExecutionOutcomeViewEnum, FinalExecutionStatus, GasPriceView, - LightClientBlockView, QueryRequest, QueryResponse, ReceiptView, StateChangesKindsView, - StateChangesView, + LightClientBlockView, QueryRequest, QueryResponse, ReceiptView, StateChangeWithCauseView, + StateChangesKindsView, StateChangesView, }; use crate::{ @@ -758,6 +758,58 @@ impl Handler for ViewClientActor { } } +/// Returns a hashmap where the key represents the ShardID and the value +/// is the list of changes in a store with causes for a given block. +impl Handler for ViewClientActor { + type Result = Result, GetStateChangesError>; + + #[perf] + fn handle( + &mut self, + msg: GetStateChangesWithCauseInBlockForTrackedShards, + _: &mut Self::Context, + ) -> Self::Result { + let state_changes_with_cause_in_block = + self.chain.store().get_state_changes_with_cause_in_block(&msg.block_hash)?; + + let mut state_changes_with_cause_split_by_shard_id: HashMap = + HashMap::new(); + for state_change_with_cause in state_changes_with_cause_in_block { + let account_id = match &state_change_with_cause.value { + StateChangeValue::AccountUpdate { account_id, .. } => account_id.clone(), + StateChangeValue::AccountDeletion { account_id } => account_id.clone(), + StateChangeValue::AccessKeyUpdate { account_id, .. } => account_id.clone(), + StateChangeValue::AccessKeyDeletion { account_id, .. } => account_id.clone(), + StateChangeValue::DataUpdate { account_id, .. } => account_id.clone(), + StateChangeValue::DataDeletion { account_id, .. } => account_id.clone(), + StateChangeValue::ContractCodeUpdate { account_id, .. } => account_id.clone(), + StateChangeValue::ContractCodeDeletion { account_id } => account_id.clone(), + }; + let shard_id = if let Ok(shard_id) = + self.runtime_adapter.account_id_to_shard_id(&account_id, &msg.epoch_id) + { + shard_id + } else { + return Err(GetStateChangesError::IOError { + error_message: format!("Failed to get ShardID from AccountID {}", account_id), + }); + }; + + let mut state_changes = if let Some(changes_with_cause) = + state_changes_with_cause_split_by_shard_id.remove(&shard_id) + { + changes_with_cause + } else { + Vec::::new() + }; + state_changes.push(state_change_with_cause.into()); + state_changes_with_cause_split_by_shard_id.insert(shard_id, state_changes); + } + + Ok(state_changes_with_cause_split_by_shard_id) + } +} + /// Returns the next light client block, given the hash of the last block known to the light client. /// There are three cases: /// 1. The last block known to the light client is in the same epoch as the tip: diff --git a/chain/indexer-primitives/Cargo.toml b/chain/indexer-primitives/Cargo.toml new file mode 100644 index 00000000000..4626fcfa999 --- /dev/null +++ b/chain/indexer-primitives/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "indexer-primitives" +version = "0.0.0" +authors = ["Near Inc "] +publish = true +# Please update rust-toolchain.toml as well when changing version here: +rust-version = "1.56.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +serde = { version = "1", features = [ "derive" ] } +serde_json = "1.0.55" + +near-primitives = { path = "../../core/primitives" } \ No newline at end of file diff --git a/chain/indexer/src/streamer/types.rs b/chain/indexer-primitives/src/lib.rs similarity index 100% rename from chain/indexer/src/streamer/types.rs rename to chain/indexer-primitives/src/lib.rs index fbddfedf16c..1001bf74cf1 100644 --- a/chain/indexer/src/streamer/types.rs +++ b/chain/indexer-primitives/src/lib.rs @@ -8,7 +8,6 @@ pub use near_primitives::{types, views}; pub struct StreamerMessage { pub block: views::BlockView, pub shards: Vec, - pub state_changes: views::StateChangesView, } #[derive(Debug, Serialize, Deserialize)] @@ -42,4 +41,5 @@ pub struct IndexerShard { pub shard_id: types::ShardId, pub chunk: Option, pub receipt_execution_outcomes: Vec, + pub state_changes: views::StateChangesView, } diff --git a/chain/indexer/CHANGELOG.md b/chain/indexer/CHANGELOG.md index a15c788686c..13757e5e683 100644 --- a/chain/indexer/CHANGELOG.md +++ b/chain/indexer/CHANGELOG.md @@ -1,14 +1,26 @@ # Changelog +## 1.25.x (UNRELEASED) + +- `state_changes` is moved to `IndexerShard` struct and represent only + the state changes happened on the specific shard +- All the NEAR Indexer Framework types was extracted to a separate crate `indexer-primitives` + +## Breaking changes + +The field `state_changes` is moved from the root of `StreamerMessage` +to the `IndexerShard.state_changes` and now contains only changes related +to the specific shard. + ## 0.10.1 -* (mainnet only) Add additional handler to inject restored receipts to the block #47317863. See [PR 4248](https://github.com/near/nearcore/pull/4248) for reference +- (mainnet only) Add additional handler to inject restored receipts to the block #47317863. See [PR 4248](https://github.com/near/nearcore/pull/4248) for reference ## 0.10.0 -* Add additional logs on Indexer Framework start -* Avoid double genesis validation by removing the explicit validation on Indexer instantiation -* Replaced the method how genesis is being read to optimize memory usage +- Add additional logs on Indexer Framework start +- Avoid double genesis validation by removing the explicit validation on Indexer instantiation +- Replaced the method how genesis is being read to optimize memory usage ## Breaking changes @@ -16,41 +28,41 @@ Since the change of reading genesis method to optimize memory usage. You'd be ab ## 0.9.2 -* Optimize the delayed receipts tracking process introduced in previous version to avoid indexer stuck. +- Optimize the delayed receipts tracking process introduced in previous version to avoid indexer stuck. ## 0.9.1 -* Introduce a hot-fix. Execution outcome for local receipt might appear not in the same block as the receipt. Local receipts are not saved in database and unable to be fetched. To include a receipt in `IndexerExecutionOutcomeWithReceipt` and prevent NEAR Indexer Framework from panic we fetch previous blocks to find corresponding local receipt to include. +- Introduce a hot-fix. Execution outcome for local receipt might appear not in the same block as the receipt. Local receipts are not saved in database and unable to be fetched. To include a receipt in `IndexerExecutionOutcomeWithReceipt` and prevent NEAR Indexer Framework from panic we fetch previous blocks to find corresponding local receipt to include. ## 0.9.0 (do not use this version, it contains a bug) -* Introduce `IndexerShard` structure which contains corresponding chunks and `IndexerExecutionOutcomeWithReceipt` -* `receipt` field in `IndexerExecutionOutcomeWithReceipt` is no longer optional as it used to be always set anyway, -so now we explicitly communicate this relation ("every outcome has a corresponding receipt") through the type system -* Introduce `IndexerExecutionOutcomeWithOptionalReceipt` which is the same as `IndexerExecutionOutcomeWithReceipt` -but with optional `receipt` field. +- Introduce `IndexerShard` structure which contains corresponding chunks and `IndexerExecutionOutcomeWithReceipt` +- `receipt` field in `IndexerExecutionOutcomeWithReceipt` is no longer optional as it used to be always set anyway, + so now we explicitly communicate this relation ("every outcome has a corresponding receipt") through the type system +- Introduce `IndexerExecutionOutcomeWithOptionalReceipt` which is the same as `IndexerExecutionOutcomeWithReceipt` + but with optional `receipt` field. ## Breaking changes -* `IndexerChunkView` doesn't contain field `receipt_execution_outcomes` anymore, this field has been moved to `IndexerShard` -* `StreamerMessage` structure was aligned more with NEAR Protocol specification and now looks like: +- `IndexerChunkView` doesn't contain field `receipt_execution_outcomes` anymore, this field has been moved to `IndexerShard` +- `StreamerMessage` structure was aligned more with NEAR Protocol specification and now looks like: ``` StreamerMessage { block: BlockView, shards: Vec, - state_changes: StateChangesView, + state_changes: StateChangesView, } ``` ## 0.8.1 -* Add `InitConfigArgs` and `indexer_init_configs` +- Add `InitConfigArgs` and `indexer_init_configs` As current `neard::init_configs()` signature is a bit hard to read and use we introduce `InitConfigArgs` struct to make a process of passing arguments more explicit. That's why we introduce `indexer_init_configs` which is just a wrapper on `neard::init_configs()` but takes `dir` and `InitConfigArgs` as an input. ## 0.8.0 -* Upgrade dependencies +- Upgrade dependencies ## Breaking change @@ -60,57 +72,57 @@ created and started on the Indexer implementation, not on the Indexer Framework ## 0.7.0 -* State changes return changes with cause instead of kinds +- State changes return changes with cause instead of kinds ## Breaking changes -* `StreamerMessage` now contains `StateChangesView` which is an alias for `Vec`, previously it contained `StateChangesKindsView` +- `StreamerMessage` now contains `StateChangesView` which is an alias for `Vec`, previously it contained `StateChangesKindsView` ## 0.6.0 -* Add a way to turn off the requirement to wait for the node to be fully synced before starting streaming. +- Add a way to turn off the requirement to wait for the node to be fully synced before starting streaming. ## Breaking changes -* `IndexerConfig` was extended with another field `await_for_node_synced`. Corresponding enum is `AwaitForNodeSyncedEnum` with variants: - * `WaitForFullSync` - await for node to be fully synced (previous default behaviour) - * `StreamWhileSyncing`- start streaming right away while node is syncing (it's useful in case of Indexing from genesis) +- `IndexerConfig` was extended with another field `await_for_node_synced`. Corresponding enum is `AwaitForNodeSyncedEnum` with variants: + - `WaitForFullSync` - await for node to be fully synced (previous default behaviour) + - `StreamWhileSyncing`- start streaming right away while node is syncing (it's useful in case of Indexing from genesis) ## 0.5.0 -* Attach receipt execution outcomes to a relevant chunk and preserve their execution order (!) +- Attach receipt execution outcomes to a relevant chunk and preserve their execution order (!) ## Breaking changes Since #3529 nearcore stores `ExecutionOutcome`s in their execution order, and we can also attribute outcomes to specific chunk. That's why: -* `receipt_execution_outcomes` was moved from `StreamerMessage` to a relevant `IndexerChunkView` -* `ExecutionOutcomesWithReceipts` type alias was removed (just use `Vec` instead) + +- `receipt_execution_outcomes` was moved from `StreamerMessage` to a relevant `IndexerChunkView` +- `ExecutionOutcomesWithReceipts` type alias was removed (just use `Vec` instead) ## 0.4.0 -* Prepend chunk's receipts with local receipts to attach latter to specific chunk +- Prepend chunk's receipts with local receipts to attach latter to specific chunk ## Breaking changes -* For local receipt to have a relation to specific chunk we have prepended them to original receipts in particular chunk -as in the most cases local receipts are executed before normal receipts. That's why there is no reason to have `local_receipts` -field in `StreamerMessage` struct anymore. `local_receipts` field was removed. +- For local receipt to have a relation to specific chunk we have prepended them to original receipts in particular chunk + as in the most cases local receipts are executed before normal receipts. That's why there is no reason to have `local_receipts` + field in `StreamerMessage` struct anymore. `local_receipts` field was removed. ## 0.3.1 -* Add local receipt to `receipt_execution_outcomes` if possible +- Add local receipt to `receipt_execution_outcomes` if possible ## 0.3.0 - ### Breaking changes -* To extended the `receipt_execution_outcomes` with information about the corresponding receipt we had to break the API -(the old outcome structure is just one layer deeper now [under `execution_outcome` field]) +- To extended the `receipt_execution_outcomes` with information about the corresponding receipt we had to break the API + (the old outcome structure is just one layer deeper now [under `execution_outcome` field]) ## 0.2.0 -* Refactor the way of fetching `ExecutionOutcome`s (use the new way to get all of them for specific block) -* Rename `StreamerMessage.outcomes` field to `receipt_execution_outcomes` and change type to `HashMap` and now it includes only `ExecutionOutcome`s for receipts (no transactions) -* Introduce `IndexerTransactionWithOutcome` struct to contain `SignedTransactionView` and `ExecutionOutcomeWithId` for the transaction -* Introduce `IndexerChunkView` to replace `StreamerMessage.chunks` to include `IndexerTransactionWithOutcome` vec in `transactions` +- Refactor the way of fetching `ExecutionOutcome`s (use the new way to get all of them for specific block) +- Rename `StreamerMessage.outcomes` field to `receipt_execution_outcomes` and change type to `HashMap` and now it includes only `ExecutionOutcome`s for receipts (no transactions) +- Introduce `IndexerTransactionWithOutcome` struct to contain `SignedTransactionView` and `ExecutionOutcomeWithId` for the transaction +- Introduce `IndexerChunkView` to replace `StreamerMessage.chunks` to include `IndexerTransactionWithOutcome` vec in `transactions` diff --git a/chain/indexer/Cargo.toml b/chain/indexer/Cargo.toml index 41b13cc515b..a12cc4a3d43 100644 --- a/chain/indexer/Cargo.toml +++ b/chain/indexer/Cargo.toml @@ -18,6 +18,7 @@ serde = { version = "1", features = [ "derive" ] } serde_json = "1.0.55" tokio = { version = "1.1", features = ["time", "sync"] } +indexer-primitives = { path = "../indexer-primitives" } nearcore = { path = "../../nearcore" } near-client = { path = "../client" } near-chain-configs = { path = "../../core/chain-configs" } diff --git a/chain/indexer/src/streamer/fetchers.rs b/chain/indexer/src/streamer/fetchers.rs index 67b827007e2..ecdbb8dc542 100644 --- a/chain/indexer/src/streamer/fetchers.rs +++ b/chain/indexer/src/streamer/fetchers.rs @@ -10,8 +10,8 @@ pub use near_primitives::hash::CryptoHash; pub use near_primitives::{types, views}; use super::errors::FailedToFetchData; -use super::types::IndexerExecutionOutcomeWithOptionalReceipt; use super::INDEXER; +use indexer_primitives::IndexerExecutionOutcomeWithOptionalReceipt; pub(crate) async fn fetch_status( client: &Addr, @@ -60,9 +60,10 @@ pub(crate) async fn fetch_block_by_hash( pub(crate) async fn fetch_state_changes( client: &Addr, block_hash: CryptoHash, -) -> Result { + epoch_id: near_primitives::types::EpochId, +) -> Result, FailedToFetchData> { client - .send(near_client::GetStateChangesWithCauseInBlock { block_hash }) + .send(near_client::GetStateChangesWithCauseInBlockForTrackedShards { block_hash, epoch_id }) .await? .map_err(|err| FailedToFetchData::String(err.to_string())) } diff --git a/chain/indexer/src/streamer/mod.rs b/chain/indexer/src/streamer/mod.rs index 01aacc56ec6..83b5d2785d8 100644 --- a/chain/indexer/src/streamer/mod.rs +++ b/chain/indexer/src/streamer/mod.rs @@ -17,18 +17,17 @@ use self::fetchers::{ fetch_block_by_hash, fetch_block_by_height, fetch_block_chunks, fetch_latest_block, fetch_outcomes, fetch_state_changes, fetch_status, }; -pub use self::types::{ +use self::utils::convert_transactions_sir_into_local_receipts; +use crate::streamer::fetchers::fetch_protocol_config; +use crate::INDEXER; +pub use indexer_primitives::{ IndexerChunkView, IndexerExecutionOutcomeWithOptionalReceipt, IndexerExecutionOutcomeWithReceipt, IndexerShard, IndexerTransactionWithOutcome, StreamerMessage, }; -use self::utils::convert_transactions_sir_into_local_receipts; -use crate::streamer::fetchers::fetch_protocol_config; -use crate::INDEXER; mod errors; mod fetchers; -mod types; mod utils; const INTERVAL: Duration = Duration::from_millis(500); @@ -77,14 +76,31 @@ async fn build_streamer_message( as near_primitives::types::NumShards; let mut shards_outcomes = fetch_outcomes(&client, block.header.hash).await?; + let mut state_changes = fetch_state_changes( + &client, + block.header.hash, + near_primitives::types::EpochId(block.header.epoch_id.clone()), + ) + .await?; let mut indexer_shards = (0..num_shards) - .map(|shard_id| IndexerShard { shard_id, chunk: None, receipt_execution_outcomes: vec![] }) + .map(|shard_id| IndexerShard { + shard_id, + chunk: None, + receipt_execution_outcomes: vec![], + state_changes: vec![], + }) .collect::>(); for chunk in chunks { let views::ChunkView { transactions, author, header, receipts: chunk_non_local_receipts } = chunk; + let shard_id = header.shard_id.clone() as usize; + + indexer_shards[shard_id].state_changes = state_changes + .remove(&header.shard_id) + .expect("StateChanges for given shard should be present"); + let mut outcomes = shards_outcomes .remove(&header.shard_id) .expect("Execution outcomes for given shard should be present"); @@ -128,8 +144,6 @@ async fn build_streamer_message( let mut chunk_receipts = chunk_local_receipts; - let shard_id = header.shard_id.clone() as usize; - let mut receipt_execution_outcomes: Vec = vec![]; for outcome in receipt_outcomes { let IndexerExecutionOutcomeWithOptionalReceipt { execution_outcome, receipt } = outcome; @@ -217,9 +231,7 @@ async fn build_streamer_message( ) } - let state_changes = fetch_state_changes(&client, block.header.hash).await?; - - Ok(StreamerMessage { block, shards: indexer_shards, state_changes }) + Ok(StreamerMessage { block, shards: indexer_shards }) } /// Function that tries to find specific local receipt by it's ID and returns it diff --git a/chain/indexer/src/streamer/utils.rs b/chain/indexer/src/streamer/utils.rs index 598a80bea1a..57add3b323b 100644 --- a/chain/indexer/src/streamer/utils.rs +++ b/chain/indexer/src/streamer/utils.rs @@ -5,7 +5,7 @@ use node_runtime::config::tx_cost; use super::errors::FailedToFetchData; use super::fetchers::fetch_block_by_hash; -use super::IndexerTransactionWithOutcome; +use indexer_primitives::IndexerTransactionWithOutcome; pub(crate) async fn convert_transactions_sir_into_local_receipts( client: &Addr,