diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f58f41e441..6072923e063 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,9 +6,19 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] -### Fixed +### Added +- [2324](https://github.com/FuelLabs/fuel-core/pull/2324): Added metrics for sync, async processor and for all GraphQL queries. + +## Fixed - [2320](https://github.com/FuelLabs/fuel-core/issues/2320): Prevent `/health` and `/v1/health` from being throttled by the concurrency limiter. - [2322](https://github.com/FuelLabs/fuel-core/issues/2322): Set the salt of genesis contracts to zero on execution. +- [2324](https://github.com/FuelLabs/fuel-core/pull/2324): Ignore peer if we already are syncing transactions from it. + +### Changed + +#### Breaking +- [2311](https://github.com/FuelLabs/fuel-core/pull/2311): Changed the text of the error returned by the executor if gas overflows. + ## [Version 0.38.0] diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index d1ccbce203e..7d666d0c242 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -57,6 +57,10 @@ use axum::{ Json, Router, }; +use fuel_core_metrics::futures::{ + metered_future::MeteredFuture, + FuturesMetrics, +}; use fuel_core_services::{ RunnableService, RunnableTask, @@ -65,6 +69,7 @@ use fuel_core_services::{ use fuel_core_storage::transactional::AtomicView; use fuel_core_types::fuel_types::BlockHeight; use futures::Stream; +use hyper::rt::Executor; use serde_json::json; use std::{ future::Future, @@ -116,6 +121,23 @@ pub struct Task { server: Pin> + Send + 'static>>, } +#[derive(Clone)] +struct ExecutorWithMetrics { + metric: FuturesMetrics, +} + +impl Executor for ExecutorWithMetrics +where + F: Future + Send + 'static, + F::Output: Send + 'static, +{ + fn execute(&self, fut: F) { + let future = MeteredFuture::new(fut, self.metric.clone()); + + tokio::task::spawn(future); + } +} + #[async_trait::async_trait] impl RunnableService for GraphqlService { const NAME: &'static str = "GraphQL"; @@ -137,9 +159,13 @@ impl RunnableService for GraphqlService { ) -> anyhow::Result { let mut state = state.clone(); let ServerParams { router, listener } = params; + let metric = ExecutorWithMetrics { + metric: FuturesMetrics::obtain_futures_metrics("GraphQLFutures"), + }; let server = axum::Server::from_tcp(listener) .unwrap() + .executor(metric) .serve(router.into_make_service()) .with_graceful_shutdown(async move { state diff --git a/crates/fuel-core/src/schema/contract.rs b/crates/fuel-core/src/schema/contract.rs index cc6bf406d9b..2abc0e53a06 100644 --- a/crates/fuel-core/src/schema/contract.rs +++ b/crates/fuel-core/src/schema/contract.rs @@ -71,7 +71,7 @@ pub struct ContractQuery; #[Object] impl ContractQuery { - #[graphql(complexity = "QUERY_COSTS.storage_read")] + #[graphql(complexity = "QUERY_COSTS.storage_read + child_complexity")] async fn contract( &self, ctx: &Context<'_>, diff --git a/crates/fuel-core/src/service/adapters/txpool.rs b/crates/fuel-core/src/service/adapters/txpool.rs index f88d5ca44d4..c9497b046b9 100644 --- a/crates/fuel-core/src/service/adapters/txpool.rs +++ b/crates/fuel-core/src/service/adapters/txpool.rs @@ -195,7 +195,7 @@ impl fuel_core_txpool::ports::TxPoolPersistentStorage for OnChainIterableKeyValu fn utxo(&self, utxo_id: &UtxoId) -> StorageResult> { self.storage::() .get(utxo_id) - .map(|t| t.map(|t| t.as_ref().clone())) + .map(|t| t.map(|t| t.into_owned())) } fn contract_exist(&self, contract_id: &ContractId) -> StorageResult { @@ -209,7 +209,7 @@ impl fuel_core_txpool::ports::TxPoolPersistentStorage for OnChainIterableKeyValu fn message(&self, id: &Nonce) -> StorageResult> { self.storage::() .get(id) - .map(|t| t.map(|t| t.as_ref().clone())) + .map(|t| t.map(|t| t.into_owned())) } } diff --git a/crates/metrics/Cargo.toml b/crates/metrics/Cargo.toml index 5a2efc0fc6b..7851071b268 100644 --- a/crates/metrics/Cargo.toml +++ b/crates/metrics/Cargo.toml @@ -18,4 +18,4 @@ regex = "1" tracing = { workspace = true } [dev-dependencies] -tokio = { workspace = true, features = ["macros"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } diff --git a/crates/metrics/src/services.rs b/crates/metrics/src/futures.rs similarity index 50% rename from crates/metrics/src/services.rs rename to crates/metrics/src/futures.rs index b2226f9a024..9204848dcd1 100644 --- a/crates/metrics/src/services.rs +++ b/crates/metrics/src/futures.rs @@ -5,10 +5,13 @@ use prometheus_client::{ }; use std::ops::Deref; -/// The statistic of the service life cycle. -#[derive(Default, Debug)] -pub struct ServicesMetrics { - /// The time spent for real actions by the service. +pub mod future_tracker; +pub mod metered_future; + +/// The statistic of the futures life cycle. +#[derive(Default, Debug, Clone)] +pub struct FuturesMetrics { + /// The time spent for real actions by the future. /// /// Time is in nanoseconds. // TODO: Use `AtomicU128` when it is stable, otherwise, the field can overflow at some point. @@ -20,35 +23,38 @@ pub struct ServicesMetrics { pub idle: Counter, } -impl ServicesMetrics { - pub fn register_service(service_name: &str) -> ServicesMetrics { +impl FuturesMetrics { + pub fn obtain_futures_metrics(futures_name: &str) -> FuturesMetrics { let reg = regex::Regex::new("^[a-zA-Z_:][a-zA-Z0-9_:]*$").expect("It is a valid Regex"); - if !reg.is_match(service_name) { - panic!("The service {} has incorrect name.", service_name); + if !reg.is_match(futures_name) { + panic!("The futures metric {} has incorrect name.", futures_name); } - let lifecycle = ServicesMetrics::default(); + let lifecycle = FuturesMetrics::default(); let mut lock = global_registry().registry.lock(); - // Check that it is a unique service. + // Check that it is a unique futures. let mut encoded_bytes = String::new(); encode(&mut encoded_bytes, lock.deref()) - .expect("Unable to decode service metrics"); + .expect("Unable to decode futures metrics"); - let reg = regex::Regex::new(format!("\\b{}\\b", service_name).as_str()) + let reg = regex::Regex::new(format!("\\b{}\\b", futures_name).as_str()) .expect("It is a valid Regex"); if reg.is_match(encoded_bytes.as_str()) { - tracing::warn!("Service with '{}' name is already registered", service_name); + tracing::warn!( + "Futures metrics with '{}' name is already registered", + futures_name + ); } lock.register( - format!("{}_idle_ns", service_name), - format!("The idle time of the {} service", service_name), + format!("{}_idle_ns", futures_name), + format!("The idle time of the {} future", futures_name), lifecycle.idle.clone(), ); lock.register( - format!("{}_busy_ns", service_name), - format!("The busy time of the {} service", service_name), + format!("{}_busy_ns", futures_name), + format!("The busy time of the {} future", futures_name), lifecycle.busy.clone(), ); @@ -58,6 +64,6 @@ impl ServicesMetrics { #[test] fn register_success() { - ServicesMetrics::register_service("Foo"); - ServicesMetrics::register_service("Bar"); + FuturesMetrics::obtain_futures_metrics("Foo"); + FuturesMetrics::obtain_futures_metrics("Bar"); } diff --git a/crates/metrics/src/future_tracker.rs b/crates/metrics/src/futures/future_tracker.rs similarity index 88% rename from crates/metrics/src/future_tracker.rs rename to crates/metrics/src/futures/future_tracker.rs index a3beb43672c..26f00d1b59c 100644 --- a/crates/metrics/src/future_tracker.rs +++ b/crates/metrics/src/futures/future_tracker.rs @@ -1,3 +1,4 @@ +use crate::futures::FuturesMetrics; use core::{ future::Future, pin::Pin, @@ -13,11 +14,27 @@ use std::time::Instant; #[derive(Debug)] pub struct ExecutionTime { /// The time spent for real action of the future. - pub busy: Duration, + busy: Duration, /// The idle time of the future. - pub idle: Duration, + idle: Duration, /// The output of the future. - pub output: Output, + output: Output, +} + +impl ExecutionTime { + /// Extracts the future output and records the execution report into the metrics. + pub fn extract(self, metric: &FuturesMetrics) -> Output { + // TODO: Use `u128` when `AtomicU128` is stable. + metric.busy.inc_by( + u64::try_from(self.busy.as_nanos()) + .expect("The task doesn't live longer than `u64`"), + ); + metric.idle.inc_by( + u64::try_from(self.idle.as_nanos()) + .expect("The task doesn't live longer than `u64`"), + ); + self.output + } } /// A guard representing a span which has been entered and is currently diff --git a/crates/metrics/src/futures/metered_future.rs b/crates/metrics/src/futures/metered_future.rs new file mode 100644 index 00000000000..e2685ebc9b0 --- /dev/null +++ b/crates/metrics/src/futures/metered_future.rs @@ -0,0 +1,74 @@ +use crate::futures::{ + future_tracker::FutureTracker, + FuturesMetrics, +}; +use std::{ + future::Future, + pin::Pin, + task::{ + Context, + Poll, + }, +}; + +pin_project_lite::pin_project! { + /// Future that tracks its execution with [`FutureTracker`] and reports it back when done + /// to [`FuturesMetrics`]. + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct MeteredFuture { + #[pin] + future: FutureTracker, + metrics: FuturesMetrics, + } +} + +impl MeteredFuture { + /// Create a new `MeteredFuture` with the given future and metrics. + pub fn new(future: F, metrics: FuturesMetrics) -> Self { + Self { + future: FutureTracker::new(future), + metrics, + } + } +} + +impl Future for MeteredFuture +where + F: Future, +{ + type Output = F::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + + match this.future.poll(cx) { + Poll::Ready(output) => { + let output = output.extract(this.metrics); + Poll::Ready(output) + } + Poll::Pending => Poll::Pending, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + #[tokio::test] + async fn hybrid_time_correct() { + let future = async { + tokio::time::sleep(Duration::from_secs(2)).await; + std::thread::sleep(Duration::from_secs(1)); + }; + let metrics = FuturesMetrics::obtain_futures_metrics("test"); + let wrapper_future = MeteredFuture::new(future, metrics.clone()); + let _ = wrapper_future.await; + let busy = Duration::from_nanos(metrics.busy.get()); + let idle = Duration::from_nanos(metrics.idle.get()); + assert_eq!(idle.as_secs(), 2); + assert_eq!(busy.as_secs(), 1); + } +} diff --git a/crates/metrics/src/lib.rs b/crates/metrics/src/lib.rs index 3717f2e9da3..f7e4e22f7ca 100644 --- a/crates/metrics/src/lib.rs +++ b/crates/metrics/src/lib.rs @@ -20,11 +20,10 @@ pub struct GlobalRegistry { } pub mod core_metrics; -pub mod future_tracker; +pub mod futures; pub mod graphql_metrics; pub mod importer; pub mod p2p_metrics; -pub mod services; pub mod txpool_metrics; // recommended bucket defaults for logging response times diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index cc2d0770d66..3ff09da0beb 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -772,9 +772,13 @@ where Instant::now().checked_add(heartbeat_check_interval).expect( "The heartbeat check interval should be small enough to do frequently", ); - let db_heavy_task_processor = - SyncProcessor::new(database_read_threads, 1024 * 10)?; - let tx_pool_heavy_task_processor = AsyncProcessor::new(tx_pool_threads, 32)?; + let db_heavy_task_processor = SyncProcessor::new( + "P2P_DatabaseProcessor", + database_read_threads, + 1024 * 10, + )?; + let tx_pool_heavy_task_processor = + AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?; let request_sender = broadcast.request_sender.clone(); let task = Task { @@ -1563,8 +1567,8 @@ pub mod tests { tx_pool: FakeTxPool, request_receiver, request_sender, - db_heavy_task_processor: SyncProcessor::new(1, 1).unwrap(), - tx_pool_heavy_task_processor: AsyncProcessor::new(1, 1).unwrap(), + db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(), + tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(), broadcast, max_headers_per_request: 0, max_txs_per_request: 100, @@ -1653,8 +1657,8 @@ pub mod tests { next_block_height: FakeBlockImporter.next_block_height(), request_receiver, request_sender, - db_heavy_task_processor: SyncProcessor::new(1, 1).unwrap(), - tx_pool_heavy_task_processor: AsyncProcessor::new(1, 1).unwrap(), + db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(), + tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(), broadcast, max_headers_per_request: 0, max_txs_per_request: 100, @@ -1715,8 +1719,8 @@ pub mod tests { next_block_height, request_receiver, request_sender, - db_heavy_task_processor: SyncProcessor::new(1, 1).unwrap(), - tx_pool_heavy_task_processor: AsyncProcessor::new(1, 1).unwrap(), + db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(), + tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(), broadcast, max_headers_per_request: 0, max_txs_per_request: 100, diff --git a/crates/services/src/async_processor.rs b/crates/services/src/async_processor.rs index 3b364ad96ac..6a5b43f395f 100644 --- a/crates/services/src/async_processor.rs +++ b/crates/services/src/async_processor.rs @@ -1,3 +1,7 @@ +use fuel_core_metrics::futures::{ + metered_future::MeteredFuture, + FuturesMetrics, +}; use std::{ future::Future, sync::Arc, @@ -13,6 +17,7 @@ use tokio::{ /// A processor that can execute async tasks with a limit on the number of tasks that can be /// executed concurrently. pub struct AsyncProcessor { + metric: FuturesMetrics, semaphore: Arc, thread_pool: Option, } @@ -36,6 +41,7 @@ impl AsyncProcessor { /// Create a new `AsyncProcessor` with the given number of threads and the number of pending /// tasks. pub fn new( + metric_name: &str, number_of_threads: usize, number_of_pending_tasks: usize, ) -> anyhow::Result { @@ -51,7 +57,9 @@ impl AsyncProcessor { None }; let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks)); + let metric = FuturesMetrics::obtain_futures_metrics(metric_name); Ok(Self { + metric, thread_pool, semaphore, }) @@ -74,13 +82,15 @@ impl AsyncProcessor { { let permit = reservation.0; let future = async move { - let _drop = permit; - future.await + let permit = permit; + future.await; + drop(permit) }; + let metered_future = MeteredFuture::new(future, self.metric.clone()); if let Some(runtime) = &self.thread_pool { - runtime.spawn(future); + runtime.spawn(metered_future); } else { - tokio::spawn(future); + tokio::spawn(metered_future); } } @@ -110,7 +120,7 @@ mod tests { // Given let number_of_pending_tasks = 1; let heavy_task_processor = - AsyncProcessor::new(1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); // When let (sender, mut receiver) = tokio::sync::oneshot::channel(); @@ -129,7 +139,7 @@ mod tests { // Given let number_of_pending_tasks = 1; let heavy_task_processor = - AsyncProcessor::new(1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); let first_spawn_result = heavy_task_processor.try_spawn(async move { sleep(Duration::from_secs(1)); }); @@ -148,7 +158,7 @@ mod tests { fn second_spawn_works_when_first_is_finished() { let number_of_pending_tasks = 1; let heavy_task_processor = - AsyncProcessor::new(1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); // Given let (sender, receiver) = tokio::sync::oneshot::channel(); @@ -175,7 +185,7 @@ mod tests { // Given let number_of_pending_tasks = 10; let heavy_task_processor = - AsyncProcessor::new(1, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); for _ in 0..number_of_pending_tasks { // When @@ -188,13 +198,14 @@ mod tests { } } - #[test] - fn executes_10_tasks_for_10_seconds_with_one_thread() { + #[tokio::test] + async fn executes_10_tasks_for_10_seconds_with_one_thread() { // Given let number_of_pending_tasks = 10; let number_of_threads = 1; let heavy_task_processor = - AsyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = @@ -211,19 +222,22 @@ mod tests { drop(broadcast_sender); // Then - futures::executor::block_on(async move { - while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() >= Duration::from_secs(10)); - }); + while broadcast_receiver.recv().await.is_ok() {} + assert!(instant.elapsed() >= Duration::from_secs(10)); + let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); + assert_eq!(duration.as_secs(), 10); + let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get()); + assert_eq!(duration.as_secs(), 0); } - #[test] - fn executes_10_tasks_for_2_seconds_with_10_thread() { + #[tokio::test] + async fn executes_10_tasks_for_2_seconds_with_10_thread() { // Given let number_of_pending_tasks = 10; let number_of_threads = 10; let heavy_task_processor = - AsyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = @@ -240,19 +254,22 @@ mod tests { drop(broadcast_sender); // Then - futures::executor::block_on(async move { - while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() <= Duration::from_secs(2)); - }); + while broadcast_receiver.recv().await.is_ok() {} + assert!(instant.elapsed() <= Duration::from_secs(2)); + let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); + assert_eq!(duration.as_secs(), 10); + let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get()); + assert_eq!(duration.as_secs(), 0); } - #[test] - fn executes_10_tasks_for_2_seconds_with_1_thread() { + #[tokio::test] + async fn executes_10_tasks_for_2_seconds_with_1_thread() { // Given let number_of_pending_tasks = 10; let number_of_threads = 10; let heavy_task_processor = - AsyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap(); + AsyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = @@ -269,9 +286,11 @@ mod tests { drop(broadcast_sender); // Then - futures::executor::block_on(async move { - while broadcast_receiver.recv().await.is_ok() {} - assert!(instant.elapsed() <= Duration::from_secs(2)); - }); + while broadcast_receiver.recv().await.is_ok() {} + assert!(instant.elapsed() <= Duration::from_secs(2)); + let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); + assert_eq!(duration.as_secs(), 0); + let duration = Duration::from_nanos(heavy_task_processor.metric.idle.get()); + assert_eq!(duration.as_secs(), 10); } } diff --git a/crates/services/src/service.rs b/crates/services/src/service.rs index 5c2b5b5a699..83cb095700a 100644 --- a/crates/services/src/service.rs +++ b/crates/services/src/service.rs @@ -6,9 +6,9 @@ use crate::{ Shared, }; use anyhow::anyhow; -use fuel_core_metrics::{ +use fuel_core_metrics::futures::{ future_tracker::FutureTracker, - services::ServicesMetrics, + FuturesMetrics, }; use futures::FutureExt; use std::any::Any; @@ -141,7 +141,7 @@ where /// Initializes a new `ServiceRunner` containing a `RunnableService` with parameters for underlying `Task` pub fn new_with_params(service: S, params: S::TaskParams) -> Self { let shared = service.shared_data(); - let metric = ServicesMetrics::register_service(S::NAME); + let metric = FuturesMetrics::obtain_futures_metrics(S::NAME); let state = initialize_loop(service, params, metric); Self { shared, state } } @@ -242,7 +242,7 @@ where fn initialize_loop( service: S, params: S::TaskParams, - metric: ServicesMetrics, + metric: FuturesMetrics, ) -> Shared> where S: RunnableService + 'static, @@ -299,7 +299,7 @@ async fn run( service: S, sender: Shared>, params: S::TaskParams, - metric: ServicesMetrics, + metric: FuturesMetrics, ) where S: RunnableService + 'static, { @@ -342,7 +342,7 @@ async fn run( async fn run_task( task: &mut S, mut state: StateWatcher, - metric: &ServicesMetrics, + metric: &FuturesMetrics, ) -> Option> { let mut got_panic = None; @@ -358,18 +358,7 @@ async fn run_task( } let tracked_result = panic_result.expect("Checked the panic above"); - - // TODO: Use `u128` when `AtomicU128` is stable. - metric.busy.inc_by( - u64::try_from(tracked_result.busy.as_nanos()) - .expect("The task doesn't live longer than `u64`"), - ); - metric.idle.inc_by( - u64::try_from(tracked_result.idle.as_nanos()) - .expect("The task doesn't live longer than `u64`"), - ); - - let result = tracked_result.output; + let result = tracked_result.extract(metric); match result { Ok(should_continue) => { diff --git a/crates/services/src/sync_processor.rs b/crates/services/src/sync_processor.rs index 5510ca50f43..36fb53bc2a6 100644 --- a/crates/services/src/sync_processor.rs +++ b/crates/services/src/sync_processor.rs @@ -1,3 +1,7 @@ +use fuel_core_metrics::futures::{ + metered_future::MeteredFuture, + FuturesMetrics, +}; use std::sync::Arc; use tokio::sync::{ OwnedSemaphorePermit, @@ -7,6 +11,7 @@ use tokio::sync::{ /// A processor that can execute sync tasks with a limit on the number of tasks that can /// wait in the queue. The number of threads defines how many tasks can be executed in parallel. pub struct SyncProcessor { + metric: FuturesMetrics, rayon_thread_pool: Option, semaphore: Arc, } @@ -22,6 +27,7 @@ impl SyncProcessor { /// Create a new `SyncProcessor` with the given number of threads and the number of pending /// tasks. pub fn new( + metric_name: &str, number_of_threads: usize, number_of_pending_tasks: usize, ) -> anyhow::Result { @@ -35,7 +41,9 @@ impl SyncProcessor { None }; let semaphore = Arc::new(Semaphore::new(number_of_pending_tasks)); + let metric = FuturesMetrics::obtain_futures_metrics(metric_name); Ok(Self { + metric, rayon_thread_pool, semaphore, }) @@ -57,14 +65,17 @@ impl SyncProcessor { F: FnOnce() + Send + 'static, { let permit = reservation.0; + let sync_future = async move { + // When task started its works we can free the space. + drop(permit); + op() + }; + let metered_future = MeteredFuture::new(sync_future, self.metric.clone()); if let Some(rayon_thread_pool) = &self.rayon_thread_pool { - rayon_thread_pool.spawn_fifo(move || { - // When task started its works we can free the space. - drop(permit); - op() - }); + rayon_thread_pool + .spawn_fifo(move || futures::executor::block_on(metered_future)); } else { - op() + futures::executor::block_on(metered_future) } } @@ -94,7 +105,7 @@ mod tests { // Given let number_of_pending_tasks = 1; let heavy_task_processor = - SyncProcessor::new(1, number_of_pending_tasks).unwrap(); + SyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); // When let (sender, receiver) = tokio::sync::oneshot::channel(); @@ -115,7 +126,7 @@ mod tests { // Given let number_of_pending_tasks = 1; let heavy_task_processor = - SyncProcessor::new(1, number_of_pending_tasks).unwrap(); + SyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); let first_spawn_result = heavy_task_processor.try_spawn(move || { sleep(Duration::from_secs(1)); }); @@ -134,7 +145,7 @@ mod tests { async fn second_spawn_works_when_first_is_finished() { let number_of_pending_tasks = 1; let heavy_task_processor = - SyncProcessor::new(1, number_of_pending_tasks).unwrap(); + SyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); // Given let (sender, receiver) = tokio::sync::oneshot::channel(); @@ -159,7 +170,7 @@ mod tests { // Given let number_of_pending_tasks = 10; let heavy_task_processor = - SyncProcessor::new(1, number_of_pending_tasks).unwrap(); + SyncProcessor::new("Test", 1, number_of_pending_tasks).unwrap(); for _ in 0..number_of_pending_tasks { // When @@ -178,7 +189,8 @@ mod tests { let number_of_pending_tasks = 10; let number_of_threads = 1; let heavy_task_processor = - SyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap(); + SyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = @@ -197,6 +209,8 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} assert!(instant.elapsed() >= Duration::from_secs(10)); + let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); + assert_eq!(duration.as_secs(), 10); } #[tokio::test] @@ -205,7 +219,8 @@ mod tests { let number_of_pending_tasks = 10; let number_of_threads = 1; let heavy_task_processor = - SyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap(); + SyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = @@ -240,7 +255,8 @@ mod tests { let number_of_pending_tasks = 10; let number_of_threads = 10; let heavy_task_processor = - SyncProcessor::new(number_of_threads, number_of_pending_tasks).unwrap(); + SyncProcessor::new("Test", number_of_threads, number_of_pending_tasks) + .unwrap(); // When let (broadcast_sender, mut broadcast_receiver) = @@ -259,5 +275,7 @@ mod tests { // Then while broadcast_receiver.recv().await.is_ok() {} assert!(instant.elapsed() <= Duration::from_secs(2)); + let duration = Duration::from_nanos(heavy_task_processor.metric.busy.get()); + assert_eq!(duration.as_secs(), 10); } } diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index b59f3360049..f2926495d66 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -75,7 +75,10 @@ use fuel_core_types::{ use futures::StreamExt; use parking_lot::RwLock; use std::{ - collections::VecDeque, + collections::{ + HashSet, + VecDeque, + }, sync::Arc, time::{ SystemTime, @@ -181,7 +184,8 @@ pub struct Task { p2p_sync_process: AsyncProcessor, pruner: TransactionPruner, pool: Shared, - current_height: Arc>, + current_height: Shared, + tx_sync_history: Shared>, shared_state: SharedState, } @@ -190,7 +194,7 @@ impl RunnableService for Task where View: TxPoolPersistentStorage, { - const NAME: &'static str = "TxPoolv2"; + const NAME: &'static str = "TxPool"; type SharedData = SharedState; @@ -503,7 +507,17 @@ where let p2p = self.p2p.clone(); let pool = self.pool.clone(); let txs_insert_sender = self.shared_state.write_pool_requests_sender.clone(); + let tx_sync_history = self.tx_sync_history.clone(); async move { + { + let mut tx_sync_history = tx_sync_history.write(); + + // We already synced with this peer in the past. + if !tx_sync_history.insert(peer_id.clone()) { + return + } + } + let peer_tx_ids = p2p .request_tx_ids(peer_id.clone()) .await @@ -586,6 +600,13 @@ where .tx_status_sender .send_squeezed_out(tx.id(), Error::Removed(RemovedReason::Ttl)); } + + { + // Each time when we prune transactions, clear the history of synchronization + // to have a chance to sync this transaction again from other peers. + let mut tx_sync_history = self.tx_sync_history.write(); + tx_sync_history.clear(); + } } fn process_read(&self, request: ReadPoolRequest) { @@ -717,12 +738,14 @@ where }; let transaction_verifier_process = SyncProcessor::new( + "TxPool_TxVerifierProcessor", config.heavy_work.number_threads_to_verify_transactions, config.heavy_work.size_of_verification_queue, ) .unwrap(); let p2p_sync_process = AsyncProcessor::new( + "TxPool_P2PSynchronizationProcessor", config.heavy_work.number_threads_p2p_sync, config.heavy_work.size_of_p2p_sync_queue, ) @@ -750,5 +773,6 @@ where current_height: Arc::new(RwLock::new(current_height)), pool: Arc::new(RwLock::new(txpool)), shared_state, + tx_sync_history: Default::default(), }) }