From c3a935541d911cd9afa6838a1bb02e4f499b244d Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 8 Oct 2024 10:51:03 +0200 Subject: [PATCH] task: add task size to tracing instrumentation (#6881) In Tokio, the futures for tasks are stored on the stack unless they are explicitly boxed, either by the user or auto-boxed by Tokio when they are especially large. Auto-boxing now also occurs in release mode (since #6826). Having very large futures can be problematic as it can cause a stack overflow. In some cases it might be desireable to have smaller futures, even if they are placed on the heap. This change adds the size of the future driving an async task or the function driving a blocking task to the tracing instrumentation. In the case of a future that is auto-boxed by Tokio, both the final size as well the original size before boxing is included. To do this, a new struct `SpawnMeta` gets passed down from where a future might get boxed to where the instrumentation is added. This contains the task name (optionally) and the original future or function size. If the `tokio_unstable` cfg flag and the `tracing` feature aren't both enabled, then this struct will be zero sized, which is a small improvement on the previous behavior of unconditionally passing down an `Option<&str>` for the name. This will make this information immediately available in Tokio Console, and will enable new lints which will warn users if they have large futures (just for async tasks). We have some tests under the `tracing-instrumentation` crate which test that the `size.bytes` and `original_size.bytes` fields are set correctly. The minimal version of `tracing` required for Tokio has been bumped from 0.1.25 to 0.1.29 to get the `Value` impl on `Option`. Given that the current version is 0.1.40, this seems reasonable, especially given that Tracing's MSRV is still lower than Tokio's in the latest version. --- tokio-util/Cargo.toml | 2 +- tokio/Cargo.toml | 2 +- tokio/src/runtime/blocking/pool.rs | 50 ++++---- tokio/src/runtime/handle.rs | 25 ++-- tokio/src/runtime/runtime.rs | 22 ++-- tokio/src/task/builder.rs | 39 +++--- tokio/src/task/local.rs | 36 +++--- tokio/src/task/spawn.rs | 14 +-- tokio/src/util/trace.rs | 114 +++++++++++++++--- .../tracing-instrumentation/tests/task.rs | 77 ++++++++++++ 10 files changed, 278 insertions(+), 103 deletions(-) diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index a73eec8799a..12f70be862e 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -42,7 +42,7 @@ futures-io = { version = "0.3.0", optional = true } futures-util = { version = "0.3.0", optional = true } pin-project-lite = "0.2.11" slab = { version = "0.4.4", optional = true } # Backs `DelayQueue` -tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } +tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } [target.'cfg(tokio_unstable)'.dependencies] hashbrown = { version = "0.14.0", default-features = false, optional = true } diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 4c5f7d46acb..b07f50150b7 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -101,7 +101,7 @@ socket2 = { version = "0.5.5", optional = true, features = [ "all" ] } # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. [target.'cfg(tokio_unstable)'.dependencies] -tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full +tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } # Not in full # Currently unstable. The API exposed by these features may be broken at any time. # Requires `--cfg tokio_unstable` to enable. diff --git a/tokio/src/runtime/blocking/pool.rs b/tokio/src/runtime/blocking/pool.rs index f8466a19bd9..7eec91d23d9 100644 --- a/tokio/src/runtime/blocking/pool.rs +++ b/tokio/src/runtime/blocking/pool.rs @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn; use crate::runtime::task::{self, JoinHandle}; use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD}; use crate::util::metric_atomics::MetricAtomicUsize; +use crate::util::trace::{blocking_task, SpawnMeta}; use std::collections::{HashMap, VecDeque}; use std::fmt; @@ -299,10 +300,21 @@ impl Spawner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, spawn_result) = if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt) + let fn_size = std::mem::size_of::(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { + self.spawn_blocking_inner( + Box::new(func), + Mandatory::NonMandatory, + SpawnMeta::new_unnamed(fn_size), + rt, + ) } else { - self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt) + self.spawn_blocking_inner( + func, + Mandatory::NonMandatory, + SpawnMeta::new_unnamed(fn_size), + rt, + ) }; match spawn_result { @@ -326,18 +338,19 @@ impl Spawner { F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let (join_handle, spawn_result) = if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { + let fn_size = std::mem::size_of::(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { self.spawn_blocking_inner( Box::new(func), Mandatory::Mandatory, - None, + SpawnMeta::new_unnamed(fn_size), rt, ) } else { self.spawn_blocking_inner( func, Mandatory::Mandatory, - None, + SpawnMeta::new_unnamed(fn_size), rt, ) }; @@ -355,35 +368,16 @@ impl Spawner { &self, func: F, is_mandatory: Mandatory, - name: Option<&str>, + spawn_meta: SpawnMeta<'_>, rt: &Handle, ) -> (JoinHandle, Result<(), SpawnError>) where F: FnOnce() -> R + Send + 'static, R: Send + 'static, { - let fut = BlockingTask::new(func); let id = task::Id::next(); - #[cfg(all(tokio_unstable, feature = "tracing"))] - let fut = { - use tracing::Instrument; - let location = std::panic::Location::caller(); - let span = tracing::trace_span!( - target: "tokio::task::blocking", - "runtime.spawn", - kind = %"blocking", - task.name = %name.unwrap_or_default(), - task.id = id.as_u64(), - "fn" = %std::any::type_name::(), - loc.file = location.file(), - loc.line = location.line(), - loc.col = location.column(), - ); - fut.instrument(span) - }; - - #[cfg(not(all(tokio_unstable, feature = "tracing")))] - let _ = name; + let fut = + blocking_task::>(BlockingTask::new(func), spawn_meta, id.as_u64()); let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id); diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 7e3cd1504e5..9026e8773a0 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -18,10 +18,11 @@ pub struct Handle { use crate::runtime::task::JoinHandle; use crate::runtime::BOX_FUTURE_THRESHOLD; use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR}; +use crate::util::trace::SpawnMeta; use std::future::Future; use std::marker::PhantomData; -use std::{error, fmt}; +use std::{error, fmt, mem}; /// Runtime context guard. /// @@ -189,10 +190,11 @@ impl Handle { F: Future + Send + 'static, F::Output: Send + 'static, { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.spawn_named(Box::pin(future), None) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.spawn_named(future, None) + self.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) } } @@ -296,15 +298,16 @@ impl Handle { /// [`tokio::time`]: crate::time #[track_caller] pub fn block_on(&self, future: F) -> F::Output { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.block_on_inner(Box::pin(future)) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.block_on_inner(future) + self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - fn block_on_inner(&self, future: F) -> F::Output { + fn block_on_inner(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { #[cfg(all( tokio_unstable, tokio_taskdump, @@ -316,7 +319,7 @@ impl Handle { #[cfg(all(tokio_unstable, feature = "tracing"))] let future = - crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64()); + crate::util::trace::task(future, "block_on", _meta, super::task::Id::next().as_u64()); // Enter the runtime context. This sets the current driver handles and // prevents blocking an existing runtime. @@ -326,7 +329,7 @@ impl Handle { } #[track_caller] - pub(crate) fn spawn_named(&self, future: F, _name: Option<&str>) -> JoinHandle + pub(crate) fn spawn_named(&self, future: F, _meta: SpawnMeta<'_>) -> JoinHandle where F: Future + Send + 'static, F::Output: Send + 'static, @@ -341,7 +344,7 @@ impl Handle { ))] let future = super::task::trace::Trace::root(future); #[cfg(all(tokio_unstable, feature = "tracing"))] - let future = crate::util::trace::task(future, "task", _name, id.as_u64()); + let future = crate::util::trace::task(future, "task", _meta, id.as_u64()); self.inner.spawn(future, id) } diff --git a/tokio/src/runtime/runtime.rs b/tokio/src/runtime/runtime.rs index 74061d24ce8..4b22ae9747c 100644 --- a/tokio/src/runtime/runtime.rs +++ b/tokio/src/runtime/runtime.rs @@ -3,8 +3,10 @@ use crate::runtime::blocking::BlockingPool; use crate::runtime::scheduler::CurrentThread; use crate::runtime::{context, EnterGuard, Handle}; use crate::task::JoinHandle; +use crate::util::trace::SpawnMeta; use std::future::Future; +use std::mem; use std::time::Duration; cfg_rt_multi_thread! { @@ -241,10 +243,13 @@ impl Runtime { F: Future + Send + 'static, F::Output: Send + 'static, { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.handle.spawn_named(Box::pin(future), None) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.handle + .spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.handle.spawn_named(future, None) + self.handle + .spawn_named(future, SpawnMeta::new_unnamed(fut_size)) } } @@ -329,15 +334,16 @@ impl Runtime { /// [handle]: fn@Handle::block_on #[track_caller] pub fn block_on(&self, future: F) -> F::Output { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.block_on_inner(Box::pin(future)) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - self.block_on_inner(future) + self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - fn block_on_inner(&self, future: F) -> F::Output { + fn block_on_inner(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output { #[cfg(all( tokio_unstable, tokio_taskdump, @@ -351,7 +357,7 @@ impl Runtime { let future = crate::util::trace::task( future, "block_on", - None, + _meta, crate::runtime::task::Id::next().as_u64(), ); diff --git a/tokio/src/task/builder.rs b/tokio/src/task/builder.rs index c98849b2746..6053352a01c 100644 --- a/tokio/src/task/builder.rs +++ b/tokio/src/task/builder.rs @@ -2,8 +2,9 @@ use crate::{ runtime::{Handle, BOX_FUTURE_THRESHOLD}, task::{JoinHandle, LocalSet}, + util::trace::SpawnMeta, }; -use std::{future::Future, io}; +use std::{future::Future, io, mem}; /// Factory which is used to configure the properties of a new task. /// @@ -88,10 +89,11 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - Ok(if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - super::spawn::spawn_inner(Box::pin(future), self.name) + let fut_size = mem::size_of::(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + super::spawn::spawn_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - super::spawn::spawn_inner(future, self.name) + super::spawn::spawn_inner(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -108,10 +110,11 @@ impl<'a> Builder<'a> { Fut: Future + Send + 'static, Fut::Output: Send + 'static, { - Ok(if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - handle.spawn_named(Box::pin(future), self.name) + let fut_size = mem::size_of::(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + handle.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - handle.spawn_named(future, self.name) + handle.spawn_named(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -135,10 +138,11 @@ impl<'a> Builder<'a> { Fut: Future + 'static, Fut::Output: 'static, { - Ok(if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - super::local::spawn_local_inner(Box::pin(future), self.name) + let fut_size = mem::size_of::(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + super::local::spawn_local_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size)) } else { - super::local::spawn_local_inner(future, self.name) + super::local::spawn_local_inner(future, SpawnMeta::new(self.name, fut_size)) }) } @@ -159,7 +163,12 @@ impl<'a> Builder<'a> { Fut: Future + 'static, Fut::Output: 'static, { - Ok(local_set.spawn_named(future, self.name)) + let fut_size = mem::size_of::(); + Ok(if fut_size > BOX_FUTURE_THRESHOLD { + local_set.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size)) + } else { + local_set.spawn_named(future, SpawnMeta::new(self.name, fut_size)) + }) } /// Spawns blocking code on the blocking threadpool. @@ -200,19 +209,19 @@ impl<'a> Builder<'a> { Output: Send + 'static, { use crate::runtime::Mandatory; - let (join_handle, spawn_result) = if std::mem::size_of::() > BOX_FUTURE_THRESHOLD - { + let fn_size = mem::size_of::(); + let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD { handle.inner.blocking_spawner().spawn_blocking_inner( Box::new(function), Mandatory::NonMandatory, - self.name, + SpawnMeta::new(self.name, fn_size), handle, ) } else { handle.inner.blocking_spawner().spawn_blocking_inner( function, Mandatory::NonMandatory, - self.name, + SpawnMeta::new(self.name, fn_size), handle, ) }; diff --git a/tokio/src/task/local.rs b/tokio/src/task/local.rs index 90d4d3612e8..d5341937893 100644 --- a/tokio/src/task/local.rs +++ b/tokio/src/task/local.rs @@ -6,6 +6,7 @@ use crate::runtime; use crate::runtime::task::{self, JoinHandle, LocalOwnedTasks, Task, TaskHarnessScheduleHooks}; use crate::runtime::{context, ThreadId, BOX_FUTURE_THRESHOLD}; use crate::sync::AtomicWaker; +use crate::util::trace::SpawnMeta; use crate::util::RcCell; use std::cell::Cell; @@ -13,6 +14,7 @@ use std::collections::VecDeque; use std::fmt; use std::future::Future; use std::marker::PhantomData; +use std::mem; use std::pin::Pin; use std::rc::Rc; use std::task::Poll; @@ -367,22 +369,23 @@ cfg_rt! { F: Future + 'static, F::Output: 'static, { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - spawn_local_inner(Box::pin(future), None) + let fut_size = std::mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + spawn_local_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - spawn_local_inner(future, None) + spawn_local_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - pub(super) fn spawn_local_inner(future: F, name: Option<&str>) -> JoinHandle + pub(super) fn spawn_local_inner(future: F, meta: SpawnMeta<'_>) -> JoinHandle where F: Future + 'static, F::Output: 'static { match CURRENT.with(|LocalData { ctx, .. }| ctx.get()) { None => panic!("`spawn_local` called from outside of a `task::LocalSet`"), - Some(cx) => cx.spawn(future, name) + Some(cx) => cx.spawn(future, meta) } } } @@ -521,7 +524,12 @@ impl LocalSet { F: Future + 'static, F::Output: 'static, { - self.spawn_named(future, None) + let fut_size = mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) + } else { + self.spawn_named(future, SpawnMeta::new_unnamed(fut_size)) + } } /// Runs a future to completion on the provided runtime, driving any local @@ -643,26 +651,22 @@ impl LocalSet { pub(in crate::task) fn spawn_named( &self, future: F, - name: Option<&str>, + meta: SpawnMeta<'_>, ) -> JoinHandle where F: Future + 'static, F::Output: 'static, { - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - self.spawn_named_inner(Box::pin(future), name) - } else { - self.spawn_named_inner(future, name) - } + self.spawn_named_inner(future, meta) } #[track_caller] - fn spawn_named_inner(&self, future: F, name: Option<&str>) -> JoinHandle + fn spawn_named_inner(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle where F: Future + 'static, F::Output: 'static, { - let handle = self.context.spawn(future, name); + let handle = self.context.spawn(future, meta); // Because a task was spawned from *outside* the `LocalSet`, wake the // `LocalSet` future to execute the new task, if it hasn't been woken. @@ -949,13 +953,13 @@ impl Drop for LocalSet { impl Context { #[track_caller] - fn spawn(&self, future: F, name: Option<&str>) -> JoinHandle + fn spawn(&self, future: F, meta: SpawnMeta<'_>) -> JoinHandle where F: Future + 'static, F::Output: 'static, { let id = crate::runtime::task::Id::next(); - let future = crate::util::trace::task(future, "local", name, id.as_u64()); + let future = crate::util::trace::task(future, "local", meta, id.as_u64()); // Safety: called from the thread that owns the `LocalSet` let (handle, notified) = { diff --git a/tokio/src/task/spawn.rs b/tokio/src/task/spawn.rs index 4208ac6e0c6..7c748226121 100644 --- a/tokio/src/task/spawn.rs +++ b/tokio/src/task/spawn.rs @@ -1,5 +1,6 @@ use crate::runtime::BOX_FUTURE_THRESHOLD; use crate::task::JoinHandle; +use crate::util::trace::SpawnMeta; use std::future::Future; @@ -167,17 +168,16 @@ cfg_rt! { F: Future + Send + 'static, F::Output: Send + 'static, { - // preventing stack overflows on debug mode, by quickly sending the - // task to the heap. - if std::mem::size_of::() > BOX_FUTURE_THRESHOLD { - spawn_inner(Box::pin(future), None) + let fut_size = std::mem::size_of::(); + if fut_size > BOX_FUTURE_THRESHOLD { + spawn_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size)) } else { - spawn_inner(future, None) + spawn_inner(future, SpawnMeta::new_unnamed(fut_size)) } } #[track_caller] - pub(super) fn spawn_inner(future: T, name: Option<&str>) -> JoinHandle + pub(super) fn spawn_inner(future: T, meta: SpawnMeta<'_>) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, @@ -197,7 +197,7 @@ cfg_rt! { ))] let future = task::trace::Trace::root(future); let id = task::Id::next(); - let task = crate::util::trace::task(future, "task", name, id.as_u64()); + let task = crate::util::trace::task(future, "task", meta, id.as_u64()); match context::with_current(|handle| handle.spawn(task, id)) { Ok(join_handle) => join_handle, diff --git a/tokio/src/util/trace.rs b/tokio/src/util/trace.rs index e1827686ca9..97006df474e 100644 --- a/tokio/src/util/trace.rs +++ b/tokio/src/util/trace.rs @@ -1,34 +1,110 @@ -cfg_trace! { - cfg_rt! { +cfg_rt! { + use std::marker::PhantomData; + + pub(crate) struct SpawnMeta<'a> { + /// The name of the task + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) name: Option<&'a str>, + /// The original size of the future or function being spawned + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) original_size: usize, + _pd: PhantomData<&'a ()>, + } + + impl<'a> SpawnMeta<'a> { + /// Create new spawn meta with a name and original size (before possible auto-boxing) + #[cfg(all(tokio_unstable, feature = "tracing"))] + pub(crate) fn new(name: Option<&'a str>, original_size: usize) -> Self { + Self { + name, + original_size, + _pd: PhantomData, + } + } + + /// Create a new unnamed spawn meta with the original size (before possible auto-boxing) + pub(crate) fn new_unnamed(original_size: usize) -> Self { + #[cfg(not(all(tokio_unstable, feature = "tracing")))] + let _original_size = original_size; + + Self { + #[cfg(all(tokio_unstable, feature = "tracing"))] + name: None, + #[cfg(all(tokio_unstable, feature = "tracing"))] + original_size, + _pd: PhantomData, + } + } + } + + cfg_trace! { use core::{ pin::Pin, task::{Context, Poll}, }; use pin_project_lite::pin_project; + use std::mem; use std::future::Future; + use tracing::instrument::Instrument; pub(crate) use tracing::instrument::Instrumented; #[inline] #[track_caller] - pub(crate) fn task(task: F, kind: &'static str, name: Option<&str>, id: u64) -> Instrumented { + pub(crate) fn task(task: F, kind: &'static str, meta: SpawnMeta<'_>, id: u64) -> Instrumented { #[track_caller] - fn get_span(kind: &'static str, name: Option<&str>, id: u64) -> tracing::Span { + fn get_span(kind: &'static str, spawn_meta: SpawnMeta<'_>, id: u64, task_size: usize) -> tracing::Span { let location = std::panic::Location::caller(); + let original_size = if spawn_meta.original_size != task_size { + Some(spawn_meta.original_size) + } else { + None + }; tracing::trace_span!( target: "tokio::task", parent: None, "runtime.spawn", %kind, - task.name = %name.unwrap_or_default(), + task.name = %spawn_meta.name.unwrap_or_default(), task.id = id, + original_size.bytes = original_size, + size.bytes = task_size, loc.file = location.file(), loc.line = location.line(), loc.col = location.column(), ) } use tracing::instrument::Instrument; - let span = get_span(kind, name, id); + let span = get_span(kind, meta, id, mem::size_of::()); + task.instrument(span) + } + + #[inline] + #[track_caller] + pub(crate) fn blocking_task(task: Fut, spawn_meta: SpawnMeta<'_>, id: u64) -> Instrumented { + let location = std::panic::Location::caller(); + + let fn_size = mem::size_of::(); + let original_size = if spawn_meta.original_size != fn_size { + Some(spawn_meta.original_size) + } else { + None + }; + + let span = tracing::trace_span!( + target: "tokio::task::blocking", + "runtime.spawn", + kind = %"blocking", + task.name = %spawn_meta.name.unwrap_or_default(), + task.id = id, + "fn" = %std::any::type_name::(), + original_size.bytes = original_size, + size.bytes = fn_size, + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ); task.instrument(span) + } pub(crate) fn async_op(inner: P, resource_span: tracing::Span, source: &str, poll_op_name: &'static str, inherits_child_attrs: bool) -> InstrumentedAsyncOp @@ -83,7 +159,23 @@ cfg_trace! { } } } + + cfg_not_trace! { + #[inline] + pub(crate) fn task(task: F, _kind: &'static str, _meta: SpawnMeta<'_>, _id: u64) -> F { + // nop + task + } + + #[inline] + pub(crate) fn blocking_task(task: Fut, _spawn_meta: SpawnMeta<'_>, _id: u64) -> Fut { + let _ = PhantomData::<&Fn>; + // nop + task + } + } } + cfg_time! { #[track_caller] pub(crate) fn caller_location() -> Option<&'static std::panic::Location<'static>> { @@ -93,13 +185,3 @@ cfg_time! { None } } - -cfg_not_trace! { - cfg_rt! { - #[inline] - pub(crate) fn task(task: F, _: &'static str, _name: Option<&str>, _: u64) -> F { - // nop - task - } - } -} diff --git a/tokio/tests/tracing-instrumentation/tests/task.rs b/tokio/tests/tracing-instrumentation/tests/task.rs index 7bdb078e32c..fb215ca7ce0 100644 --- a/tokio/tests/tracing-instrumentation/tests/task.rs +++ b/tokio/tests/tracing-instrumentation/tests/task.rs @@ -3,6 +3,8 @@ //! These tests ensure that the instrumentation for task spawning and task //! lifecycles is correct. +use std::{mem, time::Duration}; + use tokio::task; use tracing_mock::{expect, span::NewSpan, subscriber}; @@ -93,6 +95,81 @@ async fn task_builder_loc_file_recorded() { handle.assert_finished(); } +#[tokio::test] +async fn task_spawn_sizes_recorded() { + let future = futures::future::ready(()); + let size = mem::size_of_val(&future) as u64; + + let task_span = expect::span() + .named("runtime.spawn") + .with_target("tokio::task") + // TODO(hds): check that original_size.bytes is NOT recorded when this can be done in + // tracing-mock without listing every other field. + .with_field(expect::field("size.bytes").with_value(&size)); + + let (subscriber, handle) = subscriber::mock().new_span(task_span).run_with_handle(); + + { + let _guard = tracing::subscriber::set_default(subscriber); + + task::Builder::new() + .spawn(future) + .unwrap() + .await + .expect("failed to await join handle"); + } + + handle.assert_finished(); +} + +#[tokio::test] +async fn task_big_spawn_sizes_recorded() { + let future = { + async fn big() { + let mut a = [0_u8; N]; + for (idx, item) in a.iter_mut().enumerate() { + *item = (idx % 256) as u8; + } + tokio::time::sleep(Duration::from_millis(10)).await; + for (idx, item) in a.iter_mut().enumerate() { + assert_eq!(*item, (idx % 256) as u8); + } + } + + // This is larger than the release auto-boxing threshold + big::<20_000>() + }; + + fn boxed_size(_: &T) -> usize { + mem::size_of::>() + } + let size = mem::size_of_val(&future) as u64; + let boxed_size = boxed_size(&future); + + let task_span = expect::span() + .named("runtime.spawn") + .with_target("tokio::task") + .with_field( + expect::field("size.bytes") + .with_value(&boxed_size) + .and(expect::field("original_size.bytes").with_value(&size)), + ); + + let (subscriber, handle) = subscriber::mock().new_span(task_span).run_with_handle(); + + { + let _guard = tracing::subscriber::set_default(subscriber); + + task::Builder::new() + .spawn(future) + .unwrap() + .await + .expect("failed to await join handle"); + } + + handle.assert_finished(); +} + /// Expect a task with name /// /// This is a convenience function to create the expectation for a new task