Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/improve event stream #251

Merged
merged 8 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 67 additions & 65 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions chains/ethereum/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ serde.workspace = true
serde_json.workspace = true
thiserror = "1.0"
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio-stream = { version = "0.1", features = ["sync"] }
tracing = "0.1"
url = "2.4"

Expand Down
94 changes: 89 additions & 5 deletions chains/ethereum/server/src/block_stream.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,106 @@
#![allow(dead_code)]
use crate::{
finalized_block_stream::FinalizedBlockStream, new_heads::NewHeadsStream, state::State,
use super::{
event_stream::{EthereumEventStream, NewBlock},
state::State,
};
use futures_util::StreamExt;
use rosetta_config_ethereum::Event as EthEvent;
use rosetta_core::{stream::Stream, types::BlockIdentifier, BlockOrIdentifier, ClientEvent};
use rosetta_ethereum_backend::{
ext::types::{rpc::RpcBlock, H256},
jsonrpsee::core::{client::Subscription, ClientError as RpcError},
EthereumPubSub,
};
use std::{
pin::Pin,
task::{Context, Poll},
};

pub struct BlockStream<RPC>
where
RPC: for<'s> EthereumPubSub<Error = RpcError, NewHeadsStream<'s> = Subscription<RpcBlock<H256>>>
+ Clone
+ Unpin
+ Send
+ Sync
+ 'static,
RPC::SubscriptionError: Send + Sync,
{
finalized: FinalizedBlockStream<RPC>,
new_heads: NewHeadsStream<RPC>,
block_stream: Option<EthereumEventStream<RPC>>,
state: State,
}

impl<RPC> BlockStream<RPC>
where
RPC: for<'s> EthereumPubSub<Error = RpcError, NewHeadsStream<'s> = Subscription<RpcBlock<H256>>>
+ Clone
+ Unpin
+ Send
+ Sync
+ 'static,
RPC::SubscriptionError: Send + Sync,
{
#[must_use]
pub fn new(client: RPC, state: State) -> Self {
Self { block_stream: Some(EthereumEventStream::new(client)), state }
}
}

impl<RPC> Stream for BlockStream<RPC>
where
RPC: for<'s> EthereumPubSub<Error = RpcError, NewHeadsStream<'s> = Subscription<RpcBlock<H256>>>
+ Clone
+ Unpin
+ Send
+ Sync
+ 'static,
RPC::SubscriptionError: Send + Sync,
{
type Item = ClientEvent<BlockIdentifier, EthEvent>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let Some(mut block_stream) = self.block_stream.take() else {
return Poll::Ready(None);
};

let mut failures = 0;
loop {
match block_stream.poll_next_unpin(cx) {
Poll::Ready(Some(new_block)) => {
let block_id = {
let header = new_block.sealed_block().header();
BlockOrIdentifier::Identifier(BlockIdentifier {
index: header.number(),
hash: header.hash().0,
})
};
let is_finalized = matches!(new_block, NewBlock::Finalized(_));
if let Err(err) = self.state.import(new_block.into_sealed_block()) {
failures += 1;
tracing::warn!("failed to import block {block_id} ({failures}): {:?}", err);
if failures >= 5 {
return Poll::Ready(None);
}
continue;
}

let event = if is_finalized {
ClientEvent::NewFinalized(block_id)
} else {
ClientEvent::NewHead(block_id)
};
self.block_stream = Some(block_stream);
break Poll::Ready(Some(event));
},
Poll::Ready(None) => break Poll::Ready(None),
Poll::Pending => {
self.block_stream = Some(block_stream);
break Poll::Pending;
},
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
24 changes: 17 additions & 7 deletions chains/ethereum/server/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
#![allow(clippy::option_if_let_else)]
use crate::{
event_stream::EthereumEventStream,
block_stream::BlockStream,
log_filter::LogFilter,
proof::verify_proof,
shared_stream::SharedStream,
state::State,
utils::{
AtBlockExt, DefaultFeeEstimatorConfig, EthereumRpcExt, PartialBlock,
PolygonFeeEstimatorConfig,
},
};
use anyhow::{Context, Result};
use futures_util::StreamExt;
use rosetta_config_ethereum::{
ext::types::{
crypto::{Crypto, DefaultCrypto, Keypair, Signer},
Expand All @@ -27,14 +30,14 @@ use rosetta_config_ethereum::{
use rosetta_core::{
crypto::{address::Address, PublicKey},
types::{BlockIdentifier, PartialBlockIdentifier},
BlockchainConfig,
BlockchainConfig, ClientEvent,
};
use rosetta_ethereum_backend::{
jsonrpsee::{
core::client::{ClientT, SubscriptionClientT},
Adapter,
},
BlockRange, EthereumPubSub, EthereumRpc, ExitReason,
BlockRange, EthereumRpc, ExitReason,
};
use std::sync::{
atomic::{self, Ordering},
Expand Down Expand Up @@ -73,6 +76,7 @@ pub struct EthereumClient<P> {
nonce: Arc<std::sync::atomic::AtomicU64>,
private_key: Option<[u8; 32]>,
log_filter: Arc<std::sync::Mutex<LogFilter>>,
// event_stream: SharedStream<BlockStream<Adapter<P>>>
}

impl<P> Clone for EthereumClient<P>
Expand Down Expand Up @@ -516,11 +520,17 @@ where

impl<P> EthereumClient<P>
where
P: SubscriptionClientT + Send + Sync + 'static,
P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static,
{
#[allow(clippy::missing_errors_doc)]
pub async fn listen(&self) -> Result<EthereumEventStream<'_, P>> {
let new_heads = EthereumPubSub::new_heads(&self.backend).await?;
Ok(EthereumEventStream::new(self, new_heads))
pub async fn listen(&self) -> Result<SharedStream<BlockStream<Adapter<P>>>> {
let best_finalized_block = self.finalized_block(None).await?;
let mut stream = BlockStream::new(self.backend.clone(), State::new(best_finalized_block));
match stream.next().await {
Some(ClientEvent::Close(msg)) => anyhow::bail!(msg),
None => anyhow::bail!("Failed to open the event stream"),
Some(_) => {},
}
Ok(SharedStream::new(stream, 100))
}
}
Loading
Loading