This repository has been archived by the owner on Oct 11, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsensor.py
157 lines (128 loc) · 5.4 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import logging
import math
from datetime import (timedelta, datetime)
from typing import Any, Callable, Dict, Optional
from aiohttp import ClientError
from homeassistant import config_entries, core
from homeassistant.helpers.entity import Entity
from homeassistant.helpers.typing import (
ConfigType,
DiscoveryInfoType,
HomeAssistantType,
)
from .session import FMIWaterLevelSession
from .const import CONF_HOURS, CONF_FORECAST_HOURS, CONF_LOCATION, CONF_STEP, CONF_OVERLAP, DOMAIN, FMISID_LOCATIONS, FMISID_VALUES
# Module-level logger for this sensor platform.
_LOGGER = logging.getLogger(__name__)
# Ten-minute interval; presumably picked up by Home Assistant as this
# platform's polling period -- TODO confirm against the HA entity platform docs.
SCAN_INTERVAL = timedelta(minutes=10)
# Attribution text assigned to FMIWaterLevelSensor._attr_attribution.
ATTRIBUTION = "Data provided by Finnish Meteorological Institute (FMI)"
# Keys that FMIWaterLevelSensor.async_update writes into the sensor's
# extra state attributes.
ATTR_OBSERVATIONS = "observations"
ATTR_FORECAST = "forecast"
ATTR_LATEST_DATE = "latest_date"
ATTR_LATEST_WATERLEVEL = "latest_waterlevel"
ATTR_LOCATION = "location"
# Keys of each per-reading dict stored in the observation and forecast lists.
ATTR_DATE = "date"
ATTR_WATERLEVEL = "waterlevel"
async def async_setup_platform(
    hass: HomeAssistantType,
    config: ConfigType,
    async_add_entities: Callable,
    discovery_info: Optional[DiscoveryInfoType] = None,
) -> None:
    """Set up one FMI water level sensor from a platform configuration.

    Args:
        hass: Home Assistant instance; used to run the session's API calls
            in the executor.
        config: Platform configuration. Must contain CONF_LOCATION,
            CONF_HOURS, CONF_FORECAST_HOURS, CONF_STEP and CONF_OVERLAP.
        async_add_entities: Callback that registers the created entity.
        discovery_info: Unused.

    If the configured location is not listed in FMISID_LOCATIONS an error is
    logged and no entity is created.
    """
    location = config[CONF_LOCATION]
    if location not in FMISID_LOCATIONS:
        # Without a known location there is no station id to query, so bail
        # out explicitly instead of failing later on an unbound name.
        _LOGGER.error("Unknown FMI water level location: %s", location)
        return

    # FMISID_LOCATIONS and FMISID_VALUES are parallel sequences: the station
    # id sits at the same index as the location name.
    fmisid = FMISID_VALUES[FMISID_LOCATIONS.index(location)]
    session = FMIWaterLevelSession(
        fmisid,
        config[CONF_HOURS],
        config[CONF_FORECAST_HOURS],
        config[CONF_STEP],
        config[CONF_OVERLAP],
    )

    # Exercise both API variants once before the entity is added
    # (True = forecast, False = observations, as used by the sensor's update).
    await hass.async_add_executor_job(session.call_api, True)
    await hass.async_add_executor_job(session.call_api, False)

    # The sensor expects the location *name* (it resolves the station id
    # itself), so pass the name rather than the already-resolved fmisid.
    async_add_entities(
        [
            FMIWaterLevelSensor(
                session,
                location,
                config[CONF_HOURS],
                config[CONF_FORECAST_HOURS],
            )
        ],
        update_before_add=True,
    )
async def async_setup_entry(
    hass: core.HomeAssistant,
    config_entry: config_entries.ConfigEntry,
    async_add_entities,
) -> None:
    """Set up one FMI water level sensor from a config entry.

    Args:
        hass: Home Assistant instance; the entry's stored configuration is
            read from ``hass.data[DOMAIN][config_entry.entry_id]``.
        config_entry: The config entry being set up. Its options, when
            present, override the stored configuration values.
        async_add_entities: Callback that registers the created entity.

    If the configured location is not listed in FMISID_LOCATIONS an error is
    logged and no entity is created.
    """
    config = hass.data[DOMAIN][config_entry.entry_id]
    if config_entry.options:
        # Note: this updates the dict stored in hass.data in place.
        config.update(config_entry.options)

    location = config[CONF_LOCATION]
    if location not in FMISID_LOCATIONS:
        # Without a known location there is no station id to query, so bail
        # out explicitly instead of failing later on an unbound name.
        _LOGGER.error("Unknown FMI water level location: %s", location)
        return

    # FMISID_LOCATIONS and FMISID_VALUES are parallel sequences: the station
    # id sits at the same index as the location name.
    fmisid = FMISID_VALUES[FMISID_LOCATIONS.index(location)]
    session = FMIWaterLevelSession(
        fmisid,
        config[CONF_HOURS],
        config[CONF_FORECAST_HOURS],
        config[CONF_STEP],
        config[CONF_OVERLAP],
    )

    # Exercise both API variants once before the entity is added
    # (True = forecast, False = observations, as used by the sensor's update).
    await hass.async_add_executor_job(session.call_api, True)
    await hass.async_add_executor_job(session.call_api, False)

    async_add_entities(
        [
            FMIWaterLevelSensor(
                session,
                location,
                config[CONF_HOURS],
                config[CONF_FORECAST_HOURS],
            )
        ],
        update_before_add=True,
    )
class FMIWaterLevelSensor(Entity):
    """Entity reporting the latest observed water level for one location.

    The state is the most recent non-NaN observation, formatted with one
    decimal. The full observation and forecast series, the latest reading
    and the location name are exposed as extra state attributes.
    """

    _attr_attribution = ATTRIBUTION
    _attr_icon = "mdi:waves-arrow-up"
    _attr_native_unit_of_measurement = "cm"

    def __init__(
        self,
        session: FMIWaterLevelSession,
        location: str,
        hours: int,
        forecast_hours: int
    ):
        """Store the session and configuration.

        Args:
            session: Session object whose ``call_api`` is used to fetch data.
            location: Location name; must be present in FMISID_LOCATIONS
                (otherwise the index lookup raises ValueError).
            hours: Observation window; observations are fetched only when > 0.
            forecast_hours: Forecast window; forecasts are fetched only
                when > 0.
        """
        super().__init__()
        self._session = session
        # Station id is found at the same index as the location name.
        station_index = FMISID_LOCATIONS.index(location)
        self._fmisid = FMISID_VALUES[station_index]
        self._location = location
        self._hours = hours
        self._forecast_hours = forecast_hours
        self._state = None
        self._available = True
        self._attrs = {}

    @property
    def name(self) -> str:
        """Entity name built from station id and both hour settings."""
        return f"fmi_waterlevel_{self._fmisid}_{self._hours}_{self._forecast_hours}"

    @property
    def unique_id(self) -> str:
        """Unique id; same format as the entity name."""
        return f"fmi_waterlevel_{self._fmisid}_{self._hours}_{self._forecast_hours}"

    @property
    def available(self) -> bool:
        """False after an update failed with ClientError, True otherwise."""
        return self._available

    @property
    def extra_state_attributes(self) -> Dict[str, Any]:
        """Attribute dict filled in by async_update."""
        return self._attrs

    @property
    def state(self) -> Optional[str]:
        """Latest observed water level as a string, or None if there is none."""
        return self._state

    @property
    def unit_of_measurement(self) -> str:
        """Unit of the reported water level."""
        return "cm"

    @staticmethod
    def _parse_rows(rows, divisor):
        """Yield (timestamp, formatted level) for every non-NaN row.

        Each row is indexed as ``row[0]`` (ISO timestamp) and ``row[1]``
        (numeric level). The level is divided by ``divisor`` and formatted
        with one decimal.
        """
        for row in rows:
            level = float(row[1])
            if math.isnan(level):
                continue
            yield datetime.fromisoformat(row[0]), f"{level / divisor:.1f}"

    async def async_update(self):
        """Fetch forecast and observation data and refresh state/attributes.

        A ClientError raised anywhere in the update marks the entity
        unavailable; state and attributes are then left untouched.
        """
        try:
            # call_api(True) yields forecast rows, call_api(False) observations;
            # each is only requested when its hour setting is positive.
            if self._forecast_hours > 0:
                forecast_rows = await self.hass.async_add_executor_job(
                    self._session.call_api, True
                )
            else:
                forecast_rows = []

            if self._hours > 0:
                observation_rows = await self.hass.async_add_executor_job(
                    self._session.call_api, False
                )
            else:
                observation_rows = []

            observations = []
            newest_stamp = None
            newest_level = None
            # Observations are divided by 10 before formatting
            # (presumably mm -> cm; confirm against the API's units).
            for stamp, level in self._parse_rows(observation_rows, 10):
                if newest_stamp is None or stamp > newest_stamp:
                    newest_stamp, newest_level = stamp, level
                observations.append({ATTR_DATE: stamp, ATTR_WATERLEVEL: level})

            # Forecast values are formatted unscaled (divisor 1).
            forecast = [
                {ATTR_DATE: stamp, ATTR_WATERLEVEL: level}
                for stamp, level in self._parse_rows(forecast_rows, 1)
            ]

            self._attrs.update(
                {
                    ATTR_FORECAST: forecast,
                    ATTR_OBSERVATIONS: observations,
                    ATTR_LATEST_DATE: newest_stamp,
                    ATTR_LATEST_WATERLEVEL: newest_level,
                    ATTR_LOCATION: self._location,
                }
            )
            self._available = True
            self._state = newest_level
        except ClientError:
            self._available = False