Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metrics counter for finished spawned tasks #4481

Merged
merged 3 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions crates/tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use futures_util::{
future::{select, BoxFuture},
pin_mut, Future, FutureExt, TryFutureExt,
};
use metrics::IncCounterOnDrop;
use std::{
any::Any,
fmt::{Display, Formatter},
Expand Down Expand Up @@ -271,9 +272,16 @@ impl TaskExecutor {
{
let on_shutdown = self.on_shutdown.clone();

let task = async move {
pin_mut!(fut);
let _ = select(on_shutdown, fut).await;
// Clone only the specific counter that we need.
let finished_regular_tasks_metrics = self.metrics.finished_regular_tasks.clone();
// Wrap the original future to increment the finished tasks counter upon completion
let task = {
async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_regular_tasks_metrics);
pin_mut!(fut);
let _ = select(on_shutdown, fut).await;
}
}
.in_current_span();

Expand Down Expand Up @@ -341,7 +349,11 @@ impl TaskExecutor {
})
.in_current_span();

// Clone only the specific counter that we need.
let finished_critical_tasks_metrics = self.metrics.finished_critical_tasks.clone();
let task = async move {
// Create an instance of IncCounterOnDrop with the counter to increment
let _inc_counter_on_drop = IncCounterOnDrop::new(finished_critical_tasks_metrics);
pin_mut!(task);
let _ = select(on_shutdown, task).await;
};
Expand Down Expand Up @@ -403,7 +415,7 @@ impl TaskExecutor {

impl TaskSpawner for TaskExecutor {
fn spawn(&self, fut: BoxFuture<'static, ()>) -> JoinHandle<()> {
self.metrics.inc_regular_task();
self.metrics.inc_regular_tasks();
self.spawn(fut)
}

Expand Down
23 changes: 21 additions & 2 deletions crates/tasks/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,36 @@ use reth_metrics::{metrics::Counter, Metrics};
pub struct TaskExecutorMetrics {
/// Number of spawned critical tasks
pub(crate) critical_tasks: Counter,

/// Number of finished spawned critical tasks
pub(crate) finished_critical_tasks: Counter,
/// Number of spawned regular tasks
pub(crate) regular_tasks: Counter,
/// Number of finished spawned regular tasks
pub(crate) finished_regular_tasks: Counter,
}

impl TaskExecutorMetrics {
pub(crate) fn inc_critical_tasks(&self) {
self.critical_tasks.increment(1);
}

pub(crate) fn inc_regular_task(&self) {
pub(crate) fn inc_regular_tasks(&self) {
self.regular_tasks.increment(1);
}
}

/// Helper type for increasing counters even if a task fails.
pub struct IncCounterOnDrop(Counter);

impl IncCounterOnDrop {
/// Create a new `IncCounterOnDrop`.
pub fn new(counter: Counter) -> Self {
IncCounterOnDrop(counter)
}
}

impl Drop for IncCounterOnDrop {
fn drop(&mut self) {
self.0.increment(1);
}
}
Loading