Fixing the issue with duplicate transaction synchronization processes #2324

Merged · 13 commits · Oct 11, 2024
10 changes: 10 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,16 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Version 0.38.0]

### Added

- [2324](https://github.com/FuelLabs/fuel-core/pull/2324): Added metrics for sync, async processor and for all GraphQL queries.

### Fixed

- [2324](https://github.com/FuelLabs/fuel-core/pull/2324): Ignore peer if we already are syncing transactions from it.

## [Version 0.38.0]

### Added
- [2309](https://github.com/FuelLabs/fuel-core/pull/2309): Limit number of concurrent queries to the graphql service.
- [2216](https://github.com/FuelLabs/fuel-core/pull/2216): Add more function to the state and task of TxPoolV2 to handle the future interactions with others modules (PoA, BlockProducer, BlockImporter and P2P).
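The "Fixed" changelog entry above describes the core fix of this PR, but the guard itself does not appear in the excerpted hunks. A minimal, purely illustrative sketch of a duplicate-sync guard; the `PeerId` type and the `active_sync_peers`/`try_start_sync`/`finish_sync` names are hypothetical and not the PR's actual code:

```rust
use std::collections::HashSet;

/// Hypothetical peer identifier, only for this sketch.
#[derive(Clone, PartialEq, Eq, Hash)]
struct PeerId(String);

/// Illustrative only: remember which peers we are already syncing
/// transactions from, so a second sync for the same peer is ignored.
struct TxSyncState {
    active_sync_peers: HashSet<PeerId>,
}

impl TxSyncState {
    /// Returns `true` if a new sync process should be started for `peer`.
    fn try_start_sync(&mut self, peer: PeerId) -> bool {
        // `insert` returns `false` when the peer is already present,
        // i.e. a sync from this peer is already in flight.
        self.active_sync_peers.insert(peer)
    }

    /// Called once the sync from `peer` finishes, successfully or not.
    fn finish_sync(&mut self, peer: &PeerId) {
        self.active_sync_peers.remove(peer);
    }
}

fn main() {
    let mut state = TxSyncState {
        active_sync_peers: HashSet::new(),
    };
    let peer = PeerId("peer-1".into());
    assert!(state.try_start_sync(peer.clone())); // first request: start syncing
    assert!(!state.try_start_sync(peer.clone())); // duplicate: ignored
    state.finish_sync(&peer);
    assert!(state.try_start_sync(peer)); // allowed again once the first sync is done
}
```

The point of the sketch is that `HashSet::insert` doubles as the membership check, so deciding whether to start or ignore a sync for a given peer is a single operation.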
26 changes: 26 additions & 0 deletions crates/fuel-core/src/graphql_api/api_service.rs
@@ -57,6 +57,10 @@ use axum::{
Json,
Router,
};
use fuel_core_metrics::futures::{
metered_future::MeteredFuture,
FuturesMetrics,
};
use fuel_core_services::{
RunnableService,
RunnableTask,
@@ -65,6 +69,7 @@ use fuel_core_services::{
use fuel_core_storage::transactional::AtomicView;
use fuel_core_types::fuel_types::BlockHeight;
use futures::Stream;
use hyper::rt::Executor;
use serde_json::json;
use std::{
future::Future,
@@ -116,6 +121,23 @@ pub struct Task {
server: Pin<Box<dyn Future<Output = hyper::Result<()>> + Send + 'static>>,
}

#[derive(Clone)]
struct ExecutorWithMetrics {
metric: FuturesMetrics,
}

impl<F> Executor<F> for ExecutorWithMetrics
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, fut: F) {
let future = MeteredFuture::new(fut, self.metric.clone());

tokio::task::spawn(future);
}
}

#[async_trait::async_trait]
impl RunnableService for GraphqlService {
const NAME: &'static str = "GraphQL";
@@ -137,9 +159,13 @@
) -> anyhow::Result<Self::Task> {
let mut state = state.clone();
let ServerParams { router, listener } = params;
let metric = ExecutorWithMetrics {
metric: FuturesMetrics::obtain_futures_metrics("GraphQLFutures"),
};

let server = axum::Server::from_tcp(listener)
.unwrap()
.executor(metric)
.serve(router.into_make_service())
.with_graceful_shutdown(async move {
state
2 changes: 1 addition & 1 deletion crates/fuel-core/src/schema/contract.rs
@@ -71,7 +71,7 @@ pub struct ContractQuery;

#[Object]
impl ContractQuery {
#[graphql(complexity = "QUERY_COSTS.storage_read")]
#[graphql(complexity = "QUERY_COSTS.storage_read + child_complexity")]
async fn contract(
&self,
ctx: &Context<'_>,
4 changes: 2 additions & 2 deletions crates/fuel-core/src/service/adapters/txpool.rs
@@ -195,7 +195,7 @@ impl fuel_core_txpool::ports::TxPoolPersistentStorage for OnChainIterableKeyValu
fn utxo(&self, utxo_id: &UtxoId) -> StorageResult<Option<CompressedCoin>> {
self.storage::<Coins>()
.get(utxo_id)
.map(|t| t.map(|t| t.as_ref().clone()))
.map(|t| t.map(|t| t.into_owned()))
}

fn contract_exist(&self, contract_id: &ContractId) -> StorageResult<bool> {
@@ -209,7 +209,7 @@ impl fuel_core_txpool::ports::TxPoolPersistentStorage for OnChainIterableKeyValu
fn message(&self, id: &Nonce) -> StorageResult<Option<Message>> {
self.storage::<Messages>()
.get(id)
.map(|t| t.map(|t| t.as_ref().clone()))
.map(|t| t.map(|t| t.into_owned()))
}
}

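Both hunks in this adapter swap `as_ref().clone()` for `into_owned()` on the value returned by storage. A standalone `Cow` example (plain standard-library types, not the storage types above) illustrating the design choice: for a borrowed value both forms clone, but for an already-owned value `into_owned` moves the data out instead of allocating a second copy.

```rust
use std::borrow::Cow;

fn main() {
    // Borrowed case: both `as_ref().clone()` and `into_owned()` must clone.
    let borrowed: Cow<'_, str> = Cow::Borrowed("coin");
    let owned_from_borrowed: String = borrowed.into_owned();
    assert_eq!(owned_from_borrowed, "coin");

    // Owned case: `into_owned()` simply moves the `String` out, while
    // `as_ref().clone()` would have allocated a second copy.
    let owned: Cow<'_, str> = Cow::Owned(String::from("message"));
    let moved_out: String = owned.into_owned();
    assert_eq!(moved_out, "message");
}
```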
2 changes: 1 addition & 1 deletion crates/metrics/Cargo.toml
@@ -18,4 +18,4 @@ regex = "1"
tracing = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros"] }
tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] }
44 changes: 25 additions & 19 deletions crates/metrics/src/services.rs → crates/metrics/src/futures.rs
@@ -5,10 +5,13 @@ use prometheus_client::{
};
use std::ops::Deref;

/// The statistic of the service life cycle.
#[derive(Default, Debug)]
pub struct ServicesMetrics {
/// The time spent for real actions by the service.
pub mod future_tracker;
pub mod metered_future;

/// The statistic of the futures life cycle.
#[derive(Default, Debug, Clone)]
pub struct FuturesMetrics {
/// The time spent for real actions by the future.
///
/// Time is in nanoseconds.
// TODO: Use `AtomicU128` when it is stable, otherwise, the field can overflow at some point.
@@ -20,35 +23,38 @@ pub struct ServicesMetrics {
pub idle: Counter,
}

impl ServicesMetrics {
pub fn register_service(service_name: &str) -> ServicesMetrics {
impl FuturesMetrics {
pub fn obtain_futures_metrics(futures_name: &str) -> FuturesMetrics {
let reg =
regex::Regex::new("^[a-zA-Z_:][a-zA-Z0-9_:]*$").expect("It is a valid Regex");
if !reg.is_match(service_name) {
panic!("The service {} has incorrect name.", service_name);
if !reg.is_match(futures_name) {
panic!("The futures metric {} has incorrect name.", futures_name);
}
let lifecycle = ServicesMetrics::default();
let lifecycle = FuturesMetrics::default();
let mut lock = global_registry().registry.lock();

// Check that it is a unique service.
// Check that it is a unique futures.
let mut encoded_bytes = String::new();
encode(&mut encoded_bytes, lock.deref())
.expect("Unable to decode service metrics");
.expect("Unable to decode futures metrics");

let reg = regex::Regex::new(format!("\\b{}\\b", service_name).as_str())
let reg = regex::Regex::new(format!("\\b{}\\b", futures_name).as_str())
.expect("It is a valid Regex");
if reg.is_match(encoded_bytes.as_str()) {
tracing::warn!("Service with '{}' name is already registered", service_name);
tracing::warn!(
"Futures metrics with '{}' name is already registered",
futures_name
);
}

lock.register(
format!("{}_idle_ns", service_name),
format!("The idle time of the {} service", service_name),
format!("{}_idle_ns", futures_name),
format!("The idle time of the {} future", futures_name),
lifecycle.idle.clone(),
);
lock.register(
format!("{}_busy_ns", service_name),
format!("The busy time of the {} service", service_name),
format!("{}_busy_ns", futures_name),
format!("The busy time of the {} future", futures_name),
lifecycle.busy.clone(),
);

@@ -58,6 +64,6 @@

#[test]
fn register_success() {
ServicesMetrics::register_service("Foo");
ServicesMetrics::register_service("Bar");
FuturesMetrics::obtain_futures_metrics("Foo");
FuturesMetrics::obtain_futures_metrics("Bar");
}
crates/metrics/src/future_tracker.rs → crates/metrics/src/futures/future_tracker.rs
@@ -1,3 +1,4 @@
use crate::futures::FuturesMetrics;
use core::{
future::Future,
pin::Pin,
@@ -13,11 +14,27 @@ use std::time::Instant;
#[derive(Debug)]
pub struct ExecutionTime<Output> {
/// The time spent for real action of the future.
pub busy: Duration,
busy: Duration,
/// The idle time of the future.
pub idle: Duration,
idle: Duration,
/// The output of the future.
pub output: Output,
output: Output,
}

impl<Output> ExecutionTime<Output> {
/// Extracts the future output and records the execution report into the metrics.
pub fn extract(self, metric: &FuturesMetrics) -> Output {
// TODO: Use `u128` when `AtomicU128` is stable.
Review comment (Member): nit: we should probably have an issue for this.

Review comment (Member): Also it might be wise to use something like https://docs.rs/portable-atomic for now.

metric.busy.inc_by(
u64::try_from(self.busy.as_nanos())
.expect("The task doesn't live longer than `u64`"),
);
metric.idle.inc_by(
u64::try_from(self.idle.as_nanos())
.expect("The task doesn't live longer than `u64`"),
);
self.output
}
}

/// A guard representing a span which has been entered and is currently
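The second review comment suggests `portable-atomic` as an interim answer to the `AtomicU128` TODO. A rough sketch of that suggestion only; it is not part of the PR and leaves out the `prometheus_client` integration that the real `FuturesMetrics` counters require:

```rust
use portable_atomic::AtomicU128;
use std::sync::atomic::Ordering;
use std::time::Duration;

/// Sketch of the reviewer's idea: accumulate nanoseconds in 128-bit atomics
/// instead of truncating `Duration::as_nanos()` into a `u64`.
struct WideFuturesMetrics {
    busy_ns: AtomicU128,
    idle_ns: AtomicU128,
}

impl WideFuturesMetrics {
    fn new() -> Self {
        Self {
            busy_ns: AtomicU128::new(0),
            idle_ns: AtomicU128::new(0),
        }
    }

    fn record(&self, busy: Duration, idle: Duration) {
        // No `u64::try_from(...).expect(...)` needed: `as_nanos()` already
        // yields a `u128`, so long-lived tasks cannot overflow here.
        self.busy_ns.fetch_add(busy.as_nanos(), Ordering::Relaxed);
        self.idle_ns.fetch_add(idle.as_nanos(), Ordering::Relaxed);
    }
}

fn main() {
    let metrics = WideFuturesMetrics::new();
    metrics.record(Duration::from_secs(1), Duration::from_secs(2));
    assert_eq!(metrics.busy_ns.load(Ordering::Relaxed), 1_000_000_000);
    assert_eq!(metrics.idle_ns.load(Ordering::Relaxed), 2_000_000_000);
}
```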
74 changes: 74 additions & 0 deletions crates/metrics/src/futures/metered_future.rs
@@ -0,0 +1,74 @@
use crate::futures::{
future_tracker::FutureTracker,
FuturesMetrics,
};
use std::{
future::Future,
pin::Pin,
task::{
Context,
Poll,
},
};

pin_project_lite::pin_project! {
/// Future that tracks its execution with [`FutureTracker`] and reports it back when done
/// to [`FuturesMetrics`].
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct MeteredFuture<F> {
#[pin]
future: FutureTracker<F>,
metrics: FuturesMetrics,
}
}

impl<F> MeteredFuture<F> {
/// Create a new `MeteredFuture` with the given future and metrics.
pub fn new(future: F, metrics: FuturesMetrics) -> Self {
Self {
future: FutureTracker::new(future),
metrics,
}
}
}

impl<F> Future for MeteredFuture<F>
where
F: Future,
{
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();

match this.future.poll(cx) {
Poll::Ready(output) => {
let output = output.extract(this.metrics);
Poll::Ready(output)
}
Poll::Pending => Poll::Pending,
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;

#[tokio::test]
async fn hybrid_time_correct() {
let future = async {
tokio::time::sleep(Duration::from_secs(2)).await;
std::thread::sleep(Duration::from_secs(1));
};
let metrics = FuturesMetrics::obtain_futures_metrics("test");
let wrapper_future = MeteredFuture::new(future, metrics.clone());
let _ = wrapper_future.await;
let busy = Duration::from_nanos(metrics.busy.get());
let idle = Duration::from_nanos(metrics.idle.get());
assert_eq!(idle.as_secs(), 2);
assert_eq!(busy.as_secs(), 1);
}
}
3 changes: 1 addition & 2 deletions crates/metrics/src/lib.rs
@@ -20,11 +20,10 @@ pub struct GlobalRegistry {
}

pub mod core_metrics;
pub mod future_tracker;
pub mod futures;
pub mod graphql_metrics;
pub mod importer;
pub mod p2p_metrics;
pub mod services;
pub mod txpool_metrics;

// recommended bucket defaults for logging response times
22 changes: 13 additions & 9 deletions crates/services/p2p/src/service.rs
@@ -772,9 +772,13 @@
Instant::now().checked_add(heartbeat_check_interval).expect(
"The heartbeat check interval should be small enough to do frequently",
);
let db_heavy_task_processor =
SyncProcessor::new(database_read_threads, 1024 * 10)?;
let tx_pool_heavy_task_processor = AsyncProcessor::new(tx_pool_threads, 32)?;
let db_heavy_task_processor = SyncProcessor::new(
"P2P_DatabaseProcessor",
database_read_threads,
1024 * 10,
)?;
let tx_pool_heavy_task_processor =
AsyncProcessor::new("P2P_TxPoolLookUpProcessor", tx_pool_threads, 32)?;
let request_sender = broadcast.request_sender.clone();

let task = Task {
@@ -1563,8 +1567,8 @@ pub mod tests {
tx_pool: FakeTxPool,
request_receiver,
request_sender,
db_heavy_task_processor: SyncProcessor::new(1, 1).unwrap(),
tx_pool_heavy_task_processor: AsyncProcessor::new(1, 1).unwrap(),
db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(),
tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(),
broadcast,
max_headers_per_request: 0,
max_txs_per_request: 100,
@@ -1653,8 +1657,8 @@ pub mod tests {
next_block_height: FakeBlockImporter.next_block_height(),
request_receiver,
request_sender,
db_heavy_task_processor: SyncProcessor::new(1, 1).unwrap(),
tx_pool_heavy_task_processor: AsyncProcessor::new(1, 1).unwrap(),
db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(),
tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(),
broadcast,
max_headers_per_request: 0,
max_txs_per_request: 100,
@@ -1715,8 +1719,8 @@ pub mod tests {
next_block_height,
request_receiver,
request_sender,
db_heavy_task_processor: SyncProcessor::new(1, 1).unwrap(),
tx_pool_heavy_task_processor: AsyncProcessor::new(1, 1).unwrap(),
db_heavy_task_processor: SyncProcessor::new("Test", 1, 1).unwrap(),
tx_pool_heavy_task_processor: AsyncProcessor::new("Test", 1, 1).unwrap(),
broadcast,
max_headers_per_request: 0,
max_txs_per_request: 100,
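Both processors now take a name as their first argument. The excerpt does not show the `SyncProcessor`/`AsyncProcessor` internals, but given the changelog's "metrics for sync, async processor" entry, the name is presumably forwarded to `FuturesMetrics::obtain_futures_metrics`. A hypothetical sketch of that wiring; the struct and method names are invented for illustration:

```rust
use fuel_core_metrics::futures::{
    metered_future::MeteredFuture,
    FuturesMetrics,
};
use std::future::Future;

/// Hypothetical named processor; the real `AsyncProcessor` lives in
/// fuel-core-services and is not shown in this diff.
struct NamedAsyncProcessor {
    metrics: FuturesMetrics,
    // Thread-pool / queue-limit fields are omitted in this sketch.
}

impl NamedAsyncProcessor {
    fn new(name: &str) -> Self {
        Self {
            // The name becomes the metric prefix, e.g.
            // `P2P_TxPoolLookUpProcessor_busy_ns`.
            metrics: FuturesMetrics::obtain_futures_metrics(name),
        }
    }

    fn spawn<F>(&self, future: F)
    where
        F: Future + Send + 'static,
        F::Output: Send + 'static,
    {
        // Every submitted future is wrapped so its busy/idle time is recorded.
        tokio::spawn(MeteredFuture::new(future, self.metrics.clone()));
    }
}
```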