Skip to content

Commit

Permalink
feat: add availability support
Browse files Browse the repository at this point in the history
  • Loading branch information
maxjoehnk committed Jan 24, 2021
1 parent 0853476 commit 3bd1482
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 23 deletions.
49 changes: 43 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,70 @@ mod config;
mod modules;
mod options;

use crate::config::get_config;
use crate::config::{get_config, Config};
use crate::modules::*;
use crate::options::CliOptions;
use log::LevelFilter;
use structopt::StructOpt;
use tokio::sync::{broadcast, mpsc};
use mqtt_async_client::client::{Publish, Client};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
fn main() -> anyhow::Result<()> {
let options = CliOptions::from_args();
setup_logging(options.verbose);

let config = get_config(&options)?;

log::info!("Starting desktop2mqtt...");

let mut runtime = tokio::runtime::Runtime::new()?;

let mut client = Client::builder().set_host(config.mqtt.url.clone()).build()?;

runtime.block_on(run(&mut client, config.clone()))?;
log::info!("Stopping desktop2mqtt...");

runtime.block_on(go_offline(&client, &config))?;
runtime.shutdown_background();

Ok(())
}

async fn go_offline(client: &Client, config: &Config) -> anyhow::Result<()> {
let msg = MqttMessage {
topic: format!("desktop2mqtt/{}/availability", config.hass.entity_id),
payload: "offline".to_string()
};
let mut publish = Publish::from(msg);
publish.set_retain(true);
client.publish(&publish).await?;
Ok(())
}

async fn run(client: &mut Client, config: Config) -> anyhow::Result<()> {
client.connect().await?;
tokio::select! {
result = run_loop(client, config) => {
result?;
}
_ = tokio::signal::ctrl_c() => {}
};

Ok(())
}

async fn run_loop(client: &mut Client, config: Config) -> anyhow::Result<()> {
let (mqtt_sender, mqtt_receiver) = mpsc::unbounded_channel();
let (mqtt_event_sender, mqtt_event_receiver) = broadcast::channel(10);
let (state_sender, state_receiver) = mpsc::unbounded_channel();

let mut mqtt_module = MqttModule::new(mqtt_receiver, mqtt_event_sender);
let mut mqtt_module = MqttModule::new(client, mqtt_receiver, mqtt_event_sender);
let mut hass_discovery_module = HomeAssistantModule::new(mqtt_sender.clone());
let mut state_module = StateModule::new(mqtt_sender.clone(), state_receiver);
let mut idle_module = IdleModule::new(state_sender.clone());
let mut backlight_module =
get_backlight_module(state_sender, mqtt_event_receiver, config.backlight);

log::info!("Starting desktop2mqtt...");

tokio::try_join!(
mqtt_module.run(&config),
hass_discovery_module.run(&config),
Expand Down
3 changes: 3 additions & 0 deletions src/modules/home_assistant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl HomeAssistantModule {

#[derive(Debug, Clone, Serialize)]
pub struct ConfigMessage {
pub availability_topic: String,
pub name: String,
pub unique_id: String,
pub state_topic: String,
Expand All @@ -116,6 +117,7 @@ impl ConfigMessage {
config: SensorConfig,
) -> Self {
ConfigMessage {
availability_topic: format!("{}/availability", topic),
name,
unique_id: id,
device,
Expand All @@ -128,6 +130,7 @@ impl ConfigMessage {

fn light(name: String, id: String, device: Device, topic: String, config: LightConfig) -> Self {
ConfigMessage {
availability_topic: format!("{}/availability", topic),
name,
unique_id: id,
device,
Expand Down
36 changes: 19 additions & 17 deletions src/modules/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,39 @@ use crate::modules::Module;
use serde::de::DeserializeOwned;
use std::convert::TryFrom;

pub struct MqttModule {
pub struct MqttModule<'a> {
client: &'a mut Client,
receiver: UnboundedReceiver<MqttCommand>,
sender: broadcast::Sender<MqttMessage>,
}

impl MqttModule {
impl<'a> MqttModule<'a> {
pub fn new(
client: &'a mut Client,
receiver: UnboundedReceiver<MqttCommand>,
sender: broadcast::Sender<MqttMessage>,
) -> Self {
MqttModule { receiver, sender }
MqttModule { client, receiver, sender }
}
}

impl Module for MqttModule {
impl<'a> Module for MqttModule<'a> {
fn run(&mut self, config: &Config) -> BoxFuture<anyhow::Result<()>> {
let mqtt_config = config.mqtt.clone();
let entity_id = config.hass.entity_id.clone();
async move {
let mut client = Client::builder().set_host(mqtt_config.url).build()?;

client.connect().await?;

self.publish(MqttMessage {
topic: format!("desktop2mqtt/{}/availability", entity_id),
payload: "online".to_string()
}).await?;
loop {
tokio::select! {
Some(msg) = self.receiver.recv() => {
match msg {
MqttCommand::Subscribe(topic) => Self::subscribe(&mut client, topic).await?,
MqttCommand::Emit(msg) => Self::publish(&client, msg).await?,
MqttCommand::Subscribe(topic) => self.subscribe(topic).await?,
MqttCommand::Emit(msg) => self.publish(msg).await?,
}
}
msg = client.read_subscriptions() => {
msg = self.client.read_subscriptions() => {
Self::recv(msg, &self.sender).await?;
},
else => break
Expand All @@ -53,24 +55,24 @@ impl Module for MqttModule {
}
}

impl MqttModule {
async fn publish(client: &Client, msg: MqttMessage) -> anyhow::Result<()> {
impl<'a> MqttModule<'a> {
async fn publish(&self, msg: MqttMessage) -> anyhow::Result<()> {
log::debug!("Publishing mqtt message {:?}...", &msg);
let mut publish = Publish::from(msg);
publish.set_retain(true);

client.publish(&publish).await?;
self.client.publish(&publish).await?;

Ok(())
}

async fn subscribe(client: &mut Client, topic: String) -> anyhow::Result<()> {
async fn subscribe(&mut self, topic: String) -> anyhow::Result<()> {
let topic = SubscribeTopic {
topic_path: topic,
qos: QoS::AtLeastOnce,
};
let subscription = Subscribe::new(vec![topic]);
client.subscribe(subscription).await?;
self.client.subscribe(subscription).await?;

Ok(())
}
Expand Down

0 comments on commit 3bd1482

Please sign in to comment.