Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use DataUpdateCoordinator for device state polling to reduce parallel requests to Deye official cloud server #47

Merged
merged 11 commits into from
Dec 18, 2024
Merged
119 changes: 34 additions & 85 deletions custom_components/deye_dehumidifier/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@

from __future__ import annotations

from datetime import datetime

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.entity import DeviceInfo, Entity
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from libdeye.cloud_api import (
DeyeCloudApi,
DeyeCloudApiCannotConnectError,
DeyeCloudApiInvalidAuthError,
)
from libdeye.const import QUERY_DEVICE_STATE_COMMAND
from libdeye.device_state_command import DeyeDeviceCommand, DeyeDeviceState
from libdeye.device_state_command import DeyeDeviceState
from libdeye.mqtt_client import DeyeMqttClient
from libdeye.types import DeyeApiResponseDeviceInfo

Expand All @@ -26,11 +23,13 @@
CONF_PASSWORD,
CONF_USERNAME,
DATA_CLOUD_API,
DATA_COORDINATOR,
DATA_DEVICE_LIST,
DATA_MQTT_CLIENT,
DOMAIN,
MANUFACTURER,
)
from .data_coordinator import DeyeDataUpdateCoordinator

PLATFORMS: list[Platform] = [
Platform.HUMIDIFIER,
Expand Down Expand Up @@ -72,6 +71,13 @@ def on_auth_token_refreshed(auth_token: str) -> None:
await cloud_api.get_device_list(),
)
)
for device in device_list:
coordinator = DeyeDataUpdateCoordinator(
hass, device, mqtt_client, cloud_api
)
device[DATA_COORDINATOR] = coordinator
await device[DATA_COORDINATOR].async_config_entry_first_refresh()

except DeyeCloudApiInvalidAuthError as err:
raise ConfigEntryAuthFailed from err
except DeyeCloudApiCannotConnectError as err:
Expand Down Expand Up @@ -99,7 +105,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return unload_ok


class DeyeEntity(Entity):
class DeyeEntity(CoordinatorEntity, Entity):
"""Initiate Deye Base Class."""

def __init__(
Expand All @@ -109,11 +115,13 @@ def __init__(
cloud_api: DeyeCloudApi,
) -> None:
"""Initialize the instance."""
self.coordinator = device[DATA_COORDINATOR]
super().__init__(self.coordinator)
self._device = device
self._mqtt_client = mqtt_client
self._cloud_api = cloud_api
self._attr_has_entity_name = True
self._attr_available = self._device["online"]
self._device_available = self._device["online"]
self._attr_unique_id = self._device["mac"]
self.entity_id_base = f'deye_{self._device["mac"].lower()}' # We will override HA generated entity ID
self._attr_device_info = DeviceInfo(
Expand All @@ -123,92 +131,33 @@ def __init__(
name=self._device["device_name"],
)
self._attr_should_poll = False
self.subscription_muted: CALLBACK_TYPE | None = None
# payload from the server sometimes are not a valid string
if isinstance(self._device["payload"], str):
self.device_state = DeyeDeviceState(self._device["payload"])
else:
self.device_state = DeyeDeviceState(
"1411000000370000000000000000003C3C0000000000" # 20°C/60%RH as the default state
)
remove_handle = self.coordinator.async_add_listener(
self._handle_coordinator_update
)
self.async_on_remove(remove_handle)

def update_device_availability(self, available: bool) -> None:
"""Will be called when received new availability status."""
if self.subscription_muted:
return
self._attr_available = available
self.async_write_ha_state()

def update_device_state(self, state: DeyeDeviceState) -> None:
"""Will be called when received new DeyeDeviceState."""
if self.subscription_muted:
return
self.device_state = state
async def publish_command_async(self, attribute, value):
"""Push command to a queue and deal with them together."""
self.async_write_ha_state()
self.hass.bus.fire(
"call_humidifier_method", {"prop": attribute, "value": value}
)
await self.coordinator.async_request_refresh()

async def async_added_to_hass(self) -> None:
"""When entity is added to Home Assistant."""
if self._device["platform"] == 1:
self.async_on_remove(
self._mqtt_client.subscribe_availability_change(
self._device["product_id"],
self._device["device_id"],
self.update_device_availability,
)
)
self.async_on_remove(
self._mqtt_client.subscribe_state_change(
self._device["product_id"],
self._device["device_id"],
self.update_device_state,
)
)

await self.poll_device_state()
self.async_on_remove(self.cancel_polling)
@property
def available(self):
return self._device_available

@callback
async def poll_device_state(self, now: datetime | None = None) -> None:
"""
Some Deye devices have a very long heartbeat period. So polling is still necessary to get the latest state as
quickly as possible.
"""
if self._device["platform"] == 1:
self._mqtt_client.publish_command(
self._device["product_id"],
self._device["device_id"],
QUERY_DEVICE_STATE_COMMAND,
)
elif self._device["platform"] == 2:
state = DeyeDeviceState(
await self._cloud_api.get_fog_platform_device_properties(
self._device["device_id"]
)
)
self.update_device_state(state)
self.cancel_polling = async_call_later(self.hass, 10, self.poll_device_state)

def mute_subscription_for_a_while(self) -> None:
"""Mute subscription for a while to avoid state bouncing."""
if self.subscription_muted:
self.subscription_muted()

@callback
def unmute(now: datetime) -> None:
self.subscription_muted = None

self.subscription_muted = async_call_later(self.hass, 10, unmute)

async def publish_command(self, command: DeyeDeviceCommand) -> None:
if self._device["platform"] == 1:
"""Publish a MQTT command to this device."""
self._mqtt_client.publish_command(
self._device["product_id"], self._device["device_id"], command.bytes()
)
elif self._device["platform"] == 2:
"""Publish a MQTT command to this device."""
await self._cloud_api.set_fog_platform_device_properties(
self._device["device_id"], command.json()
)
self.async_write_ha_state()
self.mute_subscription_for_a_while()
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.device_state = self.coordinator.data
self._device_available = self.coordinator.device_available
super()._handle_coordinator_update()
1 change: 1 addition & 0 deletions custom_components/deye_dehumidifier/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
DATA_CLOUD_API = "cloud_api"
DATA_MQTT_CLIENT = "mqtt_client"
DATA_DEVICE_LIST = "device_list"
DATA_COORDINATOR = "coordinator"
MANUFACTURER = "Ningbo Deye Technology Co., Ltd"
106 changes: 106 additions & 0 deletions custom_components/deye_dehumidifier/data_coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import asyncio
import logging
from datetime import datetime, timedelta

from homeassistant.core import CALLBACK_TYPE, callback
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from libdeye.const import QUERY_DEVICE_STATE_COMMAND
from libdeye.device_state_command import DeyeDeviceState

_LOGGER = logging.getLogger(__name__)


class DeyeDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass, device, mqtt_client, cloud_api):
super().__init__(
hass,
_LOGGER,
name="deye_data_update_coordinator",
update_method=self.poll_device_state,
update_interval=timedelta(seconds=10),
)
self._mqtt_client = mqtt_client
self._cloud_api = cloud_api
self.subscription_muted: CALLBACK_TYPE | None = None

self.data = DeyeDeviceState(
"1411000000370000000000000000003C3C0000000000" # 20°C/60%RH as the default state
)
self._device = device
self.device_available = self._device["online"]
"""When entity is added to Home Assistant."""
if self._device["platform"] == 1:
# self._mqtt_client.subscribe_availability_change(
# self._device["product_id"],
# self._device["device_id"],
# self.update_device_availability,
# )
self._mqtt_client.subscribe_state_change(
self._device["product_id"],
self._device["device_id"],
self.update_device_state,
)

self.receive_queue = asyncio.Queue()
self.device_available_queue = asyncio.Queue()

def mute_subscription_for_a_while(self) -> None:
"""Mute subscription for a while to avoid state bouncing."""
if self.subscription_muted:
self.subscription_muted()

@callback
def unmute(now: datetime) -> None:
self.subscription_muted = None

self.subscription_muted = async_call_later(self.hass, 20, unmute)

def update_device_state(self, state: DeyeDeviceState) -> None:
"""Will be called when received new DeyeDeviceState."""
self.receive_queue.put_nowait(state)
# self.async_set_updated_data(state)

async def async_request_refresh(self) -> None:
self.mute_subscription_for_a_while()
await super().async_request_refresh()

async def poll_device_state(self) -> DeyeDeviceState:
"""
Some Deye devices have a very long heartbeat period. So polling is still necessary to get the latest state as
quickly as possible.
"""
# _LOGGER.error("poll_device_state called: " + str(self._device["product_id"]))
if self.subscription_muted:
return self.data

device_list = list(
filter(
lambda d: d["product_type"] == "dehumidifier"
and d["device_id"] == self._device["device_id"],
await self._cloud_api.get_device_list(),
)
)
if len(device_list) > 0:
device = device_list[0]
self.device_available = device["online"]

if self._device["platform"] == 1:
self._mqtt_client.publish_command(
self._device["product_id"],
self._device["device_id"],
QUERY_DEVICE_STATE_COMMAND,
)
response = await asyncio.wait_for(
self.receive_queue.get(), timeout=10
) # 设置超时时间
# _LOGGER.error(response.to_command().json())
return response
elif self._device["platform"] == 2:
response = DeyeDeviceState(
await self._cloud_api.get_fog_platform_device_properties(
self._device["device_id"]
)
)
# _LOGGER.error(response.to_command().json())
return response
23 changes: 15 additions & 8 deletions custom_components/deye_dehumidifier/fan.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def __init__(
self.entity_id = f"fan.{self.entity_id_base}_fan"
feature_config = get_product_feature_config(device["product_id"])
self._attr_supported_features = FanEntityFeature.SET_SPEED
if hasattr(FanEntityFeature, "TURN_ON"): # v2024.8
self._attr_supported_features |= FanEntityFeature.TURN_ON
if hasattr(FanEntityFeature, "TURN_OFF"):
self._attr_supported_features |= FanEntityFeature.TURN_OFF
if feature_config["oscillating"]:
self._attr_supported_features |= FanEntityFeature.OSCILLATE
self._named_fan_speeds = feature_config["fan_speed"]
Expand Down Expand Up @@ -82,16 +86,17 @@ def percentage(self) -> int:
async def async_oscillate(self, oscillating: bool) -> None:
"""Oscillate the fan."""
self.device_state.oscillating_switch = oscillating
await self.publish_command(self.device_state.to_command())
await self.publish_command_async("oscillating_switch", oscillating)

async def async_set_percentage(self, percentage: int) -> None:
"""Set the speed of the fan, as a percentage."""
if percentage == 0:
await self.async_turn_off()
self.device_state.fan_speed = percentage_to_ordered_list_item(
self._named_fan_speeds, percentage
fan_speed = int(
percentage_to_ordered_list_item(self._named_fan_speeds, percentage)
)
await self.publish_command(self.device_state.to_command())
self.device_state.fan_speed = fan_speed
await self.publish_command_async("fan_speed", fan_speed)

async def async_turn_on(
self,
Expand All @@ -101,13 +106,15 @@ async def async_turn_on(
) -> None:
"""Turn on the fan."""
self.device_state.power_switch = True
await self.publish_command_async("power_switch", True)
if percentage is not None:
self.device_state.fan_speed = percentage_to_ordered_list_item(
self._named_fan_speeds, percentage
fan_speed = int(
percentage_to_ordered_list_item(self._named_fan_speeds, percentage)
)
await self.publish_command(self.device_state.to_command())
self.device_state.fan_speed = fan_speed
await self.publish_command_async("fan_speed", fan_speed)

async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
self.device_state.power_switch = False
await self.publish_command(self.device_state.to_command())
await self.publish_command_async("power_switch", False)
Loading
Loading