From 3cab65434bec7c8b7c77b5470a1db0c987a0bd6c Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Fri, 5 Jan 2024 22:54:31 +0100 Subject: [PATCH] Moved insertion of the blocks into the `BlockImporter` instead of the executor (#1577) Related to https://github.com/FuelLabs/fuel-core/issues/1549 We don't need to insert the block in the executor because it is the block importer's responsibility. It verifies the block's validity and decides whether we want to insert a new block or not. Plus, storing blocks and transactions is not part of the state transition. This change also adds the ability to produce the block with a defined order of the transactions. It may be useful in the tests. --- CHANGELOG.md | 5 + Cargo.lock | 1 + bin/fuel-core/src/cli/run.rs | 6 +- crates/fuel-core/src/database.rs | 2 + crates/fuel-core/src/database/storage.rs | 7 + crates/fuel-core/src/executor.rs | 37 +----- crates/fuel-core/src/service.rs | 3 + .../src/service/adapters/block_importer.rs | 59 ++++++--- .../service/adapters/consensus_module/poa.rs | 41 ++++-- .../src/service/adapters/executor.rs | 9 +- .../src/service/adapters/producer.rs | 34 ++++- crates/fuel-core/src/service/config.rs | 6 +- crates/fuel-core/src/service/genesis.rs | 6 - crates/fuel-core/src/service/sub_services.rs | 3 +- .../consensus_module/poa/src/ports.rs | 14 +- .../consensus_module/poa/src/service.rs | 56 ++++++-- .../consensus_module/poa/src/service_test.rs | 6 +- .../service_test/manually_produce_tests.rs | 5 +- crates/services/executor/src/executor.rs | 24 +--- crates/services/executor/src/ports.rs | 11 +- crates/services/importer/Cargo.toml | 1 + crates/services/importer/src/config.rs | 16 +++ crates/services/importer/src/importer.rs | 49 ++++--- crates/services/importer/src/importer/test.rs | 120 +++++++++--------- crates/services/importer/src/ports.rs | 24 ++-- .../services/producer/src/block_producer.rs | 67 ++++++++-- .../producer/src/block_producer/tests.rs | 12 +- crates/services/producer/src/mocks.rs | 35 +---- crates/services/producer/src/ports.rs | 8 +- crates/storage/src/tables.rs | 11 ++ tests/Cargo.toml | 2 +- tests/tests/tx.rs | 115 +++++------------ 32 files changed, 450 insertions(+), 345 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2cf0978023..5870b438e50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,11 @@ and this project adheres to [Semantic Versioning](http://semver.org/). Description of the upcoming release here. + +### Changed + +- [#1577](https://github.com/FuelLabs/fuel-core/pull/1577): Moved insertion of sealed blocks into the `BlockImporter` instead of the executor. 
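For reviewers, a minimal usage sketch of the second change described above: producing a block with a caller-defined set and order of transactions. It mirrors the reworked `tests/tests/tx.rs` near the end of this patch; the wrapper function and the `crate::helpers::TestContext` import from the integration-test helpers are assumptions for illustration only.

```rust
use fuel_core_poa::service::Mode;
use fuel_core_types::fuel_tx::Transaction;

use crate::helpers::TestContext;

/// Illustrative wrapper: ask the running node to produce one block containing
/// exactly `txs`, in the given order, instead of pulling from the `TxPool`
/// (see the new `Mode::BlockWithTransactions` variant later in this patch).
async fn produce_block_with(
    context: &TestContext,
    txs: Vec<Transaction>,
) -> anyhow::Result<()> {
    context
        .srv
        .shared
        .poa_adapter
        .manually_produce_blocks(None, Mode::BlockWithTransactions(txs))
        .await
}
```

Passing `None` for the start time keeps the node's own timestamping, exactly as the updated `get_transactions_from_manual_blocks` test does.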
+ ## [Version 0.22.0] ### Added diff --git a/Cargo.lock b/Cargo.lock index bca80d0ab2b..264c9099a88 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2842,6 +2842,7 @@ version = "0.22.0" dependencies = [ "anyhow", "derive_more", + "fuel-core-chain-config", "fuel-core-metrics", "fuel-core-storage", "fuel-core-trace", diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs index e6210d8330c..9b1faeff4ad 100644 --- a/bin/fuel-core/src/cli/run.rs +++ b/bin/fuel-core/src/cli/run.rs @@ -300,6 +300,9 @@ impl Command { max_wait_time: max_wait_time.into(), }; + let block_importer = + fuel_core::service::config::fuel_core_importer::Config::new(&chain_conf); + let config = Config { addr, api_request_timeout: api_request_timeout.into(), @@ -328,8 +331,7 @@ impl Command { coinbase_recipient, metrics, }, - block_executor: Default::default(), - block_importer: Default::default(), + block_importer, #[cfg(feature = "relayer")] relayer: relayer_cfg, #[cfg(feature = "p2p")] diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index d2fb65cfddd..29ace79dcd1 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -150,6 +150,8 @@ pub enum Column { ContractsStateMerkleData = 23, /// See [`ContractsStateMerkleMetadata`](storage::ContractsStateMerkleMetadata) ContractsStateMerkleMetadata = 24, + /// See [`ProcessedTransactions`](storage::ProcessedTransactions) + ProcessedTransactions = 25, } impl Column { diff --git a/crates/fuel-core/src/database/storage.rs b/crates/fuel-core/src/database/storage.rs index 6ceab3a776b..2c2c5333c6e 100644 --- a/crates/fuel-core/src/database/storage.rs +++ b/crates/fuel-core/src/database/storage.rs @@ -3,6 +3,7 @@ use crate::database::{ Database, }; use fuel_core_storage::{ + tables::ProcessedTransactions, Error as StorageError, Mappable, MerkleRoot, @@ -160,6 +161,12 @@ impl DatabaseColumn for FuelBlockSecondaryKeyBlockHeights { } } +impl DatabaseColumn for ProcessedTransactions { + fn column() -> Column { + Column::ProcessedTransactions + } +} + impl DatabaseColumn for FuelBlockMerkleData { fn column() -> Column { Column::FuelBlockMerkleData diff --git a/crates/fuel-core/src/executor.rs b/crates/fuel-core/src/executor.rs index 36ece1ca9a8..04a770582ce 100644 --- a/crates/fuel-core/src/executor.rs +++ b/crates/fuel-core/src/executor.rs @@ -20,7 +20,6 @@ mod tests { ContractsRawCode, Messages, Receipts, - Transactions, }, StorageAsMut, }; @@ -1571,7 +1570,7 @@ mod tests { .into(); let db = &mut Database::default(); - let mut executor = create_executor( + let executor = create_executor( db.clone(), Config { utxo_validation_default: false, @@ -1607,16 +1606,6 @@ mod tests { assert_eq!(executed_tx.inputs()[0].balance_root(), Some(&empty_state)); assert_eq!(executed_tx.outputs()[0].state_root(), Some(&empty_state)); assert_eq!(executed_tx.outputs()[0].balance_root(), Some(&empty_state)); - - let expected_tx = block.transactions()[1].clone(); - let storage_tx = executor - .database - .storage::() - .get(&executed_tx.id(&ChainId::default())) - .unwrap() - .unwrap() - .into_owned(); - assert_eq!(storage_tx, expected_tx); } #[test] @@ -1638,7 +1627,7 @@ mod tests { .into(); let db = &mut Database::default(); - let mut executor = create_executor( + let executor = create_executor( db.clone(), Config { utxo_validation_default: false, @@ -1680,16 +1669,6 @@ mod tests { ); assert_eq!(executed_tx.inputs()[0].state_root(), Some(&empty_state)); assert_eq!(executed_tx.inputs()[0].balance_root(), Some(&empty_state)); - - let 
expected_tx = block.transactions()[1].clone(); - let storage_tx = executor - .database - .storage::() - .get(&expected_tx.id(&ChainId::default())) - .unwrap() - .unwrap() - .into_owned(); - assert_eq!(storage_tx, expected_tx); } #[test] @@ -1751,7 +1730,7 @@ mod tests { .clone(); let db = &mut Database::default(); - let mut executor = create_executor( + let executor = create_executor( db.clone(), Config { utxo_validation_default: false, @@ -1793,16 +1772,6 @@ mod tests { executed_tx.inputs()[0].balance_root(), executed_tx.outputs()[0].balance_root() ); - - let expected_tx = block.transactions()[1].clone(); - let storage_tx = executor - .database - .storage::() - .get(&expected_tx.id(&ChainId::default())) - .unwrap() - .unwrap() - .into_owned(); - assert_eq!(storage_tx, expected_tx); } #[test] diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index a6497bfc4a7..3d5240cab28 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -19,6 +19,7 @@ pub use config::{ }; pub use fuel_core_services::Service as ServiceTrait; +use crate::service::adapters::PoAAdapter; pub use fuel_core_consensus_module::RelayerVerifierConfig; use self::adapters::BlockImporterAdapter; @@ -32,6 +33,8 @@ pub mod sub_services; #[derive(Clone)] pub struct SharedState { + /// The PoA adaptor around the shared state of the consensus module. + pub poa_adapter: PoAAdapter, /// The transaction pool shared state. pub txpool: fuel_core_txpool::service::SharedState, /// The P2P network shared state. diff --git a/crates/fuel-core/src/service/adapters/block_importer.rs b/crates/fuel-core/src/service/adapters/block_importer.rs index 3fc939a0f7b..89627483c8d 100644 --- a/crates/fuel-core/src/service/adapters/block_importer.rs +++ b/crates/fuel-core/src/service/adapters/block_importer.rs @@ -18,7 +18,11 @@ use fuel_core_importer::{ }; use fuel_core_poa::ports::RelayerPort; use fuel_core_storage::{ - tables::SealedBlockConsensus, + tables::{ + FuelBlocks, + SealedBlockConsensus, + Transactions, + }, transactional::StorageTransaction, Result as StorageResult, StorageAsMut, @@ -27,13 +31,14 @@ use fuel_core_types::{ blockchain::{ block::Block, consensus::Consensus, - primitives::{ - BlockId, - DaBlockHeight, - }, + primitives::DaBlockHeight, SealedBlock, }, - fuel_types::BlockHeight, + fuel_tx::UniqueIdentifier, + fuel_types::{ + BlockHeight, + ChainId, + }, services::executor::{ ExecutionTypes, Result as ExecutorResult, @@ -42,7 +47,10 @@ use fuel_core_types::{ }; use std::sync::Arc; -use super::MaybeRelayerAdapter; +use super::{ + MaybeRelayerAdapter, + TransactionsSource, +}; impl BlockImporterAdapter { pub fn new( @@ -112,8 +120,8 @@ impl RelayerPort for MaybeRelayerAdapter { } impl ImporterDatabase for Database { - fn latest_block_height(&self) -> StorageResult { - self.latest_height() + fn latest_block_height(&self) -> StorageResult> { + Ok(self.ids_of_latest_block()?.map(|(height, _)| height)) } fn increase_tx_count(&self, new_txs_count: u64) -> StorageResult { @@ -122,14 +130,29 @@ impl ImporterDatabase for Database { } impl ExecutorDatabase for Database { - fn seal_block( + fn store_new_block( &mut self, - block_id: &BlockId, - consensus: &Consensus, - ) -> StorageResult> { - self.storage::() - .insert(block_id, consensus) - .map_err(Into::into) + chain_id: &ChainId, + block: &SealedBlock, + ) -> StorageResult { + let block_id = block.entity.id(); + let mut found = self + .storage::() + .insert(&block_id, &block.entity.compress(chain_id))? 
+ .is_some(); + found |= self + .storage::() + .insert(&block_id, &block.consensus)? + .is_some(); + + // TODO: Use `batch_insert` from https://github.com/FuelLabs/fuel-core/pull/1576 + for tx in block.entity.transactions() { + found |= self + .storage::() + .insert(&tx.id(chain_id), tx)? + .is_some(); + } + Ok(!found) } } @@ -141,6 +164,8 @@ impl Executor for ExecutorAdapter { block: Block, ) -> ExecutorResult>> { - self._execute_without_commit(ExecutionTypes::Validation(block)) + self._execute_without_commit::(ExecutionTypes::Validation( + block, + )) } } diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index 46ed86fdc1b..ac446c71675 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -17,8 +17,12 @@ use fuel_core_poa::{ BlockImporter, P2pPort, TransactionPool, + TransactionsSource, + }, + service::{ + Mode, + SharedState, }, - service::SharedState, }; use fuel_core_services::stream::BoxStream; use fuel_core_storage::transactional::StorageTransaction; @@ -45,6 +49,18 @@ impl PoAAdapter { pub fn new(shared_state: Option) -> Self { Self { shared_state } } + + pub async fn manually_produce_blocks( + &self, + start_time: Option, + mode: Mode, + ) -> anyhow::Result<()> { + self.shared_state + .as_ref() + .ok_or(anyhow!("The block production is disabled"))? + .manually_produce_block(start_time, mode) + .await + } } #[async_trait::async_trait] @@ -54,10 +70,7 @@ impl ConsensusModulePort for PoAAdapter { start_time: Option, number_of_blocks: u32, ) -> anyhow::Result<()> { - self.shared_state - .as_ref() - .ok_or(anyhow!("The block production is disabled"))? - .manually_produce_block(start_time, number_of_blocks) + self.manually_produce_blocks(start_time, Mode::Blocks { number_of_blocks }) .await } } @@ -91,11 +104,23 @@ impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter { &self, height: BlockHeight, block_time: Tai64, + source: TransactionsSource, max_gas: Word, ) -> anyhow::Result>> { - self.block_producer - .produce_and_execute_block(height, block_time, max_gas) - .await + match source { + TransactionsSource::TxPool => { + self.block_producer + .produce_and_execute_block_txpool(height, block_time, max_gas) + .await + } + TransactionsSource::SpecificTransactions(txs) => { + self.block_producer + .produce_and_execute_block_transactions( + height, block_time, txs, max_gas, + ) + .await + } + } } } diff --git a/crates/fuel-core/src/service/adapters/executor.rs b/crates/fuel-core/src/service/adapters/executor.rs index bb6f27083f3..bb8e46042db 100644 --- a/crates/fuel-core/src/service/adapters/executor.rs +++ b/crates/fuel-core/src/service/adapters/executor.rs @@ -50,10 +50,13 @@ impl fuel_core_executor::ports::TransactionsSource for TransactionsSource { } impl ExecutorAdapter { - pub(crate) fn _execute_without_commit( + pub(crate) fn _execute_without_commit( &self, - block: ExecutionBlockWithSource, - ) -> ExecutorResult>> { + block: ExecutionBlockWithSource, + ) -> ExecutorResult>> + where + TxSource: fuel_core_executor::ports::TransactionsSource, + { let executor = Executor { database: self.relayer.database.clone(), relayer: self.relayer.clone(), diff --git a/crates/fuel-core/src/service/adapters/producer.rs b/crates/fuel-core/src/service/adapters/producer.rs index 5def3cc1943..f966c48e337 100644 --- a/crates/fuel-core/src/service/adapters/producer.rs +++ 
b/crates/fuel-core/src/service/adapters/producer.rs @@ -11,6 +11,7 @@ use crate::{ sub_services::BlockProducerService, }, }; +use fuel_core_executor::executor::OnceTransactionsSource; use fuel_core_producer::ports::TxPool; use fuel_core_storage::{ not_found, @@ -25,7 +26,10 @@ use fuel_core_types::{ primitives, }, fuel_tx, - fuel_tx::Receipt, + fuel_tx::{ + Receipt, + Transaction, + }, fuel_types::{ BlockHeight, Bytes32, @@ -61,18 +65,38 @@ impl TxPool for TxPoolAdapter { } } -#[async_trait::async_trait] -impl fuel_core_producer::ports::Executor for ExecutorAdapter { +impl fuel_core_producer::ports::Executor for ExecutorAdapter { type Database = Database; - type TxSource = TransactionsSource; fn execute_without_commit( &self, - component: Components, + component: Components, ) -> ExecutorResult>> { self._execute_without_commit(ExecutionTypes::Production(component)) } +} + +impl fuel_core_producer::ports::Executor> for ExecutorAdapter { + type Database = Database; + + fn execute_without_commit( + &self, + component: Components>, + ) -> ExecutorResult>> { + let Components { + header_to_produce, + transactions_source, + gas_limit, + } = component; + self._execute_without_commit(ExecutionTypes::Production(Components { + header_to_produce, + transactions_source: OnceTransactionsSource::new(transactions_source), + gas_limit, + })) + } +} +impl fuel_core_producer::ports::DryRunner for ExecutorAdapter { fn dry_run( &self, block: Components, diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index f0cabfda032..5aafec6446b 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -30,6 +30,7 @@ use fuel_core_p2p::config::{ #[cfg(feature = "relayer")] use fuel_core_relayer::Config as RelayerConfig; +pub use fuel_core_importer; pub use fuel_core_poa::Trigger; #[derive(Clone, Debug)] @@ -51,7 +52,6 @@ pub struct Config { pub vm: VMConfig, pub txpool: fuel_core_txpool::Config, pub block_producer: fuel_core_producer::Config, - pub block_executor: fuel_core_executor::Config, pub block_importer: fuel_core_importer::Config, #[cfg(feature = "relayer")] pub relayer: Option, @@ -73,6 +73,7 @@ pub struct Config { impl Config { pub fn local_node() -> Self { let chain_conf = ChainConfig::local_testnet(); + let block_importer = fuel_core_importer::Config::new(&chain_conf); let utxo_validation = false; let min_gas_price = 0; @@ -99,8 +100,7 @@ impl Config { ..fuel_core_txpool::Config::default() }, block_producer: Default::default(), - block_executor: Default::default(), - block_importer: Default::default(), + block_importer, #[cfg(feature = "relayer")] relayer: None, #[cfg(feature = "p2p")] diff --git a/crates/fuel-core/src/service/genesis.rs b/crates/fuel-core/src/service/genesis.rs index 8da0fd49637..8039f438d12 100644 --- a/crates/fuel-core/src/service/genesis.rs +++ b/crates/fuel-core/src/service/genesis.rs @@ -16,7 +16,6 @@ use fuel_core_storage::{ ContractsInfo, ContractsLatestUtxo, ContractsRawCode, - FuelBlocks, Messages, }, transactional::Transactional, @@ -125,11 +124,6 @@ fn import_genesis_block( &[], ); - let block_id = block.id(); - database.storage::().insert( - &block_id, - &block.compress(&config.chain_conf.consensus_parameters.chain_id), - )?; let consensus = Consensus::Genesis(genesis); let block = SealedBlock { entity: block, diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 36abbf6c54b..1523fe41c15 100644 --- 
a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -205,13 +205,14 @@ pub fn init_sub_services( Box::new(database.clone()), Box::new(tx_pool_adapter), Box::new(producer_adapter), - Box::new(poa_adapter), + Box::new(poa_adapter.clone()), Box::new(p2p_adapter), config.query_log_threshold_time, config.api_request_timeout, )?; let shared = SharedState { + poa_adapter, txpool: txpool.shared.clone(), #[cfg(feature = "p2p")] network: network.as_ref().map(|n| n.shared.clone()), diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index 967f64ef94c..fdb8a2d11de 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -9,7 +9,10 @@ use fuel_core_types::{ primitives::DaBlockHeight, }, fuel_asm::Word, - fuel_tx::TxId, + fuel_tx::{ + Transaction, + TxId, + }, fuel_types::{ BlockHeight, Bytes32, @@ -40,6 +43,14 @@ pub trait TransactionPool: Send + Sync { #[cfg(test)] use fuel_core_storage::test_helpers::EmptyStorage; +/// The source of transactions for the block. +pub enum TransactionsSource { + /// The source of transactions for the block is the `TxPool`. + TxPool, + /// Use specific transactions for the block. + SpecificTransactions(Vec), +} + #[cfg_attr(test, mockall::automock(type Database=EmptyStorage;))] #[async_trait::async_trait] pub trait BlockProducer: Send + Sync { @@ -49,6 +60,7 @@ pub trait BlockProducer: Send + Sync { &self, height: BlockHeight, block_time: Tai64, + source: TransactionsSource, max_gas: Word, ) -> anyhow::Result>>; } diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 46b84e14a26..3ec7b8727d8 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -8,6 +8,7 @@ use crate::{ BlockProducer, P2pPort, TransactionPool, + TransactionsSource, }, sync::{ SyncState, @@ -42,7 +43,10 @@ use fuel_core_types::{ }, fuel_asm::Word, fuel_crypto::Signature, - fuel_tx::TxId, + fuel_tx::{ + Transaction, + TxId, + }, fuel_types::BlockHeight, secrecy::{ ExposeSecret, @@ -81,16 +85,13 @@ impl SharedState { pub async fn manually_produce_block( &self, start_time: Option, - number_of_blocks: u32, + mode: Mode, ) -> anyhow::Result<()> { let (sender, receiver) = oneshot::channel(); self.request_sender .send(Request::ManualBlocks(( - ManualProduction { - start_time, - number_of_blocks, - }, + ManualProduction { start_time, mode }, sender, ))) .await?; @@ -98,9 +99,16 @@ impl SharedState { } } +pub enum Mode { + /// Produces `number_of_blocks` blocks using `TxPool` as a source of transactions. + Blocks { number_of_blocks: u32 }, + /// Produces one block with the given transactions. + BlockWithTransactions(Vec), +} + struct ManualProduction { pub start_time: Option, - pub number_of_blocks: u32, + pub mode: Mode, } /// Requests accepted by the task. @@ -248,9 +256,10 @@ where &self, height: BlockHeight, block_time: Tai64, + source: TransactionsSource, ) -> anyhow::Result>> { self.block_producer - .produce_and_execute_block(height, block_time, self.block_gas_limit) + .produce_and_execute_block(height, block_time, source, self.block_gas_limit) .await } @@ -258,6 +267,7 @@ where self.produce_block( self.next_height(), self.next_time(RequestType::Trigger)?, + TransactionsSource::TxPool, RequestType::Trigger, ) .await @@ -272,10 +282,28 @@ where } else { self.next_time(RequestType::Manual)? 
}; - for _ in 0..block_production.number_of_blocks { - self.produce_block(self.next_height(), block_time, RequestType::Manual) + match block_production.mode { + Mode::Blocks { number_of_blocks } => { + for _ in 0..number_of_blocks { + self.produce_block( + self.next_height(), + block_time, + TransactionsSource::TxPool, + RequestType::Manual, + ) + .await?; + block_time = self.next_time(RequestType::Manual)?; + } + } + Mode::BlockWithTransactions(txs) => { + self.produce_block( + self.next_height(), + block_time, + TransactionsSource::SpecificTransactions(txs), + RequestType::Manual, + ) .await?; - block_time = self.next_time(RequestType::Manual)?; + } } Ok(()) } @@ -284,6 +312,7 @@ where &mut self, height: BlockHeight, block_time: Tai64, + source: TransactionsSource, request_type: RequestType, ) -> anyhow::Result<()> { let last_block_created = Instant::now(); @@ -304,7 +333,10 @@ where tx_status, }, db_transaction, - ) = self.signal_produce_block(height, block_time).await?.into(); + ) = self + .signal_produce_block(height, block_time, source) + .await? + .into(); let mut tx_ids_to_remove = Vec::with_capacity(skipped_transactions.len()); for (tx_id, err) in skipped_transactions { diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 864a4e7d94b..44525e3be62 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -123,7 +123,7 @@ impl TestContextBuilder { let mut producer = MockBlockProducer::default(); producer .expect_produce_and_execute_block() - .returning(|_, _, _| { + .returning(|_, _, _, _| { Ok(UncommittedResult::new( ExecutionResult { block: Default::default(), @@ -272,7 +272,7 @@ async fn remove_skipped_transactions() { block_producer .expect_produce_and_execute_block() .times(1) - .returning(move |_, _, _| { + .returning(move |_, _, _, _| { Ok(UncommittedResult::new( ExecutionResult { block: Default::default(), @@ -357,7 +357,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { block_producer .expect_produce_and_execute_block() - .returning(|_, _, _| panic!("Block production should not be called")); + .returning(|_, _, _, _| panic!("Block production should not be called")); let mut block_importer = MockBlockImporter::default(); diff --git a/crates/services/consensus_module/poa/src/service_test/manually_produce_tests.rs b/crates/services/consensus_module/poa/src/service_test/manually_produce_tests.rs index 47bb8d5e30f..3699fffb39b 100644 --- a/crates/services/consensus_module/poa/src/service_test/manually_produce_tests.rs +++ b/crates/services/consensus_module/poa/src/service_test/manually_produce_tests.rs @@ -1,3 +1,4 @@ +use crate::service::Mode; use fuel_core_types::{ blockchain::block::Block, tai64::Tai64, @@ -82,7 +83,7 @@ async fn can_manually_produce_block( let mut producer = MockBlockProducer::default(); producer .expect_produce_and_execute_block() - .returning(|_, time, _| { + .returning(|_, time, _, _| { let mut block = Block::default(); block.header_mut().consensus.time = time; block.header_mut().recalculate_metadata(); @@ -101,7 +102,7 @@ async fn can_manually_produce_block( ctx.service .shared - .manually_produce_block(Some(start_time), number_of_blocks) + .manually_produce_block(Some(start_time), Mode::Blocks { number_of_blocks }) .await .unwrap(); for tx in txs { diff --git a/crates/services/executor/src/executor.rs b/crates/services/executor/src/executor.rs index 15706793410..6be1e94498a 
100644 --- a/crates/services/executor/src/executor.rs +++ b/crates/services/executor/src/executor.rs @@ -12,11 +12,10 @@ use fuel_core_storage::{ Coins, ContractsInfo, ContractsLatestUtxo, - FuelBlocks, Messages, + ProcessedTransactions, Receipts, SpentMessages, - Transactions, }, transactional::{ StorageTransaction, @@ -458,17 +457,6 @@ where // ------------ GraphQL API Functionality END ------------ - // insert block into database - block_st_transaction - .as_mut() - .storage::() - .insert( - &finalized_block_id, - &result - .block - .compress(&self.config.consensus_parameters.chain_id), - )?; - // Get the complete fuel block. Ok(UncommittedResult::new(result, block_st_transaction)) } @@ -629,7 +617,7 @@ where // Throw a clear error if the transaction id is a duplicate if tx_st_transaction .as_ref() - .storage::() + .storage::() .contains_key(tx_id)? { return Err(ExecutorError::TransactionIdCollision(*tx_id)) @@ -823,8 +811,8 @@ where if block_st_transaction .as_mut() - .storage::() - .insert(&coinbase_id, &tx)? + .storage::() + .insert(&coinbase_id, &())? .is_some() { return Err(ExecutorError::TransactionIdCollision(coinbase_id)) @@ -979,8 +967,8 @@ where // Store tx into the block db transaction tx_st_transaction .as_mut() - .storage::() - .insert(&tx_id, &final_tx)?; + .storage::() + .insert(&tx_id, &())?; // persist receipts self.persist_receipts(&tx_id, &receipts, tx_st_transaction.as_mut())?; diff --git a/crates/services/executor/src/ports.rs b/crates/services/executor/src/ports.rs index 0c4c32a1deb..1ca5a5058fd 100644 --- a/crates/services/executor/src/ports.rs +++ b/crates/services/executor/src/ports.rs @@ -6,11 +6,10 @@ use fuel_core_storage::{ ContractsLatestUtxo, ContractsRawCode, ContractsState, - FuelBlocks, Messages, + ProcessedTransactions, Receipts, SpentMessages, - Transactions, }, transactional::Transactional, vm_storage::VmStorageRequirements, @@ -109,11 +108,9 @@ pub trait TxIdOwnerRecorder { // TODO: Remove `Clone` bound pub trait ExecutorDatabaseTrait: - StorageMutate - + StorageMutate - + StorageMutate - + MerkleRootStorage - + StorageInspect + StorageMutate + + StorageMutate + + MerkleRootStorage + MessageIsSpent + StorageMutate + StorageMutate diff --git a/crates/services/importer/Cargo.toml b/crates/services/importer/Cargo.toml index a8b359ceb76..7cd93840428 100644 --- a/crates/services/importer/Cargo.toml +++ b/crates/services/importer/Cargo.toml @@ -12,6 +12,7 @@ description = "Fuel Block Importer" [dependencies] anyhow = { workspace = true } derive_more = { workspace = true } +fuel-core-chain-config = { workspace = true } fuel-core-metrics = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } diff --git a/crates/services/importer/src/config.rs b/crates/services/importer/src/config.rs index ddb17391427..c551127c68a 100644 --- a/crates/services/importer/src/config.rs +++ b/crates/services/importer/src/config.rs @@ -1,14 +1,30 @@ +use fuel_core_chain_config::ChainConfig; +use fuel_core_types::fuel_types::ChainId; + #[derive(Debug, Clone)] pub struct Config { pub max_block_notify_buffer: usize, pub metrics: bool, + pub chain_id: ChainId, +} + +impl Config { + pub fn new(chain_config: &ChainConfig) -> Self { + Self { + max_block_notify_buffer: 1 << 10, + metrics: false, + chain_id: chain_config.consensus_parameters.chain_id, + } + } } +#[cfg(test)] impl Default for Config { fn default() -> Self { Self { max_block_notify_buffer: 1 << 10, metrics: false, + chain_id: ChainId::default(), } } } diff --git 
a/crates/services/importer/src/importer.rs b/crates/services/importer/src/importer.rs index ca1256005bb..056c4010410 100644 --- a/crates/services/importer/src/importer.rs +++ b/crates/services/importer/src/importer.rs @@ -9,9 +9,9 @@ use crate::{ }; use fuel_core_metrics::importer::importer_metrics; use fuel_core_storage::{ + not_found, transactional::StorageTransaction, Error as StorageError, - IsNotFound, }; use fuel_core_types::{ blockchain::{ @@ -22,7 +22,10 @@ use fuel_core_types::{ primitives::BlockId, SealedBlock, }, - fuel_types::BlockHeight, + fuel_types::{ + BlockHeight, + ChainId, + }, services::{ block_importer::{ ImportResult, @@ -59,8 +62,8 @@ pub enum Error { )] InvalidUnderlyingDatabaseGenesisState, #[display(fmt = "The wrong state of database after execution of the block.\ - The actual height is {_1}, when the next expected height is {_0}.")] - InvalidDatabaseStateAfterExecution(BlockHeight, BlockHeight), + The actual height is {_1:?}, when the next expected height is {_0:?}.")] + InvalidDatabaseStateAfterExecution(Option, Option), #[display(fmt = "Got overflow during increasing the height.")] Overflow, #[display(fmt = "The non-generic block can't have zero height.")] @@ -96,7 +99,7 @@ impl From for anyhow::Error { #[cfg(test)] impl PartialEq for Error { fn eq(&self, other: &Self) -> bool { - format!("{self:?}") == format!("{other:?}") + format!("{self}") == format!("{other}") } } @@ -104,6 +107,7 @@ pub struct Importer { database: D, executor: E, verifier: V, + chain_id: ChainId, broadcast: broadcast::Sender>, guard: tokio::sync::Semaphore, } @@ -116,6 +120,7 @@ impl Importer { database, executor, verifier, + chain_id: config.chain_id, broadcast, guard: tokio::sync::Semaphore::new(1), } @@ -187,7 +192,6 @@ where let (result, mut db_tx) = result.into(); let block = &result.sealed_block.entity; let consensus = &result.sealed_block.consensus; - let block_id = block.id(); let actual_next_height = *block.header().height(); // During importing of the genesis block, the database should not be initialized @@ -196,9 +200,9 @@ where // database height + 1. let expected_next_height = match consensus { Consensus::Genesis(_) => { - let result = self.database.latest_block_height(); - let found = !result.is_not_found(); - // Because the genesis block is not committed, it should return non found error. + let result = self.database.latest_block_height()?; + let found = result.is_some(); + // Because the genesis block is not committed, it should return `None`. // If we find the latest height, something is wrong with the state of the database. if found { return Err(Error::InvalidUnderlyingDatabaseGenesisState) @@ -210,7 +214,10 @@ where return Err(Error::ZeroNonGenericHeight) } - let last_db_height = self.database.latest_block_height()?; + let last_db_height = self + .database + .latest_block_height()? + .ok_or(not_found!("Latest block height"))?; last_db_height .checked_add(1u32) .ok_or(Error::Overflow)? @@ -228,18 +235,19 @@ where let db_after_execution = db_tx.as_mut(); // Importer expects that `UncommittedResult` contains the result of block - // execution(It includes the block itself). + // execution without block itself. 
+ let expected_height = self.database.latest_block_height()?; let actual_height = db_after_execution.latest_block_height()?; - if expected_next_height != actual_height { + if expected_height != actual_height { return Err(Error::InvalidDatabaseStateAfterExecution( - expected_next_height, + expected_height, actual_height, )) } - db_after_execution - .seal_block(&block_id, &result.sealed_block.consensus)? - .should_be_unique(&expected_next_height)?; + if !db_after_execution.store_new_block(&self.chain_id, &result.sealed_block)? { + return Err(Error::NotUnique(expected_next_height)) + } // Update the total tx count in chain metadata let total_txs = db_after_execution @@ -252,7 +260,7 @@ where importer_metrics().total_txs_count.set(total_txs as i64); importer_metrics() .block_height - .set(*actual_height.deref() as i64); + .set(*actual_next_height.deref() as i64); let current_time = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() @@ -273,8 +281,11 @@ where // Errors are optimistically handled via fallback to default values since the metrics // should get updated regularly anyways and these errors will be discovered and handled // correctly in more mission critical areas (such as _commit_result) - let current_block_height = - self.database.latest_block_height().unwrap_or_default(); + let current_block_height = self + .database + .latest_block_height() + .unwrap_or_default() + .unwrap_or_default(); let total_tx_count = self.database.increase_tx_count(0).unwrap_or_default(); importer_metrics() diff --git a/crates/services/importer/src/importer/test.rs b/crates/services/importer/src/importer/test.rs index 24db5d043c3..897be9f9945 100644 --- a/crates/services/importer/src/importer/test.rs +++ b/crates/services/importer/src/importer/test.rs @@ -10,7 +10,6 @@ use crate::{ }; use anyhow::anyhow; use fuel_core_storage::{ - not_found, transactional::{ StorageTransaction, Transaction as TransactionTrait, @@ -22,11 +21,13 @@ use fuel_core_types::{ blockchain::{ block::Block, consensus::Consensus, - primitives::BlockId, SealedBlock, }, fuel_tx::TxId, - fuel_types::BlockHeight, + fuel_types::{ + BlockHeight, + ChainId, + }, services::{ block_importer::{ ImportResult, @@ -50,16 +51,16 @@ mockall::mock! 
{ pub Database {} impl ImporterDatabase for Database { - fn latest_block_height(&self) -> StorageResult; + fn latest_block_height(&self) -> StorageResult>; fn increase_tx_count(&self, new_txs_count: u64) -> StorageResult; } impl ExecutorDatabase for Database { - fn seal_block( + fn store_new_block( &mut self, - block_id: &BlockId, - consensus: &Consensus, - ) -> StorageResult>; + chain_id: &ChainId, + block: &SealedBlock, + ) -> StorageResult; } impl TransactionTrait for Database { @@ -109,30 +110,35 @@ fn poa_block(height: u32) -> SealedBlock { fn underlying_db(result: R) -> impl Fn() -> MockDatabase where - R: Fn() -> StorageResult + Send + Clone + 'static, + R: Fn() -> StorageResult> + Send + Clone + 'static, { move || { let result = result.clone(); let mut db = MockDatabase::default(); db.expect_latest_block_height() - .returning(move || result().map(Into::into)); + .returning(move || result().map(|v| v.map(Into::into))); db.expect_increase_tx_count().returning(Ok); db } } -fn executor_db(height: H, seal: S, commits: usize) -> impl Fn() -> MockDatabase +fn executor_db( + height: H, + store_block: B, + commits: usize, +) -> impl Fn() -> MockDatabase where - H: Fn() -> StorageResult + Send + Clone + 'static, - S: Fn() -> StorageResult> + Send + Clone + 'static, + H: Fn() -> StorageResult> + Send + Clone + 'static, + B: Fn() -> StorageResult + Send + Clone + 'static, { move || { let height = height.clone(); - let seal = seal.clone(); + let store_block = store_block.clone(); let mut db = MockDatabase::default(); db.expect_latest_block_height() - .returning(move || height().map(Into::into)); - db.expect_seal_block().returning(move |_, _| seal()); + .returning(move || height().map(|v| v.map(Into::into))); + db.expect_store_new_block() + .returning(move |_, _| store_block()); db.expect_commit().times(commits).returning(|| Ok(())); db.expect_increase_tx_count().returning(Ok); db @@ -143,16 +149,12 @@ fn ok(entity: T) -> impl Fn() -> Result + Clone { move || Ok(entity.clone()) } -fn not_found() -> StorageResult { - Err(not_found!("Not found")) -} - fn storage_failure() -> StorageResult { Err(StorageError::Other(anyhow!("Some failure"))) } fn storage_failure_error() -> Error { - Error::StorageError(StorageError::Other(anyhow!("Some failure"))) + storage_failure::<()>().unwrap_err().into() } fn ex_result(height: u32, skipped_transactions: usize) -> MockExecutionResult { @@ -200,7 +202,7 @@ fn verification_failure() -> anyhow::Result { } fn verification_failure_error() -> Error { - Error::FailedVerification(anyhow!("Not verified")) + Error::FailedVerification(verification_failure::<()>().unwrap_err()) } fn verifier(result: R) -> MockBlockVerifier @@ -219,45 +221,45 @@ where //////////////// //////////// Genesis Block /////////// //////////////// #[test_case( genesis(0), - underlying_db(not_found), - executor_db(ok(0), ok(None), 1) + underlying_db(ok(None)), + executor_db(ok(None), ok(true), 1) => Ok(()); "successfully imports genesis block when latest block not found" )] #[test_case( genesis(113), - underlying_db(not_found), - executor_db(ok(113), ok(None), 1) + underlying_db(ok(None)), + executor_db(ok(None), ok(true), 1) => Ok(()); "successfully imports block at arbitrary height when executor db expects it and last block not found" )] #[test_case( genesis(0), underlying_db(storage_failure), - executor_db(ok(0), ok(None), 0) - => Err(Error::InvalidUnderlyingDatabaseGenesisState); + executor_db(ok(Some(0)), ok(true), 0) + => Err(storage_failure_error()); "fails to import genesis when underlying 
database fails" )] #[test_case( genesis(0), - underlying_db(ok(0)), - executor_db(ok(0), ok(None), 0) + underlying_db(ok(Some(0))), + executor_db(ok(Some(0)), ok(true), 0) => Err(Error::InvalidUnderlyingDatabaseGenesisState); "fails to import genesis block when already exists" )] #[test_case( genesis(1), - underlying_db(not_found), - executor_db(ok(0), ok(None), 0) - => Err(Error::InvalidDatabaseStateAfterExecution(1u32.into(), 0u32.into())); + underlying_db(ok(None)), + executor_db(ok(Some(0)), ok(true), 0) + => Err(Error::InvalidDatabaseStateAfterExecution(None, Some(0u32.into()))); "fails to import genesis block when next height is not 0" )] #[test_case( genesis(0), - underlying_db(not_found), - executor_db(ok(0), ok(Some(Consensus::Genesis(Default::default()))), 0) + underlying_db(ok(None)), + executor_db(ok(None), ok(false), 0) => Err(Error::NotUnique(0u32.into())); - "fails to import genesis block when consensus exists for height 0" + "fails to import genesis block when block exists for height 0" )] fn commit_result_genesis( sealed_block: SealedBlock, @@ -270,66 +272,66 @@ fn commit_result_genesis( //////////////////////////// PoA Block //////////////////////////// #[test_case( poa_block(1), - underlying_db(ok(0)), - executor_db(ok(1), ok(None), 1) + underlying_db(ok(Some(0))), + executor_db(ok(Some(0)), ok(true), 1) => Ok(()); "successfully imports block at height 1 when latest block is genesis" )] #[test_case( poa_block(113), - underlying_db(ok(112)), - executor_db(ok(113), ok(None), 1) + underlying_db(ok(Some(112))), + executor_db(ok(Some(112)), ok(true), 1) => Ok(()); "successfully imports block at arbitrary height when latest block height is one fewer and executor db expects it" )] #[test_case( poa_block(0), - underlying_db(ok(0)), - executor_db(ok(1), ok(None), 0) + underlying_db(ok(Some(0))), + executor_db(ok(Some(1)), ok(true), 0) => Err(Error::ZeroNonGenericHeight); "fails to import PoA block with height 0" )] #[test_case( poa_block(113), - underlying_db(ok(111)), - executor_db(ok(113), ok(None), 0) + underlying_db(ok(Some(111))), + executor_db(ok(Some(113)), ok(true), 0) => Err(Error::IncorrectBlockHeight(112u32.into(), 113u32.into())); "fails to import block at height 113 when latest block height is 111" )] #[test_case( poa_block(113), - underlying_db(ok(114)), - executor_db(ok(113), ok(None), 0) + underlying_db(ok(Some(114))), + executor_db(ok(Some(113)), ok(true), 0) => Err(Error::IncorrectBlockHeight(115u32.into(), 113u32.into())); "fails to import block at height 113 when latest block height is 114" )] #[test_case( poa_block(113), - underlying_db(ok(112)), - executor_db(ok(114), ok(None), 0) - => Err(Error::InvalidDatabaseStateAfterExecution(113u32.into(), 114u32.into())); + underlying_db(ok(Some(112))), + executor_db(ok(Some(114)), ok(true), 0) + => Err(Error::InvalidDatabaseStateAfterExecution(Some(112u32.into()), Some(114u32.into()))); "fails to import block 113 when executor db expects height 114" )] #[test_case( poa_block(113), - underlying_db(ok(112)), - executor_db(storage_failure, ok(None), 0) + underlying_db(ok(Some(112))), + executor_db(storage_failure, ok(true), 0) => Err(storage_failure_error()); "fails to import block when executor db fails to find latest block" )] #[test_case( poa_block(113), - underlying_db(ok(112)), - executor_db(ok(113), ok(Some(Consensus::PoA(Default::default()))), 0) + underlying_db(ok(Some(112))), + executor_db(ok(Some(112)), ok(false), 0) => Err(Error::NotUnique(113u32.into())); - "fails to import block when consensus exists for 
block" + "fails to import block when block exists" )] #[test_case( poa_block(113), - underlying_db(ok(112)), - executor_db(ok(113), storage_failure, 0) + underlying_db(ok(Some(112))), + executor_db(ok(Some(112)), storage_failure, 0) => Err(storage_failure_error()); - "fails to import block when executor db fails to find consensus" + "fails to import block when executor db fails to find block" )] fn commit_result_and_execute_and_commit_poa( sealed_block: SealedBlock, @@ -513,10 +515,10 @@ where let previous_height = expected_height.checked_sub(1).unwrap_or_default(); let execute_and_commit_result = execute_and_commit_assert( sealed_block, - underlying_db(ok(previous_height))(), + underlying_db(ok(Some(previous_height)))(), executor( block_after_execution, - executor_db(ok(expected_height), ok(None), commits)(), + executor_db(ok(Some(previous_height)), ok(true), commits)(), ), verifier(verifier_result), ); diff --git a/crates/services/importer/src/ports.rs b/crates/services/importer/src/ports.rs index ce0449a8743..51c14e5085b 100644 --- a/crates/services/importer/src/ports.rs +++ b/crates/services/importer/src/ports.rs @@ -6,9 +6,12 @@ use fuel_core_types::{ blockchain::{ block::Block, consensus::Consensus, - primitives::BlockId, + SealedBlock, + }, + fuel_types::{ + BlockHeight, + ChainId, }, - fuel_types::BlockHeight, services::executor::{ Result as ExecutorResult, UncommittedResult, @@ -32,7 +35,7 @@ pub trait Executor: Send + Sync { /// The database port used by the block importer. pub trait ImporterDatabase { /// Returns the latest block height. - fn latest_block_height(&self) -> StorageResult; + fn latest_block_height(&self) -> StorageResult>; /// Update metadata about the total number of transactions on the chain. /// Returns the total count after the update. fn increase_tx_count(&self, new_txs_count: u64) -> StorageResult; @@ -40,13 +43,16 @@ pub trait ImporterDatabase { /// The port for returned database from the executor. pub trait ExecutorDatabase: ImporterDatabase { - /// Assigns the `Consensus` data to the block under the `block_id`. - /// Return the previous value at the `height`, if any. - fn seal_block( + /// Inserts the `SealedBlock`. + /// + /// The method returns `true` if the block is a new, otherwise `false`. + // TODO: Remove `chain_id` from the signature, but for that transactions inside + // the block should have `cached_id`. We need to guarantee that from the Rust-type system. + fn store_new_block( &mut self, - block_id: &BlockId, - consensus: &Consensus, - ) -> StorageResult>; + chain_id: &ChainId, + block: &SealedBlock, + ) -> StorageResult; } #[cfg_attr(test, mockall::automock)] diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index 3e57c794195..93a5949c541 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -72,20 +72,21 @@ pub struct Producer { pub lock: Mutex<()>, } -impl - Producer +impl Producer where Database: ports::BlockProducerDatabase + 'static, - TxPool: ports::TxPool + 'static, - Executor: ports::Executor + 'static, { - /// Produces and execute block for the specified height - pub async fn produce_and_execute_block( + /// Produces and execute block for the specified height. 
+ async fn produce_and_execute( &self, height: BlockHeight, block_time: Tai64, + tx_source: impl FnOnce(BlockHeight) -> TxSource, max_gas: Word, - ) -> anyhow::Result>> { + ) -> anyhow::Result>> + where + Executor: ports::Executor + 'static, + { // - get previous block info (hash, root, etc) // - select best da_height from relayer // - get available txs from txpool @@ -97,7 +98,7 @@ where // prevent simultaneous block production calls, the guard will drop at the end of this fn. let _production_guard = self.lock.lock().await; - let source = self.txpool.get_source(height); + let source = tx_source(height); let header = self.new_header(height, block_time).await?; @@ -107,7 +108,7 @@ where gas_limit: max_gas, }; - // Store the context string incase we error. + // Store the context string in case we error. let context_string = format!("Failed to produce block {height:?} due to execution failure"); let result = self @@ -119,7 +120,55 @@ where debug!("Produced block with result: {:?}", result.result()); Ok(result) } +} +impl + Producer +where + Database: ports::BlockProducerDatabase + 'static, + TxPool: ports::TxPool + 'static, + Executor: ports::Executor + 'static, +{ + /// Produces and execute block for the specified height with transactions from the `TxPool`. + pub async fn produce_and_execute_block_txpool( + &self, + height: BlockHeight, + block_time: Tai64, + max_gas: Word, + ) -> anyhow::Result>> { + self.produce_and_execute( + height, + block_time, + |height| self.txpool.get_source(height), + max_gas, + ) + .await + } +} + +impl Producer +where + Database: ports::BlockProducerDatabase + 'static, + Executor: ports::Executor, Database = ExecutorDB> + 'static, +{ + /// Produces and execute block for the specified height with `transactions`. + pub async fn produce_and_execute_block_transactions( + &self, + height: BlockHeight, + block_time: Tai64, + transactions: Vec, + max_gas: Word, + ) -> anyhow::Result>> { + self.produce_and_execute(height, block_time, |_| transactions, max_gas) + .await + } +} + +impl Producer +where + Database: ports::BlockProducerDatabase + 'static, + Executor: ports::DryRunner + 'static, +{ // TODO: Support custom `block_time` for `dry_run`. /// Simulate a transaction without altering any state. 
Does not aquire the production lock /// since it is basically a "read only" operation and shouldn't get in the way of normal diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index f9e959d16c8..2263004c925 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -42,7 +42,7 @@ async fn cant_produce_at_genesis_height() { let producer = ctx.producer(); let err = producer - .produce_and_execute_block(0u32.into(), Tai64::now(), 1_000_000_000) + .produce_and_execute_block_txpool(0u32.into(), Tai64::now(), 1_000_000_000) .await .expect_err("expected failure"); @@ -58,7 +58,7 @@ async fn can_produce_initial_block() { let producer = ctx.producer(); let result = producer - .produce_and_execute_block(1u32.into(), Tai64::now(), 1_000_000_000) + .produce_and_execute_block_txpool(1u32.into(), Tai64::now(), 1_000_000_000) .await; assert!(result.is_ok()); @@ -93,7 +93,7 @@ async fn can_produce_next_block() { let ctx = TestContext::default_from_db(db); let producer = ctx.producer(); let result = producer - .produce_and_execute_block( + .produce_and_execute_block_txpool( prev_height .succ() .expect("The block height should be valid"), @@ -112,7 +112,7 @@ async fn cant_produce_if_no_previous_block() { let producer = ctx.producer(); let err = producer - .produce_and_execute_block(100u32.into(), Tai64::now(), 1_000_000_000) + .produce_and_execute_block_txpool(100u32.into(), Tai64::now(), 1_000_000_000) .await .expect_err("expected failure"); @@ -156,7 +156,7 @@ async fn cant_produce_if_previous_block_da_height_too_high() { let producer = ctx.producer(); let err = producer - .produce_and_execute_block( + .produce_and_execute_block_txpool( prev_height .succ() .expect("The block height should be valid"), @@ -187,7 +187,7 @@ async fn production_fails_on_execution_error() { let producer = ctx.producer(); let err = producer - .produce_and_execute_block(1u32.into(), Tai64::now(), 1_000_000_000) + .produce_and_execute_block_txpool(1u32.into(), Tai64::now(), 1_000_000_000) .await .expect_err("expected failure"); diff --git a/crates/services/producer/src/mocks.rs b/crates/services/producer/src/mocks.rs index 69ca3d482dd..eadfcfed0df 100644 --- a/crates/services/producer/src/mocks.rs +++ b/crates/services/producer/src/mocks.rs @@ -20,8 +20,6 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, - fuel_tx, - fuel_tx::Receipt, fuel_types::{ Address, BlockHeight, @@ -133,14 +131,12 @@ fn to_block(component: Components>) -> Block { Block::new(component.header_to_produce, transactions, &[]) } -impl Executor for MockExecutor { +impl Executor> for MockExecutor { type Database = MockDb; - /// The source of transaction used by the executor. - type TxSource = Vec; fn execute_without_commit( &self, - component: Components, + component: Components>, ) -> ExecutorResult>> { let block = to_block(component); // simulate executor inserting a block @@ -158,26 +154,16 @@ impl Executor for MockExecutor { StorageTransaction::new(self.0.clone()), )) } - - fn dry_run( - &self, - _block: Components, - _utxo_validation: Option, - ) -> ExecutorResult>> { - Ok(Default::default()) - } } pub struct FailingMockExecutor(pub Mutex>); -impl Executor for FailingMockExecutor { +impl Executor> for FailingMockExecutor { type Database = MockDb; - /// The source of transaction used by the executor. 
- type TxSource = Vec; fn execute_without_commit( &self, - component: Components, + component: Components>, ) -> ExecutorResult>> { // simulate an execution failure let mut err = self.0.lock().unwrap(); @@ -195,19 +181,6 @@ impl Executor for FailingMockExecutor { )) } } - - fn dry_run( - &self, - _block: Components, - _utxo_validation: Option, - ) -> ExecutorResult>> { - let mut err = self.0.lock().unwrap(); - if let Some(err) = err.take() { - Err(err) - } else { - Ok(Default::default()) - } - } } #[derive(Clone, Default, Debug)] diff --git a/crates/services/producer/src/ports.rs b/crates/services/producer/src/ports.rs index fb53df1934d..1af44bc9d46 100644 --- a/crates/services/producer/src/ports.rs +++ b/crates/services/producer/src/ports.rs @@ -58,19 +58,19 @@ pub trait Relayer: Send + Sync { ) -> anyhow::Result; } -pub trait Executor: Send + Sync { +pub trait Executor: Send + Sync { /// The database used by the executor. type Database; - /// The source of transaction used by the executor. - type TxSource; /// Executes the block and returns the result of execution with uncommitted database /// transaction. fn execute_without_commit( &self, - component: Components, + component: Components, ) -> ExecutorResult>>; +} +pub trait DryRunner: Send + Sync { /// Executes the block without committing it to the database. During execution collects the /// receipts to return them. The `utxo_validation` field can be used to disable the validation /// of utxos during execution. diff --git a/crates/storage/src/tables.rs b/crates/storage/src/tables.rs index 2c2df585f13..27f5cb2fb23 100644 --- a/crates/storage/src/tables.rs +++ b/crates/storage/src/tables.rs @@ -121,5 +121,16 @@ impl Mappable for Transactions { type OwnedValue = Transaction; } +/// The storage table of processed transactions that were executed in the past. +/// The table helps to drop duplicated transactions. +pub struct ProcessedTransactions; + +impl Mappable for ProcessedTransactions { + type Key = Self::OwnedKey; + type OwnedKey = TxId; + type Value = Self::OwnedValue; + type OwnedValue = (); +} + // TODO: Add macro to define all common tables to avoid copy/paste of the code. // TODO: Add macro to define common unit tests. 
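As a side note on the new table: below is a hedged sketch of the duplicate-transaction check that this patch moves from the full `Transactions` table onto `ProcessedTransactions` (the `::<…>` generic parameters were stripped from the diff text above, so they are restored here). The helper name and the use of the concrete `Database` type are illustrative assumptions; in the executor the same two calls run against its generic storage transaction, and a hit maps to `ExecutorError::TransactionIdCollision`.

```rust
use fuel_core::database::Database;
use fuel_core_storage::{
    tables::ProcessedTransactions,
    transactional::StorageTransaction,
    Result as StorageResult,
    StorageAsMut,
    StorageAsRef,
};
use fuel_core_types::fuel_tx::TxId;

/// Illustrative helper: returns `false` if the transaction id was already
/// executed in an earlier block, otherwise records it and returns `true`.
fn try_mark_processed(
    tx_st_transaction: &mut StorageTransaction<Database>,
    tx_id: &TxId,
) -> StorageResult<bool> {
    // The executor performs this check first and treats a hit as a collision.
    if tx_st_transaction
        .as_ref()
        .storage::<ProcessedTransactions>()
        .contains_key(tx_id)?
    {
        return Ok(false)
    }
    // Only the key matters: the stored value is the unit type `()`.
    tx_st_transaction
        .as_mut()
        .storage::<ProcessedTransactions>()
        .insert(tx_id, &())?;
    Ok(true)
}
```

Storing `()` as the value keeps the table to a plain set of transaction ids, which is what allows the executor to stop persisting whole transactions as part of the state transition.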
diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 9ed4728fb71..9faa23ec731 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -24,7 +24,7 @@ ethers = "2" fuel-core = { path = "../crates/fuel-core", default-features = false, features = ["test-helpers"] } fuel-core-benches = { path = "../benches" } fuel-core-client = { path = "../crates/client", features = ["test-helpers"] } -fuel-core-executor = { workspace = true, features = ["test-helpers"] } +fuel-core-executor = { workspace = true } fuel-core-p2p = { path = "../crates/services/p2p", features = ["test-helpers"], optional = true } fuel-core-poa = { path = "../crates/services/consensus_module/poa" } fuel-core-relayer = { path = "../crates/services/relayer", features = [ diff --git a/tests/tests/tx.rs b/tests/tests/tx.rs index 82948de6b0e..da1db7b1beb 100644 --- a/tests/tests/tx.rs +++ b/tests/tests/tx.rs @@ -1,9 +1,7 @@ use crate::helpers::TestContext; use fuel_core::{ - database::Database, schema::tx::receipt::all_receipts, service::{ - adapters::MaybeRelayerAdapter, Config, FuelService, }, @@ -16,20 +14,12 @@ use fuel_core_client::client::{ types::TransactionStatus, FuelClient, }; -use fuel_core_executor::executor::Executor; +use fuel_core_poa::service::Mode; use fuel_core_types::{ - blockchain::{ - block::PartialFuelBlock, - header::{ - ConsensusHeader, - PartialBlockHeader, - }, - }, fuel_asm::*, + fuel_crypto::SecretKey, fuel_tx::*, fuel_types::ChainId, - services::executor::ExecutionBlock, - tai64::Tai64, }; use itertools::Itertools; use rand::{ @@ -503,55 +493,30 @@ async fn get_transactions_by_owner_supports_cursor(direction: PageDirection) { #[tokio::test] async fn get_transactions_from_manual_blocks() { - let (executor, db) = get_executor_and_db(); - // get access to a client - let context = initialize_client(db).await; + let context = TestContext::new(100).await; // create 10 txs - let txs: Vec = (0..10).map(create_mock_tx).collect(); + let txs: Vec<_> = (0..10).map(create_mock_tx).collect(); // make 1st test block - let first_test_block = PartialFuelBlock { - header: PartialBlockHeader { - consensus: ConsensusHeader { - height: 1u32.into(), - time: Tai64::now(), - ..Default::default() - }, - ..Default::default() - }, - - // set the first 5 ids of the manually saved txs - transactions: txs.iter().take(5).cloned().collect(), - }; + let first_batch = txs.iter().take(5).cloned().collect(); + context + .srv + .shared + .poa_adapter + .manually_produce_blocks(None, Mode::BlockWithTransactions(first_batch)) + .await + .expect("Should produce first block with first 5 transactions."); // make 2nd test block - let second_test_block = PartialFuelBlock { - header: PartialBlockHeader { - consensus: ConsensusHeader { - height: 2u32.into(), - time: Tai64::now(), - ..Default::default() - }, - ..Default::default() - }, - // set the last 5 ids of the manually saved txs - transactions: txs.iter().skip(5).take(5).cloned().collect(), - }; - - // process blocks and save block height - executor - .execute_and_commit( - ExecutionBlock::Production(first_test_block), - Default::default(), - ) - .unwrap(); - executor - .execute_and_commit( - ExecutionBlock::Production(second_test_block), - Default::default(), - ) - .unwrap(); + let second_batch = txs.iter().skip(5).take(5).cloned().collect(); + context + .srv + .shared + .poa_adapter + .manually_produce_blocks(None, Mode::BlockWithTransactions(second_batch)) + .await + .expect("Should produce block with last 5 transactions."); // Query for first 4: [0, 1, 2, 3] let page_request_forwards = 
PaginationRequest { @@ -672,38 +637,18 @@ async fn get_owned_transactions() { assert_eq!(&charlie_txs, &[tx1, tx2, tx3]); } -fn get_executor_and_db() -> (Executor, Database) { - let db = Database::default(); - let relayer = MaybeRelayerAdapter { - database: db.clone(), - #[cfg(feature = "relayer")] - relayer_synced: None, - #[cfg(feature = "relayer")] - da_deploy_height: 0u64.into(), - }; - let executor = Executor { - relayer, - database: db.clone(), - config: Default::default(), - }; - - (executor, db) -} - -async fn initialize_client(db: Database) -> TestContext { - let config = Config::local_node(); - let srv = FuelService::from_database(db, config).await.unwrap(); - let client = FuelClient::from(srv.bound_address); - TestContext { - srv, - rng: StdRng::seed_from_u64(0x123), - client, - } -} - // add random val for unique tx fn create_mock_tx(val: u64) -> Transaction { + let mut rng = StdRng::seed_from_u64(val); + TransactionBuilder::script(val.to_be_bytes().to_vec(), Default::default()) - .add_random_fee_input() + .add_unsigned_coin_input( + SecretKey::random(&mut rng), + rng.gen(), + 1_000_000, + Default::default(), + Default::default(), + Default::default(), + ) .finalize_as_transaction() }
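To close, a small sketch of the reshaped importer ports introduced by this patch, with the generic parameters that the diff text above lost. `ToyDb` is a hypothetical in-memory stand-in keyed by height, not how fuel-core's `Database` implements these ports (the real implementation keys by block id and transaction id); it only illustrates the new contract: `latest_block_height` returns `Option<BlockHeight>` for an empty chain instead of a "not found" error, and `store_new_block` returns `true` only when the sealed block was not already stored.

```rust
use std::{cell::Cell, collections::BTreeMap};

use fuel_core_importer::ports::{ExecutorDatabase, ImporterDatabase};
use fuel_core_storage::Result as StorageResult;
use fuel_core_types::{
    blockchain::SealedBlock,
    fuel_types::{BlockHeight, ChainId},
};

/// Hypothetical in-memory database used only to spell out the new port shapes.
#[derive(Default)]
struct ToyDb {
    blocks: BTreeMap<BlockHeight, SealedBlock>,
    // `increase_tx_count` takes `&self`, so a toy impl needs interior mutability.
    tx_count: Cell<u64>,
}

impl ImporterDatabase for ToyDb {
    fn latest_block_height(&self) -> StorageResult<Option<BlockHeight>> {
        // `None` means the chain is empty (e.g. before the genesis block).
        Ok(self.blocks.keys().max().copied())
    }

    fn increase_tx_count(&self, new_txs_count: u64) -> StorageResult<u64> {
        let total = self.tx_count.get().saturating_add(new_txs_count);
        self.tx_count.set(total);
        Ok(total)
    }
}

impl ExecutorDatabase for ToyDb {
    fn store_new_block(
        &mut self,
        _chain_id: &ChainId,
        block: &SealedBlock,
    ) -> StorageResult<bool> {
        let height = *block.entity.header().height();
        // Simplified uniqueness rule: `false` signals a duplicate at this height.
        Ok(self.blocks.insert(height, block.clone()).is_none())
    }
}
```

The importer turns a `false` from `store_new_block` into `Error::NotUnique`, and compares `latest_block_height` before and after execution so that the executor can no longer write blocks itself.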