Skip to content

Commit

Permalink
Fix inconsistent latest_block_hash (polkadot-evm#1424)
Browse files Browse the repository at this point in the history
* Add `best_hash` method

* Update earliest tag matched value

* Fix CI tests

* Code clean

* Fix review comments

* Update comment

* Fix compile after solving conflicts

* Remove the broken changes

* Update trait bound

* Simplify trait bound
  • Loading branch information
boundless-forest committed Sep 23, 2024
1 parent cb1c8a7 commit 14f6ddc
Show file tree
Hide file tree
Showing 17 changed files with 83 additions and 61 deletions.
3 changes: 3 additions & 0 deletions client/api/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ pub trait Backend<Block: BlockT>: Send + Sync {
fn is_indexed(&self) -> bool {
self.log_indexer().is_indexed()
}

/// Get the hash of the latest substrate block fully indexed by the backend.
async fn latest_block_hash(&self) -> Result<Block::Hash, String>;
}

#[derive(Debug, Eq, PartialEq)]
Expand Down
13 changes: 6 additions & 7 deletions client/cli/src/frontier_db_cmd/mapping_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,21 @@ pub enum MappingKey {
EthBlockOrTransactionHash(H256),
}

pub struct MappingDb<'a, C, B: BlockT> {
pub struct MappingDb<'a, B: BlockT, C> {
cmd: &'a FrontierDbCmd,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
}

impl<'a, C, B: BlockT> MappingDb<'a, C, B>
impl<'a, B: BlockT, C> MappingDb<'a, B, C>
where
C: ProvideRuntimeApi<B>,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
C::Api: EthereumRuntimeRPCApi<B>,
C: HeaderBackend<B>,
{
pub fn new(
cmd: &'a FrontierDbCmd,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
) -> Self {
Self {
cmd,
Expand Down Expand Up @@ -176,4 +175,4 @@ where
}
}

impl<'a, C, B: BlockT> FrontierDbMessage for MappingDb<'a, C, B> {}
impl<'a, B: BlockT, C: HeaderBackend<B>> FrontierDbMessage for MappingDb<'a, B, C> {}
11 changes: 6 additions & 5 deletions client/cli/src/frontier_db_cmd/meta_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{

use ethereum_types::H256;
use serde::Deserialize;
use sp_blockchain::HeaderBackend;
// Substrate
use sp_runtime::traits::Block as BlockT;

Expand Down Expand Up @@ -57,13 +58,13 @@ impl FromStr for MetaKey {
}
}

pub struct MetaDb<'a, B: BlockT> {
pub struct MetaDb<'a, B: BlockT, C> {
cmd: &'a FrontierDbCmd,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
}

impl<'a, B: BlockT> MetaDb<'a, B> {
pub fn new(cmd: &'a FrontierDbCmd, backend: Arc<fc_db::kv::Backend<B>>) -> Self {
impl<'a, B: BlockT, C: HeaderBackend<B>> MetaDb<'a, B, C> {
pub fn new(cmd: &'a FrontierDbCmd, backend: Arc<fc_db::kv::Backend<B, C>>) -> Self {
Self { cmd, backend }
}

Expand Down Expand Up @@ -151,4 +152,4 @@ impl<'a, B: BlockT> MetaDb<'a, B> {
}
}

impl<'a, B: BlockT> FrontierDbMessage for MetaDb<'a, B> {}
impl<'a, B: BlockT, C: HeaderBackend<B>> FrontierDbMessage for MetaDb<'a, B, C> {}
7 changes: 3 additions & 4 deletions client/cli/src/frontier_db_cmd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ pub enum DbValue<H> {
}

impl FrontierDbCmd {
pub fn run<C, B: BlockT>(
pub fn run<B: BlockT, C>(
&self,
client: Arc<C>,
backend: Arc<fc_db::kv::Backend<B>>,
backend: Arc<fc_db::kv::Backend<B, C>>,
) -> sc_cli::Result<()>
where
C: ProvideRuntimeApi<B>,
C: HeaderBackend<B> + ProvideRuntimeApi<B>,
C::Api: fp_rpc::EthereumRuntimeRPCApi<B>,
C: HeaderBackend<B>,
{
match self.column {
Column::Meta => {
Expand Down
4 changes: 2 additions & 2 deletions client/cli/src/frontier_db_cmd/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type OpaqueBlock =
pub fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
client: Arc<C>,
path: PathBuf,
) -> Result<Arc<fc_db::kv::Backend<Block>>, String> {
Ok(Arc::new(fc_db::kv::Backend::<Block>::new(
) -> Result<Arc<fc_db::kv::Backend<Block, C>>, String> {
Ok(Arc::new(fc_db::kv::Backend::<Block, C>::new(
client,
&fc_db::kv::DatabaseSettings {
source: sc_client_db::DatabaseSource::RocksDb {
Expand Down
21 changes: 12 additions & 9 deletions client/db/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,15 @@ pub mod static_keys {
}

#[derive(Clone)]
pub struct Backend<Block: BlockT> {
pub struct Backend<Block: BlockT, C> {
client: Arc<C>,
meta: Arc<MetaDb<Block>>,
mapping: Arc<MappingDb<Block>>,
log_indexer: LogIndexerBackend<Block>,
}

#[async_trait::async_trait]
impl<Block: BlockT> fc_api::Backend<Block> for Backend<Block> {
impl<Block: BlockT, C: HeaderBackend<Block>> fc_api::Backend<Block> for Backend<Block, C> {
async fn block_hash(
&self,
ethereum_block_hash: &H256,
Expand All @@ -88,6 +89,10 @@ impl<Block: BlockT> fc_api::Backend<Block> for Backend<Block> {
fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
&self.log_indexer
}

async fn latest_block_hash(&self) -> Result<Block::Hash, String> {
Ok(self.client.info().best_hash)
}
}

#[derive(Clone, Default)]
Expand Down Expand Up @@ -115,8 +120,8 @@ pub fn frontier_database_dir(db_config_dir: &Path, db_path: &str) -> PathBuf {
db_config_dir.join("frontier").join(db_path)
}

impl<Block: BlockT> Backend<Block> {
pub fn open<C: HeaderBackend<Block>>(
impl<Block: BlockT, C: HeaderBackend<Block>> Backend<Block, C> {
pub fn open(
client: Arc<C>,
database: &DatabaseSource,
db_config_dir: &Path,
Expand Down Expand Up @@ -148,13 +153,11 @@ impl<Block: BlockT> Backend<Block> {
)
}

pub fn new<C: HeaderBackend<Block>>(
client: Arc<C>,
config: &DatabaseSettings,
) -> Result<Self, String> {
let db = utils::open_database::<Block, C>(client, config)?;
pub fn new(client: Arc<C>, config: &DatabaseSettings) -> Result<Self, String> {
let db = utils::open_database::<Block, C>(client.clone(), config)?;

Ok(Self {
client,
mapping: Arc::new(MappingDb {
db: db.clone(),
write_lock: Arc::new(Mutex::new(())),
Expand Down
6 changes: 4 additions & 2 deletions client/db/src/kv/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,10 @@ mod tests {
pub fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
client: Arc<C>,
setting: &crate::kv::DatabaseSettings,
) -> Result<Arc<crate::kv::Backend<Block>>, String> {
Ok(Arc::new(crate::kv::Backend::<Block>::new(client, setting)?))
) -> Result<Arc<crate::kv::Backend<Block, C>>, String> {
Ok(Arc::new(crate::kv::Backend::<Block, C>::new(
client, setting,
)?))
}

#[cfg_attr(not(feature = "rocksdb"), ignore)]
Expand Down
8 changes: 5 additions & 3 deletions client/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#![warn(unused_crate_dependencies)]

use std::sync::Arc;

// Substrate
pub use sc_client_db::DatabaseSource;
use sp_runtime::traits::Block as BlockT;
Expand All @@ -27,8 +29,8 @@ pub mod kv;
pub mod sql;

#[derive(Clone)]
pub enum Backend<Block: BlockT> {
KeyValue(kv::Backend<Block>),
pub enum Backend<Block: BlockT, C> {
KeyValue(Arc<kv::Backend<Block, C>>),
#[cfg(feature = "sql")]
Sql(sql::Backend<Block>),
Sql(Arc<sql::Backend<Block>>),
}
12 changes: 10 additions & 2 deletions client/db/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ pub enum BackendConfig<'a> {
pub struct Backend<Block: BlockT> {
/// The Sqlite connection.
pool: SqlitePool,

/// The additional overrides for the logs handler.
overrides: Arc<OverrideHandle<Block>>,

Expand Down Expand Up @@ -687,7 +686,7 @@ where
}

/// Retrieve the block hash for the last indexed canon block.
pub async fn get_last_indexed_canon_block(&self) -> Result<H256, Error> {
pub async fn last_indexed_canon_block(&self) -> Result<H256, Error> {
let row = sqlx::query(
"SELECT b.substrate_block_hash FROM blocks AS b
INNER JOIN sync_status AS s
Expand Down Expand Up @@ -854,6 +853,15 @@ impl<Block: BlockT<Hash = H256>> fc_api::Backend<Block> for Backend<Block> {
fn log_indexer(&self) -> &dyn fc_api::LogIndexerBackend<Block> {
self
}

async fn latest_block_hash(&self) -> Result<Block::Hash, String> {
// Retrieves the block hash for the latest indexed block, which may not be canonical.
sqlx::query("SELECT substrate_block_hash FROM blocks ORDER BY block_number DESC LIMIT 1")
.fetch_one(self.pool())
.await
.map(|row| H256::from_slice(&row.get::<Vec<u8>, _>(0)[..]))
.map_err(|e| format!("Failed to fetch best hash: {}", e))
}
}

#[async_trait::async_trait]
Expand Down
9 changes: 5 additions & 4 deletions client/mapping-sync/src/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,11 @@ where

pub fn sync_genesis_block<Block: BlockT, C>(
client: &C,
backend: &fc_db::kv::Backend<Block>,
backend: &fc_db::kv::Backend<Block, C>,
header: &Block::Header,
) -> Result<(), String>
where
C: ProvideRuntimeApi<Block>,
C: HeaderBackend<Block> + ProvideRuntimeApi<Block>,
C::Api: EthereumRuntimeRPCApi<Block>,
{
let substrate_block_hash = header.hash();
Expand Down Expand Up @@ -282,13 +282,14 @@ where
Ok(synced_any)
}

pub fn fetch_header<Block: BlockT, BE>(
pub fn fetch_header<Block: BlockT, C, BE>(
substrate_backend: &BE,
frontier_backend: &fc_db::kv::Backend<Block>,
frontier_backend: &fc_db::kv::Backend<Block, C>,
checking_tip: Block::Hash,
sync_from: <Block::Header as HeaderT>::Number,
) -> Result<Option<Block::Header>, String>
where
C: HeaderBackend<Block>,
BE: HeaderBackend<Block>,
{
if frontier_backend.mapping().is_synced(&checking_tip)? {
Expand Down
8 changes: 2 additions & 6 deletions client/mapping-sync/src/kv/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,6 @@ impl<Block: BlockT, C, BE> MappingSyncWorker<Block, C, BE> {
overrides: Arc<OverrideHandle<Block>>,
frontier_backend: Arc<fc_db::kv::Backend<Block>>,
retry_times: usize,
sync_from: <Block::Header as HeaderT>::Number,
strategy: SyncStrategy,
sync_oracle: Arc<dyn SyncOracle + Send + Sync + 'static>,
pubsub_notification_sinks: Arc<
crate::EthereumBlockNotificationSinks<crate::EthereumBlockNotification<Block>>,
>,
) -> Self {
Expand Down Expand Up @@ -259,7 +255,7 @@ mod tests {
});

let frontier_backend = Arc::new(
fc_db::kv::Backend::<OpaqueBlock>::new(
fc_db::kv::Backend::<OpaqueBlock, _>::new(
client.clone(),
&fc_db::kv::DatabaseSettings {
source: sc_client_db::DatabaseSource::RocksDb {
Expand Down Expand Up @@ -409,7 +405,7 @@ mod tests {
});

let frontier_backend = Arc::new(
fc_db::kv::Backend::<OpaqueBlock>::new(
fc_db::kv::Backend::<OpaqueBlock, _>::new(
client.clone(),
&fc_db::kv::DatabaseSettings {
source: sc_client_db::DatabaseSource::RocksDb {
Expand Down
2 changes: 1 addition & 1 deletion client/mapping-sync/src/sql/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where
match cmd {
WorkerCommand::ResumeSync => {
// Attempt to resume from last indexed block. If there is no data in the db, sync genesis.
match indexer_backend.get_last_indexed_canon_block().await.ok() {
match indexer_backend.last_indexed_canon_block().await.ok() {
Some(last_block_hash) => {
log::debug!(target: "frontier-sql", "Resume from last block {last_block_hash:?}");
if let Some(parent_hash) = client
Expand Down
12 changes: 9 additions & 3 deletions client/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,13 @@ pub mod frontier_backend_client {
}
}
BlockNumberOrHash::Num(number) => Some(BlockId::Number(number.unique_saturated_into())),
BlockNumberOrHash::Latest => Some(BlockId::Hash(client.info().best_hash)),
BlockNumberOrHash::Latest => match backend.latest_block_hash().await {
Ok(hash) => Some(BlockId::Hash(hash)),
Err(e) => {
log::warn!(target: "rpc", "Failed to get latest block hash from the sql db: {:?}", e);
Some(BlockId::Hash(client.info().best_hash))
}
},
BlockNumberOrHash::Earliest => Some(BlockId::Hash(client.info().genesis_hash)),
BlockNumberOrHash::Pending => None,
BlockNumberOrHash::Safe => Some(BlockId::Hash(client.info().finalized_hash)),
Expand Down Expand Up @@ -366,8 +372,8 @@ mod tests {
fn open_frontier_backend<Block: BlockT, C: HeaderBackend<Block>>(
client: Arc<C>,
path: PathBuf,
) -> Result<Arc<fc_db::kv::Backend<Block>>, String> {
Ok(Arc::new(fc_db::kv::Backend::<Block>::new(
) -> Result<Arc<fc_db::kv::Backend<Block, C>>, String> {
Ok(Arc::new(fc_db::kv::Backend::<Block, C>::new(
client,
&fc_db::kv::DatabaseSettings {
source: sc_client_db::DatabaseSource::RocksDb {
Expand Down
2 changes: 1 addition & 1 deletion template/node/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ pub fn run() -> sc_cli::Result<()> {
let (client, _, _, _, frontier_backend) =
service::new_chain_ops(&mut config, &cli.eth)?;
let frontier_backend = match frontier_backend {
fc_db::Backend::KeyValue(kv) => std::sync::Arc::new(kv),
fc_db::Backend::KeyValue(kv) => kv,
_ => panic!("Only fc_db::Backend::KeyValue supported"),
};
cmd.run(client, frontier_backend)
Expand Down
10 changes: 5 additions & 5 deletions template/node/src/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use frontier_template_runtime::opaque::Block;
use crate::client::{FullBackend, FullClient};

/// Frontier DB backend type.
pub type FrontierBackend = fc_db::Backend<Block>;
pub type FrontierBackend<C> = fc_db::Backend<Block, C>;

pub fn db_config_dir(config: &Configuration) -> PathBuf {
config.base_path.config_dir(config.chain_spec.id())
Expand Down Expand Up @@ -126,7 +126,7 @@ pub async fn spawn_frontier_tasks<RuntimeApi, Executor>(
task_manager: &TaskManager,
client: Arc<FullClient<RuntimeApi, Executor>>,
backend: Arc<FullBackend>,
frontier_backend: FrontierBackend,
frontier_backend: Arc<FrontierBackend<FullClient<RuntimeApi, Executor>>>,
filter_pool: Option<FilterPool>,
overrides: Arc<OverrideHandle<Block>>,
fee_history_cache: FeeHistoryCache,
Expand All @@ -144,7 +144,7 @@ pub async fn spawn_frontier_tasks<RuntimeApi, Executor>(
Executor: NativeExecutionDispatch + 'static,
{
// Spawn main mapping sync worker background task.
match frontier_backend {
match &*frontier_backend {
fc_db::Backend::KeyValue(b) => {
task_manager.spawn_essential_handle().spawn(
"frontier-mapping-sync-worker",
Expand Down Expand Up @@ -172,10 +172,10 @@ pub async fn spawn_frontier_tasks<RuntimeApi, Executor>(
fc_mapping_sync::sql::SyncWorker::run(
client.clone(),
backend,
Arc::new(b),
b.clone(),
client.import_notification_stream(),
fc_mapping_sync::sql::SyncWorkerConfig {
read_notification_timeout: Duration::from_secs(10),
read_notification_timeout: Duration::from_secs(30),
check_indexed_blocks_interval: Duration::from_secs(60),
},
fc_mapping_sync::SyncStrategy::Parachain,
Expand Down
Loading

0 comments on commit 14f6ddc

Please sign in to comment.