diff --git a/.devcontainer/configuration.yaml b/.devcontainer/configuration.yaml index d3eb9e5..79c1f92 100644 --- a/.devcontainer/configuration.yaml +++ b/.devcontainer/configuration.yaml @@ -4,5 +4,7 @@ logger: default: info logs: custom_components.duke_energy_gateway: debug + pyduke_energy.client: debug + pyduke_energy.realtime: debug # If you need to debug uncommment the line below (doc: https://www.home-assistant.io/integrations/debugpy/) # debugpy: diff --git a/README.md b/README.md index 35229e1..3b47e37 100644 --- a/README.md +++ b/README.md @@ -12,17 +12,27 @@ [![Project Maintenance][maintenance-shield]][user_profile] [![BuyMeCoffee][buymecoffeebadge]][buymecoffee] -This is a custom integration for [Home Assistant](https://www.home-assistant.io/). It pulls near-real-time energy usage from Duke Energy via the Duke Energy Gateway pilot program. +This is a custom integration for [Home Assistant](https://www.home-assistant.io/). It pulls real-time energy usage from Duke Energy via the Duke Energy Gateway pilot program. -This component will set up the following entities. +This integration leverages the [`pyduke-energy`](https://github.com/mjmeli/pyduke-energy) library, also written by me, to pull data. This API is _very_ unofficial and may stop working at any time (see [Disclaimer](https://github.com/mjmeli/pyduke-energy#Disclaimer)). Also, you are required to have a Duke Energy Gateway connected to your smartmeter for this to work. This integration does not support any other method of retrieving data (see [Gateway Requirement](https://github.com/mjmeli/pyduke-energy#gateway-requirement)). -| Platform | Description | -| ------------------------------------ | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| `sensor.duke_energy_usage_today_kwh` | Represents today's energy consumption (from 0:00 local time to 23:59 local time, then resetting). Additional attributes are available containing the meter ID, gateway ID, and the timestamp of the last measurement. | +## Sensors -This integration leverages the [`pyduke-energy`](https://github.com/mjmeli/pyduke-energy) library, also written by me, to pull data. This API is _very_ unofficial and may stop working at any time (see [Disclaimer](https://github.com/mjmeli/pyduke-energy#Disclaimer)). Also, you are required to have a Duke Energy Gateway connected to your smartmeter for this to work. This integration does not support any other method of retrieving data (see [Gateway Requirement](https://github.com/mjmeli/pyduke-energy#gateway-requirement)). +This component will set up the following entities: + +### `sensor.duke_energy_current_usage_w` + +- Represents the real-time _power_ usage in watts. +- This data is pushed from the gateway device every 1-3 seconds. _NOTE:_ This produces a lot of data. If this update interval is too frequent for you, you can configure a throttling interval in seconds (see [Configuration](#Configuration) below). +- Note that since this is power usage, it cannot be used as-is for the Home Assistant energy dashboard. Instead, you can use the `sensor.duke_energy_usage_today_kwh` sensor, or you need to feed this real-time sensor through the [Riemann sum integral integration](https://www.home-assistant.io/integrations/integration/). +- Additional attributes are available containing the meter ID and gateway ID. -Energy usage will be provided as _daily_ data, resetting at midnight local time. At the moment, the API appears to be limited to providing new records every 15 minutes, meaning readings could be delayed up to 15 minutes. For more information, see [limitations](https://github.com/mjmeli/pyduke-energy#limitations) in the `pyduke-energy` repo. +### `sensor.duke_energy_usage_today_kwh` + +- Represents today's _energy_ consumption in kilowatt-hours (from 0:00 local time to 23:59 local time, then resetting). +- This data is polled every 60 seconds, but data may be delayed up to 15 minutes due to delays in Duke Energy reporting it (see [Limitations](https://github.com/mjmeli/pyduke-energy#Limitations) in the `pyduke-energy` repo.). +- This can be used as-is for the Home Assistant energy consumption dashboard. +- Additional attributes are available containing the meter ID, gateway ID, and the timestamp of the last measurement. ## Installation @@ -42,19 +52,40 @@ Energy usage will be provided as _daily_ data, resetting at midnight local time. 6. Restart Home Assistant 7. In the HA UI go to "Configuration" -> "Integrations" click "+" and search for "Duke Energy Gateway" -## Configuration is done in the UI +## Configuration -Configuration will be done in the UI. You will need to provide the following data: +Configuration will be done in the UI. Initially, you will need to provide the following data: | Data | Description | | ---------- | ----------------------------------- | | `email` | Your login email to Duke Energy. | | `password` | Your login password to Duke Energy. | +After the integration is setup, you will be able to do further configuration by clicking "Configure" on the integration page. This will allow you to modify the following options: + +| Data | Description | +| --------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `Real-time Usage Update Interval (sec)` | By default, the real-time usage sensor will be updated any time a reading comes in. If this data is too frequent, you can configure this value to throttle the data. When set to a positive integer `X`, the sensor will only be updated once every `X` seconds. In other words, if set to 30, you will get a new real-time usage every ~30 seconds. | + ### Meter Selection The configuration flow will automatically attempt to identify your gateway and smartmeter. Right now, only one is supported per account. The first one identified will be used. If one cannot be found, the configuration process should fail. +If your meter selection fails, a first step should be to enable logging for the component (see [Logging](#Logging)). If this does not give insight into the problem, please open a GitHub issue. + +### Logging + +If you run into any issues and want to look into the logs, this integration provides verbose logging at the debug level. That can be enabled by adding the following to your `configuration.yaml` file. + +```yaml +logger: + default: info + logs: + custom_components.duke_energy_gateway: debug + pyduke_energy.client: debug + pyduke_energy.realtime: debug +``` + ## Development I suggest using the dev container for development by opening in Visual Studio Code with `code .` and clicking on the option to re-open with dev container. In VS Code, you can run the task "Run Home Assistant on the port 9123" and then access it via http://localhost:9123. @@ -63,6 +94,14 @@ If you want to install manually, you can install dev dependencies with `pip inst Before commiting, run `pre-commit run --all-files`. +### Working With In Development `pyduke-energy` Versions + +If you are working on implementing new changes from `pyduke-energy` but do not want to release version of that library, you can set up your development environment to install from a remote working branch. + +1. Update [`requirements_dev.txt`](requirements_dev.txt) to replace the `main` in `git+https://github.com/mjmeli/pyduke-energy@main` with your working branch and update the username if you have a fork (e.g. `git+https://github.com/notmjmeli/pyduke-energy@new-feature-dev-branch`) +2. Uninstall locally cached version of `pyduke-energy`: `pip uninstall -y pyduke-energy` +3. Re-run requirements installation: `pip install -r requirements_dev.txt` + ## Contributions are welcome! If you want to contribute to this please read the [Contribution guidelines](CONTRIBUTING.md) diff --git a/custom_components/duke_energy_gateway/__init__.py b/custom_components/duke_energy_gateway/__init__.py index 8732cf5..92ea066 100644 --- a/custom_components/duke_energy_gateway/__init__.py +++ b/custom_components/duke_energy_gateway/__init__.py @@ -13,23 +13,22 @@ from homeassistant.core import HomeAssistant from homeassistant.exceptions import ConfigEntryNotReady from homeassistant.helpers.aiohttp_client import async_get_clientsession -from homeassistant.helpers.update_coordinator import DataUpdateCoordinator -from homeassistant.helpers.update_coordinator import UpdateFailed -from homeassistant.util import dt from pyduke_energy.client import DukeEnergyClient +from pyduke_energy.realtime import DukeEnergyRealtime from .const import CONF_EMAIL from .const import CONF_PASSWORD +from .const import CONF_REALTIME_INTERVAL +from .const import CONF_REALTIME_INTERVAL_DEFAULT_SEC from .const import DOMAIN from .const import PLATFORMS from .const import STARTUP_MESSAGE - -SCAN_INTERVAL = timedelta(seconds=60) +from .coordinator import DukeEnergyGatewayUsageDataUpdateCoordinator _LOGGER: logging.Logger = logging.getLogger(__package__) -async def async_setup(hass: HomeAssistant, config: Config): +async def async_setup(_hass: HomeAssistant, _config: Config): """Set up this integration using YAML is not supported.""" return True @@ -42,22 +41,31 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): email = entry.data.get(CONF_EMAIL) password = entry.data.get(CONF_PASSWORD) + realtime_interval = entry.options.get( + CONF_REALTIME_INTERVAL, CONF_REALTIME_INTERVAL_DEFAULT_SEC + ) session = async_get_clientsession(hass) client = DukeEnergyClient(email, password, session) - _LOGGER.debug("Setup Duke Energy API client") + realtime = DukeEnergyRealtime(client) + _LOGGER.debug("Set up Duke Energy API clients") - # Try to find the meter that is used for the gateway - selected_meter, selected_gateway = await find_meter_with_gateway(client) + # Find the meter that is used for the gateway + selected_meter, selected_gateway = await client.select_default_meter() # If no meter was found, we raise an error - if not selected_meter or not selected_gateway: + if not selected_meter: _LOGGER.error( "Could not identify a smart meter on your account with gateway access." ) return False - coordinator = DukeEnergyGatewayUsageDataUpdateCoordinator(hass, client=client) + coordinator = DukeEnergyGatewayUsageDataUpdateCoordinator( + hass, + client=client, + realtime=realtime, + realtime_interval=timedelta(seconds=realtime_interval), + ) await coordinator.async_refresh() if not coordinator.last_update_success: @@ -76,93 +84,16 @@ async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry): hass.config_entries.async_forward_entry_setup(entry, platform) ) - entry.add_update_listener(async_reload_entry) + entry.async_on_unload(entry.add_update_listener(async_reload_entry)) return True -async def find_meter_with_gateway(client: DukeEnergyClient): - """Find the meter that is used for the gateway by iterating through the accounts and meters.""" - account_list = await client.get_account_list() - account_numbers_text = ",".join([f"'{a.src_acct_id}'" for a in account_list]) - _LOGGER.debug( - f"Accounts to check for gateway ({len(account_list)}): {account_numbers_text}" - ) - for account in account_list: - try: - _LOGGER.debug(f"Checking account '{account.src_acct_id}' for gateway") - account_details = await client.get_account_details(account) - serial_numbers_text = ",".join( - [f"'{m.serial_num}'" for m in account_details.meter_infos] - ) - _LOGGER.debug( - f"Meters to check for gateway ({len(account_details.meter_infos)}): {serial_numbers_text}" - ) - for meter in account_details.meter_infos: - try: - _LOGGER.debug( - f"Checking meter '{meter.serial_num}' for gateway [meter_type={meter.meter_type}, is_certified_smart_meter={meter.is_certified_smart_meter}]" - ) - if ( - meter.serial_num - and meter.meter_type.upper() # sometimes blank meters show up - == "ELECTRIC" - and meter.is_certified_smart_meter - ): - client.select_meter(meter) - gw_status = await client.get_gateway_status() - if gw_status is not None: - _LOGGER.debug( - f"Found meter '{meter.serial_num}' with gateway '{gw_status.id}'" - ) - return meter, gw_status - else: - _LOGGER.debug( - f"No gateway status for meter '{meter.serial_num}'" - ) - except Exception as e: - # Try the next meter if anything fails above - _LOGGER.debug( - f"Failed to check meter '{meter.serial_num}' on account '{account.src_acct_id}': {e}" - ) - pass - except Exception as e: - # Try the next account if anything fails above - _LOGGER.debug( - f"Failed to find meter on account '{account.src_acct_id}': {e}" - ) - pass - return None, None - - -class DukeEnergyGatewayUsageDataUpdateCoordinator(DataUpdateCoordinator): - """Class to manage fetching usage data from the API.""" - - def __init__( - self, - hass: HomeAssistant, - client: DukeEnergyClient, - ) -> None: - """Initialize.""" - self.api = client - self.platforms = [] - - super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL) - - async def _async_update_data(self): - """Update data via library to get last day of minute-by-minute usage data.""" - try: - today_start = dt.start_of_local_day() - today_end = today_start + timedelta(days=1) - return await self.api.get_gateway_usage(today_start, today_end) - except Exception as exception: - raise UpdateFailed( - f"Error communicating with Duke Energy Usage API: {exception}" - ) from exception - - async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: """Handle removal of an entry.""" - coordinator = hass.data[DOMAIN][entry.entry_id]["coordinator"] + coordinator: DukeEnergyGatewayUsageDataUpdateCoordinator = hass.data[DOMAIN][ + entry.entry_id + ]["coordinator"] + unloaded = all( await asyncio.gather( *[ @@ -175,6 +106,11 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: if unloaded: hass.data[DOMAIN].pop(entry.entry_id) + # Cleanup real-time stream if it wasn't already done so (it should be done by the sensor entity) + _LOGGER.debug("Checking for clean-up of real-time stream in async_unload_entry") + coordinator.realtime_cancel() + coordinator.async_realtime_unsubscribe_all_from_dispatcher() + return unloaded diff --git a/custom_components/duke_energy_gateway/config_flow.py b/custom_components/duke_energy_gateway/config_flow.py index ec67a2a..5fb0cdc 100644 --- a/custom_components/duke_energy_gateway/config_flow.py +++ b/custom_components/duke_energy_gateway/config_flow.py @@ -7,8 +7,9 @@ from .const import CONF_EMAIL from .const import CONF_PASSWORD +from .const import CONF_REALTIME_INTERVAL +from .const import CONF_REALTIME_INTERVAL_DEFAULT_SEC from .const import DOMAIN -from .const import PLATFORMS class DukeEnergyGatewayFlowHandler(config_entries.ConfigFlow, domain=DOMAIN): @@ -89,18 +90,29 @@ async def async_step_user(self, user_input=None): self.options.update(user_input) return await self._update_options() + realtime_interval = self.options.get( + CONF_REALTIME_INTERVAL, CONF_REALTIME_INTERVAL_DEFAULT_SEC + ) + return self.async_show_form( step_id="user", data_schema=vol.Schema( { - vol.Required(x, default=self.options.get(x, True)): bool - for x in sorted(PLATFORMS) + vol.Required( + CONF_REALTIME_INTERVAL, + default=realtime_interval, + ): int, } ), ) async def _update_options(self): """Update config entry options.""" + update_interval = self.options.get( + CONF_REALTIME_INTERVAL, CONF_REALTIME_INTERVAL_DEFAULT_SEC + ) + if update_interval < 0: + return self.async_abort(reason="invalid_update_interval_value") return self.async_create_entry( title=self.config_entry.data.get(CONF_EMAIL), data=self.options ) diff --git a/custom_components/duke_energy_gateway/const.py b/custom_components/duke_energy_gateway/const.py index 7598368..e095e0c 100644 --- a/custom_components/duke_energy_gateway/const.py +++ b/custom_components/duke_energy_gateway/const.py @@ -16,10 +16,14 @@ CONF_ENABLED = "enabled" CONF_EMAIL = "email" CONF_PASSWORD = "password" +CONF_REALTIME_INTERVAL = "realtimeInterval" +CONF_REALTIME_INTERVAL_DEFAULT_SEC = 0 # no throttling # Defaults DEFAULT_NAME = DOMAIN +REALTIME_DISPATCH_SIGNAL = f"{DOMAIN}_realtime_dispatch_signal" + STARTUP_MESSAGE = f""" ------------------------------------------------------------------- {NAME} diff --git a/custom_components/duke_energy_gateway/coordinator.py b/custom_components/duke_energy_gateway/coordinator.py new file mode 100644 index 0000000..7588b78 --- /dev/null +++ b/custom_components/duke_energy_gateway/coordinator.py @@ -0,0 +1,140 @@ +"""Data update coordinator for Duke Energy Gateway entities.""" +import asyncio +import logging +from asyncio.tasks import Task +from datetime import datetime +from datetime import timedelta +from typing import Any +from typing import Callable + +from homeassistant.core import DOMAIN +from homeassistant.core import HomeAssistant +from homeassistant.helpers.dispatcher import async_dispatcher_connect +from homeassistant.helpers.dispatcher import dispatcher_send +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator +from homeassistant.helpers.update_coordinator import UpdateFailed +from homeassistant.util import dt +from pyduke_energy.client import DukeEnergyClient +from pyduke_energy.realtime import DukeEnergyRealtime +from pyduke_energy.types import RealtimeUsageMeasurement + +from .const import REALTIME_DISPATCH_SIGNAL + +SCAN_INTERVAL = timedelta(seconds=60) + +_LOGGER: logging.Logger = logging.getLogger(__package__) + + +class DukeEnergyGatewayUsageDataUpdateCoordinator(DataUpdateCoordinator): + """Class to manage fetching usage data from the API.""" + + def __init__( + self, + hass: HomeAssistant, + client: DukeEnergyClient, + realtime: DukeEnergyRealtime, + realtime_interval: timedelta, + ) -> None: + """Initialize.""" + self.client = client + self.realtime = realtime + self.realtime_interval = realtime_interval + self.realtime_next_send = datetime.utcnow() + self.realtime_task: Task = None + self.async_realtime_remove_subscriber_funcs_by_source: dict[ + str, Callable[[], None] + ] = {} + self.platforms = [] + + super().__init__(hass, _LOGGER, name=DOMAIN, update_interval=SCAN_INTERVAL) + + async def _async_update_data(self): + """Update data via library to get last day of minute-by-minute usage data.""" + try: + today_start = dt.start_of_local_day() + today_end = today_start + timedelta(days=1) + return await self.client.get_gateway_usage(today_start, today_end) + except Exception as exception: + raise UpdateFailed( + f"Error communicating with Duke Energy Usage API: {exception}" + ) from exception + + def realtime_initialize(self): + """Setup callbacks, connect, and subscribe to the real-time usage MQTT stream.""" + try: + self.realtime.on_message = self._realtime_on_message + self.realtime_task = asyncio.create_task( + self.realtime.connect_and_subscribe_forever() + ) + _LOGGER.debug("Triggered real-time connect/subscribe async task") + except Exception as exception: + _LOGGER.error( + "Failure trying to connect and subscribe to real-time usage: %s", + exception, + ) + raise + + def realtime_cancel(self): + """Cancel the real-time usage MQTT stream, which will unsubscribe.""" + if self.realtime_task: + self.realtime_task.cancel() + self.realtime_task = None + _LOGGER.debug("Cancelled real-time async task") + + def _realtime_on_message(self, msg): + """Handler for the real-time usage MQTT messages.""" + try: + measurement = self.realtime.msg_to_usage_measurement(msg) + except (ValueError, TypeError) as exception: + _LOGGER.error( + "Error while parsing real-time usage message: %s [Message='%s']", + exception, + msg.payload.decode("utf8"), + ) + return + + if measurement: + # Throttle sending calls to reduce amount of data bneing produced. + should_send = ( + self.realtime_interval is None + or self.realtime_next_send is None + or datetime.utcnow() >= self.realtime_next_send + ) + if should_send: + self.realtime_next_send = datetime.utcnow() + self.realtime_interval + dispatcher_send(self.hass, REALTIME_DISPATCH_SIGNAL, measurement) + else: + _LOGGER.debug( + "Ignoring real-time update as still in throttling interval" + ) + + def async_realtime_subscribe_to_dispatcher( + self, source: str, target: Callable[[RealtimeUsageMeasurement], Any] + ): + """Setup a subscriber to receive new real-time measurements.""" + # If this source is already subscribed, re-use the existing one + if source in self.async_realtime_remove_subscriber_funcs_by_source: + _LOGGER.warning( + "Attempting to subscribe to dispatcher by source that is already subscribed: %s", + source, + ) + return + + # Connect function returns a function that removes the subscription. Save for later. + self.async_realtime_remove_subscriber_funcs_by_source[ + source + ] = async_dispatcher_connect(self.hass, REALTIME_DISPATCH_SIGNAL, target) + _LOGGER.debug("Subscribed target for %s to dispatcher", source) + + def async_realtime_unsubscribe_from_dispatcher(self, source: str): + """Remove a subscriber from the dispatch.""" + if source in self.async_realtime_remove_subscriber_funcs_by_source: + _LOGGER.debug("Removing subscribers to dispatcher for %s", source) + self.async_realtime_remove_subscriber_funcs_by_source[source]() + self.async_realtime_remove_subscriber_funcs_by_source.pop(source) + + def async_realtime_unsubscribe_all_from_dispatcher(self): + """Remove all subscribers from the dispatch.""" + sources = self.async_realtime_remove_subscriber_funcs_by_source.keys() + for source in sources: + self.async_realtime_unsubscribe_from_dispatcher(source) diff --git a/custom_components/duke_energy_gateway/entity.py b/custom_components/duke_energy_gateway/entity.py index a13ce14..5a0caed 100644 --- a/custom_components/duke_energy_gateway/entity.py +++ b/custom_components/duke_energy_gateway/entity.py @@ -7,18 +7,20 @@ from .const import DOMAIN from .const import NAME from .const import VERSION +from .coordinator import DukeEnergyGatewayUsageDataUpdateCoordinator class DukeEnergyGatewayEntity(CoordinatorEntity): def __init__( self, - coordinator, + coordinator: DukeEnergyGatewayUsageDataUpdateCoordinator, config_entry, entity_id: str, meter: MeterInfo, gateway: GatewayStatus, ): super().__init__(coordinator) + self._coordinator = coordinator self._config_entry = config_entry self._entity_id = entity_id self._meter = meter @@ -32,7 +34,7 @@ def unique_id(self): @property def device_info(self): return { - "identifiers": {(DOMAIN, self.unique_id)}, + "identifiers": {(DOMAIN, self._gateway.id)}, "name": f"{NAME} {self._gateway.id}", "model": VERSION, "manufacturer": NAME, diff --git a/custom_components/duke_energy_gateway/manifest.json b/custom_components/duke_energy_gateway/manifest.json index f33aee3..63fb6c9 100644 --- a/custom_components/duke_energy_gateway/manifest.json +++ b/custom_components/duke_energy_gateway/manifest.json @@ -1,12 +1,12 @@ { "domain": "duke_energy_gateway", "name": "Duke Energy Gateway", - "version": "0.0.11", + "version": "0.1.0b9", "documentation": "https://github.com/mjmeli/ha-duke-energy-gateway", "issue_tracker": "https://github.com/mjmeli/ha-duke-energy-gateway/issues", "dependencies": [], "config_flow": true, "codeowners": ["@mjmeli"], - "requirements": ["pyduke-energy==0.0.15"], + "requirements": ["pyduke-energy==1.0.0"], "iot_class": "cloud_polling" } diff --git a/custom_components/duke_energy_gateway/sensor.py b/custom_components/duke_energy_gateway/sensor.py index 0ae80a8..cdd71ac 100644 --- a/custom_components/duke_energy_gateway/sensor.py +++ b/custom_components/duke_energy_gateway/sensor.py @@ -1,31 +1,27 @@ """Sensor platform for Duke Energy Gateway.""" +import logging +from abc import ABC +from abc import abstractmethod +from dataclasses import dataclass + from homeassistant.components.sensor import SensorEntity +from homeassistant.components.sensor import STATE_CLASS_MEASUREMENT from homeassistant.components.sensor import STATE_CLASS_TOTAL_INCREASING +from homeassistant.helpers import device_registry from homeassistant.util import dt from pyduke_energy.types import GatewayStatus from pyduke_energy.types import MeterInfo +from pyduke_energy.types import RealtimeUsageMeasurement from pyduke_energy.types import UsageMeasurement from .const import DOMAIN +from .coordinator import DukeEnergyGatewayUsageDataUpdateCoordinator from .entity import DukeEnergyGatewayEntity +_LOGGER: logging.Logger = logging.getLogger(__package__) -class _Sensor: - def __init__(self, entity_id, name, unit, icon, device_class): - self.entity_id = entity_id - self.name = name - self.unit = unit - self.icon = icon - self.device_class = device_class - - -# Sensor are defined like: Name, Unit, icon, device class -SENSORS = [ - _Sensor("usage_today_kwh", "Usage Today [kWh]", "kWh", "mdi:flash", "energy") -] - -async def async_setup_entry(hass, entry, async_add_devices): +async def async_setup_entry(hass, entry, async_add_entities): """Setup sensor platform.""" data = hass.data[DOMAIN][entry.entry_id] @@ -41,80 +37,138 @@ async def async_setup_entry(hass, entry, async_add_devices): if not gateway: return False - sensors = [] - for sensor in SENSORS: - sensors.append( - DukeEnergyGatewaySensor(coordinator, entry, sensor, meter, gateway) + # Prior to v1.0, the gateway device had the entity ID in its unique identifier. + # This causes issues when we started adding multiple entities (creates multiple devices). + # To fix this, we will check if the old device exists and update its identifier. + registry = device_registry.async_get(hass) + device_to_update = registry.async_get_device( + identifiers={(DOMAIN, "duke_energy_usage_today_kwh")}, connections=set() + ) + if device_to_update: + _LOGGER.info( + "Correcting Duke Energy Gateway Device %s unique identifier after 1.0 update", + gateway.id, ) + registry.async_update_device( + device_to_update.id, new_identifiers={(DOMAIN, gateway.id)} + ) + + sensors = [] + + # Total usage today sensor + sensors.append(_TotalUsageTodaySensor(coordinator, entry, meter, gateway)) - async_add_devices(sensors) + # Real-time usage sensor + sensors.append(_RealtimeUsageSensor(coordinator, entry, meter, gateway)) + async_add_entities(sensors) -class DukeEnergyGatewaySensor(DukeEnergyGatewayEntity, SensorEntity): + +@dataclass +class _SensorMetadata: + entity_id: str + name: str + unit: str + icon: str + device_class: str + state_class: str + should_poll: bool + + +class DukeEnergyGatewaySensor(DukeEnergyGatewayEntity, SensorEntity, ABC): """duke_energy_gateway Sensor class.""" def __init__( self, - coordinator, + coordinator: DukeEnergyGatewayUsageDataUpdateCoordinator, entry, - sensor: _Sensor, meter: MeterInfo, gateway: GatewayStatus, ): """Initialize the sensor.""" + self._state = None + self._sensor_metadata = self.get_sensor_metadata() super().__init__( coordinator, entry, - sensor.entity_id, + self._sensor_metadata.entity_id, meter, gateway, ) - self._sensor = sensor - self._meter = meter + + @staticmethod + @abstractmethod + def get_sensor_metadata() -> _SensorMetadata: + """Get the sensor metadata for this sensor. Override in base class.""" + return None @property def name(self): """Return the name of the sensor.""" - return f"Duke Energy {self._sensor.name}" + return f"Duke Energy {self._sensor_metadata.name}" @property def state(self): - """Return the state of the sensor.""" - # Currently there is only one sensor so this works. If we add more then we will need to handle this better. - gw_usage: list[UsageMeasurement] = self.coordinator.data - if gw_usage and len(gw_usage) > 0: - today_usage = sum(x.usage for x in gw_usage) / 1000 - else: - today_usage = 0 - return today_usage + """By default, current state is stored in the _state instance variable.""" + self.update() + return self._state + + def update(self): + """Override this to update the _state instance variable for a state update.""" + + @property + def should_poll(self) -> bool: + return self._sensor_metadata.should_poll @property def unit_of_measurement(self): """Return the unit of measurement.""" - return self._sensor.unit + return self._sensor_metadata.unit @property def icon(self): """Return the icon of the sensor.""" - return self._sensor.icon + return self._sensor_metadata.icon @property def device_class(self): """Return the device class of the sensor.""" - return self._sensor.device_class + return self._sensor_metadata.device_class @property def state_class(self): """Return the state class of the sensor""" - return STATE_CLASS_TOTAL_INCREASING + return self._sensor_metadata.state_class + + +class _TotalUsageTodaySensor(DukeEnergyGatewaySensor): + @staticmethod + def get_sensor_metadata() -> _SensorMetadata: + return _SensorMetadata( + "usage_today_kwh", + "Usage Today [kWh]", + "kWh", + "mdi:flash", + "energy", + STATE_CLASS_TOTAL_INCREASING, + True, + ) + + def update(self): + """Return today's usage by summing all measurements.""" + gw_usage: list[UsageMeasurement] = self._coordinator.data + if gw_usage and len(gw_usage) > 0: + today_usage = round(sum(x.usage for x in gw_usage) / 1000, 5) + else: + today_usage = 0 + self._state = today_usage @property def extra_state_attributes(self): - """Return the state attributes.""" + """Record the timestamp of the last measurement into state attributes.""" attrs = super().extra_state_attributes - # Currently there is only one sensor so this works. If we add more then we will need to handle this better. - gw_usage: list[UsageMeasurement] = self.coordinator.data + gw_usage: list[UsageMeasurement] = self._coordinator.data if gw_usage and len(gw_usage) > 0: last_measurement = dt.as_local( dt.utc_from_timestamp(gw_usage[-1].timestamp) @@ -122,7 +176,46 @@ def extra_state_attributes(self): else: # If no data then it's probably the start of the day so use that as the dates last_measurement = dt.start_of_local_day() - attrs["last_measurement"] = last_measurement return attrs + + +class _RealtimeUsageSensor(DukeEnergyGatewaySensor): + @staticmethod + def get_sensor_metadata() -> _SensorMetadata: + return _SensorMetadata( + "current_usage_w", + "Current Usage [W]", + "W", + "mdi:flash", + "power", + STATE_CLASS_MEASUREMENT, + False, + ) + + async def async_added_to_hass(self): + """Subscribe to updates.""" + # Setup subscriber callback + async def async_on_new_measurement(measurement: RealtimeUsageMeasurement): + _LOGGER.debug("New measurement received: %f", measurement.usage) + self._state = measurement.usage + await self.async_update_ha_state() + + # Attach subscriber callback + self._coordinator.async_realtime_subscribe_to_dispatcher( + _RealtimeUsageSensor.__name__, async_on_new_measurement + ) + + # Initialize the real-time data stream + self._coordinator.realtime_initialize() + + async def async_will_remove_from_hass(self): + """Undo subscription.""" + # Cancel the real-time data stream task + self._coordinator.realtime_cancel() + + # Remove subscriber callback + self._coordinator.async_realtime_unsubscribe_from_dispatcher( + _RealtimeUsageSensor.__name__ + ) diff --git a/custom_components/duke_energy_gateway/translations/en.json b/custom_components/duke_energy_gateway/translations/en.json index f766637..9c7248a 100644 --- a/custom_components/duke_energy_gateway/translations/en.json +++ b/custom_components/duke_energy_gateway/translations/en.json @@ -23,9 +23,13 @@ "data": { "binary_sensor": "Binary sensor enabled", "sensor": "Sensor enabled", - "switch": "Switch enabled" + "switch": "Switch enabled", + "realtimeInterval": "Real-time Usage Update Interval (sec)" } } + }, + "abort": { + "invalid_update_interval_value": "The Real-time Usage Update Interval must be a positive integer or 0 for no interval." } } } diff --git a/hacs.json b/hacs.json index a857cbe..83766d0 100644 --- a/hacs.json +++ b/hacs.json @@ -3,6 +3,6 @@ "hacs": "1.6.0", "domains": ["sensor"], "iot_class": "Cloud Polling", - "homeassistant": "2021.9.0", + "homeassistant": "2021.11.0", "render_readme": true }