Skip to content

Commit

Permalink
refactor(indexer): Add local cache for delayed local receipts. Metrics…
Browse files Browse the repository at this point in the history
…, log clean up and improvements (#10798)

Today on testnet, we encountered very weird behavior from the indexers.
Indexing BPS dropped to 0.004 blocks per second. This speed is
unacceptable, especially when indexing the blocks already synced by the
node.

After an investigation, we found the suspect. The most obvious suspect
was the process of looking for the local delayed receipt. This kind of
receipt is treated differently by the indexer. On the indexer side, we
tend to include them in the `StreamerMessage` while `nearcore` avoids
storing them to save some space.

During some special periods (e.g., congestion), the number of local
delayed receipts grows, leading to extra work by the indexer.

This PR introduces a local (in-memory) cache for the indexer instance to
keep such receipts until they are needed. This should prevent the
indexer from having to walk back through history.

When we miss the receipt, we try to find it in previous blocks. Not
knowing where to look exactly, we go back block by block until we find
it. However, we limit this walk up to 1000 blocks because otherwise, the
indexer node would be stuck forever. By the way, that's exactly what it
looks like.

* Add local cache to keep local receipts if we don't observe
`ExecutionOutcome` for them in the same block (Local Delayed Receipts)
* Add a metric to keep track of how far in history we go back when we do
* Clean up and improve some debug messages. I was stupid and didn't know
what I was doing when I coded them three years ago 🤦‍♂️
* Add extra debug message when they were missing
  • Loading branch information
khorolets committed Apr 23, 2024
1 parent 4c8f022 commit 60c8ad8
Show file tree
Hide file tree
Showing 8 changed files with 1,486 additions and 1,324 deletions.
2,540 changes: 1,276 additions & 1,264 deletions Cargo.lock

Large diffs are not rendered by default.

72 changes: 56 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "1.39.1" # managed by cargo-workspaces, see below
version = "1.39.1" # managed by cargo-workspaces, see below
authors = ["Near Inc <hello@nearprotocol.com>"]
edition = "2021"
rust-version = "1.75.0"
Expand Down Expand Up @@ -113,7 +113,6 @@ anyhow = "1.0.62"
arbitrary = { version = "1.2.3", features = ["derive"] }
arc-swap = "1.5"
assert_matches = "1.5.0"
async-recursion = "1.0.4"
async-trait = "0.1.58"
awc = { version = "3", features = ["openssl"] }
backtrace = "0.3"
Expand All @@ -122,7 +121,9 @@ bencher = "0.1.5"
bitflags = "1.2"
blake2 = "0.9.1"
bn = { package = "zeropool-bn", version = "0.5.11", default-features = false }
bolero = { version = "0.10.0", git = "https://github.com/Ekleog-NEAR/bolero", rev = "56da8e6d1d018519a30b36d85d3a53fe35a42eaf", features = ["arbitrary"] }
bolero = { version = "0.10.0", git = "https://github.com/Ekleog-NEAR/bolero", rev = "56da8e6d1d018519a30b36d85d3a53fe35a42eaf", features = [
"arbitrary",
] }
borsh = { version = "1.0.0", features = ["derive", "rc"] }
bs58 = "0.4"
bytes = "1"
Expand All @@ -137,18 +138,28 @@ clap = { version = "4.2.0", features = ["derive", "env", "string"] }
cloud-storage = "0.11.1"
conqueue = "0.4.0"
cpu-time = "1.0"
criterion = { version = "0.5.1", default_features = false, features = ["html_reports", "cargo_bench_support"] }
criterion = { version = "0.5.1", default_features = false, features = [
"html_reports",
"cargo_bench_support",
] }
crossbeam = "0.8"
crossbeam-channel = "0.5.8"
csv = "1.2.1"
curve25519-dalek = { version = "4.1.1", default-features = false, features = ["alloc", "precomputed-tables", "rand_core"] }
curve25519-dalek = { version = "4.1.1", default-features = false, features = [
"alloc",
"precomputed-tables",
"rand_core",
] }
derive-enum-from-into = "0.1.1"
derive_more = "0.99.9"
dirs = "4"
dynasm = "2.0"
dynasmrt = "2.0"
easy-ext = "0.2"
ed25519-dalek = { version = "2.1.0", default-features = false, features = ["hazmat", "rand_core"] }
ed25519-dalek = { version = "2.1.0", default-features = false, features = [
"hazmat",
"rand_core",
] }
elastic-array = "0.11"
enum-map = "2.1.0"
enumset = "1.0"
Expand Down Expand Up @@ -183,10 +194,14 @@ lru = "0.7.2"
memmap2 = "0.5"
memoffset = "0.8"
more-asserts = "0.2"
near-account-id = { version = "1.0.0-alpha.4", features = ["internal_unstable", "serde", "borsh"] }
near-account-id = { version = "1.0.0-alpha.4", features = [
"internal_unstable",
"serde",
"borsh",
] }
near-actix-test-utils = { path = "test-utils/actix-test-utils" }
near-amend-genesis = { path = "tools/amend-genesis" }
near-database-tool= { path = "tools/database" }
near-database-tool = { path = "tools/database" }
near-async = { path = "core/async" }
near-cache = { path = "utils/near-cache" }
near-chain = { path = "chain/chain" }
Expand All @@ -202,7 +217,7 @@ nearcore = { path = "nearcore" }
near-crypto = { path = "core/crypto" }
near-dyn-configs = { path = "core/dyn-configs" }
near-epoch-manager = { path = "chain/epoch-manager" }
near-epoch-sync-tool = { path = "tools/epoch-sync"}
near-epoch-sync-tool = { path = "tools/epoch-sync" }
near-flat-storage = { path = "tools/flat-storage" }
near-fork-network = { path = "tools/fork-network" }
near-fmt = { path = "utils/fmt" }
Expand All @@ -211,7 +226,9 @@ near-indexer-primitives = { path = "chain/indexer-primitives" }
near-jsonrpc = { path = "chain/jsonrpc" }
near-jsonrpc-adversarial-primitives = { path = "chain/jsonrpc-adversarial-primitives" }
near-jsonrpc-client = { path = "chain/jsonrpc/client" }
near-jsonrpc-primitives = { path = "chain/jsonrpc-primitives", features = ["full"] }
near-jsonrpc-primitives = { path = "chain/jsonrpc-primitives", features = [
"full",
] }
near-jsonrpc-tests = { path = "chain/jsonrpc/jsonrpc-tests" }
near-mainnet-res = { path = "utils/mainnet-res" }
near-mirror = { path = "tools/mirror" }
Expand Down Expand Up @@ -289,7 +306,13 @@ ripemd = "0.1.1"
rkyv = "0.7.31"
rlimit = "0.7"
rlp = "0.5.2"
rocksdb = { version = "0.21.0", default-features = false, features = ["snappy", "lz4", "zstd", "zlib", "jemalloc"] }
rocksdb = { version = "0.21.0", default-features = false, features = [
"snappy",
"lz4",
"zstd",
"zlib",
"jemalloc",
] }
runtime-tester = { path = "test-utils/runtime-tester" }
rusqlite = { version = "0.29.0", features = ["bundled", "chrono", "functions"] }
rustc-demangle = "0.1"
Expand Down Expand Up @@ -339,27 +362,44 @@ tracing = { version = "0.1.36", features = ["std"] }
tracing-appender = "0.2.2"
tracing-opentelemetry = "0.17.0"
tracing-span-tree = "0.1"
tracing-subscriber = { version = "0.3.15", features = ["env-filter", "fmt", "registry", "std"] }
tracing-subscriber = { version = "0.3.15", features = [
"env-filter",
"fmt",
"registry",
"std",
] }
trybuild = "1.0.11"
turn = "0.6"
validator = "0.12"
wasm-encoder = "0.27.0"
wasmer-compiler = { package = "wasmer-compiler-near", version = "=2.4.1" }
wasmer-compiler-singlepass = { package = "wasmer-compiler-singlepass-near", version = "=2.4.1" }
wasmer-engine = { package = "wasmer-engine-near", version = "=2.4.1" }
wasmer-engine-universal = { package = "wasmer-engine-universal-near", version = "=2.4.1", features = ["compiler"] }
wasmer-runtime = { version = "0.18.0", package = "wasmer-runtime-near", features = ["default-backend-singlepass"], default-features = false }
wasmer-engine-universal = { package = "wasmer-engine-universal-near", version = "=2.4.1", features = [
"compiler",
] }
wasmer-runtime = { version = "0.18.0", package = "wasmer-runtime-near", features = [
"default-backend-singlepass",
], default-features = false }
wasmer-runtime-core = { version = "0.18.2", package = "wasmer-runtime-core-near" }
wasmer-types = { package = "wasmer-types-near", version = "=2.4.1" }
wasmer-vm = { package = "wasmer-vm-near", version = "=2.4.1" }
wasmparser = "0.78" # TODO: unify at least the versions of wasmparser we have in our codebase
wasmprinter = "0.2"
wasm-smith = "0.10"
wasmtime = { version = "14.0.4", default-features = false, features = ["cranelift"] }
wasmtime = { version = "14.0.4", default-features = false, features = [
"cranelift",
] }
wast = "40.0"
wat = "1.0.40"
webrtc-util = "0.7"
winapi = { version = "0.3", features = ["winbase", "memoryapi", "errhandlingapi", "winnt", "impl-default"] }
winapi = { version = "0.3", features = [
"winbase",
"memoryapi",
"errhandlingapi",
"winnt",
"impl-default",
] }
xshell = "0.2.1"
xz2 = "0.1.6"
yansi = "0.5.1"
Expand Down
8 changes: 8 additions & 0 deletions chain/indexer/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## 1.38.x

* Make `build_streamer_message` public to allow custom indexer to reuse this function (e.g. build an indexer that streams optimistic block finalities, indexer that streams only blocks satisfying some condition, etc.)
* Add local cache for delayed local receipts to avoid fetching blocks from the node every time we need to find a delayed local receipt
* Add metric to watch how far back in the history we went to find a delayed local receipt when cache didn't work
* Remove unnecessary `#[async_recursion]` from `build_streamer_message` function
* Clean up some useless debug logs and add more useful ones

## 1.32.x

* Add `nightly` feature to NEAR Indexer Framework to respect this feature for `nearcore` lib (required for `betanet`)
Expand Down
2 changes: 1 addition & 1 deletion chain/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ workspace = true
[dependencies]
actix.workspace = true
anyhow.workspace = true
async-recursion.workspace = true
futures.workspace = true
once_cell.workspace = true
rocksdb.workspace = true
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tracing.workspace = true
lazy_static.workspace = true

nearcore.workspace = true
near-client.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion chain/indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ pub use near_indexer_primitives::{
StreamerMessage,
};

mod streamer;
pub use streamer::build_streamer_message;

mod streamer;

pub const INDEXER: &str = "indexer";

/// Config wrapper to simplify signature and usage of `nearcore::init_configs`
Expand Down
11 changes: 11 additions & 0 deletions chain/indexer/src/streamer/fetchers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use super::INDEXER;
pub(crate) async fn fetch_status(
client: &Addr<near_client::ClientActor>,
) -> Result<near_primitives::views::StatusResponse, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching status");
client
.send(near_client::Status { is_health_check: false, detailed: false }.with_span_context())
.await?
Expand All @@ -28,6 +29,7 @@ pub(crate) async fn fetch_status(
pub(crate) async fn fetch_latest_block(
client: &Addr<near_client::ViewClientActor>,
) -> Result<views::BlockView, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching latest block");
client
.send(
near_client::GetBlock(near_primitives::types::BlockReference::Finality(
Expand All @@ -44,6 +46,7 @@ pub(crate) async fn fetch_block_by_height(
client: &Addr<near_client::ViewClientActor>,
height: u64,
) -> Result<views::BlockView, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching block by height: {}", height);
client
.send(
near_client::GetBlock(near_primitives::types::BlockId::Height(height).into())
Expand All @@ -58,6 +61,7 @@ pub(crate) async fn fetch_block(
client: &Addr<near_client::ViewClientActor>,
hash: CryptoHash,
) -> Result<views::BlockView, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching block by hash: {}", hash);
client
.send(
near_client::GetBlock(near_primitives::types::BlockId::Hash(hash).into())
Expand All @@ -72,6 +76,7 @@ pub(crate) async fn fetch_state_changes(
block_hash: CryptoHash,
epoch_id: near_primitives::types::EpochId,
) -> Result<HashMap<near_primitives::types::ShardId, views::StateChangesView>, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching state changes for block: {}, epoch_id: {:?}", block_hash, epoch_id);
client
.send(
near_client::GetStateChangesWithCauseInBlockForTrackedShards { block_hash, epoch_id }
Expand All @@ -90,6 +95,7 @@ pub(crate) async fn fetch_outcomes(
HashMap<near_primitives::types::ShardId, Vec<IndexerExecutionOutcomeWithOptionalReceipt>>,
FailedToFetchData,
> {
tracing::debug!(target: INDEXER, "Fetching outcomes for block: {}", block_hash);
let outcomes = client
.send(near_client::GetExecutionOutcomesForBlock { block_hash }.with_span_context())
.await?
Expand All @@ -100,6 +106,7 @@ pub(crate) async fn fetch_outcomes(
Vec<IndexerExecutionOutcomeWithOptionalReceipt>,
> = HashMap::new();
for (shard_id, shard_outcomes) in outcomes {
tracing::debug!(target: INDEXER, "Fetching outcomes with receipts for shard: {}", shard_id);
let mut outcomes_with_receipts: Vec<IndexerExecutionOutcomeWithOptionalReceipt> = vec![];
for outcome in shard_outcomes {
let receipt = match fetch_receipt_by_id(&client, outcome.id).await {
Expand Down Expand Up @@ -129,6 +136,7 @@ async fn fetch_receipt_by_id(
client: &Addr<near_client::ViewClientActor>,
receipt_id: CryptoHash,
) -> Result<Option<views::ReceiptView>, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching receipt by id: {}", receipt_id);
client
.send(near_client::GetReceipt { receipt_id }.with_span_context())
.await?
Expand All @@ -141,6 +149,7 @@ async fn fetch_single_chunk(
client: &Addr<near_client::ViewClientActor>,
chunk_hash: near_primitives::hash::CryptoHash,
) -> Result<views::ChunkView, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching chunk by hash: {}", chunk_hash);
client
.send(near_client::GetChunk::ChunkHash(chunk_hash.into()).with_span_context())
.await?
Expand All @@ -153,6 +162,7 @@ pub(crate) async fn fetch_block_chunks(
client: &Addr<near_client::ViewClientActor>,
block: &views::BlockView,
) -> Result<Vec<views::ChunkView>, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching chunks for block #{}", block.header.height);
let mut futures: futures::stream::FuturesUnordered<_> = block
.chunks
.iter()
Expand All @@ -170,6 +180,7 @@ pub(crate) async fn fetch_protocol_config(
client: &Addr<near_client::ViewClientActor>,
block_hash: near_primitives::hash::CryptoHash,
) -> Result<near_chain_configs::ProtocolConfigView, FailedToFetchData> {
tracing::debug!(target: INDEXER, "Fetching protocol config for block: {}", block_hash);
Ok(client
.send(
near_client::GetProtocolConfig(types::BlockReference::from(types::BlockId::Hash(
Expand Down
8 changes: 8 additions & 0 deletions chain/indexer/src/streamer/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,11 @@ pub(crate) static BUILD_STREAMER_MESSAGE_TIME: Lazy<Histogram> = Lazy::new(|| {
)
.unwrap()
});

/// Gauge reporting how many blocks back in history the indexer walked while
/// looking up a delayed local receipt (i.e. when the receipt was not served
/// from the local cache).
///
/// The value is a block count, not a duration.
pub(crate) static LOCAL_RECEIPT_LOOKUP_IN_HISTORY_BLOCKS_BACK: Lazy<IntGauge> = Lazy::new(|| {
    try_create_int_gauge(
        "near_indexer_local_receipt_lookup_in_history_blocks_back",
        "Number of blocks walked back in history to find a delayed local receipt",
    )
    // NOTE(review): matches the other metrics in this file, which also
    // unwrap on creation — presumably a failure here is a programming error.
    .unwrap()
});
Loading

0 comments on commit 60c8ad8

Please sign in to comment.