Skip to content

Commit

Permalink
tokio: deduplicate spawn_blocking (#3017)
Browse files Browse the repository at this point in the history
Move common code and tracing integration into Handle

Fixes #2998
Closes #3004

Signed-off-by: Marc-Antoine Perennou <Marc-Antoine@Perennou.com>
  • Loading branch information
Keruspe authored Oct 21, 2020
1 parent 7d7b79e commit 7fbfa9b
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 21 deletions.
5 changes: 1 addition & 4 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,7 @@ where
F: FnOnce() -> R + Send + 'static,
{
let rt = context::current().expect("not currently running on the Tokio runtime.");

let (task, handle) = task::joinable(BlockingTask::new(func));
let _ = rt.blocking_spawner.spawn(task, &rt);
handle
rt.spawn_blocking(func)
}

#[allow(dead_code)]
Expand Down
25 changes: 25 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{blocking, driver, Spawner};

/// Handle to the runtime.
Expand Down Expand Up @@ -36,4 +38,27 @@ impl Handle {
// {
// context::enter(self.clone(), f)
// }

/// Run the provided function on an executor dedicated to blocking operations.
pub(crate) fn spawn_blocking<F, R>(&self, func: F) -> JoinHandle<R>
where
F: FnOnce() -> R + Send + 'static,
{
#[cfg(feature = "tracing")]
let func = {
let span = tracing::trace_span!(
target: "tokio::task",
"task",
kind = %"blocking",
function = %std::any::type_name::<F>(),
);
move || {
let _g = span.enter();
func()
}
};
let (task, handle) = task::joinable(BlockingTask::new(func));
let _ = self.blocking_spawner.spawn(task, &self);
handle
}
}
5 changes: 1 addition & 4 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ cfg_rt! {

mod blocking;
use blocking::BlockingPool;
use blocking::task::BlockingTask;
pub(crate) use blocking::spawn_blocking;

mod builder;
Expand Down Expand Up @@ -390,9 +389,7 @@ cfg_rt! {
where
F: FnOnce() -> R + Send + 'static,
{
let (task, handle) = task::joinable(BlockingTask::new(func));
let _ = self.handle.blocking_spawner.spawn(task, &self.handle);
handle
self.handle.spawn_blocking(func)
}

/// Run a future to completion on the Tokio runtime. This is the
Expand Down
13 changes: 0 additions & 13 deletions tokio/src/task/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,5 @@ where
F: FnOnce() -> R + Send + 'static,
R: Send + 'static,
{
#[cfg(feature = "tracing")]
let f = {
let span = tracing::trace_span!(
target: "tokio::task",
"task",
kind = %"blocking",
function = %std::any::type_name::<F>(),
);
move || {
let _g = span.enter();
f()
}
};
crate::runtime::spawn_blocking(f)
}

0 comments on commit 7fbfa9b

Please sign in to comment.