Skip to content

Commit

Permalink
perf(rpc): optimistically retrieve block if near the tip on `eth_getL…
Browse files Browse the repository at this point in the history
…ogs` (#11582)
  • Loading branch information
joshieDo authored Oct 9, 2024
1 parent fb8bd77 commit b787d9e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 61 deletions.
67 changes: 43 additions & 24 deletions crates/rpc/rpc-eth-types/src/logs_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use alloy_primitives::TxHash;
use alloy_rpc_types::{FilteredParams, Log};
use reth_chainspec::ChainInfo;
use reth_errors::ProviderError;
use reth_primitives::{BlockNumHash, Receipt};
use reth_primitives::{BlockNumHash, Receipt, SealedBlock};
use reth_storage_api::BlockReader;

/// Returns all matching of a block's receipts when the transaction hashes are known.
Expand Down Expand Up @@ -45,11 +45,20 @@ where
all_logs
}

/// Helper enum to fetch a transaction either from a block or from the provider.
#[derive(Debug)]
pub enum ProviderOrBlock<'a, P: BlockReader> {
/// Provider
Provider(&'a P),
/// [`SealedBlock`]
Block(SealedBlock),
}

/// Appends all matching logs of a block's receipts.
/// If the log matches, look up the corresponding transaction hash.
pub fn append_matching_block_logs(
pub fn append_matching_block_logs<P: BlockReader>(
all_logs: &mut Vec<Log>,
provider: impl BlockReader,
provider_or_block: ProviderOrBlock<'_, P>,
filter: &FilteredParams,
block_num_hash: BlockNumHash,
receipts: &[Receipt],
Expand All @@ -60,8 +69,8 @@ pub fn append_matching_block_logs(
let mut log_index: u64 = 0;

// Lazy loaded number of the first transaction in the block.
// This is useful for blocks with multiple matching logs because it prevents
// re-querying the block body indices.
// This is useful for blocks with multiple matching logs because it
// prevents re-querying the block body indices.
let mut loaded_first_tx_num = None;

// Iterate over receipts and append matching logs.
Expand All @@ -71,27 +80,37 @@ pub fn append_matching_block_logs(

for log in &receipt.logs {
if log_matches_filter(block_num_hash, log, filter) {
let first_tx_num = match loaded_first_tx_num {
Some(num) => num,
None => {
let block_body_indices =
provider.block_body_indices(block_num_hash.number)?.ok_or(
ProviderError::BlockBodyIndicesNotFound(block_num_hash.number),
)?;
loaded_first_tx_num = Some(block_body_indices.first_tx_num);
block_body_indices.first_tx_num
}
};

// if this is the first match in the receipt's logs, look up the transaction hash
if transaction_hash.is_none() {
// This is safe because Transactions and Receipts have the same keys.
let transaction_id = first_tx_num + receipt_idx as u64;
let transaction = provider
.transaction_by_id(transaction_id)?
.ok_or_else(|| ProviderError::TransactionNotFound(transaction_id.into()))?;

transaction_hash = Some(transaction.hash());
transaction_hash = match &provider_or_block {
ProviderOrBlock::Block(block) => {
block.body.transactions.get(receipt_idx).map(|t| t.hash())
}
ProviderOrBlock::Provider(provider) => {
let first_tx_num = match loaded_first_tx_num {
Some(num) => num,
None => {
let block_body_indices = provider
.block_body_indices(block_num_hash.number)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(
block_num_hash.number,
))?;
loaded_first_tx_num = Some(block_body_indices.first_tx_num);
block_body_indices.first_tx_num
}
};

// This is safe because Transactions and Receipts have the same
// keys.
let transaction_id = first_tx_num + receipt_idx as u64;
let transaction =
provider.transaction_by_id(transaction_id)?.ok_or_else(|| {
ProviderError::TransactionNotFound(transaction_id.into())
})?;

Some(transaction.hash())
}
};
}

let log = Log {
Expand Down
81 changes: 44 additions & 37 deletions crates/rpc/rpc/src/eth/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use async_trait::async_trait;
use jsonrpsee::{core::RpcResult, server::IdProvider};
use reth_chainspec::ChainInfo;
use reth_node_api::EthApiTypes;
use reth_primitives::TransactionSignedEcRecovered;
use reth_primitives::{Receipt, SealedBlock, TransactionSignedEcRecovered};
use reth_provider::{BlockIdReader, BlockReader, EvmEnvProvider, ProviderError};
use reth_rpc_eth_api::{EthFilterApiServer, FullEthApiTypes, RpcTransaction, TransactionCompat};
use reth_rpc_eth_types::{
logs_utils::{self, append_matching_block_logs},
logs_utils::{self, append_matching_block_logs, ProviderOrBlock},
EthApiError, EthFilterConfig, EthStateCache, EthSubscriptionIdProvider,
};
use reth_rpc_server_types::{result::rpc_error_with_code, ToRpcResult};
Expand Down Expand Up @@ -376,29 +376,34 @@ where
FilterBlockOption::AtBlockHash(block_hash) => {
// for all matching logs in the block
// get the block header with the hash
let block = self
let header = self
.provider
.header_by_hash_or_number(block_hash.into())?
.ok_or_else(|| ProviderError::HeaderNotFound(block_hash.into()))?;

let block_num_hash = BlockNumHash::new(header.number, block_hash);

// we also need to ensure that the receipts are available and return an error if
// not, in case the block hash been reorged
let receipts = self
.eth_cache
.get_receipts(block_hash)
let (receipts, maybe_block) = self
.receipts_and_maybe_block(
&block_num_hash,
self.provider.chain_info()?.best_number,
)
.await?
.ok_or(EthApiError::HeaderNotFound(block_hash.into()))?;

let mut all_logs = Vec::new();
let filter = FilteredParams::new(Some(filter));
logs_utils::append_matching_block_logs(
append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter,
(block_hash, block.number).into(),
maybe_block
.map(|b| ProviderOrBlock::Block(b))
.unwrap_or_else(|| ProviderOrBlock::Provider(&self.provider)),
&FilteredParams::new(Some(filter)),
block_num_hash,
&receipts,
false,
block.timestamp,
header.timestamp,
)?;

Ok(all_logs)
Expand Down Expand Up @@ -454,7 +459,6 @@ where
chain_info: ChainInfo,
) -> Result<Vec<Log>, EthFilterError> {
trace!(target: "rpc::eth::filter", from=from_block, to=to_block, ?filter, "finding logs in range");
let best_number = chain_info.best_number;

if to_block < from_block {
return Err(EthFilterError::InvalidBlockRangeParams)
Expand All @@ -467,27 +471,6 @@ where
let mut all_logs = Vec::new();
let filter_params = FilteredParams::new(Some(filter.clone()));

if (to_block == best_number) && (from_block == best_number) {
// only one block to check and it's the current best block which we can fetch directly
// Note: In case of a reorg, the best block's hash might have changed, hence we only
// return early of we were able to fetch the best block's receipts
// perf: we're fetching the best block here which is expected to be cached
if let Some((block, receipts)) =
self.eth_cache.get_block_and_receipts(chain_info.best_hash).await?
{
logs_utils::append_matching_block_logs(
&mut all_logs,
&self.provider,
&filter_params,
chain_info.into(),
&receipts,
false,
block.header.timestamp,
)?;
}
return Ok(all_logs)
}

// derive bloom filters from filter input, so we can check headers for matching logs
let address_filter = FilteredParams::address_filter(&filter.address);
let topics_filter = FilteredParams::topics_filter(&filter.topics);
Expand All @@ -514,12 +497,17 @@ where
.ok_or_else(|| ProviderError::HeaderNotFound(header.number.into()))?,
};

if let Some(receipts) = self.eth_cache.get_receipts(block_hash).await? {
let num_hash = BlockNumHash::new(header.number, block_hash);
if let Some((receipts, maybe_block)) =
self.receipts_and_maybe_block(&num_hash, chain_info.best_number).await?
{
append_matching_block_logs(
&mut all_logs,
&self.provider,
maybe_block
.map(|block| ProviderOrBlock::Block(block))
.unwrap_or_else(|| ProviderOrBlock::Provider(&self.provider)),
&filter_params,
BlockNumHash::new(header.number, block_hash),
num_hash,
&receipts,
false,
header.timestamp,
Expand All @@ -540,6 +528,25 @@ where

Ok(all_logs)
}

/// Retrieves receipts and block from cache if near the tip (4 blocks), otherwise only receipts.
async fn receipts_and_maybe_block(
&self,
block_num_hash: &BlockNumHash,
best_number: u64,
) -> Result<Option<(Arc<Vec<Receipt>>, Option<SealedBlock>)>, EthFilterError> {
// The last 4 blocks are most likely cached, so we can just fetch them
let cached_range = best_number.saturating_sub(4)..=best_number;
let receipts_block = if cached_range.contains(&block_num_hash.number) {
self.eth_cache
.get_block_and_receipts(block_num_hash.hash)
.await?
.map(|(b, r)| (r, Some(b)))
} else {
self.eth_cache.get_receipts(block_num_hash.hash).await?.map(|r| (r, None))
};
Ok(receipts_block)
}
}

/// All active filters
Expand Down

0 comments on commit b787d9e

Please sign in to comment.