Skip to content
This repository has been archived by the owner on Oct 19, 2024. It is now read-only.

feat: add paginated logs #1285

Merged
merged 3 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@

### Unreleased

- Stream of paginated logs that loads logs in small pages
[1285](https://github.com/gakonst/ethers-rs/pull/1285)
- Load previous logs before subscribing to new logs in case fromBlock is set
[1264](https://github.com/gakonst/ethers-rs/pull/1264)
- Add retries to the pending transaction future
Expand Down
13 changes: 13 additions & 0 deletions ethers-core/src/types/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,19 @@ impl Filter {
self.topics[3] = Some(topic.into());
self
}

/// Returns `true` when this filter can be split into block-range pages,
/// i.e. when a concrete numeric `from_block` is available.
pub fn is_paginatable(&self) -> bool {
    self.get_from_block().is_some()
}

/// Returns the filter's `from_block` as a concrete block number, if one is set.
///
/// Yields `None` when the filter targets a specific block hash, or when the
/// range's `from_block` is absent or not a plain number (e.g. a named tag).
pub fn get_from_block(&self) -> Option<U64> {
    match self.block_option {
        FilterBlockOption::AtBlockHash(_) => None,
        FilterBlockOption::Range { from_block, .. } => {
            // `and_then` flattens Option<BlockNumber> -> Option<U64>;
            // replaces the non-idiomatic `map(..).unwrap_or(None)`.
            from_block.and_then(|block| block.as_number())
        }
    }
}
}

/// Union type for representing a single value or a vector of values inside a filter
Expand Down
12 changes: 12 additions & 0 deletions ethers-providers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ pub use pending_transaction::PendingTransaction;
mod pending_escalator;
pub use pending_escalator::EscalatingPending;

mod log_query;
pub use log_query::LogQuery;

mod stream;
pub use futures_util::StreamExt;
pub use stream::{interval, FilterWatcher, TransactionStream, DEFAULT_POLL_INTERVAL};
Expand Down Expand Up @@ -421,6 +424,15 @@ pub trait Middleware: Sync + Send + Debug {
self.inner().get_logs(filter).await.map_err(FromErr::from)
}

/// Returns a stream of logs that are loaded in pages of the given page size
fn get_logs_paginated<'a>(
    &'a self,
    filter: &Filter,
    page_size: u64,
) -> LogQuery<'a, Self::Provider> {
    self.inner().get_logs_paginated(filter, page_size)
}
Comment on lines +427 to +434
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@prestwich wdyt about unifying this logic in the default get_logs behavior?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's a pretty reasonable way to smooth over inconsistent RPC paging requirements 🤔


async fn new_filter(&self, filter: FilterKind<'_>) -> Result<U256, Self::Error> {
self.inner().new_filter(filter).await.map_err(FromErr::from)
}
Expand Down
130 changes: 130 additions & 0 deletions ethers-providers/src/log_query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use super::{JsonRpcClient, Middleware, PinBoxFut, Provider};
use ethers_core::types::{Filter, Log, U64};
use futures_core::stream::Stream;
use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
};

/// A stream that fetches logs matching a [`Filter`], optionally in fixed-size
/// block-range pages, yielding one [`Log`] at a time.
pub struct LogQuery<'a, P> {
    /// Provider used to issue the `eth_getLogs` / `eth_blockNumber` requests.
    provider: &'a Provider<P>,
    /// The user-supplied filter; paged queries clone it with narrowed ranges.
    filter: Filter,
    /// Start of the next page to load (only used when the filter is paginatable).
    from_block: Option<U64>,
    /// Number of blocks covered by each page (default set in `new`).
    page_size: u64,
    /// Logs fetched for the current page, drained one-by-one by the stream.
    current_logs: VecDeque<Log>,
    /// Chain head at the time the query started; pagination stops past it.
    last_block: Option<U64>,
    /// Current step of the fetch/consume state machine.
    state: LogQueryState<'a>,
}

/// State machine for [`LogQuery`]'s `Stream` implementation.
enum LogQueryState<'a> {
    /// No work started yet; decide between single-shot and paginated loading.
    Initial,
    /// Awaiting `eth_blockNumber` to learn where pagination should stop.
    LoadLastBlock(PinBoxFut<'a, U64>),
    /// Awaiting an `eth_getLogs` call for the current page (or the whole range).
    LoadLogs(PinBoxFut<'a, Vec<Log>>),
    /// Draining the fetched logs from `current_logs`.
    Consume,
}

impl<'a, P> LogQuery<'a, P>
where
    P: JsonRpcClient,
{
    /// Creates a new `LogQuery` over `filter`, issuing its RPC calls through
    /// `provider`.
    ///
    /// The page size defaults to 10000 blocks; override it via
    /// [`with_page_size`](Self::with_page_size).
    pub fn new(provider: &'a Provider<P>, filter: &Filter) -> Self {
        let filter = filter.clone();
        let from_block = filter.get_from_block();
        Self {
            provider,
            filter,
            from_block,
            page_size: 10000,
            current_logs: VecDeque::new(),
            last_block: None,
            state: LogQueryState::Initial,
        }
    }

    /// set page size for pagination
    pub fn with_page_size(mut self, page_size: u64) -> Self {
        self.page_size = page_size;
        self
    }
}

/// Sets the query's state machine to `$new_state` and immediately wakes the
/// task so the new state is processed on the next `poll_next` call.
macro_rules! rewake_with_new_state {
    ($ctx:ident, $this:ident, $new_state:expr) => {
        $this.state = $new_state;
        $ctx.waker().wake_by_ref();
        return Poll::Pending
    };
}

impl<'a, P> Stream for LogQuery<'a, P>
where
P: JsonRpcClient,
{
type Item = Log;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match &mut self.state {
LogQueryState::Initial => {
if !self.filter.is_paginatable() {
// if not paginatable, load logs and consume
let filter = self.filter.clone();
let provider = self.provider;
let fut = Box::pin(async move { provider.get_logs(&filter).await });
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
} else {
// if paginatable, load last block
let fut = self.provider.get_block_number();
rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut));
}
}
LogQueryState::LoadLastBlock(fut) => {
self.last_block = Some(
futures_util::ready!(fut.as_mut().poll(ctx))
.expect("error occurred loading last block"),
);

let from_block = self.filter.get_from_block().unwrap();
let to_block = from_block + self.page_size;
self.from_block = Some(to_block);

let filter = self.filter.clone().from_block(from_block).to_block(to_block);
let provider = self.provider;
// load first page of logs
let fut = Box::pin(async move { provider.get_logs(&filter).await });
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
}
LogQueryState::LoadLogs(fut) => {
let logs = futures_util::ready!(fut.as_mut().poll(ctx))
.expect("error occurred loading logs");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panicking on failed RPCs is not very nice. I'd much prefer if this was a TryStream instead. Will submit PR when I get to it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Implemented TryStream along with other fixes here 0143caf

self.current_logs = VecDeque::from(logs);
rewake_with_new_state!(ctx, self, LogQueryState::Consume);
}
LogQueryState::Consume => {
let log = self.current_logs.pop_front();
if log.is_none() {
// consumed all the logs
if !self.filter.is_paginatable() {
Poll::Ready(None)
} else {
// load new logs if there are still more pages to go through
let from_block = self.from_block.unwrap();
let to_block = from_block + self.page_size;

// no more pages to load, and everything is consumed
if from_block > self.last_block.unwrap() {
return Poll::Ready(None)
}
// load next page
self.from_block = Some(to_block);
Copy link

@philsippl philsippl May 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be to_block + 1, otherwise you double-sync blocks at the boundaries.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will verify this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@philsippl is correct, will fix this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


let filter = self.filter.clone().from_block(from_block).to_block(to_block);
let provider = self.provider;
let fut = Box::pin(async move { provider.get_logs(&filter).await });
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
}
} else {
Poll::Ready(log)
}
}
}
}
}
6 changes: 5 additions & 1 deletion ethers-providers/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::{
ens, erc, maybe,
pubsub::{PubsubClient, SubscriptionStream},
stream::{FilterWatcher, DEFAULT_POLL_INTERVAL},
FromErr, Http as HttpProvider, JsonRpcClient, JsonRpcClientWrapper, MockProvider,
FromErr, Http as HttpProvider, JsonRpcClient, JsonRpcClientWrapper, LogQuery, MockProvider,
PendingTransaction, QuorumProvider, RwClient, SyncingStatus,
};

Expand Down Expand Up @@ -647,6 +647,10 @@ impl<P: JsonRpcClient> Middleware for Provider<P> {
self.request("eth_getLogs", [filter]).await
}

/// Returns a stream of logs for `filter`, fetched in pages of `page_size` blocks
fn get_logs_paginated<'a>(&'a self, filter: &Filter, page_size: u64) -> LogQuery<'a, P> {
    LogQuery::new(self, filter).with_page_size(page_size)
}

/// Streams matching filter logs
async fn watch<'a>(
&'a self,
Expand Down
34 changes: 34 additions & 0 deletions examples/paginated_logs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use ethers::{abi::AbiDecode, prelude::*, utils::keccak256};
use eyre::Result;
use std::sync::Arc;

/// Streams the last 10000 blocks worth of ERC-20 `Transfer` events, fetching
/// them in pages of 10 blocks via `get_logs_paginated`.
#[tokio::main]
async fn main() -> Result<()> {
    // read the endpoint from the environment rather than hardcoding an API
    // key in source control (the original example leaked an Infura key)
    let ws_url = std::env::var("WS_RPC_URL")
        .unwrap_or_else(|_| "wss://mainnet.infura.io/ws/v3/YOUR_API_KEY".to_string());
    let client = Provider::<Ws>::connect(ws_url).await?;
    let client = Arc::new(client);

    let last_block = client.get_block(BlockNumber::Latest).await?.unwrap().number.unwrap();
    println!("last_block: {}", last_block);

    let erc20_transfer_filter = Filter::new()
        .from_block(last_block - 10000)
        .topic0(ValueOrArray::Value(H256::from(keccak256("Transfer(address,address,uint256)"))));

    let mut stream = client.get_logs_paginated(&erc20_transfer_filter, 10);

    while let Some(log) = stream.next().await {
        println!(
            "block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}",
            log.block_number,
            log.transaction_hash,
            log.address,
            log.topics.get(1),
            log.topics.get(2),
            U256::decode(log.data)
        );
    }

    Ok(())
}