Skip to content

Commit

Permalink
refactor(indexer): Move state_changes from StreamerMessage root t…
Browse files Browse the repository at this point in the history
…o `IndexerShard` instead
  • Loading branch information
khorolets committed Feb 7, 2022
1 parent 03e4862 commit f10e529
Show file tree
Hide file tree
Showing 12 changed files with 176 additions and 61 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"chain/client-primitives",
"chain/network",
"chain/indexer",
"chain/indexer-primitives",
"chain/jsonrpc",
"chain/jsonrpc/fuzz",
"chain/jsonrpc/client",
Expand Down
11 changes: 10 additions & 1 deletion chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use near_primitives::hash::CryptoHash;
use near_primitives::merkle::{MerklePath, PartialMerkleTree};
use near_primitives::sharding::ChunkHash;
use near_primitives::types::{
AccountId, BlockHeight, BlockReference, EpochReference, MaybeBlockId, ShardId,
AccountId, BlockHeight, BlockReference, EpochId, EpochReference, MaybeBlockId, ShardId,
TransactionOrReceiptId,
};
use near_primitives::utils::generate_random_string;
Expand Down Expand Up @@ -619,6 +619,15 @@ impl Message for GetStateChangesWithCauseInBlock {
type Result = Result<StateChangesView, GetStateChangesError>;
}

pub struct GetStateChangesWithCauseInBlockForTrackedShards {
pub block_hash: CryptoHash,
pub epoch_id: EpochId,
}

impl Message for GetStateChangesWithCauseInBlockForTrackedShards {
type Result = Result<HashMap<ShardId, StateChangesView>, GetStateChangesError>;
}

pub struct GetExecutionOutcome {
pub id: TransactionOrReceiptId,
}
Expand Down
5 changes: 3 additions & 2 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ pub use near_client_primitives::types::{
Error, GetBlock, GetBlockProof, GetBlockProofResponse, GetBlockWithMerkleTree, GetChunk,
GetExecutionOutcome, GetExecutionOutcomeResponse, GetExecutionOutcomesForBlock, GetGasPrice,
GetNetworkInfo, GetNextLightClientBlock, GetProtocolConfig, GetReceipt, GetStateChanges,
GetStateChangesInBlock, GetStateChangesWithCauseInBlock, GetValidatorInfo, GetValidatorOrdered,
Query, QueryError, Status, StatusResponse, SyncStatus, TxStatus, TxStatusError,
GetStateChangesInBlock, GetStateChangesWithCauseInBlock,
GetStateChangesWithCauseInBlockForTrackedShards, GetValidatorInfo, GetValidatorOrdered, Query,
QueryError, Status, StatusResponse, SyncStatus, TxStatus, TxStatusError,
};

pub use crate::client::Client;
Expand Down
62 changes: 57 additions & 5 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use near_client_primitives::types::{
GetBlockWithMerkleTree, GetChunkError, GetExecutionOutcome, GetExecutionOutcomeError,
GetExecutionOutcomesForBlock, GetGasPrice, GetGasPriceError, GetNextLightClientBlockError,
GetProtocolConfig, GetProtocolConfigError, GetReceipt, GetReceiptError, GetStateChangesError,
GetStateChangesWithCauseInBlock, GetValidatorInfoError, Query, QueryError, TxStatus,
TxStatusError,
GetStateChangesWithCauseInBlock, GetStateChangesWithCauseInBlockForTrackedShards,
GetValidatorInfoError, Query, QueryError, TxStatus, TxStatusError,
};
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
#[cfg(feature = "test_features")]
Expand All @@ -44,14 +44,14 @@ use near_primitives::syncing::{
};
use near_primitives::types::{
AccountId, BlockHeight, BlockId, BlockReference, EpochId, EpochReference, Finality,
MaybeBlockId, ShardId, TransactionOrReceiptId,
MaybeBlockId, ShardId, StateChangeValue, TransactionOrReceiptId,
};
use near_primitives::views::validator_stake_view::ValidatorStakeView;
use near_primitives::views::{
BlockView, ChunkView, EpochValidatorInfo, ExecutionOutcomeWithIdView,
FinalExecutionOutcomeView, FinalExecutionOutcomeViewEnum, FinalExecutionStatus, GasPriceView,
LightClientBlockView, QueryRequest, QueryResponse, ReceiptView, StateChangesKindsView,
StateChangesView,
LightClientBlockView, QueryRequest, QueryResponse, ReceiptView, StateChangeWithCauseView,
StateChangesKindsView, StateChangesView,
};

use crate::{
Expand Down Expand Up @@ -758,6 +758,58 @@ impl Handler<GetStateChangesWithCauseInBlock> for ViewClientActor {
}
}

/// Returns a hashmap where the key represents the ShardID and the value
/// is the list of changes in a store with causes for a given block.
impl Handler<GetStateChangesWithCauseInBlockForTrackedShards> for ViewClientActor {
type Result = Result<HashMap<ShardId, StateChangesView>, GetStateChangesError>;

#[perf]
fn handle(
&mut self,
msg: GetStateChangesWithCauseInBlockForTrackedShards,
_: &mut Self::Context,
) -> Self::Result {
let state_changes_with_cause_in_block =
self.chain.store().get_state_changes_with_cause_in_block(&msg.block_hash)?;

let mut state_changes_with_cause_split_by_shard_id: HashMap<ShardId, StateChangesView> =
HashMap::new();
for state_change_with_cause in state_changes_with_cause_in_block {
let account_id = match &state_change_with_cause.value {
StateChangeValue::AccountUpdate { account_id, .. } => account_id.clone(),
StateChangeValue::AccountDeletion { account_id } => account_id.clone(),
StateChangeValue::AccessKeyUpdate { account_id, .. } => account_id.clone(),
StateChangeValue::AccessKeyDeletion { account_id, .. } => account_id.clone(),
StateChangeValue::DataUpdate { account_id, .. } => account_id.clone(),
StateChangeValue::DataDeletion { account_id, .. } => account_id.clone(),
StateChangeValue::ContractCodeUpdate { account_id, .. } => account_id.clone(),
StateChangeValue::ContractCodeDeletion { account_id } => account_id.clone(),
};
let shard_id = if let Ok(shard_id) =
self.runtime_adapter.account_id_to_shard_id(&account_id, &msg.epoch_id)
{
shard_id
} else {
return Err(GetStateChangesError::IOError {
error_message: format!("Failed to get ShardID from AccountID {}", account_id),
});
};

let mut state_changes = if let Some(changes_with_cause) =
state_changes_with_cause_split_by_shard_id.remove(&shard_id)
{
changes_with_cause
} else {
Vec::<StateChangeWithCauseView>::new()
};
state_changes.push(state_change_with_cause.into());
state_changes_with_cause_split_by_shard_id.insert(shard_id, state_changes);
}

Ok(state_changes_with_cause_split_by_shard_id)
}
}

/// Returns the next light client block, given the hash of the last block known to the light client.
/// There are three cases:
/// 1. The last block known to the light client is in the same epoch as the tip:
Expand Down
16 changes: 16 additions & 0 deletions chain/indexer-primitives/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "indexer-primitives"
version = "0.0.0"
authors = ["Near Inc <hello@nearprotocol.com>"]
publish = true
# Please update rust-toolchain.toml as well when changing version here:
rust-version = "1.56.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1", features = [ "derive" ] }
serde_json = "1.0.55"

near-primitives = { path = "../../core/primitives" }
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub use near_primitives::{types, views};
pub struct StreamerMessage {
pub block: views::BlockView,
pub shards: Vec<IndexerShard>,
pub state_changes: views::StateChangesView,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -42,4 +41,5 @@ pub struct IndexerShard {
pub shard_id: types::ShardId,
pub chunk: Option<IndexerChunkView>,
pub receipt_execution_outcomes: Vec<IndexerExecutionOutcomeWithReceipt>,
pub state_changes: views::StateChangesView,
}
86 changes: 49 additions & 37 deletions chain/indexer/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,56 +1,68 @@
# Changelog

## 1.25.x (UNRELEASED)

- `state_changes` is moved to `IndexerShard` struct and represent only
the state changes happened on the specific shard
- All the NEAR Indexer Framework types was extracted to a separate crate `indexer-primitives`

## Breaking changes

The field `state_changes` is moved from the root of `StreamerMessage`
to the `IndexerShard.state_changes` and now contains only changes related
to the specific shard.

## 0.10.1

* (mainnet only) Add additional handler to inject restored receipts to the block #47317863. See [PR 4248](https://github.com/near/nearcore/pull/4248) for reference
- (mainnet only) Add additional handler to inject restored receipts to the block #47317863. See [PR 4248](https://github.com/near/nearcore/pull/4248) for reference

## 0.10.0

* Add additional logs on Indexer Framework start
* Avoid double genesis validation by removing the explicit validation on Indexer instantiation
* Replaced the method how genesis is being read to optimize memory usage
- Add additional logs on Indexer Framework start
- Avoid double genesis validation by removing the explicit validation on Indexer instantiation
- Replaced the method how genesis is being read to optimize memory usage

## Breaking changes

Since the change of reading genesis method to optimize memory usage. You'd be able to iterate over genesis records with `near_config.genesis.for_each_record(|record| {...})`. Nothing is changed for you your indexer does nothing about genesis records.

## 0.9.2

* Optimize the delayed receipts tracking process introduced in previous version to avoid indexer stuck.
- Optimize the delayed receipts tracking process introduced in previous version to avoid indexer stuck.

## 0.9.1

* Introduce a hot-fix. Execution outcome for local receipt might appear not in the same block as the receipt. Local receipts are not saved in database and unable to be fetched. To include a receipt in `IndexerExecutionOutcomeWithReceipt` and prevent NEAR Indexer Framework from panic we fetch previous blocks to find corresponding local receipt to include.
- Introduce a hot-fix. Execution outcome for local receipt might appear not in the same block as the receipt. Local receipts are not saved in database and unable to be fetched. To include a receipt in `IndexerExecutionOutcomeWithReceipt` and prevent NEAR Indexer Framework from panic we fetch previous blocks to find corresponding local receipt to include.

## 0.9.0 (do not use this version, it contains a bug)

* Introduce `IndexerShard` structure which contains corresponding chunks and `IndexerExecutionOutcomeWithReceipt`
* `receipt` field in `IndexerExecutionOutcomeWithReceipt` is no longer optional as it used to be always set anyway,
so now we explicitly communicate this relation ("every outcome has a corresponding receipt") through the type system
* Introduce `IndexerExecutionOutcomeWithOptionalReceipt` which is the same as `IndexerExecutionOutcomeWithReceipt`
but with optional `receipt` field.
- Introduce `IndexerShard` structure which contains corresponding chunks and `IndexerExecutionOutcomeWithReceipt`
- `receipt` field in `IndexerExecutionOutcomeWithReceipt` is no longer optional as it used to be always set anyway,
so now we explicitly communicate this relation ("every outcome has a corresponding receipt") through the type system
- Introduce `IndexerExecutionOutcomeWithOptionalReceipt` which is the same as `IndexerExecutionOutcomeWithReceipt`
but with optional `receipt` field.

## Breaking changes

* `IndexerChunkView` doesn't contain field `receipt_execution_outcomes` anymore, this field has been moved to `IndexerShard`
* `StreamerMessage` structure was aligned more with NEAR Protocol specification and now looks like:
- `IndexerChunkView` doesn't contain field `receipt_execution_outcomes` anymore, this field has been moved to `IndexerShard`
- `StreamerMessage` structure was aligned more with NEAR Protocol specification and now looks like:
```
StreamerMessage {
block: BlockView,
shards: Vec<IndexerShard>,
state_changes: StateChangesView,
state_changes: StateChangesView,
}
```

## 0.8.1

* Add `InitConfigArgs` and `indexer_init_configs`
- Add `InitConfigArgs` and `indexer_init_configs`

As current `neard::init_configs()` signature is a bit hard to read and use we introduce `InitConfigArgs` struct to make a process of passing arguments more explicit. That's why we introduce `indexer_init_configs` which is just a wrapper on `neard::init_configs()` but takes `dir` and `InitConfigArgs` as an input.

## 0.8.0

* Upgrade dependencies
- Upgrade dependencies

## Breaking change

Expand All @@ -60,57 +72,57 @@ created and started on the Indexer implementation, not on the Indexer Framework

## 0.7.0

* State changes return changes with cause instead of kinds
- State changes return changes with cause instead of kinds

## Breaking changes

* `StreamerMessage` now contains `StateChangesView` which is an alias for `Vec<StateChangesWithCauseView>`, previously it contained `StateChangesKindsView`
- `StreamerMessage` now contains `StateChangesView` which is an alias for `Vec<StateChangesWithCauseView>`, previously it contained `StateChangesKindsView`

## 0.6.0

* Add a way to turn off the requirement to wait for the node to be fully synced before starting streaming.
- Add a way to turn off the requirement to wait for the node to be fully synced before starting streaming.

## Breaking changes

* `IndexerConfig` was extended with another field `await_for_node_synced`. Corresponding enum is `AwaitForNodeSyncedEnum` with variants:
* `WaitForFullSync` - await for node to be fully synced (previous default behaviour)
* `StreamWhileSyncing`- start streaming right away while node is syncing (it's useful in case of Indexing from genesis)
- `IndexerConfig` was extended with another field `await_for_node_synced`. Corresponding enum is `AwaitForNodeSyncedEnum` with variants:
- `WaitForFullSync` - await for node to be fully synced (previous default behaviour)
- `StreamWhileSyncing`- start streaming right away while node is syncing (it's useful in case of Indexing from genesis)

## 0.5.0

* Attach receipt execution outcomes to a relevant chunk and preserve their execution order (!)
- Attach receipt execution outcomes to a relevant chunk and preserve their execution order (!)

## Breaking changes

Since #3529 nearcore stores `ExecutionOutcome`s in their execution order, and we can also attribute outcomes to specific chunk. That's why:
* `receipt_execution_outcomes` was moved from `StreamerMessage` to a relevant `IndexerChunkView`
* `ExecutionOutcomesWithReceipts` type alias was removed (just use `Vec<IndexerExecutionOutcomeWithReceipt>` instead)

- `receipt_execution_outcomes` was moved from `StreamerMessage` to a relevant `IndexerChunkView`
- `ExecutionOutcomesWithReceipts` type alias was removed (just use `Vec<IndexerExecutionOutcomeWithReceipt>` instead)

## 0.4.0

* Prepend chunk's receipts with local receipts to attach latter to specific chunk
- Prepend chunk's receipts with local receipts to attach latter to specific chunk

## Breaking changes

* For local receipt to have a relation to specific chunk we have prepended them to original receipts in particular chunk
as in the most cases local receipts are executed before normal receipts. That's why there is no reason to have `local_receipts`
field in `StreamerMessage` struct anymore. `local_receipts` field was removed.
- For local receipt to have a relation to specific chunk we have prepended them to original receipts in particular chunk
as in the most cases local receipts are executed before normal receipts. That's why there is no reason to have `local_receipts`
field in `StreamerMessage` struct anymore. `local_receipts` field was removed.

## 0.3.1

* Add local receipt to `receipt_execution_outcomes` if possible
- Add local receipt to `receipt_execution_outcomes` if possible

## 0.3.0


### Breaking changes

* To extended the `receipt_execution_outcomes` with information about the corresponding receipt we had to break the API
(the old outcome structure is just one layer deeper now [under `execution_outcome` field])
- To extended the `receipt_execution_outcomes` with information about the corresponding receipt we had to break the API
(the old outcome structure is just one layer deeper now [under `execution_outcome` field])

## 0.2.0

* Refactor the way of fetching `ExecutionOutcome`s (use the new way to get all of them for specific block)
* Rename `StreamerMessage.outcomes` field to `receipt_execution_outcomes` and change type to `HashMap<CryptoHash, ExecutionOutcomeWithId>` and now it includes only `ExecutionOutcome`s for receipts (no transactions)
* Introduce `IndexerTransactionWithOutcome` struct to contain `SignedTransactionView` and `ExecutionOutcomeWithId` for the transaction
* Introduce `IndexerChunkView` to replace `StreamerMessage.chunks` to include `IndexerTransactionWithOutcome` vec in `transactions`
- Refactor the way of fetching `ExecutionOutcome`s (use the new way to get all of them for specific block)
- Rename `StreamerMessage.outcomes` field to `receipt_execution_outcomes` and change type to `HashMap<CryptoHash, ExecutionOutcomeWithId>` and now it includes only `ExecutionOutcome`s for receipts (no transactions)
- Introduce `IndexerTransactionWithOutcome` struct to contain `SignedTransactionView` and `ExecutionOutcomeWithId` for the transaction
- Introduce `IndexerChunkView` to replace `StreamerMessage.chunks` to include `IndexerTransactionWithOutcome` vec in `transactions`
1 change: 1 addition & 0 deletions chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ serde = { version = "1", features = [ "derive" ] }
serde_json = "1.0.55"
tokio = { version = "1.1", features = ["time", "sync"] }

indexer-primitives = { path = "../indexer-primitives" }
nearcore = { path = "../../nearcore" }
near-client = { path = "../client" }
near-chain-configs = { path = "../../core/chain-configs" }
Expand Down
7 changes: 4 additions & 3 deletions chain/indexer/src/streamer/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ pub use near_primitives::hash::CryptoHash;
pub use near_primitives::{types, views};

use super::errors::FailedToFetchData;
use super::types::IndexerExecutionOutcomeWithOptionalReceipt;
use super::INDEXER;
use indexer_primitives::IndexerExecutionOutcomeWithOptionalReceipt;

pub(crate) async fn fetch_status(
client: &Addr<near_client::ClientActor>,
Expand Down Expand Up @@ -60,9 +60,10 @@ pub(crate) async fn fetch_block_by_hash(
pub(crate) async fn fetch_state_changes(
client: &Addr<near_client::ViewClientActor>,
block_hash: CryptoHash,
) -> Result<views::StateChangesView, FailedToFetchData> {
epoch_id: near_primitives::types::EpochId,
) -> Result<HashMap<near_primitives::types::ShardId, views::StateChangesView>, FailedToFetchData> {
client
.send(near_client::GetStateChangesWithCauseInBlock { block_hash })
.send(near_client::GetStateChangesWithCauseInBlockForTrackedShards { block_hash, epoch_id })
.await?
.map_err(|err| FailedToFetchData::String(err.to_string()))
}
Expand Down
Loading

0 comments on commit f10e529

Please sign in to comment.