Skip to content

Commit

Permalink
Reorder functions.
Browse files Browse the repository at this point in the history
  • Loading branch information
denpamusic committed Oct 14, 2023
1 parent fbebbce commit f5957a3
Showing 1 changed file with 142 additions and 136 deletions.
278 changes: 142 additions & 136 deletions pyplumio/devices/ecomax.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,19 +152,6 @@ def _frame_is_supported(self, frame_type: FrameType | int) -> bool:
"""Check if frame is supported by the device."""
return frame_type not in self.data.get(ATTR_FRAME_ERRORS, [])

async def _update_frame_versions(self, versions: dict[int, int]) -> None:
"""Check frame versions and update outdated frames."""
for frame_type, version in versions.items():
if (
is_known_frame_type(frame_type)
and self._frame_is_supported(frame_type)
and not self._has_frame_version(frame_type, version)
):
# We don't have this frame or it's version has changed.
request = factory(get_frame_handler(frame_type), recipient=self.address)
self.queue.put_nowait(request)
self._frame_versions[frame_type] = version

def _mixers(self, indexes: Iterable[int]) -> Generator[Mixer, None, None]:
"""Iterate through the mixer indexes.
Expand Down Expand Up @@ -196,17 +183,6 @@ def _thermostats(self, indexes: Iterable[int]) -> Generator[Thermostat, None, No

return self.dispatch_nowait(ATTR_THERMOSTATS, thermostats)

async def _handle_ecomax_sensors(self, sensors: EventDataType) -> bool:
"""Handle ecoMAX sensors.
For each sensor dispatch an event with the sensor's name and
value.
"""
for name, value in sensors.items():
await self.dispatch(name, value)

return True

async def _handle_ecomax_parameters(
self, parameters: Sequence[tuple[int, ParameterDataType]]
) -> bool:
Expand All @@ -222,32 +198,49 @@ async def _handle_ecomax_parameters(
if product.type == ProductType.ECOMAX_P
else ECOMAX_I_PARAMETERS[index]
)
parameter = description.cls(
device=self,
description=description,
index=index,
value=value[0],
min_value=value[1],
max_value=value[2],
await self.dispatch(
description.name,
description.cls(
device=self,
description=description,
index=index,
value=value[0],
min_value=value[1],
max_value=value[2],
),
)
await self.dispatch(description.name, parameter)

return True

async def _handle_mixer_sensors(self, sensors: dict[int, EventDataType]) -> bool:
"""Handle mixer sensors.
async def _update_frame_versions(self, versions: dict[int, int]) -> None:
"""Check frame versions and update outdated frames."""
for frame_type, version in versions.items():
if (
is_known_frame_type(frame_type)
and self._frame_is_supported(frame_type)
and not self._has_frame_version(frame_type, version)
):
# We don't have this frame or it's version has changed.
request = factory(get_frame_handler(frame_type), recipient=self.address)
self.queue.put_nowait(request)
self._frame_versions[frame_type] = version

For each sensor dispatch an event with the
sensor's name and value. Events are dispatched for the
respective mixer instance.
async def _add_burned_fuel_counter(self, fuel_consumption: float) -> None:
"""Calculate fuel burned since last sensor's data message
and dispatch 'fuel_burned' event.
"""
if not sensors:
return False

for mixer in self._mixers(sensors.keys()):
await mixer.dispatch(ATTR_MIXER_SENSORS, sensors[mixer.index])
current_timestamp_ns = time.perf_counter_ns()
time_passed_ns = current_timestamp_ns - self._fuel_burned_timestamp_ns
if time_passed_ns >= MAX_TIME_SINCE_LAST_FUEL_UPDATE_NS:
_LOGGER.warning(
"Skipping outdated fuel consumption data, was %i seconds old",
time_passed_ns / 1000000000,
)
return

return True
fuel_burned = fuel_consumption / (3600 * 1000000000) * time_passed_ns
await self.dispatch(ATTR_FUEL_BURNED, fuel_burned)
self._fuel_burned_timestamp_ns = current_timestamp_ns

async def _handle_mixer_parameters(
self,
Expand All @@ -267,25 +260,100 @@ async def _handle_mixer_parameters(

return True

async def _handle_thermostat_sensors(
self, sensors: dict[int, EventDataType]
) -> bool:
"""Handle thermostat sensors.
async def _handle_mixer_sensors(self, sensors: dict[int, EventDataType]) -> bool:
"""Handle mixer sensors.
For each sensor dispatch an event with the
sensor's name and value. Events are dispatched for the
respective thermostat instance.
respective mixer instance.
"""
if not sensors:
return False

for thermostat in self._thermostats(sensors.keys()):
await thermostat.dispatch(
ATTR_THERMOSTAT_SENSORS, sensors[thermostat.index]
for mixer in self._mixers(sensors.keys()):
await mixer.dispatch(ATTR_MIXER_SENSORS, sensors[mixer.index])

return True

async def _decode_regulator_data(self, decoder: StructureDecoder) -> bool:
"""Decode an ecoMAX regulator data.
Dispatch 'frame_versions' and 'regdata' events.
"""
data = decoder.decode(decoder.frame.message, data=self.data)[0]
for field in (ATTR_FRAME_VERSIONS, ATTR_REGDATA):
try:
await self.dispatch(field, data[field])
except KeyError:
continue

return True

async def _add_schedules(
self, schedules: list[tuple[int, list[list[bool]]]]
) -> EventDataType:
"""Add schedules to the dataset."""
return {
SCHEDULES[index]: Schedule(
name=SCHEDULES[index],
device=self,
monday=ScheduleDay(schedule[1]),
tuesday=ScheduleDay(schedule[2]),
wednesday=ScheduleDay(schedule[3]),
thursday=ScheduleDay(schedule[4]),
friday=ScheduleDay(schedule[5]),
saturday=ScheduleDay(schedule[6]),
sunday=ScheduleDay(schedule[0]),
)
for index, schedule in schedules
}

async def _add_schedule_parameters(
self, parameters: Sequence[tuple[int, ParameterDataType]]
) -> bool:
"""Add schedule parameters to the dataset."""
for index, value in parameters:
description = SCHEDULE_PARAMETERS[index]
await self.dispatch(
description.name,
description.cls(
device=self,
description=description,
index=index,
value=value[0],
min_value=value[1],
max_value=value[2],
),
)

return True

async def _handle_ecomax_sensors(self, sensors: EventDataType) -> bool:
"""Handle ecoMAX sensors.
For each sensor dispatch an event with the sensor's name and
value.
"""
for name, value in sensors.items():
await self.dispatch(name, value)

return True

async def _add_ecomax_control_parameter(self, mode: int) -> None:
"""Create ecoMAX control parameter instance and dispatch an
'ecomax_control' event.
"""
await self.dispatch(
ECOMAX_CONTROL_PARAMETER.name,
ECOMAX_CONTROL_PARAMETER.cls(
device=self,
description=ECOMAX_CONTROL_PARAMETER,
value=(mode != DeviceState.OFF),
min_value=STATE_OFF,
max_value=STATE_ON,
),
)

async def _handle_thermostat_parameters(
self,
parameters: dict[int, Sequence[tuple[int, ParameterDataType]]] | None,
Expand All @@ -306,6 +374,17 @@ async def _handle_thermostat_parameters(

return True

async def _decode_thermostat_parameters(self, decoder: StructureDecoder) -> bool:
"""Decode thermostat parameters.
Dispatch 'thermostat_profile' and 'thermostat_parameters' event.
"""
data = decoder.decode(decoder.frame.message, data=self.data)[0]
for field in (ATTR_THERMOSTAT_PROFILE, ATTR_THERMOSTAT_PARAMETERS):
await self.dispatch(field, data[field])

return True

async def _add_thermostat_profile_parameter(
self, parameter: ParameterDataType
) -> EcomaxParameter | None:
Expand All @@ -321,98 +400,25 @@ async def _add_thermostat_profile_parameter(

return None

async def _decode_thermostat_parameters(self, decoder: StructureDecoder) -> bool:
"""Decode thermostat parameters.
Dispatch 'thermostat_profile' and 'thermostat_parameters' event.
"""
data = decoder.decode(decoder.frame.message, data=self.data)[0]
for field in (ATTR_THERMOSTAT_PROFILE, ATTR_THERMOSTAT_PARAMETERS):
await self.dispatch(field, data[field])

return True

async def _add_ecomax_control_parameter(self, mode: int) -> None:
"""Create ecoMAX control parameter instance and dispatch an
'ecomax_control' event.
"""
parameter = ECOMAX_CONTROL_PARAMETER.cls(
device=self,
description=ECOMAX_CONTROL_PARAMETER,
value=(mode != DeviceState.OFF),
min_value=STATE_OFF,
max_value=STATE_ON,
)
await self.dispatch(ECOMAX_CONTROL_PARAMETER.name, parameter)

async def _add_burned_fuel_counter(self, fuel_consumption: float) -> None:
"""Calculate fuel burned since last sensor's data message
and dispatch 'fuel_burned' event.
"""
current_timestamp_ns = time.perf_counter_ns()
time_passed_ns = current_timestamp_ns - self._fuel_burned_timestamp_ns
if time_passed_ns < MAX_TIME_SINCE_LAST_FUEL_UPDATE_NS:
fuel_burned = fuel_consumption / (3600 * 1000000000) * time_passed_ns
await self.dispatch(ATTR_FUEL_BURNED, fuel_burned)
else:
_LOGGER.warning(
"Skipping outdated fuel consumption data, was %i seconds old",
time_passed_ns / 1000000000,
)

self._fuel_burned_timestamp_ns = current_timestamp_ns

async def _decode_regulator_data(self, decoder: StructureDecoder) -> bool:
"""Decode an ecoMAX regulator data.
async def _handle_thermostat_sensors(
self, sensors: dict[int, EventDataType]
) -> bool:
"""Handle thermostat sensors.
Dispatch 'frame_versions' and 'regdata' events.
For each sensor dispatch an event with the
sensor's name and value. Events are dispatched for the
respective thermostat instance.
"""
data = decoder.decode(decoder.frame.message, data=self.data)[0]
for field in (ATTR_FRAME_VERSIONS, ATTR_REGDATA):
try:
await self.dispatch(field, data[field])
except KeyError:
continue

return True
if not sensors:
return False

async def _add_schedule_parameters(
self, parameters: Sequence[tuple[int, ParameterDataType]]
) -> bool:
"""Add schedule parameters to the dataset."""
for index, value in parameters:
description = SCHEDULE_PARAMETERS[index]
parameter = description.cls(
device=self,
description=description,
index=index,
value=value[0],
min_value=value[1],
max_value=value[2],
for thermostat in self._thermostats(sensors.keys()):
await thermostat.dispatch(
ATTR_THERMOSTAT_SENSORS, sensors[thermostat.index]
)
await self.dispatch(description.name, parameter)

return True

async def _add_schedules(
self, schedules: list[tuple[int, list[list[bool]]]]
) -> EventDataType:
"""Add schedules to the dataset."""
return {
SCHEDULES[index]: Schedule(
name=SCHEDULES[index],
device=self,
monday=ScheduleDay(schedule[1]),
tuesday=ScheduleDay(schedule[2]),
wednesday=ScheduleDay(schedule[3]),
thursday=ScheduleDay(schedule[4]),
friday=ScheduleDay(schedule[5]),
saturday=ScheduleDay(schedule[6]),
sunday=ScheduleDay(schedule[0]),
)
for index, schedule in schedules
}

async def turn_on(self) -> bool:
"""Turn on the ecoMAX controller."""
try:
Expand Down

0 comments on commit f5957a3

Please sign in to comment.