Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Songbird: Tokio 1.0 #36

Merged
merged 1 commit into from
Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ version = "0.1"
default-features = false
features = ["tokio-runtime"]
optional = true
version = "0.9"
version = "0.11"

[dependencies.audiopus]
optional = true
Expand Down Expand Up @@ -58,13 +58,15 @@ version = "0.8"

[dependencies.serenity]
optional = true
#version = "0.10"
default-features = false
features = ["voice", "gateway"]
git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.serenity-voice-model]
optional = true
#version = "0.10"
git = "https://github.com/serenity-rs/serenity"
branch = "current"

Expand All @@ -78,18 +80,22 @@ version = "0.1"

[dependencies.tokio]
optional = true
version = "0.2"
version = "1.0"
default-features = false

[dependencies.twilight-gateway]
optional = true
version = "0.2"
#version = "0.3"
default-features = false
git = "https://github.com/twilight-rs/twilight"
branch = "v0.3"

[dependencies.twilight-model]
optional = true
version = "0.2"
#version = "0.3"
default-features = false
git = "https://github.com/twilight-rs/twilight"
branch = "v0.3"

[dependencies.typemap_rev]
optional = true
Expand Down Expand Up @@ -135,13 +141,12 @@ driver = [
"serenity-voice-model",
"spin_sleep",
"streamcatcher",
"tokio/blocking",
"tokio/fs",
"tokio/io-util",
"tokio/macros",
"tokio/net",
"tokio/process",
"tokio/rt-core",
"tokio/rt",
"tokio/sync",
"tokio/time",
"typemap_rev",
Expand Down
4 changes: 2 additions & 2 deletions examples/serenity/voice/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.tokio]
version = "0.2"
features = ["macros"]
version = "1.0"
features = ["macros", "rt-multi-thread"]
4 changes: 2 additions & 2 deletions examples/serenity/voice_events_queue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@ git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.tokio]
version = "0.2"
features = ["macros"]
version = "1.0"
features = ["macros", "rt-multi-thread"]
9 changes: 5 additions & 4 deletions examples/serenity/voice_receive/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ authors = ["my name <my@email.address>"]
edition = "2018"

[dependencies]
env_logger = "~0.6"
log = "~0.4"
tracing = "0.1"
tracing-subscriber = "0.2"
tracing-futures = "0.2"

[dependencies.songbird]
path = "../../../"
Expand All @@ -17,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.tokio]
version = "0.2"
features = ["macros"]
version = "1.0"
features = ["macros", "rt-multi-thread"]
2 changes: 2 additions & 0 deletions examples/serenity/voice_receive/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ struct General;

#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();

// Configure the client with your Discord bot token in the environment.
let token = env::var("DISCORD_TOKEN")
.expect("Expected a token in the environment");
Expand Down
4 changes: 2 additions & 2 deletions examples/serenity/voice_storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ git = "https://github.com/serenity-rs/serenity"
branch = "current"

[dependencies.tokio]
version = "0.2"
features = ["macros"]
version = "1.0"
features = ["macros", "rt-multi-thread"]
10 changes: 5 additions & 5 deletions examples/twilight/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ futures = "0.3"
tracing = "0.1"
tracing-subscriber = "0.2"
serde_json = { version = "1" }
tokio = { features = ["macros", "rt-threaded", "sync"], version = "0.2" }
twilight-gateway = "0.2"
twilight-http = "0.2"
twilight-model = "0.2"
twilight-standby = "0.2"
tokio = { features = ["macros", "rt-multi-thread", "sync"], version = "1" }
twilight-gateway = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
twilight-http = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
twilight-model = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }
twilight-standby = { git = "https://github.com/twilight-rs/twilight", branch = "v0.3" }

[dependencies.songbird]
path = "../.."
Expand Down
8 changes: 5 additions & 3 deletions src/driver/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
use discortp::discord::{IpDiscoveryPacket, IpDiscoveryType, MutableIpDiscoveryPacket};
use error::{Error, Result};
use flume::Sender;
use std::{net::IpAddr, str::FromStr};
use std::{net::IpAddr, str::FromStr, sync::Arc};
use tokio::net::UdpSocket;
use tracing::{debug, info, instrument};
use url::Url;
Expand Down Expand Up @@ -97,7 +97,7 @@ impl Connection {
return Err(Error::CryptoModeUnavailable);
}

let mut udp = UdpSocket::bind("0.0.0.0:0").await?;
let udp = UdpSocket::bind("0.0.0.0:0").await?;
udp.connect((ready.ip, ready.port)).await?;

// Follow Discord's IP Discovery procedures, in case NAT tunnelling is needed.
Expand Down Expand Up @@ -161,7 +161,9 @@ impl Connection {
let (ws_msg_tx, ws_msg_rx) = flume::unbounded();
let (udp_sender_msg_tx, udp_sender_msg_rx) = flume::unbounded();
let (udp_receiver_msg_tx, udp_receiver_msg_rx) = flume::unbounded();
let (udp_rx, udp_tx) = udp.split();

let udp_rx = Arc::new(udp);
let udp_tx = Arc::clone(&udp_rx);

let ssrc = ready.ssrc;

Expand Down
8 changes: 4 additions & 4 deletions src/driver/tasks/udp_rx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use discortp::{
PacketSize,
};
use flume::Receiver;
use std::collections::HashMap;
use tokio::net::udp::RecvHalf;
use std::{collections::HashMap, sync::Arc};
use tokio::net::UdpSocket;
use tracing::{error, info, instrument, warn};
use xsalsa20poly1305::XSalsa20Poly1305 as Cipher;

Expand Down Expand Up @@ -236,7 +236,7 @@ struct UdpRx {
config: Config,
packet_buffer: [u8; VOICE_PACKET_MAX],
rx: Receiver<UdpRxMessage>,
udp_socket: RecvHalf,
udp_socket: Arc<UdpSocket>,
}

impl UdpRx {
Expand Down Expand Up @@ -391,7 +391,7 @@ pub(crate) async fn runner(
rx: Receiver<UdpRxMessage>,
cipher: Cipher,
config: Config,
udp_socket: RecvHalf,
udp_socket: Arc<UdpSocket>,
) {
info!("UDP receive handle started.");

Expand Down
9 changes: 5 additions & 4 deletions src/driver/tasks/udp_tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ use super::message::*;
use crate::constants::*;
use discortp::discord::MutableKeepalivePacket;
use flume::Receiver;
use std::sync::Arc;
use tokio::{
net::udp::SendHalf,
time::{timeout_at, Elapsed, Instant},
net::UdpSocket,
time::{timeout_at, Instant},
};
use tracing::{error, info, instrument, trace};

#[instrument(skip(udp_msg_rx))]
pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, mut udp_tx: SendHalf) {
pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, udp_tx: Arc<UdpSocket>) {
info!("UDP transmit handle started.");

let mut keepalive_bytes = [0u8; MutableKeepalivePacket::minimum_packet_size()];
Expand All @@ -22,7 +23,7 @@ pub(crate) async fn runner(udp_msg_rx: Receiver<UdpTxMessage>, ssrc: u32, mut ud
loop {
use UdpTxMessage::*;
match timeout_at(ka_time, udp_msg_rx.recv_async()).await {
Err(Elapsed { .. }) => {
Err(_) => {
trace!("Sending UDP Keepalive.");
if let Err(e) = udp_tx.send(&keepalive_bytes[..]).await {
error!("Fatal UDP keepalive send error: {:?}.", e);
Expand Down
2 changes: 1 addition & 1 deletion src/driver/tasks/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl AuxNetwork {
let mut ws_error = false;
let mut should_reconnect = false;

let hb = time::delay_until(next_heartbeat);
let hb = time::sleep_until(next_heartbeat);

tokio::select! {
_ = hb => {
Expand Down
4 changes: 2 additions & 2 deletions src/tracks/command.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::events::EventData;
use flume::Sender;
use std::time::Duration;
use tokio::sync::oneshot::Sender as OneshotSender;

/// A request from external code using a [`TrackHandle`] to modify
/// or act upon an [`Track`] object.
Expand All @@ -27,7 +27,7 @@ pub enum TrackCommand {
/// Run some closure on this track, with direct access to the core object.
Do(Box<dyn FnOnce(&mut Track) + Send + Sync + 'static>),
/// Request a read-only view of this track's state.
Request(OneshotSender<Box<TrackState>>),
Request(Sender<Box<TrackState>>),
/// Change the loop count/strategy of this track.
Loop(LoopState),
/// Prompts a track's input to become live and usable, if it is not already.
Expand Down
11 changes: 6 additions & 5 deletions src/tracks/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ use crate::{
events::{Event, EventData, EventHandler},
input::Metadata,
};
use flume::Sender;
use std::{fmt, sync::Arc, time::Duration};
use tokio::sync::{mpsc::UnboundedSender, oneshot, RwLock};
use tokio::sync::RwLock;
use typemap_rev::TypeMap;
use uuid::Uuid;

Expand All @@ -25,7 +26,7 @@ pub struct TrackHandle {
}

struct InnerHandle {
command_channel: UnboundedSender<TrackCommand>,
command_channel: Sender<TrackCommand>,
seekable: bool,
uuid: Uuid,
metadata: Box<Metadata>,
Expand All @@ -50,7 +51,7 @@ impl TrackHandle {
///
/// [`Input`]: crate::input::Input
pub fn new(
command_channel: UnboundedSender<TrackCommand>,
command_channel: Sender<TrackCommand>,
seekable: bool,
uuid: Uuid,
metadata: Box<Metadata>,
Expand Down Expand Up @@ -159,10 +160,10 @@ impl TrackHandle {

/// Request playback information and state from the audio context.
pub async fn get_info(&self) -> TrackResult<Box<TrackState>> {
let (tx, rx) = oneshot::channel();
let (tx, rx) = flume::bounded(1);
self.send(TrackCommand::Request(tx))?;

rx.await.map_err(|_| TrackError::Finished)
rx.recv_async().await.map_err(|_| TrackError::Finished)
}

/// Set an audio track to loop indefinitely.
Expand Down
14 changes: 5 additions & 9 deletions src/tracks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ mod state;
pub use self::{command::*, error::*, handle::*, looping::*, mode::*, queue::*, state::*};

use crate::{constants::*, driver::tasks::message::*, events::EventStore, input::Input};
use flume::{Receiver, TryRecvError};
use std::time::Duration;
use tokio::sync::mpsc::{self, error::TryRecvError, UnboundedReceiver};
use uuid::Uuid;

/// Control object for audio playback.
Expand Down Expand Up @@ -102,7 +102,7 @@ pub struct Track {
/// Track commands are sent in this manner to ensure that access
/// occurs in a thread-safe manner, without allowing any external
/// code to lock access to audio objects and block packet generation.
pub(crate) commands: UnboundedReceiver<TrackCommand>,
pub(crate) commands: Receiver<TrackCommand>,

/// Handle for safe control of this audio track from other threads.
///
Expand All @@ -124,11 +124,7 @@ impl Track {
/// In general, you should probably use [`create_player`].
///
/// [`create_player`]: fn.create_player.html
pub fn new_raw(
source: Input,
commands: UnboundedReceiver<TrackCommand>,
handle: TrackHandle,
) -> Self {
pub fn new_raw(source: Input, commands: Receiver<TrackCommand>, handle: TrackHandle) -> Self {
let uuid = handle.uuid();

Self {
Expand Down Expand Up @@ -310,7 +306,7 @@ impl Track {
MakePlayable => self.make_playable(),
}
},
Err(TryRecvError::Closed) => {
Err(TryRecvError::Disconnected) => {
// this branch will never be visited.
break;
},
Expand Down Expand Up @@ -389,7 +385,7 @@ pub fn create_player(source: Input) -> (Track, TrackHandle) {
/// [`Track`]: Track
/// [`TrackHandle`]: TrackHandle
pub fn create_player_with_uuid(source: Input, uuid: Uuid) -> (Track, TrackHandle) {
let (tx, rx) = mpsc::unbounded_channel();
let (tx, rx) = flume::unbounded();
let can_seek = source.is_seekable();
let metadata = source.metadata.clone();
let handle = TrackHandle::new(tx, can_seek, uuid, metadata);
Expand Down