From c860c5f89d4cec34970bdf1e68c131fbf406e227 Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Fri, 12 Jul 2024 00:02:52 -0300 Subject: [PATCH 1/8] Remove duplicated code --- chains/ethereum/server/src/client.rs | 6 +- chains/ethereum/server/src/event_stream.rs | 174 ++------------------- chains/ethereum/server/src/lib.rs | 2 +- 3 files changed, 21 insertions(+), 161 deletions(-) diff --git a/chains/ethereum/server/src/client.rs b/chains/ethereum/server/src/client.rs index e5b00bc4..29dc99a4 100644 --- a/chains/ethereum/server/src/client.rs +++ b/chains/ethereum/server/src/client.rs @@ -516,11 +516,11 @@ where impl

EthereumClient

where - P: SubscriptionClientT + Send + Sync + 'static, + P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, { #[allow(clippy::missing_errors_doc)] - pub async fn listen(&self) -> Result> { + pub async fn listen(&self) -> Result> { let new_heads = EthereumPubSub::new_heads(&self.backend).await?; - Ok(EthereumEventStream::new(self, new_heads)) + Ok(EthereumEventStream::new(self.backend.clone().0, new_heads)) } } diff --git a/chains/ethereum/server/src/event_stream.rs b/chains/ethereum/server/src/event_stream.rs index 3c411a2d..ca1659d5 100644 --- a/chains/ethereum/server/src/event_stream.rs +++ b/chains/ethereum/server/src/event_stream.rs @@ -1,44 +1,44 @@ -use crate::{client::EthereumClient, utils::PartialBlock}; -use futures_util::{future::BoxFuture, FutureExt, StreamExt}; +use super::finalized_block_stream::FinalizedBlockStream; +use futures_util::StreamExt; use rosetta_config_ethereum::Event; use rosetta_core::{stream::Stream, types::BlockIdentifier, BlockOrIdentifier, ClientEvent}; use rosetta_ethereum_backend::{ ext::types::{crypto::DefaultCrypto, rpc::RpcBlock, H256}, - jsonrpsee::core::client::{Subscription, SubscriptionClientT}, + jsonrpsee::{ + core::client::{Subscription, SubscriptionClientT}, + Adapter, + }, }; -use std::{cmp::Ordering, pin::Pin, task::Poll}; +use std::{pin::Pin, task::Poll}; // Maximum number of failures in sequence before closing the stream const FAILURE_THRESHOLD: u32 = 10; -pub struct EthereumEventStream<'a, P: SubscriptionClientT + Send + Sync + 'static> { +pub struct EthereumEventStream { /// Ethereum subscription for new heads new_head_stream: Option>>, /// Finalized blocks stream - finalized_stream: Option>, + finalized_stream: Option>>, /// Count the number of failed attempts to retrieve the latest block failures: u32, } -impl

EthereumEventStream<'_, P> +impl

EthereumEventStream

where - P: SubscriptionClientT + Send + Sync + 'static, + P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, { - pub fn new( - client: &EthereumClient

, - subscription: Subscription>, - ) -> EthereumEventStream<'_, P> { - EthereumEventStream { + pub fn new(client: P, subscription: Subscription>) -> Self { + Self { new_head_stream: Some(subscription), - finalized_stream: Some(FinalizedBlockStream::new(client)), + finalized_stream: Some(FinalizedBlockStream::new(Adapter(client))), failures: 0, } } } -impl

Stream for EthereumEventStream<'_, P> +impl

Stream for EthereumEventStream

where - P: SubscriptionClientT + Send + Sync + 'static, + P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, { type Item = ClientEvent; @@ -53,9 +53,8 @@ where // Poll the finalized block stream match finalized_stream.poll_next_unpin(cx) { - Poll::Ready(Some(Ok(block))) => { + Poll::Ready(Some(block)) => { self.finalized_stream = Some(finalized_stream); - return Poll::Ready(Some(ClientEvent::NewFinalized( BlockOrIdentifier::Identifier(BlockIdentifier::new( block.header().header().number, @@ -63,10 +62,6 @@ where )), ))); }, - Poll::Ready(Some(Err(error))) => { - self.new_head_stream = None; - return Poll::Ready(Some(ClientEvent::Close(error))); - }, Poll::Ready(None) => { self.new_head_stream = None; return Poll::Ready(None); @@ -105,7 +100,6 @@ where }, Err(error) => { self.failures += 1; - println!("[RPC BUG] invalid latest block: {error}"); tracing::error!("[RPC BUG] invalid latest block: {error}"); continue; }, @@ -114,11 +108,6 @@ where // Reset failure counter self.failures = 0; - // Store the new latest block - if let Some(finalized_stream) = self.finalized_stream.as_mut() { - finalized_stream.update_latest_block(block.index); - } - self.new_head_stream = Some(new_head_stream); return Poll::Ready(Some(ClientEvent::NewHead(BlockOrIdentifier::Identifier( block, @@ -133,132 +122,3 @@ where } } } - -struct FinalizedBlockStream<'a, P> -where - P: SubscriptionClientT + Send + Sync + 'static, -{ - /// Ethereum client used to retrieve the finalized block - client: &'a EthereumClient

, - /// Cache the latest block, used for retrieve the latest finalized block - /// see [`BlockFinalityStrategy`] - latest_block: Option, - /// Ethereum client doesn't support subscribing for finalized blocks, as workaround - /// everytime we receive a new head, we query the latest finalized block - future: Option>>, - /// Cache the best finalized block, we use this to avoid emitting two - /// [`ClientEvent::NewFinalized`] for the same block - best_finalized_block: Option, - /// Count the number of failed attempts to retrieve the finalized block - failures: u32, - /// Waker used to wake up the stream when a new block is available - waker: Option, -} - -impl<'a, P> FinalizedBlockStream<'a, P> -where - P: SubscriptionClientT + Send + Sync + 'static, -{ - pub fn new(client: &EthereumClient

) -> FinalizedBlockStream<'_, P> { - FinalizedBlockStream { - client, - latest_block: None, - future: None, - best_finalized_block: None, - failures: 0, - waker: None, - } - } - - pub fn update_latest_block(&mut self, number: u64) { - if Some(number) == self.latest_block { - return; - } - self.latest_block = Some(number); - if self.future.is_none() { - self.future = Some(self.finalized_block()); - } - if let Some(waker) = self.waker.take() { - waker.wake(); - } - } - - fn finalized_block<'c>(&'c self) -> BoxFuture<'a, anyhow::Result> { - self.client.finalized_block(self.latest_block).boxed() - } -} - -impl

Stream for FinalizedBlockStream<'_, P> -where - P: SubscriptionClientT + Send + Sync + 'static, -{ - type Item = Result; - - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - loop { - // Check the failure count - match self.failures.cmp(&FAILURE_THRESHOLD) { - Ordering::Greater => return Poll::Ready(None), - Ordering::Equal => { - self.failures += 1; - self.future = None; - return Poll::Ready(Some(Err(format!( - "More than {FAILURE_THRESHOLD} failures in sequence", - )))); - }, - Ordering::Less => {}, - } - - // If the future is not ready, store the waker and return pending - let Some(mut future) = self.future.take() else { - self.waker = Some(cx.waker().clone()); - return Poll::Pending; - }; - - match future.poll_unpin(cx) { - Poll::Ready(Ok(block)) => { - // Store the waker - self.waker = Some(cx.waker().clone()); - - // Skip if the finalized block is equal to the best finalized block - if let Some(best_finalized_block) = self.best_finalized_block.take() { - if block.header().hash() == best_finalized_block.header().hash() { - self.best_finalized_block = Some(best_finalized_block); - break Poll::Pending; - } - tracing::debug!( - "new finalized block {} {:?}", - block.header().number(), - block.header().hash() - ); - } - - // Cache the new best finalized block - self.best_finalized_block = Some(block.clone()); - - // Return the best finalized block - break Poll::Ready(Some(Ok(block))); - }, - Poll::Ready(Err(error)) => { - // Increment failure count - self.failures += 1; - tracing::error!( - "failed to retrieve finalized block: {error:?} {}", - self.failures - ); - - // Retry to retrieve the latest finalized block. - self.future = Some(self.finalized_block()); - continue; - }, - Poll::Pending => { - self.future = Some(future); - break Poll::Pending; - }, - } - } - } -} diff --git a/chains/ethereum/server/src/lib.rs b/chains/ethereum/server/src/lib.rs index 82ff9e18..da22a580 100644 --- a/chains/ethereum/server/src/lib.rs +++ b/chains/ethereum/server/src/lib.rs @@ -106,7 +106,7 @@ impl MaybeWsEthereumClient { impl BlockchainClient for MaybeWsEthereumClient { type MetadataParams = EthereumMetadataParams; type Metadata = EthereumMetadata; - type EventStream<'a> = EthereumEventStream<'a, DefaultClient>; + type EventStream<'a> = EthereumEventStream where Self: 'a; type Call = EthQuery; type CallResult = EthQueryResult; From 66b22ab7893df250fc75d3dfd0f4181b93a9ca68 Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Fri, 12 Jul 2024 00:39:19 -0300 Subject: [PATCH 2/8] Use a more robust new heads stream --- chains/ethereum/server/src/block_stream.rs | 1 + chains/ethereum/server/src/client.rs | 16 ++- chains/ethereum/server/src/event_stream.rs | 118 ++++++++---------- .../server/src/finalized_block_stream.rs | 8 +- chains/ethereum/server/src/lib.rs | 3 +- chains/ethereum/server/src/new_heads.rs | 19 ++- 6 files changed, 84 insertions(+), 81 deletions(-) diff --git a/chains/ethereum/server/src/block_stream.rs b/chains/ethereum/server/src/block_stream.rs index 1cb2628f..8f540804 100644 --- a/chains/ethereum/server/src/block_stream.rs +++ b/chains/ethereum/server/src/block_stream.rs @@ -15,6 +15,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { finalized: FinalizedBlockStream, new_heads: NewHeadsStream, diff --git a/chains/ethereum/server/src/client.rs b/chains/ethereum/server/src/client.rs index 29dc99a4..5d8d23f3 100644 --- a/chains/ethereum/server/src/client.rs +++ b/chains/ethereum/server/src/client.rs @@ -9,6 +9,7 @@ use crate::{ }, }; use anyhow::{Context, Result}; +use futures_util::StreamExt; use rosetta_config_ethereum::{ ext::types::{ crypto::{Crypto, DefaultCrypto, Keypair, Signer}, @@ -27,14 +28,14 @@ use rosetta_config_ethereum::{ use rosetta_core::{ crypto::{address::Address, PublicKey}, types::{BlockIdentifier, PartialBlockIdentifier}, - BlockchainConfig, + BlockchainConfig, ClientEvent, }; use rosetta_ethereum_backend::{ jsonrpsee::{ core::client::{ClientT, SubscriptionClientT}, Adapter, }, - BlockRange, EthereumPubSub, EthereumRpc, ExitReason, + BlockRange, EthereumRpc, ExitReason, }; use std::sync::{ atomic::{self, Ordering}, @@ -519,8 +520,13 @@ where P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, { #[allow(clippy::missing_errors_doc)] - pub async fn listen(&self) -> Result> { - let new_heads = EthereumPubSub::new_heads(&self.backend).await?; - Ok(EthereumEventStream::new(self.backend.clone().0, new_heads)) + pub async fn listen(&self) -> Result>> { + let mut stream = EthereumEventStream::new(self.backend.clone()); + match stream.next().await { + Some(ClientEvent::Close(msg)) => anyhow::bail!(msg), + None => anyhow::bail!("Failed to open the event stream"), + Some(_) => {}, + } + Ok(stream) } } diff --git a/chains/ethereum/server/src/event_stream.rs b/chains/ethereum/server/src/event_stream.rs index ca1659d5..6740df68 100644 --- a/chains/ethereum/server/src/event_stream.rs +++ b/chains/ethereum/server/src/event_stream.rs @@ -1,44 +1,57 @@ -use super::finalized_block_stream::FinalizedBlockStream; +use super::{finalized_block_stream::FinalizedBlockStream, new_heads::NewHeadsStream}; use futures_util::StreamExt; use rosetta_config_ethereum::Event; use rosetta_core::{stream::Stream, types::BlockIdentifier, BlockOrIdentifier, ClientEvent}; use rosetta_ethereum_backend::{ - ext::types::{crypto::DefaultCrypto, rpc::RpcBlock, H256}, - jsonrpsee::{ - core::client::{Subscription, SubscriptionClientT}, - Adapter, - }, + ext::types::{rpc::RpcBlock, H256}, + jsonrpsee::core::client::{error::Error as RpcError, Subscription}, + EthereumPubSub, }; use std::{pin::Pin, task::Poll}; -// Maximum number of failures in sequence before closing the stream -const FAILURE_THRESHOLD: u32 = 10; - -pub struct EthereumEventStream { - /// Ethereum subscription for new heads - new_head_stream: Option>>, +pub struct EthereumEventStream +where + C: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + C::SubscriptionError: Send + Sync, +{ + /// Latest block stream + new_head_stream: Option>, /// Finalized blocks stream - finalized_stream: Option>>, - /// Count the number of failed attempts to retrieve the latest block - failures: u32, + finalized_stream: Option>, } -impl

EthereumEventStream

+impl EthereumEventStream where - P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, + C: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + C::SubscriptionError: Send + Sync, { - pub fn new(client: P, subscription: Subscription>) -> Self { + pub fn new(client: C) -> Self { Self { - new_head_stream: Some(subscription), - finalized_stream: Some(FinalizedBlockStream::new(Adapter(client))), - failures: 0, + new_head_stream: Some(NewHeadsStream::new(client.clone())), + finalized_stream: Some(FinalizedBlockStream::new(client)), } } } -impl

Stream for EthereumEventStream

+impl Stream for EthereumEventStream where - P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, + C: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + C::SubscriptionError: Send + Sync, { type Item = ClientEvent; @@ -77,48 +90,25 @@ where return Poll::Ready(None); }; - loop { - if self.failures >= FAILURE_THRESHOLD { - self.new_head_stream = None; - self.finalized_stream = None; - return Poll::Ready(Some(ClientEvent::Close( - "More than 10 failures in sequence".into(), - ))); - } - - match new_head_stream.poll_next_unpin(cx) { - Poll::Ready(Some(block)) => { - // Convert raw block to block identifier - let block = match block { - Ok(block) => { - let header = if let Some(hash) = block.hash { - block.header.seal(hash) - } else { - block.header.seal_slow::() - }; - BlockIdentifier::new(header.number(), header.hash().0) - }, - Err(error) => { - self.failures += 1; - tracing::error!("[RPC BUG] invalid latest block: {error}"); - continue; - }, - }; - - // Reset failure counter - self.failures = 0; + match new_head_stream.poll_next_unpin(cx) { + Poll::Ready(Some(block)) => { + // Convert block to block identifier + let block = { + let header = block.header(); + BlockIdentifier::new(header.number(), header.hash().0) + }; - self.new_head_stream = Some(new_head_stream); - return Poll::Ready(Some(ClientEvent::NewHead(BlockOrIdentifier::Identifier( - block, - )))); - }, - Poll::Ready(None) => return Poll::Ready(None), - Poll::Pending => { - self.new_head_stream = Some(new_head_stream); - break Poll::Pending; - }, - }; + self.new_head_stream = Some(new_head_stream); + Poll::Ready(Some(ClientEvent::NewHead(BlockOrIdentifier::Identifier(block)))) + }, + Poll::Ready(None) => { + self.finalized_stream = None; + Poll::Ready(None) + }, + Poll::Pending => { + self.new_head_stream = Some(new_head_stream); + Poll::Pending + }, } } } diff --git a/chains/ethereum/server/src/finalized_block_stream.rs b/chains/ethereum/server/src/finalized_block_stream.rs index 466b0998..10f8414c 100644 --- a/chains/ethereum/server/src/finalized_block_stream.rs +++ b/chains/ethereum/server/src/finalized_block_stream.rs @@ -41,9 +41,6 @@ struct Statistics { /// Latest known finalized block. best_finalized_block: Option

, - /// required number of successful polls before starting to adjust the polling interval. - probation_period: u32, - /// Incremented the best finalized block is parent of the new block. /// Ex: if the best known finalized block is 100, and the new block is 101. new: u32, @@ -91,7 +88,9 @@ impl Statistics { self.new += 1; true } else { - let gap_size = i32::try_from(new_block.number - expected).unwrap_or(1); + debug_assert!(new_block.number > expected, "Non monotonically increasing finalized block number"); + // Cap the gap_size to `ADJUST_THRESHOLD`. + let gap_size = i32::try_from(new_block.number - expected).unwrap_or(1).min(ADJUST_THRESHOLD); self.gaps += 1; self.adjust_threshold -= gap_size; true @@ -149,7 +148,6 @@ where backend, statistics: Statistics { best_finalized_block: None, - probation_period: 0, new: 0, duplicated: 0, gaps: 0, diff --git a/chains/ethereum/server/src/lib.rs b/chains/ethereum/server/src/lib.rs index da22a580..a07ffafa 100644 --- a/chains/ethereum/server/src/lib.rs +++ b/chains/ethereum/server/src/lib.rs @@ -9,6 +9,7 @@ use rosetta_core::{ types::{BlockIdentifier, PartialBlockIdentifier}, BlockchainClient, BlockchainConfig, }; +use rosetta_ethereum_backend::jsonrpsee::Adapter; use rosetta_server::ws::{default_client, default_http_client, DefaultClient, HttpClient}; use url::Url; @@ -106,7 +107,7 @@ impl MaybeWsEthereumClient { impl BlockchainClient for MaybeWsEthereumClient { type MetadataParams = EthereumMetadataParams; type Metadata = EthereumMetadata; - type EventStream<'a> = EthereumEventStream where Self: 'a; + type EventStream<'a> = EthereumEventStream> where Self: 'a; type Call = EthQuery; type CallResult = EthQueryResult; diff --git a/chains/ethereum/server/src/new_heads.rs b/chains/ethereum/server/src/new_heads.rs index 8b42fc73..3916c6ae 100644 --- a/chains/ethereum/server/src/new_heads.rs +++ b/chains/ethereum/server/src/new_heads.rs @@ -26,20 +26,22 @@ struct PollLatestBlock(RPC); impl FutureFactory for PollLatestBlock where RPC: EthereumRpc + Send + Sync + 'static, + RPC::Error: Send + Sync, { - type Output = Result, RpcError>; + type Output = Result, ::Error>; type Future<'a> = BoxFuture<'a, Self::Output>; fn new_future(&mut self) -> Self::Future<'_> { async move { let Some(block) = self.0.block(AtBlock::Latest).await? else { return Ok(None); }; - let Some(hash) = block.hash else { - return Err(RpcError::Custom( - "[report this bug] the api returned the latest block without hash".to_string(), - )); + let block = if let Some(hash) = block.hash { + block.seal(hash) + } else { + tracing::warn!("[report this bug] the api returned the latest block without hash, computing block hash manually"); + block.seal_slow::() }; - Ok(Some(block.seal(hash))) + Ok(Some(block)) } .boxed() } @@ -88,6 +90,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { Subscription(AutoSubscribe, NewHeadsSubscriber>), Polling(CircuitBreaker>, ()>), @@ -101,6 +104,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { #[must_use] pub const fn new(backend: RPC) -> Self { @@ -117,6 +121,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { /// Subscription or Polling to new block headers. state: State, @@ -135,6 +140,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { #[must_use] pub const fn new(backend: RPC) -> Self { @@ -149,6 +155,7 @@ where + Send + Sync + 'static, + RPC::SubscriptionError: Send + Sync, { type Item = PartialBlock; From e1b7b608a161975e5b4b4c99cc0239c0936b5815 Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Sat, 13 Jul 2024 07:05:24 -0300 Subject: [PATCH 3/8] Implement SharedStream --- Cargo.lock | 3 + chains/ethereum/server/Cargo.toml | 1 + chains/ethereum/server/src/block_stream.rs | 92 ++++++++- chains/ethereum/server/src/client.rs | 12 +- chains/ethereum/server/src/event_stream.rs | 70 +++++-- .../server/src/finalized_block_stream.rs | 8 +- chains/ethereum/server/src/lib.rs | 4 +- chains/ethereum/server/src/multi_block.rs | 40 ++++ chains/ethereum/server/src/shared_stream.rs | 180 ++++++++++++++++++ chains/ethereum/server/src/state.rs | 57 +++++- rosetta-core/Cargo.toml | 1 + rosetta-core/src/lib.rs | 14 +- rosetta-core/src/types.rs | 9 +- 13 files changed, 454 insertions(+), 37 deletions(-) create mode 100644 chains/ethereum/server/src/shared_stream.rs diff --git a/Cargo.lock b/Cargo.lock index ea7fc40b..0198ce1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5497,6 +5497,7 @@ version = "0.6.0" dependencies = [ "anyhow", "async-trait", + "const-hex", "fluent-uri", "futures-util", "rosetta-crypto", @@ -5682,6 +5683,7 @@ dependencies = [ "sha3", "thiserror", "tokio", + "tokio-stream", "tracing", "tracing-subscriber 0.3.18", "url", @@ -7937,6 +7939,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/chains/ethereum/server/Cargo.toml b/chains/ethereum/server/Cargo.toml index e240d804..a00f670a 100644 --- a/chains/ethereum/server/Cargo.toml +++ b/chains/ethereum/server/Cargo.toml @@ -25,6 +25,7 @@ serde.workspace = true serde_json.workspace = true thiserror = "1.0" tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } +tokio-stream = { version = "0.1", features = ["sync"] } tracing = "0.1" url = "2.4" diff --git a/chains/ethereum/server/src/block_stream.rs b/chains/ethereum/server/src/block_stream.rs index 8f540804..47e31264 100644 --- a/chains/ethereum/server/src/block_stream.rs +++ b/chains/ethereum/server/src/block_stream.rs @@ -1,23 +1,107 @@ #![allow(dead_code)] -use crate::{ - finalized_block_stream::FinalizedBlockStream, new_heads::NewHeadsStream, state::State, +use super::{ + event_stream::{EthereumEventStream, NewBlock}, + state::State, }; +use futures_util::StreamExt; +use rosetta_config_ethereum::Event as EthEvent; +use rosetta_core::{stream::Stream, types::BlockIdentifier, BlockOrIdentifier, ClientEvent}; use rosetta_ethereum_backend::{ ext::types::{rpc::RpcBlock, H256}, jsonrpsee::core::{client::Subscription, ClientError as RpcError}, EthereumPubSub, }; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; pub struct BlockStream where RPC: for<'s> EthereumPubSub = Subscription>> + + Clone + Unpin + Send + Sync + 'static, RPC::SubscriptionError: Send + Sync, { - finalized: FinalizedBlockStream, - new_heads: NewHeadsStream, + block_stream: Option>, state: State, } + +impl BlockStream +where + RPC: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + RPC::SubscriptionError: Send + Sync, +{ + #[must_use] + pub fn new(client: RPC, state: State) -> Self { + Self { block_stream: Some(EthereumEventStream::new(client)), state } + } +} + +impl Stream for BlockStream +where + RPC: for<'s> EthereumPubSub = Subscription>> + + Clone + + Unpin + + Send + + Sync + + 'static, + RPC::SubscriptionError: Send + Sync, +{ + type Item = ClientEvent; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Some(mut block_stream) = self.block_stream.take() else { + return Poll::Ready(None); + }; + + let mut failures = 0; + loop { + match block_stream.poll_next_unpin(cx) { + Poll::Ready(Some(new_block)) => { + let block_id = { + let header = new_block.sealed_block().header(); + BlockOrIdentifier::Identifier(BlockIdentifier { + index: header.number(), + hash: header.hash().0, + }) + }; + let is_finalized = matches!(new_block, NewBlock::Finalized(_)); + if let Err(err) = self.state.import(new_block.into_sealed_block()) { + failures += 1; + tracing::warn!("failed to import block {block_id} ({failures}): {:?}", err); + if failures >= 5 { + return Poll::Ready(None); + } + continue; + } + + let event = if is_finalized { + ClientEvent::NewHead(block_id) + } else { + ClientEvent::NewFinalized(block_id) + }; + self.block_stream = Some(block_stream); + break Poll::Ready(Some(event)); + }, + Poll::Ready(None) => break Poll::Ready(None), + Poll::Pending => { + self.block_stream = Some(block_stream); + break Poll::Pending; + }, + } + } + } + + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} diff --git a/chains/ethereum/server/src/client.rs b/chains/ethereum/server/src/client.rs index 5d8d23f3..79844a2d 100644 --- a/chains/ethereum/server/src/client.rs +++ b/chains/ethereum/server/src/client.rs @@ -1,8 +1,10 @@ #![allow(clippy::option_if_let_else)] use crate::{ - event_stream::EthereumEventStream, + block_stream::BlockStream, log_filter::LogFilter, proof::verify_proof, + shared_stream::SharedStream, + state::State, utils::{ AtBlockExt, DefaultFeeEstimatorConfig, EthereumRpcExt, PartialBlock, PolygonFeeEstimatorConfig, @@ -74,6 +76,7 @@ pub struct EthereumClient

{ nonce: Arc, private_key: Option<[u8; 32]>, log_filter: Arc>, + // event_stream: SharedStream>> } impl

Clone for EthereumClient

@@ -520,13 +523,14 @@ where P: SubscriptionClientT + Unpin + Clone + Send + Sync + 'static, { #[allow(clippy::missing_errors_doc)] - pub async fn listen(&self) -> Result>> { - let mut stream = EthereumEventStream::new(self.backend.clone()); + pub async fn listen(&self) -> Result>>> { + let best_finalized_block = self.finalized_block(None).await?; + let mut stream = BlockStream::new(self.backend.clone(), State::new(best_finalized_block)); match stream.next().await { Some(ClientEvent::Close(msg)) => anyhow::bail!(msg), None => anyhow::bail!("Failed to open the event stream"), Some(_) => {}, } - Ok(stream) + Ok(SharedStream::new(stream, 100)) } } diff --git a/chains/ethereum/server/src/event_stream.rs b/chains/ethereum/server/src/event_stream.rs index 6740df68..18cc2476 100644 --- a/chains/ethereum/server/src/event_stream.rs +++ b/chains/ethereum/server/src/event_stream.rs @@ -1,7 +1,7 @@ use super::{finalized_block_stream::FinalizedBlockStream, new_heads::NewHeadsStream}; use futures_util::StreamExt; -use rosetta_config_ethereum::Event; -use rosetta_core::{stream::Stream, types::BlockIdentifier, BlockOrIdentifier, ClientEvent}; +use rosetta_config_ethereum::ext::types::SealedBlock; +use rosetta_core::stream::Stream; use rosetta_ethereum_backend::{ ext::types::{rpc::RpcBlock, H256}, jsonrpsee::core::client::{error::Error as RpcError, Subscription}, @@ -9,6 +9,46 @@ use rosetta_ethereum_backend::{ }; use std::{pin::Pin, task::Poll}; +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum NewBlock { + NewHead(SealedBlock), + Finalized(SealedBlock), +} + +impl NewBlock { + #[must_use] + pub const fn new_head(block: SealedBlock) -> Self { + Self::NewHead(block) + } + + #[must_use] + pub const fn new_finalized(block: SealedBlock) -> Self { + Self::Finalized(block) + } + + #[must_use] + pub fn into_sealed_block(self) -> SealedBlock { + match self { + Self::Finalized(block) | Self::NewHead(block) => block, + } + } + + #[must_use] + pub const fn sealed_block(&self) -> &SealedBlock { + match self { + Self::Finalized(block) | Self::NewHead(block) => block, + } + } +} + +impl From for SealedBlock { + fn from(new_block: NewBlock) -> Self { + match new_block { + NewBlock::Finalized(block) | NewBlock::NewHead(block) => block, + } + } +} + pub struct EthereumEventStream where C: for<'s> EthereumPubSub = Subscription>> @@ -53,7 +93,7 @@ where + 'static, C::SubscriptionError: Send + Sync, { - type Item = ClientEvent; + type Item = NewBlock; fn poll_next( mut self: Pin<&mut Self>, @@ -68,12 +108,13 @@ where match finalized_stream.poll_next_unpin(cx) { Poll::Ready(Some(block)) => { self.finalized_stream = Some(finalized_stream); - return Poll::Ready(Some(ClientEvent::NewFinalized( - BlockOrIdentifier::Identifier(BlockIdentifier::new( - block.header().header().number, - block.header().hash().0, - )), - ))); + return Poll::Ready(Some(NewBlock::new_finalized(block))); + // return Poll::Ready(Some(ClientEvent::NewFinalized( + // BlockOrIdentifier::Identifier(BlockIdentifier::new( + // block.header().header().number, + // block.header().hash().0, + // )), + // ))); }, Poll::Ready(None) => { self.new_head_stream = None; @@ -93,13 +134,14 @@ where match new_head_stream.poll_next_unpin(cx) { Poll::Ready(Some(block)) => { // Convert block to block identifier - let block = { - let header = block.header(); - BlockIdentifier::new(header.number(), header.hash().0) - }; + // let block = { + // let header = block.header(); + // BlockIdentifier::new(header.number(), header.hash().0) + // }; self.new_head_stream = Some(new_head_stream); - Poll::Ready(Some(ClientEvent::NewHead(BlockOrIdentifier::Identifier(block)))) + Poll::Ready(Some(NewBlock::new_head(block))) + // Poll::Ready(Some(ClientEvent::NewHead(BlockOrIdentifier::Identifier(block)))) }, Poll::Ready(None) => { self.finalized_stream = None; diff --git a/chains/ethereum/server/src/finalized_block_stream.rs b/chains/ethereum/server/src/finalized_block_stream.rs index 10f8414c..c0417abf 100644 --- a/chains/ethereum/server/src/finalized_block_stream.rs +++ b/chains/ethereum/server/src/finalized_block_stream.rs @@ -88,9 +88,13 @@ impl Statistics { self.new += 1; true } else { - debug_assert!(new_block.number > expected, "Non monotonically increasing finalized block number"); + debug_assert!( + new_block.number > expected, + "Non monotonically increasing finalized block number" + ); // Cap the gap_size to `ADJUST_THRESHOLD`. - let gap_size = i32::try_from(new_block.number - expected).unwrap_or(1).min(ADJUST_THRESHOLD); + let gap_size = + i32::try_from(new_block.number - expected).unwrap_or(1).min(ADJUST_THRESHOLD); self.gaps += 1; self.adjust_threshold -= gap_size; true diff --git a/chains/ethereum/server/src/lib.rs b/chains/ethereum/server/src/lib.rs index a07ffafa..21c7c4d2 100644 --- a/chains/ethereum/server/src/lib.rs +++ b/chains/ethereum/server/src/lib.rs @@ -1,4 +1,5 @@ use anyhow::Result; +use block_stream::BlockStream; pub use client::EthereumClient; pub use rosetta_config_ethereum::{ EthereumMetadata, EthereumMetadataParams, Event, Query as EthQuery, QueryItem, @@ -21,6 +22,7 @@ mod log_filter; mod multi_block; mod new_heads; mod proof; +mod shared_stream; mod state; mod utils; @@ -107,7 +109,7 @@ impl MaybeWsEthereumClient { impl BlockchainClient for MaybeWsEthereumClient { type MetadataParams = EthereumMetadataParams; type Metadata = EthereumMetadata; - type EventStream<'a> = EthereumEventStream> where Self: 'a; + type EventStream<'a> = shared_stream::SharedStream>> where Self: 'a; type Call = EthQuery; type CallResult = EthQueryResult; diff --git a/chains/ethereum/server/src/multi_block.rs b/chains/ethereum/server/src/multi_block.rs index 0c0d08f0..e5fdaeff 100644 --- a/chains/ethereum/server/src/multi_block.rs +++ b/chains/ethereum/server/src/multi_block.rs @@ -5,6 +5,7 @@ use std::{ }; use crate::utils::{FullBlock, PartialBlock}; +use rosetta_core::{types::BlockIdentifier, BlockOrIdentifier}; use rosetta_ethereum_backend::ext::types::{ crypto::DefaultCrypto, Header, SealedHeader, H256, U256, }; @@ -183,3 +184,42 @@ impl Ord for BlockRef { } } } + +impl From<&'_ MultiBlock> for BlockRef { + fn from(block: &'_ MultiBlock) -> Self { + block.as_block_ref() + } +} + +impl From<&'_ SealedHeader> for BlockRef { + fn from(block: &'_ SealedHeader) -> Self { + Self { number: block.number(), hash: block.hash() } + } +} + +impl From<&'_ PartialBlock> for BlockRef { + fn from(block: &'_ PartialBlock) -> Self { + Self::from(block.header()) + } +} + +impl From<&'_ FullBlock> for BlockRef { + fn from(block: &'_ FullBlock) -> Self { + Self::from(block.header()) + } +} + +impl From<&'_ BlockIdentifier> for BlockRef { + fn from(identifier: &'_ BlockIdentifier) -> Self { + Self { number: identifier.index, hash: H256(identifier.hash) } + } +} + +impl From<&'_ BlockOrIdentifier> for BlockRef { + fn from(identifier: &'_ BlockOrIdentifier) -> Self { + match identifier { + BlockOrIdentifier::Identifier(id) => Self::from(id), + BlockOrIdentifier::Block(block) => Self::from(&block.block_identifier), + } + } +} diff --git a/chains/ethereum/server/src/shared_stream.rs b/chains/ethereum/server/src/shared_stream.rs new file mode 100644 index 00000000..ce68385d --- /dev/null +++ b/chains/ethereum/server/src/shared_stream.rs @@ -0,0 +1,180 @@ +use futures_util::{future::Shared, Future, FutureExt, Stream, StreamExt}; +use std::{ + pin::Pin, + sync::{Arc, Weak}, + task::{Context, Poll}, +}; +use tokio_stream::wrappers::{errors::BroadcastStreamRecvError, BroadcastStream}; + +pub struct SharedStream +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + inner: Inner, + stream: Option::Item>>, +} + +impl SharedStream +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + #[must_use] + pub fn new(stream: T, capacity: usize) -> Self { + let (tx, rx) = tokio::sync::broadcast::channel::<::Item>(capacity); + let inner = Inner::new(stream, tx); + Self { inner, stream: Some(BroadcastStream::new(rx)) } + } +} + +impl Stream for SharedStream +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + type Item = ::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Some(mut stream) = self.stream.take() else { + panic!("stream polled after completion"); + }; + + // Poll the transmitter + match self.inner.future.poll_unpin(cx) { + Poll::Ready(()) => return Poll::Ready(None), + Poll::Pending => {}, + } + + // Poll the receiver + loop { + match stream.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(value))) => { + self.stream = Some(stream); + break Poll::Ready(Some(value)); + }, + Poll::Ready(Some(Err(value))) => match value { + BroadcastStreamRecvError::Lagged(gap) => { + tracing::warn!("broadcast stream lagged by {gap} messages"); + continue; + }, + }, + Poll::Ready(None) => { + // Stream has ended + break Poll::Ready(None); + }, + Poll::Pending => { + self.stream = Some(stream); + break Poll::Pending; + }, + } + } + } +} + +impl Clone for SharedStream +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + stream: self.inner.outbound_channel.upgrade().map(|channel| { + let receiver = channel.subscribe(); + BroadcastStream::new(receiver) + }), + } + } +} + +struct Inner +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + future: Shared>, + /// Map of listener IDs to their respective channels + outbound_channel: Weak::Item>>, +} + +impl Inner +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + #[must_use] + pub fn new( + stream: T, + outbound_channel: tokio::sync::broadcast::Sender<::Item>, + ) -> Self { + let outbound_channel = Arc::new(outbound_channel); + let outbound_channel_ref = Arc::downgrade(&outbound_channel); + let future = BroadcastFuture { stream, outbound_channel: Some(outbound_channel) }; + Self { future: future.shared(), outbound_channel: outbound_channel_ref } + } +} + +impl Clone for Inner +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + fn clone(&self) -> Self { + Self { + future: Shared::clone(&self.future), + outbound_channel: self.outbound_channel.clone(), + } + } +} + +#[pin_project::pin_project] +struct BroadcastFuture +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + #[pin] + stream: T, + /// Map of listener IDs to their respective channels + outbound_channel: Option::Item>>>, +} + +impl Future for BroadcastFuture +where + T: Stream + Unpin, + T::Item: Clone + Send + Sync + 'static, +{ + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut this = self.project(); + + // Check if the stream has ended + let Some(outbound_channel) = this.outbound_channel.take() else { + panic!("future polled after completion"); + }; + + // Poll the stream + loop { + match this.stream.poll_next_unpin(cx) { + Poll::Ready(Some(value)) => { + // Broadcast the message to all listeners + // SAFETY: this should never happen, there must be always at least one listener + assert!( + outbound_channel.send(value).is_ok(), + "[report this bug] failed to broadcast message, no one is listening." + ); + }, + Poll::Ready(None) => { + // Stream has ended + break Poll::Ready(()); + }, + Poll::Pending => { + *this.outbound_channel = Some(outbound_channel); + break Poll::Pending; + }, + } + } + } +} diff --git a/chains/ethereum/server/src/state.rs b/chains/ethereum/server/src/state.rs index c7d9bbb8..b7733e48 100644 --- a/chains/ethereum/server/src/state.rs +++ b/chains/ethereum/server/src/state.rs @@ -1,5 +1,8 @@ #![allow(dead_code)] -use std::collections::{BTreeMap, VecDeque}; +use std::{ + collections::{BTreeMap, VecDeque}, + sync::{Arc, RwLock}, +}; use crate::multi_block::{BlockRef, MultiBlock}; use fork_tree::FinalizationResult; @@ -17,8 +20,34 @@ pub enum Error { BlockNotFound(H256), } -/// Manages the client state +#[derive(Debug, Clone)] pub struct State { + inner: Arc>, +} + +impl State { + pub fn new>(best_finalized_block: B) -> Self { + Self { inner: Arc::new(RwLock::new(StateInner::new(best_finalized_block))) } + } + + pub fn import>(&self, block: B) -> Result<(), fork_tree::Error> { + #[allow(clippy::unwrap_used)] + self.inner.write().unwrap().import(block) + } + + pub fn finalize>( + &self, + finalized_block_ref: B, + ) -> Result, fork_tree::Error> { + let finalized_block_ref = finalized_block_ref.into(); + #[allow(clippy::unwrap_used)] + self.inner.write().unwrap().finalize(finalized_block_ref) + } +} + +/// Manages the client state +#[derive(Debug, PartialEq)] +struct StateInner { /// Map of block hashes to their full block data blocks: HashMap, /// Maps an orphan block to missing block @@ -29,10 +58,12 @@ pub struct State { fork_tree: ForkTree, /// List of finalized finalized blocks finalized_blocks: VecDeque, + /// latest known block + latest_block: BlockRef, } -impl State { - pub fn new>(best_finalized_block: B) -> Self { +impl StateInner { + fn new>(best_finalized_block: B) -> Self { let best_finalized_block = best_finalized_block.into(); let best_finalized_block_ref = best_finalized_block.as_block_ref(); let best_finalized_block_parent = best_finalized_block.parent_hash(); @@ -69,6 +100,7 @@ impl State { missing: HashMap::new(), fork_tree, finalized_blocks, + latest_block: best_finalized_block_ref, } } @@ -124,7 +156,7 @@ impl State { } #[allow(clippy::too_many_lines)] - pub fn import>(&mut self, block: B) -> Result<(), fork_tree::Error> { + fn import>(&mut self, block: B) -> Result<(), fork_tree::Error> { let block = block.into(); // Check if the block is already in the cache, if so, update it @@ -234,7 +266,7 @@ impl State { Ok(()) } - pub fn finalize( + fn finalize( &mut self, finalized_block_ref: BlockRef, ) -> Result, fork_tree::Error> { @@ -373,7 +405,7 @@ mod tests { // // (where N is not a part of fork tree) let block_a = create_block(H256::zero(), 1, 1); - let mut state = State::new(block_a.clone()); + let state = State::new(block_a.clone()); let block_b = create_block(block_a.hash(), 2, 2); let block_c = create_block(block_b.hash(), 3, 3); let block_d = create_block(block_c.hash(), 4, 4); @@ -436,7 +468,7 @@ mod tests { // // (where N is not a part of fork tree) let block_a = create_block(H256::zero(), 1, 1); - let mut state = State::new(block_a.clone()); + let state = State::new(block_a.clone()); let block_b = create_block(block_a.hash(), 2, 2); let block_c = create_block(block_b.hash(), 3, 3); let block_d = create_block(block_c.hash(), 4, 4); @@ -476,8 +508,13 @@ mod tests { state.import(block).unwrap(); } - assert_eq!(state.orphans.len(), 4); - assert_eq!(state.missing.len(), 1); + #[allow(clippy::significant_drop_tightening)] + { + let inner = state.inner.read().unwrap(); + assert_eq!(inner.orphans.len(), 4); + assert_eq!(inner.missing.len(), 1); + drop(inner); + } // Finalize block A let retracted = state.finalize(block_a.as_block_ref()).unwrap(); diff --git a/rosetta-core/Cargo.toml b/rosetta-core/Cargo.toml index 8a79d25e..6b39d82b 100644 --- a/rosetta-core/Cargo.toml +++ b/rosetta-core/Cargo.toml @@ -9,6 +9,7 @@ description = "Provides traits and definitions shared by the server and client c [dependencies] anyhow = "1.0" async-trait = "0.1" +const-hex = { version = "1.9", default-features = false, features = ["alloc"] } fluent-uri = "0.1" futures-util = "0.3" rosetta-crypto.workspace = true diff --git a/rosetta-core/src/lib.rs b/rosetta-core/src/lib.rs index a393bfb0..a06fca8d 100644 --- a/rosetta-core/src/lib.rs +++ b/rosetta-core/src/lib.rs @@ -13,7 +13,7 @@ use anyhow::Result; use async_trait::async_trait; pub use futures_util::{future, stream}; use serde::{de::DeserializeOwned, Serialize}; -use std::sync::Arc; +use std::{fmt::Display, sync::Arc}; use futures_util::stream::Empty; pub use node_uri::{NodeUri, NodeUriError}; @@ -73,6 +73,18 @@ impl From for BlockOrIdentifier { } } +impl Display for BlockOrIdentifier +where + ID: Display, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Identifier(id) => Display::fmt(id, f), + Self::Block(block) => Display::fmt(&block.block_identifier, f), + } + } +} + /// Event produced by a handler. #[derive(Debug, Clone, PartialEq, Eq)] pub enum ClientEvent { diff --git a/rosetta-core/src/types.rs b/rosetta-core/src/types.rs index fbe645ae..40ce5354 100644 --- a/rosetta-core/src/types.rs +++ b/rosetta-core/src/types.rs @@ -11,7 +11,7 @@ pub use rosetta_types::{ SignatureType, TransactionIdentifier, }; -use std::vec::Vec; +use std::{fmt::Display, vec::Vec}; /// Block : Blocks contain an array of Transactions that occurred at a particular `BlockIdentifier`. /// A hard requirement for blocks returned by Rosetta implementations is that they MUST be @@ -54,6 +54,13 @@ impl BlockIdentifier { } } +impl Display for BlockIdentifier { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let hash_hex = const_hex::encode_prefixed(self.hash); + write!(f, "{}: {}", self.index, hash_hex) + } +} + /// `PartialBlockIdentifier` : When fetching data by `BlockIdentifier`, it may be possible to only /// specify the index or hash. If neither property is specified, it is assumed that the client is /// making a request at the current block. From 833beea6f8d4a16c4a828c09c1df00105b47e3c6 Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Mon, 15 Jul 2024 11:05:01 -0300 Subject: [PATCH 4/8] Fix block stream --- chains/ethereum/server/src/block_stream.rs | 4 ++-- chains/ethereum/server/src/lib.rs | 3 +++ rosetta-core/src/lib.rs | 16 ++++++++++++++-- rosetta-core/src/types.rs | 12 +++++++++++- 4 files changed, 30 insertions(+), 5 deletions(-) diff --git a/chains/ethereum/server/src/block_stream.rs b/chains/ethereum/server/src/block_stream.rs index 47e31264..39e2c578 100644 --- a/chains/ethereum/server/src/block_stream.rs +++ b/chains/ethereum/server/src/block_stream.rs @@ -85,9 +85,9 @@ where } let event = if is_finalized { - ClientEvent::NewHead(block_id) - } else { ClientEvent::NewFinalized(block_id) + } else { + ClientEvent::NewHead(block_id) }; self.block_stream = Some(block_stream); break Poll::Ready(Some(event)); diff --git a/chains/ethereum/server/src/lib.rs b/chains/ethereum/server/src/lib.rs index 21c7c4d2..7a35fd06 100644 --- a/chains/ethereum/server/src/lib.rs +++ b/chains/ethereum/server/src/lib.rs @@ -40,6 +40,7 @@ pub mod ext { pub use rosetta_config_ethereum as config; pub use rosetta_core as core; pub use rosetta_ethereum_backend as backend; + pub use futures_util; } #[derive(Clone)] @@ -80,9 +81,11 @@ impl MaybeWsEthereumClient { ) -> Result { let uri = Url::parse(addr.as_ref())?; if uri.scheme() == "ws" || uri.scheme() == "wss" { + tracing::trace!("Initializing Ethereum client with Websocket at {uri}"); let client = default_client(uri.as_str(), None).await?; Self::from_jsonrpsee(config, client, private_key).await } else { + tracing::trace!("Initializing Ethereum client with Http at {uri}"); let http_connection = default_http_client(uri.as_str())?; // let http_connection = Http::new(uri); let client = EthereumClient::new(config, http_connection, private_key).await?; diff --git a/rosetta-core/src/lib.rs b/rosetta-core/src/lib.rs index a06fca8d..48c5cbdd 100644 --- a/rosetta-core/src/lib.rs +++ b/rosetta-core/src/lib.rs @@ -13,7 +13,7 @@ use anyhow::Result; use async_trait::async_trait; pub use futures_util::{future, stream}; use serde::{de::DeserializeOwned, Serialize}; -use std::{fmt::Display, sync::Arc}; +use std::{fmt::{Display, Debug}, sync::Arc}; use futures_util::stream::Empty; pub use node_uri::{NodeUri, NodeUriError}; @@ -41,7 +41,7 @@ pub struct BlockchainConfig { pub testnet: bool, } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, PartialEq, Eq)] pub enum BlockOrIdentifier { Identifier(ID), Block(Block), @@ -73,6 +73,18 @@ impl From for BlockOrIdentifier { } } +impl Debug for BlockOrIdentifier +where + ID: Debug, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Identifier(id) => Debug::fmt(id, f), + Self::Block(block) => Debug::fmt(&block.block_identifier, f), + } + } +} + impl Display for BlockOrIdentifier where ID: Display, diff --git a/rosetta-core/src/types.rs b/rosetta-core/src/types.rs index 40ce5354..008d7c8e 100644 --- a/rosetta-core/src/types.rs +++ b/rosetta-core/src/types.rs @@ -35,7 +35,7 @@ pub struct Block { } /// `BlockIdentifier` : The `block_identifier` uniquely identifies a block in a particular network. -#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] +#[derive(Clone, PartialEq, Eq, Default, Serialize, Deserialize)] pub struct BlockIdentifier { /// This is also known as the block height. #[serde(rename = "index")] @@ -61,6 +61,16 @@ impl Display for BlockIdentifier { } } +impl Debug for BlockIdentifier { + fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { + let hash_hex = const_hex::encode_prefixed(self.hash); + f.debug_struct("BlockIdentifier") + .field("index", &self.index) + .field("hash", &hash_hex) + .finish() + } +} + /// `PartialBlockIdentifier` : When fetching data by `BlockIdentifier`, it may be possible to only /// specify the index or hash. If neither property is specified, it is assumed that the client is /// making a request at the current block. From 36653f5d0371da47fab90fc5ea5fc07aa5c155f4 Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Mon, 15 Jul 2024 11:16:17 -0300 Subject: [PATCH 5/8] remove unused code --- chains/ethereum/server/src/event_stream.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/chains/ethereum/server/src/event_stream.rs b/chains/ethereum/server/src/event_stream.rs index 18cc2476..7ddfe042 100644 --- a/chains/ethereum/server/src/event_stream.rs +++ b/chains/ethereum/server/src/event_stream.rs @@ -109,12 +109,6 @@ where Poll::Ready(Some(block)) => { self.finalized_stream = Some(finalized_stream); return Poll::Ready(Some(NewBlock::new_finalized(block))); - // return Poll::Ready(Some(ClientEvent::NewFinalized( - // BlockOrIdentifier::Identifier(BlockIdentifier::new( - // block.header().header().number, - // block.header().hash().0, - // )), - // ))); }, Poll::Ready(None) => { self.new_head_stream = None; @@ -133,15 +127,8 @@ where match new_head_stream.poll_next_unpin(cx) { Poll::Ready(Some(block)) => { - // Convert block to block identifier - // let block = { - // let header = block.header(); - // BlockIdentifier::new(header.number(), header.hash().0) - // }; - self.new_head_stream = Some(new_head_stream); Poll::Ready(Some(NewBlock::new_head(block))) - // Poll::Ready(Some(ClientEvent::NewHead(BlockOrIdentifier::Identifier(block)))) }, Poll::Ready(None) => { self.finalized_stream = None; From 737945388f7031f203110df38d7ce494ed2e7bd3 Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Mon, 15 Jul 2024 11:27:31 -0300 Subject: [PATCH 6/8] cargo update --- Cargo.lock | 129 ++++++++++++++++++++++++++--------------------------- deny.toml | 26 +++++++---- 2 files changed, 82 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0198ce1e..f569f9a5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -193,7 +193,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -209,7 +209,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "syn-solidity", "tiny-keccak", ] @@ -225,7 +225,7 @@ dependencies = [ "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "syn-solidity", ] @@ -739,7 +739,7 @@ checksum = "6e0c28dcc82d7c8ead5cb13beb15405b57b8546e93215673ff8ca0349a028107" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -786,7 +786,7 @@ checksum = "3c87f3f15e7794432337fc718554eaa4dc8f04c9677a950ffe366f20a162ae42" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1049,9 +1049,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.0" +version = "1.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9" +checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" dependencies = [ "serde", ] @@ -1111,13 +1111,12 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.0" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8" +checksum = "324c74f2155653c90b04f25b2a47a8a631360cb908f92a772695f430c7e31052" dependencies = [ "jobserver", "libc", - "once_cell", ] [[package]] @@ -1557,7 +1556,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1605,7 +1604,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.11.1", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1627,7 +1626,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core 0.20.10", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1703,7 +1702,7 @@ checksum = "d65d7ce8132b7c0e54497a4d9a55a1c2a0912a0d786cf894472ba818fba45762" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1714,7 +1713,7 @@ checksum = "62d671cc41a825ebabc75757b62d3d168c577f9149b2d49ece1dad1f72119d25" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1727,7 +1726,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version 0.4.0", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -1820,7 +1819,7 @@ dependencies = [ "proc-macro2", "quote", "regex", - "syn 2.0.70", + "syn 2.0.71", "termcolor", "toml", "walkdir", @@ -2208,7 +2207,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "syn 2.0.70", + "syn 2.0.71", "toml", "walkdir", ] @@ -2226,7 +2225,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2252,7 +2251,7 @@ dependencies = [ "serde", "serde_json", "strum 0.26.3", - "syn 2.0.70", + "syn 2.0.71", "tempfile", "thiserror", "tiny-keccak", @@ -2440,7 +2439,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -2762,7 +2761,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -3154,9 +3153,9 @@ dependencies = [ [[package]] name = "http-body" -version = "1.0.0" +version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", "http 1.1.0", @@ -3171,7 +3170,7 @@ dependencies = [ "bytes", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -3261,7 +3260,7 @@ dependencies = [ "futures-util", "h2 0.4.5", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "httparse", "itoa", "pin-project-lite", @@ -3314,7 +3313,7 @@ dependencies = [ "futures-channel", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "hyper 1.4.1", "pin-project-lite", "socket2 0.5.7", @@ -3738,7 +3737,7 @@ dependencies = [ "futures-timer", "futures-util", "http 1.1.0", - "http-body 1.0.0", + "http-body 1.0.1", "http-body-util", "jsonrpsee-types 0.24.0", "pin-project", @@ -3779,7 +3778,7 @@ checksum = "52dc99c70619e252e6adc5e95144323505a69a1742771de5b3f2071e1595b363" dependencies = [ "async-trait", "base64 0.22.1", - "http-body 1.0.0", + "http-body 1.0.1", "hyper 1.4.1", "hyper-rustls 0.27.2", "hyper-util", @@ -4396,7 +4395,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4480,7 +4479,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4736,7 +4735,7 @@ dependencies = [ "phf_shared 0.11.2", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4774,7 +4773,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4855,7 +4854,7 @@ dependencies = [ "polkavm-common 0.8.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4867,7 +4866,7 @@ dependencies = [ "polkavm-common 0.9.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4877,7 +4876,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15e85319a0d5129dc9f021c62607e0804f5fb777a05cdda44d750ac0732def66" dependencies = [ "polkavm-derive-impl 0.8.0", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4887,7 +4886,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ba81f7b5faac81e528eb6158a6f3c9e0bb1008e0ffa19653bc8dea925ecb429" dependencies = [ "polkavm-derive-impl 0.9.0", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -4968,7 +4967,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -5265,7 +5264,7 @@ checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6235,7 +6234,7 @@ dependencies = [ "proc-macro2", "quote", "scale-info", - "syn 2.0.70", + "syn 2.0.71", "thiserror", ] @@ -6371,9 +6370,9 @@ dependencies = [ [[package]] name = "security-framework" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c627723fd09706bacdb5cf41499e95098555af3c3c29d014dc3c458ef6be11c0" +checksum = "897b2245f0b511c87893af39b033e5ca9cce68824c4d7e7630b5a1d339658d02" dependencies = [ "bitflags 2.6.0", "core-foundation", @@ -6385,9 +6384,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.11.0" +version = "2.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "317936bbbd05227752583946b9e66d7ce3b489f84e11a94a510b4437fef407d7" +checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" dependencies = [ "core-foundation-sys", "libc", @@ -6473,7 +6472,7 @@ checksum = "e0cd7e117be63d3c3678776753929474f3b04a43a080c744d6b0ae2a8c28e222" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6544,7 +6543,7 @@ dependencies = [ "darling 0.20.10", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -6983,7 +6982,7 @@ checksum = "48d09fa0a5f7299fb81ee25ae3853d26200f7a348148aed6de76be905c007dbe" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7115,7 +7114,7 @@ dependencies = [ "proc-macro-crate 3.1.0", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7401,7 +7400,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7488,7 +7487,7 @@ dependencies = [ "scale-info", "scale-typegen", "subxt-metadata", - "syn 2.0.70", + "syn 2.0.71", "thiserror", "tokio", ] @@ -7551,7 +7550,7 @@ dependencies = [ "quote", "scale-typegen", "subxt-codegen", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7621,9 +7620,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.70" +version = "2.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0209b68b3613b093e0ec905354eccaedcfe83b8cb37cbdeae64026c3064c16" +checksum = "b146dcf730474b4bcd16c311627b31ede9ab149045db4d6088b3becaea046462" dependencies = [ "proc-macro2", "quote", @@ -7639,7 +7638,7 @@ dependencies = [ "paste", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7726,22 +7725,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c546c80d6be4bc6a00c0f01730c08df82eaa7a7a61f11d656526506112cc1709" +checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.61" +version = "1.0.62" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46c3384250002a6d5af4d114f2845d37b57521033f30d5c3f46c4d70e1197533" +checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -7874,7 +7873,7 @@ checksum = "5f5ae998a069d4b5aba8ee9dad856af7d520c3699e6159b185c2acd48155d39a" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -8093,7 +8092,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -8512,7 +8511,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-shared", ] @@ -8546,7 +8545,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -9117,7 +9116,7 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] @@ -9137,7 +9136,7 @@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.70", + "syn 2.0.71", ] [[package]] diff --git a/deny.toml b/deny.toml index 0cce99c9..2097cfdb 100644 --- a/deny.toml +++ b/deny.toml @@ -6,6 +6,8 @@ targets = [ ] [licenses] # ----------------------------------------------------------------- # +version = 2 + # The lint level for crates which do not have a detectable license unlicensed = "deny" @@ -90,18 +92,20 @@ wildcards = "allow" highlight = "all" [advisories] # --------------------------------------------------------------- # +version = 2 + # The path where the advisory database is cloned/fetched into db-path = "~/.cargo/advisory-db" # The url(s) of the advisory databases to use db-urls = ["https://github.com/rustsec/advisory-db"] -# The lint level for security vulnerabilities -vulnerability = "deny" - # The lint level for unmaintained crates unmaintained = "deny" +# The lint level for crates that have been marked as unsound in an advisory database +unsound = "deny" + # The lint level for crates that have been yanked from their source registry yanked = "deny" @@ -113,11 +117,17 @@ notice = "deny" # A list of advisory IDs to ignore. Note that ignored advisories will still # output a note when they are encountered. ignore = [ - 'RUSTSEC-2021-0060', # Create `aes-soft` has been merged into the `aes` crate - 'RUSTSEC-2021-0064', # Crate `cpuid-bool` has been renamed to `cpufeatures` - 'RUSTSEC-2021-0139', # ansi_term is Unmaintained - 'RUSTSEC-2022-0093', # related issue: https://github.com/Analog-Labs/chain-connectors/issues/162 - 'RUSTSEC-2024-0344', # Timing variabilit on curve25519-dalek, which can potentially leak private keys + # Create `aes-soft` has been merged into the `aes` crate + { id = 'RUSTSEC-2021-0060', reason = "Will be fixed in a future PR" }, + + # Crate `cpuid-bool` has been renamed to `cpufeatures` + { id = 'RUSTSEC-2021-0064', reason = "Will be fixed in a future PR" }, + + # ansi_term is Unmaintained + { id = 'RUSTSEC-2021-0139', reason = "Will be fixed in a future PR" }, + + # Timing variabilit on curve25519-dalek, which can potentially leak private keys + { id = 'RUSTSEC-2024-0344', reason = "Waiting for third-part libraries to update to the fixed version" }, ] # This section is considered when running `cargo deny check sources`. From d11744a7d5018521436b0261d97607ef5ba4f5db Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Mon, 15 Jul 2024 11:37:05 -0300 Subject: [PATCH 7/8] Remove deprecated cargo deny keys --- deny.toml | 32 +------------------------------- 1 file changed, 1 insertion(+), 31 deletions(-) diff --git a/deny.toml b/deny.toml index 2097cfdb..6bcefd73 100644 --- a/deny.toml +++ b/deny.toml @@ -1,3 +1,4 @@ +[graph] # cargo-deny is really only ever intended to run on the "normal" tier-1 targets targets = [ { triple = "x86_64-unknown-linux-gnu" }, @@ -8,9 +9,6 @@ targets = [ [licenses] # ----------------------------------------------------------------- # version = 2 -# The lint level for crates which do not have a detectable license -unlicensed = "deny" - # List of explicitly allowed licenses # See https://spdx.org/licenses/ for list of possible licenses # [possible values: any SPDX 3.11 short identifier (+ optional exception)]. @@ -27,27 +25,10 @@ allow = [ "Unicode-DFS-2016", ] -# Lint level for licenses considered copyleft -copyleft = "deny" -# Blanket approval or denial for OSI-approved or FSF Free/Libre licenses -# * both - The license will be approved if it is both OSI-approved *AND* FSF -# * either - The license will be approved if it is either OSI-approved *OR* FSF -# * osi-only - The license will be approved if is OSI-approved *AND NOT* FSF -# * fsf-only - The license will be approved if is FSF *AND NOT* OSI-approved -# * neither - This predicate is ignored and the default lint level is used - -allow-osi-fsf-free = "neither" -# Lint level used when no other predicates are matched -# 1. License isn't in the allow or deny lists -# 2. License isn't copyleft -# 3. License isn't OSI/FSF, or allow-osi-fsf-free = "neither" - -default = "deny" # The confidence threshold for detecting a license from license text. # The higher the value, the more closely the license text must be to the # canonical license text of a valid SPDX license file. # [possible values: any between 0.0 and 1.0]. - confidence-threshold = 0.9 # Allow 1 or more licenses on a per-crate basis, so that particular licenses # aren't accepted for every possible crate as with the normal allow list @@ -100,20 +81,9 @@ db-path = "~/.cargo/advisory-db" # The url(s) of the advisory databases to use db-urls = ["https://github.com/rustsec/advisory-db"] -# The lint level for unmaintained crates -unmaintained = "deny" - -# The lint level for crates that have been marked as unsound in an advisory database -unsound = "deny" - # The lint level for crates that have been yanked from their source registry yanked = "deny" -# The lint level for crates with security notices. Note that as of -# 2019-12-17 there are no security notice advisories in -# https://github.com/rustsec/advisory-db -notice = "deny" - # A list of advisory IDs to ignore. Note that ignored advisories will still # output a note when they are encountered. ignore = [ From f9b651ac88e34717e8c2e9587f89ac7c3765e37c Mon Sep 17 00:00:00 2001 From: Lohann Paterno Coutinho Ferreira Date: Mon, 15 Jul 2024 11:41:40 -0300 Subject: [PATCH 8/8] Remove unused code --- chains/ethereum/server/src/block_stream.rs | 1 - .../server/src/finalized_block_stream.rs | 1 - chains/ethereum/server/src/lib.rs | 2 +- chains/ethereum/server/src/logs_stream.rs | 1 - chains/ethereum/server/src/multi_block.rs | 1 - chains/ethereum/server/src/new_heads.rs | 1 - chains/ethereum/server/src/state.rs | 42 ------------------- rosetta-core/src/lib.rs | 5 ++- 8 files changed, 5 insertions(+), 49 deletions(-) diff --git a/chains/ethereum/server/src/block_stream.rs b/chains/ethereum/server/src/block_stream.rs index 39e2c578..00920549 100644 --- a/chains/ethereum/server/src/block_stream.rs +++ b/chains/ethereum/server/src/block_stream.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use super::{ event_stream::{EthereumEventStream, NewBlock}, state::State, diff --git a/chains/ethereum/server/src/finalized_block_stream.rs b/chains/ethereum/server/src/finalized_block_stream.rs index c0417abf..ef3e7d08 100644 --- a/chains/ethereum/server/src/finalized_block_stream.rs +++ b/chains/ethereum/server/src/finalized_block_stream.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use crate::utils::{EthereumRpcExt, PartialBlock}; use futures_timer::Delay; use futures_util::{future::BoxFuture, FutureExt, Stream}; diff --git a/chains/ethereum/server/src/lib.rs b/chains/ethereum/server/src/lib.rs index 7a35fd06..650cf95e 100644 --- a/chains/ethereum/server/src/lib.rs +++ b/chains/ethereum/server/src/lib.rs @@ -37,10 +37,10 @@ pub mod config { #[doc(hidden)] pub mod ext { pub use anyhow; + pub use futures_util; pub use rosetta_config_ethereum as config; pub use rosetta_core as core; pub use rosetta_ethereum_backend as backend; - pub use futures_util; } #[derive(Clone)] diff --git a/chains/ethereum/server/src/logs_stream.rs b/chains/ethereum/server/src/logs_stream.rs index c05a8d7f..6533cb79 100644 --- a/chains/ethereum/server/src/logs_stream.rs +++ b/chains/ethereum/server/src/logs_stream.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use futures_util::{future::BoxFuture, Stream, StreamExt}; use rosetta_ethereum_backend::{ ext::types::{crypto::DefaultCrypto, rpc::RpcBlock, AtBlock, SealedBlock, H256}, diff --git a/chains/ethereum/server/src/multi_block.rs b/chains/ethereum/server/src/multi_block.rs index e5fdaeff..ef48e3a4 100644 --- a/chains/ethereum/server/src/multi_block.rs +++ b/chains/ethereum/server/src/multi_block.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use std::{ cmp::Ordering, hash::{Hash, Hasher}, diff --git a/chains/ethereum/server/src/new_heads.rs b/chains/ethereum/server/src/new_heads.rs index 3916c6ae..080cbc42 100644 --- a/chains/ethereum/server/src/new_heads.rs +++ b/chains/ethereum/server/src/new_heads.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt}; use rosetta_ethereum_backend::{ ext::types::{crypto::DefaultCrypto, rpc::RpcBlock, AtBlock, SealedBlock, H256}, diff --git a/chains/ethereum/server/src/state.rs b/chains/ethereum/server/src/state.rs index b7733e48..99bf6269 100644 --- a/chains/ethereum/server/src/state.rs +++ b/chains/ethereum/server/src/state.rs @@ -1,4 +1,3 @@ -#![allow(dead_code)] use std::{ collections::{BTreeMap, VecDeque}, sync::{Arc, RwLock}, @@ -11,9 +10,6 @@ use rosetta_config_ethereum::ext::types::H256; type ForkTree = fork_tree::ForkTree; -/// Maximum number of blocks that can be skipped when importing a block -const MAX_BLOCK_GAP: u64 = 1000; - #[derive(Debug, PartialEq, Eq, thiserror::Error)] pub enum Error { #[error("block not found: {0}")] @@ -117,44 +113,6 @@ impl StateInner { Ok(()) } - fn insert_orphan_block( - &mut self, - block: MultiBlock, - mut children: BTreeMap, - ) { - // Add block to the orphan list - let missing_ref = if let Some(parent_ref) = self.orphans.get(&block.parent_ref()).copied() { - self.orphans.insert(block.as_block_ref(), parent_ref); - parent_ref - } else { - let parent_ref = block.parent_ref(); - self.orphans.insert(block.as_block_ref(), parent_ref); - parent_ref - }; - - // Update children missing references - for child_ref in children.keys().copied() { - self.orphans.insert(child_ref, missing_ref); - } - - // Add block to the orphan list - match self.missing.entry(missing_ref) { - Entry::Occupied(mut entry) => { - let orphans = entry.get_mut(); - if let Some(cached) = orphans.get_mut(&block.as_block_ref()) { - cached.upgrade(block); - } else { - orphans.insert(block.as_block_ref(), block); - } - orphans.extend(children); - }, - Entry::Vacant(entry) => { - children.insert(block.as_block_ref(), block); - entry.insert(children); - }, - } - } - #[allow(clippy::too_many_lines)] fn import>(&mut self, block: B) -> Result<(), fork_tree::Error> { let block = block.into(); diff --git a/rosetta-core/src/lib.rs b/rosetta-core/src/lib.rs index 48c5cbdd..55528713 100644 --- a/rosetta-core/src/lib.rs +++ b/rosetta-core/src/lib.rs @@ -13,7 +13,10 @@ use anyhow::Result; use async_trait::async_trait; pub use futures_util::{future, stream}; use serde::{de::DeserializeOwned, Serialize}; -use std::{fmt::{Display, Debug}, sync::Arc}; +use std::{ + fmt::{Debug, Display}, + sync::Arc, +}; use futures_util::stream::Empty; pub use node_uri::{NodeUri, NodeUriError};