From 5fd58ac5d3601b8383ce1aa43691de14972a03a6 Mon Sep 17 00:00:00 2001 From: Andrey Khrolenok Date: Wed, 2 Dec 2020 04:23:20 +0300 Subject: [PATCH] Remastered access to cloud --- custom_components/jq300/__init__.py | 184 ++++++++++++----------- custom_components/jq300/binary_sensor.py | 8 +- custom_components/jq300/const.py | 5 +- custom_components/jq300/sensor.py | 8 +- requirements-dev.txt | 10 +- requirements.txt | 7 +- 6 files changed, 120 insertions(+), 102 deletions(-) diff --git a/custom_components/jq300/__init__.py b/custom_components/jq300/__init__.py index cdeb06e..1800b17 100644 --- a/custom_components/jq300/__init__.py +++ b/custom_components/jq300/__init__.py @@ -13,11 +13,13 @@ import json import logging from time import monotonic -from typing import Optional, Dict +from typing import Optional, Dict, Any +from urllib.parse import urlparse import async_timeout import homeassistant.helpers.config_validation as cv import homeassistant.util.dt as dt_util +import paho.mqtt.client as mqtt import requests import voluptuous as vol from homeassistant import exceptions @@ -28,12 +30,9 @@ CONCENTRATION_PARTS_PER_BILLION, CONCENTRATION_MILLIGRAMS_PER_CUBIC_METER, CONCENTRATION_PARTS_PER_MILLION, - CONF_SCAN_INTERVAL, ) from homeassistant.helpers import discovery from homeassistant.helpers.dispatcher import dispatcher_send -from homeassistant.helpers.event import async_track_time_interval -from homeassistant.util import Throttle from requests import PreparedRequest from .const import ( @@ -51,7 +50,6 @@ MSG_LOGIN_FAIL, MSG_BUSY, SENSORS, - UPDATE_MIN_INTERVAL, CONF_RECEIVE_TVOC_IN_PPB, CONF_RECEIVE_HCHO_IN_PPB, SENSORS_FILTER_FRAME, @@ -61,13 +59,14 @@ PLATFORMS, UPDATE_TIMEOUT, ACCOUNT_CONTROLLER, - SCAN_INTERVAL, SIGNAL_UPDATE_JQ300, CONF_ACCOUNT_ID, + MQTT_URL, ) from .util import mask_email _LOGGER = logging.getLogger(__name__) +_LOGGER_MQTT = logging.getLogger(__name__ + ".mqtt") ACCOUNT_SCHEMA = vol.Schema( { @@ -76,7 +75,6 @@ vol.Optional(CONF_DEVICES): vol.All(cv.ensure_list, [cv.string]), vol.Optional(CONF_RECEIVE_TVOC_IN_PPB, default=False): cv.boolean, vol.Optional(CONF_RECEIVE_HCHO_IN_PPB, default=False): cv.boolean, - vol.Optional(CONF_SCAN_INTERVAL, default=SCAN_INTERVAL): cv.time_period, } ) @@ -102,7 +100,6 @@ async def async_setup(hass, config): active_devices = config[DOMAIN].get(CONF_DEVICES, []) receive_tvoc_in_ppb = config[DOMAIN].get(CONF_RECEIVE_TVOC_IN_PPB) receive_hcho_in_ppb = config[DOMAIN].get(CONF_RECEIVE_HCHO_IN_PPB) - scan_interval = conf.get(CONF_SCAN_INTERVAL, SCAN_INTERVAL) _LOGGER.debug("Connecting to account %s", mask_email(conf[CONF_USERNAME])) @@ -124,14 +121,6 @@ async def async_setup(hass, config): account.active_devices.append(device_id) devs[name] = device_id - # Fetch initial data so we have data when entities subscribe - try: - await account.async_update_sensors_or_timeout() - except CannotConnect: - return False - - async_track_time_interval(hass, account.update_sensors, scan_interval) - hass.data[DOMAIN][account.unique_id] = { ACCOUNT_CONTROLLER: account, CONF_DEVICES: devs, @@ -177,6 +166,7 @@ def __init__( self._receive_tvoc_in_ppb = receive_tvoc_in_ppb self._receive_hcho_in_ppb = receive_hcho_in_ppb + self._mqtt = None self._active_devices = [] self._available = False self._session = requests.session() @@ -347,8 +337,68 @@ def connect(self, force=False) -> bool: self.params["uid"] = ret["uid"] self.params["safeToken"] = ret["safeToken"] self._devices = {} + + self._mqtt_connect() + return True + def _mqtt_connect(self): + _LOGGER.debug("MQTT connection") + if self._mqtt is not None or not self.is_connected: + return + + # pylint: disable=unused-argument + def on_connect_callback(client, userdata, flags, res): + _LOGGER.debug("Connected to MQTT") + try: + self._mqtt_subscribe(self._get_devices_mqtt_topics(self.active_devices)) + except Exception as exc: # pylint: disable=broad-except + logging.exception(exc) + + # pylint: disable=unused-argument + def on_message_callback(client, userdata, message): + try: + msg = json.loads(message.payload) + _LOGGER.debug("Received MQTT message: %s", msg) + self._update_sensors(msg) + except Exception as exc: # pylint: disable=broad-except + logging.exception(exc) + + self._mqtt = mqtt.Client( + client_id="_".join( + (str(self.params["uid"]), str(int(dt_util.now().timestamp() * 1000))) + ), + clean_session=True, + ) + self._mqtt.enable_logger(_LOGGER_MQTT) + self._mqtt.on_connect = on_connect_callback + self._mqtt.on_message = on_message_callback + parsed = urlparse(MQTT_URL) + if parsed.username is not None: + if parsed.password is not None: + self._mqtt.username_pw_set(parsed.username, parsed.password) + else: + _LOGGER.error( + "The MQTT password was not found, " "this is required for auth" + ) + self._mqtt.connect_async(host=parsed.hostname, port=parsed.port) + self._mqtt.loop_start() + + def _mqtt_subscribe(self, topics: list): + if self._mqtt.is_connected(): + self._mqtt.subscribe([(x, 0) for x in topics]) + + def _mqtt_unsubscribe(self, topics: list): + if self._mqtt.is_connected(): + self._mqtt.unsubscribe(topics) + + def _get_devices_mqtt_topics(self, device_ids: list) -> list: + devs = self.update_devices() + topics = list( + (devs[dev_id]["deviceToken"] for dev_id in device_ids) if devs else () + ) + return topics + @property def active_devices(self) -> list: """Get list of devices we want to fetch sensors data.""" @@ -357,12 +407,29 @@ def active_devices(self) -> list: @active_devices.setter def active_devices(self, devices: list): """Set list of devices we want to fetch sensors data.""" + unsub = self._get_devices_mqtt_topics( + list(set(self._active_devices) - set(devices)) + ) + sub = self._get_devices_mqtt_topics( + list(set(devices) - set(self._active_devices)) + ) + self._active_devices = devices for device_id in devices: self._sensors.setdefault(device_id, {}) - def update_devices(self, force=False) -> Optional[dict]: + _LOGGER.debug("Unsubscribe from MQTT topics: %s", ", ".join(unsub)) + self._mqtt_unsubscribe(unsub) + _LOGGER.debug("Subscribe for new MQTT topics: %s", ", ".join(sub)) + self._mqtt_subscribe(sub) + + @property + def devices(self) -> dict: + """Get available devices.""" + return self._devices + + def update_devices(self, force=False) -> Optional[Dict[int, Dict[str, Any]]]: """Update available devices.""" if not self.connect(): _LOGGER.error("Can't connect to cloud.") @@ -407,9 +474,9 @@ async def async_update_devices_or_timeout(self, timeout=UPDATE_TIMEOUT): return self._devices - except asyncio.TimeoutError: + except asyncio.TimeoutError as exc: _LOGGER.error("Timeout fetching %s devices list", self.name_secure) - raise CannotConnect + raise CannotConnect from exc except Exception as err: # pylint: disable=broad-except _LOGGER.exception( @@ -423,34 +490,18 @@ async def async_update_devices_or_timeout(self, timeout=UPDATE_TIMEOUT): monotonic() - start, ) - @property - def devices(self) -> dict: - """Get available devices.""" - return self._devices - - @Throttle(UPDATE_MIN_INTERVAL) - def _fetch_sensors(self, device_id, ts_now) -> bool: - """Fetch states of available sensors for device.""" - devices = self.update_devices() - if not devices or not devices[device_id]: - _LOGGER.error("Can't receive devices list from cloud.") - return False - - ret = self._query( - QUERY_TYPE_DEVICE, - "list", - extra_params={ - "deviceToken": devices[device_id]["deviceToken"], - "timestamp": ts_now, - "callback": "jsoncallback", - "_": ts_now, - }, - ) - if not ret: - return False + def _update_sensors(self, message): + device_id = None + for dev_id, dev in self.devices.items(): + if message["deviceToken"] == dev["deviceToken"]: + device_id = dev_id + if device_id is None: + return + _LOGGER.debug("Update sensors for device %d", device_id) res = {} - for sensor in ret["deviceValueVos"]: + ts_now = int(dt_util.now().timestamp()) + for sensor in json.loads(message["content"]): sensor_id = sensor["seq"] if sensor["content"] is None or ( sensor_id not in SENSORS.keys() @@ -468,47 +519,10 @@ def _fetch_sensors(self, device_id, ts_now) -> bool: self._sensors[device_id][ts_now] = res self._sensors_raw[device_id] = res - self._sensors_last_update = monotonic() - return True - - def update_sensors(self, _=None): - """Update current states of all active devices for account.""" - _LOGGER.debug("Updating sensors state for account %s", self.name_secure) + self._sensors_last_update = ts_now - ts_now = int(dt_util.now().timestamp()) - - for device_id in self.active_devices: - self._fetch_sensors(device_id, ts_now) - - dispatcher_send(self.hass, SIGNAL_UPDATE_JQ300) - - async def async_update_sensors_or_timeout(self, timeout=UPDATE_TIMEOUT): - """Update current states of all active devices for account.""" - try: - with async_timeout.timeout(timeout): - start = monotonic() - await self.hass.async_add_job(self.update_sensors) - while self._sensors_last_update < start: - # Waiting for connection and check datas ready - await asyncio.sleep(1) - - except asyncio.TimeoutError: - _LOGGER.error("Timeout fetching %s device's sensors", self.name_secure) - raise CannotConnect - - except Exception as err: # pylint: disable=broad-except - _LOGGER.exception( - "Unexpected error fetching %s device's sensors: %s", - self.name_secure, - err, - ) - - finally: - _LOGGER.debug( - "Finished fetching %s device's sensors in %.3f seconds", - self.name_secure, - monotonic() - start, - ) + if self.hass: + dispatcher_send(self.hass, SIGNAL_UPDATE_JQ300) def get_sensors_raw(self, device_id) -> Optional[dict]: """Get raw values of states of available sensors for device.""" diff --git a/custom_components/jq300/binary_sensor.py b/custom_components/jq300/binary_sensor.py index ca8e90d..d76712e 100644 --- a/custom_components/jq300/binary_sensor.py +++ b/custom_components/jq300/binary_sensor.py @@ -10,6 +10,7 @@ # (see LICENSE.md or https://creativecommons.org/licenses/by-nc-sa/4.0/) import logging +from time import sleep from homeassistant import exceptions from homeassistant.components.binary_sensor import ENTITY_ID_FORMAT @@ -56,9 +57,10 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info= entities = [] for dev_name, dev_id in devices.items(): sensors = account.get_sensors(dev_id) - if not sensors: - _LOGGER.error("Can't receive sensors list for device '%s'.", dev_id) - continue + while not sensors: + _LOGGER.debug("Sensors list is not ready. Wait for 3 sec...") + sleep(3) + sensors = account.get_sensors(dev_id) sensor_id = 1 ent_name = BINARY_SENSORS.get(sensor_id)[4] or BINARY_SENSORS.get(sensor_id)[0] diff --git a/custom_components/jq300/const.py b/custom_components/jq300/const.py index 464425f..a5cd0b9 100644 --- a/custom_components/jq300/const.py +++ b/custom_components/jq300/const.py @@ -51,6 +51,7 @@ BASE_URL_API = "http://www.youpinyuntai.com:32086/ypyt-api/api/app/" BASE_URL_DEVICE = "https://www.youpinyuntai.com:31447/device/" +MQTT_URL = "mqtt://ye5h8c3n:T%4ran8c@www.youpinyuntai.com:55450" _USERAGENT_SYSTEM = "Android 6.0.1; RedMi Note 5 Build/RB3N5C" USERAGENT_API = "Dalvik/2.1.0 (Linux; U; %s)" % _USERAGENT_SYSTEM @@ -94,9 +95,7 @@ ATTR_DEVICE_MODEL = "device_model" ATTR_RAW_STATE = "raw_state" -SCAN_INTERVAL = timedelta(seconds=60) -UPDATE_MIN_INTERVAL = timedelta(minutes=10) -SENSORS_FILTER_FRAME = timedelta(minutes=15) +SENSORS_FILTER_FRAME = timedelta(minutes=5) SIGNAL_UPDATE_JQ300 = "jq300_update" diff --git a/custom_components/jq300/sensor.py b/custom_components/jq300/sensor.py index dad0a6a..fee0e46 100644 --- a/custom_components/jq300/sensor.py +++ b/custom_components/jq300/sensor.py @@ -10,6 +10,7 @@ # (see LICENSE.md or https://creativecommons.org/licenses/by-nc-sa/4.0/) import logging +from time import sleep from homeassistant import exceptions from homeassistant.components.sensor import ENTITY_ID_FORMAT @@ -49,9 +50,10 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info= entities = [] for dev_name, dev_id in devices.items(): sensors = account.get_sensors(dev_id) - if not sensors: - _LOGGER.error("Can't receive sensors list for device '%s'.", dev_id) - continue + while not sensors: + _LOGGER.debug("Sensors list is not ready. Wait for 3 sec...") + sleep(3) + sensors = account.get_sensors(dev_id) for sensor_id, sensor_state in sensors.items(): if sensor_id not in SENSORS.keys(): diff --git a/requirements-dev.txt b/requirements-dev.txt index c3becef..1ad564e 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,11 +1,11 @@ -r requirements.txt black==20.8b1 flake8~=3.8 -mypy==0.782 -packaging~=20.4 -pre-commit~=2.7 -PyGithub==1.53 +mypy==0.790 +packaging~=20.7 +pre-commit~=2.9 +PyGithub~=1.54 pylint~=2.6 pylint-strict-informational==0.1 pyupgrade~=2.7 -yamllint~=1.24 +yamllint~=1.25 diff --git a/requirements.txt b/requirements.txt index 84404ed..9f7d8c5 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ -requests~=2.24 -voluptuous~=0.11 -homeassistant~=0.115 +requests~=2.25 +voluptuous~=0.12 +homeassistant~=0.118 +paho-mqtt~=1.5