-
Notifications
You must be signed in to change notification settings - Fork 795
feat: add paginated logs #1285
feat: add paginated logs #1285
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
use super::{JsonRpcClient, Middleware, PinBoxFut, Provider}; | ||
use ethers_core::types::{Filter, Log, U64}; | ||
use futures_core::stream::Stream; | ||
use std::{ | ||
collections::VecDeque, | ||
pin::Pin, | ||
task::{Context, Poll}, | ||
}; | ||
|
||
pub struct LogQuery<'a, P> { | ||
provider: &'a Provider<P>, | ||
filter: Filter, | ||
from_block: Option<U64>, | ||
page_size: u64, | ||
current_logs: VecDeque<Log>, | ||
last_block: Option<U64>, | ||
state: LogQueryState<'a>, | ||
} | ||
|
||
enum LogQueryState<'a> { | ||
Initial, | ||
LoadLastBlock(PinBoxFut<'a, U64>), | ||
LoadLogs(PinBoxFut<'a, Vec<Log>>), | ||
Consume, | ||
} | ||
|
||
impl<'a, P> LogQuery<'a, P> | ||
where | ||
P: JsonRpcClient, | ||
{ | ||
pub fn new(provider: &'a Provider<P>, filter: &Filter) -> Self { | ||
Self { | ||
provider, | ||
filter: filter.clone(), | ||
from_block: filter.get_from_block(), | ||
page_size: 10000, | ||
current_logs: VecDeque::new(), | ||
last_block: None, | ||
state: LogQueryState::Initial, | ||
} | ||
} | ||
|
||
/// set page size for pagination | ||
pub fn with_page_size(mut self, page_size: u64) -> Self { | ||
self.page_size = page_size; | ||
self | ||
} | ||
} | ||
|
||
/// Transitions the state machine to `$new_state`, wakes the task so the
/// executor polls it again immediately, and returns `Poll::Pending` from the
/// enclosing `poll_next`.
macro_rules! rewake_with_new_state {
    ($ctx:ident, $this:ident, $new_state:expr) => {
        $this.state = $new_state;
        $ctx.waker().wake_by_ref();
        return Poll::Pending
    };
}
|
||
impl<'a, P> Stream for LogQuery<'a, P> | ||
where | ||
P: JsonRpcClient, | ||
{ | ||
type Item = Log; | ||
|
||
fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> { | ||
match &mut self.state { | ||
LogQueryState::Initial => { | ||
if !self.filter.is_paginatable() { | ||
// if not paginatable, load logs and consume | ||
let filter = self.filter.clone(); | ||
let provider = self.provider; | ||
let fut = Box::pin(async move { provider.get_logs(&filter).await }); | ||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); | ||
} else { | ||
// if paginatable, load last block | ||
let fut = self.provider.get_block_number(); | ||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut)); | ||
} | ||
} | ||
LogQueryState::LoadLastBlock(fut) => { | ||
self.last_block = Some( | ||
futures_util::ready!(fut.as_mut().poll(ctx)) | ||
.expect("error occurred loading last block"), | ||
); | ||
|
||
let from_block = self.filter.get_from_block().unwrap(); | ||
let to_block = from_block + self.page_size; | ||
self.from_block = Some(to_block); | ||
|
||
let filter = self.filter.clone().from_block(from_block).to_block(to_block); | ||
let provider = self.provider; | ||
// load first page of logs | ||
let fut = Box::pin(async move { provider.get_logs(&filter).await }); | ||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); | ||
} | ||
LogQueryState::LoadLogs(fut) => { | ||
let logs = futures_util::ready!(fut.as_mut().poll(ctx)) | ||
.expect("error occurred loading logs"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Panicking on failed RPCs is not very nice. I'd much prefer if this was a TryStream instead. Will submit PR when I get to it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Implemented |
||
self.current_logs = VecDeque::from(logs); | ||
rewake_with_new_state!(ctx, self, LogQueryState::Consume); | ||
} | ||
LogQueryState::Consume => { | ||
let log = self.current_logs.pop_front(); | ||
if log.is_none() { | ||
// consumed all the logs | ||
if !self.filter.is_paginatable() { | ||
Poll::Ready(None) | ||
} else { | ||
// load new logs if there are still more pages to go through | ||
let from_block = self.from_block.unwrap(); | ||
let to_block = from_block + self.page_size; | ||
|
||
// no more pages to load, and everything is consumed | ||
if from_block > self.last_block.unwrap() { | ||
return Poll::Ready(None) | ||
} | ||
// load next page | ||
self.from_block = Some(to_block); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will verify this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @philsippl is correct, will fix this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
let filter = self.filter.clone().from_block(from_block).to_block(to_block); | ||
let provider = self.provider; | ||
let fut = Box::pin(async move { provider.get_logs(&filter).await }); | ||
rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); | ||
} | ||
} else { | ||
Poll::Ready(log) | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,34 @@ | ||
use ethers::{abi::AbiDecode, prelude::*, utils::keccak256}; | ||
use eyre::Result; | ||
use std::sync::Arc; | ||
|
||
#[tokio::main] | ||
async fn main() -> Result<()> { | ||
let client = | ||
Provider::<Ws>::connect("wss://mainnet.infura.io/ws/v3/c60b0bb42f8a4c6481ecd229eddaca27") | ||
.await?; | ||
let client = Arc::new(client); | ||
|
||
let last_block = client.get_block(BlockNumber::Latest).await?.unwrap().number.unwrap(); | ||
println!("last_block: {}", last_block); | ||
|
||
let erc20_transfer_filter = Filter::new() | ||
.from_block(last_block - 10000) | ||
.topic0(ValueOrArray::Value(H256::from(keccak256("Transfer(address,address,uint256)")))); | ||
|
||
let mut stream = client.get_logs_paginated(&erc20_transfer_filter, 10); | ||
|
||
while let Some(log) = stream.next().await { | ||
println!( | ||
"block: {:?}, tx: {:?}, token: {:?}, from: {:?}, to: {:?}, amount: {:?}", | ||
log.block_number, | ||
log.transaction_hash, | ||
log.address, | ||
log.topics.get(1), | ||
log.topics.get(2), | ||
U256::decode(log.data) | ||
); | ||
} | ||
|
||
Ok(()) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@prestwich wdyt about unifying this logic in the default get_logs behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think it's a pretty reasonable way to smooth over inconsistent RPC paging requirements 🤔