diff --git a/mqtt2kasa/main.py b/mqtt2kasa/main.py index ccfcb09..6a7f2c8 100755 --- a/mqtt2kasa/main.py +++ b/mqtt2kasa/main.py @@ -3,9 +3,10 @@ import collections from contextlib import AsyncExitStack import re - +import json +from typing import Dict, Optional from aiomqtt import Client, MqttError - +from datetime import datetime, timezone from mqtt2kasa import log from mqtt2kasa.config import Cfg from mqtt2kasa.events import KasaStateEvent, KasaBrightnessEvent, KasaEmeterEvent, MqttMsgEvent @@ -34,6 +35,13 @@ def __init__(self): self.keep_alives: dict[str, KeepAlive] = {} self.keep_alive_topics: dict[str, str] = {} +def create_timestamp_dict(data: Optional[Dict] = None) -> Dict: + utc_time = datetime.now(timezone.utc) + epoch_utc_secs = int(utc_time.timestamp()) + if data is None: + data = {} + data["timestamp"] = epoch_utc_secs + return data async def handle_main_event_kasa( kasa_state: KasaStateEvent, run_state: RunState, mqtt_send_q: asyncio.Queue @@ -51,6 +59,12 @@ async def handle_main_event_kasa( ) await mqtt_send_q.put(MqttMsgEvent(topic=kasa.topic, payload=payload)) + # https://github.com/flavio-fernandes/mqtt2kasa/issues/14 + status_json_topic = f"{kasa.topic}/status" + status_payload = create_timestamp_dict({"name": kasa_state.name, "state": kasa.state_name(kasa_state.state)}) + await mqtt_send_q.put(MqttMsgEvent(topic=status_json_topic, payload=json.dumps(status_payload))) + + async def handle_brightness_event_kasa( kasa_state: KasaBrightnessEvent, run_state: RunState, mqtt_send_q: asyncio.Queue ): @@ -79,20 +93,28 @@ async def handle_emeter_event_kasa( f"Unable to find device with name {kasa_emeter.name}. Ignoring kasa emeter event" ) return - topic = f"{kasa.topic}/emeter" - payload = kasa_emeter.emeter_status - logger.info( - f"Kasa emeter event requesting mqtt for {kasa_emeter.name} to publish" - f" {topic} as {payload}" - ) - await mqtt_send_q.put(MqttMsgEvent(topic=topic, payload=payload)) + emeter_topic = f"{kasa.topic}/emeter" + status_payload = kasa_emeter.emeter_status + await mqtt_send_q.put(MqttMsgEvent(topic=f"{emeter_topic}/status", payload=status_payload)) # also publish each value as a topic # https://github.com/flavio-fernandes/mqtt2kasa/issues/10 - matches = re.findall(r"(\w+)=([^\s>]+)", payload) + matches = re.findall(r"(\w+)=([^\s>]+)", status_payload) + emeter_payload_dict = create_timestamp_dict() for key, value in matches: - emeter_topic = f"{topic}/{key}" - await mqtt_send_q.put(MqttMsgEvent(topic=emeter_topic, payload=value)) + iter_emeter_topic = f"{emeter_topic}/{key}" + emeter_payload_dict[key] = value + await mqtt_send_q.put(MqttMsgEvent(topic=iter_emeter_topic, payload=value)) + + # https://github.com/flavio-fernandes/mqtt2kasa/issues/14 + await mqtt_send_q.put(MqttMsgEvent(topic=f"{emeter_topic}/timestamp", payload=emeter_payload_dict.get("timestamp"))) + + emeter_json_payload = json.dumps(emeter_payload_dict) + logger.info( + f"Kasa emeter event requesting mqtt for {kasa_emeter.name} to publish" + f" {emeter_topic} as {emeter_json_payload}" + ) + await mqtt_send_q.put(MqttMsgEvent(topic=emeter_topic, payload=emeter_json_payload)) async def handle_main_event_mqtt( @@ -141,6 +163,15 @@ async def handle_main_event_mqtt( if kasa.state_name(new_state) != mqtt_msg.payload: msg += f" ({mqtt_msg.payload})" logger.info(msg) + + # https://github.com/flavio-fernandes/mqtt2kasa/issues/14 + status_json_topic = f"{kasa.topic}/status" + status_payload = create_timestamp_dict( + {"name": name, "state": kasa.state_name(new_state)} + ) + await mqtt_send_q.put( + MqttMsgEvent(topic=status_json_topic, payload=json.dumps(status_payload)) + ) return if mqtt_msg.topic.endswith(BRIGHTNESS_TOPIC_SUFFIX): diff --git a/mqtt2kasa/mqtt.py b/mqtt2kasa/mqtt.py index 056c2e8..bd5576a 100755 --- a/mqtt2kasa/mqtt.py +++ b/mqtt2kasa/mqtt.py @@ -28,7 +28,7 @@ async def handle_mqtt_publish(client, mqtt_send_q: asyncio.Queue): mqtt_send_q.task_done() # Dampen publishes. This is a fail-safe and should not affect anything unless # there is a bug lurking somewhere - await asyncio.sleep(1) + await asyncio.sleep(0.5) async def handle_mqtt_messages(messages, main_events_q: asyncio.Queue): diff --git a/mqtt2kasa/tests/basic_test.sh.vagrant b/mqtt2kasa/tests/basic_test.sh.vagrant index 3a1f457..e097afa 100755 --- a/mqtt2kasa/tests/basic_test.sh.vagrant +++ b/mqtt2kasa/tests/basic_test.sh.vagrant @@ -35,7 +35,17 @@ grep --quiet -E 'Discovered 192\.168\.123\.204 .* thing4' ${TMP_OUTPUT} || \ echo TEST: EMeter grep --quiet -E 'bar has no emeter' ${TMP_OUTPUT} || \ { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } -grep --quiet -E 'emeter event requesting mqtt for lar to publish /lar/switch/emeter' ${TMP_OUTPUT} || \ +grep --quiet -E 'emeter event requesting mqtt for lar to publish /lar/switch/emeter as ' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'publish /lar/switch/emeter as .*timestamp' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'publish /lar/switch/emeter as .*current' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'publish /lar/switch/emeter as .*power' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'publish /lar/switch/emeter as .*voltage' ${TMP_OUTPUT} || \ + { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } +grep --quiet -E 'publish /lar/switch/emeter as .*total' ${TMP_OUTPUT} || \ { echo "FAILED in $0 line ${LINENO}" >&2; exit ${LINENO}; } echo TEST: Check on/off