From 14f6ddc6c2c6c861e4ec8cbfe6b865353986068f Mon Sep 17 00:00:00 2001 From: Bear Wang Date: Wed, 15 May 2024 15:21:01 +0800 Subject: [PATCH] Fix inconsistent `latest_block_hash` (#1424) * Add `best_hash` method * Update earliest tag matched value * Fix CI tests * Code clean * Fix review comments * Update comment * Fix compile after solving conflicts * Remove the broken changes * Update trait bound * Simplify trait bound --- client/api/src/backend.rs | 3 +++ client/cli/src/frontier_db_cmd/mapping_db.rs | 13 ++++++------ client/cli/src/frontier_db_cmd/meta_db.rs | 11 +++++----- client/cli/src/frontier_db_cmd/mod.rs | 7 +++---- client/cli/src/frontier_db_cmd/tests.rs | 4 ++-- client/db/src/kv/mod.rs | 21 +++++++++++--------- client/db/src/kv/upgrade.rs | 6 ++++-- client/db/src/lib.rs | 8 +++++--- client/db/src/sql/mod.rs | 12 +++++++++-- client/mapping-sync/src/kv/mod.rs | 9 +++++---- client/mapping-sync/src/kv/worker.rs | 8 ++------ client/mapping-sync/src/sql/mod.rs | 2 +- client/rpc/src/lib.rs | 12 ++++++++--- template/node/src/command.rs | 2 +- template/node/src/eth.rs | 10 +++++----- template/node/src/service.rs | 15 +++++++------- ts-tests/tests/test-contract-methods.ts | 1 + 17 files changed, 83 insertions(+), 61 deletions(-) diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs index 7ad26b95a8..7bdfb72beb 100644 --- a/client/api/src/backend.rs +++ b/client/api/src/backend.rs @@ -52,6 +52,9 @@ pub trait Backend: Send + Sync { fn is_indexed(&self) -> bool { self.log_indexer().is_indexed() } + + /// Get the hash of the latest substrate block fully indexed by the backend. + async fn latest_block_hash(&self) -> Result; } #[derive(Debug, Eq, PartialEq)] diff --git a/client/cli/src/frontier_db_cmd/mapping_db.rs b/client/cli/src/frontier_db_cmd/mapping_db.rs index 0b1e56410c..511837f74a 100644 --- a/client/cli/src/frontier_db_cmd/mapping_db.rs +++ b/client/cli/src/frontier_db_cmd/mapping_db.rs @@ -40,22 +40,21 @@ pub enum MappingKey { EthBlockOrTransactionHash(H256), } -pub struct MappingDb<'a, C, B: BlockT> { +pub struct MappingDb<'a, B: BlockT, C> { cmd: &'a FrontierDbCmd, client: Arc, - backend: Arc>, + backend: Arc>, } -impl<'a, C, B: BlockT> MappingDb<'a, C, B> +impl<'a, B: BlockT, C> MappingDb<'a, B, C> where - C: ProvideRuntimeApi, + C: HeaderBackend + ProvideRuntimeApi, C::Api: EthereumRuntimeRPCApi, - C: HeaderBackend, { pub fn new( cmd: &'a FrontierDbCmd, client: Arc, - backend: Arc>, + backend: Arc>, ) -> Self { Self { cmd, @@ -176,4 +175,4 @@ where } } -impl<'a, C, B: BlockT> FrontierDbMessage for MappingDb<'a, C, B> {} +impl<'a, B: BlockT, C: HeaderBackend> FrontierDbMessage for MappingDb<'a, B, C> {} diff --git a/client/cli/src/frontier_db_cmd/meta_db.rs b/client/cli/src/frontier_db_cmd/meta_db.rs index 878bd101ac..8abe3d7e6d 100644 --- a/client/cli/src/frontier_db_cmd/meta_db.rs +++ b/client/cli/src/frontier_db_cmd/meta_db.rs @@ -24,6 +24,7 @@ use std::{ use ethereum_types::H256; use serde::Deserialize; +use sp_blockchain::HeaderBackend; // Substrate use sp_runtime::traits::Block as BlockT; @@ -57,13 +58,13 @@ impl FromStr for MetaKey { } } -pub struct MetaDb<'a, B: BlockT> { +pub struct MetaDb<'a, B: BlockT, C> { cmd: &'a FrontierDbCmd, - backend: Arc>, + backend: Arc>, } -impl<'a, B: BlockT> MetaDb<'a, B> { - pub fn new(cmd: &'a FrontierDbCmd, backend: Arc>) -> Self { +impl<'a, B: BlockT, C: HeaderBackend> MetaDb<'a, B, C> { + pub fn new(cmd: &'a FrontierDbCmd, backend: Arc>) -> Self { Self { cmd, backend } } @@ -151,4 +152,4 @@ impl<'a, B: BlockT> MetaDb<'a, B> { } } -impl<'a, B: BlockT> FrontierDbMessage for MetaDb<'a, B> {} +impl<'a, B: BlockT, C: HeaderBackend> FrontierDbMessage for MetaDb<'a, B, C> {} diff --git a/client/cli/src/frontier_db_cmd/mod.rs b/client/cli/src/frontier_db_cmd/mod.rs index a82436b6f3..4453625337 100644 --- a/client/cli/src/frontier_db_cmd/mod.rs +++ b/client/cli/src/frontier_db_cmd/mod.rs @@ -98,15 +98,14 @@ pub enum DbValue { } impl FrontierDbCmd { - pub fn run( + pub fn run( &self, client: Arc, - backend: Arc>, + backend: Arc>, ) -> sc_cli::Result<()> where - C: ProvideRuntimeApi, + C: HeaderBackend + ProvideRuntimeApi, C::Api: fp_rpc::EthereumRuntimeRPCApi, - C: HeaderBackend, { match self.column { Column::Meta => { diff --git a/client/cli/src/frontier_db_cmd/tests.rs b/client/cli/src/frontier_db_cmd/tests.rs index 2f9078a45e..9ebf9db74a 100644 --- a/client/cli/src/frontier_db_cmd/tests.rs +++ b/client/cli/src/frontier_db_cmd/tests.rs @@ -49,8 +49,8 @@ type OpaqueBlock = pub fn open_frontier_backend>( client: Arc, path: PathBuf, -) -> Result>, String> { - Ok(Arc::new(fc_db::kv::Backend::::new( +) -> Result>, String> { + Ok(Arc::new(fc_db::kv::Backend::::new( client, &fc_db::kv::DatabaseSettings { source: sc_client_db::DatabaseSource::RocksDb { diff --git a/client/db/src/kv/mod.rs b/client/db/src/kv/mod.rs index c70ec5f51d..23710104f0 100644 --- a/client/db/src/kv/mod.rs +++ b/client/db/src/kv/mod.rs @@ -62,14 +62,15 @@ pub mod static_keys { } #[derive(Clone)] -pub struct Backend { +pub struct Backend { + client: Arc, meta: Arc>, mapping: Arc>, log_indexer: LogIndexerBackend, } #[async_trait::async_trait] -impl fc_api::Backend for Backend { +impl> fc_api::Backend for Backend { async fn block_hash( &self, ethereum_block_hash: &H256, @@ -88,6 +89,10 @@ impl fc_api::Backend for Backend { fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend { &self.log_indexer } + + async fn latest_block_hash(&self) -> Result { + Ok(self.client.info().best_hash) + } } #[derive(Clone, Default)] @@ -115,8 +120,8 @@ pub fn frontier_database_dir(db_config_dir: &Path, db_path: &str) -> PathBuf { db_config_dir.join("frontier").join(db_path) } -impl Backend { - pub fn open>( +impl> Backend { + pub fn open( client: Arc, database: &DatabaseSource, db_config_dir: &Path, @@ -148,13 +153,11 @@ impl Backend { ) } - pub fn new>( - client: Arc, - config: &DatabaseSettings, - ) -> Result { - let db = utils::open_database::(client, config)?; + pub fn new(client: Arc, config: &DatabaseSettings) -> Result { + let db = utils::open_database::(client.clone(), config)?; Ok(Self { + client, mapping: Arc::new(MappingDb { db: db.clone(), write_lock: Arc::new(Mutex::new(())), diff --git a/client/db/src/kv/upgrade.rs b/client/db/src/kv/upgrade.rs index 0799d12488..01c809738d 100644 --- a/client/db/src/kv/upgrade.rs +++ b/client/db/src/kv/upgrade.rs @@ -348,8 +348,10 @@ mod tests { pub fn open_frontier_backend>( client: Arc, setting: &crate::kv::DatabaseSettings, - ) -> Result>, String> { - Ok(Arc::new(crate::kv::Backend::::new(client, setting)?)) + ) -> Result>, String> { + Ok(Arc::new(crate::kv::Backend::::new( + client, setting, + )?)) } #[cfg_attr(not(feature = "rocksdb"), ignore)] diff --git a/client/db/src/lib.rs b/client/db/src/lib.rs index c15f22ee74..9079b19839 100644 --- a/client/db/src/lib.rs +++ b/client/db/src/lib.rs @@ -18,6 +18,8 @@ #![warn(unused_crate_dependencies)] +use std::sync::Arc; + // Substrate pub use sc_client_db::DatabaseSource; use sp_runtime::traits::Block as BlockT; @@ -27,8 +29,8 @@ pub mod kv; pub mod sql; #[derive(Clone)] -pub enum Backend { - KeyValue(kv::Backend), +pub enum Backend { + KeyValue(Arc>), #[cfg(feature = "sql")] - Sql(sql::Backend), + Sql(Arc>), } diff --git a/client/db/src/sql/mod.rs b/client/db/src/sql/mod.rs index b848aa5bde..4ae91acc6c 100644 --- a/client/db/src/sql/mod.rs +++ b/client/db/src/sql/mod.rs @@ -96,7 +96,6 @@ pub enum BackendConfig<'a> { pub struct Backend { /// The Sqlite connection. pool: SqlitePool, - /// The additional overrides for the logs handler. overrides: Arc>, @@ -687,7 +686,7 @@ where } /// Retrieve the block hash for the last indexed canon block. - pub async fn get_last_indexed_canon_block(&self) -> Result { + pub async fn last_indexed_canon_block(&self) -> Result { let row = sqlx::query( "SELECT b.substrate_block_hash FROM blocks AS b INNER JOIN sync_status AS s @@ -854,6 +853,15 @@ impl> fc_api::Backend for Backend { fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend { self } + + async fn latest_block_hash(&self) -> Result { + // Retrieves the block hash for the latest indexed block, maybe it's not canon. + sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number DESC LIMIT 1") + .fetch_one(self.pool()) + .await + .map(|row| H256::from_slice(&row.get::, _>(0)[..])) + .map_err(|e| format!("Failed to fetch best hash: {}", e)) + } } #[async_trait::async_trait] diff --git a/client/mapping-sync/src/kv/mod.rs b/client/mapping-sync/src/kv/mod.rs index 5aea3c7a5b..34cdbeeb7b 100644 --- a/client/mapping-sync/src/kv/mod.rs +++ b/client/mapping-sync/src/kv/mod.rs @@ -111,11 +111,11 @@ where pub fn sync_genesis_block( client: &C, - backend: &fc_db::kv::Backend, + backend: &fc_db::kv::Backend, header: &Block::Header, ) -> Result<(), String> where - C: ProvideRuntimeApi, + C: HeaderBackend + ProvideRuntimeApi, C::Api: EthereumRuntimeRPCApi, { let substrate_block_hash = header.hash(); @@ -282,13 +282,14 @@ where Ok(synced_any) } -pub fn fetch_header( +pub fn fetch_header( substrate_backend: &BE, - frontier_backend: &fc_db::kv::Backend, + frontier_backend: &fc_db::kv::Backend, checking_tip: Block::Hash, sync_from: ::Number, ) -> Result, String> where + C: HeaderBackend, BE: HeaderBackend, { if frontier_backend.mapping().is_synced(&checking_tip)? { diff --git a/client/mapping-sync/src/kv/worker.rs b/client/mapping-sync/src/kv/worker.rs index 4760f3229f..08d94f4f21 100644 --- a/client/mapping-sync/src/kv/worker.rs +++ b/client/mapping-sync/src/kv/worker.rs @@ -70,10 +70,6 @@ impl MappingSyncWorker { overrides: Arc>, frontier_backend: Arc>, retry_times: usize, - sync_from: ::Number, - strategy: SyncStrategy, - sync_oracle: Arc, - pubsub_notification_sinks: Arc< crate::EthereumBlockNotificationSinks>, >, ) -> Self { @@ -259,7 +255,7 @@ mod tests { }); let frontier_backend = Arc::new( - fc_db::kv::Backend::::new( + fc_db::kv::Backend::::new( client.clone(), &fc_db::kv::DatabaseSettings { source: sc_client_db::DatabaseSource::RocksDb { @@ -409,7 +405,7 @@ mod tests { }); let frontier_backend = Arc::new( - fc_db::kv::Backend::::new( + fc_db::kv::Backend::::new( client.clone(), &fc_db::kv::DatabaseSettings { source: sc_client_db::DatabaseSource::RocksDb { diff --git a/client/mapping-sync/src/sql/mod.rs b/client/mapping-sync/src/sql/mod.rs index 8840e9cc9f..dc30ddaeba 100644 --- a/client/mapping-sync/src/sql/mod.rs +++ b/client/mapping-sync/src/sql/mod.rs @@ -89,7 +89,7 @@ where match cmd { WorkerCommand::ResumeSync => { // Attempt to resume from last indexed block. If there is no data in the db, sync genesis. - match indexer_backend.get_last_indexed_canon_block().await.ok() { + match indexer_backend.last_indexed_canon_block().await.ok() { Some(last_block_hash) => { log::debug!(target: "frontier-sql", "Resume from last block {last_block_hash:?}"); if let Some(parent_hash) = client diff --git a/client/rpc/src/lib.rs b/client/rpc/src/lib.rs index afc7df5c56..e678f309c7 100644 --- a/client/rpc/src/lib.rs +++ b/client/rpc/src/lib.rs @@ -208,7 +208,13 @@ pub mod frontier_backend_client { } } BlockNumberOrHash::Num(number) => Some(BlockId::Number(number.unique_saturated_into())), - BlockNumberOrHash::Latest => Some(BlockId::Hash(client.info().best_hash)), + BlockNumberOrHash::Latest => match backend.latest_block_hash().await { + Ok(hash) => Some(BlockId::Hash(hash)), + Err(e) => { + log::warn!(target: "rpc", "Failed to get latest block hash from the sql db: {:?}", e); + Some(BlockId::Hash(client.info().best_hash)) + } + }, BlockNumberOrHash::Earliest => Some(BlockId::Hash(client.info().genesis_hash)), BlockNumberOrHash::Pending => None, BlockNumberOrHash::Safe => Some(BlockId::Hash(client.info().finalized_hash)), @@ -366,8 +372,8 @@ mod tests { fn open_frontier_backend>( client: Arc, path: PathBuf, - ) -> Result>, String> { - Ok(Arc::new(fc_db::kv::Backend::::new( + ) -> Result>, String> { + Ok(Arc::new(fc_db::kv::Backend::::new( client, &fc_db::kv::DatabaseSettings { source: sc_client_db::DatabaseSource::RocksDb { diff --git a/template/node/src/command.rs b/template/node/src/command.rs index 55c99240e3..8d97fd3dd4 100644 --- a/template/node/src/command.rs +++ b/template/node/src/command.rs @@ -234,7 +234,7 @@ pub fn run() -> sc_cli::Result<()> { let (client, _, _, _, frontier_backend) = service::new_chain_ops(&mut config, &cli.eth)?; let frontier_backend = match frontier_backend { - fc_db::Backend::KeyValue(kv) => std::sync::Arc::new(kv), + fc_db::Backend::KeyValue(kv) => kv, _ => panic!("Only fc_db::Backend::KeyValue supported"), }; cmd.run(client, frontier_backend) diff --git a/template/node/src/eth.rs b/template/node/src/eth.rs index 7400943a91..565e7f979a 100644 --- a/template/node/src/eth.rs +++ b/template/node/src/eth.rs @@ -22,7 +22,7 @@ use frontier_template_runtime::opaque::Block; use crate::client::{FullBackend, FullClient}; /// Frontier DB backend type. -pub type FrontierBackend = fc_db::Backend; +pub type FrontierBackend = fc_db::Backend; pub fn db_config_dir(config: &Configuration) -> PathBuf { config.base_path.config_dir(config.chain_spec.id()) @@ -126,7 +126,7 @@ pub async fn spawn_frontier_tasks( task_manager: &TaskManager, client: Arc>, backend: Arc, - frontier_backend: FrontierBackend, + frontier_backend: Arc>>, filter_pool: Option, overrides: Arc>, fee_history_cache: FeeHistoryCache, @@ -144,7 +144,7 @@ pub async fn spawn_frontier_tasks( Executor: NativeExecutionDispatch + 'static, { // Spawn main mapping sync worker background task. - match frontier_backend { + match &*frontier_backend { fc_db::Backend::KeyValue(b) => { task_manager.spawn_essential_handle().spawn( "frontier-mapping-sync-worker", @@ -172,10 +172,10 @@ pub async fn spawn_frontier_tasks( fc_mapping_sync::sql::SyncWorker::run( client.clone(), backend, - Arc::new(b), + b.clone(), client.import_notification_stream(), fc_mapping_sync::sql::SyncWorkerConfig { - read_notification_timeout: Duration::from_secs(10), + read_notification_timeout: Duration::from_secs(30), check_indexed_blocks_interval: Duration::from_secs(60), }, fc_mapping_sync::SyncStrategy::Parachain, diff --git a/template/node/src/service.rs b/template/node/src/service.rs index c6c856f931..522441b86f 100644 --- a/template/node/src/service.rs +++ b/template/node/src/service.rs @@ -118,11 +118,11 @@ where let overrides = crate::rpc::overrides_handle(client.clone()); let frontier_backend = match eth_config.frontier_backend_type { - BackendType::KeyValue => FrontierBackend::KeyValue(fc_db::kv::Backend::open( + BackendType::KeyValue => FrontierBackend::KeyValue(Arc::new(fc_db::kv::Backend::open( Arc::clone(&client), &config.database, &db_config_dir(config), - )?), + )?)), BackendType::Sql => { let db_path = db_config_dir(config).join("sql"); std::fs::create_dir_all(&db_path).expect("failed creating sql db directory"); @@ -142,7 +142,7 @@ where overrides.clone(), )) .unwrap_or_else(|err| panic!("failed creating sql backend: {:?}", err)); - FrontierBackend::Sql(backend) + FrontierBackend::Sql(Arc::new(backend)) } }; @@ -349,6 +349,7 @@ where let role = config.role.clone(); let force_authoring = config.force_authoring; let name = config.network.node_name.clone(); + let frontier_backend = Arc::new(frontier_backend); let enable_grandpa = !config.disable_grandpa && sealing.is_none(); let prometheus_registry = config.prometheus_registry().cloned(); @@ -414,9 +415,9 @@ where enable_dev_signer, network: network.clone(), sync: sync_service.clone(), - frontier_backend: match frontier_backend.clone() { - fc_db::Backend::KeyValue(b) => Arc::new(b), - fc_db::Backend::Sql(b) => Arc::new(b), + frontier_backend: match &*frontier_backend { + fc_db::Backend::KeyValue(b) => b.clone(), + fc_db::Backend::Sql(b) => b.clone(), }, overrides: overrides.clone(), block_data_cache: block_data_cache.clone(), @@ -710,7 +711,7 @@ pub fn new_chain_ops( Arc, BasicQueue, TaskManager, - FrontierBackend, + FrontierBackend, ), ServiceError, > { diff --git a/ts-tests/tests/test-contract-methods.ts b/ts-tests/tests/test-contract-methods.ts index d8da2273f2..9eb5c526a9 100644 --- a/ts-tests/tests/test-contract-methods.ts +++ b/ts-tests/tests/test-contract-methods.ts @@ -74,6 +74,7 @@ describeWithFrontier("Frontier RPC (Contract Methods)", (context) => { let number = (await context.web3.eth.getBlock("latest")).number; let last = number + BLOCK_HASH_COUNT; for (let i = number; i <= last; i++) { + await new Promise((resolve) => setTimeout(resolve, 60)); let hash = (await context.web3.eth.getBlock("latest")).hash; expect(await contract.methods.blockHash(i).call()).to.eq(hash); await createAndFinalizeBlockNowait(context.web3);