Skip to content

Commit

Permalink
task: add task size to tracing instrumentation (#6881)
Browse files Browse the repository at this point in the history
In Tokio, the futures for tasks are stored on the stack unless they are
explicitly boxed, either by the user or auto-boxed by Tokio when they
are especially large. Auto-boxing now also occurs in release mode
(since #6826).

Having very large futures can be problematic as it can cause a stack
overflow. In some cases it might be desireable to have smaller futures,
even if they are placed on the heap.

This change adds the size of the future driving an async task or the
function driving a blocking task to the tracing instrumentation. In the
case of a future that is auto-boxed by Tokio, both the final size as well
the original size before boxing is included.

To do this, a new struct `SpawnMeta` gets passed down from where a
future might get boxed to where the instrumentation is added. This
contains the task name (optionally) and the original future or function
size. If the `tokio_unstable` cfg flag and the `tracing` feature aren't both
enabled, then this struct will be zero sized, which is a small improvement
on the previous behavior of unconditionally passing down an `Option<&str>`
for the name.

This will make this information immediately available in Tokio Console,
and will enable new lints which will warn users if they have large futures
(just for async tasks).

We have some tests under the `tracing-instrumentation` crate which test
that the `size.bytes` and `original_size.bytes` fields are set correctly.

The minimal version of `tracing` required for Tokio has been bumped from
0.1.25 to 0.1.29 to get the `Value` impl on `Option<T>`. Given that the current
version is 0.1.40, this seems reasonable, especially given that Tracing's MSRV
is still lower than Tokio's in the latest version.
  • Loading branch information
hds authored Oct 8, 2024
1 parent 29cd6ec commit c3a9355
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 103 deletions.
2 changes: 1 addition & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ futures-io = { version = "0.3.0", optional = true }
futures-util = { version = "0.3.0", optional = true }
pin-project-lite = "0.2.11"
slab = { version = "0.4.4", optional = true } # Backs `DelayQueue`
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true }
tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true }

[target.'cfg(tokio_unstable)'.dependencies]
hashbrown = { version = "0.14.0", default-features = false, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ socket2 = { version = "0.5.5", optional = true, features = [ "all" ] }
# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
[target.'cfg(tokio_unstable)'.dependencies]
tracing = { version = "0.1.25", default-features = false, features = ["std"], optional = true } # Not in full
tracing = { version = "0.1.29", default-features = false, features = ["std"], optional = true } # Not in full

# Currently unstable. The API exposed by these features may be broken at any time.
# Requires `--cfg tokio_unstable` to enable.
Expand Down
50 changes: 22 additions & 28 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::runtime::builder::ThreadNameFn;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle, BOX_FUTURE_THRESHOLD};
use crate::util::metric_atomics::MetricAtomicUsize;
use crate::util::trace::{blocking_task, SpawnMeta};

use std::collections::{HashMap, VecDeque};
use std::fmt;
Expand Down Expand Up @@ -299,10 +300,21 @@ impl Spawner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, spawn_result) = if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(Box::new(func), Mandatory::NonMandatory, None, rt)
let fn_size = std::mem::size_of::<F>();
let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(
Box::new(func),
Mandatory::NonMandatory,
SpawnMeta::new_unnamed(fn_size),
rt,
)
} else {
self.spawn_blocking_inner(func, Mandatory::NonMandatory, None, rt)
self.spawn_blocking_inner(
func,
Mandatory::NonMandatory,
SpawnMeta::new_unnamed(fn_size),
rt,
)
};

match spawn_result {
Expand All @@ -326,18 +338,19 @@ impl Spawner {
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let (join_handle, spawn_result) = if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
let fn_size = std::mem::size_of::<F>();
let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD {
self.spawn_blocking_inner(
Box::new(func),
Mandatory::Mandatory,
None,
SpawnMeta::new_unnamed(fn_size),
rt,
)
} else {
self.spawn_blocking_inner(
func,
Mandatory::Mandatory,
None,
SpawnMeta::new_unnamed(fn_size),
rt,
)
};
Expand All @@ -355,35 +368,16 @@ impl Spawner {
&self,
func: F,
is_mandatory: Mandatory,
name: Option<&str>,
spawn_meta: SpawnMeta<'_>,
rt: &Handle,
) -> (JoinHandle<R>, Result<(), SpawnError>)
where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
let fut = BlockingTask::new(func);
let id = task::Id::next();
#[cfg(all(tokio_unstable, feature = "tracing"))]
let fut = {
use tracing::Instrument;
let location = std::panic::Location::caller();
let span = tracing::trace_span!(
target: "tokio::task::blocking",
"runtime.spawn",
kind = %"blocking",
task.name = %name.unwrap_or_default(),
task.id = id.as_u64(),
"fn" = %std::any::type_name::<F>(),
loc.file = location.file(),
loc.line = location.line(),
loc.col = location.column(),
);
fut.instrument(span)
};

#[cfg(not(all(tokio_unstable, feature = "tracing")))]
let _ = name;
let fut =
blocking_task::<F, BlockingTask<F>>(BlockingTask::new(func), spawn_meta, id.as_u64());

let (task, handle) = task::unowned(fut, BlockingSchedule::new(rt), id);

Expand Down
25 changes: 14 additions & 11 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ pub struct Handle {
use crate::runtime::task::JoinHandle;
use crate::runtime::BOX_FUTURE_THRESHOLD;
use crate::util::error::{CONTEXT_MISSING_ERROR, THREAD_LOCAL_DESTROYED_ERROR};
use crate::util::trace::SpawnMeta;

use std::future::Future;
use std::marker::PhantomData;
use std::{error, fmt};
use std::{error, fmt, mem};

/// Runtime context guard.
///
Expand Down Expand Up @@ -189,10 +190,11 @@ impl Handle {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.spawn_named(Box::pin(future), None)
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.spawn_named(future, None)
self.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
}
}

Expand Down Expand Up @@ -296,15 +298,16 @@ impl Handle {
/// [`tokio::time`]: crate::time
#[track_caller]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future))
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.block_on_inner(future)
self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
}
}

#[track_caller]
fn block_on_inner<F: Future>(&self, future: F) -> F::Output {
fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
#[cfg(all(
tokio_unstable,
tokio_taskdump,
Expand All @@ -316,7 +319,7 @@ impl Handle {

#[cfg(all(tokio_unstable, feature = "tracing"))]
let future =
crate::util::trace::task(future, "block_on", None, super::task::Id::next().as_u64());
crate::util::trace::task(future, "block_on", _meta, super::task::Id::next().as_u64());

// Enter the runtime context. This sets the current driver handles and
// prevents blocking an existing runtime.
Expand All @@ -326,7 +329,7 @@ impl Handle {
}

#[track_caller]
pub(crate) fn spawn_named<F>(&self, future: F, _name: Option<&str>) -> JoinHandle<F::Output>
pub(crate) fn spawn_named<F>(&self, future: F, _meta: SpawnMeta<'_>) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
Expand All @@ -341,7 +344,7 @@ impl Handle {
))]
let future = super::task::trace::Trace::root(future);
#[cfg(all(tokio_unstable, feature = "tracing"))]
let future = crate::util::trace::task(future, "task", _name, id.as_u64());
let future = crate::util::trace::task(future, "task", _meta, id.as_u64());
self.inner.spawn(future, id)
}

Expand Down
22 changes: 14 additions & 8 deletions tokio/src/runtime/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ use crate::runtime::blocking::BlockingPool;
use crate::runtime::scheduler::CurrentThread;
use crate::runtime::{context, EnterGuard, Handle};
use crate::task::JoinHandle;
use crate::util::trace::SpawnMeta;

use std::future::Future;
use std::mem;
use std::time::Duration;

cfg_rt_multi_thread! {
Expand Down Expand Up @@ -241,10 +243,13 @@ impl Runtime {
F: Future + Send + 'static,
F::Output: Send + 'static,
{
if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.handle.spawn_named(Box::pin(future), None)
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.handle
.spawn_named(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.handle.spawn_named(future, None)
self.handle
.spawn_named(future, SpawnMeta::new_unnamed(fut_size))
}
}

Expand Down Expand Up @@ -329,15 +334,16 @@ impl Runtime {
/// [handle]: fn@Handle::block_on
#[track_caller]
pub fn block_on<F: Future>(&self, future: F) -> F::Output {
if std::mem::size_of::<F>() > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future))
let fut_size = mem::size_of::<F>();
if fut_size > BOX_FUTURE_THRESHOLD {
self.block_on_inner(Box::pin(future), SpawnMeta::new_unnamed(fut_size))
} else {
self.block_on_inner(future)
self.block_on_inner(future, SpawnMeta::new_unnamed(fut_size))
}
}

#[track_caller]
fn block_on_inner<F: Future>(&self, future: F) -> F::Output {
fn block_on_inner<F: Future>(&self, future: F, _meta: SpawnMeta<'_>) -> F::Output {
#[cfg(all(
tokio_unstable,
tokio_taskdump,
Expand All @@ -351,7 +357,7 @@ impl Runtime {
let future = crate::util::trace::task(
future,
"block_on",
None,
_meta,
crate::runtime::task::Id::next().as_u64(),
);

Expand Down
39 changes: 24 additions & 15 deletions tokio/src/task/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
use crate::{
runtime::{Handle, BOX_FUTURE_THRESHOLD},
task::{JoinHandle, LocalSet},
util::trace::SpawnMeta,
};
use std::{future::Future, io};
use std::{future::Future, io, mem};

/// Factory which is used to configure the properties of a new task.
///
Expand Down Expand Up @@ -88,10 +89,11 @@ impl<'a> Builder<'a> {
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD {
super::spawn::spawn_inner(Box::pin(future), self.name)
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
super::spawn::spawn_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size))
} else {
super::spawn::spawn_inner(future, self.name)
super::spawn::spawn_inner(future, SpawnMeta::new(self.name, fut_size))
})
}

Expand All @@ -108,10 +110,11 @@ impl<'a> Builder<'a> {
Fut: Future + Send + 'static,
Fut::Output: Send + 'static,
{
Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD {
handle.spawn_named(Box::pin(future), self.name)
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
handle.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size))
} else {
handle.spawn_named(future, self.name)
handle.spawn_named(future, SpawnMeta::new(self.name, fut_size))
})
}

Expand All @@ -135,10 +138,11 @@ impl<'a> Builder<'a> {
Fut: Future + 'static,
Fut::Output: 'static,
{
Ok(if std::mem::size_of::<Fut>() > BOX_FUTURE_THRESHOLD {
super::local::spawn_local_inner(Box::pin(future), self.name)
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
super::local::spawn_local_inner(Box::pin(future), SpawnMeta::new(self.name, fut_size))
} else {
super::local::spawn_local_inner(future, self.name)
super::local::spawn_local_inner(future, SpawnMeta::new(self.name, fut_size))
})
}

Expand All @@ -159,7 +163,12 @@ impl<'a> Builder<'a> {
Fut: Future + 'static,
Fut::Output: 'static,
{
Ok(local_set.spawn_named(future, self.name))
let fut_size = mem::size_of::<Fut>();
Ok(if fut_size > BOX_FUTURE_THRESHOLD {
local_set.spawn_named(Box::pin(future), SpawnMeta::new(self.name, fut_size))
} else {
local_set.spawn_named(future, SpawnMeta::new(self.name, fut_size))
})
}

/// Spawns blocking code on the blocking threadpool.
Expand Down Expand Up @@ -200,19 +209,19 @@ impl<'a> Builder<'a> {
Output: Send + 'static,
{
use crate::runtime::Mandatory;
let (join_handle, spawn_result) = if std::mem::size_of::<Function>() > BOX_FUTURE_THRESHOLD
{
let fn_size = mem::size_of::<Function>();
let (join_handle, spawn_result) = if fn_size > BOX_FUTURE_THRESHOLD {
handle.inner.blocking_spawner().spawn_blocking_inner(
Box::new(function),
Mandatory::NonMandatory,
self.name,
SpawnMeta::new(self.name, fn_size),
handle,
)
} else {
handle.inner.blocking_spawner().spawn_blocking_inner(
function,
Mandatory::NonMandatory,
self.name,
SpawnMeta::new(self.name, fn_size),
handle,
)
};
Expand Down
Loading

0 comments on commit c3a9355

Please sign in to comment.