diff --git a/console-api/proto/tasks.proto b/console-api/proto/tasks.proto index 6d7c58d07..f676629de 100644 --- a/console-api/proto/tasks.proto +++ b/console-api/proto/tasks.proto @@ -59,6 +59,12 @@ message TaskDetails { // A histogram plus additional data. DurationHistogram histogram = 4; } + + // A histogram of task scheduled durations. + // + // The scheduled duration is the time a task spends between being + // woken and when it is next polled. + DurationHistogram scheduled_times_histogram = 5; } // Data recorded when a new task is spawned. diff --git a/console-api/src/generated/rs.tokio.console.tasks.rs b/console-api/src/generated/rs.tokio.console.tasks.rs index 8bdf62c3f..47da6dfef 100644 --- a/console-api/src/generated/rs.tokio.console.tasks.rs +++ b/console-api/src/generated/rs.tokio.console.tasks.rs @@ -43,6 +43,12 @@ pub struct TaskDetails { /// The timestamp for when the update to the task took place. #[prost(message, optional, tag="2")] pub now: ::core::option::Option<::prost_types::Timestamp>, + /// A histogram of task scheduled durations. + /// + /// The scheduled duration is the time a task spends between being + /// woken and when it is next polled. + #[prost(message, optional, tag="5")] + pub scheduled_times_histogram: ::core::option::Option<DurationHistogram>, /// A histogram of task poll durations. 
/// /// This is either: diff --git a/console-subscriber/examples/long_sleep.rs b/console-subscriber/examples/long_sleep.rs new file mode 100644 index 000000000..d2ee48583 --- /dev/null +++ b/console-subscriber/examples/long_sleep.rs @@ -0,0 +1,48 @@ +use std::time::Duration; + +use console_subscriber::ConsoleLayer; +use tokio::task::{self, yield_now}; +use tracing::info; + +#[tokio::main(flavor = "multi_thread", worker_threads = 2)] +async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { + ConsoleLayer::builder() + .with_default_env() + .publish_interval(Duration::from_millis(100)) + .init(); + + let long_sleeps = task::Builder::new() + .name("long-sleeps") + .spawn(long_sleeps(5000)) + .unwrap(); + + let sleep_forever = task::Builder::new() + .name("sleep-forever") + .spawn(sleep_forever(5000)) + .unwrap(); + + match (long_sleeps.await, sleep_forever.await) { + (Ok(_), Ok(_)) => info!("Success"), + (_, _) => info!("Error awaiting tasks."), + } + + tokio::time::sleep(Duration::from_millis(200)).await; + + Ok(()) +} + +async fn long_sleeps(inc: u64) { + let millis = inc; + loop { + std::thread::sleep(Duration::from_millis(millis)); + + yield_now().await; + } +} + +async fn sleep_forever(inc: u64) { + let millis = inc; + loop { + std::thread::sleep(Duration::from_millis(millis)); + } +} diff --git a/console-subscriber/src/aggregator/mod.rs b/console-subscriber/src/aggregator/mod.rs index cf126b1df..7359970b5 100644 --- a/console-subscriber/src/aggregator/mod.rs +++ b/console-subscriber/src/aggregator/mod.rs @@ -327,6 +327,7 @@ impl Aggregator { task_id: Some(id.clone().into()), now, poll_times_histogram: Some(stats.poll_duration_histogram()), + scheduled_times_histogram: Some(stats.scheduled_duration_histogram()), }) { self.details_watchers @@ -374,6 +375,7 @@ impl Aggregator { task_id: Some(id.clone().into()), now: Some(self.base_time.to_timestamp(Instant::now())), poll_times_histogram: Some(task_stats.poll_duration_histogram()), + scheduled_times_histogram: 
Some(task_stats.scheduled_duration_histogram()), }; watchers.retain(|watch| watch.update(&details)); !watchers.is_empty() diff --git a/console-subscriber/src/builder.rs b/console-subscriber/src/builder.rs index 1e0819dde..ace47db7f 100644 --- a/console-subscriber/src/builder.rs +++ b/console-subscriber/src/builder.rs @@ -50,6 +50,12 @@ pub struct Builder { /// Any polls exceeding this duration will be clamped to this value. Higher /// values will result in more memory usage. pub(super) poll_duration_max: Duration, + + /// The maximum value for the task scheduled duration histogram. + /// + /// Any scheduled times exceeding this duration will be clamped to this + /// value. Higher values will result in more memory usage. + pub(super) scheduled_duration_max: Duration, } impl Default for Builder { @@ -60,6 +66,7 @@ impl Default for Builder { publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL, retention: ConsoleLayer::DEFAULT_RETENTION, poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX, + scheduled_duration_max: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX, server_addr: ServerAddr::Tcp(SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT)), recording_path: None, filter_env_var: "RUST_LOG".to_string(), @@ -235,6 +242,23 @@ impl Builder { } } + /// Sets the maximum value for task scheduled duration histograms. + /// + /// Any scheduled duration (the time from a task being woken until it is next + /// polled) exceeding this value will be clamped down to this duration + /// and recorded as an outlier. + /// + /// By default, this is [one second]. Higher values will increase per-task + /// memory usage. + /// + /// [one second]: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX + pub fn scheduled_duration_histogram_max(self, max: Duration) -> Self { + Self { + scheduled_duration_max: max, + ..self + } + } + /// Sets whether tasks, resources, and async ops from the console /// subscriber thread are recorded. 
/// diff --git a/console-subscriber/src/lib.rs b/console-subscriber/src/lib.rs index d91ee392a..29b85bd75 100644 --- a/console-subscriber/src/lib.rs +++ b/console-subscriber/src/lib.rs @@ -123,6 +123,11 @@ pub struct ConsoleLayer { /// /// By default, this is one second. max_poll_duration_nanos: u64, + + /// Maximum value for the scheduled time histogram. + /// + /// By default, this is one second. + max_scheduled_duration_nanos: u64, } /// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire]. @@ -273,6 +278,7 @@ impl ConsoleLayer { ?config.recording_path, ?config.filter_env_var, ?config.poll_duration_max, + ?config.scheduled_duration_max, ?base_time, "configured console subscriber" ); @@ -310,6 +316,7 @@ impl ConsoleLayer { recorder, base_time, max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64, + max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64, }; (layer, server) } @@ -365,6 +372,15 @@ impl ConsoleLayer { /// See also [`Builder::poll_duration_histogram_max`]. pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1); + /// The default maximum value for the task scheduled duration histogram. + /// + /// Any scheduled duration (the time from a task being woken until it is next + /// polled) exceeding this will be clamped to this value. By default, the + /// maximum scheduled duration is one second. + /// + /// See also [`Builder::scheduled_duration_histogram_max`]. 
+ pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1); + fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool { self.spawn_callsites.contains(meta) } @@ -567,7 +583,11 @@ where fields: record::SerializeFields(fields.clone()), }); if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || { - let stats = Arc::new(stats::TaskStats::new(self.max_poll_duration_nanos, at)); + let stats = Arc::new(stats::TaskStats::new( + self.max_poll_duration_nanos, + self.max_scheduled_duration_nanos, + at, + )); let event = Event::Spawn { id: id.clone(), stats: stats.clone(), diff --git a/console-subscriber/src/stats.rs b/console-subscriber/src/stats.rs index 0e6995260..1ac1e7444 100644 --- a/console-subscriber/src/stats.rs +++ b/console-subscriber/src/stats.rs @@ -117,7 +117,8 @@ struct PollTimestamps { last_poll_ended: Option, busy_time: Duration, scheduled_time: Duration, - histogram: H, + poll_histogram: H, + scheduled_histogram: H, } #[derive(Debug)] @@ -128,8 +129,8 @@ struct Histogram { max_outlier: Option, } -trait RecordPoll { - fn record_poll_duration(&mut self, duration: Duration); +trait RecordDuration { + fn record_duration(&mut self, duration: Duration); } impl TimeAnchor { @@ -153,7 +154,11 @@ impl TimeAnchor { } impl TaskStats { - pub(crate) fn new(poll_duration_max: u64, created_at: Instant) -> Self { + pub(crate) fn new( + poll_duration_max: u64, + scheduled_duration_max: u64, + created_at: Instant, + ) -> Self { Self { is_dirty: AtomicBool::new(true), is_dropped: AtomicBool::new(false), @@ -161,7 +166,8 @@ impl TaskStats { dropped_at: Mutex::new(None), poll_stats: PollStats { timestamps: Mutex::new(PollTimestamps { - histogram: Histogram::new(poll_duration_max), + poll_histogram: Histogram::new(poll_duration_max), + scheduled_histogram: Histogram::new(scheduled_duration_max), first_poll: None, last_wake: None, last_poll_started: None, @@ -240,10 +246,18 @@ impl TaskStats { } pub(crate) fn poll_duration_histogram(&self) 
-> proto::tasks::task_details::PollTimesHistogram { - let hist = self.poll_stats.timestamps.lock().histogram.to_proto(); + let hist = self.poll_stats.timestamps.lock().poll_histogram.to_proto(); proto::tasks::task_details::PollTimesHistogram::Histogram(hist) } + pub(crate) fn scheduled_duration_histogram(&self) -> proto::tasks::DurationHistogram { + self.poll_stats + .timestamps + .lock() + .scheduled_histogram + .to_proto() + } + #[inline] fn make_dirty(&self) { self.is_dirty.swap(true, AcqRel); @@ -475,7 +489,7 @@ impl ToProto for ResourceStats { // === impl PollStats === -impl<H: RecordPoll> PollStats<H> { +impl<H: RecordDuration> PollStats<H> { fn wake(&self, at: Instant) { let mut timestamps = self.timestamps.lock(); timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at)); @@ -515,6 +529,10 @@ impl PollStats { return; } }; + + // if we have a scheduled time histogram, record the elapsed scheduled duration + timestamps.scheduled_histogram.record_duration(elapsed); + timestamps.scheduled_time += elapsed; } @@ -550,7 +568,7 @@ impl PollStats { }; // if we have a poll time histogram, add the timestamp - timestamps.histogram.record_poll_duration(elapsed); + timestamps.poll_histogram.record_duration(elapsed); timestamps.busy_time += elapsed; } @@ -636,8 +654,8 @@ impl Histogram { } } -impl RecordPoll for Histogram { - fn record_poll_duration(&mut self, duration: Duration) { +impl RecordDuration for Histogram { + fn record_duration(&mut self, duration: Duration) { let mut duration_ns = duration.as_nanos() as u64; // clamp the duration to the histogram's max value @@ -653,8 +671,8 @@ impl RecordPoll for Histogram { } } -impl RecordPoll for () { - fn record_poll_duration(&mut self, _: Duration) { +impl RecordDuration for () { + fn record_duration(&mut self, _: Duration) { // do nothing } } diff --git a/tokio-console/src/state/histogram.rs b/tokio-console/src/state/histogram.rs index 94b68ab10..03be93339 100644 --- a/tokio-console/src/state/histogram.rs +++ b/tokio-console/src/state/histogram.rs @@ -30,7 +30,7 @@ impl 
DurationHistogram { }) } - fn from_proto(proto: &proto::DurationHistogram) -> Option<Self> { + pub(crate) fn from_proto(proto: &proto::DurationHistogram) -> Option<Self> { let histogram = deserialize_histogram(&proto.raw_histogram[..])?; Some(Self { histogram, diff --git a/tokio-console/src/state/mod.rs b/tokio-console/src/state/mod.rs index 1458a1065..91affad16 100644 --- a/tokio-console/src/state/mod.rs +++ b/tokio-console/src/state/mod.rs @@ -221,6 +221,10 @@ impl State { .poll_times_histogram .as_ref() .and_then(histogram::DurationHistogram::from_poll_durations), + scheduled_times_histogram: update + .scheduled_times_histogram + .as_ref() + .and_then(histogram::DurationHistogram::from_proto), }; *self.current_task_details.borrow_mut() = Some(details); diff --git a/tokio-console/src/state/tasks.rs b/tokio-console/src/state/tasks.rs index 1dce737e5..c41665058 100644 --- a/tokio-console/src/state/tasks.rs +++ b/tokio-console/src/state/tasks.rs @@ -32,6 +32,7 @@ pub(crate) struct TasksState { pub(crate) struct Details { pub(crate) span_id: SpanId, pub(crate) poll_times_histogram: Option<DurationHistogram>, + pub(crate) scheduled_times_histogram: Option<DurationHistogram>, } #[derive(Debug, Copy, Clone)] @@ -264,6 +265,10 @@ impl Details { pub(crate) fn poll_times_histogram(&self) -> Option<&DurationHistogram> { self.poll_times_histogram.as_ref() } + + pub(crate) fn scheduled_times_histogram(&self) -> Option<&DurationHistogram> { + self.scheduled_times_histogram.as_ref() + } } impl Task { diff --git a/tokio-console/src/view/durations.rs b/tokio-console/src/view/durations.rs index 29303ed36..e1618dbe0 100644 --- a/tokio-console/src/view/durations.rs +++ b/tokio-console/src/view/durations.rs @@ -36,6 +36,8 @@ pub(crate) struct Durations<'a> { percentiles_title: &'a str, /// Title for histogram sparkline block histogram_title: &'a str, + /// Fixed width for percentiles block + percentiles_width: u16, } impl<'a> Widget for Durations<'a> { @@ -43,7 +45,13 @@ impl<'a> Widget for Durations<'a> { // Only split the 
durations area in half if we're also drawing a // sparkline. We require UTF-8 to draw the sparkline and also enough width. let (percentiles_area, histogram_area) = if self.styles.utf8 { - let percentiles_width = cmp::max(self.percentiles_title.len() as u16, 13_u16) + 2; + let percentiles_width = match self.percentiles_width { + // Fixed width + width if width > 0 => width, + // Long enough for the title or for a single line + // like "p99: 544.77µs" (13) (and borders on the sides). + _ => cmp::max(self.percentiles_title.len() as u16, 13_u16) + 2, + }; // If there isn't enough width left after drawing the percentiles // then we won't draw the sparkline at all. @@ -88,6 +96,7 @@ impl<'a> Durations<'a> { histogram: None, percentiles_title: "Percentiles", histogram_title: "Histogram", + percentiles_width: 0, } } @@ -105,4 +114,9 @@ impl<'a> Durations<'a> { self.histogram_title = title; self } + + pub(crate) fn percentiles_width(mut self, width: u16) -> Self { + self.percentiles_width = width; + self + } } diff --git a/tokio-console/src/view/task.rs b/tokio-console/src/view/task.rs index c9e021a10..2d6d1d982 100644 --- a/tokio-console/src/view/task.rs +++ b/tokio-console/src/view/task.rs @@ -6,6 +6,7 @@ use crate::{ }; use std::{ cell::RefCell, + cmp, rc::Rc, time::{Duration, SystemTime}, }; @@ -60,47 +61,64 @@ impl TaskView { }) .collect(); - let (controls_area, stats_area, poll_dur_area, fields_area, warnings_area) = - if warnings.is_empty() { - let chunks = Layout::default() - .direction(layout::Direction::Vertical) - .constraints( - [ - // controls - layout::Constraint::Length(1), - // task stats - layout::Constraint::Length(10), - // poll duration - layout::Constraint::Length(9), - // fields - layout::Constraint::Percentage(60), - ] - .as_ref(), - ) - .split(area); - (chunks[0], chunks[1], chunks[2], chunks[3], None) - } else { - let chunks = Layout::default() - .direction(layout::Direction::Vertical) - .constraints( - [ - // controls - 
layout::Constraint::Length(1), - // warnings (add 2 for top and bottom borders) - layout::Constraint::Length(warnings.len() as u16 + 2), - // task stats - layout::Constraint::Length(10), - // poll duration - layout::Constraint::Length(9), - // fields - layout::Constraint::Percentage(60), - ] - .as_ref(), - ) - .split(area); - - (chunks[0], chunks[2], chunks[3], chunks[4], Some(chunks[1])) - }; + let ( + controls_area, + stats_area, + poll_dur_area, + scheduled_dur_area, + fields_area, + warnings_area, + ) = if warnings.is_empty() { + let chunks = Layout::default() + .direction(layout::Direction::Vertical) + .constraints( + [ + // controls + layout::Constraint::Length(1), + // task stats + layout::Constraint::Length(10), + // poll duration + layout::Constraint::Length(9), + // scheduled duration + layout::Constraint::Length(9), + // fields + layout::Constraint::Percentage(60), + ] + .as_ref(), + ) + .split(area); + (chunks[0], chunks[1], chunks[2], chunks[3], chunks[4], None) + } else { + let chunks = Layout::default() + .direction(layout::Direction::Vertical) + .constraints( + [ + // controls + layout::Constraint::Length(1), + // warnings (add 2 for top and bottom borders) + layout::Constraint::Length(warnings.len() as u16 + 2), + // task stats + layout::Constraint::Length(10), + // poll duration + layout::Constraint::Length(9), + // scheduled duration + layout::Constraint::Length(9), + // fields + layout::Constraint::Percentage(60), + ] + .as_ref(), + ) + .split(area); + + ( + chunks[0], + chunks[2], + chunks[3], + chunks[4], + chunks[5], + Some(chunks[1]), + ) + }; let stats_area = Layout::default() .direction(layout::Direction::Horizontal) @@ -207,16 +225,32 @@ impl TaskView { let task_widget = Paragraph::new(overview).block(styles.border_block().title("Task")); let wakers_widget = Paragraph::new(waker_stats).block(styles.border_block().title("Waker")); + + let poll_percentiles_title = "Poll Times Percentiles"; + let scheduled_percentiles_title = "Sched Times 
Percentiles"; + let percentiles_width = cmp::max( + poll_percentiles_title.len(), + scheduled_percentiles_title.len(), + ) as u16 + + 2_u16; // extra 2 characters for the border let poll_durations_widget = Durations::new(styles) .histogram(details.and_then(|d| d.poll_times_histogram())) - .percentiles_title("Poll Times Percentiles") - .histogram_title("Poll Times Histogram"); + .percentiles_title(poll_percentiles_title) + .histogram_title("Poll Times Histogram") + .percentiles_width(percentiles_width); + let scheduled_durations_widget = Durations::new(styles) + .histogram(details.and_then(|d| d.scheduled_times_histogram())) + .percentiles_title(scheduled_percentiles_title) + .histogram_title("Scheduled Times Histogram") + .percentiles_width(percentiles_width); + let fields_widget = Paragraph::new(fields).block(styles.border_block().title("Fields")); frame.render_widget(Block::default().title(controls), controls_area); frame.render_widget(task_widget, stats_area[0]); frame.render_widget(wakers_widget, stats_area[1]); frame.render_widget(poll_durations_widget, poll_dur_area); + frame.render_widget(scheduled_durations_widget, scheduled_dur_area); frame.render_widget(fields_widget, fields_area); } }