Skip to content

Commit

Permalink
Improve schedule decoder.
Browse files Browse the repository at this point in the history
  • Loading branch information
denpamusic committed Oct 14, 2023
1 parent 6987558 commit 980948a
Showing 1 changed file with 24 additions and 32 deletions.
56 changes: 24 additions & 32 deletions pyplumio/structures/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

if TYPE_CHECKING:
from pyplumio.frames import Request
else:
Request = object


ATTR_SCHEDULES: Final = "schedules"
ATTR_SCHEDULE_PARAMETERS: Final = "schedule_parameters"
Expand Down Expand Up @@ -138,28 +137,11 @@ def _join_bits(bits: Sequence[int | bool]) -> int:
return result


def _decode_schedule(message: bytearray, offset: int) -> tuple[list[list[bool]], int]:
"""Return schedule data and offset."""
schedule: list[list[bool]] = []
day: list[bool] = []

last_offset = offset + SCHEDULE_SIZE
while offset < last_offset:
if len(day) == 48:
schedule.append(day)
day = []

day.extend(_split_byte(message[offset]))
offset += 1

schedule.append(day)

return schedule, offset


class SchedulesStructure(Structure):
"""Represents a schedule data structure."""

_offset: int = 0

def encode(self, data: EventDataType) -> bytearray:
"""Encode data to the bytearray message."""
message = bytearray([1])
Expand All @@ -171,36 +153,46 @@ def encode(self, data: EventDataType) -> bytearray:
except (KeyError, ValueError) as e:
raise FrameDataError from e

message += bytearray(
return message + bytearray(
chain.from_iterable(
[_join_bits(day[i : i + 8]) for i in range(0, len(day), 8)]
for day in list(schedule)
)
)

return message
def _unpack_schedule(self, message: bytearray) -> list[list[bool]]:
"""Unpack a schedule."""
schedule: list[bool] = []
last_offset = self._offset + SCHEDULE_SIZE
while self._offset < last_offset:
schedule += _split_byte(message[self._offset])
self._offset += 1

# Split schedule. Each day consists of 48 half-hour intervals.
return [schedule[i : i + 48] for i in range(0, len(schedule), 48)]

def decode(
self, message: bytearray, offset: int = 0, data: EventDataType | None = None
) -> tuple[EventDataType, int]:
"""Decode bytes and return message data and offset."""
start = message[offset + 1]
end = message[offset + 2]
offset += 3
parameters: list[tuple[int, ParameterDataType]] = []
self._offset = offset + 3
schedules: list[tuple[int, list[list[bool]]]] = []
parameters: list[tuple[int, ParameterDataType]] = []

for _ in range(start, start + end):
index = message[offset]
switch = (message[offset + 1], 0, 1)
parameter = unpack_parameter(message, offset + 2)
schedule, offset = _decode_schedule(message, offset + 5)
schedules.append((index, schedule))
index = message[self._offset]
switch = (message[self._offset + 1], 0, 1)
parameter = unpack_parameter(message, self._offset + 2)
self._offset += 5
schedules.append((index, self._unpack_schedule(message)))
parameters.append((index * 2, switch))
parameters.append(((index * 2) + 1, parameter))
parameters.append((index * 2 + 1, parameter))

return (
ensure_device_data(
data, {ATTR_SCHEDULES: schedules, ATTR_SCHEDULE_PARAMETERS: parameters}
),
offset,
self._offset,
)

0 comments on commit 980948a

Please sign in to comment.