From 84ccb2e7e9ff2a44968ff2305ab2809fdd2e0fe6 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Wed, 14 Aug 2024 17:57:59 +0200 Subject: [PATCH 01/25] Create new port method for producing predefined blocks --- .../services/consensus_module/poa/Cargo.toml | 1 + .../consensus_module/poa/src/ports.rs | 6 + .../consensus_module/poa/src/service.rs | 56 +++++++++ .../consensus_module/poa/src/service_test.rs | 115 ++++++++++++++++++ 4 files changed, 178 insertions(+) diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index 3a96a22e015..cd1a4a0d1c8 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -28,6 +28,7 @@ mockall = { workspace = true } rand = { workspace = true } test-case = { workspace = true } tokio = { workspace = true, features = ["full", "test-util"] } +fuel-core-services = { workspace = true, features = ["test-helpers"] } [features] test-helpers = [ diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index 41ed8ad4adb..875c18eebc4 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -5,6 +5,7 @@ use fuel_core_storage::{ }; use fuel_core_types::{ blockchain::{ + block::Block, header::BlockHeader, primitives::DaBlockHeight, }, @@ -59,6 +60,11 @@ pub trait BlockProducer: Send + Sync { block_time: Tai64, source: TransactionsSource, ) -> anyhow::Result>; + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result>; } #[cfg_attr(test, mockall::automock)] diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 8a3e01fcca0..17a8ab5aade 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -139,6 +139,7 @@ pub struct MainTask { last_height: BlockHeight, last_timestamp: Tai64, last_block_created: Instant, + predefined_blocks: Vec, trigger: Trigger, /// Deadline clock, used by the triggers timer: DeadlineClock, @@ -157,6 +158,7 @@ where block_producer: B, block_importer: I, p2p_port: P, + predefined_blocks: Vec, ) -> Self { let tx_status_update_stream = txpool.transaction_status_events(); let (request_sender, request_receiver) = mpsc::channel(1024); @@ -195,6 +197,7 @@ where last_height, last_timestamp, last_block_created, + predefined_blocks, trigger, timer: DeadlineClock::new(), sync_task_handle, @@ -387,6 +390,50 @@ where Ok(()) } + async fn produce_predefined_block(&mut self, block: &Block) -> anyhow::Result<()> { + let last_block_created = Instant::now(); + // verify signing key is set + if self.signing_key.is_none() { + return Err(anyhow!("unable to produce blocks without a consensus key")) + } + + // Ask the block producer to create the block + let ( + ExecutionResult { + block, + skipped_transactions: _, + tx_status, + events, + }, + changes, + ) = self + .block_producer + .produce_predefined_block(block) + .await? + .into(); + + // Sign the block and seal it + let seal = seal_block(&self.signing_key, &block)?; + let block = SealedBlock { + entity: block, + consensus: seal, + }; + // Import the sealed block + self.block_importer + .commit_result(Uncommitted::new( + ImportResult::new_from_local(block.clone(), tx_status, events), + changes, + )) + .await?; + + // Update last block time + self.last_height = *block.entity.header().height(); + self.last_timestamp = block.entity.header().time(); + self.last_block_created = last_block_created; + + Ok(()) + } + pub(crate) async fn on_txpool_event(&mut self) -> anyhow::Result<()> { match self.trigger { Trigger::Instant => { @@ -461,6 +508,7 @@ where let should_continue; let mut state = self.sync_task_handle.shared.clone(); // make sure we're synced first + tracing::info!("Waiting for the node to be synced"); while *state.borrow_and_update() == SyncState::NotSynced { tokio::select! { biased; @@ -479,6 +527,7 @@ where } } } + tracing::info!("Node is synced"); if let SyncState::Synced(block_header) = &*state.borrow_and_update() { let (last_height, last_timestamp, last_block_created) = @@ -490,6 +539,11 @@ where } } + let blocks = self.predefined_blocks.clone(); + for block in blocks { + self.produce_predefined_block(&block).await?; + } + tokio::select! { biased; _ = watcher.while_started() => { @@ -552,6 +606,7 @@ where I: BlockImporter + 'static, P: P2pPort, { + let predefined_blocks = Vec::new(); Service::new(MainTask::new( last_block, config, @@ -559,6 +614,7 @@ where block_producer, block_importer, p2p_port, + predefined_blocks, )) } diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index a9aacc3531f..c91b100f426 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -1,25 +1,32 @@ #![allow(clippy::arithmetic_side_effects)] +#![allow(non_snake_case)] use crate::{ new_service, ports::{ + BlockProducer, MockBlockImporter, MockBlockProducer, MockP2pPort, MockTransactionPool, + TransactionsSource, }, service::MainTask, Config, Service, Trigger, }; +use async_trait::async_trait; use fuel_core_services::{ stream::pending, Service as StorageTrait, + ServiceRunner, State, }; +use fuel_core_storage::transactional::Changes; use fuel_core_types::{ blockchain::{ + block::Block, header::BlockHeader, primitives::SecretKeyWrapper, SealedBlock, @@ -332,6 +339,7 @@ async fn remove_skipped_transactions() { let p2p_port = generate_p2p_port(); + let predefined_blocks = vec![]; let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -339,6 +347,7 @@ async fn remove_skipped_transactions() { block_producer, block_importer, p2p_port, + predefined_blocks, ); assert!(task.produce_next_block().await.is_ok()); @@ -379,6 +388,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { let p2p_port = generate_p2p_port(); + let predefined_blocks = vec![]; let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -386,6 +396,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { block_producer, block_importer, p2p_port, + predefined_blocks, ); // simulate some txpool event to see if any block production is erroneously triggered @@ -397,3 +408,107 @@ fn test_signing_key() -> Secret { let secret_key = SecretKey::random(&mut rng); Secret::new(secret_key.into()) } + +#[derive(Debug, PartialEq)] +enum ProduceBlock { + Predefined(Block), + New(BlockHeight, Tai64), +} + +struct FakeBlockProducer { + block_sender: tokio::sync::mpsc::Sender, +} + +impl FakeBlockProducer { + fn new() -> (Self, tokio::sync::mpsc::Receiver) { + let (block_sender, receiver) = tokio::sync::mpsc::channel(100); + (Self { block_sender }, receiver) + } +} + +#[async_trait] +impl BlockProducer for FakeBlockProducer { + async fn produce_and_execute_block( + &self, + height: BlockHeight, + block_time: Tai64, + _source: TransactionsSource, + ) -> anyhow::Result> { + self.block_sender + .send(ProduceBlock::New(height, block_time)) + .await + .unwrap(); + Ok(UncommittedResult::new( + ExecutionResult { + block: Default::default(), + skipped_transactions: Default::default(), + tx_status: Default::default(), + events: Default::default(), + }, + Default::default(), + )) + } + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result> { + self.block_sender + .send(ProduceBlock::Predefined(block.clone())) + .await + .unwrap(); + Ok(UncommittedResult::new( + ExecutionResult { + block: Default::default(), + skipped_transactions: Default::default(), + tx_status: Default::default(), + events: Default::default(), + }, + Default::default(), + )) + } +} + +#[tokio::test] +async fn consensus_service__run__will_include_predefined_blocks_before_new_blocks() { + // given + let blocks = vec![]; + let (block_producer, mut block_receiver) = FakeBlockProducer::new(); + let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); + let config = Config { + trigger: Trigger::Instant, + signing_key: Some(test_signing_key()), + metrics: false, + ..Default::default() + }; + let mut block_importer = MockBlockImporter::default(); + block_importer.expect_commit_result().returning(|_| Ok(())); + block_importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::empty())); + let task = MainTask::new( + &last_block, + config, + MockTransactionPool::no_tx_updates(), + block_producer, + block_importer, + generate_p2p_port(), + blocks.clone(), + ); + + // when + ServiceRunner::new(task).start().unwrap(); + + // then + for block in blocks { + assert_eq!( + block_receiver.recv().await.unwrap(), + ProduceBlock::Predefined(block) + ); + } + // let maybe_produced_block = block_receiver.recv().await.unwrap(); + // assert!(matches! { + // maybe_produced_block, + // ProduceBlock::New(_, _) + // }); +} From c242410cde8340ec3d77a4374e3d21ea6e836501 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Wed, 14 Aug 2024 19:22:05 +0200 Subject: [PATCH 02/25] Fix test --- Cargo.lock | 1 + .../services/consensus_module/poa/Cargo.toml | 3 +- .../consensus_module/poa/src/service.rs | 59 ++++++++++--------- .../consensus_module/poa/src/service_test.rs | 17 +++++- crates/services/src/service.rs | 1 + 5 files changed, 48 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 61200d32177..130b979cd03 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3162,6 +3162,7 @@ dependencies = [ "tokio", "tokio-stream", "tracing", + "tracing-subscriber", ] [[package]] diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index cd1a4a0d1c8..9b8e5425e81 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -22,13 +22,14 @@ tracing = { workspace = true } [dev-dependencies] fuel-core-poa = { path = ".", features = ["test-helpers"] } +fuel-core-services = { workspace = true, features = ["test-helpers"] } fuel-core-storage = { path = "./../../../storage", features = ["test-helpers"] } fuel-core-types = { path = "./../../../types", features = ["test-helpers"] } mockall = { workspace = true } rand = { workspace = true } test-case = { workspace = true } tokio = { workspace = true, features = ["full", "test-util"] } -fuel-core-services = { workspace = true, features = ["test-helpers"] } +tracing-subscriber = { workspace = true } [features] test-helpers = [ diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 17a8ab5aade..b563d3f7a07 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -1,26 +1,21 @@ -use crate::{ - deadline_clock::{ - DeadlineClock, - OnConflict, - }, - ports::{ - BlockImporter, - BlockProducer, - P2pPort, - TransactionPool, - TransactionsSource, - }, - sync::{ - SyncState, - SyncTask, - }, - Config, - Trigger, +use std::{ + ops::Deref, + time::Duration, }; + use anyhow::{ anyhow, Context, }; +use tokio::{ + sync::{ + mpsc, + oneshot, + }, + time::Instant, +}; +use tokio_stream::StreamExt; + use fuel_core_services::{ stream::BoxStream, RunnableService, @@ -61,18 +56,26 @@ use fuel_core_types::{ }, tai64::Tai64, }; -use std::{ - ops::Deref, - time::Duration, -}; -use tokio::{ + +use crate::{ + deadline_clock::{ + DeadlineClock, + OnConflict, + }, + ports::{ + BlockImporter, + BlockProducer, + P2pPort, + TransactionPool, + TransactionsSource, + }, sync::{ - mpsc, - oneshot, + SyncState, + SyncTask, }, - time::Instant, + Config, + Trigger, }; -use tokio_stream::StreamExt; pub type Service = ServiceRunner>; #[derive(Clone)] @@ -508,7 +511,6 @@ where let should_continue; let mut state = self.sync_task_handle.shared.clone(); // make sure we're synced first - tracing::info!("Waiting for the node to be synced"); while *state.borrow_and_update() == SyncState::NotSynced { tokio::select! { biased; @@ -527,7 +529,6 @@ where } } } - tracing::info!("Node is synced"); if let SyncState::Synced(block_header) = &*state.borrow_and_update() { let (last_height, last_timestamp, last_block_created) = diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index c91b100f426..a8fd8bc6d29 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -471,8 +471,10 @@ impl BlockProducer for FakeBlockProducer { #[tokio::test] async fn consensus_service__run__will_include_predefined_blocks_before_new_blocks() { + tracing_subscriber::fmt::init(); + // given - let blocks = vec![]; + let blocks = vec![Block::default(), Block::default(), Block::default()]; let (block_producer, mut block_receiver) = FakeBlockProducer::new(); let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); let config = Config { @@ -486,18 +488,26 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block block_importer .expect_block_stream() .returning(|| Box::pin(tokio_stream::empty())); + let mut rng = StdRng::seed_from_u64(0); + let tx = make_tx(&mut rng); + let TxPoolContext { txpool, txs, .. } = MockTransactionPool::new_with_txs(vec![tx]); + // let watcher = StateWatcher::started(); let task = MainTask::new( &last_block, config, - MockTransactionPool::no_tx_updates(), + txpool, block_producer, block_importer, generate_p2p_port(), blocks.clone(), ); + tracing::info!("Starting service runner"); // when - ServiceRunner::new(task).start().unwrap(); + let service = ServiceRunner::new(task); + service.start().unwrap(); + + tokio::time::sleep(Duration::from_millis(1000)).await; // then for block in blocks { @@ -511,4 +521,5 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block // maybe_produced_block, // ProduceBlock::New(_, _) // }); + drop(txs); } diff --git a/crates/services/src/service.rs b/crates/services/src/service.rs index 5c2b5b5a699..edbf9167e04 100644 --- a/crates/services/src/service.rs +++ b/crates/services/src/service.rs @@ -251,6 +251,7 @@ where let state = Shared::new(sender); let stop_sender = state.clone(); // Spawned as a task to check if the service is already running and to capture any panics. + tracing::info!("Starting the service {}", S::NAME); tokio::task::spawn( async move { tracing::debug!("running"); From a79c3d2df0a4a2319e65cba8fc5dfe2e74c978a0 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Wed, 14 Aug 2024 20:02:33 +0200 Subject: [PATCH 03/25] Add other part of test --- .../consensus_module/poa/src/service_test.rs | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index a8fd8bc6d29..da3b5697dcf 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -203,6 +203,7 @@ impl MockTransactionPool { txpool .expect_transaction_status_events() .returning(move || { + tracing::info!("hit endpoint"); let status_channel = (status_sender_clone.clone(), status_receiver.clone()); let stream = fuel_core_services::stream::unfold( @@ -478,7 +479,9 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block let (block_producer, mut block_receiver) = FakeBlockProducer::new(); let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); let config = Config { - trigger: Trigger::Instant, + trigger: Trigger::Interval { + block_time: Duration::from_millis(100), + }, signing_key: Some(test_signing_key()), metrics: false, ..Default::default() @@ -491,7 +494,6 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block let mut rng = StdRng::seed_from_u64(0); let tx = make_tx(&mut rng); let TxPoolContext { txpool, txs, .. } = MockTransactionPool::new_with_txs(vec![tx]); - // let watcher = StateWatcher::started(); let task = MainTask::new( &last_block, config, @@ -502,13 +504,10 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block blocks.clone(), ); - tracing::info!("Starting service runner"); // when let service = ServiceRunner::new(task); service.start().unwrap(); - tokio::time::sleep(Duration::from_millis(1000)).await; - // then for block in blocks { assert_eq!( @@ -516,10 +515,10 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block ProduceBlock::Predefined(block) ); } - // let maybe_produced_block = block_receiver.recv().await.unwrap(); - // assert!(matches! { - // maybe_produced_block, - // ProduceBlock::New(_, _) - // }); + let maybe_produced_block = block_receiver.recv().await.unwrap(); + assert!(matches! { + maybe_produced_block, + ProduceBlock::New(_, _) + }); drop(txs); } From 062fdc97ba6049d787091e392e8075e50a7f3795 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Wed, 14 Aug 2024 20:04:06 +0200 Subject: [PATCH 04/25] Remove tracing stuff --- Cargo.lock | 1 - crates/services/consensus_module/poa/Cargo.toml | 1 - crates/services/consensus_module/poa/src/service_test.rs | 3 --- crates/services/src/service.rs | 1 - 4 files changed, 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 130b979cd03..61200d32177 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3162,7 +3162,6 @@ dependencies = [ "tokio", "tokio-stream", "tracing", - "tracing-subscriber", ] [[package]] diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index 9b8e5425e81..d5869c207d0 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -29,7 +29,6 @@ mockall = { workspace = true } rand = { workspace = true } test-case = { workspace = true } tokio = { workspace = true, features = ["full", "test-util"] } -tracing-subscriber = { workspace = true } [features] test-helpers = [ diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index da3b5697dcf..aaccf2dbd1d 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -203,7 +203,6 @@ impl MockTransactionPool { txpool .expect_transaction_status_events() .returning(move || { - tracing::info!("hit endpoint"); let status_channel = (status_sender_clone.clone(), status_receiver.clone()); let stream = fuel_core_services::stream::unfold( @@ -472,8 +471,6 @@ impl BlockProducer for FakeBlockProducer { #[tokio::test] async fn consensus_service__run__will_include_predefined_blocks_before_new_blocks() { - tracing_subscriber::fmt::init(); - // given let blocks = vec![Block::default(), Block::default(), Block::default()]; let (block_producer, mut block_receiver) = FakeBlockProducer::new(); diff --git a/crates/services/src/service.rs b/crates/services/src/service.rs index edbf9167e04..5c2b5b5a699 100644 --- a/crates/services/src/service.rs +++ b/crates/services/src/service.rs @@ -251,7 +251,6 @@ where let state = Shared::new(sender); let stop_sender = state.clone(); // Spawned as a task to check if the service is already running and to capture any panics. - tracing::info!("Starting the service {}", S::NAME); tokio::task::spawn( async move { tracing::debug!("running"); From 2d3e3daf1be685b696d392cf7ddf5f702b4aef2f Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Wed, 14 Aug 2024 20:09:15 +0200 Subject: [PATCH 05/25] Remove unnecessary drop --- crates/services/consensus_module/poa/src/service_test.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index aaccf2dbd1d..2d2c0607105 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -490,7 +490,7 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block .returning(|| Box::pin(tokio_stream::empty())); let mut rng = StdRng::seed_from_u64(0); let tx = make_tx(&mut rng); - let TxPoolContext { txpool, txs, .. } = MockTransactionPool::new_with_txs(vec![tx]); + let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]); let task = MainTask::new( &last_block, config, @@ -517,5 +517,4 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block maybe_produced_block, ProduceBlock::New(_, _) }); - drop(txs); } From 42ca09afbf5ebe790a589d41e6364de01c331eec Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 12:14:25 +0200 Subject: [PATCH 06/25] Add test for producing predefined block at producer level --- .../service/adapters/consensus_module/poa.rs | 8 +++ .../services/producer/src/block_producer.rs | 58 ++++++++++++++++++- .../producer/src/block_producer/tests.rs | 43 +++++++++++++- crates/services/producer/src/mocks.rs | 48 ++++++++++++--- 4 files changed, 146 insertions(+), 11 deletions(-) diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index ff2c36d5261..bb36b0362f6 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -24,6 +24,7 @@ use fuel_core_poa::{ use fuel_core_services::stream::BoxStream; use fuel_core_storage::transactional::Changes; use fuel_core_types::{ + blockchain::block::Block, fuel_tx::TxId, fuel_types::BlockHeight, services::{ @@ -120,6 +121,13 @@ impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter { } } } + + async fn produce_predefined_block( + &self, + block: &Block, + ) -> anyhow::Result> { + self.block_producer.produce_and_execute_predefined(block) + } } #[async_trait::async_trait] diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index e86120473a2..0786182d4e5 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -17,6 +17,7 @@ use fuel_core_storage::transactional::{ }; use fuel_core_types::{ blockchain::{ + block::Block, header::{ ApplicationHeader, ConsensusHeader, @@ -24,7 +25,10 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, - fuel_tx::Transaction, + fuel_tx::{ + field::MintGasPrice, + Transaction, + }, fuel_types::{ BlockHeight, Bytes32, @@ -88,6 +92,58 @@ pub struct Producer + Producer +where + ViewProvider: AtomicView + 'static, + ViewProvider::LatestView: BlockProducerDatabase, + ConsensusProvider: ConsensusParametersProvider, +{ + pub async fn produce_and_execute_predefined( + &self, + block: &Block, + ) -> anyhow::Result> + where + Executor: ports::BlockProducer> + 'static, + { + let _production_guard = self.lock.lock().await; + + let source = block.transactions().to_vec(); + + let height = block.header().consensus().height; + + let header = block.header().into(); + + let mint_tx = block + .transactions() + .last() + .and_then(|tx| tx.as_mint()) + .ok_or(anyhow!( + "The last transaction in the block should be a mint transaction" + ))?; + + let gas_price = *mint_tx.gas_price(); + + let component = Components { + header_to_produce: header, + transactions_source: source, + coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(), + gas_price, + }; + + // Store the context string in case we error. + let context_string = + format!("Failed to produce block {height:?} due to execution failure"); + let result = self + .executor + .produce_without_commit(component) + .map_err(Into::::into) + .context(context_string)?; + + debug!("Produced block with result: {:?}", result.result()); + Ok(result) + } +} impl Producer where diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index fb2c7859f24..3825cf2a45c 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -6,6 +6,7 @@ use crate::{ GasPriceProvider, MockConsensusParametersProvider, }, + Bytes32, Error, }, mocks::{ @@ -23,6 +24,7 @@ use fuel_core_producer as _; use fuel_core_types::{ blockchain::{ block::{ + Block, CompressedBlock, PartialFuelBlock, }, @@ -33,7 +35,11 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, - fuel_tx::ConsensusParameters, + fuel_tx::{ + ConsensusParameters, + Mint, + Transaction, + }, fuel_types::BlockHeight, services::executor::Error as ExecutorError, tai64::Tai64, @@ -534,6 +540,41 @@ mod produce_and_execute_block_txpool { } } +fn block_with_height(height: impl Into) -> Block { + let header = PartialBlockHeader { + consensus: ConsensusHeader { + height: height.into(), + ..Default::default() + }, + application: ApplicationHeader { + da_height: DaBlockHeight::default(), + ..Default::default() + }, + }; + let mint = Transaction::Mint(Mint::default()); + let txs = vec![mint]; + let outbox_message_ids = vec![]; + let event_inbox_root = Bytes32::default(); + Block::new(header, txs, &outbox_message_ids, event_inbox_root).unwrap() +} + +#[tokio::test] +async fn produce_and_execute_predefined_block__happy() { + // given + let height = 1u32; + let block = block_with_height(height); + let executor = MockExecutorWithCapture::default(); + let ctx = TestContext::default_from_executor(executor.clone()); + + let producer = ctx.producer(); + + // when + let result = producer.produce_and_execute_predefined(&block).await; + + // then + assert!(result.is_ok()); +} + struct TestContext { config: Config, db: MockDb, diff --git a/crates/services/producer/src/mocks.rs b/crates/services/producer/src/mocks.rs index 95fe741c34c..191bfdb20ed 100644 --- a/crates/services/producer/src/mocks.rs +++ b/crates/services/producer/src/mocks.rs @@ -24,6 +24,7 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, + fuel_tx::Transaction, fuel_types::{ Address, BlockHeight, @@ -50,7 +51,6 @@ use std::{ Mutex, }, }; - // TODO: Replace mocks with `mockall`. #[derive(Default, Clone)] @@ -107,7 +107,7 @@ impl AsRef for MockDb { } } -fn to_block(component: &Components>) -> Block { +fn arc_pool_tx_comp_to_block(component: &Components>) -> Block { let transactions = component .transactions_source .clone() @@ -123,12 +123,23 @@ fn to_block(component: &Components>) -> Block { .unwrap() } +fn tx_comp_to_block(component: &Components>) -> Block { + let transactions = component.transactions_source.clone(); + Block::new( + component.header_to_produce, + transactions, + &[], + Default::default(), + ) + .unwrap() +} + impl BlockProducer> for MockExecutor { fn produce_without_commit( &self, component: Components>, ) -> ExecutorResult> { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); // simulate executor inserting a block let mut block_db = self.0.blocks.lock().unwrap(); block_db.insert( @@ -159,7 +170,7 @@ impl BlockProducer> for FailingMockExecutor { if let Some(err) = err.take() { Err(err) } else { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); Ok(UncommittedResult::new( ExecutionResult { block, @@ -174,16 +185,35 @@ impl BlockProducer> for FailingMockExecutor { } #[derive(Clone)] -pub struct MockExecutorWithCapture { - pub captured: Arc>>>>, +pub struct MockExecutorWithCapture { + pub captured: Arc>>>>, } -impl BlockProducer> for MockExecutorWithCapture { +impl BlockProducer> for MockExecutorWithCapture { fn produce_without_commit( &self, component: Components>, ) -> ExecutorResult> { - let block = to_block(&component); + let block = arc_pool_tx_comp_to_block(&component); + *self.captured.lock().unwrap() = Some(component); + Ok(UncommittedResult::new( + ExecutionResult { + block, + skipped_transactions: vec![], + tx_status: vec![], + events: vec![], + }, + Default::default(), + )) + } +} + +impl BlockProducer> for MockExecutorWithCapture { + fn produce_without_commit( + &self, + component: Components>, + ) -> ExecutorResult> { + let block = tx_comp_to_block(&component); *self.captured.lock().unwrap() = Some(component); Ok(UncommittedResult::new( ExecutionResult { @@ -197,7 +227,7 @@ impl BlockProducer> for MockExecutorWithCapture { } } -impl Default for MockExecutorWithCapture { +impl Default for MockExecutorWithCapture { fn default() -> Self { Self { captured: Arc::new(Mutex::new(None)), From 2216015eb65a729c8afc854dfeadc6fb9b9494ca Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 16:57:10 +0200 Subject: [PATCH 07/25] WIP make more resilient --- .../consensus_module/poa/src/service.rs | 132 ++++++++++-------- .../consensus_module/poa/src/service_test.rs | 52 ++++--- 2 files changed, 105 insertions(+), 79 deletions(-) diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index b563d3f7a07..ea2cc8f557a 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -1,12 +1,13 @@ -use std::{ - ops::Deref, - time::Duration, -}; - use anyhow::{ anyhow, Context, }; +use std::{ + collections::HashMap, + ops::Deref, + sync::Arc, + time::Duration, +}; use tokio::{ sync::{ mpsc, @@ -142,7 +143,7 @@ pub struct MainTask { last_height: BlockHeight, last_timestamp: Tai64, last_block_created: Instant, - predefined_blocks: Vec, + predefined_blocks: HashMap, trigger: Trigger, /// Deadline clock, used by the triggers timer: DeadlineClock, @@ -161,7 +162,7 @@ where block_producer: B, block_importer: I, p2p_port: P, - predefined_blocks: Vec, + predefined_blocks: HashMap, ) -> Self { let tx_status_update_stream = txpool.transaction_status_events(); let (request_sender, request_receiver) = mpsc::channel(1024); @@ -209,10 +210,10 @@ where fn extract_block_info(last_block: &BlockHeader) -> (BlockHeight, Tai64, Instant) { let last_timestamp = last_block.time(); - let duration = + let duration_since_last_block = Duration::from_secs(Tai64::now().0.saturating_sub(last_timestamp.0)); let last_block_created = Instant::now() - .checked_sub(duration) + .checked_sub(duration_since_last_block) .unwrap_or(Instant::now()); let last_height = *last_block.height(); (last_height, last_timestamp, last_block_created) @@ -393,7 +394,10 @@ where Ok(()) } - async fn produce_predefined_block(&mut self, block: &Block) -> anyhow::Result<()> { + async fn produce_predefined_block( + &mut self, + predefined_block: &Block, + ) -> anyhow::Result<()> { let last_block_created = Instant::now(); // verify signing key is set if self.signing_key.is_none() { @@ -411,27 +415,27 @@ where changes, ) = self .block_producer - .produce_predefined_block(block) + .produce_predefined_block(predefined_block) .await? .into(); // Sign the block and seal it let seal = seal_block(&self.signing_key, &block)?; - let block = SealedBlock { + let sealed_block = SealedBlock { entity: block, consensus: seal, }; // Import the sealed block self.block_importer .commit_result(Uncommitted::new( - ImportResult::new_from_local(block.clone(), tx_status, events), + ImportResult::new_from_local(sealed_block.clone(), tx_status, events), changes, )) .await?; // Update last block time - self.last_height = *block.entity.header().height(); - self.last_timestamp = block.entity.header().time(); + self.last_height = *sealed_block.entity.header().height(); + self.last_timestamp = sealed_block.entity.header().time(); self.last_block_created = last_block_created; Ok(()) @@ -463,6 +467,15 @@ where } } } + fn update_last_block_values(&mut self, block_header: &Arc) { + let (last_height, last_timestamp, last_block_created) = + Self::extract_block_info(block_header); + if last_height > self.last_height { + self.last_height = last_height; + self.last_timestamp = last_timestamp; + self.last_block_created = last_block_created; + } + } } #[async_trait::async_trait] @@ -509,16 +522,16 @@ where { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; - let mut state = self.sync_task_handle.shared.clone(); + let mut sync_state = self.sync_task_handle.shared.clone(); // make sure we're synced first - while *state.borrow_and_update() == SyncState::NotSynced { + while *sync_state.borrow_and_update() == SyncState::NotSynced { tokio::select! { biased; result = watcher.while_started() => { should_continue = result?.started(); return Ok(should_continue); } - _ = state.changed() => { + _ = sync_state.changed() => { break; } _ = self.tx_status_update_stream.next() => { @@ -530,58 +543,53 @@ where } } - if let SyncState::Synced(block_header) = &*state.borrow_and_update() { - let (last_height, last_timestamp, last_block_created) = - Self::extract_block_info(block_header); - if last_height > self.last_height { - self.last_height = last_height; - self.last_timestamp = last_timestamp; - self.last_block_created = last_block_created; - } + if let SyncState::Synced(block_header) = &*sync_state.borrow_and_update() { + self.update_last_block_values(block_header); } - let blocks = self.predefined_blocks.clone(); - for block in blocks { + let next_height = self.next_height(); + if let Some(block) = self.predefined_blocks.remove(&next_height) { self.produce_predefined_block(&block).await?; - } - - tokio::select! { - biased; - _ = watcher.while_started() => { - should_continue = false; - } - request = self.request_receiver.recv() => { - if let Some(request) = request { - match request { - Request::ManualBlocks((block, response)) => { - let result = self.produce_manual_blocks(block).await; - let _ = response.send(result); + should_continue = true; + } else { + tokio::select! { + biased; + _ = watcher.while_started() => { + should_continue = false; + } + request = self.request_receiver.recv() => { + if let Some(request) = request { + match request { + Request::ManualBlocks((block, response)) => { + let result = self.produce_manual_blocks(block).await; + let _ = response.send(result); + } } + should_continue = true; + } else { + tracing::error!("The PoA task should be the holder of the `Sender`"); + should_continue = false; } - should_continue = true; - } else { - tracing::error!("The PoA task should be the holder of the `Sender`"); - should_continue = false; } - } - // TODO: This should likely be refactored to use something like tokio::sync::Notify. - // Otherwise, if a bunch of txs are submitted at once and all the txs are included - // into the first block production trigger, we'll still call the event handler - // for each tx after they've already been included into a block. - // The poa service also doesn't care about events unrelated to new tx submissions, - // and shouldn't be awoken when txs are completed or squeezed out of the pool. - txpool_event = self.tx_status_update_stream.next() => { - if txpool_event.is_some() { - self.on_txpool_event().await.context("While processing txpool event")?; + // TODO: This should likely be refactored to use something like tokio::sync::Notify. + // Otherwise, if a bunch of txs are submitted at once and all the txs are included + // into the first block production trigger, we'll still call the event handler + // for each tx after they've already been included into a block. + // The poa service also doesn't care about events unrelated to new tx submissions, + // and shouldn't be awoken when txs are completed or squeezed out of the pool. + txpool_event = self.tx_status_update_stream.next() => { + if txpool_event.is_some() { + self.on_txpool_event().await.context("While processing txpool event")?; + should_continue = true; + } else { + should_continue = false; + } + } + at = self.timer.wait() => { + self.on_timer(at).await.context("While processing timer event")?; should_continue = true; - } else { - should_continue = false; } } - at = self.timer.wait() => { - self.on_timer(at).await.context("While processing timer event")?; - should_continue = true; - } } Ok(should_continue) } @@ -607,7 +615,7 @@ where I: BlockImporter + 'static, P: P2pPort, { - let predefined_blocks = Vec::new(); + let predefined_blocks = HashMap::new(); Service::new(MainTask::new( last_block, config, diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 2d2c0607105..4a4787b5871 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -27,7 +27,10 @@ use fuel_core_storage::transactional::Changes; use fuel_core_types::{ blockchain::{ block::Block, - header::BlockHeader, + header::{ + BlockHeader, + PartialBlockHeader, + }, primitives::SecretKeyWrapper, SealedBlock, }, @@ -54,7 +57,10 @@ use rand::{ SeedableRng, }; use std::{ - collections::HashSet, + collections::{ + HashMap, + HashSet, + }, sync::{ Arc, Mutex as StdMutex, @@ -339,7 +345,7 @@ async fn remove_skipped_transactions() { let p2p_port = generate_p2p_port(); - let predefined_blocks = vec![]; + let predefined_blocks = HashMap::new(); let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -388,7 +394,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { let p2p_port = generate_p2p_port(); - let predefined_blocks = vec![]; + let predefined_blocks = HashMap::new(); let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -410,17 +416,17 @@ fn test_signing_key() -> Secret { } #[derive(Debug, PartialEq)] -enum ProduceBlock { +enum FakeProducedBlock { Predefined(Block), New(BlockHeight, Tai64), } struct FakeBlockProducer { - block_sender: tokio::sync::mpsc::Sender, + block_sender: tokio::sync::mpsc::Sender, } impl FakeBlockProducer { - fn new() -> (Self, tokio::sync::mpsc::Receiver) { + fn new() -> (Self, tokio::sync::mpsc::Receiver) { let (block_sender, receiver) = tokio::sync::mpsc::channel(100); (Self { block_sender }, receiver) } @@ -435,7 +441,7 @@ impl BlockProducer for FakeBlockProducer { _source: TransactionsSource, ) -> anyhow::Result> { self.block_sender - .send(ProduceBlock::New(height, block_time)) + .send(FakeProducedBlock::New(height, block_time)) .await .unwrap(); Ok(UncommittedResult::new( @@ -454,12 +460,12 @@ impl BlockProducer for FakeBlockProducer { block: &Block, ) -> anyhow::Result> { self.block_sender - .send(ProduceBlock::Predefined(block.clone())) + .send(FakeProducedBlock::Predefined(block.clone())) .await .unwrap(); Ok(UncommittedResult::new( ExecutionResult { - block: Default::default(), + block: block.clone(), skipped_transactions: Default::default(), tx_status: Default::default(), events: Default::default(), @@ -469,10 +475,23 @@ impl BlockProducer for FakeBlockProducer { } } +fn block_for_height(height: u32) -> Block { + let mut header = PartialBlockHeader::default(); + header.consensus.height = height.into(); + let transactions = vec![]; + Block::new(header, transactions, Default::default(), Default::default()).unwrap() +} + #[tokio::test] async fn consensus_service__run__will_include_predefined_blocks_before_new_blocks() { // given - let blocks = vec![Block::default(), Block::default(), Block::default()]; + let blocks: HashMap<_, _> = [ + (2u32.into(), block_for_height(2)), + (3u32.into(), block_for_height(3)), + (4u32.into(), block_for_height(4)), + ] + .into_iter() + .collect(); let (block_producer, mut block_receiver) = FakeBlockProducer::new(); let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); let config = Config { @@ -506,15 +525,14 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block service.start().unwrap(); // then - for block in blocks { - assert_eq!( - block_receiver.recv().await.unwrap(), - ProduceBlock::Predefined(block) - ); + for (_, block) in blocks { + let expected = FakeProducedBlock::Predefined(block); + let actual = block_receiver.recv().await.unwrap(); + assert_eq!(expected, actual); } let maybe_produced_block = block_receiver.recv().await.unwrap(); assert!(matches! { maybe_produced_block, - ProduceBlock::New(_, _) + FakeProducedBlock::New(_, _) }); } From 1d74d134a0b1dd4bef81899ab9fc350040a17853 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 17:04:50 +0200 Subject: [PATCH 08/25] Fix dumb test --- .../services/consensus_module/poa/src/service_test.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 4a4787b5871..d6f9aeeab62 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -485,13 +485,12 @@ fn block_for_height(height: u32) -> Block { #[tokio::test] async fn consensus_service__run__will_include_predefined_blocks_before_new_blocks() { // given - let blocks: HashMap<_, _> = [ + let blocks = [ (2u32.into(), block_for_height(2)), (3u32.into(), block_for_height(3)), (4u32.into(), block_for_height(4)), - ] - .into_iter() - .collect(); + ]; + let blocks_map: HashMap<_, _> = blocks.clone().into_iter().collect(); let (block_producer, mut block_receiver) = FakeBlockProducer::new(); let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); let config = Config { @@ -517,7 +516,7 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block block_producer, block_importer, generate_p2p_port(), - blocks.clone(), + blocks_map, ); // when @@ -525,7 +524,7 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block service.start().unwrap(); // then - for (_, block) in blocks { + for (_, block) in blocks.into_iter() { let expected = FakeProducedBlock::Predefined(block); let actual = block_receiver.recv().await.unwrap(); assert_eq!(expected, actual); From e88f96def377729daa492174bd59d75b05e7bbd5 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 17:24:33 +0200 Subject: [PATCH 09/25] Report all skipped txs in predefined blocks --- .../consensus_module/poa/src/config.rs | 3 + .../consensus_module/poa/src/service.rs | 70 +++++++++++++------ .../consensus_module/poa/src/service_test.rs | 2 +- 3 files changed, 51 insertions(+), 24 deletions(-) diff --git a/crates/services/consensus_module/poa/src/config.rs b/crates/services/consensus_module/poa/src/config.rs index 83f8596b23d..7da6cad5b80 100644 --- a/crates/services/consensus_module/poa/src/config.rs +++ b/crates/services/consensus_module/poa/src/config.rs @@ -1,5 +1,6 @@ use fuel_core_types::{ blockchain::primitives::SecretKeyWrapper, + fuel_types::ChainId, secrecy::Secret, }; use tokio::time::Duration; @@ -11,6 +12,7 @@ pub struct Config { pub metrics: bool, pub min_connected_reserved_peers: usize, pub time_until_synced: Duration, + pub chain_id: ChainId, } #[cfg(feature = "test-helpers")] @@ -22,6 +24,7 @@ impl Default for Config { metrics: false, min_connected_reserved_peers: 0, time_until_synced: Duration::ZERO, + chain_id: ChainId::default(), } } } diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index ea2cc8f557a..e65b5a12f07 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -17,6 +17,25 @@ use tokio::{ }; use tokio_stream::StreamExt; +use crate::{ + deadline_clock::{ + DeadlineClock, + OnConflict, + }, + ports::{ + BlockImporter, + BlockProducer, + P2pPort, + TransactionPool, + TransactionsSource, + }, + sync::{ + SyncState, + SyncTask, + }, + Config, + Trigger, +}; use fuel_core_services::{ stream::BoxStream, RunnableService, @@ -41,8 +60,12 @@ use fuel_core_types::{ fuel_tx::{ Transaction, TxId, + UniqueIdentifier, + }, + fuel_types::{ + BlockHeight, + ChainId, }, - fuel_types::BlockHeight, secrecy::{ ExposeSecret, Secret, @@ -58,26 +81,6 @@ use fuel_core_types::{ tai64::Tai64, }; -use crate::{ - deadline_clock::{ - DeadlineClock, - OnConflict, - }, - ports::{ - BlockImporter, - BlockProducer, - P2pPort, - TransactionPool, - TransactionsSource, - }, - sync::{ - SyncState, - SyncTask, - }, - Config, - Trigger, -}; - pub type Service = ServiceRunner>; #[derive(Clone)] pub struct SharedState { @@ -148,6 +151,7 @@ pub struct MainTask { /// Deadline clock, used by the triggers timer: DeadlineClock, sync_task_handle: ServiceRunner, + chain_id: ChainId, } impl MainTask @@ -177,6 +181,7 @@ where min_connected_reserved_peers, time_until_synced, trigger, + chain_id, .. } = config; @@ -205,6 +210,7 @@ where trigger, timer: DeadlineClock::new(), sync_task_handle, + chain_id, } } @@ -397,7 +403,9 @@ where async fn produce_predefined_block( &mut self, predefined_block: &Block, + chain_id: &ChainId, ) -> anyhow::Result<()> { + tracing::info!("Producing predefined block"); let last_block_created = Instant::now(); // verify signing key is set if self.signing_key.is_none() { @@ -408,7 +416,7 @@ where let ( ExecutionResult { block, - skipped_transactions: _, + skipped_transactions, tx_status, events, }, @@ -419,6 +427,21 @@ where .await? .into(); + if !skipped_transactions.is_empty() { + tracing::error!("During block production got invalid transactions"); + let txs = predefined_block.transactions(); + + for (tx_id, err) in skipped_transactions { + let maybe_tx = txs.iter().find(|tx| tx.id(chain_id) == tx_id); + if let Some(tx) = maybe_tx { + tracing::error!( + "During block production got invalid transaction {:?} with error {:?}", + tx, + err + ); + } + } + } // Sign the block and seal it let seal = seal_block(&self.signing_key, &block)?; let sealed_block = SealedBlock { @@ -548,8 +571,9 @@ where } let next_height = self.next_height(); + let chain_id = self.chain_id; if let Some(block) = self.predefined_blocks.remove(&next_height) { - self.produce_predefined_block(&block).await?; + self.produce_predefined_block(&block, &chain_id).await?; should_continue = true; } else { tokio::select! { diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index d6f9aeeab62..9c75a2fc9b5 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -524,7 +524,7 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block service.start().unwrap(); // then - for (_, block) in blocks.into_iter() { + for (_, block) in blocks { let expected = FakeProducedBlock::Predefined(block); let actual = block_receiver.recv().await.unwrap(); assert_eq!(expected, actual); From 15215634a1ea5bcd265f3f7d29b99e36bea18da6 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 18:03:18 +0200 Subject: [PATCH 10/25] Move predefined blocks stuff to a port --- .../consensus_module/poa/src/ports.rs | 4 ++ .../consensus_module/poa/src/service.rs | 33 ++++++++------ .../consensus_module/poa/src/service_test.rs | 43 ++++++++++++++++--- 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index 875c18eebc4..9383933bdde 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -114,3 +114,7 @@ pub trait SyncPort: Send + Sync { /// await synchronization with the peers async fn sync_with_peers(&mut self) -> anyhow::Result<()>; } + +pub trait PredefinedBlocks: Send + Sync { + fn get_block(&self, height: &BlockHeight) -> Option; +} diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index e65b5a12f07..59caa30bcea 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -3,7 +3,6 @@ use anyhow::{ Context, }; use std::{ - collections::HashMap, ops::Deref, sync::Arc, time::Duration, @@ -26,6 +25,7 @@ use crate::{ BlockImporter, BlockProducer, P2pPort, + PredefinedBlocks, TransactionPool, TransactionsSource, }, @@ -81,7 +81,7 @@ use fuel_core_types::{ tai64::Tai64, }; -pub type Service = ServiceRunner>; +pub type Service = ServiceRunner>; #[derive(Clone)] pub struct SharedState { request_sender: mpsc::Sender, @@ -135,7 +135,7 @@ pub(crate) enum RequestType { Trigger, } -pub struct MainTask { +pub struct MainTask { signing_key: Option>, block_producer: B, block_importer: I, @@ -146,7 +146,7 @@ pub struct MainTask { last_height: BlockHeight, last_timestamp: Tai64, last_block_created: Instant, - predefined_blocks: HashMap, + predefined_blocks: PB, trigger: Trigger, /// Deadline clock, used by the triggers timer: DeadlineClock, @@ -154,10 +154,11 @@ pub struct MainTask { chain_id: ChainId, } -impl MainTask +impl MainTask where T: TransactionPool, I: BlockImporter, + PB: PredefinedBlocks, { pub fn new( last_block: &BlockHeader, @@ -166,7 +167,7 @@ where block_producer: B, block_importer: I, p2p_port: P, - predefined_blocks: HashMap, + predefined_blocks: PB, ) -> Self { let tx_status_update_stream = txpool.transaction_status_events(); let (request_sender, request_receiver) = mpsc::channel(1024); @@ -254,11 +255,12 @@ where } } -impl MainTask +impl MainTask where T: TransactionPool, B: BlockProducer, I: BlockImporter, + PB: PredefinedBlocks, { // Request the block producer to make a new block, and return it when ready async fn signal_produce_block( @@ -502,14 +504,14 @@ where } #[async_trait::async_trait] -impl RunnableService for MainTask +impl RunnableService for MainTask where Self: RunnableTask, { const NAME: &'static str = "PoA"; type SharedData = SharedState; - type Task = MainTask; + type Task = MainTask; type TaskParams = (); fn shared_data(&self) -> Self::SharedData { @@ -537,11 +539,12 @@ where } #[async_trait::async_trait] -impl RunnableTask for MainTask +impl RunnableTask for MainTask where T: TransactionPool, B: BlockProducer, I: BlockImporter, + PB: PredefinedBlocks, { async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { let should_continue; @@ -572,7 +575,8 @@ where let next_height = self.next_height(); let chain_id = self.chain_id; - if let Some(block) = self.predefined_blocks.remove(&next_height) { + let maybe_block = self.predefined_blocks.get_block(&next_height); + if let Some(block) = maybe_block { self.produce_predefined_block(&block, &chain_id).await?; should_continue = true; } else { @@ -625,21 +629,22 @@ where } } -pub fn new_service( +pub fn new_service( last_block: &BlockHeader, config: Config, txpool: T, block_producer: B, block_importer: I, p2p_port: P, -) -> Service + predefined_blocks: PB, +) -> Service where T: TransactionPool + 'static, B: BlockProducer + 'static, I: BlockImporter + 'static, + PB: PredefinedBlocks + 'static, P: P2pPort, { - let predefined_blocks = HashMap::new(); Service::new(MainTask::new( last_block, config, diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 9c75a2fc9b5..03351c1e01d 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -9,6 +9,7 @@ use crate::{ MockBlockProducer, MockP2pPort, MockTransactionPool, + PredefinedBlocks, TransactionsSource, }, service::MainTask, @@ -161,6 +162,8 @@ impl TestContextBuilder { let p2p_port = generate_p2p_port(); + let predefined_blocks = HashMap::new().into(); + let service = new_service( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -168,14 +171,42 @@ impl TestContextBuilder { producer, importer, p2p_port, + predefined_blocks, ); service.start().unwrap(); TestContext { service } } } +pub struct FakePredefinedBlocks { + blocks: HashMap, +} + +impl From> for FakePredefinedBlocks { + fn from(blocks: HashMap) -> Self { + Self::new(blocks) + } +} + +impl FakePredefinedBlocks { + pub fn new(blocks: HashMap) -> Self { + Self { blocks } + } +} + +impl PredefinedBlocks for FakePredefinedBlocks { + fn get_block(&self, height: &BlockHeight) -> Option { + self.blocks.get(height).cloned() + } +} + struct TestContext { - service: Service, + service: Service< + MockTransactionPool, + MockBlockProducer, + MockBlockImporter, + FakePredefinedBlocks, + >, } impl TestContext { @@ -345,7 +376,8 @@ async fn remove_skipped_transactions() { let p2p_port = generate_p2p_port(); - let predefined_blocks = HashMap::new(); + let predefined_blocks: FakePredefinedBlocks = HashMap::new().into(); + let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -394,7 +426,8 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { let p2p_port = generate_p2p_port(); - let predefined_blocks = HashMap::new(); + let predefined_blocks: FakePredefinedBlocks = HashMap::new().into(); + let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), config, @@ -485,7 +518,7 @@ fn block_for_height(height: u32) -> Block { #[tokio::test] async fn consensus_service__run__will_include_predefined_blocks_before_new_blocks() { // given - let blocks = [ + let blocks: [(BlockHeight, Block); 3] = [ (2u32.into(), block_for_height(2)), (3u32.into(), block_for_height(3)), (4u32.into(), block_for_height(4)), @@ -516,7 +549,7 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block block_producer, block_importer, generate_p2p_port(), - blocks_map, + FakePredefinedBlocks::new(blocks_map), ); // when From 4c758799da220796aa568e958ff714a055d13160 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 18:06:19 +0200 Subject: [PATCH 11/25] Change Fake to simple impl --- .../consensus_module/poa/src/ports.rs | 23 +++++++++++++ .../consensus_module/poa/src/service_test.rs | 32 +++---------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/crates/services/consensus_module/poa/src/ports.rs b/crates/services/consensus_module/poa/src/ports.rs index 9383933bdde..e5fa26fa336 100644 --- a/crates/services/consensus_module/poa/src/ports.rs +++ b/crates/services/consensus_module/poa/src/ports.rs @@ -30,6 +30,7 @@ use fuel_core_types::{ }, tai64::Tai64, }; +use std::collections::HashMap; #[cfg_attr(test, mockall::automock)] pub trait TransactionPool: Send + Sync { @@ -118,3 +119,25 @@ pub trait SyncPort: Send + Sync { pub trait PredefinedBlocks: Send + Sync { fn get_block(&self, height: &BlockHeight) -> Option; } + +pub struct InMemoryPredefinedBlocks { + blocks: HashMap, +} + +impl From> for InMemoryPredefinedBlocks { + fn from(blocks: HashMap) -> Self { + Self::new(blocks) + } +} + +impl InMemoryPredefinedBlocks { + pub fn new(blocks: HashMap) -> Self { + Self { blocks } + } +} + +impl PredefinedBlocks for InMemoryPredefinedBlocks { + fn get_block(&self, height: &BlockHeight) -> Option { + self.blocks.get(height).cloned() + } +} diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 03351c1e01d..35985e2188a 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -5,11 +5,11 @@ use crate::{ new_service, ports::{ BlockProducer, + InMemoryPredefinedBlocks, MockBlockImporter, MockBlockProducer, MockP2pPort, MockTransactionPool, - PredefinedBlocks, TransactionsSource, }, service::MainTask, @@ -178,34 +178,12 @@ impl TestContextBuilder { } } -pub struct FakePredefinedBlocks { - blocks: HashMap, -} - -impl From> for FakePredefinedBlocks { - fn from(blocks: HashMap) -> Self { - Self::new(blocks) - } -} - -impl FakePredefinedBlocks { - pub fn new(blocks: HashMap) -> Self { - Self { blocks } - } -} - -impl PredefinedBlocks for FakePredefinedBlocks { - fn get_block(&self, height: &BlockHeight) -> Option { - self.blocks.get(height).cloned() - } -} - struct TestContext { service: Service< MockTransactionPool, MockBlockProducer, MockBlockImporter, - FakePredefinedBlocks, + InMemoryPredefinedBlocks, >, } @@ -376,7 +354,7 @@ async fn remove_skipped_transactions() { let p2p_port = generate_p2p_port(); - let predefined_blocks: FakePredefinedBlocks = HashMap::new().into(); + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), @@ -426,7 +404,7 @@ async fn does_not_produce_when_txpool_empty_in_instant_mode() { let p2p_port = generate_p2p_port(); - let predefined_blocks: FakePredefinedBlocks = HashMap::new().into(); + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); let mut task = MainTask::new( &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()), @@ -549,7 +527,7 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block block_producer, block_importer, generate_p2p_port(), - FakePredefinedBlocks::new(blocks_map), + InMemoryPredefinedBlocks::new(blocks_map), ); // when From 7c5ffa5336a500c18dc969674a5f8f37f00a57d3 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 18:07:58 +0200 Subject: [PATCH 12/25] Update CHANGELOG --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1d2eb162853..54e0c47caed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Added +- [2081](https://github.com/FuelLabs/fuel-core/pull/2081): Enable producer to include predefined blocks. + ## [Version 0.32.1] ### Added From 713339210cc91e00411e91c234e97516bf3742dd Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 18:30:46 +0200 Subject: [PATCH 13/25] Fix compilation errors upstream --- .../service/adapters/consensus_module/poa.rs | 4 +++- crates/fuel-core/src/service/config.rs | 5 +++++ crates/fuel-core/src/service/sub_services.rs | 20 +++++++++++++++---- .../services/producer/src/block_producer.rs | 12 ++++++----- 4 files changed, 31 insertions(+), 10 deletions(-) diff --git a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs index bb36b0362f6..cf5661e5b0f 100644 --- a/crates/fuel-core/src/service/adapters/consensus_module/poa.rs +++ b/crates/fuel-core/src/service/adapters/consensus_module/poa.rs @@ -126,7 +126,9 @@ impl fuel_core_poa::ports::BlockProducer for BlockProducerAdapter { &self, block: &Block, ) -> anyhow::Result> { - self.block_producer.produce_and_execute_predefined(block) + self.block_producer + .produce_and_execute_predefined(block) + .await } } diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs index 1c375a4af41..0e84142916a 100644 --- a/crates/fuel-core/src/service/config.rs +++ b/crates/fuel-core/src/service/config.rs @@ -204,6 +204,11 @@ impl From<&Config> for fuel_core_poa::Config { metrics: false, min_connected_reserved_peers: config.min_connected_reserved_peers, time_until_synced: config.time_until_synced, + chain_id: config + .snapshot_reader + .chain_config() + .consensus_parameters + .chain_id(), } } } diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index e94db198c08..f6232001e57 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -42,7 +42,10 @@ use fuel_core_gas_price_service::fuel_gas_price_updater::{ UpdaterMetadata, V0Metadata, }; -use fuel_core_poa::Trigger; +use fuel_core_poa::{ + ports::InMemoryPredefinedBlocks, + Trigger, +}; use fuel_core_services::{ RunnableService, ServiceRunner, @@ -53,13 +56,20 @@ use fuel_core_storage::{ }; #[cfg(feature = "relayer")] use fuel_core_types::blockchain::primitives::DaBlockHeight; -use std::sync::Arc; +use std::{ + collections::HashMap, + sync::Arc, +}; use tokio::sync::Mutex; mod algorithm_updater; -pub type PoAService = - fuel_core_poa::Service; +pub type PoAService = fuel_core_poa::Service< + TxPoolAdapter, + BlockProducerAdapter, + BlockImporterAdapter, + InMemoryPredefinedBlocks, +>; #[cfg(feature = "p2p")] pub type P2PService = fuel_core_p2p::service::Service; pub type TxPoolSharedState = fuel_core_txpool::service::SharedState< @@ -235,6 +245,7 @@ pub fn init_sub_services( tracing::info!("Enabled manual block production because of `debug` flag"); } + let predefined_blocks: InMemoryPredefinedBlocks = HashMap::new().into(); let poa = (production_enabled).then(|| { fuel_core_poa::new_service( &last_block_header, @@ -243,6 +254,7 @@ pub fn init_sub_services( producer_adapter.clone(), importer_adapter.clone(), p2p_adapter.clone(), + predefined_blocks, ) }); let poa_adapter = PoAAdapter::new(poa.as_ref().map(|service| service.shared.clone())); diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index 0786182d4e5..5737c9687ba 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -101,20 +101,22 @@ where { pub async fn produce_and_execute_predefined( &self, - block: &Block, + predefined_block: &Block, ) -> anyhow::Result> where Executor: ports::BlockProducer> + 'static, { let _production_guard = self.lock.lock().await; - let source = block.transactions().to_vec(); + let source = predefined_block.transactions().to_vec(); - let height = block.header().consensus().height; + let height = predefined_block.header().consensus().height; - let header = block.header().into(); + let block_time = predefined_block.header().consensus().time; - let mint_tx = block + let header = self.new_header(height, block_time).await?; + + let mint_tx = predefined_block .transactions() .last() .and_then(|tx| tx.as_mint()) From 3745ef91b2b4b4dae3a53d7e486d0a0c6d70eaa4 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Thu, 15 Aug 2024 18:37:07 +0200 Subject: [PATCH 14/25] Fix broken test --- tests/tests/snapshot.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/tests/snapshot.rs b/tests/tests/snapshot.rs index 56b6aaa74a3..729709896a6 100644 --- a/tests/tests/snapshot.rs +++ b/tests/tests/snapshot.rs @@ -28,10 +28,10 @@ async fn loads_snapshot() { // setup config let starting_state = StateConfig { last_block: Some(LastBlockConfig { - block_height: (u32::MAX - 1).into(), + block_height: (u32::MAX - 2).into(), da_block_height: DaBlockHeight(u64::MAX), - consensus_parameters_version: u32::MAX - 1, - state_transition_version: u32::MAX - 1, + consensus_parameters_version: u32::MAX - 2, + state_transition_version: u32::MAX - 2, blocks_root, }), ..StateConfig::randomize(&mut rng) @@ -46,15 +46,15 @@ async fn loads_snapshot() { let actual_state = db.read_state_config().unwrap(); let mut expected = starting_state.sorted(); expected.last_block = Some(LastBlockConfig { - block_height: u32::MAX.into(), + block_height: (u32::MAX - 1).into(), da_block_height: DaBlockHeight(u64::MAX), - consensus_parameters_version: u32::MAX, - state_transition_version: u32::MAX, + consensus_parameters_version: u32::MAX - 1, + state_transition_version: u32::MAX - 1, blocks_root: db .on_chain() .latest_view() .unwrap() - .block_header_merkle_root(&u32::MAX.into()) + .block_header_merkle_root(&(u32::MAX - 1).into()) .unwrap(), }); From c653643a6127fe7fd64e70585069842465f86d87 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 12:39:11 +0200 Subject: [PATCH 15/25] Add tests for all the important fields for predefined blocks --- Cargo.lock | 1 + crates/services/producer/Cargo.toml | 1 + .../block_producer/tests.txt | 7 + .../services/producer/src/block_producer.rs | 18 +- .../producer/src/block_producer/tests.rs | 193 +++++++++++++++--- 5 files changed, 190 insertions(+), 30 deletions(-) create mode 100644 crates/services/producer/proptest-regressions/block_producer/tests.txt diff --git a/Cargo.lock b/Cargo.lock index fc6e314e417..c784e50bd71 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3176,6 +3176,7 @@ dependencies = [ "fuel-core-trace", "fuel-core-types", "mockall", + "proptest", "rand", "tokio", "tokio-rayon", diff --git a/crates/services/producer/Cargo.toml b/crates/services/producer/Cargo.toml index 51c7b3aee53..5a6e8b91fbc 100644 --- a/crates/services/producer/Cargo.toml +++ b/crates/services/producer/Cargo.toml @@ -24,6 +24,7 @@ tracing = { workspace = true } fuel-core-producer = { path = "", features = ["test-helpers"] } fuel-core-trace = { path = "../../trace" } fuel-core-types = { path = "../../types", features = ["test-helpers"] } +proptest = { workspace = true } rand = { workspace = true } [features] diff --git a/crates/services/producer/proptest-regressions/block_producer/tests.txt b/crates/services/producer/proptest-regressions/block_producer/tests.txt new file mode 100644 index 00000000000..458427bc73d --- /dev/null +++ b/crates/services/producer/proptest-regressions/block_producer/tests.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc 61a23b5af5d4aca893e3920e7d507f59bba55610059053d0d0d12661ef347ea4 # shrinks to block = V1(BlockV1 { header: V1(BlockHeaderV1 { application: ApplicationHeader { da_height: DaBlockHeight(0), consensus_parameters_version: 0, state_transition_bytecode_version: 7, generated: GeneratedApplicationFields { transactions_count: 1, message_receipt_count: 0, transactions_root: 167ff38d512ce7cfc6a39f25bf541c65d35b05a50226ab5c43179efc9a3e92e0, message_outbox_root: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, event_inbox_root: 0000000000000000000000000000000000000000000000000000000000000000 } }, consensus: ConsensusHeader { prev_root: 0000000000000000000000000000000000000000000000000000000000000000, height: 00000002, time: Tai64(4611686018427387914), generated: GeneratedConsensusFields { application_hash: 1f7f913de2f5a64478b4138317b3170454d36a66c61689b8a937bd56c8cbcd55 } }, metadata: Some(BlockHeaderMetadata { id: BlockId(57dbabbe1cc7104758f6e20dfe7fe06693aa0e0112bcf3e7662193cc46ed0ede) }) }), transactions: [Mint(Mint { tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, input_contract: Contract { utxo_id: UtxoId { tx_id: 0000000000000000000000000000000000000000000000000000000000000000, output_index: 0 }, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000, tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, contract_id: 0000000000000000000000000000000000000000000000000000000000000000 }, output_contract: Contract { input_index: 0, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000 }, mint_amount: 0, mint_asset_id: 0000000000000000000000000000000000000000000000000000000000000000, gas_price: 0, metadata: None })] }) diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index 5737c9687ba..b60be972227 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -114,7 +114,11 @@ where let block_time = predefined_block.header().consensus().time; - let header = self.new_header(height, block_time).await?; + let da_height = predefined_block.header().application().da_height; + + let header = self + .new_header_with_da_height(height, block_time, da_height) + .await?; let mint_tx = predefined_block .transactions() @@ -354,7 +358,17 @@ where Ok(block_header) } - + /// Create the header for a new block at the provided height + async fn new_header_with_da_height( + &self, + height: BlockHeight, + block_time: Tai64, + da_height: DaBlockHeight, + ) -> anyhow::Result { + let mut block_header = self._new_header(height, block_time)?; + block_header.application.da_height = da_height; + Ok(block_header) + } async fn select_new_da_height( &self, gas_limit: u64, diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index 3825cf2a45c..9182e46d797 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -36,8 +36,10 @@ use fuel_core_types::{ primitives::DaBlockHeight, }, fuel_tx::{ + field::InputContract, ConsensusParameters, Mint, + Script, Transaction, }, fuel_types::BlockHeight, @@ -540,39 +542,132 @@ mod produce_and_execute_block_txpool { } } -fn block_with_height(height: impl Into) -> Block { - let header = PartialBlockHeader { - consensus: ConsensusHeader { - height: height.into(), - ..Default::default() - }, - application: ApplicationHeader { - da_height: DaBlockHeight::default(), - ..Default::default() - }, - }; - let mint = Transaction::Mint(Mint::default()); - let txs = vec![mint]; - let outbox_message_ids = vec![]; - let event_inbox_root = Bytes32::default(); - Block::new(header, txs, &outbox_message_ids, event_inbox_root).unwrap() +use fuel_core_types::fuel_tx::field::MintGasPrice; +use proptest::{ + prop_compose, + proptest, +}; + +prop_compose! { + fn arb_block()(height in 1..255u8, da_height in 1..255u64, gas_price: u64, num_txs in 0..100u32) -> Block { + let mut txs : Vec<_> = (0..num_txs).map(|_| Transaction::Script(Script::default())).collect(); + let mut inner_mint = Mint::default(); + *inner_mint.gas_price_mut() = gas_price; + let mint = Transaction::Mint(inner_mint); + txs.push(mint); + let header = PartialBlockHeader { + consensus: ConsensusHeader { + height: (height as u32).into(), + ..Default::default() + }, + application: ApplicationHeader { + da_height: DaBlockHeight(da_height), + ..Default::default() + }, + }; + let outbox_message_ids = vec![]; + let event_inbox_root = Bytes32::default(); + Block::new(header, txs, &outbox_message_ids, event_inbox_root).unwrap() + } } -#[tokio::test] -async fn produce_and_execute_predefined_block__happy() { - // given - let height = 1u32; - let block = block_with_height(height); - let executor = MockExecutorWithCapture::default(); - let ctx = TestContext::default_from_executor(executor.clone()); +fn ctx_for_block( + block: &Block, + executor: MockExecutorWithCapture, +) -> TestContext> { + let prev_height = block.header().height().pred().unwrap(); + let prev_da_height = block.header().da_height.as_u64() - 1; + let ctx = TestContextBuilder::new() + .with_prev_height(prev_height) + .with_prev_da_height(prev_da_height.into()) + .build_with_executor(executor); + ctx +} + +// gas_price +proptest! { + #[test] + fn produce_and_execute_predefined_block__contains_expected_gas_price(block in arb_block()) { + let rt = multithreaded_runtime(); - let producer = ctx.producer(); + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); - // when - let result = producer.produce_and_execute_predefined(&block).await; + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); - // then - assert!(result.is_ok()); + // then + let expected_gas_price = *block + .transactions().last().and_then(|tx| tx.as_mint()).unwrap().gas_price(); + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().gas_price; + assert_eq!(expected_gas_price, actual); + } + + // time + #[test] + fn produce_and_execute_predefined_block__contains_expected_time(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_time = block.header().consensus().time; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().header_to_produce.consensus.time; + assert_eq!(expected_time, actual); + } + + // coinbase + #[test] + fn produce_and_execute_predefined_block__contains_expected_coinbase(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_coinbase = block.transactions().last().and_then(|tx| tx.as_mint()).unwrap().input_contract().contract_id; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().coinbase_recipient; + assert_eq!(expected_coinbase, actual); + } + + // DA height + #[test] + fn produce_and_execute_predefined_block__contains_expected_da_height(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let expected_da_height = block.header().application().da_height; + let captured = executor.captured.lock().unwrap(); + let actual = captured.as_ref().unwrap().header_to_produce.application.da_height; + assert_eq!(expected_da_height, actual); + } +} + +fn multithreaded_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() } struct TestContext { @@ -768,4 +863,46 @@ impl TestContextBuilder { ..TestContext::default_from_db(db) } } + + fn build_with_executor(&self, executor: Ex) -> TestContext { + let da_height = self.prev_da_height; + let previous_block = PartialFuelBlock { + header: PartialBlockHeader { + application: ApplicationHeader { + da_height, + ..Default::default() + }, + consensus: ConsensusHeader { + height: self.prev_height, + ..Default::default() + }, + }, + transactions: vec![], + } + .generate(&[], Default::default()) + .unwrap() + .compress(&Default::default()); + + let db = MockDb { + blocks: Arc::new(Mutex::new( + vec![(self.prev_height, previous_block)] + .into_iter() + .collect(), + )), + consensus_parameters_version: 0, + state_transition_bytecode_version: 0, + }; + + let mock_relayer = MockRelayer { + latest_block_height: self.latest_block_height, + latest_da_blocks_with_costs: self.blocks_with_gas_costs.clone(), + ..Default::default() + }; + + TestContext { + relayer: mock_relayer, + block_gas_limit: self.block_gas_limit.unwrap_or_default(), + ..TestContext::default_from_db_and_executor(db, executor) + } + } } From a1711ad350f8c90a92ed86bd65bd4eff0a588891 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 12:47:34 +0200 Subject: [PATCH 16/25] Include randomized recipient contract ids --- crates/services/producer/src/block_producer.rs | 16 ++++++++++------ .../producer/src/block_producer/tests.rs | 10 ++++++++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index b60be972227..017cd2afdd6 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -26,7 +26,10 @@ use fuel_core_types::{ primitives::DaBlockHeight, }, fuel_tx::{ - field::MintGasPrice, + field::{ + InputContract, + MintGasPrice, + }, Transaction, }, fuel_types::{ @@ -108,7 +111,7 @@ where { let _production_guard = self.lock.lock().await; - let source = predefined_block.transactions().to_vec(); + let transactions_source = predefined_block.transactions().to_vec(); let height = predefined_block.header().consensus().height; @@ -116,7 +119,7 @@ where let da_height = predefined_block.header().application().da_height; - let header = self + let header_to_produce = self .new_header_with_da_height(height, block_time, da_height) .await?; @@ -129,11 +132,12 @@ where ))?; let gas_price = *mint_tx.gas_price(); + let coinbase_recipient = mint_tx.input_contract().contract_id; let component = Components { - header_to_produce: header, - transactions_source: source, - coinbase_recipient: self.config.coinbase_recipient.unwrap_or_default(), + header_to_produce, + transactions_source, + coinbase_recipient, gas_price, }; diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index 9182e46d797..8a61b285298 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -35,6 +35,7 @@ use fuel_core_types::{ }, primitives::DaBlockHeight, }, + fuel_tx, fuel_tx::{ field::InputContract, ConsensusParameters, @@ -549,10 +550,15 @@ use proptest::{ }; prop_compose! { - fn arb_block()(height in 1..255u8, da_height in 1..255u64, gas_price: u64, num_txs in 0..100u32) -> Block { + fn arb_block()(height in 1..255u8, da_height in 1..255u64, gas_price: u64, coinbase_recipient: [u8; 32], num_txs in 0..100u32) -> Block { let mut txs : Vec<_> = (0..num_txs).map(|_| Transaction::Script(Script::default())).collect(); let mut inner_mint = Mint::default(); *inner_mint.gas_price_mut() = gas_price; + *inner_mint.input_contract_mut() = fuel_tx::input::contract::Contract{ + contract_id: coinbase_recipient.into(), + ..Default::default() + }; + let mint = Transaction::Mint(inner_mint); txs.push(mint); let header = PartialBlockHeader { @@ -626,7 +632,7 @@ proptest! { // coinbase #[test] - fn produce_and_execute_predefined_block__contains_expected_coinbase(block in arb_block()) { + fn produce_and_execute_predefined_block__contains_expected_coinbase_recipient(block in arb_block()) { let rt = multithreaded_runtime(); // given From ea34fb1a981280d860f90044bf28280db54c5a39 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 12:52:44 +0200 Subject: [PATCH 17/25] Remove regression --- .../producer/proptest-regressions/block_producer/tests.txt | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 crates/services/producer/proptest-regressions/block_producer/tests.txt diff --git a/crates/services/producer/proptest-regressions/block_producer/tests.txt b/crates/services/producer/proptest-regressions/block_producer/tests.txt deleted file mode 100644 index 458427bc73d..00000000000 --- a/crates/services/producer/proptest-regressions/block_producer/tests.txt +++ /dev/null @@ -1,7 +0,0 @@ -# Seeds for failure cases proptest has generated in the past. It is -# automatically read and these particular cases re-run before any -# novel cases are generated. -# -# It is recommended to check this file in to source control so that -# everyone who runs the test benefits from these saved cases. -cc 61a23b5af5d4aca893e3920e7d507f59bba55610059053d0d0d12661ef347ea4 # shrinks to block = V1(BlockV1 { header: V1(BlockHeaderV1 { application: ApplicationHeader { da_height: DaBlockHeight(0), consensus_parameters_version: 0, state_transition_bytecode_version: 7, generated: GeneratedApplicationFields { transactions_count: 1, message_receipt_count: 0, transactions_root: 167ff38d512ce7cfc6a39f25bf541c65d35b05a50226ab5c43179efc9a3e92e0, message_outbox_root: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, event_inbox_root: 0000000000000000000000000000000000000000000000000000000000000000 } }, consensus: ConsensusHeader { prev_root: 0000000000000000000000000000000000000000000000000000000000000000, height: 00000002, time: Tai64(4611686018427387914), generated: GeneratedConsensusFields { application_hash: 1f7f913de2f5a64478b4138317b3170454d36a66c61689b8a937bd56c8cbcd55 } }, metadata: Some(BlockHeaderMetadata { id: BlockId(57dbabbe1cc7104758f6e20dfe7fe06693aa0e0112bcf3e7662193cc46ed0ede) }) }), transactions: [Mint(Mint { tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, input_contract: Contract { utxo_id: UtxoId { tx_id: 0000000000000000000000000000000000000000000000000000000000000000, output_index: 0 }, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000, tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, contract_id: 0000000000000000000000000000000000000000000000000000000000000000 }, output_contract: Contract { input_index: 0, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000 }, mint_amount: 0, mint_asset_id: 0000000000000000000000000000000000000000000000000000000000000000, gas_price: 0, metadata: None })] }) From 025473402d3c0edfcd12dc959877d536840ef101 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 13:00:46 +0200 Subject: [PATCH 18/25] Do not include mint tx in txs source --- .../block_producer/tests.txt | 7 +++++++ crates/services/producer/src/block_producer.rs | 16 ++++++++-------- .../producer/src/block_producer/tests.rs | 18 ++++++++++++++++++ 3 files changed, 33 insertions(+), 8 deletions(-) create mode 100644 crates/services/producer/proptest-regressions/block_producer/tests.txt diff --git a/crates/services/producer/proptest-regressions/block_producer/tests.txt b/crates/services/producer/proptest-regressions/block_producer/tests.txt new file mode 100644 index 00000000000..bd2f192bf33 --- /dev/null +++ b/crates/services/producer/proptest-regressions/block_producer/tests.txt @@ -0,0 +1,7 @@ +# Seeds for failure cases proptest has generated in the past. It is +# automatically read and these particular cases re-run before any +# novel cases are generated. +# +# It is recommended to check this file in to source control so that +# everyone who runs the test benefits from these saved cases. +cc dd33a239ab8e44231c0bdfc62e5712d60afc5404211bdce6bd6404ea6ed143fc # shrinks to block = V1(BlockV1 { header: V1(BlockHeaderV1 { application: ApplicationHeader { da_height: DaBlockHeight(1), consensus_parameters_version: 0, state_transition_bytecode_version: 7, generated: GeneratedApplicationFields { transactions_count: 1, message_receipt_count: 0, transactions_root: 167ff38d512ce7cfc6a39f25bf541c65d35b05a50226ab5c43179efc9a3e92e0, message_outbox_root: e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, event_inbox_root: 0000000000000000000000000000000000000000000000000000000000000000 } }, consensus: ConsensusHeader { prev_root: 0000000000000000000000000000000000000000000000000000000000000000, height: 00000001, time: Tai64(4611686018427387914), generated: GeneratedConsensusFields { application_hash: 706ec8c8b182f5cc589bda36f4a218e0e4fc5e850bd21ea48194bc5d7b21e827 } }, metadata: Some(BlockHeaderMetadata { id: BlockId(f6e071429d32c8d90c22552c05d489e89d0c4a3d17d4a753efc16210b917882e) }) }), transactions: [Mint(Mint { tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, input_contract: Contract { utxo_id: UtxoId { tx_id: 0000000000000000000000000000000000000000000000000000000000000000, output_index: 0 }, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000, tx_pointer: TxPointer { block_height: 00000000, tx_index: 0 }, contract_id: 0000000000000000000000000000000000000000000000000000000000000000 }, output_contract: Contract { input_index: 0, balance_root: 0000000000000000000000000000000000000000000000000000000000000000, state_root: 0000000000000000000000000000000000000000000000000000000000000000 }, mint_amount: 0, mint_asset_id: 0000000000000000000000000000000000000000000000000000000000000000, gas_price: 0, metadata: None })] }) diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index 017cd2afdd6..2f1ee90e09f 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -111,7 +111,7 @@ where { let _production_guard = self.lock.lock().await; - let transactions_source = predefined_block.transactions().to_vec(); + let mut transactions_source = predefined_block.transactions().to_vec(); let height = predefined_block.header().consensus().height; @@ -123,13 +123,13 @@ where .new_header_with_da_height(height, block_time, da_height) .await?; - let mint_tx = predefined_block - .transactions() - .last() - .and_then(|tx| tx.as_mint()) - .ok_or(anyhow!( - "The last transaction in the block should be a mint transaction" - ))?; + let maybe_mint_tx = transactions_source.pop(); + let mint_tx = + maybe_mint_tx + .and_then(|tx| tx.as_mint().cloned()) + .ok_or(anyhow!( + "The last transaction in the block should be a mint transaction" + ))?; let gas_price = *mint_tx.gas_price(); let coinbase_recipient = mint_tx.input_contract().contract_id; diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index 8a61b285298..1c6580af8af 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -667,6 +667,24 @@ proptest! { let actual = captured.as_ref().unwrap().header_to_produce.application.da_height; assert_eq!(expected_da_height, actual); } + + #[test] + fn produce_and_execute_predefined_block__do_not_include_original_mint_in_txs_source(block in arb_block()) { + let rt = multithreaded_runtime(); + + // given + let executor = MockExecutorWithCapture::default(); + let ctx = ctx_for_block(&block, executor.clone()); + + //when + let _ = rt.block_on(ctx.producer().produce_and_execute_predefined(&block)).unwrap(); + + // then + let captured = executor.captured.lock().unwrap(); + let txs_source = &captured.as_ref().unwrap().transactions_source; + let has_a_mint = txs_source.iter().any(|tx| matches!(tx, Transaction::Mint(_))); + assert!(!has_a_mint); + } } fn multithreaded_runtime() -> tokio::runtime::Runtime { From 48773ce7760f43551f44c59a6f3171e08eeea821 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 13:09:44 +0200 Subject: [PATCH 19/25] Appease Clippy-sama --- crates/services/producer/src/block_producer/tests.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/crates/services/producer/src/block_producer/tests.rs b/crates/services/producer/src/block_producer/tests.rs index 1c6580af8af..14cfdc63f2a 100644 --- a/crates/services/producer/src/block_producer/tests.rs +++ b/crates/services/producer/src/block_producer/tests.rs @@ -577,17 +577,17 @@ prop_compose! { } } +#[allow(clippy::arithmetic_side_effects)] fn ctx_for_block( block: &Block, executor: MockExecutorWithCapture, ) -> TestContext> { let prev_height = block.header().height().pred().unwrap(); let prev_da_height = block.header().da_height.as_u64() - 1; - let ctx = TestContextBuilder::new() + TestContextBuilder::new() .with_prev_height(prev_height) .with_prev_da_height(prev_da_height.into()) - .build_with_executor(executor); - ctx + .build_with_executor(executor) } // gas_price From 859bf50f88f55587b9debd3efaddc08951266be3 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 13:53:49 +0200 Subject: [PATCH 20/25] Include test for alternating predefined blocks --- .../consensus_module/poa/src/service_test.rs | 62 ++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 35985e2188a..0af85d4f37e 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -494,7 +494,8 @@ fn block_for_height(height: u32) -> Block { } #[tokio::test] -async fn consensus_service__run__will_include_predefined_blocks_before_new_blocks() { +async fn consensus_service__run__will_include_sequential_predefined_blocks_before_new_blocks( +) { // given let blocks: [(BlockHeight, Block); 3] = [ (2u32.into(), block_for_height(2)), @@ -546,3 +547,62 @@ async fn consensus_service__run__will_include_predefined_blocks_before_new_block FakeProducedBlock::New(_, _) }); } + +#[tokio::test] +async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order() { + // given + let blocks: &[Option<(BlockHeight, Block)>] = &[ + Some((2u32.into(), block_for_height(2))), + None, + Some((4u32.into(), block_for_height(4))), + None, + Some((6u32.into(), block_for_height(6))), + None, + ]; + let blocks_map: HashMap<_, _> = blocks.iter().flat_map(|x| x.to_owned()).collect(); + let (block_producer, mut block_receiver) = FakeBlockProducer::new(); + let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); + let config = Config { + trigger: Trigger::Interval { + block_time: Duration::from_millis(100), + }, + signing_key: Some(test_signing_key()), + metrics: false, + ..Default::default() + }; + let mut block_importer = MockBlockImporter::default(); + block_importer.expect_commit_result().returning(|_| Ok(())); + block_importer + .expect_block_stream() + .returning(|| Box::pin(tokio_stream::empty())); + let mut rng = StdRng::seed_from_u64(0); + let tx = make_tx(&mut rng); + let TxPoolContext { txpool, .. } = MockTransactionPool::new_with_txs(vec![tx]); + let task = MainTask::new( + &last_block, + config, + txpool, + block_producer, + block_importer, + generate_p2p_port(), + InMemoryPredefinedBlocks::new(blocks_map), + ); + + // when + let service = ServiceRunner::new(task); + service.start().unwrap(); + + // then + for maybe_predefined in blocks { + let actual = block_receiver.recv().await.unwrap(); + if let Some((_, block)) = maybe_predefined { + let expected = FakeProducedBlock::Predefined(block.clone()); + assert_eq!(expected, actual); + } else { + assert!(matches! { + actual, + FakeProducedBlock::New(_, _) + }); + } + } +} From b1f38439777b520351d1a7237df569003e3752e8 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 14:44:49 +0200 Subject: [PATCH 21/25] Improve test, early return :(, improve skipped tx reporting, return error instead of ignore --- Cargo.lock | 2 + .../services/consensus_module/poa/Cargo.toml | 2 + .../consensus_module/poa/src/service.rs | 41 ++++++++----------- .../consensus_module/poa/src/service_test.rs | 18 ++++---- 4 files changed, 33 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1905b045814..8901cc23ae3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3156,6 +3156,8 @@ dependencies = [ "fuel-core-types", "mockall", "rand", + "serde", + "serde_json", "test-case", "tokio", "tokio-stream", diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index d5869c207d0..95a3bf0361c 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -19,6 +19,8 @@ fuel-core-types = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } tracing = { workspace = true } +serde = { workspace = true , features = ["derive"] } +serde_json = { workspace = true } [dev-dependencies] fuel-core-poa = { path = ".", features = ["test-helpers"] } diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 59caa30bcea..1b3bde675be 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -60,12 +60,8 @@ use fuel_core_types::{ fuel_tx::{ Transaction, TxId, - UniqueIdentifier, - }, - fuel_types::{ - BlockHeight, - ChainId, }, + fuel_types::BlockHeight, secrecy::{ ExposeSecret, Secret, @@ -73,6 +69,7 @@ use fuel_core_types::{ services::{ block_importer::ImportResult, executor::{ + Error as ExecutorError, ExecutionResult, UncommittedResult as UncommittedExecutionResult, }, @@ -80,6 +77,7 @@ use fuel_core_types::{ }, tai64::Tai64, }; +use serde::Serialize; pub type Service = ServiceRunner>; #[derive(Clone)] @@ -151,7 +149,6 @@ pub struct MainTask { /// Deadline clock, used by the triggers timer: DeadlineClock, sync_task_handle: ServiceRunner, - chain_id: ChainId, } impl MainTask @@ -182,7 +179,6 @@ where min_connected_reserved_peers, time_until_synced, trigger, - chain_id, .. } = config; @@ -211,7 +207,6 @@ where trigger, timer: DeadlineClock::new(), sync_task_handle, - chain_id, } } @@ -405,7 +400,6 @@ where async fn produce_predefined_block( &mut self, predefined_block: &Block, - chain_id: &ChainId, ) -> anyhow::Result<()> { tracing::info!("Producing predefined block"); let last_block_created = Instant::now(); @@ -430,19 +424,15 @@ where .into(); if !skipped_transactions.is_empty() { - tracing::error!("During block production got invalid transactions"); - let txs = predefined_block.transactions(); - - for (tx_id, err) in skipped_transactions { - let maybe_tx = txs.iter().find(|tx| tx.id(chain_id) == tx_id); - if let Some(tx) = maybe_tx { - tracing::error!( - "During block production got invalid transaction {:?} with error {:?}", - tx, - err + let block_and_skipped = PredefinedBlockWithSkippedTransactions { + block: predefined_block.clone(), + skipped_transactions, + }; + let serialized = serde_json::to_string_pretty(&block_and_skipped)?; + tracing::error!( + "During block production got invalid transactions: BEGIN {} END", + serialized ); - } - } } // Sign the block and seal it let seal = seal_block(&self.signing_key, &block)?; @@ -503,6 +493,12 @@ where } } +#[derive(Serialize)] +struct PredefinedBlockWithSkippedTransactions { + block: Block, + skipped_transactions: Vec<(TxId, ExecutorError)>, +} + #[async_trait::async_trait] impl RunnableService for MainTask where @@ -574,10 +570,9 @@ where } let next_height = self.next_height(); - let chain_id = self.chain_id; let maybe_block = self.predefined_blocks.get_block(&next_height); if let Some(block) = maybe_block { - self.produce_predefined_block(&block, &chain_id).await?; + self.produce_predefined_block(&block).await?; should_continue = true; } else { tokio::select! { diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs index 0af85d4f37e..4c0855d6a4e 100644 --- a/crates/services/consensus_module/poa/src/service_test.rs +++ b/crates/services/consensus_module/poa/src/service_test.rs @@ -551,15 +551,19 @@ async fn consensus_service__run__will_include_sequential_predefined_blocks_befor #[tokio::test] async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order() { // given - let blocks: &[Option<(BlockHeight, Block)>] = &[ - Some((2u32.into(), block_for_height(2))), + let predefined_blocks: &[Option<(BlockHeight, Block)>] = &[ None, - Some((4u32.into(), block_for_height(4))), + Some((3u32.into(), block_for_height(3))), None, - Some((6u32.into(), block_for_height(6))), + Some((5u32.into(), block_for_height(5))), + None, + Some((7u32.into(), block_for_height(7))), None, ]; - let blocks_map: HashMap<_, _> = blocks.iter().flat_map(|x| x.to_owned()).collect(); + let predefined_blocks_map: HashMap<_, _> = predefined_blocks + .iter() + .flat_map(|x| x.to_owned()) + .collect(); let (block_producer, mut block_receiver) = FakeBlockProducer::new(); let last_block = BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()); let config = Config { @@ -585,7 +589,7 @@ async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order( block_producer, block_importer, generate_p2p_port(), - InMemoryPredefinedBlocks::new(blocks_map), + InMemoryPredefinedBlocks::new(predefined_blocks_map), ); // when @@ -593,7 +597,7 @@ async fn consensus_service__run__will_insert_predefined_blocks_in_correct_order( service.start().unwrap(); // then - for maybe_predefined in blocks { + for maybe_predefined in predefined_blocks { let actual = block_receiver.recv().await.unwrap(); if let Some((_, block)) = maybe_predefined { let expected = FakeProducedBlock::Predefined(block.clone()); From ea26f63c1c9935053fe88b927a07751c22505e96 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 14:53:03 +0200 Subject: [PATCH 22/25] Fix import order, early return :( --- .../services/consensus_module/poa/Cargo.toml | 4 +- .../consensus_module/poa/src/service.rs | 67 ++++++++++--------- 2 files changed, 36 insertions(+), 35 deletions(-) diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index 95a3bf0361c..8a525196a99 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -16,11 +16,11 @@ fuel-core-chain-config = { workspace = true } fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } +serde = { workspace = true , features = ["derive"] } +serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } tracing = { workspace = true } -serde = { workspace = true , features = ["derive"] } -serde_json = { workspace = true } [dev-dependencies] fuel-core-poa = { path = ".", features = ["test-helpers"] } diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index 1b3bde675be..43a9878b346 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -574,46 +574,47 @@ where if let Some(block) = maybe_block { self.produce_predefined_block(&block).await?; should_continue = true; - } else { - tokio::select! { - biased; - _ = watcher.while_started() => { - should_continue = false; - } - request = self.request_receiver.recv() => { - if let Some(request) = request { - match request { - Request::ManualBlocks((block, response)) => { - let result = self.produce_manual_blocks(block).await; - let _ = response.send(result); - } + return Ok(should_continue) + } + tokio::select! { + biased; + _ = watcher.while_started() => { + should_continue = false; + } + request = self.request_receiver.recv() => { + if let Some(request) = request { + match request { + Request::ManualBlocks((block, response)) => { + let result = self.produce_manual_blocks(block).await; + let _ = response.send(result); } - should_continue = true; - } else { - tracing::error!("The PoA task should be the holder of the `Sender`"); - should_continue = false; - } - } - // TODO: This should likely be refactored to use something like tokio::sync::Notify. - // Otherwise, if a bunch of txs are submitted at once and all the txs are included - // into the first block production trigger, we'll still call the event handler - // for each tx after they've already been included into a block. - // The poa service also doesn't care about events unrelated to new tx submissions, - // and shouldn't be awoken when txs are completed or squeezed out of the pool. - txpool_event = self.tx_status_update_stream.next() => { - if txpool_event.is_some() { - self.on_txpool_event().await.context("While processing txpool event")?; - should_continue = true; - } else { - should_continue = false; } + should_continue = true; + } else { + tracing::error!("The PoA task should be the holder of the `Sender`"); + should_continue = false; } - at = self.timer.wait() => { - self.on_timer(at).await.context("While processing timer event")?; + } + // TODO: This should likely be refactored to use something like tokio::sync::Notify. + // Otherwise, if a bunch of txs are submitted at once and all the txs are included + // into the first block production trigger, we'll still call the event handler + // for each tx after they've already been included into a block. + // The poa service also doesn't care about events unrelated to new tx submissions, + // and shouldn't be awoken when txs are completed or squeezed out of the pool. + txpool_event = self.tx_status_update_stream.next() => { + if txpool_event.is_some() { + self.on_txpool_event().await.context("While processing txpool event")?; should_continue = true; + } else { + should_continue = false; } } + at = self.timer.wait() => { + self.on_timer(at).await.context("While processing timer event")?; + should_continue = true; + } } + Ok(should_continue) } From 058fdc46ec004e151cbb1f529df76f771222c90d Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 14:55:46 +0200 Subject: [PATCH 23/25] Fix formatting --- crates/services/consensus_module/poa/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/consensus_module/poa/Cargo.toml b/crates/services/consensus_module/poa/Cargo.toml index 8a525196a99..3ae865c7de2 100644 --- a/crates/services/consensus_module/poa/Cargo.toml +++ b/crates/services/consensus_module/poa/Cargo.toml @@ -16,7 +16,7 @@ fuel-core-chain-config = { workspace = true } fuel-core-services = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } -serde = { workspace = true , features = ["derive"] } +serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true } From 6a2deedc19fac1aeb53ef0c1d381c3cca2a7af28 Mon Sep 17 00:00:00 2001 From: Mitch Turner Date: Fri, 16 Aug 2024 15:01:15 +0200 Subject: [PATCH 24/25] Add minor optimization --- crates/services/producer/src/block_producer.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index 2f1ee90e09f..4ccb7d57d7e 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -141,14 +141,13 @@ where gas_price, }; - // Store the context string in case we error. - let context_string = - format!("Failed to produce block {height:?} due to execution failure"); let result = self .executor .produce_without_commit(component) .map_err(Into::::into) - .context(context_string)?; + .with_context(|| { + format!("Failed to produce block {height:?} due to execution failure") + })?; debug!("Produced block with result: {:?}", result.result()); Ok(result) From 0d310e5c96a95783bdd4f1b6dc3b89cd3bb20120 Mon Sep 17 00:00:00 2001 From: green Date: Fri, 16 Aug 2024 18:32:59 +0200 Subject: [PATCH 25/25] Disable block production to avoid overflow --- tests/tests/snapshot.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/tests/tests/snapshot.rs b/tests/tests/snapshot.rs index 729709896a6..33a2848dc4a 100644 --- a/tests/tests/snapshot.rs +++ b/tests/tests/snapshot.rs @@ -10,7 +10,10 @@ use fuel_core::{ FuelService, }, }; -use fuel_core_poa::ports::Database; +use fuel_core_poa::{ + ports::Database, + Trigger, +}; use fuel_core_storage::transactional::AtomicView; use fuel_core_types::blockchain::primitives::DaBlockHeight; use rand::{ @@ -28,15 +31,18 @@ async fn loads_snapshot() { // setup config let starting_state = StateConfig { last_block: Some(LastBlockConfig { - block_height: (u32::MAX - 2).into(), + block_height: (u32::MAX - 1).into(), da_block_height: DaBlockHeight(u64::MAX), - consensus_parameters_version: u32::MAX - 2, - state_transition_version: u32::MAX - 2, + consensus_parameters_version: u32::MAX - 1, + state_transition_version: u32::MAX - 1, blocks_root, }), ..StateConfig::randomize(&mut rng) }; - let config = Config::local_node_with_state_config(starting_state.clone()); + // Disable block production + let mut config = Config::local_node_with_state_config(starting_state.clone()); + config.debug = false; + config.block_production = Trigger::Never; // setup server & client let _ = FuelService::from_combined_database(db.clone(), config) @@ -46,15 +52,15 @@ async fn loads_snapshot() { let actual_state = db.read_state_config().unwrap(); let mut expected = starting_state.sorted(); expected.last_block = Some(LastBlockConfig { - block_height: (u32::MAX - 1).into(), + block_height: u32::MAX.into(), da_block_height: DaBlockHeight(u64::MAX), - consensus_parameters_version: u32::MAX - 1, - state_transition_version: u32::MAX - 1, + consensus_parameters_version: u32::MAX, + state_transition_version: u32::MAX, blocks_root: db .on_chain() .latest_view() .unwrap() - .block_header_merkle_root(&(u32::MAX - 1).into()) + .block_header_merkle_root(&u32::MAX.into()) .unwrap(), });