diff --git a/CHANGELOG.md b/CHANGELOG.md index 4aad039f8b5..80f96ab1272 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Description of the upcoming release here. - [#1591](https://github.com/FuelLabs/fuel-core/pull/1591): Simplify libp2p dependencies and not depend on all sub modules directly. - [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p +- [#1579](https://github.com/FuelLabs/fuel-core/pull/1579): The change extracts the off-chain-related logic from the executor and moves it to the GraphQL off-chain worker. It creates two new concepts - Off-chain and On-chain databases where the GraphQL worker has exclusive ownership of the database and may modify it without intersecting with the On-chain database. - [#1577](https://github.com/FuelLabs/fuel-core/pull/1577): Moved insertion of sealed blocks into the `BlockImporter` instead of the executor. - [#1601](https://github.com/FuelLabs/fuel-core/pull/1601): Fix formatting in docs and check that `cargo doc` passes in the CI. diff --git a/Cargo.lock b/Cargo.lock index 96c51e0ff38..c7b103f7743 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2841,6 +2841,7 @@ dependencies = [ "mockall", "test-case", "tokio", + "tokio-rayon", "tracing", ] diff --git a/crates/fuel-core/src/coins_query.rs b/crates/fuel-core/src/coins_query.rs index a7e042d1e45..9c41fd06054 100644 --- a/crates/fuel-core/src/coins_query.rs +++ b/crates/fuel-core/src/coins_query.rs @@ -1,5 +1,5 @@ use crate::{ - fuel_core_graphql_api::service::Database, + fuel_core_graphql_api::database::ReadView, query::asset_query::{ AssetQuery, AssetSpendTarget, @@ -95,7 +95,7 @@ impl SpendQuery { } /// Return [`AssetQuery`]s. - pub fn asset_queries<'a>(&'a self, db: &'a Database) -> Vec> { + pub fn asset_queries<'a>(&'a self, db: &'a ReadView) -> Vec> { self.query_per_asset .iter() .map(|asset| { @@ -159,7 +159,7 @@ pub fn largest_first(query: &AssetQuery) -> Result, CoinsQueryErro // An implementation of the method described on: https://iohk.io/en/blog/posts/2018/07/03/self-organisation-in-coin-selection/ pub fn random_improve( - db: &Database, + db: &ReadView, spend_query: &SpendQuery, ) -> Result>, CoinsQueryError> { let mut coins_per_asset = vec![]; @@ -229,7 +229,7 @@ mod tests { SpendQuery, }, database::Database, - fuel_core_graphql_api::service::Database as ServiceDatabase, + fuel_core_graphql_api::api_service::ReadDatabase as ServiceDatabase, query::asset_query::{ AssetQuery, AssetSpendTarget, @@ -323,15 +323,19 @@ mod tests { let result: Vec<_> = spend_query .iter() .map(|asset| { - largest_first(&AssetQuery::new(owner, asset, base_asset_id, None, db)) - .map(|coins| { - coins - .iter() - .map(|coin| { - (*coin.asset_id(base_asset_id), coin.amount()) - }) - .collect() - }) + largest_first(&AssetQuery::new( + owner, + asset, + base_asset_id, + None, + &db.view(), + )) + .map(|coins| { + coins + .iter() + .map(|coin| (*coin.asset_id(base_asset_id), coin.amount())) + .collect() + }) }) .try_collect()?; Ok(result) @@ -484,7 +488,7 @@ mod tests { db: &ServiceDatabase, ) -> Result, CoinsQueryError> { let coins = random_improve( - db, + &db.view(), &SpendQuery::new(owner, &query_per_asset, None, base_asset_id)?, ); @@ -682,7 +686,7 @@ mod tests { Some(excluded_ids), base_asset_id, )?; - let coins = random_improve(&db.service_database(), &spend_query); + let coins = random_improve(&db.service_database().view(), &spend_query); // Transform result for convenience coins.map(|coins| { @@ -840,7 +844,7 @@ mod tests { } 
let coins = random_improve( - &db.service_database(), + &db.service_database().view(), &SpendQuery::new( owner, &[AssetSpendTarget { @@ -930,7 +934,8 @@ mod tests { } fn service_database(&self) -> ServiceDatabase { - Box::new(self.database.clone()) + let database = self.database.clone(); + ServiceDatabase::new(database.clone(), database) } } @@ -980,18 +985,22 @@ mod tests { pub fn owned_coins(&self, owner: &Address) -> Vec { use crate::query::CoinQueryData; - let db = self.service_database(); - db.owned_coins_ids(owner, None, IterDirection::Forward) - .map(|res| res.map(|id| db.coin(id).unwrap())) + let query = self.service_database(); + let query = query.view(); + query + .owned_coins_ids(owner, None, IterDirection::Forward) + .map(|res| res.map(|id| query.coin(id).unwrap())) .try_collect() .unwrap() } pub fn owned_messages(&self, owner: &Address) -> Vec { use crate::query::MessageQueryData; - let db = self.service_database(); - db.owned_message_ids(owner, None, IterDirection::Forward) - .map(|res| res.map(|id| db.message(&id).unwrap())) + let query = self.service_database(); + let query = query.view(); + query + .owned_message_ids(owner, None, IterDirection::Forward) + .map(|res| res.map(|id| query.message(&id).unwrap())) .try_collect() .unwrap() } diff --git a/crates/fuel-core/src/executor.rs b/crates/fuel-core/src/executor.rs index 85fb0318718..8b74df131b1 100644 --- a/crates/fuel-core/src/executor.rs +++ b/crates/fuel-core/src/executor.rs @@ -19,7 +19,6 @@ mod tests { Coins, ContractsRawCode, Messages, - Receipts, }, StorageAsMut, }; @@ -662,23 +661,18 @@ mod tests { coinbase_recipient: config_coinbase, ..Default::default() }; - let mut producer = create_executor(Default::default(), config); + let producer = create_executor(Default::default(), config); let mut block = Block::default(); *block.transactions_mut() = vec![script.clone().into()]; - assert!(producer + let ExecutionResult { tx_status, .. } = producer .execute_and_commit( ExecutionBlock::Production(block.into()), - Default::default() + Default::default(), ) - .is_ok()); - let receipts = producer - .database - .storage::() - .get(&script.id(&producer.config.consensus_parameters.chain_id)) - .unwrap() - .unwrap(); + .expect("Should execute the block"); + let receipts = &tx_status[0].receipts; if let Some(Receipt::Return { val, .. }) = receipts.first() { *val == 1 @@ -2756,20 +2750,16 @@ mod tests { }, ); - executor + let ExecutionResult { tx_status, .. } = executor .execute_and_commit( ExecutionBlock::Production(block), ExecutionOptions { utxo_validation: true, }, ) - .unwrap(); + .expect("Should execute the block"); - let receipts = database - .storage::() - .get(&tx.id(&ChainId::default())) - .unwrap() - .unwrap(); + let receipts = &tx_status[0].receipts; assert_eq!(block_height as u64, receipts[0].val().unwrap()); } @@ -2835,21 +2825,16 @@ mod tests { }, ); - executor + let ExecutionResult { tx_status, .. 
} = executor .execute_and_commit( ExecutionBlock::Production(block), ExecutionOptions { utxo_validation: true, }, ) - .unwrap(); - - let receipts = database - .storage::() - .get(&tx.id(&ChainId::default())) - .unwrap() - .unwrap(); + .expect("Should execute the block"); + let receipts = &tx_status[0].receipts; assert_eq!(time.0, receipts[0].val().unwrap()); } } diff --git a/crates/fuel-core/src/graphql_api.rs b/crates/fuel-core/src/graphql_api.rs index 3fd27a3c19b..12603d964a5 100644 --- a/crates/fuel-core/src/graphql_api.rs +++ b/crates/fuel-core/src/graphql_api.rs @@ -9,9 +9,12 @@ use fuel_core_types::{ }; use std::net::SocketAddr; +pub mod api_service; +pub mod database; pub(crate) mod metrics_extension; pub mod ports; -pub mod service; +pub(crate) mod view_extension; +pub mod worker_service; #[derive(Clone, Debug)] pub struct Config { diff --git a/crates/fuel-core/src/graphql_api/service.rs b/crates/fuel-core/src/graphql_api/api_service.rs similarity index 89% rename from crates/fuel-core/src/graphql_api/service.rs rename to crates/fuel-core/src/graphql_api/api_service.rs index 6c6879ae308..15023a5995f 100644 --- a/crates/fuel-core/src/graphql_api/service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -1,13 +1,17 @@ use crate::{ - fuel_core_graphql_api::ports::{ - BlockProducerPort, - ConsensusModulePort, - DatabasePort, - P2pPort, - TxPoolPort, - }, - graphql_api::{ + fuel_core_graphql_api::{ + database::{ + OffChainView, + OnChainView, + }, metrics_extension::MetricsExtension, + ports::{ + BlockProducerPort, + ConsensusModulePort, + P2pPort, + TxPoolPort, + }, + view_extension::ViewExtension, Config, }, schema::{ @@ -55,6 +59,7 @@ use fuel_core_services::{ RunnableTask, StateWatcher, }; +use fuel_core_storage::transactional::AtomicView; use futures::Stream; use serde_json::json; use std::{ @@ -75,7 +80,7 @@ use tower_http::{ pub type Service = fuel_core_services::ServiceRunner; -pub type Database = Box; +pub use super::database::ReadDatabase; pub type BlockProducer = Box; // In the future GraphQL should not be aware of `TxPool`. 
It should @@ -160,28 +165,35 @@ impl RunnableTask for Task { // Need a separate Data Object for each Query endpoint, cannot be avoided #[allow(clippy::too_many_arguments)] -pub fn new_service( +pub fn new_service( config: Config, schema: CoreSchemaBuilder, - database: Database, + on_database: OnChain, + off_database: OffChain, txpool: TxPool, producer: BlockProducer, consensus_module: ConsensusModule, p2p_service: P2pService, log_threshold_ms: Duration, request_timeout: Duration, -) -> anyhow::Result { +) -> anyhow::Result +where + OnChain: AtomicView + 'static, + OffChain: AtomicView + 'static, +{ let network_addr = config.addr; + let combined_read_database = ReadDatabase::new(on_database, off_database); let schema = schema .data(config) - .data(database) + .data(combined_read_database) .data(txpool) .data(producer) .data(consensus_module) .data(p2p_service) .extension(async_graphql::extensions::Tracing) .extension(MetricsExtension::new(log_threshold_ms)) + .extension(ViewExtension::new()) .finish(); let router = Router::new() diff --git a/crates/fuel-core/src/graphql_api/database.rs b/crates/fuel-core/src/graphql_api/database.rs new file mode 100644 index 00000000000..feb9a638c18 --- /dev/null +++ b/crates/fuel-core/src/graphql_api/database.rs @@ -0,0 +1,234 @@ +use crate::fuel_core_graphql_api::ports::{ + DatabaseBlocks, + DatabaseChain, + DatabaseContracts, + DatabaseMessageProof, + DatabaseMessages, + OffChainDatabase, + OnChainDatabase, +}; +use fuel_core_storage::{ + iter::{ + BoxedIter, + IterDirection, + }, + tables::Receipts, + transactional::AtomicView, + Error as StorageError, + Mappable, + Result as StorageResult, + StorageInspect, +}; +use fuel_core_txpool::types::{ + ContractId, + TxId, +}; +use fuel_core_types::{ + blockchain::primitives::{ + BlockId, + DaBlockHeight, + }, + entities::message::{ + MerkleProof, + Message, + }, + fuel_tx::{ + Address, + AssetId, + TxPointer, + UtxoId, + }, + fuel_types::{ + BlockHeight, + Nonce, + }, + services::{ + graphql_api::ContractBalance, + txpool::TransactionStatus, + }, +}; +use std::{ + borrow::Cow, + sync::Arc, +}; + +/// The on-chain view of the database used by the [`ReadView`] to fetch on-chain data. +pub type OnChainView = Arc; +/// The off-chain view of the database used by the [`ReadView`] to fetch off-chain data. +pub type OffChainView = Arc; + +/// The container of the on-chain and off-chain database view providers. +/// It is used only by `ViewExtension` to create a [`ReadView`]. +pub struct ReadDatabase { + /// The on-chain database view provider. + on_chain: Box>, + /// The off-chain database view provider. + off_chain: Box>, +} + +impl ReadDatabase { + /// Creates a new [`ReadDatabase`] with the given on-chain and off-chain database view providers. + pub fn new(on_chain: OnChain, off_chain: OffChain) -> Self + where + OnChain: AtomicView + 'static, + OffChain: AtomicView + 'static, + { + Self { + on_chain: Box::new(on_chain), + off_chain: Box::new(off_chain), + } + } + + /// Creates a consistent view of the database. + pub fn view(&self) -> ReadView { + // TODO: Use the same height for both views to guarantee consistency. + // It is not possible to implement until `view_at` is implemented for the `AtomicView`.
+ // https://github.com/FuelLabs/fuel-core/issues/1582 + ReadView { + on_chain: self.on_chain.latest_view(), + off_chain: self.off_chain.latest_view(), + } + } +} + +pub struct ReadView { + on_chain: OnChainView, + off_chain: OffChainView, +} + +impl DatabaseBlocks for ReadView { + fn block_id(&self, height: &BlockHeight) -> StorageResult { + self.on_chain.block_id(height) + } + + fn blocks_ids( + &self, + start: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult<(BlockHeight, BlockId)>> { + self.on_chain.blocks_ids(start, direction) + } + + fn ids_of_latest_block(&self) -> StorageResult<(BlockHeight, BlockId)> { + self.on_chain.ids_of_latest_block() + } +} + +impl StorageInspect for ReadView +where + M: Mappable, + dyn OnChainDatabase: StorageInspect, +{ + type Error = StorageError; + + fn get(&self, key: &M::Key) -> StorageResult>> { + self.on_chain.get(key) + } + + fn contains_key(&self, key: &M::Key) -> StorageResult { + self.on_chain.contains_key(key) + } +} + +impl DatabaseMessages for ReadView { + fn all_messages( + &self, + start_message_id: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult> { + self.on_chain.all_messages(start_message_id, direction) + } + + fn message_is_spent(&self, nonce: &Nonce) -> StorageResult { + self.on_chain.message_is_spent(nonce) + } + + fn message_exists(&self, nonce: &Nonce) -> StorageResult { + self.on_chain.message_exists(nonce) + } +} + +impl DatabaseContracts for ReadView { + fn contract_balances( + &self, + contract: ContractId, + start_asset: Option, + direction: IterDirection, + ) -> BoxedIter> { + self.on_chain + .contract_balances(contract, start_asset, direction) + } +} + +impl DatabaseChain for ReadView { + fn chain_name(&self) -> StorageResult { + self.on_chain.chain_name() + } + + fn da_height(&self) -> StorageResult { + self.on_chain.da_height() + } +} + +impl DatabaseMessageProof for ReadView { + fn block_history_proof( + &self, + message_block_height: &BlockHeight, + commit_block_height: &BlockHeight, + ) -> StorageResult { + self.on_chain + .block_history_proof(message_block_height, commit_block_height) + } +} + +impl OnChainDatabase for ReadView {} + +impl StorageInspect for ReadView { + type Error = StorageError; + + fn get( + &self, + key: &::Key, + ) -> StorageResult::OwnedValue>>> { + self.off_chain.get(key) + } + + fn contains_key(&self, key: &::Key) -> StorageResult { + self.off_chain.contains_key(key) + } +} + +impl OffChainDatabase for ReadView { + fn owned_message_ids( + &self, + owner: &Address, + start_message_id: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult> { + self.off_chain + .owned_message_ids(owner, start_message_id, direction) + } + + fn owned_coins_ids( + &self, + owner: &Address, + start_coin: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult> { + self.off_chain.owned_coins_ids(owner, start_coin, direction) + } + + fn tx_status(&self, tx_id: &TxId) -> StorageResult { + self.off_chain.tx_status(tx_id) + } + + fn owned_transactions_ids( + &self, + owner: Address, + start: Option, + direction: IterDirection, + ) -> BoxedIter> { + self.off_chain + .owned_transactions_ids(owner, start, direction) + } +} diff --git a/crates/fuel-core/src/graphql_api/ports.rs b/crates/fuel-core/src/graphql_api/ports.rs index b897acb2489..44ff62b79b3 100644 --- a/crates/fuel-core/src/graphql_api/ports.rs +++ b/crates/fuel-core/src/graphql_api/ports.rs @@ -14,7 +14,6 @@ use fuel_core_storage::{ Messages, Receipts, SealedBlockConsensus, - SpentMessages, 
Transactions, }, Error as StorageError, @@ -57,14 +56,41 @@ use fuel_core_types::{ }; use std::sync::Arc; -/// The database port expected by GraphQL API service. -pub trait DatabasePort: +pub trait OffChainDatabase: + Send + Sync + StorageInspect +{ + fn owned_message_ids( + &self, + owner: &Address, + start_message_id: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult>; + + fn owned_coins_ids( + &self, + owner: &Address, + start_coin: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult>; + + fn tx_status(&self, tx_id: &TxId) -> StorageResult; + + fn owned_transactions_ids( + &self, + owner: Address, + start: Option, + direction: IterDirection, + ) -> BoxedIter>; +} + +/// The on chain database port expected by GraphQL API service. +pub trait OnChainDatabase: Send + Sync + DatabaseBlocks - + DatabaseTransactions + + StorageInspect + DatabaseMessages - + DatabaseCoins + + StorageInspect + DatabaseContracts + DatabaseChain + DatabaseMessageProof @@ -87,33 +113,8 @@ pub trait DatabaseBlocks: fn ids_of_latest_block(&self) -> StorageResult<(BlockHeight, BlockId)>; } -/// Trait that specifies all the getters required for transactions. -pub trait DatabaseTransactions: - StorageInspect - + StorageInspect -{ - fn tx_status(&self, tx_id: &TxId) -> StorageResult; - - fn owned_transactions_ids( - &self, - owner: Address, - start: Option, - direction: IterDirection, - ) -> BoxedIter>; -} - /// Trait that specifies all the getters required for messages. -pub trait DatabaseMessages: - StorageInspect - + StorageInspect -{ - fn owned_message_ids( - &self, - owner: &Address, - start_message_id: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult>; - +pub trait DatabaseMessages: StorageInspect { fn all_messages( &self, start_message_id: Option, @@ -125,16 +126,6 @@ pub trait DatabaseMessages: fn message_exists(&self, nonce: &Nonce) -> StorageResult; } -/// Trait that specifies all the getters required for coins. -pub trait DatabaseCoins: StorageInspect { - fn owned_coins_ids( - &self, - owner: &Address, - start_coin: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult>; -} - /// Trait that specifies all the getters required for contract. 
pub trait DatabaseContracts: StorageInspect @@ -174,7 +165,7 @@ pub trait TxPoolPort: Send + Sync { } #[async_trait] -pub trait DryRunExecution { +pub trait BlockProducerPort: Send + Sync { async fn dry_run_tx( &self, transaction: Transaction, @@ -183,8 +174,6 @@ ) -> anyhow::Result>; } -pub trait BlockProducerPort: Send + Sync + DryRunExecution {} - #[async_trait::async_trait] pub trait ConsensusModulePort: Send + Sync { async fn manually_produce_blocks( @@ -209,3 +198,51 @@ pub trait DatabaseMessageProof: Send + Sync { pub trait P2pPort: Send + Sync { async fn all_peer_info(&self) -> anyhow::Result>; } + +pub mod worker { + use fuel_core_services::stream::BoxStream; + use fuel_core_storage::{ + tables::Receipts, + transactional::Transactional, + Error as StorageError, + Result as StorageResult, + StorageMutate, + }; + use fuel_core_types::{ + fuel_tx::{ + Address, + Bytes32, + }, + fuel_types::BlockHeight, + services::{ + block_importer::SharedImportResult, + txpool::TransactionStatus, + }, + }; + + pub trait OffChainDatabase: + Send + + Sync + + StorageMutate + + Transactional + { + fn record_tx_id_owner( + &mut self, + owner: &Address, + block_height: BlockHeight, + tx_idx: u16, + tx_id: &Bytes32, + ) -> StorageResult>; + + fn update_tx_status( + &mut self, + id: &Bytes32, + status: TransactionStatus, + ) -> StorageResult>; + } + + pub trait BlockImporter { + /// Returns a stream of imported blocks. + fn block_events(&self) -> BoxStream; + } +} diff --git a/crates/fuel-core/src/graphql_api/view_extension.rs b/crates/fuel-core/src/graphql_api/view_extension.rs new file mode 100644 index 00000000000..ca482fe9878 --- /dev/null +++ b/crates/fuel-core/src/graphql_api/view_extension.rs @@ -0,0 +1,44 @@ +use crate::graphql_api::database::ReadDatabase; +use async_graphql::{ + extensions::{ + Extension, + ExtensionContext, + ExtensionFactory, + NextPrepareRequest, + }, + Request, + ServerResult, +}; +use std::sync::Arc; + +/// The extension that adds the `ReadView` to the request context. +/// It guarantees that the request works with a single view of the database, +/// and external database modifications cannot affect the result.
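(For orientation only, not part of the patch: a minimal sketch of the consumer side of this guarantee. It mirrors the `schema/*.rs` hunks later in this diff; `ExampleQuery` and its field are illustrative, while `ReadView`, `DatabaseChain`, and `ctx.data_unchecked()` come from the code in this PR.)

```rust
use crate::fuel_core_graphql_api::{database::ReadView, ports::DatabaseChain};
use async_graphql::{Context, Object};

pub struct ExampleQuery;

#[Object]
impl ExampleQuery {
    /// Illustrative field: every resolver in a request sees the same snapshot,
    /// because `ViewExtension::prepare_request` attaches `database.view()` to the request.
    async fn chain_name(&self, ctx: &Context<'_>) -> async_graphql::Result<String> {
        let query: &ReadView = ctx.data_unchecked();
        // All reads go through one consistent pair of on-chain/off-chain views.
        Ok(query.chain_name()?)
    }
}
```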
+pub(crate) struct ViewExtension; + +impl ViewExtension { + pub fn new() -> Self { + Self + } +} + +impl ExtensionFactory for ViewExtension { + fn create(&self) -> Arc { + Arc::new(ViewExtension::new()) + } +} + +#[async_trait::async_trait] +impl Extension for ViewExtension { + async fn prepare_request( + &self, + ctx: &ExtensionContext<'_>, + request: Request, + next: NextPrepareRequest<'_>, + ) -> ServerResult { + let database: &ReadDatabase = ctx.data_unchecked(); + let view = database.view(); + let request = request.data(view); + next.run(ctx, request).await + } +} diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs new file mode 100644 index 00000000000..22f54719227 --- /dev/null +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -0,0 +1,284 @@ +use crate::fuel_core_graphql_api::ports; +use fuel_core_services::{ + stream::BoxStream, + EmptyShared, + RunnableService, + RunnableTask, + ServiceRunner, + StateWatcher, +}; +use fuel_core_storage::{ + tables::Receipts, + Result as StorageResult, + StorageAsMut, +}; +use fuel_core_types::{ + blockchain::block::Block, + fuel_tx::{ + field::{ + Inputs, + Outputs, + }, + input::coin::{ + CoinPredicate, + CoinSigned, + }, + Input, + Output, + Receipt, + Transaction, + TxId, + UniqueIdentifier, + }, + fuel_types::{ + BlockHeight, + Bytes32, + }, + services::{ + block_importer::{ + ImportResult, + SharedImportResult, + }, + executor::TransactionExecutionStatus, + txpool::from_executor_to_status, + }, +}; +use futures::{ + FutureExt, + StreamExt, +}; + +/// The off-chain GraphQL API worker task processes the imported blocks +/// and keeps the information used by the GraphQL service up to date. +pub struct Task { + block_importer: BoxStream, + database: D, +} + +impl Task +where + D: ports::worker::OffChainDatabase, +{ + fn process_block(&mut self, result: SharedImportResult) -> anyhow::Result<()> { + // TODO: Implement the creation of indexes for the messages and coins. + // Implement table `BlockId -> BlockHeight` to get the block height by block id.
+ // https://github.com/FuelLabs/fuel-core/issues/1583 + let mut transaction = self.database.transaction(); + // save the status for every transaction using the finalized block id + self.persist_transaction_status(&result, transaction.as_mut())?; + + // save the associated owner for each transaction in the block + self.index_tx_owners_for_block( + &result.sealed_block.entity, + transaction.as_mut(), + )?; + transaction.commit()?; + + Ok(()) + } + + /// Associate all transactions within a block to their respective UTXO owners + fn index_tx_owners_for_block( + &self, + block: &Block, + block_st_transaction: &mut D, + ) -> anyhow::Result<()> { + for (tx_idx, tx) in block.transactions().iter().enumerate() { + let block_height = *block.header().height(); + let inputs; + let outputs; + let tx_idx = u16::try_from(tx_idx).map_err(|e| { + anyhow::anyhow!("The block has more than `u16::MAX` transactions, {}", e) + })?; + let tx_id = tx.cached_id().expect( + "The imported block should contains only transactions with cached id", + ); + match tx { + Transaction::Script(tx) => { + inputs = tx.inputs().as_slice(); + outputs = tx.outputs().as_slice(); + } + Transaction::Create(tx) => { + inputs = tx.inputs().as_slice(); + outputs = tx.outputs().as_slice(); + } + Transaction::Mint(_) => continue, + } + self.persist_owners_index( + block_height, + inputs, + outputs, + &tx_id, + tx_idx, + block_st_transaction, + )?; + } + Ok(()) + } + + /// Index the tx id by owner for all of the inputs and outputs + fn persist_owners_index( + &self, + block_height: BlockHeight, + inputs: &[Input], + outputs: &[Output], + tx_id: &Bytes32, + tx_idx: u16, + db: &mut D, + ) -> StorageResult<()> { + let mut owners = vec![]; + for input in inputs { + if let Input::CoinSigned(CoinSigned { owner, .. }) + | Input::CoinPredicate(CoinPredicate { owner, .. }) = input + { + owners.push(owner); + } + } + + for output in outputs { + match output { + Output::Coin { to, .. } + | Output::Change { to, .. } + | Output::Variable { to, .. } => { + owners.push(to); + } + Output::Contract(_) | Output::ContractCreated { .. 
} => {} + } + } + + // dedupe owners from inputs and outputs prior to indexing + owners.sort(); + owners.dedup(); + + for owner in owners { + db.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?; + } + + Ok(()) + } + + fn persist_transaction_status( + &self, + import_result: &ImportResult, + db: &mut D, + ) -> StorageResult<()> { + for TransactionExecutionStatus { + id, + result, + receipts, + } in import_result.tx_status.iter() + { + let status = from_executor_to_status( + &import_result.sealed_block.entity, + result.clone(), + ); + + if db.update_tx_status(id, status)?.is_some() { + return Err(anyhow::anyhow!( + "Transaction status already exists for tx {}", + id + ) + .into()); + } + + self.persist_receipts(id, receipts, db)?; + } + Ok(()) + } + + fn persist_receipts( + &self, + tx_id: &TxId, + receipts: &[Receipt], + db: &mut D, + ) -> StorageResult<()> { + if db.storage::().insert(tx_id, receipts)?.is_some() { + return Err(anyhow::anyhow!("Receipts already exist for tx {}", tx_id).into()); + } + Ok(()) + } +} + +#[async_trait::async_trait] +impl RunnableService for Task +where + D: ports::worker::OffChainDatabase, +{ + const NAME: &'static str = "GraphQL_Off_Chain_Worker"; + type SharedData = EmptyShared; + type Task = Self; + type TaskParams = (); + + fn shared_data(&self) -> Self::SharedData { + EmptyShared + } + + async fn into_task( + self, + _: &StateWatcher, + _: Self::TaskParams, + ) -> anyhow::Result { + // TODO: It is possible that the node was shut down before we processed all imported blocks. + // It could lead to some missed blocks and the database's inconsistent state. + // Because the result of block execution is not stored on the chain, it is impossible + // to actualize the database without executing the block at the previous state + // of the blockchain. When `AtomicView::view_at` is implemented, we can + // process all missed blocks and actualize the database here. + // https://github.com/FuelLabs/fuel-core/issues/1584 + Ok(self) + } +} + +#[async_trait::async_trait] +impl RunnableTask for Task +where + D: ports::worker::OffChainDatabase, +{ + async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + let should_continue; + tokio::select! { + biased; + + _ = watcher.while_started() => { + should_continue = false; + } + + result = self.block_importer.next() => { + if let Some(block) = result { + self.process_block(block)?; + + should_continue = true + } else { + should_continue = false + } + } + } + Ok(should_continue) + } + + async fn shutdown(mut self) -> anyhow::Result<()> { + // Process all remaining blocks before shutdown to not lose any data. 
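(An aside before the `new_service` constructor that follows: a rough sketch, under stated assumptions, of how a node might wire this worker up. `start_graphql_worker`, its parameters, and the `Service::start_and_await` call from `fuel-core-services` are illustrative; the real wiring lives in fuel-core's service builder, outside this excerpt, and the trait bounds here are abbreviated.)

```rust
use crate::fuel_core_graphql_api::{ports::worker, worker_service};
use fuel_core_services::Service;

/// Sketch: start the off-chain worker next to the GraphQL API service.
/// `importer` yields imported blocks; `database` is the off-chain database handle.
async fn start_graphql_worker<I, D>(importer: I, database: D) -> anyhow::Result<()>
where
    I: worker::BlockImporter,
    D: worker::OffChainDatabase + 'static,
{
    // `new_service` (defined just below) wraps the `Task` shown above in a `ServiceRunner`.
    let runner = worker_service::new_service(importer, database);
    // Starting the runner drives the `run` loop until the node asks it to stop.
    runner.start_and_await().await?;
    Ok(())
}
```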
+ loop { + let result = self.block_importer.next().now_or_never(); + + if let Some(Some(block)) = result { + self.process_block(block)?; + } else { + break; + } + } + Ok(()) + } +} + +pub fn new_service(block_importer: I, database: D) -> ServiceRunner> +where + I: ports::worker::BlockImporter, + D: ports::worker::OffChainDatabase, +{ + let block_importer = block_importer.block_events(); + ServiceRunner::new(Task { + block_importer, + database, + }) +} diff --git a/crates/fuel-core/src/query/balance.rs b/crates/fuel-core/src/query/balance.rs index c5977422257..ecbc47620bd 100644 --- a/crates/fuel-core/src/query/balance.rs +++ b/crates/fuel-core/src/query/balance.rs @@ -1,4 +1,4 @@ -use crate::fuel_core_graphql_api::service::Database; +use crate::fuel_core_graphql_api::database::ReadView; use asset_query::{ AssetQuery, AssetSpendTarget, @@ -43,7 +43,7 @@ pub trait BalanceQueryData: Send + Sync { ) -> BoxedIter>; } -impl BalanceQueryData for Database { +impl BalanceQueryData for ReadView { fn balance( &self, owner: Address, diff --git a/crates/fuel-core/src/query/balance/asset_query.rs b/crates/fuel-core/src/query/balance/asset_query.rs index e93c9d0f304..ee0266b1245 100644 --- a/crates/fuel-core/src/query/balance/asset_query.rs +++ b/crates/fuel-core/src/query/balance/asset_query.rs @@ -1,5 +1,5 @@ use crate::{ - graphql_api::service::Database, + graphql_api::database::ReadView, query::{ CoinQueryData, MessageQueryData, @@ -58,7 +58,7 @@ pub struct AssetsQuery<'a> { pub owner: &'a Address, pub assets: Option>, pub exclude: Option<&'a Exclude>, - pub database: &'a Database, + pub database: &'a ReadView, pub base_asset_id: &'a AssetId, } @@ -67,7 +67,7 @@ impl<'a> AssetsQuery<'a> { owner: &'a Address, assets: Option>, exclude: Option<&'a Exclude>, - database: &'a Database, + database: &'a ReadView, base_asset_id: &'a AssetId, ) -> Self { Self { @@ -171,7 +171,7 @@ pub struct AssetQuery<'a> { pub owner: &'a Address, pub asset: &'a AssetSpendTarget, pub exclude: Option<&'a Exclude>, - pub database: &'a Database, + pub database: &'a ReadView, query: AssetsQuery<'a>, } @@ -181,7 +181,7 @@ impl<'a> AssetQuery<'a> { asset: &'a AssetSpendTarget, base_asset_id: &'a AssetId, exclude: Option<&'a Exclude>, - database: &'a Database, + database: &'a ReadView, ) -> Self { let mut allowed = HashSet::new(); allowed.insert(&asset.id); diff --git a/crates/fuel-core/src/query/block.rs b/crates/fuel-core/src/query/block.rs index 66cba1f941b..8aeed56f76d 100644 --- a/crates/fuel-core/src/query/block.rs +++ b/crates/fuel-core/src/query/block.rs @@ -1,4 +1,4 @@ -use crate::graphql_api::ports::DatabasePort; +use crate::fuel_core_graphql_api::ports::OnChainDatabase; use fuel_core_storage::{ iter::{ BoxedIter, @@ -26,7 +26,7 @@ pub trait SimpleBlockData: Send + Sync { fn block(&self, id: &BlockId) -> StorageResult; } -impl SimpleBlockData for D { +impl SimpleBlockData for D { fn block(&self, id: &BlockId) -> StorageResult { let block = self .storage::() @@ -56,7 +56,7 @@ pub trait BlockQueryData: Send + Sync + SimpleBlockData { fn consensus(&self, id: &BlockId) -> StorageResult; } -impl BlockQueryData for D { +impl BlockQueryData for D { fn block_id(&self, height: &BlockHeight) -> StorageResult { self.block_id(height) } diff --git a/crates/fuel-core/src/query/chain.rs b/crates/fuel-core/src/query/chain.rs index 88ce035ba1b..b9408ddfcd3 100644 --- a/crates/fuel-core/src/query/chain.rs +++ b/crates/fuel-core/src/query/chain.rs @@ -1,4 +1,4 @@ -use crate::graphql_api::ports::DatabasePort; +use 
crate::fuel_core_graphql_api::ports::OnChainDatabase; use fuel_core_storage::Result as StorageResult; use fuel_core_types::blockchain::primitives::DaBlockHeight; @@ -8,7 +8,7 @@ pub trait ChainQueryData: Send + Sync { fn da_height(&self) -> StorageResult; } -impl ChainQueryData for D { +impl ChainQueryData for D { fn name(&self) -> StorageResult { self.chain_name() } diff --git a/crates/fuel-core/src/query/coin.rs b/crates/fuel-core/src/query/coin.rs index d31b60690e9..171a88168bd 100644 --- a/crates/fuel-core/src/query/coin.rs +++ b/crates/fuel-core/src/query/coin.rs @@ -1,4 +1,7 @@ -use crate::graphql_api::ports::DatabasePort; +use crate::fuel_core_graphql_api::ports::{ + OffChainDatabase, + OnChainDatabase, +}; use fuel_core_storage::{ iter::{ BoxedIter, @@ -34,7 +37,7 @@ pub trait CoinQueryData: Send + Sync { ) -> BoxedIter>; } -impl CoinQueryData for D { +impl CoinQueryData for D { fn coin(&self, utxo_id: UtxoId) -> StorageResult { let coin = self .storage::() diff --git a/crates/fuel-core/src/query/contract.rs b/crates/fuel-core/src/query/contract.rs index d05d90999bb..d4bbb8b5d62 100644 --- a/crates/fuel-core/src/query/contract.rs +++ b/crates/fuel-core/src/query/contract.rs @@ -1,4 +1,4 @@ -use crate::graphql_api::ports::DatabasePort; +use crate::fuel_core_graphql_api::ports::OnChainDatabase; use fuel_core_storage::{ iter::{ BoxedIter, @@ -43,7 +43,7 @@ pub trait ContractQueryData: Send + Sync { ) -> BoxedIter>; } -impl ContractQueryData for D { +impl ContractQueryData for D { fn contract_id(&self, id: ContractId) -> StorageResult { let contract_exists = self.storage::().contains_key(&id)?; if contract_exists { diff --git a/crates/fuel-core/src/query/message.rs b/crates/fuel-core/src/query/message.rs index b1ce17e4bb9..334c24dc0d7 100644 --- a/crates/fuel-core/src/query/message.rs +++ b/crates/fuel-core/src/query/message.rs @@ -3,7 +3,8 @@ use crate::{ ports::{ DatabaseMessageProof, DatabaseMessages, - DatabasePort, + OffChainDatabase, + OnChainDatabase, }, IntoApiResult, }, @@ -80,7 +81,7 @@ pub trait MessageQueryData: Send + Sync { ) -> BoxedIter>; } -impl MessageQueryData for D { +impl MessageQueryData for D { fn message(&self, id: &Nonce) -> StorageResult { self.storage::() .get(id)? 
@@ -128,7 +129,10 @@ pub trait MessageProofData: ) -> StorageResult; } -impl MessageProofData for D { +impl MessageProofData for D +where + D: OnChainDatabase + OffChainDatabase + ?Sized, +{ fn transaction_status( &self, transaction_id: &TxId, diff --git a/crates/fuel-core/src/query/tx.rs b/crates/fuel-core/src/query/tx.rs index 74d325e33ae..ebc2531f27f 100644 --- a/crates/fuel-core/src/query/tx.rs +++ b/crates/fuel-core/src/query/tx.rs @@ -1,4 +1,7 @@ -use crate::graphql_api::ports::DatabasePort; +use crate::fuel_core_graphql_api::ports::{ + OffChainDatabase, + OnChainDatabase, +}; use fuel_core_storage::{ iter::{ BoxedIter, @@ -32,7 +35,10 @@ pub trait SimpleTransactionData: Send + Sync { fn transaction(&self, transaction_id: &TxId) -> StorageResult; } -impl SimpleTransactionData for D { +impl SimpleTransactionData for D +where + D: OnChainDatabase + OffChainDatabase + ?Sized, +{ fn transaction(&self, tx_id: &TxId) -> StorageResult { self.storage::() .get(tx_id) @@ -57,7 +63,10 @@ pub trait TransactionQueryData: Send + Sync + SimpleTransactionData { ) -> BoxedIter>; } -impl TransactionQueryData for D { +impl TransactionQueryData for D +where + D: OnChainDatabase + OffChainDatabase + ?Sized, +{ fn status(&self, tx_id: &TxId) -> StorageResult { self.tx_status(tx_id) } diff --git a/crates/fuel-core/src/schema/balance.rs b/crates/fuel-core/src/schema/balance.rs index 9188696a897..da5a72ada58 100644 --- a/crates/fuel-core/src/schema/balance.rs +++ b/crates/fuel-core/src/schema/balance.rs @@ -1,6 +1,6 @@ use crate::{ fuel_core_graphql_api::{ - service::Database, + database::ReadView, Config, }, query::BalanceQueryData, @@ -56,12 +56,12 @@ impl BalanceQuery { #[graphql(desc = "address of the owner")] owner: Address, #[graphql(desc = "asset_id of the coin")] asset_id: AssetId, ) -> async_graphql::Result { - let data: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let base_asset_id = *ctx .data_unchecked::() .consensus_parameters .base_asset_id(); - let balance = data.balance(owner.0, asset_id.0, base_asset_id)?.into(); + let balance = query.balance(owner.0, asset_id.0, base_asset_id)?.into(); Ok(balance) } @@ -82,7 +82,7 @@ impl BalanceQuery { if before.is_some() || after.is_some() { return Err(anyhow!("pagination is not yet supported").into()) } - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); crate::schema::query_pagination(after, before, first, last, |_, direction| { let owner = filter.owner.into(); let base_asset_id = *ctx diff --git a/crates/fuel-core/src/schema/block.rs b/crates/fuel-core/src/schema/block.rs index 5d503f281bc..a092600c071 100644 --- a/crates/fuel-core/src/schema/block.rs +++ b/crates/fuel-core/src/schema/block.rs @@ -4,13 +4,11 @@ use super::scalars::{ }; use crate::{ fuel_core_graphql_api::{ - service::{ - ConsensusModule, - Database, - }, + api_service::ConsensusModule, + database::ReadView, Config as GraphQLConfig, + IntoApiResult, }, - graphql_api::IntoApiResult, query::{ BlockQueryData, SimpleBlockData, @@ -96,7 +94,7 @@ impl Block { } async fn consensus(&self, ctx: &Context<'_>) -> async_graphql::Result { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let id = self.0.header().id(); let consensus = query.consensus(&id)?; @@ -107,7 +105,7 @@ impl Block { &self, ctx: &Context<'_>, ) -> async_graphql::Result> { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); self.0 .transactions() .iter() @@ -192,7 
+190,7 @@ impl BlockQuery { #[graphql(desc = "ID of the block")] id: Option, #[graphql(desc = "Height of the block")] height: Option, ) -> async_graphql::Result> { - let data: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let id = match (id, height) { (Some(_), Some(_)) => { return Err(async_graphql::Error::new( @@ -202,14 +200,14 @@ impl BlockQuery { (Some(id), None) => Ok(id.0.into()), (None, Some(height)) => { let height: u32 = height.into(); - data.block_id(&height.into()) + query.block_id(&height.into()) } (None, None) => { return Err(async_graphql::Error::new("Missing either id or height")) } }; - id.and_then(|id| data.block(&id)).into_api_result() + id.and_then(|id| query.block(&id)).into_api_result() } async fn blocks( @@ -220,9 +218,9 @@ impl BlockQuery { last: Option, before: Option, ) -> async_graphql::Result> { - let db: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); crate::schema::query_pagination(after, before, first, last, |start, direction| { - Ok(blocks_query(db, start.map(Into::into), direction)) + Ok(blocks_query(query, start.map(Into::into), direction)) }) .await } @@ -253,16 +251,16 @@ impl HeaderQuery { last: Option, before: Option, ) -> async_graphql::Result> { - let db: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); crate::schema::query_pagination(after, before, first, last, |start, direction| { - Ok(blocks_query(db, start.map(Into::into), direction)) + Ok(blocks_query(query, start.map(Into::into), direction)) }) .await } } fn blocks_query( - query: &Database, + query: &ReadView, start: Option, direction: IterDirection, ) -> BoxedIter> @@ -292,7 +290,7 @@ impl BlockMutation { start_timestamp: Option, blocks_to_produce: U32, ) -> async_graphql::Result { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let consensus_module = ctx.data_unchecked::(); let config = ctx.data_unchecked::().clone(); diff --git a/crates/fuel-core/src/schema/chain.rs b/crates/fuel-core/src/schema/chain.rs index e1df56c7eb2..7c8bb918aa3 100644 --- a/crates/fuel-core/src/schema/chain.rs +++ b/crates/fuel-core/src/schema/chain.rs @@ -1,6 +1,6 @@ use crate::{ fuel_core_graphql_api::{ - service::Database, + database::ReadView, Config as GraphQLConfig, }, query::{ @@ -683,19 +683,19 @@ impl HeavyOperation { #[Object] impl ChainInfo { async fn name(&self, ctx: &Context<'_>) -> async_graphql::Result { - let data: &Database = ctx.data_unchecked(); - Ok(data.name()?) + let query: &ReadView = ctx.data_unchecked(); + Ok(query.name()?) 
} async fn latest_block(&self, ctx: &Context<'_>) -> async_graphql::Result { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let latest_block = query.latest_block()?.into(); Ok(latest_block) } async fn da_height(&self, ctx: &Context<'_>) -> U64 { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let height = query .da_height() diff --git a/crates/fuel-core/src/schema/coins.rs b/crates/fuel-core/src/schema/coins.rs index 60a75add8f9..476058016be 100644 --- a/crates/fuel-core/src/schema/coins.rs +++ b/crates/fuel-core/src/schema/coins.rs @@ -4,10 +4,10 @@ use crate::{ SpendQuery, }, fuel_core_graphql_api::{ + database::ReadView, Config as GraphQLConfig, IntoApiResult, }, - graphql_api::service::Database, query::{ asset_query::AssetSpendTarget, CoinQueryData, @@ -152,8 +152,8 @@ impl CoinQuery { ctx: &Context<'_>, #[graphql(desc = "The ID of the coin")] utxo_id: UtxoId, ) -> async_graphql::Result> { - let data: &Database = ctx.data_unchecked(); - data.coin(utxo_id.0).into_api_result() + let query: &ReadView = ctx.data_unchecked(); + query.coin(utxo_id.0).into_api_result() } /// Gets all unspent coins of some `owner` maybe filtered with by `asset_id` per page. @@ -166,7 +166,7 @@ impl CoinQuery { last: Option, before: Option, ) -> async_graphql::Result> { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); crate::schema::query_pagination(after, before, first, last, |start, direction| { let owner: fuel_tx::Address = filter.owner.into(); let coins = query @@ -240,9 +240,9 @@ impl CoinQuery { let spend_query = SpendQuery::new(owner, &query_per_asset, excluded_ids, *base_asset_id)?; - let db = ctx.data_unchecked::(); + let query: &ReadView = ctx.data_unchecked(); - let coins = random_improve(db, &spend_query)? + let coins = random_improve(query, &spend_query)? 
.into_iter() .map(|coins| { coins diff --git a/crates/fuel-core/src/schema/contract.rs b/crates/fuel-core/src/schema/contract.rs index 2409041925d..16a26b87704 100644 --- a/crates/fuel-core/src/schema/contract.rs +++ b/crates/fuel-core/src/schema/contract.rs @@ -1,6 +1,6 @@ use crate::{ fuel_core_graphql_api::{ - service::Database, + database::ReadView, IntoApiResult, }, query::ContractQueryData, @@ -41,16 +41,16 @@ impl Contract { } async fn bytecode(&self, ctx: &Context<'_>) -> async_graphql::Result { - let context: &Database = ctx.data_unchecked(); - context + let query: &ReadView = ctx.data_unchecked(); + query .contract_bytecode(self.0) .map(HexString) .map_err(Into::into) } async fn salt(&self, ctx: &Context<'_>) -> async_graphql::Result { - let context: &Database = ctx.data_unchecked(); - context + let query: &ReadView = ctx.data_unchecked(); + query .contract_salt(self.0) .map(Into::into) .map_err(Into::into) @@ -67,8 +67,8 @@ impl ContractQuery { ctx: &Context<'_>, #[graphql(desc = "ID of the Contract")] id: ContractId, ) -> async_graphql::Result> { - let data: &Database = ctx.data_unchecked(); - data.contract_id(id.0).into_api_result() + let query: &ReadView = ctx.data_unchecked(); + query.contract_id(id.0).into_api_result() } } @@ -108,8 +108,8 @@ impl ContractBalanceQuery { ) -> async_graphql::Result { let contract_id = contract.into(); let asset_id = asset.into(); - let context: &Database = ctx.data_unchecked(); - context + let query: &ReadView = ctx.data_unchecked(); + query .contract_balance(contract_id, asset_id) .into_api_result() .map(|result| { @@ -135,7 +135,7 @@ impl ContractBalanceQuery { ) -> async_graphql::Result< Connection, > { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); crate::schema::query_pagination(after, before, first, last, |start, direction| { let balances = query diff --git a/crates/fuel-core/src/schema/message.rs b/crates/fuel-core/src/schema/message.rs index 75707190e22..dfc17606864 100644 --- a/crates/fuel-core/src/schema/message.rs +++ b/crates/fuel-core/src/schema/message.rs @@ -1,5 +1,3 @@ -use std::ops::Deref; - use super::{ block::Header, scalars::{ @@ -12,7 +10,10 @@ use super::{ }, }; use crate::{ - fuel_core_graphql_api::service::Database, + fuel_core_graphql_api::{ + database::ReadView, + ports::DatabaseBlocks, + }, query::MessageQueryData, schema::scalars::{ BlockId, @@ -75,7 +76,7 @@ impl MessageQuery { before: Option, ) -> async_graphql::Result> { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); crate::schema::query_pagination( after, before, @@ -114,12 +115,12 @@ impl MessageQuery { commit_block_id: Option, commit_block_height: Option, ) -> async_graphql::Result> { - let data: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let block_id = match (commit_block_id, commit_block_height) { (Some(commit_block_id), None) => commit_block_id.0.into(), (None, Some(commit_block_height)) => { let block_height = commit_block_height.0.into(); - data.block_id(&block_height)? + query.block_id(&block_height)? 
} _ => Err(anyhow::anyhow!( "Either `commit_block_id` or `commit_block_height` must be provided exclusively" @@ -127,7 +128,7 @@ impl MessageQuery { }; Ok(crate::query::message_proof( - data.deref(), + query, transaction_id.into(), nonce.into(), block_id, @@ -140,8 +141,8 @@ impl MessageQuery { ctx: &Context<'_>, nonce: Nonce, ) -> async_graphql::Result { - let data: &Database = ctx.data_unchecked(); - let status = crate::query::message_status(data.deref(), nonce.into())?; + let query: &ReadView = ctx.data_unchecked(); + let status = crate::query::message_status(query, nonce.into())?; Ok(status.into()) } } diff --git a/crates/fuel-core/src/schema/node_info.rs b/crates/fuel-core/src/schema/node_info.rs index 97ef85167c0..647b0c4215e 100644 --- a/crates/fuel-core/src/schema/node_info.rs +++ b/crates/fuel-core/src/schema/node_info.rs @@ -47,7 +47,7 @@ impl NodeInfo { async fn peers(&self, _ctx: &Context<'_>) -> async_graphql::Result> { #[cfg(feature = "p2p")] { - let p2p: &crate::fuel_core_graphql_api::service::P2pService = + let p2p: &crate::fuel_core_graphql_api::api_service::P2pService = _ctx.data_unchecked(); let peer_info = p2p.all_peer_info().await?; let peers = peer_info.into_iter().map(PeerInfo).collect(); diff --git a/crates/fuel-core/src/schema/tx.rs b/crates/fuel-core/src/schema/tx.rs index 0d772b86854..19a8599b10c 100644 --- a/crates/fuel-core/src/schema/tx.rs +++ b/crates/fuel-core/src/schema/tx.rs @@ -1,25 +1,29 @@ use crate::{ fuel_core_graphql_api::{ - service::{ + api_service::{ BlockProducer, - Database, TxPool, }, + database::ReadView, + ports::OffChainDatabase, + Config, IntoApiResult, }, - graphql_api::Config, query::{ transaction_status_change, BlockQueryData, SimpleTransactionData, TransactionQueryData, }, - schema::scalars::{ - Address, - HexString, - SortedTxCursor, - TransactionId, - TxPointer, + schema::{ + scalars::{ + Address, + HexString, + SortedTxCursor, + TransactionId, + TxPointer, + }, + tx::types::TransactionStatus, }, }; use async_graphql::{ @@ -48,7 +52,10 @@ use fuel_core_types::{ }, fuel_types, fuel_types::canonical::Deserialize, - fuel_vm::checked_transaction::EstimatePredicates, + fuel_vm::checked_transaction::{ + CheckPredicateParams, + EstimatePredicates, + }, services::txpool, }; use futures::{ @@ -63,9 +70,6 @@ use std::{ use tokio_stream::StreamExt; use types::Transaction; -use self::types::TransactionStatus; -use fuel_core_types::fuel_vm::checked_transaction::CheckPredicateParams; - pub mod input; pub mod output; pub mod receipt; @@ -81,7 +85,7 @@ impl TxQuery { ctx: &Context<'_>, #[graphql(desc = "The ID of the transaction")] id: TransactionId, ) -> async_graphql::Result> { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let id = id.0; let txpool = ctx.data_unchecked::(); @@ -105,8 +109,7 @@ impl TxQuery { ) -> async_graphql::Result< Connection, > { - let db_query: &Database = ctx.data_unchecked(); - let tx_query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); crate::schema::query_pagination( after, before, @@ -115,7 +118,7 @@ impl TxQuery { |start: &Option, direction| { let start = *start; let block_id = start.map(|sorted| sorted.block_height); - let all_block_ids = db_query.compressed_blocks(block_id, direction); + let all_block_ids = query.compressed_blocks(block_id, direction); let all_txs = all_block_ids .map(move |block| { @@ -145,7 +148,7 @@ impl TxQuery { }); let all_txs = all_txs.map(|result: StorageResult| { result.and_then(|sorted| { - let tx = 
tx_query.transaction(&sorted.tx_id.0)?; + let tx = query.transaction(&sorted.tx_id.0)?; Ok((sorted, Transaction::from_tx(sorted.tx_id.0, tx))) }) @@ -167,7 +170,7 @@ impl TxQuery { before: Option, ) -> async_graphql::Result> { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let config = ctx.data_unchecked::(); let owner = fuel_types::Address::from(owner); @@ -298,11 +301,11 @@ impl TxStatusSubscription { ) -> anyhow::Result> + 'a> { let txpool = ctx.data_unchecked::(); - let db = ctx.data_unchecked::(); + let query: &ReadView = ctx.data_unchecked(); let rx = txpool.tx_update_subscribe(id.into())?; Ok(transaction_status_change( - move |id| match db.tx_status(&id) { + move |id| match query.tx_status(&id) { Ok(status) => Ok(Some(status)), Err(StorageError::NotFound(_, _)) => Ok(txpool .submission_time(id) diff --git a/crates/fuel-core/src/schema/tx/types.rs b/crates/fuel-core/src/schema/tx/types.rs index 41b06f5cb3c..fcd0e110ff2 100644 --- a/crates/fuel-core/src/schema/tx/types.rs +++ b/crates/fuel-core/src/schema/tx/types.rs @@ -5,10 +5,8 @@ use super::{ }; use crate::{ fuel_core_graphql_api::{ - service::{ - Database, - TxPool, - }, + api_service::TxPool, + database::ReadView, Config, IntoApiResult, }, @@ -160,7 +158,7 @@ impl SuccessStatus { } async fn block(&self, ctx: &Context<'_>) -> async_graphql::Result { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let block = query.block(&self.block_id)?; Ok(block.into()) } @@ -174,8 +172,8 @@ impl SuccessStatus { } async fn receipts(&self, ctx: &Context<'_>) -> async_graphql::Result> { - let db = ctx.data_unchecked::(); - let receipts = db + let query: &ReadView = ctx.data_unchecked(); + let receipts = query .receipts(&self.tx_id) .unwrap_or_default() .into_iter() @@ -201,7 +199,7 @@ impl FailureStatus { } async fn block(&self, ctx: &Context<'_>) -> async_graphql::Result { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let block = query.block(&self.block_id)?; Ok(block.into()) } @@ -219,8 +217,8 @@ impl FailureStatus { } async fn receipts(&self, ctx: &Context<'_>) -> async_graphql::Result> { - let db = ctx.data_unchecked::(); - let receipts = db + let query: &ReadView = ctx.data_unchecked(); + let receipts = query .receipts(&self.tx_id) .unwrap_or_default() .into_iter() @@ -526,7 +524,7 @@ impl Transaction { ctx: &Context<'_>, ) -> async_graphql::Result> { let id = self.1; - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let txpool = ctx.data_unchecked::(); get_tx_status(id, query, txpool).map_err(Into::into) } @@ -535,7 +533,7 @@ impl Transaction { &self, ctx: &Context<'_>, ) -> async_graphql::Result>> { - let query: &Database = ctx.data_unchecked(); + let query: &ReadView = ctx.data_unchecked(); let receipts = query .receipts(&self.1) .into_api_result::, async_graphql::Error>()?; @@ -622,7 +620,7 @@ impl Transaction { #[tracing::instrument(level = "debug", skip(query, txpool), ret, err)] pub(crate) fn get_tx_status( id: fuel_core_types::fuel_types::Bytes32, - query: &Database, + query: &ReadView, txpool: &TxPool, ) -> Result, StorageError> { match query diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index 1f58e7afd78..7fee7ddbe1d 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -44,7 +44,7 @@ pub struct SharedState { /// The Relayer shared state. 
pub relayer: Option>, /// The GraphQL shared state. - pub graph_ql: crate::fuel_core_graphql_api::service::SharedState, + pub graph_ql: crate::fuel_core_graphql_api::api_service::SharedState, /// The underlying database. pub database: Database, /// Subscribe to new block production. @@ -305,9 +305,9 @@ mod tests { i += 1; } - // current services: graphql, txpool, PoA + // current services: graphql, graphql worker, txpool, PoA #[allow(unused_mut)] - let mut expected_services = 3; + let mut expected_services = 4; // Relayer service is disabled with `Config::local_node`. // #[cfg(feature = "relayer")] diff --git a/crates/fuel-core/src/service/adapters/block_importer.rs b/crates/fuel-core/src/service/adapters/block_importer.rs index 89627483c8d..7fdfb2c3035 100644 --- a/crates/fuel-core/src/service/adapters/block_importer.rs +++ b/crates/fuel-core/src/service/adapters/block_importer.rs @@ -70,11 +70,7 @@ impl BlockImporterAdapter { &self, sealed_block: SealedBlock, ) -> anyhow::Result<()> { - tokio::task::spawn_blocking({ - let importer = self.block_importer.clone(); - move || importer.execute_and_commit(sealed_block) - }) - .await??; + self.block_importer.execute_and_commit(sealed_block).await?; Ok(()) } } diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index ac446c71675..9e57c2cf0ed 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -1,5 +1,3 @@ -use std::ops::Deref; - use crate::{ database::Database, fuel_core_graphql_api::ports::ConsensusModulePort, @@ -124,15 +122,17 @@ impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter { } } +#[async_trait::async_trait] impl BlockImporter for BlockImporterAdapter { type Database = Database; - fn commit_result( + async fn commit_result( &self, result: UncommittedImporterResult>, ) -> anyhow::Result<()> { self.block_importer .commit_result(result) + .await .map_err(Into::into) } @@ -140,7 +140,7 @@ impl BlockImporter for BlockImporterAdapter { Box::pin( BroadcastStream::new(self.block_importer.subscribe()) .filter_map(|result| result.ok()) - .map(|r| r.deref().into()), + .map(BlockImportInfo::from), ) } } diff --git a/crates/fuel-core/src/service/adapters/executor.rs b/crates/fuel-core/src/service/adapters/executor.rs index b4a6b29e7cb..dbeece6c739 100644 --- a/crates/fuel-core/src/service/adapters/executor.rs +++ b/crates/fuel-core/src/service/adapters/executor.rs @@ -16,26 +16,19 @@ use fuel_core_executor::{ use fuel_core_storage::{ transactional::StorageTransaction, Error as StorageError, - Result as StorageResult, }; use fuel_core_types::{ blockchain::primitives::DaBlockHeight, entities::message::Message, fuel_tx, fuel_tx::Receipt, - fuel_types::{ - Address, - BlockHeight, - Bytes32, - Nonce, - }, + fuel_types::Nonce, services::{ block_producer::Components, executor::{ Result as ExecutorResult, UncommittedResult, }, - txpool::TransactionStatus, }, }; @@ -84,36 +77,6 @@ impl fuel_core_executor::refs::ContractStorageTrait for Database { type InnerError = StorageError; } -impl fuel_core_executor::ports::MessageIsSpent for Database { - type Error = StorageError; - - fn message_is_spent(&self, nonce: &Nonce) -> StorageResult { - self.message_is_spent(nonce) - } -} - -impl fuel_core_executor::ports::TxIdOwnerRecorder for Database { - type Error = StorageError; - - fn record_tx_id_owner( - &mut self, - owner: &Address, - block_height: BlockHeight, - tx_idx: u16, 
- tx_id: &Bytes32, - ) -> Result, Self::Error> { - self.record_tx_id_owner(owner, block_height, tx_idx, tx_id) - } - - fn update_tx_status( - &mut self, - id: &Bytes32, - status: TransactionStatus, - ) -> Result, Self::Error> { - self.update_tx_status(id, status) - } -} - impl fuel_core_executor::ports::ExecutorDatabaseTrait for Database {} impl fuel_core_executor::ports::RelayerPort for MaybeRelayerAdapter { diff --git a/crates/fuel-core/src/service/adapters/graphql_api.rs b/crates/fuel-core/src/service/adapters/graphql_api.rs index 4faea60040a..e83efc44e08 100644 --- a/crates/fuel-core/src/service/adapters/graphql_api.rs +++ b/crates/fuel-core/src/service/adapters/graphql_api.rs @@ -1,20 +1,13 @@ -use super::BlockProducerAdapter; +use super::{ + BlockImporterAdapter, + BlockProducerAdapter, +}; use crate::{ - database::{ - transactions::OwnedTransactionIndexCursor, - Database, - }, + database::Database, fuel_core_graphql_api::ports::{ + worker, BlockProducerPort, - DatabaseBlocks, - DatabaseChain, - DatabaseCoins, - DatabaseContracts, DatabaseMessageProof, - DatabaseMessages, - DatabasePort, - DatabaseTransactions, - DryRunExecution, P2pPort, TxPoolPort, }, @@ -25,51 +18,22 @@ use crate::{ }; use async_trait::async_trait; use fuel_core_services::stream::BoxStream; -use fuel_core_storage::{ - iter::{ - BoxedIter, - IntoBoxedIter, - IterDirection, - }, - not_found, - Error as StorageError, - Result as StorageResult, -}; +use fuel_core_storage::Result as StorageResult; use fuel_core_txpool::{ service::TxStatusMessage, - types::{ - ContractId, - TxId, - }, + types::TxId, }; use fuel_core_types::{ - blockchain::primitives::{ - BlockId, - DaBlockHeight, - }, - entities::message::{ - MerkleProof, - Message, - }, + entities::message::MerkleProof, fuel_tx::{ - Address, - AssetId, Receipt as TxReceipt, Transaction, - TxPointer, - UtxoId, - }, - fuel_types::{ - BlockHeight, - Nonce, }, + fuel_types::BlockHeight, services::{ - graphql_api::ContractBalance, + block_importer::SharedImportResult, p2p::PeerInfo, - txpool::{ - InsertionResult, - TransactionStatus, - }, + txpool::InsertionResult, }, tai64::Tai64, }; @@ -78,140 +42,8 @@ use std::{ sync::Arc, }; -impl DatabaseBlocks for Database { - fn block_id(&self, height: &BlockHeight) -> StorageResult { - self.get_block_id(height) - .and_then(|height| height.ok_or(not_found!("BlockId"))) - } - - fn blocks_ids( - &self, - start: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult<(BlockHeight, BlockId)>> { - self.all_block_ids(start, direction) - .map(|result| result.map_err(StorageError::from)) - .into_boxed() - } - - fn ids_of_latest_block(&self) -> StorageResult<(BlockHeight, BlockId)> { - self.ids_of_latest_block() - .transpose() - .ok_or(not_found!("BlockId"))? - } -} - -impl DatabaseTransactions for Database { - fn tx_status(&self, tx_id: &TxId) -> StorageResult { - self.get_tx_status(tx_id) - .transpose() - .ok_or(not_found!("TransactionId"))? 
- } - - fn owned_transactions_ids( - &self, - owner: Address, - start: Option, - direction: IterDirection, - ) -> BoxedIter> { - let start = start.map(|tx_pointer| OwnedTransactionIndexCursor { - block_height: tx_pointer.block_height(), - tx_idx: tx_pointer.tx_index(), - }); - self.owned_transactions(owner, start, Some(direction)) - .map(|result| result.map_err(StorageError::from)) - .into_boxed() - } -} - -impl DatabaseMessages for Database { - fn owned_message_ids( - &self, - owner: &Address, - start_message_id: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.owned_message_ids(owner, start_message_id, Some(direction)) - .map(|result| result.map_err(StorageError::from)) - .into_boxed() - } - - fn all_messages( - &self, - start_message_id: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.all_messages(start_message_id, Some(direction)) - .map(|result| result.map_err(StorageError::from)) - .into_boxed() - } - - fn message_is_spent(&self, nonce: &Nonce) -> StorageResult { - self.message_is_spent(nonce) - } - - fn message_exists(&self, nonce: &Nonce) -> StorageResult { - self.message_exists(nonce) - } -} - -impl DatabaseCoins for Database { - fn owned_coins_ids( - &self, - owner: &Address, - start_coin: Option, - direction: IterDirection, - ) -> BoxedIter<'_, StorageResult> { - self.owned_coins_ids(owner, start_coin, Some(direction)) - .map(|res| res.map_err(StorageError::from)) - .into_boxed() - } -} - -impl DatabaseContracts for Database { - fn contract_balances( - &self, - contract: ContractId, - start_asset: Option, - direction: IterDirection, - ) -> BoxedIter> { - self.contract_balances(contract, start_asset, Some(direction)) - .map(move |result| { - result - .map_err(StorageError::from) - .map(|(asset_id, amount)| ContractBalance { - owner: contract, - amount, - asset_id, - }) - }) - .into_boxed() - } -} - -impl DatabaseChain for Database { - fn chain_name(&self) -> StorageResult { - pub const DEFAULT_NAME: &str = "Fuel.testnet"; - - Ok(self - .get_chain_name()? 
- .unwrap_or_else(|| DEFAULT_NAME.to_string())) - } - - fn da_height(&self) -> StorageResult { - #[cfg(feature = "relayer")] - { - use fuel_core_relayer::ports::RelayerDb; - self.get_finalized_da_height() - } - #[cfg(not(feature = "relayer"))] - { - Ok(0u64.into()) - } - } -} - -impl DatabasePort for Database {} +mod off_chain; +mod on_chain; #[async_trait] impl TxPoolPort for TxPoolAdapter { @@ -253,7 +85,7 @@ impl DatabaseMessageProof for Database { } #[async_trait] -impl DryRunExecution for BlockProducerAdapter { +impl BlockProducerPort for BlockProducerAdapter { async fn dry_run_tx( &self, transaction: Transaction, @@ -266,8 +98,6 @@ impl DryRunExecution for BlockProducerAdapter { } } -impl BlockProducerPort for BlockProducerAdapter {} - #[async_trait::async_trait] impl P2pPort for P2PAdapter { async fn all_peer_info(&self) -> anyhow::Result> { @@ -305,3 +135,13 @@ impl P2pPort for P2PAdapter { } } } + +impl worker::BlockImporter for BlockImporterAdapter { + fn block_events(&self) -> BoxStream { + use futures::StreamExt; + fuel_core_services::stream::IntoBoxStream::into_boxed( + tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe()) + .filter_map(|r| futures::future::ready(r.ok())), + ) + } +} diff --git a/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs new file mode 100644 index 00000000000..86fc7002a02 --- /dev/null +++ b/crates/fuel-core/src/service/adapters/graphql_api/off_chain.rs @@ -0,0 +1,117 @@ +use crate::{ + database::{ + transactions::OwnedTransactionIndexCursor, + Database, + }, + fuel_core_graphql_api::{ + database::OffChainView, + ports::{ + worker, + OffChainDatabase, + }, + }, +}; +use fuel_core_storage::{ + iter::{ + BoxedIter, + IntoBoxedIter, + IterDirection, + }, + not_found, + transactional::AtomicView, + Error as StorageError, + Result as StorageResult, +}; +use fuel_core_txpool::types::TxId; +use fuel_core_types::{ + fuel_tx::{ + Address, + Bytes32, + TxPointer, + UtxoId, + }, + fuel_types::{ + BlockHeight, + Nonce, + }, + services::txpool::TransactionStatus, +}; +use std::sync::Arc; + +impl OffChainDatabase for Database { + fn owned_message_ids( + &self, + owner: &Address, + start_message_id: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult> { + self.owned_message_ids(owner, start_message_id, Some(direction)) + .map(|result| result.map_err(StorageError::from)) + .into_boxed() + } + + fn owned_coins_ids( + &self, + owner: &Address, + start_coin: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult> { + self.owned_coins_ids(owner, start_coin, Some(direction)) + .map(|res| res.map_err(StorageError::from)) + .into_boxed() + } + + fn tx_status(&self, tx_id: &TxId) -> StorageResult { + self.get_tx_status(tx_id) + .transpose() + .ok_or(not_found!("TransactionId"))? 
+ } + + fn owned_transactions_ids( + &self, + owner: Address, + start: Option, + direction: IterDirection, + ) -> BoxedIter> { + let start = start.map(|tx_pointer| OwnedTransactionIndexCursor { + block_height: tx_pointer.block_height(), + tx_idx: tx_pointer.tx_index(), + }); + self.owned_transactions(owner, start, Some(direction)) + .map(|result| result.map_err(StorageError::from)) + .into_boxed() + } +} + +impl AtomicView for Database { + fn view_at(&self, _: BlockHeight) -> StorageResult { + unimplemented!( + "Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451" + ) + } + + fn latest_view(&self) -> OffChainView { + // TODO: https://github.com/FuelLabs/fuel-core/issues/1581 + Arc::new(self.clone()) + } +} + +impl worker::OffChainDatabase for Database { + fn record_tx_id_owner( + &mut self, + owner: &Address, + block_height: BlockHeight, + tx_idx: u16, + tx_id: &Bytes32, + ) -> StorageResult> { + Database::record_tx_id_owner(self, owner, block_height, tx_idx, tx_id) + } + + fn update_tx_status( + &mut self, + id: &Bytes32, + status: TransactionStatus, + ) -> StorageResult> { + Database::update_tx_status(self, id, status) + } +} diff --git a/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs b/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs new file mode 100644 index 00000000000..dd9c9937ffa --- /dev/null +++ b/crates/fuel-core/src/service/adapters/graphql_api/on_chain.rs @@ -0,0 +1,140 @@ +use crate::{ + database::Database, + fuel_core_graphql_api::{ + database::OnChainView, + ports::{ + DatabaseBlocks, + DatabaseChain, + DatabaseContracts, + DatabaseMessages, + OnChainDatabase, + }, + }, +}; +use fuel_core_storage::{ + iter::{ + BoxedIter, + IntoBoxedIter, + IterDirection, + }, + not_found, + transactional::AtomicView, + Error as StorageError, + Result as StorageResult, +}; +use fuel_core_txpool::types::ContractId; +use fuel_core_types::{ + blockchain::primitives::{ + BlockId, + DaBlockHeight, + }, + entities::message::Message, + fuel_tx::AssetId, + fuel_types::{ + BlockHeight, + Nonce, + }, + services::graphql_api::ContractBalance, +}; +use std::sync::Arc; + +impl DatabaseBlocks for Database { + fn block_id(&self, height: &BlockHeight) -> StorageResult { + self.get_block_id(height) + .and_then(|height| height.ok_or(not_found!("BlockId"))) + } + + fn blocks_ids( + &self, + start: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult<(BlockHeight, BlockId)>> { + self.all_block_ids(start, direction) + .map(|result| result.map_err(StorageError::from)) + .into_boxed() + } + + fn ids_of_latest_block(&self) -> StorageResult<(BlockHeight, BlockId)> { + self.ids_of_latest_block() + .transpose() + .ok_or(not_found!("BlockId"))? 
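
The `AtomicView` implementation above (and the trait added to `fuel-core-storage` later in this diff) lets read-side consumers grab an immutable view of the database while the GraphQL worker keeps exclusive write access. A simplified sketch of the idea with a toy `Database` and `String` errors instead of `StorageResult`; note that, as the TODO indicates, today's `latest_view` is just an `Arc` over a clone rather than a real snapshot:

```rust
use std::sync::Arc;

type BlockHeight = u32;

// Simplified stand-in for the real storage type.
#[derive(Clone, Default)]
struct Database {
    chain_name: String,
}

type OffChainView = Arc<Database>;

trait AtomicView<View>: Send + Sync {
    /// View of the storage at the given height (not implemented upstream yet).
    fn view_at(&self, height: BlockHeight) -> Result<View, String>;
    /// View of the storage at the latest height.
    fn latest_view(&self) -> View;
}

impl AtomicView<OffChainView> for Database {
    fn view_at(&self, _height: BlockHeight) -> Result<OffChainView, String> {
        Err("historical views are not supported yet".to_string())
    }

    fn latest_view(&self) -> OffChainView {
        // Placeholder "snapshot": an Arc over a cheap clone of the database.
        Arc::new(self.clone())
    }
}

fn main() {
    let db = Database { chain_name: "Fuel.testnet".to_string() };
    let view = db.latest_view();
    // Readers query the view; the worker keeps mutating `db` independently.
    assert_eq!(view.chain_name, "Fuel.testnet");
}
```
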
+ } +} + +impl DatabaseMessages for Database { + fn all_messages( + &self, + start_message_id: Option, + direction: IterDirection, + ) -> BoxedIter<'_, StorageResult> { + self.all_messages(start_message_id, Some(direction)) + .map(|result| result.map_err(StorageError::from)) + .into_boxed() + } + + fn message_is_spent(&self, nonce: &Nonce) -> StorageResult { + self.message_is_spent(nonce) + } + + fn message_exists(&self, nonce: &Nonce) -> StorageResult { + self.message_exists(nonce) + } +} + +impl DatabaseContracts for Database { + fn contract_balances( + &self, + contract: ContractId, + start_asset: Option, + direction: IterDirection, + ) -> BoxedIter> { + self.contract_balances(contract, start_asset, Some(direction)) + .map(move |result| { + result + .map_err(StorageError::from) + .map(|(asset_id, amount)| ContractBalance { + owner: contract, + amount, + asset_id, + }) + }) + .into_boxed() + } +} + +impl DatabaseChain for Database { + fn chain_name(&self) -> StorageResult { + pub const DEFAULT_NAME: &str = "Fuel.testnet"; + + Ok(self + .get_chain_name()? + .unwrap_or_else(|| DEFAULT_NAME.to_string())) + } + + fn da_height(&self) -> StorageResult { + #[cfg(feature = "relayer")] + { + use fuel_core_relayer::ports::RelayerDb; + self.get_finalized_da_height() + } + #[cfg(not(feature = "relayer"))] + { + Ok(0u64.into()) + } + } +} + +impl OnChainDatabase for Database {} + +impl AtomicView for Database { + fn view_at(&self, _: BlockHeight) -> StorageResult { + unimplemented!( + "Unimplemented until of the https://github.com/FuelLabs/fuel-core/issues/451" + ) + } + + fn latest_view(&self) -> OnChainView { + // TODO: https://github.com/FuelLabs/fuel-core/issues/1581 + Arc::new(self.clone()) + } +} diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index 6f1593f6d77..ccd33474df6 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -7,7 +7,6 @@ use crate::{ }; use fuel_core_services::stream::BoxStream; use fuel_core_storage::{ - not_found, tables::{ Coins, ContractsRawCode, @@ -33,7 +32,7 @@ use fuel_core_types::{ Nonce, }, services::{ - block_importer::ImportResult, + block_importer::SharedImportResult, p2p::{ GossipsubMessageAcceptance, GossipsubMessageInfo, @@ -44,7 +43,7 @@ use fuel_core_types::{ use std::sync::Arc; impl BlockImporter for BlockImporterAdapter { - fn block_events(&self) -> BoxStream> { + fn block_events(&self) -> BoxStream { use tokio_stream::{ wrappers::BroadcastStream, StreamExt, @@ -144,13 +143,4 @@ impl fuel_core_txpool::ports::TxPoolDb for Database { fn current_block_height(&self) -> StorageResult { self.latest_height() } - - fn transaction_status( - &self, - tx_id: &fuel_core_types::fuel_types::Bytes32, - ) -> StorageResult { - self.get_tx_status(tx_id) - .transpose() - .ok_or(not_found!("TransactionId"))? - } } diff --git a/crates/fuel-core/src/service/genesis.rs b/crates/fuel-core/src/service/genesis.rs index 8039f438d12..9942df0a810 100644 --- a/crates/fuel-core/src/service/genesis.rs +++ b/crates/fuel-core/src/service/genesis.rs @@ -136,7 +136,8 @@ fn import_genesis_block( (), (), ); - importer.commit_result(UncommittedImportResult::new( + // We commit Genesis block before start of any service, so there is no listeners. 
+ importer.commit_result_without_awaiting_listeners(UncommittedImportResult::new( ImportResult::new_from_local(block, vec![]), database_transaction, ))?; diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 1523fe41c15..ba8dc05e93a 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -3,6 +3,7 @@ use super::adapters::P2PAdapter; use crate::{ database::Database, + fuel_core_graphql_api, fuel_core_graphql_api::Config as GraphQLConfig, schema::build_schema, service::{ @@ -41,7 +42,7 @@ pub type BlockProducerService = fuel_core_producer::block_producer::Producer< TxPoolAdapter, ExecutorAdapter, >; -pub type GraphQL = crate::fuel_core_graphql_api::service::Service; +pub type GraphQL = crate::fuel_core_graphql_api::api_service::Service; pub fn init_sub_services( config: &Config, @@ -189,20 +190,28 @@ pub fn init_sub_services( ) .data(database.clone()); - let graph_ql = crate::fuel_core_graphql_api::service::new_service( - GraphQLConfig { - addr: config.addr, - utxo_validation: config.utxo_validation, - debug: config.debug, - vm_backtrace: config.vm.backtrace, - min_gas_price: config.txpool.min_gas_price, - max_tx: config.txpool.max_tx, - max_depth: config.txpool.max_depth, - consensus_parameters: config.chain_conf.consensus_parameters.clone(), - consensus_key: config.consensus_key.clone(), - }, + let graphql_worker = fuel_core_graphql_api::worker_service::new_service( + importer_adapter.clone(), + database.clone(), + ); + + let graphql_config = GraphQLConfig { + addr: config.addr, + utxo_validation: config.utxo_validation, + debug: config.debug, + vm_backtrace: config.vm.backtrace, + min_gas_price: config.txpool.min_gas_price, + max_tx: config.txpool.max_tx, + max_depth: config.txpool.max_depth, + consensus_parameters: config.chain_conf.consensus_parameters.clone(), + consensus_key: config.consensus_key.clone(), + }; + + let graph_ql = fuel_core_graphql_api::api_service::new_service( + graphql_config, schema, - Box::new(database.clone()), + database.clone(), + database.clone(), Box::new(tx_pool_adapter), Box::new(producer_adapter), Box::new(poa_adapter.clone()), @@ -249,5 +258,7 @@ pub fn init_sub_services( } } + services.push(Box::new(graphql_worker)); + Ok((services, shared)) } diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index fdb8a2d11de..c93180645bc 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -66,10 +66,11 @@ pub trait BlockProducer: Send + Sync { } #[cfg_attr(test, mockall::automock(type Database=EmptyStorage;))] +#[async_trait::async_trait] pub trait BlockImporter: Send + Sync { type Database; - fn commit_result( + async fn commit_result( &self, result: UncommittedImportResult>, ) -> anyhow::Result<()>; diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 3ec7b8727d8..4fd65a220e4 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -356,10 +356,12 @@ where consensus: seal, }; // Import the sealed block - self.block_importer.commit_result(Uncommitted::new( - ImportResult::new_from_local(block, tx_status), - db_transaction, - ))?; + self.block_importer + .commit_result(Uncommitted::new( + ImportResult::new_from_local(block, tx_status), + db_transaction, + )) + .await?; // Update last 
block time self.last_height = height; diff --git a/crates/services/executor/src/executor.rs b/crates/services/executor/src/executor.rs index c3290cf3eb5..a2041c56f4d 100644 --- a/crates/services/executor/src/executor.rs +++ b/crates/services/executor/src/executor.rs @@ -14,7 +14,6 @@ use fuel_core_storage::{ ContractsLatestUtxo, Messages, ProcessedTransactions, - Receipts, SpentMessages, }, transactional::{ @@ -23,7 +22,6 @@ use fuel_core_storage::{ }, StorageAsMut, StorageAsRef, - StorageInspect, }; use fuel_core_types::{ blockchain::{ @@ -45,11 +43,9 @@ use fuel_core_types::{ fuel_tx::{ field::{ InputContract, - Inputs, MintAmount, MintAssetId, OutputContract, - Outputs, TxPointer as TxPointerField, }, input, @@ -79,7 +75,6 @@ use fuel_core_types::{ Transaction, TxId, TxPointer, - UniqueIdentifier, UtxoId, }, fuel_types::{ @@ -123,7 +118,6 @@ use fuel_core_types::{ TransactionValidityError, UncommittedResult, }, - txpool::TransactionStatus, }, }; use parking_lot::Mutex as ParkingMutex; @@ -267,11 +261,11 @@ where let ( ExecutionResult { - block, skipped_transactions, + tx_status, .. }, - temporary_db, + _temporary_db, ) = self .execute_without_commit(ExecutionTypes::DryRun(component), options)? .into(); @@ -281,19 +275,11 @@ where return Err(err) } - block - .transactions() - .iter() - .map(|tx| { - let id = tx.id(&self.config.consensus_parameters.chain_id); - StorageInspect::::get(temporary_db.as_ref(), &id) - .transpose() - .unwrap_or_else(|| Ok(Default::default())) - .map(|v| v.into_owned()) - }) - .collect::>, _>>() - .map_err(Into::into) - // drop `temporary_db` without committing to avoid altering state. + Ok(tx_status + .into_iter() + .map(|tx| tx.receipts) + .collect::>>()) + // drop `_temporary_db` without committing to avoid altering state. } } @@ -447,16 +433,6 @@ where tx_status, }; - // ------------ GraphQL API Functionality BEGIN ------------ - - // save the status for every transaction using the finalized block id - self.persist_transaction_status(&result, block_st_transaction.as_mut())?; - - // save the associated owner for each transaction in the block - self.index_tx_owners_for_block(&result.block, block_st_transaction.as_mut())?; - - // ------------ GraphQL API Functionality END ------------ - // Get the complete fuel block. Ok(UncommittedResult::new(result, block_st_transaction)) } @@ -807,6 +783,7 @@ where execution_data.tx_status.push(TransactionExecutionStatus { id: coinbase_id, result: TransactionExecutionResult::Success { result: None }, + receipts: vec![], }); if block_st_transaction @@ -895,7 +872,10 @@ where debug_assert_eq!(tx.id(&self.config.consensus_parameters.chain_id), tx_id); } - // Wrap inputs in the execution kind. + // TODO: We need to call this function before `vm.transact` but we can't do that because of + // `Checked` immutability requirements. So we do it here after its execution for now. + // But it should be fixed in the future. 
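
With the hunks above, `dry_run` no longer re-reads receipts from the discarded `_temporary_db`; each `TransactionExecutionStatus` now carries its receipts, so the executor simply maps over `tx_status`. A tiny sketch of that collection step, with a stand-in status type and `String` standing in for `Receipt`:

```rust
// Stand-in for fuel-core's `TransactionExecutionStatus`, which now carries the
// receipts produced while the transaction executed.
struct TxExecutionStatus {
    receipts: Vec<String>,
}

// Collect per-transaction receipts straight from the execution statuses,
// mirroring `tx_status.into_iter().map(|tx| tx.receipts).collect()`.
fn dry_run_receipts(tx_status: Vec<TxExecutionStatus>) -> Vec<Vec<String>> {
    tx_status.into_iter().map(|status| status.receipts).collect()
}

fn main() {
    let statuses = vec![TxExecutionStatus { receipts: vec!["Return".into()] }];
    assert_eq!(dry_run_receipts(statuses), vec![vec!["Return".to_string()]]);
}
```
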
+ // https://github.com/FuelLabs/fuel-vm/issues/651 self.compute_inputs( match execution_kind { ExecutionKind::DryRun => ExecutionTypes::DryRun(tx.inputs_mut()), @@ -970,9 +950,6 @@ where .storage::() .insert(&tx_id, &())?; - // persist receipts - self.persist_receipts(&tx_id, &receipts, tx_st_transaction.as_mut())?; - let status = if reverted { self.log_backtrace(&vm, &receipts); // get reason for revert @@ -1004,14 +981,15 @@ where .checked_add(tx_fee) .ok_or(ExecutorError::FeeOverflow)?; execution_data.used_gas = execution_data.used_gas.saturating_add(used_gas); + execution_data + .message_ids + .extend(receipts.iter().filter_map(|r| r.message_id())); // queue up status for this tx to be stored once block id is finalized. execution_data.tx_status.push(TransactionExecutionStatus { id: tx_id, result: status, + receipts, }); - execution_data - .message_ids - .extend(receipts.iter().filter_map(|r| r.message_id())); Ok(final_tx) } @@ -1070,7 +1048,7 @@ where | Input::MessageDataSigned(MessageDataSigned { nonce, .. }) | Input::MessageDataPredicate(MessageDataPredicate { nonce, .. }) => { // Eagerly return already spent if status is known. - if db.message_is_spent(nonce)? { + if db.storage::().contains_key(nonce)? { return Err( TransactionValidityError::MessageAlreadySpent(*nonce).into() ) @@ -1545,130 +1523,6 @@ where Ok(()) } - - fn persist_receipts( - &self, - tx_id: &TxId, - receipts: &[Receipt], - db: &mut D, - ) -> ExecutorResult<()> { - if db.storage::().insert(tx_id, receipts)?.is_some() { - return Err(ExecutorError::OutputAlreadyExists) - } - Ok(()) - } - - /// Associate all transactions within a block to their respective UTXO owners - fn index_tx_owners_for_block( - &self, - block: &Block, - block_st_transaction: &mut D, - ) -> ExecutorResult<()> { - for (tx_idx, tx) in block.transactions().iter().enumerate() { - let block_height = *block.header().height(); - let inputs; - let outputs; - let tx_idx = - u16::try_from(tx_idx).map_err(|_| ExecutorError::TooManyTransactions)?; - let tx_id = tx.id(&self.config.consensus_parameters.chain_id); - match tx { - Transaction::Script(tx) => { - inputs = tx.inputs().as_slice(); - outputs = tx.outputs().as_slice(); - } - Transaction::Create(tx) => { - inputs = tx.inputs().as_slice(); - outputs = tx.outputs().as_slice(); - } - Transaction::Mint(_) => continue, - } - self.persist_owners_index( - block_height, - inputs, - outputs, - &tx_id, - tx_idx, - block_st_transaction, - )?; - } - Ok(()) - } - - /// Index the tx id by owner for all of the inputs and outputs - fn persist_owners_index( - &self, - block_height: BlockHeight, - inputs: &[Input], - outputs: &[Output], - tx_id: &Bytes32, - tx_idx: u16, - db: &mut D, - ) -> ExecutorResult<()> { - let mut owners = vec![]; - for input in inputs { - if let Input::CoinSigned(CoinSigned { owner, .. }) - | Input::CoinPredicate(CoinPredicate { owner, .. }) = input - { - owners.push(owner); - } - } - - for output in outputs { - match output { - Output::Coin { to, .. } - | Output::Change { to, .. } - | Output::Variable { to, .. } => { - owners.push(to); - } - Output::Contract(_) | Output::ContractCreated { .. 
} => {} - } - } - - // dedupe owners from inputs and outputs prior to indexing - owners.sort(); - owners.dedup(); - - for owner in owners { - db.record_tx_id_owner(owner, block_height, tx_idx, tx_id)?; - } - - Ok(()) - } - - fn persist_transaction_status( - &self, - result: &ExecutionResult, - db: &mut D, - ) -> ExecutorResult<()> { - let time = result.block.header().time(); - let block_id = result.block.id(); - for TransactionExecutionStatus { id, result } in result.tx_status.iter() { - match result { - TransactionExecutionResult::Success { result } => { - db.update_tx_status( - id, - TransactionStatus::Success { - block_id, - time, - result: *result, - }, - )?; - } - TransactionExecutionResult::Failed { result, reason } => { - db.update_tx_status( - id, - TransactionStatus::Failed { - block_id, - time, - result: *result, - reason: reason.clone(), - }, - )?; - } - } - } - Ok(()) - } } trait Fee { diff --git a/crates/services/executor/src/ports.rs b/crates/services/executor/src/ports.rs index 0cb93e319e5..e9c5b1b9b4e 100644 --- a/crates/services/executor/src/ports.rs +++ b/crates/services/executor/src/ports.rs @@ -8,14 +8,12 @@ use fuel_core_storage::{ ContractsState, Messages, ProcessedTransactions, - Receipts, SpentMessages, }, transactional::Transactional, vm_storage::VmStorageRequirements, Error as StorageError, MerkleRootStorage, - StorageInspect, StorageMutate, StorageRead, }; @@ -25,18 +23,14 @@ use fuel_core_types::{ entities::message::Message, fuel_tx, fuel_tx::{ - Address, - Bytes32, TxId, UniqueIdentifier, }, fuel_types::{ - BlockHeight, ChainId, Nonce, }, fuel_vm::checked_transaction::CheckedTransaction, - services::txpool::TransactionStatus, }; use fuel_core_types::fuel_tx::ContractId; @@ -79,50 +73,20 @@ pub trait RelayerPort { ) -> anyhow::Result>; } -pub trait MessageIsSpent: - StorageInspect - + StorageInspect -{ - type Error; - - fn message_is_spent(&self, nonce: &Nonce) -> Result; -} - -pub trait TxIdOwnerRecorder { - type Error; - - fn record_tx_id_owner( - &mut self, - owner: &Address, - block_height: BlockHeight, - tx_idx: u16, - tx_id: &Bytes32, - ) -> Result, Self::Error>; - - fn update_tx_status( - &mut self, - id: &Bytes32, - status: TransactionStatus, - ) -> Result, Self::Error>; -} - // TODO: Remove `Clone` bound pub trait ExecutorDatabaseTrait: - StorageMutate + StorageMutate + StorageMutate + MerkleRootStorage - + MessageIsSpent + StorageMutate + StorageMutate + StorageMutate - + StorageMutate + StorageMutate - + StorageRead + + StorageRead + StorageMutate + MerkleRootStorage + VmStorageRequirements + Transactional - + TxIdOwnerRecorder + Clone { } diff --git a/crates/services/importer/Cargo.toml b/crates/services/importer/Cargo.toml index 7cd93840428..6b47a8272f3 100644 --- a/crates/services/importer/Cargo.toml +++ b/crates/services/importer/Cargo.toml @@ -17,6 +17,7 @@ fuel-core-metrics = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } tokio = { workspace = true, features = ["full"] } +tokio-rayon = { workspace = true } tracing = { workspace = true } [dev-dependencies] diff --git a/crates/services/importer/src/config.rs b/crates/services/importer/src/config.rs index c551127c68a..0e9d938be93 100644 --- a/crates/services/importer/src/config.rs +++ b/crates/services/importer/src/config.rs @@ -22,7 +22,7 @@ impl Config { impl Default for Config { fn default() -> Self { Self { - max_block_notify_buffer: 1 << 10, + max_block_notify_buffer: 1, metrics: false, chain_id: ChainId::default(), } diff --git 
a/crates/services/importer/src/importer.rs b/crates/services/importer/src/importer.rs index 056c4010410..d75709e1c9e 100644 --- a/crates/services/importer/src/importer.rs +++ b/crates/services/importer/src/importer.rs @@ -29,6 +29,7 @@ use fuel_core_types::{ services::{ block_importer::{ ImportResult, + SharedImportResult, UncommittedResult, }, executor, @@ -38,7 +39,10 @@ use fuel_core_types::{ }; use std::{ ops::Deref, - sync::Arc, + sync::{ + Arc, + Mutex, + }, time::{ Instant, SystemTime, @@ -47,6 +51,7 @@ use std::{ }; use tokio::sync::{ broadcast, + oneshot, TryAcquireError, }; @@ -105,10 +110,14 @@ impl PartialEq for Error { pub struct Importer { database: D, - executor: E, - verifier: V, + executor: Arc, + verifier: Arc, chain_id: ChainId, - broadcast: broadcast::Sender>, + broadcast: broadcast::Sender, + /// The channel to notify about the end of the processing of the previous block by all listeners. + /// It is used to await until all receivers of the notification process the `SharedImportResult` + /// before starting committing a new block. + prev_block_process_result: Mutex>>, guard: tokio::sync::Semaphore, } @@ -118,15 +127,16 @@ impl Importer { Self { database, - executor, - verifier, + executor: Arc::new(executor), + verifier: Arc::new(verifier), chain_id: config.chain_id, broadcast, + prev_block_process_result: Default::default(), guard: tokio::sync::Semaphore::new(1), } } - pub fn subscribe(&self) -> broadcast::Receiver> { + pub fn subscribe(&self) -> broadcast::Receiver { self.broadcast.subscribe() } @@ -162,7 +172,7 @@ where /// /// Only one commit may be in progress at the time. All other calls will fail. /// Returns an error if called while another call is in progress. - pub fn commit_result( + pub async fn commit_result( &self, result: UncommittedResult>, ) -> Result<(), Error> @@ -170,9 +180,36 @@ where ExecutorDatabase: ports::ExecutorDatabase, { let _guard = self.lock()?; + // It is safe to unwrap the channel because we have the `_guard`. + let previous_block_result = self + .prev_block_process_result + .lock() + .expect("poisoned") + .take(); + + // Await until all receivers of the notification process the result. + if let Some(channel) = previous_block_result { + let _ = channel.await; + } + self._commit_result(result) } + /// The method works in the same way as [`Importer::commit_result`], but it doesn't + /// wait for listeners to process the result. + pub fn commit_result_without_awaiting_listeners( + &self, + result: UncommittedResult>, + ) -> Result<(), Error> + where + ExecutorDatabase: ports::ExecutorDatabase, + { + let _guard = self.lock()?; + self._commit_result(result)?; + Ok(()) + } + + /// The method commits the result of the block execution and notifies about a new imported block. #[tracing::instrument( skip_all, fields( @@ -270,7 +307,13 @@ where .set(current_time); tracing::info!("Committed block {:#x}", result.sealed_block.entity.id()); - let _ = self.broadcast.send(Arc::new(result)); + + // The `tokio::sync::oneshot::Sender` is used to notify about the end + // of the processing of a new block by all listeners. 
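
The importer above now throttles itself: before committing the next block it awaits a `oneshot` receiver left behind by the previous commit, which resolves once every subscriber has finished processing the previous `SharedImportResult`. A stripped-down sketch of that hand-off, with the payload reduced to a block number and the sender fired immediately instead of from the broadcast listeners:

```rust
use std::sync::{Arc, Mutex};
use tokio::sync::oneshot;

struct Committer {
    // Resolved when all listeners have finished with the previous block.
    prev_block_done: Mutex<Option<oneshot::Receiver<()>>>,
}

impl Committer {
    async fn commit(&self, block: u32) {
        // Wait for the previous block's listeners before touching the database.
        let previous = self.prev_block_done.lock().expect("poisoned").take();
        if let Some(done) = previous {
            let _ = done.await;
        }

        // ... write `block` to storage and broadcast it here ...
        let (tx, rx) = oneshot::channel();
        // In fuel-core the sender travels inside the broadcast payload and is
        // fired from `Drop`; here we fire it immediately for brevity.
        let _ = tx.send(());
        *self.prev_block_done.lock().expect("poisoned") = Some(rx);
        println!("committed block {block}");
    }
}

#[tokio::main]
async fn main() {
    let committer = Arc::new(Committer { prev_block_done: Mutex::new(None) });
    committer.commit(1).await;
    committer.commit(2).await;
}
```
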
+ let (sender, receiver) = oneshot::channel(); + let _ = self.broadcast.send(Arc::new(Awaiter::new(result, sender))); + *self.prev_block_process_result.lock().expect("poisoned") = Some(receiver); + Ok(()) } @@ -324,13 +367,24 @@ where pub fn verify_and_execute_block( &self, sealed_block: SealedBlock, + ) -> Result>, Error> { + Self::verify_and_execute_block_inner( + self.executor.clone(), + self.verifier.clone(), + sealed_block, + ) + } + + fn verify_and_execute_block_inner( + executor: Arc, + verifier: Arc, + sealed_block: SealedBlock, ) -> Result>, Error> { let consensus = sealed_block.consensus; let block = sealed_block.entity; let sealed_block_id = block.id(); - let result_of_verification = - self.verifier.verify_block_fields(&consensus, &block); + let result_of_verification = verifier.verify_block_fields(&consensus, &block); if let Err(err) = result_of_verification { return Err(Error::FailedVerification(err)) } @@ -350,8 +404,7 @@ where tx_status, }, db_tx, - ) = self - .executor + ) = executor .execute_without_commit(block) .map_err(Error::FailedExecution)? .into(); @@ -380,19 +433,47 @@ where impl Importer where - IDatabase: ImporterDatabase, - E: Executor, - V: BlockVerifier, + IDatabase: ImporterDatabase + 'static, + E: Executor + 'static, + V: BlockVerifier + 'static, { /// The method validates the `Block` fields and commits the `SealedBlock`. /// It is a combination of the [`Importer::verify_and_execute_block`] and [`Importer::commit_result`]. - pub fn execute_and_commit(&self, sealed_block: SealedBlock) -> Result<(), Error> { + pub async fn execute_and_commit( + &self, + sealed_block: SealedBlock, + ) -> Result<(), Error> { let _guard = self.lock()?; + + let executor = self.executor.clone(); + let verifier = self.verifier.clone(); + let (result, execute_time) = tokio_rayon::spawn_fifo(|| { + let start = Instant::now(); + let result = + Self::verify_and_execute_block_inner(executor, verifier, sealed_block); + let execute_time = start.elapsed().as_secs_f64(); + (result, execute_time) + }) + .await; + + let result = result?; + + // It is safe to unwrap the channel because we have the `_guard`. + let previous_block_result = self + .prev_block_process_result + .lock() + .expect("poisoned") + .take(); + + // Await until all receivers of the notification process the result. + if let Some(channel) = previous_block_result { + let _ = channel.await; + } + let start = Instant::now(); - let result = self.verify_and_execute_block(sealed_block)?; let commit_result = self._commit_result(result); - // record the execution time to prometheus - let time = start.elapsed().as_secs_f64(); + let commit_time = start.elapsed().as_secs_f64(); + let time = execute_time + commit_time; importer_metrics().execute_and_commit_duration.observe(time); // return execution result commit_result @@ -412,3 +493,34 @@ impl ShouldBeUnique for Option { } } } + +/// The wrapper around `ImportResult` to notify about the end of the processing of a new block. 
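
`execute_and_commit` above now pushes block verification and execution onto the rayon pool through `tokio-rayon`, so the CPU-bound work no longer occupies a tokio worker thread (which is why the earlier `spawn_blocking` wrapper in the adapter could be removed). A minimal sketch of that offloading pattern, with a summation standing in for block execution:

```rust
// Offload CPU-bound work to rayon and await the result from async code.
async fn verify_and_execute(height: u64) -> u64 {
    tokio_rayon::spawn_fifo(move || {
        // Stand-in for verification + execution of the block at `height`.
        (0..height).sum::<u64>()
    })
    .await
}

#[tokio::main]
async fn main() {
    let checksum = verify_and_execute(1_000).await;
    println!("executed, pseudo-checksum = {checksum}");
}
```
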
+struct Awaiter { + result: ImportResult, + release_channel: Option>, +} + +impl Drop for Awaiter { + fn drop(&mut self) { + if let Some(release_channel) = core::mem::take(&mut self.release_channel) { + let _ = release_channel.send(()); + } + } +} + +impl Deref for Awaiter { + type Target = ImportResult; + + fn deref(&self) -> &Self::Target { + &self.result + } +} + +impl Awaiter { + fn new(result: ImportResult, channel: oneshot::Sender<()>) -> Self { + Self { + result, + release_channel: Some(channel), + } + } +} diff --git a/crates/services/importer/src/importer/test.rs b/crates/services/importer/src/importer/test.rs index 897be9f9945..717271093fd 100644 --- a/crates/services/importer/src/importer/test.rs +++ b/crates/services/importer/src/importer/test.rs @@ -261,12 +261,13 @@ where => Err(Error::NotUnique(0u32.into())); "fails to import genesis block when block exists for height 0" )] -fn commit_result_genesis( +#[tokio::test] +async fn commit_result_genesis( sealed_block: SealedBlock, underlying_db: impl Fn() -> MockDatabase, executor_db: impl Fn() -> MockDatabase, ) -> Result<(), Error> { - commit_result_assert(sealed_block, underlying_db(), executor_db()) + commit_result_assert(sealed_block, underlying_db(), executor_db()).await } //////////////////////////// PoA Block //////////////////////////// @@ -333,7 +334,8 @@ fn commit_result_genesis( => Err(storage_failure_error()); "fails to import block when executor db fails to find block" )] -fn commit_result_and_execute_and_commit_poa( +#[tokio::test] +async fn commit_result_and_execute_and_commit_poa( sealed_block: SealedBlock, underlying_db: impl Fn() -> MockDatabase, executor_db: impl Fn() -> MockDatabase, @@ -342,18 +344,19 @@ fn commit_result_and_execute_and_commit_poa( // validation rules(-> test cases) during committing the result. 
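
The `Awaiter` above is the sender half of that hand-off: every subscriber receives the same `Arc`, and only when the last clone is dropped does `Drop` run and fire the `oneshot`, telling the importer the block has been fully processed. A sketch of the same drop-to-notify trick with a plain `u32` payload instead of `ImportResult`:

```rust
use std::{ops::Deref, sync::Arc};
use tokio::sync::oneshot;

struct Notified<T> {
    value: T,
    release: Option<oneshot::Sender<()>>,
}

impl<T> Deref for Notified<T> {
    type Target = T;
    fn deref(&self) -> &T {
        &self.value
    }
}

impl<T> Drop for Notified<T> {
    fn drop(&mut self) {
        // Runs once, when the last `Arc<Notified<T>>` clone goes away.
        if let Some(release) = self.release.take() {
            let _ = release.send(());
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let shared = Arc::new(Notified { value: 7u32, release: Some(tx) });

    let listener = tokio::spawn({
        let shared = Arc::clone(&shared);
        async move { assert_eq!(**shared, 7) } // drops its clone when done
    });

    drop(shared); // the importer drops its own handle
    listener.await.unwrap();
    rx.await.expect("fired exactly once, by the last drop");
}
```
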
let height = *sealed_block.entity.header().height(); let commit_result = - commit_result_assert(sealed_block.clone(), underlying_db(), executor_db()); + commit_result_assert(sealed_block.clone(), underlying_db(), executor_db()).await; let execute_and_commit_result = execute_and_commit_assert( sealed_block, underlying_db(), executor(ok(ex_result(height.into(), 0)), executor_db()), verifier(ok(())), - ); + ) + .await; assert_eq!(commit_result, execute_and_commit_result); commit_result } -fn commit_result_assert( +async fn commit_result_assert( sealed_block: SealedBlock, underlying_db: MockDatabase, executor_db: MockDatabase, @@ -366,23 +369,22 @@ fn commit_result_assert( ); let mut imported_blocks = importer.subscribe(); - let result = importer.commit_result(uncommitted_result); + let result = importer.commit_result(uncommitted_result).await; if result.is_ok() { let actual_sealed_block = imported_blocks.try_recv().unwrap(); assert_eq!(actual_sealed_block.sealed_block, expected_to_broadcast); - assert_eq!( - imported_blocks - .try_recv() - .expect_err("We should broadcast only one block"), - TryRecvError::Empty - ) + if let Err(err) = imported_blocks.try_recv() { + assert_eq!(err, TryRecvError::Empty); + } else { + panic!("We should broadcast only one block"); + } } result } -fn execute_and_commit_assert( +async fn execute_and_commit_assert( sealed_block: SealedBlock, underlying_db: MockDatabase, executor: MockExecutor, @@ -392,24 +394,24 @@ fn execute_and_commit_assert( let importer = Importer::new(Default::default(), underlying_db, executor, verifier); let mut imported_blocks = importer.subscribe(); - let result = importer.execute_and_commit(sealed_block); + let result = importer.execute_and_commit(sealed_block).await; if result.is_ok() { let actual_sealed_block = imported_blocks.try_recv().unwrap(); assert_eq!(actual_sealed_block.sealed_block, expected_to_broadcast); - assert_eq!( - imported_blocks - .try_recv() - .expect_err("We should broadcast only one block"), - TryRecvError::Empty - ) + + if let Err(err) = imported_blocks.try_recv() { + assert_eq!(err, TryRecvError::Empty); + } else { + panic!("We should broadcast only one block"); + } } result } -#[test] -fn commit_result_fail_when_locked() { +#[tokio::test] +async fn commit_result_fail_when_locked() { let importer = Importer::new(Default::default(), MockDatabase::default(), (), ()); let uncommitted_result = UncommittedResult::new( ImportResult::default(), @@ -418,13 +420,13 @@ fn commit_result_fail_when_locked() { let _guard = importer.lock(); assert_eq!( - importer.commit_result(uncommitted_result), + importer.commit_result(uncommitted_result).await, Err(Error::SemaphoreError(TryAcquireError::NoPermits)) ); } -#[test] -fn execute_and_commit_fail_when_locked() { +#[tokio::test] +async fn execute_and_commit_fail_when_locked() { let importer = Importer::new( Default::default(), MockDatabase::default(), @@ -434,7 +436,7 @@ fn execute_and_commit_fail_when_locked() { let _guard = importer.lock(); assert_eq!( - importer.execute_and_commit(Default::default()), + importer.execute_and_commit(Default::default()).await, Err(Error::SemaphoreError(TryAcquireError::NoPermits)) ); } @@ -491,7 +493,8 @@ fn one_lock_at_the_same_time() { => Err(verification_failure_error()); "commit fails if verification fails" )] -fn execute_and_commit_and_verify_and_execute_block_poa( +#[tokio::test] +async fn execute_and_commit_and_verify_and_execute_block_poa( sealed_block: SealedBlock, block_after_execution: P, verifier_result: V, @@ -521,7 +524,8 @@ where 
executor_db(ok(Some(previous_height)), ok(true), commits)(), ), verifier(verifier_result), - ); + ) + .await; assert_eq!(verify_and_execute_result, execute_and_commit_result); execute_and_commit_result } diff --git a/crates/services/importer/src/ports.rs b/crates/services/importer/src/ports.rs index 51c14e5085b..99f097fefe5 100644 --- a/crates/services/importer/src/ports.rs +++ b/crates/services/importer/src/ports.rs @@ -33,7 +33,7 @@ pub trait Executor: Send + Sync { } /// The database port used by the block importer. -pub trait ImporterDatabase { +pub trait ImporterDatabase: Send + Sync { /// Returns the latest block height. fn latest_block_height(&self) -> StorageResult>; /// Update metadata about the total number of transactions on the chain. @@ -57,7 +57,7 @@ pub trait ExecutorDatabase: ImporterDatabase { #[cfg_attr(test, mockall::automock)] /// The verifier of the block. -pub trait BlockVerifier { +pub trait BlockVerifier: Send + Sync { /// Verifies the consistency of the block fields for the block's height. /// It includes the verification of **all** fields, it includes the consensus rules for /// the corresponding height. diff --git a/crates/services/txpool/src/mock_db.rs b/crates/services/txpool/src/mock_db.rs index 157e5e7f27a..5435585a3f1 100644 --- a/crates/services/txpool/src/mock_db.rs +++ b/crates/services/txpool/src/mock_db.rs @@ -95,11 +95,4 @@ impl TxPoolDb for MockDb { fn current_block_height(&self) -> StorageResult { Ok(Default::default()) } - - fn transaction_status( - &self, - _tx_id: &fuel_core_types::fuel_types::Bytes32, - ) -> StorageResult { - unimplemented!() - } } diff --git a/crates/services/txpool/src/ports.rs b/crates/services/txpool/src/ports.rs index de51f429e93..375d7066982 100644 --- a/crates/services/txpool/src/ports.rs +++ b/crates/services/txpool/src/ports.rs @@ -11,18 +11,16 @@ use fuel_core_types::{ }, fuel_types::{ BlockHeight, - Bytes32, ContractId, Nonce, }, services::{ - block_importer::ImportResult, + block_importer::SharedImportResult, p2p::{ GossipsubMessageAcceptance, GossipsubMessageInfo, NetworkData, }, - txpool::TransactionStatus, }, }; use std::sync::Arc; @@ -46,7 +44,7 @@ pub trait PeerToPeer: Send + Sync { pub trait BlockImporter: Send + Sync { /// Wait until the next block is available - fn block_events(&self) -> BoxStream>; + fn block_events(&self) -> BoxStream; } pub trait TxPoolDb: Send + Sync { @@ -59,6 +57,4 @@ pub trait TxPoolDb: Send + Sync { fn is_message_spent(&self, message_id: &Nonce) -> StorageResult; fn current_block_height(&self) -> StorageResult; - - fn transaction_status(&self, tx_id: &Bytes32) -> StorageResult; } diff --git a/crates/services/txpool/src/service.rs b/crates/services/txpool/src/service.rs index e247e196a77..38ac9b75929 100644 --- a/crates/services/txpool/src/service.rs +++ b/crates/services/txpool/src/service.rs @@ -34,7 +34,6 @@ use fuel_core_types::{ Bytes32, }, services::{ - block_importer::ImportResult, p2p::{ GossipData, GossipsubMessageAcceptance, @@ -52,6 +51,7 @@ use fuel_core_types::{ }; use anyhow::anyhow; +use fuel_core_types::services::block_importer::SharedImportResult; use parking_lot::Mutex as ParkingMutex; use std::{ sync::Arc, @@ -143,7 +143,7 @@ impl Clone for SharedState { pub struct Task { gossiped_tx_stream: BoxStream, - committed_block_stream: BoxStream>, + committed_block_stream: BoxStream, shared: SharedState, ttl_timer: tokio::time::Interval, } @@ -201,14 +201,13 @@ where result = self.committed_block_stream.next() => { if let Some(result) = result { - let block = result + let 
block = &result .sealed_block - .entity - .compress(&self.shared.consensus_params.chain_id); + .entity; self.shared.txpool.lock().block_update( &self.shared.tx_status_sender, - block.header().height(), - block.transactions() + block, + &result.tx_status, ); should_continue = true; } else { diff --git a/crates/services/txpool/src/service/test_helpers.rs b/crates/services/txpool/src/service/test_helpers.rs index decaf2f98d1..3cf532bfa8b 100644 --- a/crates/services/txpool/src/service/test_helpers.rs +++ b/crates/services/txpool/src/service/test_helpers.rs @@ -21,7 +21,10 @@ use fuel_core_types::{ TransactionBuilder, Word, }, - services::p2p::GossipsubMessageAcceptance, + services::{ + block_importer::ImportResult, + p2p::GossipsubMessageAcceptance, + }, }; use std::cell::RefCell; @@ -103,7 +106,7 @@ mockall::mock! { pub Importer {} impl BlockImporter for Importer { - fn block_events(&self) -> BoxStream>; + fn block_events(&self) -> BoxStream; } } @@ -115,7 +118,7 @@ impl MockImporter { let stream = fuel_core_services::stream::unfold(blocks, |mut blocks| async { let block = blocks.pop(); if let Some(sealed_block) = block { - let result = + let result: SharedImportResult = Arc::new(ImportResult::new_from_local(sealed_block, vec![])); Some((result, blocks)) diff --git a/crates/services/txpool/src/txpool.rs b/crates/services/txpool/src/txpool.rs index 50c7d2484e0..1c3c0376e8d 100644 --- a/crates/services/txpool/src/txpool.rs +++ b/crates/services/txpool/src/txpool.rs @@ -35,8 +35,16 @@ use fuel_core_types::{ tai64::Tai64, }; +use crate::service::TxStatusMessage; use fuel_core_metrics::txpool_metrics::txpool_metrics; -use fuel_core_types::fuel_vm::checked_transaction::CheckPredicateParams; +use fuel_core_types::{ + blockchain::block::Block, + fuel_vm::checked_transaction::CheckPredicateParams, + services::{ + executor::TransactionExecutionStatus, + txpool::from_executor_to_status, + }, +}; use std::{ cmp::Reverse, collections::HashMap, @@ -315,14 +323,19 @@ where pub fn block_update( &mut self, tx_status_sender: &TxStatusChange, - height: &BlockHeight, - transactions: &[TxId], + block: &Block, + tx_status: &[TransactionExecutionStatus], // spend_outputs: [Input], added_outputs: [AddedOutputs] ) { - for tx_id in transactions { - let tx_id = *tx_id; - let result = self.database.transaction_status(&tx_id); - tx_status_sender.send_complete(tx_id, height, result); + let height = block.header().height(); + for status in tx_status { + let tx_id = status.id; + let status = from_executor_to_status(block, status.result.clone()); + tx_status_sender.send_complete( + tx_id, + height, + TxStatusMessage::Status(status), + ); self.remove_committed_tx(&tx_id); } } diff --git a/crates/storage/src/tables.rs b/crates/storage/src/tables.rs index 95f0c711006..1ec13b0f034 100644 --- a/crates/storage/src/tables.rs +++ b/crates/storage/src/tables.rs @@ -40,6 +40,7 @@ impl Mappable for FuelBlocks { /// Unique identifier of the fuel block. type Key = Self::OwnedKey; // TODO: Seems it would be faster to use `BlockHeight` as primary key. + // https://github.com/FuelLabs/fuel-core/issues/1580. type OwnedKey = BlockId; type Value = Self::OwnedValue; type OwnedValue = CompressedBlock; diff --git a/crates/storage/src/transactional.rs b/crates/storage/src/transactional.rs index d44041113b2..854557bd117 100644 --- a/crates/storage/src/transactional.rs +++ b/crates/storage/src/transactional.rs @@ -1,6 +1,7 @@ //! The primitives to work with storage in transactional mode. 
use crate::Result as StorageResult; +use fuel_core_types::fuel_types::BlockHeight; #[cfg_attr(feature = "test-helpers", mockall::automock(type Storage = crate::test_helpers::EmptyStorage;))] /// The types is transactional and may create `StorageTransaction`. @@ -75,3 +76,13 @@ impl StorageTransaction { self.transaction.commit() } } + +/// Provides a view of the storage at the given height. +/// It guarantees to be atomic, meaning the view is immutable to outside modifications. +pub trait AtomicView: Send + Sync { + /// Returns the view of the storage at the given `height`. + fn view_at(&self, height: BlockHeight) -> StorageResult; + + /// Returns the view of the storage for the latest block height. + fn latest_view(&self) -> View; +} diff --git a/crates/types/src/services/block_importer.rs b/crates/types/src/services/block_importer.rs index 494abb8b572..276a305b960 100644 --- a/crates/types/src/services/block_importer.rs +++ b/crates/types/src/services/block_importer.rs @@ -10,11 +10,16 @@ use crate::{ Uncommitted, }, }; +use core::ops::Deref; +use std::sync::Arc; /// The uncommitted result of the block importing. pub type UncommittedResult = Uncommitted; +/// The alias for the `ImportResult` that can be shared between threads. +pub type SharedImportResult = Arc + Send + Sync>; + /// The result of the block import. #[derive(Debug)] #[cfg_attr(any(test, feature = "test-helpers"), derive(Default))] @@ -27,6 +32,14 @@ pub struct ImportResult { pub source: Source, } +impl Deref for ImportResult { + type Target = Self; + + fn deref(&self) -> &Self::Target { + self + } +} + /// The source producer of the block. #[derive(Debug, Clone, Copy, PartialEq, Default)] pub enum Source { @@ -87,8 +100,8 @@ impl BlockImportInfo { } } -impl From<&ImportResult> for BlockImportInfo { - fn from(result: &ImportResult) -> Self { +impl From for BlockImportInfo { + fn from(result: SharedImportResult) -> Self { Self { block_header: result.sealed_block.entity.header().clone(), source: result.source, diff --git a/crates/types/src/services/executor.rs b/crates/types/src/services/executor.rs index 95efa755b77..f240b31bba7 100644 --- a/crates/types/src/services/executor.rs +++ b/crates/types/src/services/executor.rs @@ -9,6 +9,7 @@ use crate::{ primitives::BlockId, }, fuel_tx::{ + Receipt, TxId, UtxoId, ValidityError, @@ -53,6 +54,8 @@ pub struct TransactionExecutionStatus { pub id: Bytes32, /// The result of the executed transaction. pub result: TransactionExecutionResult, + /// The receipts generated by the executed transaction. + pub receipts: Vec, } /// The result of transaction execution. diff --git a/crates/types/src/services/txpool.rs b/crates/types/src/services/txpool.rs index c323761ec82..4cc483e6c7b 100644 --- a/crates/types/src/services/txpool.rs +++ b/crates/types/src/services/txpool.rs @@ -1,7 +1,10 @@ //! Types for interoperability with the txpool service use crate::{ - blockchain::primitives::BlockId, + blockchain::{ + block::Block, + primitives::BlockId, + }, fuel_asm::Word, fuel_tx::{ field::{ @@ -27,6 +30,7 @@ use crate::{ checked_transaction::Checked, ProgramState, }, + services::executor::TransactionExecutionResult, }; use fuel_vm_private::checked_transaction::CheckedTransaction; use std::{ @@ -199,6 +203,30 @@ pub enum TransactionStatus { }, } +/// Converts the transaction execution result to the transaction status. 
+pub fn from_executor_to_status( + block: &Block, + result: TransactionExecutionResult, +) -> TransactionStatus { + let time = block.header().time(); + let block_id = block.id(); + match result { + TransactionExecutionResult::Success { result } => TransactionStatus::Success { + block_id, + time, + result, + }, + TransactionExecutionResult::Failed { result, reason } => { + TransactionStatus::Failed { + block_id, + time, + result, + reason: reason.clone(), + } + } + } +} + #[allow(missing_docs)] #[derive(thiserror::Error, Debug, PartialEq, Eq, Clone)] #[non_exhaustive]
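
Tying the pieces together: with statuses and receipts now delivered through `SharedImportResult`, the txpool derives each transaction's final status directly from the executor output via `from_executor_to_status` instead of querying a `transaction_status` database method. A schematic sketch of that notification loop, with stand-in types for `Block`, `TransactionExecutionStatus`, and the status channel:

```rust
// Stand-ins for the real fuel-core types; only the shape of the loop matters.
struct Block {
    height: u32,
}

#[derive(Debug, PartialEq)]
enum TxStatus {
    Success { height: u32 },
    Failed { height: u32, reason: String },
}

enum ExecResult {
    Success,
    Failed { reason: String },
}

struct TxExecStatus {
    id: u64,
    result: ExecResult,
}

// Analogue of `from_executor_to_status`: executor result + block context -> status.
fn to_status(block: &Block, result: &ExecResult) -> TxStatus {
    match result {
        ExecResult::Success => TxStatus::Success { height: block.height },
        ExecResult::Failed { reason } => TxStatus::Failed {
            height: block.height,
            reason: reason.clone(),
        },
    }
}

// Analogue of `TxPool::block_update`: derive statuses to send to subscribers
// (a real pool would also evict the committed transactions).
fn block_update(block: &Block, tx_status: &[TxExecStatus]) -> Vec<(u64, TxStatus)> {
    tx_status
        .iter()
        .map(|status| (status.id, to_status(block, &status.result)))
        .collect()
}

fn main() {
    let block = Block { height: 10 };
    let statuses = vec![TxExecStatus { id: 1, result: ExecResult::Success }];
    let updates = block_update(&block, &statuses);
    assert_eq!(updates, vec![(1, TxStatus::Success { height: 10 })]);
}
```
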