Skip to content

Commit

Permalink
Fallback to synchronous bury when no tokio runtime is available
Browse files Browse the repository at this point in the history
  • Loading branch information
florian-g2 committed Sep 6, 2024
1 parent db6c36d commit b4d4a36
Showing 1 changed file with 64 additions and 41 deletions.
105 changes: 64 additions & 41 deletions src/os/windows/limbo/tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use crate::{
os::windows::{winprelude::*, FileHandle},
DebugExpectExt, LOCK_POISON,
};
use std::sync::{Mutex, OnceLock};
use std::sync::{Mutex};
use tokio::{
fs::File,
net::windows::named_pipe::{NamedPipeClient, NamedPipeServer},
runtime::{self, Handle as RuntimeHandle, Runtime},
runtime::{Handle as RuntimeHandle},
sync::mpsc::{unbounded_channel, UnboundedSender},
task,
};
Expand Down Expand Up @@ -38,58 +38,81 @@ impl AsRawHandle for Corpse {
}

type Limbo = UnboundedSender<Corpse>;
static LIMBO: OnceLock<Mutex<Limbo>> = OnceLock::new();
static LIMBO_RT: OnceLock<Runtime> = OnceLock::new();

fn static_runtime_handle() -> &'static RuntimeHandle {
LIMBO_RT
.get_or_init(|| {
runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_io()
.thread_name("Tokio limbo dispatcher")
.thread_stack_size(1024 * 1024)
.build()
.expect(
"\
failed to build Tokio limbo helper (only necessary if the first pipe to be dropped happens to go \
out of scope outside of another Tokio runtime)",
)
})
.handle()
}
static LIMBO: Mutex<Option<Limbo>> = Mutex::new(None);

fn bury(c: Corpse) {
task::spawn_blocking(move || {
let handle = c.as_int_handle();
FileHandle::flush_hndl(handle).debug_expect("limbo flush failed");
});
task::spawn_blocking(move || bury_sync(c));
}

fn create_limbo() -> Limbo {
let (tx, mut rx) = unbounded_channel();
fn bury_sync(c: Corpse) {
let handle = c.as_int_handle();
FileHandle::flush_hndl(handle).debug_expect("limbo flush failed");
}

let mut _guard = None;
fn create_limbo() -> Option<Limbo> {
if RuntimeHandle::try_current().is_err() {
_guard = Some(static_runtime_handle().enter());
return None;
}

let (tx, mut rx) = unbounded_channel();
task::spawn(async move {
while let Some(c) = rx.recv().await {
bury(c);
}
});

tx

if tx.is_closed() {
// The tokio runtime may still have a handle, but we're right in the process of the runtime shutdown.
// When tokio is shutting down, it will drop tasks directly and synchronously at task::spawn methods.
// tx.is_closed() will evaluate to true in that case, because the channel receiver is dropped along with the task.
None
} else {
Some(tx)
}
}

pub(crate) fn send_off(c: Corpse) {
let mutex = LIMBO.get_or_init(|| Mutex::new(create_limbo()));
let mut limbo = mutex.lock().expect(LOCK_POISON);
if let Err(c) = limbo.send(c) {
*limbo = create_limbo();
limbo
.send(c.0)
.ok()
.debug_expect("fresh Tokio limbo helper died immediately after being created");
}
let mut limbo_guard = LIMBO.lock().expect(LOCK_POISON);

let limbo = match limbo_guard.as_ref() {
Some(limbo) => Some(limbo),
// if no limbo exists, create one
None => {
*limbo_guard = create_limbo();
limbo_guard.as_ref()
}
};

let Some(limbo) = limbo else {
// no tokio runtime available for limbo, falling back to synchronous bury
drop(limbo_guard);
bury_sync(c);
return;
};

// try to send the corpse to the limbo
let c = match limbo.send(c) {
Ok(_) => return,
Err(c) => c.0,
};

// we lost the limbo, but maybe it ran on a different tokio runtime which has died in the meantime
// try again using a fresh limbo on the current tokio runtime

*limbo_guard = create_limbo();
let Some(limbo) = limbo_guard.as_ref() else {
// no limbo available, falling back to synchronous bury
drop(limbo_guard);
bury_sync(c);
return;
};

let c = match limbo.send(c) {
Ok(_) => return,
Err(c) => c.0,
};

// we lost the limbo again, now we have no other option than to bury the corpse synchronously
drop(limbo_guard);
bury_sync(c);
}

0 comments on commit b4d4a36

Please sign in to comment.