diff --git a/Cargo.lock b/Cargo.lock index 279b1ac2716..98a094d62fd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -440,17 +440,6 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" -[[package]] -name = "async-recursion" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e97ce7de6cf12de5d7226c73f5ba9811622f4db3a5b91b55c53e987e5f91cba" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.32", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -3829,8 +3818,8 @@ version = "0.0.0" dependencies = [ "actix", "anyhow", - "async-recursion", "futures", + "lazy_static", "near-chain-configs", "near-client", "near-crypto", diff --git a/Cargo.toml b/Cargo.toml index 12b6001f593..8984366189e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.0.0" # managed by cargo-workspaces, see below +version = "0.0.0" # managed by cargo-workspaces, see below authors = ["Near Inc "] edition = "2021" rust-version = "1.76.0" @@ -120,7 +120,6 @@ anyhow = "1.0.62" arbitrary = { version = "1.2.3", features = ["derive"] } arc-swap = "1.5" assert_matches = "1.5.0" -async-recursion = "1.0.4" async-trait = "0.1.58" awc = { version = "3", features = ["openssl"] } backtrace = "0.3" @@ -131,7 +130,9 @@ blake2 = "0.9.1" bn = { package = "zeropool-bn", version = "0.5.11", default-features = false } # TODO: remove this override when https://github.com/camshaft/bolero/issues/196 is fixed upstream # Currently the changes here are: https://github.com/camshaft/bolero/compare/master...Ekleog-NEAR:bolero:reduce-list-tests-run -bolero = { version = "0.10.0", git = "https://github.com/Ekleog-NEAR/bolero", rev = "c37993bd70dcf5b1778b03daf29f686225e9a504", features = ["arbitrary"] } +bolero = { version = "0.10.0", git = "https://github.com/Ekleog-NEAR/bolero", rev = "c37993bd70dcf5b1778b03daf29f686225e9a504", features = [ + "arbitrary", +] } borsh = { version = "1.0.0", features = ["derive", "rc"] } bs58 = "0.4" bytes = "1" @@ -143,18 +144,28 @@ chrono = { version = "0.4.19", features = ["serde"] } clap = { version = "4.2.0", features = ["derive", "env", "string"] } cloud-storage = "0.11.1" cpu-time = "1.0" -criterion = { version = "0.5.1", default_features = false, features = ["html_reports", "cargo_bench_support"] } +criterion = { version = "0.5.1", default_features = false, features = [ + "html_reports", + "cargo_bench_support", +] } crossbeam = "0.8" crossbeam-channel = "0.5.8" csv = "1.2.1" -curve25519-dalek = { version = "4.1.1", default-features = false, features = ["alloc", "precomputed-tables", "rand_core"] } +curve25519-dalek = { version = "4.1.1", default-features = false, features = [ + "alloc", + "precomputed-tables", + "rand_core", +] } derive-enum-from-into = "0.1.1" derive_more = "0.99.9" dirs = "4" dynasm = "2.0" dynasmrt = "2.0" easy-ext = "0.2" -ed25519-dalek = { version = "2.1.0", default-features = false, features = ["hazmat", "rand_core"] } +ed25519-dalek = { version = "2.1.0", default-features = false, features = [ + "hazmat", + "rand_core", +] } elastic-array = "0.11" enum-map = "2.1.0" enumset = "1.0" @@ -185,10 +196,14 @@ log = "0.4" lru = "0.7.2" memoffset = "0.8" more-asserts = "0.2" -near-account-id = { version = "1.0.0-alpha.4", features = ["internal_unstable", "serde", "borsh"] } +near-account-id = { version = "1.0.0-alpha.4", features = [ + "internal_unstable", + "serde", + "borsh", +] } near-actix-test-utils = { path = "test-utils/actix-test-utils" } near-amend-genesis = { path = "tools/amend-genesis" } -near-database-tool= { path = "tools/database" } +near-database-tool = { path = "tools/database" } near-async = { path = "core/async" } near-async-derive = { path = "core/async-derive" } near-cache = { path = "utils/near-cache" } @@ -205,7 +220,7 @@ nearcore = { path = "nearcore" } near-crypto = { path = "core/crypto" } near-dyn-configs = { path = "core/dyn-configs" } near-epoch-manager = { path = "chain/epoch-manager" } -near-epoch-sync-tool = { path = "tools/epoch-sync"} +near-epoch-sync-tool = { path = "tools/epoch-sync" } near-flat-storage = { path = "tools/flat-storage" } near-fork-network = { path = "tools/fork-network" } near-fmt = { path = "utils/fmt" } @@ -214,7 +229,9 @@ near-indexer-primitives = { path = "chain/indexer-primitives" } near-jsonrpc = { path = "chain/jsonrpc" } near-jsonrpc-adversarial-primitives = { path = "chain/jsonrpc-adversarial-primitives" } near-jsonrpc-client = { path = "chain/jsonrpc/client" } -near-jsonrpc-primitives = { path = "chain/jsonrpc-primitives", features = ["full"] } +near-jsonrpc-primitives = { path = "chain/jsonrpc-primitives", features = [ + "full", +] } near-jsonrpc-tests = { path = "chain/jsonrpc/jsonrpc-tests" } near-mainnet-res = { path = "utils/mainnet-res" } near-mirror = { path = "tools/mirror" } @@ -290,7 +307,13 @@ ripemd = "0.1.1" rkyv = "0.7.31" rlimit = "0.7" rlp = "0.5.2" -rocksdb = { version = "0.21.0", default-features = false, features = ["snappy", "lz4", "zstd", "zlib", "jemalloc"] } +rocksdb = { version = "0.21.0", default-features = false, features = [ + "snappy", + "lz4", + "zstd", + "zlib", + "jemalloc", +] } runtime-tester = { path = "test-utils/runtime-tester" } rusqlite = { version = "0.29.0", features = ["bundled", "chrono", "functions"] } rustc-demangle = "0.1" @@ -338,26 +361,43 @@ tracing = { version = "0.1.36", features = ["std"] } tracing-appender = "0.2.2" tracing-opentelemetry = "0.17.0" tracing-span-tree = "0.1" -tracing-subscriber = { version = "0.3.15", features = ["env-filter", "fmt", "registry", "std"] } +tracing-subscriber = { version = "0.3.15", features = [ + "env-filter", + "fmt", + "registry", + "std", +] } trybuild = "1.0.11" turn = "0.6" wasm-encoder = "0.27.0" wasmer-compiler = { package = "wasmer-compiler-near", version = "=2.4.1" } wasmer-compiler-singlepass = { package = "wasmer-compiler-singlepass-near", version = "=2.4.1" } wasmer-engine = { package = "wasmer-engine-near", version = "=2.4.1" } -wasmer-engine-universal = { package = "wasmer-engine-universal-near", version = "=2.4.1", features = ["compiler"] } -wasmer-runtime = { version = "0.18.0", package = "wasmer-runtime-near", features = ["default-backend-singlepass"], default-features = false } +wasmer-engine-universal = { package = "wasmer-engine-universal-near", version = "=2.4.1", features = [ + "compiler", +] } +wasmer-runtime = { version = "0.18.0", package = "wasmer-runtime-near", features = [ + "default-backend-singlepass", +], default-features = false } wasmer-runtime-core = { version = "0.18.2", package = "wasmer-runtime-core-near" } wasmer-types = { package = "wasmer-types-near", version = "=2.4.1" } wasmer-vm = { package = "wasmer-vm-near", version = "=2.4.1" } wasmparser = "0.78" # TODO: unify at least the versions of wasmparser we have in our codebase wasmprinter = "0.2" wasm-smith = "0.10" -wasmtime = { version = "14.0.4", default-features = false, features = ["cranelift"] } +wasmtime = { version = "14.0.4", default-features = false, features = [ + "cranelift", +] } wast = "40.0" wat = "1.0.40" webrtc-util = "0.7" -winapi = { version = "0.3", features = ["winbase", "memoryapi", "errhandlingapi", "winnt", "impl-default"] } +winapi = { version = "0.3", features = [ + "winbase", + "memoryapi", + "errhandlingapi", + "winnt", + "impl-default", +] } xshell = "0.2.1" xz2 = "0.1.6" yansi = "0.5.1" diff --git a/chain/indexer/CHANGELOG.md b/chain/indexer/CHANGELOG.md index 124275e2f64..9ca6fca53f1 100644 --- a/chain/indexer/CHANGELOG.md +++ b/chain/indexer/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 1.38.x + +* Make `build_streamer_message` public to allow custom indexer to reuse this function (e.g. build an indexer that streams optimistic block finalities, indexer that streams only blocks satisfying some condition, etc.) +* Add local cache for delayed local receipts to avoid fetching blocks from the node everytime we need to find a delayed local receipt + * Add metric to watch how far back in the history we went to find a delayed local receipt when cache didn't work +* Remove unnecessary `#[async_recursion]` from `build_streamer_message` function +* Clean up some useless debug logs and add more useful ones + ## 1.32.x * Add `nightly` feature to NEAR Indexer Framework to respect this feature for `nearcore` lib (requried for `betanet`) diff --git a/chain/indexer/Cargo.toml b/chain/indexer/Cargo.toml index 4338b9e7fa4..fc5cdbc109b 100644 --- a/chain/indexer/Cargo.toml +++ b/chain/indexer/Cargo.toml @@ -14,7 +14,6 @@ workspace = true [dependencies] actix.workspace = true anyhow.workspace = true -async-recursion.workspace = true futures.workspace = true once_cell.workspace = true rocksdb.workspace = true @@ -22,6 +21,7 @@ serde.workspace = true serde_json.workspace = true tokio.workspace = true tracing.workspace = true +lazy_static.workspace = true nearcore.workspace = true near-client.workspace = true diff --git a/chain/indexer/src/lib.rs b/chain/indexer/src/lib.rs index 7b26ca091df..75cb18e13c8 100644 --- a/chain/indexer/src/lib.rs +++ b/chain/indexer/src/lib.rs @@ -14,9 +14,10 @@ pub use near_indexer_primitives::{ StreamerMessage, }; -mod streamer; pub use streamer::build_streamer_message; +mod streamer; + pub const INDEXER: &str = "indexer"; /// Config wrapper to simplify signature and usage of `nearcore::init_configs` diff --git a/chain/indexer/src/streamer/fetchers.rs b/chain/indexer/src/streamer/fetchers.rs index 31a40d7c908..fda4bbc283e 100644 --- a/chain/indexer/src/streamer/fetchers.rs +++ b/chain/indexer/src/streamer/fetchers.rs @@ -17,6 +17,7 @@ use super::INDEXER; pub(crate) async fn fetch_status( client: &Addr, ) -> Result { + tracing::debug!(target: INDEXER, "Fetching status"); client .send(near_client::Status { is_health_check: false, detailed: false }.with_span_context()) .await? @@ -28,6 +29,7 @@ pub(crate) async fn fetch_status( pub(crate) async fn fetch_latest_block( client: &Addr, ) -> Result { + tracing::debug!(target: INDEXER, "Fetching latest block"); client .send( near_client::GetBlock(near_primitives::types::BlockReference::Finality( @@ -44,6 +46,7 @@ pub(crate) async fn fetch_block_by_height( client: &Addr, height: u64, ) -> Result { + tracing::debug!(target: INDEXER, "Fetching block by height: {}", height); client .send( near_client::GetBlock(near_primitives::types::BlockId::Height(height).into()) @@ -58,6 +61,7 @@ pub(crate) async fn fetch_block( client: &Addr, hash: CryptoHash, ) -> Result { + tracing::debug!(target: INDEXER, "Fetching block by hash: {}", hash); client .send( near_client::GetBlock(near_primitives::types::BlockId::Hash(hash).into()) @@ -72,6 +76,7 @@ pub(crate) async fn fetch_state_changes( block_hash: CryptoHash, epoch_id: near_primitives::types::EpochId, ) -> Result, FailedToFetchData> { + tracing::debug!(target: INDEXER, "Fetching state changes for block: {}, epoch_id: {:?}", block_hash, epoch_id); client .send( near_client::GetStateChangesWithCauseInBlockForTrackedShards { block_hash, epoch_id } @@ -90,6 +95,7 @@ pub(crate) async fn fetch_outcomes( HashMap>, FailedToFetchData, > { + tracing::debug!(target: INDEXER, "Fetching outcomes for block: {}", block_hash); let outcomes = client .send(near_client::GetExecutionOutcomesForBlock { block_hash }.with_span_context()) .await? @@ -100,6 +106,7 @@ pub(crate) async fn fetch_outcomes( Vec, > = HashMap::new(); for (shard_id, shard_outcomes) in outcomes { + tracing::debug!(target: INDEXER, "Fetching outcomes with receipts for shard: {}", shard_id); let mut outcomes_with_receipts: Vec = vec![]; for outcome in shard_outcomes { let receipt = match fetch_receipt_by_id(&client, outcome.id).await { @@ -129,6 +136,7 @@ async fn fetch_receipt_by_id( client: &Addr, receipt_id: CryptoHash, ) -> Result, FailedToFetchData> { + tracing::debug!(target: INDEXER, "Fetching receipt by id: {}", receipt_id); client .send(near_client::GetReceipt { receipt_id }.with_span_context()) .await? @@ -141,6 +149,7 @@ async fn fetch_single_chunk( client: &Addr, chunk_hash: near_primitives::hash::CryptoHash, ) -> Result { + tracing::debug!(target: INDEXER, "Fetching chunk by hash: {}", chunk_hash); client .send(near_client::GetChunk::ChunkHash(chunk_hash.into()).with_span_context()) .await? @@ -153,6 +162,7 @@ pub(crate) async fn fetch_block_chunks( client: &Addr, block: &views::BlockView, ) -> Result, FailedToFetchData> { + tracing::debug!(target: INDEXER, "Fetching chunks for block #{}", block.header.height); let mut futures: futures::stream::FuturesUnordered<_> = block .chunks .iter() @@ -170,6 +180,7 @@ pub(crate) async fn fetch_protocol_config( client: &Addr, block_hash: near_primitives::hash::CryptoHash, ) -> Result { + tracing::debug!(target: INDEXER, "Fetching protocol config for block: {}", block_hash); Ok(client .send( near_client::GetProtocolConfig(types::BlockReference::from(types::BlockId::Hash( diff --git a/chain/indexer/src/streamer/metrics.rs b/chain/indexer/src/streamer/metrics.rs index 42fcc86f95e..55233ad2344 100644 --- a/chain/indexer/src/streamer/metrics.rs +++ b/chain/indexer/src/streamer/metrics.rs @@ -43,3 +43,11 @@ pub(crate) static BUILD_STREAMER_MESSAGE_TIME: Lazy = Lazy::new(|| { ) .unwrap() }); + +pub(crate) static LOCAL_RECEIPT_LOOKUP_IN_HISTORY_BLOCKS_BACK: Lazy = Lazy::new(|| { + try_create_int_gauge( + "near_indexer_local_receipt_lookup_in_history_blocks_back", + "Time taken to lookup a receipt in history blocks back", + ) + .unwrap() +}); diff --git a/chain/indexer/src/streamer/mod.rs b/chain/indexer/src/streamer/mod.rs index df598d20d0f..388639c8ed5 100644 --- a/chain/indexer/src/streamer/mod.rs +++ b/chain/indexer/src/streamer/mod.rs @@ -1,14 +1,14 @@ -use self::errors::FailedToFetchData; -use self::fetchers::{ - fetch_block, fetch_block_by_height, fetch_block_chunks, fetch_latest_block, fetch_outcomes, - fetch_state_changes, fetch_status, -}; -use self::utils::convert_transactions_sir_into_local_receipts; -use crate::streamer::fetchers::fetch_protocol_config; -use crate::INDEXER; -use crate::{AwaitForNodeSyncedEnum, IndexerConfig}; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::time::Duration; + use actix::Addr; -use async_recursion::async_recursion; +use lazy_static::lazy_static; +use rocksdb::DB; +use tokio::sync::mpsc; +use tokio::time; +use tracing::{debug, error, info}; + use near_indexer_primitives::{ IndexerChunkView, IndexerExecutionOutcomeWithOptionalReceipt, IndexerExecutionOutcomeWithReceipt, IndexerShard, IndexerTransactionWithOutcome, @@ -17,17 +17,27 @@ use near_indexer_primitives::{ use near_parameters::RuntimeConfig; use near_primitives::hash::CryptoHash; use near_primitives::views; -use rocksdb::DB; -use std::time::Duration; -use tokio::sync::mpsc; -use tokio::time; -use tracing::{debug, info}; + +use self::errors::FailedToFetchData; +use self::fetchers::{ + fetch_block, fetch_block_by_height, fetch_block_chunks, fetch_latest_block, fetch_outcomes, + fetch_state_changes, fetch_status, +}; +use self::utils::convert_transactions_sir_into_local_receipts; +use crate::streamer::fetchers::fetch_protocol_config; +use crate::INDEXER; +use crate::{AwaitForNodeSyncedEnum, IndexerConfig}; mod errors; mod fetchers; mod metrics; mod utils; +lazy_static! { + static ref DELAYED_LOCAL_RECEIPTS_CACHE: Arc>> = + Arc::new(RwLock::new(HashMap::new())); +} + const INTERVAL: Duration = Duration::from_millis(500); /// Blocks #47317863 and #47317864 with restored receipts. @@ -63,7 +73,6 @@ fn test_problematic_blocks_hash() { /// This function supposed to return the entire `StreamerMessage`. /// It fetches the block and all related parts (chunks, outcomes, state changes etc.) /// and returns everything together in one struct -#[async_recursion] pub async fn build_streamer_message( client: &Addr, block: views::BlockView, @@ -138,6 +147,16 @@ pub async fn build_streamer_message( { debug_assert!(outcome.receipt.is_none()); outcome.receipt = Some(receipt.clone()); + } else { + if let Ok(mut cache) = DELAYED_LOCAL_RECEIPTS_CACHE.write() { + cache.insert(receipt.receipt_id, receipt.clone()); + } else { + tracing::warn!( + target: INDEXER, + "Unable to insert receipt {} into DELAYED_LOCAL_RECEIPTS_CACHE", + receipt.receipt_id, + ); + } } } @@ -149,34 +168,44 @@ pub async fn build_streamer_message( let receipt = if let Some(receipt) = receipt { receipt } else { - // Receipt might be missing only in case of delayed local receipt - // that appeared in some of the previous blocks - // we will be iterating over previous blocks until we found the receipt - let mut prev_block_tried = 0u16; - let mut prev_block_hash = block.header.prev_hash; - 'find_local_receipt: loop { - if prev_block_tried > 1000 { - panic!("Failed to find local receipt in 1000 prev blocks"); + // Attempt to extract the receipt or decide to fetch it based on cache access success + let maybe_receipt = { + match DELAYED_LOCAL_RECEIPTS_CACHE.write() { + Ok(mut cache) => { + // Lock acquired, attempt to remove the receipt + cache.remove(&execution_outcome.id) + } + Err(_) => { + // Failed to acquire lock, log this event and decide to fetch the receipt + tracing::warn!( + target: INDEXER, + "Failed to acquire DELAYED_LOCAL_RECEIPTS_CACHE lock, starting to look for receipt {} in up to 1000 blocks back in time", + execution_outcome.id, + ); + None // Indicate that receipt needs to be fetched + } } - let prev_block = match fetch_block(&client, prev_block_hash).await { - Ok(block) => block, - Err(err) => panic!("Unable to get previous block: {:?}", err), - }; - - prev_block_hash = prev_block.header.prev_hash; - - if let Some(receipt) = find_local_receipt_by_id_in_block( + }; + + // Depending on whether you got the receipt from the cache, proceed + if let Some(receipt) = maybe_receipt { + // Receipt was found in cache + receipt + } else { + // Receipt not found in cache or failed to acquire lock, proceed to look it up + // in the history of blocks (up to 1000 blocks back) + tracing::warn!( + target: INDEXER, + "Receipt {} is missing in block and in DELAYED_LOCAL_RECEIPTS_CACHE, looking for it in up to 1000 blocks back in time", + execution_outcome.id, + ); + lookup_delayed_local_receipt_in_previous_blocks( &client, &runtime_config, - prev_block, + block.clone(), execution_outcome.id, ) .await? - { - break 'find_local_receipt receipt; - } - - prev_block_tried += 1; } }; receipt_execution_outcomes @@ -233,6 +262,56 @@ pub async fn build_streamer_message( Ok(StreamerMessage { block, shards: indexer_shards }) } +// Receipt might be missing only in case of delayed local receipt +// that appeared in some of the previous blocks +// we will be iterating over previous blocks until we found the receipt +// or panic if we didn't find it in 1000 blocks +async fn lookup_delayed_local_receipt_in_previous_blocks( + client: &Addr, + runtime_config: &RuntimeConfig, + block: views::BlockView, + receipt_id: CryptoHash, +) -> Result { + let mut prev_block_tried = 0u16; + let mut prev_block_hash = block.header.prev_hash; + 'find_local_receipt: loop { + if prev_block_tried > 1000 { + panic!("Failed to find local receipt in 1000 prev blocks"); + } + // Log a warning every 100 blocks + if prev_block_tried % 100 == 0 { + tracing::warn!( + target: INDEXER, + "Still looking for receipt {} in previous blocks. {} blocks back already", + receipt_id, + prev_block_tried, + ); + } + let prev_block = match fetch_block(&client, prev_block_hash).await { + Ok(block) => block, + Err(err) => panic!("Unable to get previous block: {:?}", err), + }; + + prev_block_hash = prev_block.header.prev_hash; + + if let Some(receipt) = + find_local_receipt_by_id_in_block(&client, &runtime_config, prev_block, receipt_id) + .await? + { + tracing::debug!( + target: INDEXER, + "Found receipt {} in previous block {}", + receipt_id, + prev_block_tried, + ); + metrics::LOCAL_RECEIPT_LOOKUP_IN_HISTORY_BLOCKS_BACK.set(prev_block_tried as i64); + break 'find_local_receipt Ok(receipt); + } + + prev_block_tried += 1; + } +} + /// Function that tries to find specific local receipt by it's ID and returns it /// otherwise returns None async fn find_local_receipt_by_id_in_block( @@ -296,8 +375,11 @@ pub(crate) async fn start( .path() .join("indexer"); - // TODO: implement proper error handling - let db = DB::open_default(indexer_db_path).unwrap(); + let db = match DB::open_default(indexer_db_path) { + Ok(db) => db, + Err(err) => panic!("Unable to open indexer db: {:?}", err), + }; + let mut last_synced_block_height: Option = None; 'main: loop { @@ -353,9 +435,9 @@ pub(crate) async fn start( match response { Ok(streamer_message) => { - debug!(target: INDEXER, "{:#?}", &streamer_message); + debug!(target: INDEXER, "Sending streamer message for block #{} to the listener", streamer_message.block.header.height); if blocks_sink.send(streamer_message).await.is_err() { - info!( + error!( target: INDEXER, "Unable to send StreamerMessage to listener, listener doesn't listen. terminating..." );