diff --git a/src/main.rs b/src/main.rs index 63231f7..b8f34ae 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,33 +2,70 @@ mod config; mod modules; mod options; -use crate::config::get_config; +use crate::config::{get_config, Config}; use crate::modules::*; use crate::options::CliOptions; use log::LevelFilter; use structopt::StructOpt; use tokio::sync::{broadcast, mpsc}; +use mqtt_async_client::client::{Publish, Client}; -#[tokio::main] -async fn main() -> anyhow::Result<()> { +fn main() -> anyhow::Result<()> { let options = CliOptions::from_args(); setup_logging(options.verbose); let config = get_config(&options)?; + log::info!("Starting desktop2mqtt..."); + + let mut runtime = tokio::runtime::Runtime::new()?; + + let mut client = Client::builder().set_host(config.mqtt.url.clone()).build()?; + + runtime.block_on(run(&mut client, config.clone()))?; + log::info!("Stopping desktop2mqtt..."); + + runtime.block_on(go_offline(&client, &config))?; + runtime.shutdown_background(); + + Ok(()) +} + +async fn go_offline(client: &Client, config: &Config) -> anyhow::Result<()> { + let msg = MqttMessage { + topic: format!("desktop2mqtt/{}/availability", config.hass.entity_id), + payload: "offline".to_string() + }; + let mut publish = Publish::from(msg); + publish.set_retain(true); + client.publish(&publish).await?; + Ok(()) +} + +async fn run(client: &mut Client, config: Config) -> anyhow::Result<()> { + client.connect().await?; + tokio::select! { + result = run_loop(client, config) => { + result?; + } + _ = tokio::signal::ctrl_c() => {} + }; + + Ok(()) +} + +async fn run_loop(client: &mut Client, config: Config) -> anyhow::Result<()> { let (mqtt_sender, mqtt_receiver) = mpsc::unbounded_channel(); let (mqtt_event_sender, mqtt_event_receiver) = broadcast::channel(10); let (state_sender, state_receiver) = mpsc::unbounded_channel(); - let mut mqtt_module = MqttModule::new(mqtt_receiver, mqtt_event_sender); + let mut mqtt_module = MqttModule::new(client, mqtt_receiver, mqtt_event_sender); let mut hass_discovery_module = HomeAssistantModule::new(mqtt_sender.clone()); let mut state_module = StateModule::new(mqtt_sender.clone(), state_receiver); let mut idle_module = IdleModule::new(state_sender.clone()); let mut backlight_module = get_backlight_module(state_sender, mqtt_event_receiver, config.backlight); - log::info!("Starting desktop2mqtt..."); - tokio::try_join!( mqtt_module.run(&config), hass_discovery_module.run(&config), diff --git a/src/modules/home_assistant.rs b/src/modules/home_assistant.rs index f0fa206..4ea7c4c 100644 --- a/src/modules/home_assistant.rs +++ b/src/modules/home_assistant.rs @@ -96,6 +96,7 @@ impl HomeAssistantModule { #[derive(Debug, Clone, Serialize)] pub struct ConfigMessage { + pub availability_topic: String, pub name: String, pub unique_id: String, pub state_topic: String, @@ -116,6 +117,7 @@ impl ConfigMessage { config: SensorConfig, ) -> Self { ConfigMessage { + availability_topic: format!("{}/availability", topic), name, unique_id: id, device, @@ -128,6 +130,7 @@ impl ConfigMessage { fn light(name: String, id: String, device: Device, topic: String, config: LightConfig) -> Self { ConfigMessage { + availability_topic: format!("{}/availability", topic), name, unique_id: id, device, diff --git a/src/modules/mqtt.rs b/src/modules/mqtt.rs index e87b12e..a5746cc 100644 --- a/src/modules/mqtt.rs +++ b/src/modules/mqtt.rs @@ -10,37 +10,39 @@ use crate::modules::Module; use serde::de::DeserializeOwned; use std::convert::TryFrom; -pub struct MqttModule { +pub struct MqttModule<'a> { + client: &'a mut Client, receiver: UnboundedReceiver, sender: broadcast::Sender, } -impl MqttModule { +impl<'a> MqttModule<'a> { pub fn new( + client: &'a mut Client, receiver: UnboundedReceiver, sender: broadcast::Sender, ) -> Self { - MqttModule { receiver, sender } + MqttModule { client, receiver, sender } } } -impl Module for MqttModule { +impl<'a> Module for MqttModule<'a> { fn run(&mut self, config: &Config) -> BoxFuture> { - let mqtt_config = config.mqtt.clone(); + let entity_id = config.hass.entity_id.clone(); async move { - let mut client = Client::builder().set_host(mqtt_config.url).build()?; - - client.connect().await?; - + self.publish(MqttMessage { + topic: format!("desktop2mqtt/{}/availability", entity_id), + payload: "online".to_string() + }).await?; loop { tokio::select! { Some(msg) = self.receiver.recv() => { match msg { - MqttCommand::Subscribe(topic) => Self::subscribe(&mut client, topic).await?, - MqttCommand::Emit(msg) => Self::publish(&client, msg).await?, + MqttCommand::Subscribe(topic) => self.subscribe(topic).await?, + MqttCommand::Emit(msg) => self.publish(msg).await?, } } - msg = client.read_subscriptions() => { + msg = self.client.read_subscriptions() => { Self::recv(msg, &self.sender).await?; }, else => break @@ -53,24 +55,24 @@ impl Module for MqttModule { } } -impl MqttModule { - async fn publish(client: &Client, msg: MqttMessage) -> anyhow::Result<()> { +impl<'a> MqttModule<'a> { + async fn publish(&self, msg: MqttMessage) -> anyhow::Result<()> { log::debug!("Publishing mqtt message {:?}...", &msg); let mut publish = Publish::from(msg); publish.set_retain(true); - client.publish(&publish).await?; + self.client.publish(&publish).await?; Ok(()) } - async fn subscribe(client: &mut Client, topic: String) -> anyhow::Result<()> { + async fn subscribe(&mut self, topic: String) -> anyhow::Result<()> { let topic = SubscribeTopic { topic_path: topic, qos: QoS::AtLeastOnce, }; let subscription = Subscribe::new(vec![topic]); - client.subscribe(subscription).await?; + self.client.subscribe(subscription).await?; Ok(()) }