Skip to content

Commit

Permalink
Input: Make restartable sources fully async. (serenity-rs#15)
Browse files Browse the repository at this point in the history
Redresses a previous holdover from an attempt to get Restartable sources to work more neatly inside the synchronous mixer thread. This prevents `Restartable::*` from blocking without warning.

The initial fix at the time was to perform the restart work on a task provided by the tokio runtime as `executor::block_on` needs to be run from within a valid async runtime. Naturally, this completely missed the point that these closures should/could be async, without any need to fudge async functions into a sync wrapper.

Also removes the `From` for normal closures, as this will probably act as a footgun for folks on a single-threaded executor.
  • Loading branch information
FelixMcFelix authored Nov 18, 2020
1 parent cb7d8cc commit 2da5901
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 59 deletions.
2 changes: 1 addition & 1 deletion examples/twilight/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ async fn play(msg: Message, state: State) -> Result<(), Box<dyn Error + Send + S

let guild_id = msg.guild_id.unwrap();

if let Ok(song) = Restartable::ytdl(msg.content.clone()) {
if let Ok(song) = Restartable::ytdl(msg.content.clone()).await {
let input = Input::from(song);

let content = format!(
Expand Down
122 changes: 64 additions & 58 deletions src/input/restartable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
//! success/failure is confirmed, the track produces silence.
use super::*;
use async_trait::async_trait;
use flume::{Receiver, TryRecvError};
use futures::executor;
use std::{
ffi::OsStr,
fmt::{Debug, Error as FormatError, Formatter},
Expand Down Expand Up @@ -50,8 +50,8 @@ pub struct Restartable {

impl Restartable {
/// Create a new source, which can be restarted using a `recreator` function.
pub fn new(mut recreator: impl Restart + Send + 'static) -> Result<Self> {
recreator.call_restart(None).map(move |source| Self {
pub async fn new(mut recreator: impl Restart + Send + 'static) -> Result<Self> {
recreator.call_restart(None).await.map(move |source| Self {
async_handle: None,
awaiting_source: None,
position: 0,
Expand All @@ -61,32 +61,24 @@ impl Restartable {
}

/// Create a new restartable ffmpeg source for a local file.
pub fn ffmpeg<P: AsRef<OsStr> + Send + Clone + 'static>(path: P) -> Result<Self> {
Self::new(FfmpegRestarter { path })
pub async fn ffmpeg<P: AsRef<OsStr> + Send + Clone + Sync + 'static>(path: P) -> Result<Self> {
Self::new(FfmpegRestarter { path }).await
}

/// Create a new restartable ytdl source.
///
/// The cost of restarting and seeking will probably be *very* high:
/// expect a pause if you seek backwards.
pub fn ytdl<P: AsRef<str> + Send + Clone + 'static>(uri: P) -> Result<Self> {
Self::new(move |time: Option<Duration>| {
if let Some(time) = time {
let ts = format!("{}.{}", time.as_secs(), time.subsec_millis());

executor::block_on(_ytdl(uri.as_ref(), &["-ss", &ts]))
} else {
executor::block_on(ytdl(uri.as_ref()))
}
})
pub async fn ytdl<P: AsRef<str> + Send + Clone + Sync + 'static>(uri: P) -> Result<Self> {
Self::new(YtdlRestarter { uri }).await
}

/// Create a new restartable ytdl source, using the first result of a youtube search.
///
/// The cost of restarting and seeking will probably be *very* high:
/// expect a pause if you seek backwards.
pub fn ytdl_search(name: &str) -> Result<Self> {
Self::ytdl(format!("ytsearch1:{}", name))
pub async fn ytdl_search(name: &str) -> Result<Self> {
Self::ytdl(format!("ytsearch1:{}", name)).await
}

pub(crate) fn prep_with_handle(&mut self, handle: Handle) {
Expand All @@ -97,64 +89,76 @@ impl Restartable {
/// Trait used to create an instance of a [`Reader`] at instantiation and when
/// a backwards seek is needed.
///
/// Many closures derive this automatically.
///
/// [`Reader`]: ../reader/enum.Reader.html
#[async_trait]
pub trait Restart {
/// Tries to create a replacement source.
fn call_restart(&mut self, time: Option<Duration>) -> Result<Input>;
async fn call_restart(&mut self, time: Option<Duration>) -> Result<Input>;
}

struct FfmpegRestarter<P>
where
P: AsRef<OsStr> + Send,
P: AsRef<OsStr> + Send + Sync,
{
path: P,
}

#[async_trait]
impl<P> Restart for FfmpegRestarter<P>
where
P: AsRef<OsStr> + Send,
P: AsRef<OsStr> + Send + Sync,
{
fn call_restart(&mut self, time: Option<Duration>) -> Result<Input> {
executor::block_on(async {
if let Some(time) = time {
let is_stereo = is_stereo(self.path.as_ref())
.await
.unwrap_or_else(|_e| (false, Default::default()));
let stereo_val = if is_stereo.0 { "2" } else { "1" };

let ts = format!("{}.{}", time.as_secs(), time.subsec_millis());
_ffmpeg_optioned(
self.path.as_ref(),
&["-ss", &ts],
&[
"-f",
"s16le",
"-ac",
stereo_val,
"-ar",
"48000",
"-acodec",
"pcm_f32le",
"-",
],
Some(is_stereo),
)
async fn call_restart(&mut self, time: Option<Duration>) -> Result<Input> {
if let Some(time) = time {
let is_stereo = is_stereo(self.path.as_ref())
.await
} else {
ffmpeg(self.path.as_ref()).await
}
})
.unwrap_or_else(|_e| (false, Default::default()));
let stereo_val = if is_stereo.0 { "2" } else { "1" };

let ts = format!("{}.{}", time.as_secs(), time.subsec_millis());
_ffmpeg_optioned(
self.path.as_ref(),
&["-ss", &ts],
&[
"-f",
"s16le",
"-ac",
stereo_val,
"-ar",
"48000",
"-acodec",
"pcm_f32le",
"-",
],
Some(is_stereo),
)
.await
} else {
ffmpeg(self.path.as_ref()).await
}
}
}

impl<P> Restart for P
struct YtdlRestarter<P>
where
P: AsRef<str> + Send + Sync,
{
uri: P,
}

#[async_trait]
impl<P> Restart for YtdlRestarter<P>
where
P: FnMut(Option<Duration>) -> Result<Input> + Send + 'static,
P: AsRef<str> + Send + Sync,
{
fn call_restart(&mut self, time: Option<Duration>) -> Result<Input> {
(self)(time)
async fn call_restart(&mut self, time: Option<Duration>) -> Result<Input> {
if let Some(time) = time {
let ts = format!("{}.{}", time.as_secs(), time.subsec_millis());

_ytdl(self.uri.as_ref(), &["-ss", &ts]).await
} else {
ytdl(self.uri.as_ref()).await
}
}
}

Expand Down Expand Up @@ -258,9 +262,11 @@ impl Seek for Restartable {

if let Some(mut rec) = recreator {
handle.spawn(async move {
let ret_val = rec.call_restart(Some(
utils::byte_count_to_timestamp(offset, stereo),
));
let ret_val = rec
.call_restart(Some(utils::byte_count_to_timestamp(
offset, stereo,
)))
.await;

let _ = tx.send(ret_val.map(Box::new).map(|v| (v, rec)));
});
Expand Down

0 comments on commit 2da5901

Please sign in to comment.