From 2da5901930ea9a97a5101738acacbe4a7570ec55 Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Wed, 18 Nov 2020 20:48:34 +0000 Subject: [PATCH] Input: Make restartable sources fully async. (#15) Redresses a previous holdover from an attempt to get Restartable sources to work more neatly inside the synchronous mixer thread. This prevents `Restartable::*` from blocking without warning. The initial fix at the time was to perform the restart work on a task provided by the tokio runtime as `executor::block_on` needs to be run from within a valid async runtime. Naturally, this completely missed the point that these closures should/could be async, without any need to fudge async functions into a sync wrapper. Also removes the `From` for normal closures, as this will probably act as a footgun for folks on a single-threaded executor. --- examples/twilight/src/main.rs | 2 +- src/input/restartable.rs | 122 ++++++++++++++++++---------------- 2 files changed, 65 insertions(+), 59 deletions(-) diff --git a/examples/twilight/src/main.rs b/examples/twilight/src/main.rs index 436450332..eb511d268 100644 --- a/examples/twilight/src/main.rs +++ b/examples/twilight/src/main.rs @@ -184,7 +184,7 @@ async fn play(msg: Message, state: State) -> Result<(), Box Result { - recreator.call_restart(None).map(move |source| Self { + pub async fn new(mut recreator: impl Restart + Send + 'static) -> Result { + recreator.call_restart(None).await.map(move |source| Self { async_handle: None, awaiting_source: None, position: 0, @@ -61,32 +61,24 @@ impl Restartable { } /// Create a new restartable ffmpeg source for a local file. - pub fn ffmpeg + Send + Clone + 'static>(path: P) -> Result { - Self::new(FfmpegRestarter { path }) + pub async fn ffmpeg + Send + Clone + Sync + 'static>(path: P) -> Result { + Self::new(FfmpegRestarter { path }).await } /// Create a new restartable ytdl source. /// /// The cost of restarting and seeking will probably be *very* high: /// expect a pause if you seek backwards. - pub fn ytdl + Send + Clone + 'static>(uri: P) -> Result { - Self::new(move |time: Option| { - if let Some(time) = time { - let ts = format!("{}.{}", time.as_secs(), time.subsec_millis()); - - executor::block_on(_ytdl(uri.as_ref(), &["-ss", &ts])) - } else { - executor::block_on(ytdl(uri.as_ref())) - } - }) + pub async fn ytdl + Send + Clone + Sync + 'static>(uri: P) -> Result { + Self::new(YtdlRestarter { uri }).await } /// Create a new restartable ytdl source, using the first result of a youtube search. /// /// The cost of restarting and seeking will probably be *very* high: /// expect a pause if you seek backwards. - pub fn ytdl_search(name: &str) -> Result { - Self::ytdl(format!("ytsearch1:{}", name)) + pub async fn ytdl_search(name: &str) -> Result { + Self::ytdl(format!("ytsearch1:{}", name)).await } pub(crate) fn prep_with_handle(&mut self, handle: Handle) { @@ -97,64 +89,76 @@ impl Restartable { /// Trait used to create an instance of a [`Reader`] at instantiation and when /// a backwards seek is needed. /// -/// Many closures derive this automatically. -/// /// [`Reader`]: ../reader/enum.Reader.html +#[async_trait] pub trait Restart { /// Tries to create a replacement source. - fn call_restart(&mut self, time: Option) -> Result; + async fn call_restart(&mut self, time: Option) -> Result; } struct FfmpegRestarter

where - P: AsRef + Send, + P: AsRef + Send + Sync, { path: P, } +#[async_trait] impl

Restart for FfmpegRestarter

where - P: AsRef + Send, + P: AsRef + Send + Sync, { - fn call_restart(&mut self, time: Option) -> Result { - executor::block_on(async { - if let Some(time) = time { - let is_stereo = is_stereo(self.path.as_ref()) - .await - .unwrap_or_else(|_e| (false, Default::default())); - let stereo_val = if is_stereo.0 { "2" } else { "1" }; - - let ts = format!("{}.{}", time.as_secs(), time.subsec_millis()); - _ffmpeg_optioned( - self.path.as_ref(), - &["-ss", &ts], - &[ - "-f", - "s16le", - "-ac", - stereo_val, - "-ar", - "48000", - "-acodec", - "pcm_f32le", - "-", - ], - Some(is_stereo), - ) + async fn call_restart(&mut self, time: Option) -> Result { + if let Some(time) = time { + let is_stereo = is_stereo(self.path.as_ref()) .await - } else { - ffmpeg(self.path.as_ref()).await - } - }) + .unwrap_or_else(|_e| (false, Default::default())); + let stereo_val = if is_stereo.0 { "2" } else { "1" }; + + let ts = format!("{}.{}", time.as_secs(), time.subsec_millis()); + _ffmpeg_optioned( + self.path.as_ref(), + &["-ss", &ts], + &[ + "-f", + "s16le", + "-ac", + stereo_val, + "-ar", + "48000", + "-acodec", + "pcm_f32le", + "-", + ], + Some(is_stereo), + ) + .await + } else { + ffmpeg(self.path.as_ref()).await + } } } -impl

Restart for P +struct YtdlRestarter

+where + P: AsRef + Send + Sync, +{ + uri: P, +} + +#[async_trait] +impl

Restart for YtdlRestarter

where - P: FnMut(Option) -> Result + Send + 'static, + P: AsRef + Send + Sync, { - fn call_restart(&mut self, time: Option) -> Result { - (self)(time) + async fn call_restart(&mut self, time: Option) -> Result { + if let Some(time) = time { + let ts = format!("{}.{}", time.as_secs(), time.subsec_millis()); + + _ytdl(self.uri.as_ref(), &["-ss", &ts]).await + } else { + ytdl(self.uri.as_ref()).await + } } } @@ -258,9 +262,11 @@ impl Seek for Restartable { if let Some(mut rec) = recreator { handle.spawn(async move { - let ret_val = rec.call_restart(Some( - utils::byte_count_to_timestamp(offset, stereo), - )); + let ret_val = rec + .call_restart(Some(utils::byte_count_to_timestamp( + offset, stereo, + ))) + .await; let _ = tx.send(ret_val.map(Box::new).map(|v| (v, rec))); });