Skip to content

Commit

Permalink
gio: add spawn_blocking
Browse files Browse the repository at this point in the history
  • Loading branch information
jf2048 committed Nov 14, 2022
1 parent c387a54 commit 9b62dbc
Showing 1 changed file with 68 additions and 0 deletions.
68 changes: 68 additions & 0 deletions gio/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use crate::AsyncResult;
use crate::Cancellable;
use futures_channel::oneshot;
use glib::object::IsA;
use glib::object::ObjectType as ObjectType_;
use glib::signal::connect_raw;
Expand All @@ -10,7 +11,9 @@ use glib::translate::*;
use glib::value::ValueType;
use glib::Cast;
use std::boxed::Box as Box_;
use std::future::Future;
use std::mem::transmute;
use std::panic;
use std::ptr;

glib::wrapper! {
Expand Down Expand Up @@ -382,6 +385,71 @@ impl<V: ValueType + Send> Task<V> {
unsafe impl<V: ValueType + Send> Send for Task<V> {}
unsafe impl<V: ValueType + Send> Sync for Task<V> {}

// rustdoc-stripper-ignore-next
/// A handle to a task running on the I/O thread pool.
///
/// Like [`std::thread::JoinHandle`] for a blocking I/O task rather than a thread. The return value
/// from the task can be retrieved by awaiting on this handle. Dropping the handle "detaches" the
/// task, allowing it to complete but discarding the return value.
#[derive(Debug)]
#[repr(transparent)]
pub struct JoinHandle<T> {
rx: oneshot::Receiver<std::thread::Result<T>>,
}

impl<T> JoinHandle<T> {
#[inline]
fn new() -> (Self, oneshot::Sender<std::thread::Result<T>>) {
let (tx, rx) = oneshot::channel();
(Self { rx }, tx)
}
}

impl<T> Future for JoinHandle<T> {
type Output = std::thread::Result<T>;
#[inline]
fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
std::pin::Pin::new(&mut self.rx)
.poll(cx)
.map(|r| r.unwrap())
}
}

impl<T> futures_core::FusedFuture for JoinHandle<T> {
#[inline]
fn is_terminated(&self) -> bool {
self.rx.is_terminated()
}
}

// rustdoc-stripper-ignore-next
/// Runs a blocking I/O task on the I/O thread pool.
///
/// Calls `func` on the internal Gio thread pool for blocking I/O operations. The thread pool is
/// shared with other Gio async I/O operations, and may rate-limit the tasks it receives. Callers
/// may want to avoid blocking indefinitely by making sure blocking calls eventually time out.
///
/// This function should not be used to spawn async tasks. Instead, use
/// [`glib::MainContext::spawn`] or [`glib::MainContext::spawn_local`] to run a future.
pub fn spawn_blocking<T, F>(func: F) -> JoinHandle<T>
where
T: Send + 'static,
F: FnOnce() -> T + Send + 'static,
{
// use Cancellable::NONE as source obj to fulfill `Send` requirement
let task = unsafe { Task::<bool>::new(Cancellable::NONE, Cancellable::NONE, |_, _| {}) };
let (join, tx) = JoinHandle::new();
task.run_in_thread(move |_, _: Option<&Cancellable>, _| {
let res = panic::catch_unwind(panic::AssertUnwindSafe(func));
let _ = tx.send(res);
});

join
}

#[cfg(test)]
mod test {
use super::*;
Expand Down

0 comments on commit 9b62dbc

Please sign in to comment.