From b2453091e726772802b216a477841b816a137718 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Mon, 1 Feb 2021 13:43:26 +0000 Subject: [PATCH] Fix: hand off process killing to blocking thread, await all children. This should make dropping `ChildContainer`s and their parent `Input`s safer in async contexts. It seems like SIGINT is insufficient to make wait terminate, but SIGKILL suffices. This introduced a new problem, namely that we have to remember and wait on *every* pid we create. This should, hopefully, put the issue of zombie processes to bed for good. --- Cargo.toml | 5 --- src/input/child.rs | 68 ++++++++++++++++++++++++++++------------- src/input/ffmpeg_src.rs | 4 +-- src/input/ytdl_src.rs | 8 +++-- 4 files changed, 53 insertions(+), 32 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 13762b9fd..e2319b631 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -52,10 +52,6 @@ version = "0.10" [dependencies.futures] version = "0.3" -[dependencies.nix] -version = "0.19" -optional = true - [dependencies.parking_lot] optional = true version = "0.11" @@ -137,7 +133,6 @@ driver = [ "byteorder", "discortp", "flume", - "nix", "parking_lot", "rand", "serenity-voice-model", diff --git a/src/input/child.rs b/src/input/child.rs index 6cede9e53..565f4f508 100644 --- a/src/input/child.rs +++ b/src/input/child.rs @@ -1,55 +1,79 @@ use super::*; use std::{ io::{BufReader, Read}, + mem, process::Child, }; +use tokio::runtime::Handle; use tracing::debug; -#[cfg(unix)] -use nix::{ - sys::signal::{self, Signal}, - unistd::Pid, -}; - /// Handle for a child process which ensures that any subprocesses are properly closed /// on drop. +/// +/// # Warning +/// To allow proper cleanup of child processes, if you create a process chain you must +/// make sure to use `From>`. Here, the *last* process in the `Vec` will be +/// used as the audio byte source. #[derive(Debug)] -pub struct ChildContainer(Child); +pub struct ChildContainer(Vec); -pub(crate) fn child_to_reader(child: Child) -> Reader { +pub(crate) fn children_to_reader(children: Vec) -> Reader { Reader::Pipe(BufReader::with_capacity( STEREO_FRAME_SIZE * mem::size_of::() * CHILD_BUFFER_LEN, - ChildContainer(child), + ChildContainer(children), )) } impl From for Reader { fn from(container: Child) -> Self { - child_to_reader::(container) + children_to_reader::(vec![container]) + } +} + +impl From> for Reader { + fn from(container: Vec) -> Self { + children_to_reader::(container) } } impl Read for ChildContainer { fn read(&mut self, buffer: &mut [u8]) -> IoResult { - self.0.stdout.as_mut().unwrap().read(buffer) + match self.0.last_mut() { + Some(ref mut child) => child.stdout.as_mut().unwrap().read(buffer), + None => Ok(0), + } } } impl Drop for ChildContainer { fn drop(&mut self) { - #[cfg(not(unix))] - let attempt = self.0.kill(); + let children = mem::take(&mut self.0); - #[cfg(unix)] - let attempt = { - let pid = Pid::from_raw(self.0.id() as i32); - let _ = signal::kill(pid, Signal::SIGINT); + if let Ok(handle) = Handle::try_current() { + handle.spawn_blocking(move || { + cleanup_child_processes(children); + }); + } else { + cleanup_child_processes(children); + } + } +} - self.0.wait() - }; +fn cleanup_child_processes(mut children: Vec) { + let attempt = if let Some(child) = children.last_mut() { + child.kill() + } else { + return; + }; - if let Err(e) = attempt { - debug!("Error awaiting child process: {:?}", e); - } + let attempt = attempt.and_then(|_| { + children + .iter_mut() + .rev() + .try_for_each(|child| child.wait().map(|_| ())) + }); + + if let Err(e) = attempt { + debug!("Error awaiting child process: {:?}", e); } } diff --git a/src/input/ffmpeg_src.rs b/src/input/ffmpeg_src.rs index 928d21033..a8e73caa3 100644 --- a/src/input/ffmpeg_src.rs +++ b/src/input/ffmpeg_src.rs @@ -1,5 +1,5 @@ use super::{ - child_to_reader, + children_to_reader, error::{Error, Result}, Codec, Container, @@ -115,7 +115,7 @@ pub(crate) async fn _ffmpeg_optioned( Ok(Input::new( is_stereo, - child_to_reader::(command), + children_to_reader::(vec![command]), Codec::FloatPcm, Container::Raw, Some(metadata), diff --git a/src/input/ytdl_src.rs b/src/input/ytdl_src.rs index 0ced1318b..4464a8925 100644 --- a/src/input/ytdl_src.rs +++ b/src/input/ytdl_src.rs @@ -1,5 +1,5 @@ use super::{ - child_to_reader, + children_to_reader, error::{Error, Result}, Codec, Container, @@ -92,12 +92,14 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result { youtube_dl.stderr = Some(returned_stderr); + let taken_stdout = youtube_dl.stdout.take().ok_or(Error::Stdout)?; + let ffmpeg = Command::new("ffmpeg") .args(pre_args) .arg("-i") .arg("-") .args(&ffmpeg_args) - .stdin(youtube_dl.stdout.ok_or(Error::Stdout)?) + .stdin(taken_stdout) .stderr(Stdio::null()) .stdout(Stdio::piped()) .spawn()?; @@ -108,7 +110,7 @@ pub(crate) async fn _ytdl(uri: &str, pre_args: &[&str]) -> Result { Ok(Input::new( true, - child_to_reader::(ffmpeg), + children_to_reader::(vec![youtube_dl, ffmpeg]), Codec::FloatPcm, Container::Raw, Some(metadata),