Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(indexer): Move state_changes from StreamerMessage root to IndexerShard instead #6255

Merged
merged 14 commits into from
Feb 16, 2022
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CODEOWNERS
Validating CODEOWNERS rules …
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
/chain/jsonrpc/ @khorolets @rtsai123
/chain/jsonrpc-primitives/ @khorolets @miraclx @rtsai123
/chain/indexer/ @khorolets
/chain/indexer-primitives/ @khorolets
/chain/rosetta-rpc/ @frol @mina86
/utils @pmnoxx @mm-near

Expand Down
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"chain/client-primitives",
"chain/network",
"chain/indexer",
"chain/indexer-primitives",
"chain/jsonrpc",
"chain/jsonrpc/fuzz",
"chain/jsonrpc/client",
Expand Down
11 changes: 10 additions & 1 deletion chain/client-primitives/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use near_primitives::hash::CryptoHash;
use near_primitives::merkle::{MerklePath, PartialMerkleTree};
use near_primitives::sharding::ChunkHash;
use near_primitives::types::{
AccountId, BlockHeight, BlockReference, EpochReference, MaybeBlockId, ShardId,
AccountId, BlockHeight, BlockReference, EpochId, EpochReference, MaybeBlockId, ShardId,
TransactionOrReceiptId,
};
use near_primitives::utils::generate_random_string;
Expand Down Expand Up @@ -626,6 +626,15 @@ impl Message for GetStateChangesWithCauseInBlock {
type Result = Result<StateChangesView, GetStateChangesError>;
}

pub struct GetStateChangesWithCauseInBlockForTrackedShards {
pub block_hash: CryptoHash,
pub epoch_id: EpochId,
}

impl Message for GetStateChangesWithCauseInBlockForTrackedShards {
type Result = Result<HashMap<ShardId, StateChangesView>, GetStateChangesError>;
}

pub struct GetExecutionOutcome {
pub id: TransactionOrReceiptId,
}
Expand Down
6 changes: 3 additions & 3 deletions chain/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ pub use near_client_primitives::types::{
Error, GetBlock, GetBlockHash, GetBlockProof, GetBlockProofResponse, GetBlockWithMerkleTree,
GetChunk, GetExecutionOutcome, GetExecutionOutcomeResponse, GetExecutionOutcomesForBlock,
GetGasPrice, GetNetworkInfo, GetNextLightClientBlock, GetProtocolConfig, GetReceipt,
GetStateChanges, GetStateChangesInBlock, GetStateChangesWithCauseInBlock, GetValidatorInfo,
GetValidatorOrdered, Query, QueryError, Status, StatusResponse, SyncStatus, TxStatus,
TxStatusError,
GetStateChanges, GetStateChangesInBlock, GetStateChangesWithCauseInBlock,
GetStateChangesWithCauseInBlockForTrackedShards, GetValidatorInfo, GetValidatorOrdered, Query,
QueryError, Status, StatusResponse, SyncStatus, TxStatus, TxStatusError,
};

pub use crate::client::Client;
Expand Down
42 changes: 40 additions & 2 deletions chain/client/src/view_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ use near_client_primitives::types::{
GetBlockProofResponse, GetBlockWithMerkleTree, GetChunkError, GetExecutionOutcome,
GetExecutionOutcomeError, GetExecutionOutcomesForBlock, GetGasPrice, GetGasPriceError,
GetNextLightClientBlockError, GetProtocolConfig, GetProtocolConfigError, GetReceipt,
GetReceiptError, GetStateChangesError, GetStateChangesWithCauseInBlock, GetValidatorInfoError,
Query, QueryError, TxStatus, TxStatusError,
GetReceiptError, GetStateChangesError, GetStateChangesWithCauseInBlock,
GetStateChangesWithCauseInBlockForTrackedShards, GetValidatorInfoError, Query, QueryError,
TxStatus, TxStatusError,
};
use near_network::types::{NetworkRequests, PeerManagerAdapter, PeerManagerMessageRequest};
#[cfg(feature = "test_features")]
Expand Down Expand Up @@ -783,6 +784,43 @@ impl Handler<GetStateChangesWithCauseInBlock> for ViewClientActor {
}
}

/// Returns a hashmap where the key represents the ShardID and the value
/// is the list of changes in a store with causes for a given block.
impl Handler<GetStateChangesWithCauseInBlockForTrackedShards> for ViewClientActor {
type Result = Result<HashMap<ShardId, StateChangesView>, GetStateChangesError>;

#[perf]
fn handle(
&mut self,
msg: GetStateChangesWithCauseInBlockForTrackedShards,
_: &mut Self::Context,
) -> Self::Result {
let state_changes_with_cause_in_block =
self.chain.store().get_state_changes_with_cause_in_block(&msg.block_hash)?;

let mut state_changes_with_cause_split_by_shard_id: HashMap<ShardId, StateChangesView> =
HashMap::new();
for state_change_with_cause in state_changes_with_cause_in_block {
let account_id = state_change_with_cause.value.affected_account_id();
let shard_id = match self
.runtime_adapter
.account_id_to_shard_id(account_id, &msg.epoch_id)
{
Ok(shard_id) => shard_id,
Err(err) => {
return Err(GetStateChangesError::IOError { error_message: format!("{}", err) })
}
};

let state_changes =
state_changes_with_cause_split_by_shard_id.entry(shard_id).or_default();
state_changes.push(state_change_with_cause.into());
}

Ok(state_changes_with_cause_split_by_shard_id)
}
}

/// Returns the next light client block, given the hash of the last block known to the light client.
/// There are three cases:
/// 1. The last block known to the light client is in the same epoch as the tip:
Expand Down
19 changes: 19 additions & 0 deletions chain/indexer-primitives/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "near-indexer-primitives"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a readme explaining why this crates needs to exist, what should belong to this crate, and what shouldn't belong to this crate? "this crates holds types" doesn't answer those questions for me -- we might as well keep them in indexer, and we also might add some logic here, why not?

It seems to me that what actually happens is that "this crate defines over-the-wire schema for JSON messages used by indexer HTTP API". Ie, it seems that this is not about Rust, but rather about a network protocol/API we expose. That is, if true, a very important bit of information, and it currently is missing from the description.

🤔 wait, I think the above is not true. I've just tried removing Serialize, Deserialize from all intexer messages, and that seems to work -- nothing in this repository seems to rely on serialization. I guess, I am just confused as to why this particular bunch of types needs special treatment and is worth a separate crate :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matklad I'll add a little README.

We are making NEAR Lake - in a few words it's an indexer that is going to save all the data from the network to AWS S3 in JSON files. We are starting some NEAR Lake Framework which is going to read from AWS S3 and stream all the data. Both of the mentioned projects will use indexer primitives, in nearest future, there are going to be even more projects.

The first goal is to make it possible that projects like NEAR Lake Framework and the ones that use it as a dependency won't require an entire nearcore in dependencies in order to actually use only those primitives.

And we are going to extend the number of types in indexer primitives and adjust the serde_json parameters to improve the way the types are serialized.

I hope that makes sense. Let me know if you have questions regarding this extraction to a separate crate :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, this makes sense! And yeah, I want to stress that the salient point of this thing then is not the Rust types, but rather the JSON schema, and all of the related backwards compatibility concerns.

And yeah, this totally needs to be a separate crate. I wonder though if -types or -views would be a better public name than -primitives. In today's usage primitives is essentially a kitchen sink, and it defines much more than just data schema.

I also wonder if, long term, this should live in some other repository. It feels odd that this defines a data schema, but the actual data isn't used by the repo (so, it's easy for nearcore developers to break this stuff without realizing this). Contrast this with views, which also do define the schema, but this schema is than used by the code in this repository (the node's RPC API). It seems nicer if indexer-framework defines just the Rust API (without Serialize), and let's the consumers define their own data schema.

🤔 yeah, I guess long-term I can see two worlds:

  • In one world, "indexer schema" is something defined completely outside of nearcore as a part of the lake framework. In this world, there's a separate indexer-types crate, but it lives in the repo which implements the actual indexer. We can also imagine two different lakes by two different providers with different schema
  • In another world, we say that "indexer schema" is something universal, and is as much a part of nearcore as JSON RPC API. In this world, I would expect this to be a module in the crate that defines JSON RPC API (so, near_primitives::views::indexer today). In this world, while there might be alternative implementations of the lake, we'd expect them to use the same schema.

But yeah, this is long-term, starting with moving this to a separate crate might make sense (but also do consider just creating a views submodule)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@matklad TL;DR: There are several things at play, and you properly identified the long-term plans we actually have.

The types defined in this crate are Rust types for Indexer Framework, which is now tightly coupled with nearcore. You don't see any use of these APIs in nearcore repo because it is designed to be a library (you can find the usage in indexer-for-* projects out there), and we want to stabilize this API in the long term in terms of Rust types and while we already have our own top-level structure, there are still quite some types used in depth (e.g. Transaction and ExecutionOutcome types). Given there is an initiative to decouple JSON RPC types #5516, we decided to postpone solving all the problems.

Second, indeed, we want to be able to serialize the types for near-lake and deserialize it on the reader side without pulling the whole nearcore (we could have get away with pulling the whole nearcore, but due to #3803 it makes things a bit harder on our side, since we often receive questions about M1 support in our tooling and we use M1 ourselves)

Third, there is a plan to further solidify the APIs and extract near-indexer into a separate repo, but to make that happen, nearcore should be published as a crate, and its APIs also stable.

version = "0.0.0"
authors = ["Near Inc <hello@nearprotocol.com>"]
publish = true
# Please update rust-toolchain.toml as well when changing version here:
rust-version = "1.56.0"
edition = "2021"
license = "MIT OR Apache-2.0"
repository = "https://github.com/near/nearcore"
description = "This crate hosts structures for the NEAR Indexer Framework types"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde = { version = "1", features = [ "derive" ] }
serde_json = "1.0.55"

near-primitives = { path = "../../core/primitives" }
1 change: 1 addition & 0 deletions chain/indexer-primitives/LICENSE-APACHE
1 change: 1 addition & 0 deletions chain/indexer-primitives/LICENSE-MIT
3 changes: 3 additions & 0 deletions chain/indexer-primitives/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# near-indexer-primitives

This crate holds the types that is used in NEAR Indexer Framework to allow other projects to use them without a need to depend on entire `nearcore`.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub use near_primitives::{types, views};
pub struct StreamerMessage {
pub block: views::BlockView,
pub shards: Vec<IndexerShard>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have to worry about backwards compatibility ? is this message used in communication between two different binaries ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little bit above in this PR we have talked about it, let me know if that doesn't answer your question

#6255 (comment)

pub state_changes: views::StateChangesView,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -42,4 +41,5 @@ pub struct IndexerShard {
pub shard_id: types::ShardId,
pub chunk: Option<IndexerChunkView>,
pub receipt_execution_outcomes: Vec<IndexerExecutionOutcomeWithReceipt>,
pub state_changes: views::StateChangesView,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we have any tests for this ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had a conversation re: indexer and tests with @matklad recently

Here's the start #6195 (comment)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so - no tests ? :-(

}
34 changes: 23 additions & 11 deletions chain/indexer/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## 1.26.x (UNRELEASED)

* `state_changes` field is moved from the top-level `StreamerMessage` to `IndexerShard` struct to align better with the sharded nature of NEAR protocol. In the future, when nearcore will be able to track only a subset of shards, this API will work naturally, so we take pro-active measures to solidify the APIs
* All the NEAR Indexer Framework types were extracted to a separate crate `near-indexer-primitives`
* Increase the streamer size from 16 to 100 in order to increase the speed of streaming messages (affects reindexing jobs)

## Breaking changes

The field `state_changes` is moved from the root of `StreamerMessage`
to the `IndexerShard.state_changes` and now contains only changes related
to the specific shard.

## 0.10.1

* (mainnet only) Add additional handler to inject restored receipts to the block #47317863. See [PR 4248](https://github.com/near/nearcore/pull/4248) for reference
Expand All @@ -25,10 +37,10 @@ Since the change of reading genesis method to optimize memory usage. You'd be ab
## 0.9.0 (do not use this version, it contains a bug)

* Introduce `IndexerShard` structure which contains corresponding chunks and `IndexerExecutionOutcomeWithReceipt`
* `receipt` field in `IndexerExecutionOutcomeWithReceipt` is no longer optional as it used to be always set anyway,
so now we explicitly communicate this relation ("every outcome has a corresponding receipt") through the type system
* `receipt` field in `IndexerExecutionOutcomeWithReceipt` is no longer optional as it used to be always set anyway,
so now we explicitly communicate this relation ("every outcome has a corresponding receipt") through the type system
* Introduce `IndexerExecutionOutcomeWithOptionalReceipt` which is the same as `IndexerExecutionOutcomeWithReceipt`
but with optional `receipt` field.
but with optional `receipt` field.

## Breaking changes

Expand All @@ -38,7 +50,7 @@ but with optional `receipt` field.
StreamerMessage {
block: BlockView,
shards: Vec<IndexerShard>,
state_changes: StateChangesView,
state_changes: StateChangesView,
}
```

Expand All @@ -64,7 +76,7 @@ created and started on the Indexer implementation, not on the Indexer Framework

## Breaking changes

* `StreamerMessage` now contains `StateChangesView` which is an alias for `Vec<StateChangesWithCauseView>`, previously it contained `StateChangesKindsView`
* `StreamerMessage` now contains `StateChangesView` which is an alias for `Vec<StateChangesWithCauseView>`, previously it contained `StateChangesKindsView`

## 0.6.0

Expand All @@ -73,8 +85,8 @@ created and started on the Indexer implementation, not on the Indexer Framework
## Breaking changes

* `IndexerConfig` was extended with another field `await_for_node_synced`. Corresponding enum is `AwaitForNodeSyncedEnum` with variants:
* `WaitForFullSync` - await for node to be fully synced (previous default behaviour)
* `StreamWhileSyncing`- start streaming right away while node is syncing (it's useful in case of Indexing from genesis)
- `WaitForFullSync` - await for node to be fully synced (previous default behaviour)
- `StreamWhileSyncing`- start streaming right away while node is syncing (it's useful in case of Indexing from genesis)

## 0.5.0

Expand All @@ -83,6 +95,7 @@ created and started on the Indexer implementation, not on the Indexer Framework
## Breaking changes

Since #3529 nearcore stores `ExecutionOutcome`s in their execution order, and we can also attribute outcomes to specific chunk. That's why:

* `receipt_execution_outcomes` was moved from `StreamerMessage` to a relevant `IndexerChunkView`
* `ExecutionOutcomesWithReceipts` type alias was removed (just use `Vec<IndexerExecutionOutcomeWithReceipt>` instead)

Expand All @@ -93,20 +106,19 @@ Since #3529 nearcore stores `ExecutionOutcome`s in their execution order, and we
## Breaking changes

* For local receipt to have a relation to specific chunk we have prepended them to original receipts in particular chunk
as in the most cases local receipts are executed before normal receipts. That's why there is no reason to have `local_receipts`
field in `StreamerMessage` struct anymore. `local_receipts` field was removed.
as in the most cases local receipts are executed before normal receipts. That's why there is no reason to have `local_receipts`
field in `StreamerMessage` struct anymore. `local_receipts` field was removed.

## 0.3.1

* Add local receipt to `receipt_execution_outcomes` if possible

## 0.3.0


### Breaking changes

* To extended the `receipt_execution_outcomes` with information about the corresponding receipt we had to break the API
(the old outcome structure is just one layer deeper now [under `execution_outcome` field])
(the old outcome structure is just one layer deeper now [under `execution_outcome` field])

## 0.2.0

Expand Down
1 change: 1 addition & 0 deletions chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ nearcore = { path = "../../nearcore" }
near-client = { path = "../client" }
near-chain-configs = { path = "../../core/chain-configs" }
near-crypto = { path = "../../core/crypto" }
near-indexer-primitives = { path = "../indexer-primitives" }
near-primitives = { path = "../../core/primitives" }
node-runtime = { path = "../../runtime/runtime" }
8 changes: 4 additions & 4 deletions chain/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
use anyhow::Context;
use tokio::sync::mpsc;

use near_chain_configs::GenesisValidationMode;
pub use near_primitives;
use near_primitives::types::Gas;
pub use nearcore::{get_default_home, init_configs, NearConfig};

pub use self::streamer::{
pub use near_indexer_primitives::{
IndexerChunkView, IndexerExecutionOutcomeWithOptionalReceipt,
IndexerExecutionOutcomeWithReceipt, IndexerShard, IndexerTransactionWithOutcome,
StreamerMessage,
};
use near_chain_configs::GenesisValidationMode;

mod streamer;

Expand Down Expand Up @@ -113,8 +113,8 @@ impl Indexer {
}

/// Boots up `near_indexer::streamer`, so it monitors the new blocks with chunks, transactions, receipts, and execution outcomes inside. The returned stream handler should be drained and handled on the user side.
pub fn streamer(&self) -> mpsc::Receiver<streamer::StreamerMessage> {
let (sender, receiver) = mpsc::channel(16);
pub fn streamer(&self) -> mpsc::Receiver<StreamerMessage> {
let (sender, receiver) = mpsc::channel(100);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was it changed to 100? can you add a comment ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the PR description

Increase the streaming channel size from 16 to 100 (reindexing speed)

actix::spawn(streamer::start(
self.view_client.clone(),
self.client.clone(),
Expand Down
11 changes: 6 additions & 5 deletions chain/indexer/src/streamer/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use actix::Addr;
use futures::stream::StreamExt;
use tracing::warn;

pub use near_primitives::hash::CryptoHash;
pub use near_primitives::{types, views};
use near_indexer_primitives::IndexerExecutionOutcomeWithOptionalReceipt;
use near_primitives::hash::CryptoHash;
use near_primitives::{types, views};

use super::errors::FailedToFetchData;
use super::types::IndexerExecutionOutcomeWithOptionalReceipt;
use super::INDEXER;

pub(crate) async fn fetch_status(
Expand Down Expand Up @@ -60,9 +60,10 @@ pub(crate) async fn fetch_block_by_hash(
pub(crate) async fn fetch_state_changes(
client: &Addr<near_client::ViewClientActor>,
block_hash: CryptoHash,
) -> Result<views::StateChangesView, FailedToFetchData> {
epoch_id: near_primitives::types::EpochId,
) -> Result<HashMap<near_primitives::types::ShardId, views::StateChangesView>, FailedToFetchData> {
client
.send(near_client::GetStateChangesWithCauseInBlock { block_hash })
.send(near_client::GetStateChangesWithCauseInBlockForTrackedShards { block_hash, epoch_id })
.await?
.map_err(|err| FailedToFetchData::String(err.to_string()))
}
Expand Down
Loading