stabilize worker_total_busy_duration #6899

Open · wants to merge 12 commits into master

Changes from all commits

66 changes: 44 additions & 22 deletions tokio/src/runtime/metrics/mock.rs
@@ -1,12 +1,31 @@
//! This file contains mocks of the types in src/runtime/metrics

-use std::thread::ThreadId;
+use crate::runtime::WorkerMetrics;
+use std::sync::atomic::Ordering::Relaxed;
+use std::time::{Duration, Instant};

pub(crate) struct SchedulerMetrics {}

-pub(crate) struct WorkerMetrics {}
-pub(crate) struct MetricsBatch {}
+/// The `MetricsBatch` struct in this mock implementation provides a minimal,
+/// simplified version of `batch::MetricsBatch`. It contains only the basic fields
+/// required to track the total busy duration (`busy_duration_total`).
+///
+/// This mock is used to stabilize the API `worker_total_busy_duration`
+/// without relying on the full metrics collection logic. In the real implementation,
+/// additional fields provide more detailed tracking of worker activity.
+///
+/// This mock can be further enriched when stabilizing other worker metrics, such as
+/// `worker_thread_id`, `worker_park_count`, and so on.
+///
+/// When more worker metrics are stabilized, we can remove this mock and switch back
+/// to `batch::MetricsBatch`.
+pub(crate) struct MetricsBatch {
+    /// The total busy duration in nanoseconds.
+    busy_duration_total: u64,
+
+    /// Instant at which work last resumed (continued after park).
+    processing_scheduled_tasks_started_at: Instant,
+}

#[derive(Clone, Default)]
pub(crate) struct HistogramBuilder {}
@@ -20,32 +39,31 @@ impl SchedulerMetrics {
    pub(crate) fn inc_remote_schedule_count(&self) {}
}

-impl WorkerMetrics {
-    pub(crate) fn new() -> Self {
-        Self {}
-    }
-
-    pub(crate) fn from_config(config: &crate::runtime::Config) -> Self {
-        // Prevent the dead-code warning from being triggered
-        let _ = &config.metrics_poll_count_histogram;
-        Self::new()
-    }
-
-    pub(crate) fn set_queue_depth(&self, _len: usize) {}
-    pub(crate) fn set_thread_id(&self, _thread_id: ThreadId) {}
-}

impl MetricsBatch {
    pub(crate) fn new(_: &WorkerMetrics) -> Self {
-        Self {}
+        let now = Instant::now();
+
+        MetricsBatch {
+            busy_duration_total: 0,
+            processing_scheduled_tasks_started_at: now,
+        }
    }

-    pub(crate) fn submit(&mut self, _to: &WorkerMetrics, _mean_poll_time: u64) {}
+    pub(crate) fn submit(&mut self, worker: &WorkerMetrics, _mean_poll_time: u64) {
+        worker
+            .busy_duration_total
+            .store(self.busy_duration_total, Relaxed);
+    }
    pub(crate) fn about_to_park(&mut self) {}
    pub(crate) fn unparked(&mut self) {}
    pub(crate) fn inc_local_schedule_count(&mut self) {}
-    pub(crate) fn start_processing_scheduled_tasks(&mut self) {}
-    pub(crate) fn end_processing_scheduled_tasks(&mut self) {}
+    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
+        self.processing_scheduled_tasks_started_at = Instant::now();
+    }
+    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
+        let busy_duration = self.processing_scheduled_tasks_started_at.elapsed();
+        self.busy_duration_total += duration_as_u64(busy_duration);
+    }
    pub(crate) fn start_poll(&mut self) {}
    pub(crate) fn end_poll(&mut self) {}
}

Review comment (Contributor), on `submit`:

Do I understand correctly that this function duplicates part of the `submit` function in `batch::MetricsBatch`?

I think this is a problematic way of gradually stabilizing metrics, as it opens the possibility of diverging implementations if a change is made to the "real" `MetricsBatch` by someone who doesn't realise that there is another one.

This is additionally confusing because this effectively becomes the "stable" implementation, but it lives in a module called `mock`.

I would propose that we instead split the `metrics::MetricsBatch` implementation into stable (always compiles) and unstable (gated by a cfg option), the same way we've done elsewhere in this PR. As with another comment, we would group all the unstable functions into a single `cfg_unstable_metrics!` block.
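
To make the proposal concrete, here is a rough sketch of the suggested split (hypothetical code, not part of this PR; it assumes tokio's internal `cfg_unstable_metrics!` macro and the fields shown above):

```rust
// Hypothetical sketch of the reviewer's proposal: a single MetricsBatch
// whose stable methods always compile, with the unstable surface grouped
// in one cfg-gated block instead of living in a separate mock type.
impl MetricsBatch {
    // Stable: required by the stabilized worker_total_busy_duration.
    pub(crate) fn start_processing_scheduled_tasks(&mut self) {
        self.processing_scheduled_tasks_started_at = Instant::now();
    }

    pub(crate) fn end_processing_scheduled_tasks(&mut self) {
        let busy = self.processing_scheduled_tasks_started_at.elapsed();
        self.busy_duration_total += duration_as_u64(busy);
    }

    cfg_unstable_metrics! {
        // Unstable: compiled only with `--cfg tokio_unstable`.
        pub(crate) fn start_poll(&mut self) {
            // ...unstable poll-time bookkeeping...
        }

        pub(crate) fn end_poll(&mut self) {
            // ...unstable poll-time bookkeeping...
        }
    }
}
```
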
@@ -57,3 +75,7 @@ cfg_rt_multi_thread! {
        pub(crate) fn incr_overflow_count(&mut self) {}
    }
}
+
+fn duration_as_u64(dur: Duration) -> u64 {
+    u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
+}
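
As an aside on why `duration_as_u64` saturates rather than unwraps: `Duration::as_nanos()` returns a `u128`, which overflows `u64` for durations longer than roughly 584 years. A small standalone check (illustrative, not part of the diff):

```rust
use std::time::Duration;

fn duration_as_u64(dur: Duration) -> u64 {
    u64::try_from(dur.as_nanos()).unwrap_or(u64::MAX)
}

fn main() {
    // One second fits comfortably in u64 nanoseconds.
    assert_eq!(duration_as_u64(Duration::from_secs(1)), 1_000_000_000);

    // Duration::MAX holds ~u64::MAX seconds, so its nanosecond count
    // exceeds u64::MAX and the conversion saturates instead of panicking.
    assert_eq!(duration_as_u64(Duration::MAX), u64::MAX);
}
```
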
7 changes: 5 additions & 2 deletions tokio/src/runtime/metrics/mod.rs
@@ -17,6 +17,7 @@ cfg_unstable_metrics! {

    mod histogram;
    pub(crate) use histogram::{Histogram, HistogramBatch, HistogramBuilder};
+
    #[allow(unreachable_pub)] // rust-lang/rust#57411
    pub use histogram::{HistogramScale, HistogramConfiguration, LogHistogram, LogHistogramBuilder, InvalidHistogramConfiguration};

@@ -34,7 +35,9 @@ cfg_unstable_metrics! {
}

cfg_not_unstable_metrics! {
-    mod mock;
+    mod worker;
+    pub(crate) use worker::WorkerMetrics;

-    pub(crate) use mock::{SchedulerMetrics, WorkerMetrics, MetricsBatch, HistogramBuilder};
+    mod mock;
+    pub(crate) use mock::{SchedulerMetrics, MetricsBatch, HistogramBuilder};
}
104 changes: 54 additions & 50 deletions tokio/src/runtime/metrics/runtime.rs
@@ -1,12 +1,14 @@
use crate::runtime::Handle;
+#[allow(unused_imports)]
+use std::time::Duration;
+
+cfg_64bit_metrics! {
+    use std::sync::atomic::Ordering::Relaxed;
+}

cfg_unstable_metrics! {
    use std::ops::Range;
    use std::thread::ThreadId;
-    cfg_64bit_metrics! {
-        use std::sync::atomic::Ordering::Relaxed;
-    }
-    use std::time::Duration;
}

/// Handle to the runtime's metrics.
@@ -96,6 +98,54 @@ impl RuntimeMetrics {
        self.handle.inner.injection_queue_depth()
    }

+    cfg_64bit_metrics! {
+        /// Returns the amount of time the given worker thread has been busy.
+        ///
+        /// The worker busy duration starts at zero when the runtime is created and
+        /// increases whenever the worker is spending time processing work. Using
+        /// this value can indicate the load of the given worker. If a lot of time
+        /// is spent busy, then the worker is under load and will check for inbound
+        /// events less often.
+        ///
+        /// The timer is monotonically increasing. It is never decremented or reset
+        /// to zero.
+        ///
+        /// # Arguments
+        ///
+        /// `worker` is the index of the worker being queried. The given value must
+        /// be between 0 and `num_workers()`. The index uniquely identifies a single
+        /// worker and will continue to identify the worker throughout the lifetime
+        /// of the runtime instance.
+        ///
+        /// # Panics
+        ///
+        /// The method panics when `worker` represents an invalid worker, i.e. is
+        /// greater than or equal to `num_workers()`.
+        ///
+        /// # Examples
+        ///
+        /// ```
+        /// use tokio::runtime::Handle;
+        ///
+        /// #[tokio::main]
+        /// async fn main() {
+        ///     let metrics = Handle::current().metrics();
+        ///
+        ///     let n = metrics.worker_total_busy_duration(0);
+        ///     println!("worker 0 was busy for a total of {:?}", n);
+        /// }
+        /// ```
+        pub fn worker_total_busy_duration(&self, worker: usize) -> Duration {
+            let nanos = self
+                .handle
+                .inner
+                .worker_metrics(worker)
+                .busy_duration_total
+                .load(Relaxed);
+            Duration::from_nanos(nanos)
+        }
+    }

    cfg_unstable_metrics! {

        /// Returns the number of additional threads spawned by the runtime.
@@ -543,52 +593,6 @@ impl RuntimeMetrics {
            .load(Relaxed)
        }

[Removed in this hunk: the `worker_total_busy_duration` method and its documentation (identical to the version shown above), previously defined inside `cfg_unstable_metrics!`.]

        /// Returns the number of tasks scheduled from **within** the runtime on the
        /// given worker's local queue.
        ///
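
For context on how the newly stable method can be consumed, a hypothetical monitoring snippet (application-side code, not part of the diff; it assumes tokio's `time` feature for `tokio::time::sleep`):

```rust
use std::time::Duration;
use tokio::runtime::Handle;

#[tokio::main]
async fn main() {
    let metrics = Handle::current().metrics();
    let mut prev = metrics.worker_total_busy_duration(0);

    for _ in 0..5 {
        tokio::time::sleep(Duration::from_secs(1)).await;
        let total = metrics.worker_total_busy_duration(0);

        // The counter is monotonic, so the delta is the busy time
        // accumulated by worker 0 over the last ~1s interval.
        let utilization = (total - prev).as_secs_f64();
        println!("worker 0 utilization: {:.1}%", utilization * 100.0);
        prev = total;
    }
}
```
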
59 changes: 47 additions & 12 deletions tokio/src/runtime/metrics/worker.rs
@@ -1,10 +1,13 @@
-use crate::runtime::metrics::Histogram;
use crate::runtime::Config;
use crate::util::metric_atomics::{MetricAtomicU64, MetricAtomicUsize};
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Mutex;
use std::thread::ThreadId;

+cfg_unstable_metrics! {
+    use crate::runtime::metrics::Histogram;
+}

/// Retrieve runtime worker metrics.
///
/// **Note**: This is an [unstable API][unstable]. The public API of this type
@@ -15,40 +18,60 @@
#[derive(Debug, Default)]
#[repr(align(128))]
pub(crate) struct WorkerMetrics {
+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]

Review comment (Contributor), on the `doc(cfg)` attribute:

Is this necessary? Since this isn't a public method, it won't appear in the documentation.

    /// Number of times the worker parked.
    pub(crate) park_count: MetricAtomicU64,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// Number of times the worker parked and unparked.
    pub(crate) park_unpark_count: MetricAtomicU64,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// Number of times the worker woke then parked again without doing work.
    pub(crate) noop_count: MetricAtomicU64,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// Number of tasks the worker stole.
    pub(crate) steal_count: MetricAtomicU64,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// Number of times the worker stole.
    pub(crate) steal_operations: MetricAtomicU64,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// Number of tasks the worker polled.
    pub(crate) poll_count: MetricAtomicU64,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// EWMA task poll time, in nanoseconds.
    pub(crate) mean_poll_time: MetricAtomicU64,

    /// Amount of time the worker spent doing work vs. parking.
    pub(crate) busy_duration_total: MetricAtomicU64,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// Number of tasks scheduled for execution on the worker's local queue.
    pub(crate) local_schedule_count: MetricAtomicU64,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// Number of tasks moved from the local queue to the global queue to free space.
    pub(crate) overflow_count: MetricAtomicU64,

    /// Number of tasks currently in the local queue. Used only by the
    /// current-thread scheduler.
    pub(crate) queue_depth: MetricAtomicUsize,

+    #[cfg(tokio_unstable)]
+    #[cfg_attr(docsrs, doc(cfg(tokio_unstable)))]
    /// If `Some`, tracks the number of polls by duration range.
    pub(super) poll_count_histogram: Option<Histogram>,

@@ -57,29 +80,41 @@ pub(crate) struct WorkerMetrics {
}

impl WorkerMetrics {
-    pub(crate) fn from_config(config: &Config) -> WorkerMetrics {
-        let mut worker_metrics = WorkerMetrics::new();
-        worker_metrics.poll_count_histogram = config
-            .metrics_poll_count_histogram
-            .as_ref()
-            .map(|histogram_builder| histogram_builder.build());
-        worker_metrics
-    }
+    cfg_unstable_metrics! {
+        pub(crate) fn from_config(config: &Config) -> WorkerMetrics {
+            let mut worker_metrics = WorkerMetrics::new();
+            worker_metrics.poll_count_histogram = config
+                .metrics_poll_count_histogram
+                .as_ref()
+                .map(|histogram_builder| histogram_builder.build());
+            worker_metrics
+        }
+    }
+
+    cfg_not_unstable_metrics! {
+        pub(crate) fn from_config(_: &Config) -> WorkerMetrics {
+            WorkerMetrics::new()
+        }
+    }

Review comment (Contributor), on `cfg_unstable_metrics!`:

It would be better if we grouped all the unstable functions together at the bottom of the impl block, instead of spreading them out.

    pub(crate) fn new() -> WorkerMetrics {
        WorkerMetrics::default()
    }

-    pub(crate) fn queue_depth(&self) -> usize {
-        self.queue_depth.load(Relaxed)
-    }
+    cfg_unstable_metrics! {
+        pub(crate) fn queue_depth(&self) -> usize {
+            self.queue_depth.load(Relaxed)
+        }
+    }

    pub(crate) fn set_queue_depth(&self, len: usize) {
        self.queue_depth.store(len, Relaxed);
    }

-    pub(crate) fn thread_id(&self) -> Option<ThreadId> {
-        *self.thread_id.lock().unwrap()
-    }
+    cfg_unstable_metrics! {
+        pub(crate) fn thread_id(&self) -> Option<ThreadId> {
+            *self.thread_id.lock().unwrap()
+        }
+    }

    pub(crate) fn set_thread_id(&self, thread_id: ThreadId) {
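
To illustrate the grouping suggested in the review comment above, a hypothetical rearrangement of this impl block (sketch only, not part of the diff):

```rust
// Hypothetical layout: stable methods first, then every unstable method
// collected in a single cfg_unstable_metrics! block at the bottom.
impl WorkerMetrics {
    // Stable: compiled in all configurations.
    pub(crate) fn new() -> WorkerMetrics {
        WorkerMetrics::default()
    }

    pub(crate) fn set_queue_depth(&self, len: usize) {
        self.queue_depth.store(len, Relaxed);
    }

    cfg_unstable_metrics! {
        // Unstable: grouped here instead of scattered through the impl.
        pub(crate) fn queue_depth(&self) -> usize {
            self.queue_depth.load(Relaxed)
        }

        pub(crate) fn thread_id(&self) -> Option<ThreadId> {
            *self.thread_id.lock().unwrap()
        }
    }
}
```
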
10 changes: 5 additions & 5 deletions tokio/src/runtime/scheduler/current_thread/mod.rs
@@ -566,6 +566,11 @@ impl Handle {
    pub(crate) fn injection_queue_depth(&self) -> usize {
        self.shared.inject.len()
    }
+
+    pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
+        assert_eq!(0, worker);
+        &self.shared.worker_metrics
+    }
}

cfg_unstable_metrics! {
@@ -574,11 +579,6 @@ cfg_unstable_metrics! {
            &self.shared.scheduler_metrics
        }

-        pub(crate) fn worker_metrics(&self, worker: usize) -> &WorkerMetrics {
-            assert_eq!(0, worker);
-            &self.shared.worker_metrics
-        }
-
        pub(crate) fn worker_local_queue_depth(&self, worker: usize) -> usize {
            self.worker_metrics(worker).queue_depth()
        }