Skip to content

Commit

Permalink
Introduce overlay with websocket support (related #22)
Browse files Browse the repository at this point in the history
  • Loading branch information
udoprog committed Apr 1, 2019
1 parent b44cbe2 commit ae0293a
Show file tree
Hide file tree
Showing 33 changed files with 1,206 additions and 1,866 deletions.
18 changes: 18 additions & 0 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"type": "cargo",
"label": "cargo build",
"command": "cargo",
"args": [
"build"
],
"problemMatcher": [
"$rustc"
]
}
]
}
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Web-based overlay with current song ([#22]).

### Changed
- Cleaned up old cruft in the codebase (`gfx` module).

[#22]: https://github.com/udoprog/setmod/issues/22

[Unreleased]: https://github.com/udoprog/setmod/compare/0.2.3...HEAD

## [0.2.3]
Expand Down
149 changes: 134 additions & 15 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
[workspace]

members = [
"notifier",
"bot",
"web",
# "gfx",
]
2 changes: 1 addition & 1 deletion bot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ edition = "2018"
license = "MIT/Apache-2.0"

[dependencies]
setmod-notifier = {version = "0.2.3", path = "../notifier"}
chrono = {version = "0.4.6", features = ["serde"]}
clap = "2.32"
diesel = {version = "1.4.1", features = ["sqlite", "r2d2", "chrono"]}
Expand All @@ -16,6 +15,7 @@ failure = "0.1"
futures = "0.1.25"
hashbrown = {version = "0.1.8", features = ["serde"]}
hyper = "0.12.24"
warp = "0.1.14"
irc = {version = "0.13", git = "https://github.com/aatxe/irc", branch = "develop"}
log = "0.4.6"
oauth2 = {version = "2.0.0-alpha.5", git = "https://github.com/udoprog/oauth2-rs", branch = "async"}
Expand Down
215 changes: 215 additions & 0 deletions bot/src/bus.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use crate::{player, spotify};
use failure::format_err;
use futures::{future, Async, Future, Poll, Stream};
use hashbrown::HashMap;
use parking_lot::Mutex;
use std::{
net::{Ipv4Addr, SocketAddr},
sync::Arc,
};
use tokio::{
io::{self, AsyncRead, WriteHalf},
net::{TcpListener, TcpStream},
};

pub type Reader<T> = tokio_bus::BusReader<T>;

struct Inner {
bus: tokio_bus::Bus<Message>,
/// Latest instance of all messages.
latest: HashMap<&'static str, Message>,
}

/// Bus system.
pub struct Bus {
bus: Mutex<Inner>,
address: SocketAddr,
}

impl Bus {
/// Create a new notifier.
pub fn new() -> Self {
Bus {
bus: Mutex::new(Inner {
bus: tokio_bus::Bus::new(1024),
latest: HashMap::new(),
}),
address: SocketAddr::new(Ipv4Addr::new(127, 0, 0, 1).into(), 4444),
}
}

/// Send a message to the bus.
pub fn send(&self, m: Message) {
let mut inner = self.bus.lock();

if let Some(key) = m.cache() {
inner.latest.insert(key, m.clone());
}

if let Err(_) = inner.bus.try_broadcast(m) {
log::error!("failed to send notification: bus is full");
}
}

/// Get the latest messages received.
pub fn latest(&self) -> Vec<Message> {
let inner = self.bus.lock();
inner.latest.values().cloned().collect()
}

/// Create a receiver of the bus.
pub fn add_rx(self: Arc<Self>) -> Reader<Message> {
self.bus.lock().bus.add_rx()
}

/// Listen for incoming connections and hand serialized bus messages to connected sockets.
pub fn listen(self: Arc<Self>) -> impl Future<Item = (), Error = failure::Error> {
let listener = future::result(TcpListener::bind(&self.address));

listener.from_err::<failure::Error>().and_then(|listener| {
listener
.incoming()
.from_err::<failure::Error>()
.and_then(move |s| {
let (_, writer) = s.split();
let rx = self.bus.lock().bus.add_rx();

let handler = BusHandler::new(writer, rx)
.map_err(|e| {
log::error!("failed to process outgoing message: {}", e);
})
.for_each(|_| Ok(()));

tokio::spawn(handler);
Ok(())
})
.for_each(|_| Ok(()))
})
}
}

enum BusHandlerState {
Receiving,
Serialize(Message),
Send(io::WriteAll<WriteHalf<TcpStream>, String>),
}

/// Handles reading messages of a buss and writing them to a TcpStream.
struct BusHandler {
writer: Option<WriteHalf<TcpStream>>,
rx: tokio_bus::BusReader<Message>,
state: BusHandlerState,
}

impl BusHandler {
pub fn new(writer: WriteHalf<TcpStream>, rx: tokio_bus::BusReader<Message>) -> Self {
Self {
writer: Some(writer),
rx,
state: BusHandlerState::Receiving,
}
}
}

impl Stream for BusHandler {
type Item = ();
type Error = failure::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
use self::BusHandlerState::*;

loop {
self.state = match self.state {
Receiving => match self.rx.poll() {
Ok(Async::Ready(Some(m))) => Serialize(m),
Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(failure::Error::from(e)),
},
Serialize(ref m) => match (serde_json::to_string(m), self.writer.take()) {
(Ok(json), Some(writer)) => Send(io::write_all(writer, format!("{}\n", json))),
(_, None) => return Err(format_err!("writer not available")),
(Err(e), _) => return Err(failure::Error::from(e)),
},
Send(ref mut f) => match f.poll() {
Ok(Async::Ready((writer, _))) => {
self.writer = Some(writer);
self.state = Receiving;
continue;
}
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(e) => return Err(failure::Error::from(e)),
},
}
}
}
}

#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
#[serde(tag = "type")]
pub enum Message {
#[serde(rename = "firework")]
Firework,
#[serde(rename = "ping")]
Ping,
/// Progress of current song.
#[serde(rename = "song/progress")]
SongProgress { elapsed: u64, duration: u64 },
#[serde(rename = "song/current")]
SongCurrent {
track: Option<spotify::FullTrack>,
user: Option<String>,
paused: bool,
},
}

impl Message {
/// Construct a message about song progress.
pub fn song_progress(song: Option<&player::Song>) -> Self {
let song = match song {
Some(song) => song,
None => {
return Message::SongProgress {
elapsed: 0,
duration: 0,
}
}
};

Message::SongProgress {
elapsed: song.elapsed().as_secs(),
duration: song.duration().as_secs(),
}
}

/// Message indicating that no song is playing.
pub fn from_song(song: Option<&player::Song>, paused: bool) -> Self {
let song = match song {
Some(song) => song,
None => {
return Message::SongCurrent {
track: None,
user: None,
paused,
}
}
};

Message::SongCurrent {
track: Some(song.item.track.clone()),
user: song.item.user.clone(),
paused,
}
}

/// Whether a message should be cached or not and under what key.
pub fn cache(&self) -> Option<&'static str> {
use self::Message::*;

match *self {
SongProgress { .. } => Some("song/progress"),
SongCurrent { .. } => Some("song/current"),
_ => None,
}
}
}
19 changes: 19 additions & 0 deletions bot/src/current_song.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,15 @@ impl CurrentSong {
Ok(())
}

/// Blank as a string.
pub fn blank_to_string(&self) -> String {
if let Some(not_playing) = self.not_playing.as_ref() {
not_playing.to_string()
} else {
"Not Playing".to_string()
}
}

/// Write the current song to a path.
pub fn write(&self, song: &player::Song, paused: bool) -> Result<(), failure::Error> {
let mut f = self.create_or_truncate()?;
Expand All @@ -43,6 +52,16 @@ impl CurrentSong {
Ok(())
}

/// Format as string.
pub fn write_to_string(
&self,
song: &player::Song,
paused: bool,
) -> Result<String, failure::Error> {
let data = song.data(paused)?;
self.template.render_to_string(&data)
}

/// Get the current update frequency, if present.
pub fn update_interval(&self) -> Option<&time::Duration> {
if self.update_interval.as_secs() == 0 {
Expand Down
Loading

0 comments on commit ae0293a

Please sign in to comment.