From f9028cd38c2cb74aa0c322cf703e0b1be2da978b Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Sun, 15 Nov 2020 23:25:37 +0000 Subject: [PATCH] Track Queue extension * Adds a uuid field to tracks and handles to make it easier to identify and match event sources after the fact. * Adds optional feature "builtin-queue" to expose a queue on every driver, as a convenience for users who can guarantee they'll need a queue for every driver/call. * Adds methods to queues to allow access to the currently running track handle, remove a specified queue entry, as well as to mutate the underlying queue from a closure. --- Cargo.toml | 10 +- .../serenity/voice_events_queue/Cargo.toml | 1 + .../serenity/voice_events_queue/src/main.rs | 305 ++++++++++++------ src/driver/mod.rs | 47 ++- src/tracks/handle.rs | 10 +- src/tracks/mod.rs | 18 +- src/tracks/queue.rs | 115 +++++-- 7 files changed, 387 insertions(+), 119 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9755f2757..de24fdd15 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -94,6 +94,11 @@ default-features = false optional = true version = "2" +[dependencies.uuid] +optional = true +version = "0.8" +features = ["v4"] + [dependencies.xsalsa20poly1305] optional = true version = "0.6" @@ -135,9 +140,9 @@ driver = [ "tokio/sync", "tokio/time", "url", + "uuid", "xsalsa20poly1305", ] -youtube-dlc = [] rustls = ["async-tungstenite/tokio-rustls"] native = ["async-tungstenite/tokio-native-tls"] serenity-rustls = ["serenity/rustls_backend", "rustls", "gateway", "serenity-deps"] @@ -149,6 +154,9 @@ simd-zlib = ["twilight-gateway/simd-zlib"] stock-zlib = ["twilight-gateway/stock-zlib"] serenity-deps = ["async-trait"] +youtube-dlc = [] +builtin-queue = [] + [[bench]] name = "mixing" path = "benches/mixing.rs" diff --git a/examples/serenity/voice_events_queue/Cargo.toml b/examples/serenity/voice_events_queue/Cargo.toml index d095e64a6..341063044 100644 --- a/examples/serenity/voice_events_queue/Cargo.toml +++ b/examples/serenity/voice_events_queue/Cargo.toml @@ -10,6 +10,7 @@ tracing-subscriber = "0.2" tracing-futures = "0.2" [dependencies.songbird] +features = ["builtin-queue"] path = "../../../" [dependencies.serenity] diff --git a/examples/serenity/voice_events_queue/src/main.rs b/examples/serenity/voice_events_queue/src/main.rs index dcd1fb3d8..0939fc7e0 100644 --- a/examples/serenity/voice_events_queue/src/main.rs +++ b/examples/serenity/voice_events_queue/src/main.rs @@ -10,35 +10,32 @@ //! features = ["cache", "framework", "standard_framework", "voice"] //! ``` use std::{ - collections::HashMap, env, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, time::Duration, - sync::{atomic::{AtomicUsize, Ordering}, Arc} }; use serenity::{ async_trait, client::{Client, Context, EventHandler}, - http::Http, framework::{ - StandardFramework, standard::{ - Args, CommandResult, macros::{command, group}, + Args, + CommandResult, }, + StandardFramework, }, - model::{ - channel::Message, - gateway::Ready, - misc::Mentionable, - prelude::{ChannelId, GuildId}, - }, + http::Http, + model::{channel::Message, gateway::Ready, misc::Mentionable, prelude::ChannelId}, Result as SerenityResult, }; use songbird::{ input, - tracks::TrackQueue, Event, EventContext, EventHandler as VoiceEventHandler, @@ -46,15 +43,6 @@ use songbird::{ TrackEvent, }; -// This imports `typemap`'s `Key` as `TypeMapKey`. -use serenity::prelude::*; - -struct VoiceQueueManager; - -impl TypeMapKey for VoiceQueueManager { - type Value = Arc>>; -} - struct Handler; #[async_trait] @@ -65,7 +53,9 @@ impl EventHandler for Handler { } #[group] -#[commands(deafen, join, leave, mute, play_fade, queue, skip, stop, ping, undeafen, unmute)] +#[commands( + deafen, join, leave, mute, play_fade, queue, skip, stop, ping, undeafen, unmute +)] struct General; #[tokio::main] @@ -73,12 +63,10 @@ async fn main() { tracing_subscriber::fmt::init(); // Configure the client with your Discord bot token in the environment. - let token = env::var("DISCORD_TOKEN") - .expect("Expected a token in the environment"); + let token = env::var("DISCORD_TOKEN").expect("Expected a token in the environment"); let framework = StandardFramework::new() - .configure(|c| c - .prefix("~")) + .configure(|c| c.prefix("~")) .group(&GENERAL_GROUP); let mut client = Client::builder(&token) @@ -88,15 +76,10 @@ async fn main() { .await .expect("Err creating client"); - // Obtain a lock to the data owned by the client, and insert the client's - // voice manager into it. This allows the voice manager to be accessible by - // event handlers and framework commands. - { - let mut data = client.data.write().await; - data.insert::(Arc::new(Mutex::new(HashMap::new()))); - } - - let _ = client.start().await.map_err(|why| println!("Client ended: {:?}", why)); + let _ = client + .start() + .await + .map_err(|why| println!("Client ended: {:?}", why)); } #[command] @@ -104,8 +87,10 @@ async fn deafen(ctx: &Context, msg: &Message) -> CommandResult { let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let manager = songbird::get(ctx).await - .expect("Songbird Voice client placed in at initialisation.").clone(); + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); let handler_lock = match manager.get(guild_id) { Some(handler) => handler, @@ -122,7 +107,11 @@ async fn deafen(ctx: &Context, msg: &Message) -> CommandResult { check_msg(msg.channel_id.say(&ctx.http, "Already deafened").await); } else { if let Err(e) = handler.deafen(true).await { - check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await); + check_msg( + msg.channel_id + .say(&ctx.http, format!("Failed: {:?}", e)) + .await, + ); } check_msg(msg.channel_id.say(&ctx.http, "Deafened").await); @@ -138,26 +127,32 @@ async fn join(ctx: &Context, msg: &Message) -> CommandResult { let guild_id = guild.id; let channel_id = guild - .voice_states.get(&msg.author.id) + .voice_states + .get(&msg.author.id) .and_then(|voice_state| voice_state.channel_id); - let connect_to = match channel_id { Some(channel) => channel, None => { check_msg(msg.reply(ctx, "Not in a voice channel").await); return Ok(()); - } + }, }; - let manager = songbird::get(ctx).await - .expect("Songbird Voice client placed in at initialisation.").clone(); + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); let (handle_lock, success) = manager.join(guild_id, connect_to).await; if let Ok(_channel) = success { - check_msg(msg.channel_id.say(&ctx.http, &format!("Joined {}", connect_to.mention())).await); + check_msg( + msg.channel_id + .say(&ctx.http, &format!("Joined {}", connect_to.mention())) + .await, + ); let chan_id = msg.channel_id; @@ -167,17 +162,28 @@ async fn join(ctx: &Context, msg: &Message) -> CommandResult { handle.add_global_event( Event::Track(TrackEvent::End), - TrackEndNotifier { chan_id, http: send_http }, + TrackEndNotifier { + chan_id, + http: send_http, + }, ); let send_http = ctx.http.clone(); handle.add_global_event( Event::Periodic(Duration::from_secs(60), None), - ChannelDurationNotifier { chan_id, count: Default::default(), http: send_http }, + ChannelDurationNotifier { + chan_id, + count: Default::default(), + http: send_http, + }, ); } else { - check_msg(msg.channel_id.say(&ctx.http, "Error joining the channel").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Error joining the channel") + .await, + ); } Ok(()) @@ -192,7 +198,11 @@ struct TrackEndNotifier { impl VoiceEventHandler for TrackEndNotifier { async fn act(&self, ctx: &EventContext<'_>) -> Option { if let EventContext::Track(track_list) = ctx { - check_msg(self.chan_id.say(&self.http, &format!("Tracks ended: {}.", track_list.len())).await); + check_msg( + self.chan_id + .say(&self.http, &format!("Tracks ended: {}.", track_list.len())) + .await, + ); } None @@ -209,7 +219,17 @@ struct ChannelDurationNotifier { impl VoiceEventHandler for ChannelDurationNotifier { async fn act(&self, _ctx: &EventContext<'_>) -> Option { let count_before = self.count.fetch_add(1, Ordering::Relaxed); - check_msg(self.chan_id.say(&self.http, &format!("I've been in this channel for {} minutes!", count_before + 1)).await); + check_msg( + self.chan_id + .say( + &self.http, + &format!( + "I've been in this channel for {} minutes!", + count_before + 1 + ), + ) + .await, + ); None } @@ -221,13 +241,19 @@ async fn leave(ctx: &Context, msg: &Message) -> CommandResult { let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let manager = songbird::get(ctx).await - .expect("Songbird Voice client placed in at initialisation.").clone(); + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); let has_handler = manager.get(guild_id).is_some(); if has_handler { if let Err(e) = manager.remove(guild_id).await { - check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await); + check_msg( + msg.channel_id + .say(&ctx.http, format!("Failed: {:?}", e)) + .await, + ); } check_msg(msg.channel_id.say(&ctx.http, "Left voice channel").await); @@ -244,8 +270,10 @@ async fn mute(ctx: &Context, msg: &Message) -> CommandResult { let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let manager = songbird::get(ctx).await - .expect("Songbird Voice client placed in at initialisation.").clone(); + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); let handler_lock = match manager.get(guild_id) { Some(handler) => handler, @@ -262,7 +290,11 @@ async fn mute(ctx: &Context, msg: &Message) -> CommandResult { check_msg(msg.channel_id.say(&ctx.http, "Already muted").await); } else { if let Err(e) = handler.mute(true).await { - check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await); + check_msg( + msg.channel_id + .say(&ctx.http, format!("Failed: {:?}", e)) + .await, + ); } check_msg(msg.channel_id.say(&ctx.http, "Now muted").await); @@ -284,14 +316,22 @@ async fn play_fade(ctx: &Context, msg: &Message, mut args: Args) -> CommandResul let url = match args.single::() { Ok(url) => url, Err(_) => { - check_msg(msg.channel_id.say(&ctx.http, "Must provide a URL to a video or audio").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Must provide a URL to a video or audio") + .await, + ); return Ok(()); }, }; if !url.starts_with("http") { - check_msg(msg.channel_id.say(&ctx.http, "Must provide a valid URL").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Must provide a valid URL") + .await, + ); return Ok(()); } @@ -299,8 +339,10 @@ async fn play_fade(ctx: &Context, msg: &Message, mut args: Args) -> CommandResul let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let manager = songbird::get(ctx).await - .expect("Songbird Voice client placed in at initialisation.").clone(); + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); if let Some(handler_lock) = manager.get(guild_id) { let mut handler = handler_lock.lock().await; @@ -318,7 +360,7 @@ async fn play_fade(ctx: &Context, msg: &Message, mut args: Args) -> CommandResul // This handler object will allow you to, as needed, // control the audio track via events and further commands. - let song = handler.play_source(source.into()); + let song = handler.play_source(source); let send_http = ctx.http.clone(); let chan_id = msg.channel_id; @@ -326,21 +368,31 @@ async fn play_fade(ctx: &Context, msg: &Message, mut args: Args) -> CommandResul // periodically make a track quieter until it can be no longer heard. let _ = song.add_event( Event::Periodic(Duration::from_secs(5), Some(Duration::from_secs(7))), - SongFader { chan_id, http: send_http }, + SongFader { + chan_id, + http: send_http, + }, ); let send_http = ctx.http.clone(); - + // This shows how to fire an event once an audio track completes, // either due to hitting the end of the bytestream or stopped by user code. let _ = song.add_event( Event::Track(TrackEvent::End), - SongEndNotifier { chan_id, http: send_http }, + SongEndNotifier { + chan_id, + http: send_http, + }, ); check_msg(msg.channel_id.say(&ctx.http, "Playing song").await); } else { - check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to play in").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Not in a voice channel to play in") + .await, + ); } Ok(()) @@ -379,7 +431,11 @@ struct SongEndNotifier { #[async_trait] impl VoiceEventHandler for SongEndNotifier { async fn act(&self, _ctx: &EventContext<'_>) -> Option { - check_msg(self.chan_id.say(&self.http, "Song faded out completely!").await); + check_msg( + self.chan_id + .say(&self.http, "Song faded out completely!") + .await, + ); None } @@ -391,14 +447,22 @@ async fn queue(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult { let url = match args.single::() { Ok(url) => url, Err(_) => { - check_msg(msg.channel_id.say(&ctx.http, "Must provide a URL to a video or audio").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Must provide a URL to a video or audio") + .await, + ); return Ok(()); }, }; if !url.starts_with("http") { - check_msg(msg.channel_id.say(&ctx.http, "Must provide a valid URL").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Must provide a valid URL") + .await, + ); return Ok(()); } @@ -406,10 +470,10 @@ async fn queue(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult { let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let manager = songbird::get(ctx).await - .expect("Songbird Voice client placed in at initialisation.").clone(); - let queues_lock = ctx.data.read().await.get::().cloned().expect("Expected VoiceQueueManager in ShareMap."); - let mut track_queues = queues_lock.lock().await; + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); if let Some(handler_lock) = manager.get(guild_id) { let mut handler = handler_lock.lock().await; @@ -424,16 +488,22 @@ async fn queue(ctx: &Context, msg: &Message, mut args: Args) -> CommandResult { }, }; - // We need to ensure that this guild has a TrackQueue created for it. - let queue = track_queues.entry(guild_id) - .or_default(); - - // Queueing a track is this easy! - queue.add_source(source, &mut handler); + handler.enqueue_source(source); - check_msg(msg.channel_id.say(&ctx.http, format!("Added song to queue: position {}", queue.len())).await); + check_msg( + msg.channel_id + .say( + &ctx.http, + format!("Added song to queue: position {}", handler.queue().len()), + ) + .await, + ); } else { - check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to play in").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Not in a voice channel to play in") + .await, + ); } Ok(()) @@ -445,15 +515,30 @@ async fn skip(ctx: &Context, msg: &Message, _args: Args) -> CommandResult { let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let queues_lock = ctx.data.read().await.get::().cloned().expect("Expected VoiceQueueManager in ShareMap."); - let mut track_queues = queues_lock.lock().await; + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); - if let Some(queue) = track_queues.get_mut(&guild_id) { + if let Some(handler_lock) = manager.get(guild_id) { + let handler = handler_lock.lock().await; + let queue = handler.queue(); let _ = queue.skip(); - check_msg(msg.channel_id.say(&ctx.http, format!("Song skipped: {} in queue.", queue.len())).await); + check_msg( + msg.channel_id + .say( + &ctx.http, + format!("Song skipped: {} in queue.", queue.len()), + ) + .await, + ); } else { - check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to play in").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Not in a voice channel to play in") + .await, + ); } Ok(()) @@ -465,15 +550,23 @@ async fn stop(ctx: &Context, msg: &Message, _args: Args) -> CommandResult { let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let queues_lock = ctx.data.read().await.get::().cloned().expect("Expected VoiceQueueManager in ShareMap."); - let mut track_queues = queues_lock.lock().await; + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); - if let Some(queue) = track_queues.get_mut(&guild_id) { + if let Some(handler_lock) = manager.get(guild_id) { + let handler = handler_lock.lock().await; + let queue = handler.queue(); let _ = queue.stop(); check_msg(msg.channel_id.say(&ctx.http, "Queue cleared.").await); } else { - check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to play in").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Not in a voice channel to play in") + .await, + ); } Ok(()) @@ -485,18 +578,28 @@ async fn undeafen(ctx: &Context, msg: &Message) -> CommandResult { let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let manager = songbird::get(ctx).await - .expect("Songbird Voice client placed in at initialisation.").clone(); + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); if let Some(handler_lock) = manager.get(guild_id) { let mut handler = handler_lock.lock().await; if let Err(e) = handler.deafen(false).await { - check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await); + check_msg( + msg.channel_id + .say(&ctx.http, format!("Failed: {:?}", e)) + .await, + ); } check_msg(msg.channel_id.say(&ctx.http, "Undeafened").await); } else { - check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to undeafen in").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Not in a voice channel to undeafen in") + .await, + ); } Ok(()) @@ -507,18 +610,28 @@ async fn undeafen(ctx: &Context, msg: &Message) -> CommandResult { async fn unmute(ctx: &Context, msg: &Message) -> CommandResult { let guild = msg.guild(&ctx.cache).await.unwrap(); let guild_id = guild.id; - let manager = songbird::get(ctx).await - .expect("Songbird Voice client placed in at initialisation.").clone(); + let manager = songbird::get(ctx) + .await + .expect("Songbird Voice client placed in at initialisation.") + .clone(); if let Some(handler_lock) = manager.get(guild_id) { let mut handler = handler_lock.lock().await; if let Err(e) = handler.mute(false).await { - check_msg(msg.channel_id.say(&ctx.http, format!("Failed: {:?}", e)).await); + check_msg( + msg.channel_id + .say(&ctx.http, format!("Failed: {:?}", e)) + .await, + ); } check_msg(msg.channel_id.say(&ctx.http, "Unmuted").await); } else { - check_msg(msg.channel_id.say(&ctx.http, "Not in a voice channel to unmute in").await); + check_msg( + msg.channel_id + .say(&ctx.http, "Not in a voice channel to unmute in") + .await, + ); } Ok(()) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 792083467..33910fc37 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -19,10 +19,12 @@ use connection::error::Result; pub use crypto::*; pub use decode_mode::DecodeMode; +#[cfg(feature = "builtin-queue")] +use crate::tracks::TrackQueue; use crate::{ events::EventData, input::Input, - tracks::{Track, TrackHandle}, + tracks::{self, Track, TrackHandle}, ConnectionInfo, Event, EventHandler, @@ -34,11 +36,16 @@ use tracing::instrument; /// The control object for a Discord voice connection, handling connection, /// mixing, encoding, en/decryption, and event generation. +/// +/// When compiled with the `"builtin-queue"` feature, each driver includes a track queue +/// as a convenience to prevent the additional overhead of per-guild state management. #[derive(Clone, Debug)] pub struct Driver { config: Config, self_mute: bool, sender: Sender, + #[cfg(feature = "builtin-queue")] + queue: TrackQueue, } impl Driver { @@ -53,6 +60,8 @@ impl Driver { config, self_mute: false, sender, + #[cfg(feature = "builtin-queue")] + queue: Default::default(), } } @@ -226,6 +235,42 @@ impl Driver { } } +#[cfg(feature = "builtin-queue")] +impl Driver { + /// Returns a reference to this driver's built-in queue. + /// + /// Requires the `"builtin-queue"` feature. + /// Queue additions should be made via [`enqueue`] and + /// [`enqueue_source`]. + /// + /// [`enqueue`]: #method.enqueue + /// [`enqueue_source`]: #method.enqueue_source + pub fn queue(&self) -> &TrackQueue { + &self.queue + } + + /// Adds an audio [`Input`] to this driver's built-in queue. + /// + /// Requires the `"builtin-queue"` feature. + /// + /// [`Input`]: ../input/struct.input.html + pub fn enqueue_source(&mut self, source: Input) { + let (mut track, _) = tracks::create_player(source); + self.queue.add_raw(&mut track); + self.play(track); + } + + /// Adds an existing [`Track`] to this driver's built-in queue. + /// + /// Requires the `"builtin-queue"` feature. + /// + /// [`Track`]: ../tracks/struct.track.html + pub fn enqueue(&mut self, mut track: Track) { + self.queue.add_raw(&mut track); + self.play(track); + } +} + impl Default for Driver { fn default() -> Self { Self::new(Default::default()) diff --git a/src/tracks/handle.rs b/src/tracks/handle.rs index effa703fa..c43d9c108 100644 --- a/src/tracks/handle.rs +++ b/src/tracks/handle.rs @@ -5,6 +5,7 @@ use tokio::sync::{ mpsc::{error::SendError, UnboundedSender}, oneshot, }; +use uuid::Uuid; #[derive(Clone, Debug)] /// Handle for safe control of a [`Track`] track from other threads, outside @@ -18,6 +19,7 @@ use tokio::sync::{ pub struct TrackHandle { command_channel: UnboundedSender, seekable: bool, + uuid: Uuid, } impl TrackHandle { @@ -25,10 +27,11 @@ impl TrackHandle { /// the underlying [`Input`] supports seek operations. /// /// [`Input`]: ../input/struct.Input.html - pub fn new(command_channel: UnboundedSender, seekable: bool) -> Self { + pub fn new(command_channel: UnboundedSender, seekable: bool, uuid: Uuid) -> Self { Self { command_channel, seekable, + uuid, } } @@ -149,6 +152,11 @@ impl TrackHandle { } } + /// Returns this handle's (and track's) unique identifier. + pub fn uuid(&self) -> Uuid { + self.uuid + } + #[inline] /// Send a raw command to the [`Track`] object. /// diff --git a/src/tracks/mod.rs b/src/tracks/mod.rs index d60f867ef..7beec4289 100644 --- a/src/tracks/mod.rs +++ b/src/tracks/mod.rs @@ -33,6 +33,7 @@ use tokio::sync::{ }, oneshot::Receiver as OneshotReceiver, }; +use uuid::Uuid; /// Control object for audio playback. /// @@ -117,6 +118,9 @@ pub struct Track { /// Count of remaining loops. pub loops: LoopState, + + /// Unique identifier for this track. + pub(crate) uuid: Uuid, } impl Track { @@ -131,6 +135,8 @@ impl Track { commands: UnboundedReceiver, handle: TrackHandle, ) -> Self { + let uuid = handle.uuid(); + Self { playing: Default::default(), volume: 1.0, @@ -141,6 +147,7 @@ impl Track { commands, handle, loops: LoopState::Finite(0), + uuid, } } @@ -341,6 +348,11 @@ impl Track { out } + + /// Returns this track's unique identifier. + pub fn uuid(&self) -> Uuid { + self.uuid + } } /// Creates a [`Track`] object to pass into the audio context, and a [`TrackHandle`] @@ -354,9 +366,11 @@ impl Track { pub fn create_player(source: Input) -> (Track, TrackHandle) { let (tx, rx) = mpsc::unbounded_channel(); let can_seek = source.is_seekable(); - let player = Track::new_raw(source, rx, TrackHandle::new(tx.clone(), can_seek)); + let handle = TrackHandle::new(tx, can_seek, Uuid::new_v4()); + + let player = Track::new_raw(source, rx, handle.clone()); - (player, TrackHandle::new(tx, can_seek)) + (player, handle) } /// Alias for most result-free calls to a [`TrackHandle`]. diff --git a/src/tracks/queue.rs b/src/tracks/queue.rs index 7f0b30468..bbfab65e8 100644 --- a/src/tracks/queue.rs +++ b/src/tracks/queue.rs @@ -6,16 +6,18 @@ use crate::{ }; use async_trait::async_trait; use parking_lot::Mutex; -use std::{collections::VecDeque, sync::Arc}; +use std::{collections::VecDeque, ops::Deref, sync::Arc}; use tracing::{info, warn}; -#[derive(Default)] /// A simple queue for several audio sources, designed to /// play in sequence. /// /// This makes use of [`TrackEvent`]s to determine when the current /// song or audio file has finished before playing the next entry. /// +/// One of these is automatically included via [`Driver::queue`] when +/// the `"builtin-queue"` feature is enabled. +/// /// `examples/serenity/voice_events_queue` demonstrates how a user might manage, /// track and use this to run a song queue in many guilds in parallel. /// This code is trivial to extend if extra functionality is needed. @@ -50,15 +52,37 @@ use tracing::{info, warn}; /// queue.add_source(source, &mut driver); /// # }; /// ``` - /// /// [`TrackEvent`]: ../events/enum.TrackEvent.html +/// [`Driver::queue`]: ../driver/struct.Driver.html#method.queue +#[derive(Clone, Debug, Default)] pub struct TrackQueue { // NOTE: the choice of a parking lot mutex is quite deliberate inner: Arc>, } -#[derive(Default)] +/// Reference to a track which is known to be part of a queue. +/// +/// Instances *should not* be moved from one queue to another. +#[derive(Debug)] +pub struct Queued(TrackHandle); + +impl Deref for Queued { + type Target = TrackHandle; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Queued { + /// Clones the inner handle + pub fn handle(&self) -> TrackHandle { + self.0.clone() + } +} + +#[derive(Debug, Default)] /// Inner portion of a [`TrackQueue`]. /// /// This abstracts away thread-safety from the user, @@ -66,7 +90,7 @@ pub struct TrackQueue { /// /// [`TrackQueue`]: struct.TrackQueue.html struct TrackQueueCore { - tracks: VecDeque, + tracks: VecDeque, } struct QueueHandler { @@ -77,13 +101,33 @@ struct QueueHandler { impl EventHandler for QueueHandler { async fn act(&self, ctx: &EventContext<'_>) -> Option { let mut inner = self.remote_lock.lock(); + + // Due to possibility that users might remove, reorder, + // or dequeue+stop tracks, we need to verify that the FIRST + // track is the one who has ended. + let front_ended = match ctx { + EventContext::Track(ts) => { + // This slice should have exactly one entry. + // If the ended track has same id as the queue head, then + // we can progress the queue. + let queue_uuid = inner.tracks.front().map(|handle| handle.uuid()); + let ended_uuid = ts.first().map(|handle| handle.1.uuid()); + + queue_uuid.is_some() && queue_uuid == ended_uuid + }, + _ => false, + }; + + if !front_ended { + return None; + } + let _old = inner.tracks.pop_front(); info!("Queued track ended: {:?}.", ctx); info!("{} tracks remain.", inner.tracks.len()); - // If any audio files die unexpectedly, then keep going until we - // find one which works, or we run out. + // Keep going until we find one track which works, or we run out. let mut keep_looking = true; while keep_looking && !inner.tracks.is_empty() { if let Some(new) = inner.tracks.front() { @@ -113,8 +157,8 @@ impl TrackQueue { /// Adds an audio source to the queue, to be played in the channel managed by `handler`. pub fn add_source(&self, source: Input, handler: &mut Driver) { - let (audio, audio_handle) = tracks::create_player(source); - self.add(audio, audio_handle, handler); + let (audio, _) = tracks::create_player(source); + self.add(audio, handler); } /// Adds a [`Track`] object to the queue, to be played in the channel managed by `handler`. @@ -124,11 +168,19 @@ impl TrackQueue { /// /// [`Track`]: struct.Track.html /// [`voice::create_player`]: fn.create_player.html - pub fn add(&self, mut track: Track, track_handle: TrackHandle, handler: &mut Driver) { + pub fn add(&self, mut track: Track, handler: &mut Driver) { + self.add_raw(&mut track); + handler.play(track); + } + + #[inline] + pub(crate) fn add_raw(&self, track: &mut Track) { info!("Track added to queue."); let remote_lock = self.inner.clone(); let mut inner = self.inner.lock(); + let track_handle = track.handle.clone(); + if !inner.tracks.is_empty() { track.pause(); } @@ -142,8 +194,23 @@ impl TrackQueue { track.position, ); - handler.play(track); - inner.tracks.push_back(track_handle); + inner.tracks.push_back(Queued(track_handle)); + } + + /// Returns a handle to the currently playing track. + pub fn current(&self) -> Option { + let inner = self.inner.lock(); + + inner.tracks.front().map(|h| h.handle()) + } + + /// Attempts to remove a track from the specified index. + /// + /// The returned entry can be readded to *this* queue via [`modify_queue`]. + /// + /// [`modify_queue`]: #method.modify_queue + pub fn dequeue(&self, index: usize) -> Option { + self.modify_queue(|vq| vq.remove(index)) } /// Returns the number of tracks currently in the queue. @@ -160,6 +227,18 @@ impl TrackQueue { inner.tracks.is_empty() } + /// Allows modification of the inner queue (i.e., deletion, reordering). + /// + /// Users must be careful to `stop` removed tracks, so as to prevent + /// resource leaks. + pub fn modify_queue(&self, func: F) -> O + where + F: FnOnce(&mut VecDeque) -> O, + { + let mut inner = self.inner.lock(); + func(&mut inner.tracks) + } + /// Pause the track at the head of the queue. pub fn pause(&self) -> TrackResult { let inner = self.inner.lock(); @@ -183,14 +262,14 @@ impl TrackQueue { } /// Stop the currently playing track, and clears the queue. - pub fn stop(&self) -> TrackResult { + pub fn stop(&self) { let mut inner = self.inner.lock(); - let out = inner.stop_current(); - - inner.tracks.clear(); - - out + for track in inner.tracks.drain(..) { + // Errors when removing tracks don't really make + // a difference: an error just implies it's already gone. + let _ = track.stop(); + } } /// Skip to the next track in the queue, if it exists.