Skip to content

Commit

Permalink
More data types improvements.
Browse files Browse the repository at this point in the history
- Byte datatype is now handled by struct module as an UnsignedChar.
- Rename PascalString to VarString to avoid confusion, as in EM's these are true variable length.
- Rename ByteString to VarBytes to follow new naming convention.
- Rename Boolean to BitArray to better reflect it's use case.
  • Loading branch information
denpamusic committed Nov 10, 2023
1 parent 30561b9 commit 42d0729
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 114 deletions.
94 changes: 39 additions & 55 deletions pyplumio/helpers/data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def __eq__(self, other) -> bool:

def _slice_data(self, data: bytes) -> bytes:
"""Slice the data to data type size."""
return data[0 : self.size] if self.size is not None else data
return data[: self.size] if self.size is not None else data

@classmethod
def from_bytes(cls, data: bytes, offset: int = 0):
Expand Down Expand Up @@ -74,27 +74,13 @@ def unpack(self, _: bytes) -> None:
self._value = None


class Byte(DataType):
"""Represents a byte."""

_size: int = 1

def pack(self) -> bytes:
"""Pack the data."""
return self.value.to_bytes(length=self.size, byteorder="big")

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = ord(self._slice_data(data))


class Boolean(DataType):
"""Represents a boolean."""
class BitArray(DataType):
"""Represents a bit array."""

_index: int = 0

def __init__(self, value: Any = None):
"""Initialize a new boolean."""
"""Initialize a new bit array."""
self._index = 0
super().__init__(value)

Expand All @@ -107,11 +93,11 @@ def next(self, index: int = 0) -> int:

def pack(self) -> bytes:
"""Pack the data."""
return self._value.to_bytes(length=1, byteorder="big")
return UnsignedChar(self._value).to_bytes()

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._value = ord(data[0:1])
self._value = UnsignedChar.from_bytes(data[:1]).value

@property
def value(self) -> bool | None:
Expand Down Expand Up @@ -153,55 +139,47 @@ def unpack(self, data: bytes) -> None:


class String(DataType):
"""Represents a string."""
"""Represents a null terminated string."""

def __init__(self, value: Any = ""):
"""Initialize a new null terminated string data type."""
super().__init__(value)
self._size = len(self.value) + 1

def pack(self) -> bytes:
"""Pack the data."""
return self.value.encode() + b"\x00"
return self.value.encode() + b"\0"

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
value = ""
if (offset := 0) in data:
while not data[offset] == 0:
value += chr(data[offset])
offset += 1
self._value = data.split(b"\0", 1)[0].decode()
self._size = len(self.value) + 1

self._value = value

@property
def size(self) -> int:
"""A data size."""
return len(self.value) + 1 if self.value is not None else 0
class VarBytes(DataType):
"""Represents a variable length bytes."""


class ByteString(DataType):
"""Represents a byte string."""
def __init__(self, value: Any = b""):
"""Initialize a new variable length bytes data type."""
super().__init__(value)
self._size = len(value) + 1

def pack(self) -> bytes:
"""Pack the data."""
return bytearray([self.size - 1]) + self.value
return UnsignedChar(self.size - 1).to_bytes() + self.value

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
self._size = data[0] + 1
self._value = data[1 : self.size]

@property
def size(self) -> int:
"""A data size."""
if self._size > 0:
return self._size

return len(self.value) + 1


class PascalString(ByteString):
"""Represents a Pascal string."""
class VarString(VarBytes):
"""Represents a variable length string."""

def pack(self) -> bytes:
"""Pack the data."""
return bytearray([self.size - 1]) + self.value.encode()
return UnsignedChar(self.size - 1).to_bytes() + self.value.encode()

def unpack(self, data: bytes) -> None:
"""Unpack the data."""
Expand Down Expand Up @@ -236,16 +214,16 @@ class SignedChar(BuiltInDataType):
_struct: ClassVar[struct.Struct] = struct.Struct("<b")


class Short(BuiltInDataType):
"""Represents a 16 bit integer."""
class UnsignedChar(BuiltInDataType):
"""Represents an unsigned char."""

_struct: ClassVar[struct.Struct] = struct.Struct("<h")
_struct: ClassVar[struct.Struct] = struct.Struct("<B")


class Int(BuiltInDataType):
"""Represents a 32 bit integer."""
class Short(BuiltInDataType):
"""Represents a 16 bit integer."""

_struct: ClassVar[struct.Struct] = struct.Struct("<i")
_struct: ClassVar[struct.Struct] = struct.Struct("<h")


class UnsignedShort(BuiltInDataType):
Expand All @@ -254,6 +232,12 @@ class UnsignedShort(BuiltInDataType):
_struct: ClassVar[struct.Struct] = struct.Struct("<H")


class Int(BuiltInDataType):
"""Represents a 32 bit integer."""

_struct: ClassVar[struct.Struct] = struct.Struct("<i")


class UnsignedInt(BuiltInDataType):
"""Represents a unsigned 32 bit integer."""

Expand Down Expand Up @@ -291,13 +275,13 @@ class UInt64(BuiltInDataType):
SignedChar,
Short,
Int,
Byte,
UnsignedChar,
UnsignedShort,
UnsignedInt,
Float,
Undefined,
Double,
Boolean,
BitArray,
String,
String,
Int64,
Expand Down
6 changes: 3 additions & 3 deletions pyplumio/structures/network_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Final

from pyplumio.const import EncryptionType
from pyplumio.helpers.data_types import IPv4, PascalString
from pyplumio.helpers.data_types import IPv4, VarString
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import Structure
from pyplumio.utils import ensure_dict
Expand Down Expand Up @@ -66,7 +66,7 @@ def encode(self, data: EventDataType) -> bytearray:
message.append(network_info.wlan.signal_quality)
message.append(network_info.wlan.status)
message += b"\x00" * 4
message += PascalString(network_info.wlan.ssid).to_bytes()
message += VarString(network_info.wlan.ssid).to_bytes()

return message

Expand All @@ -92,7 +92,7 @@ def decode(
encryption=EncryptionType(int(message[offset + 26])),
signal_quality=int(message[offset + 27]),
status=bool(message[offset + 28]),
ssid=PascalString.from_bytes(message, offset + 33).value,
ssid=VarString.from_bytes(message, offset + 33).value,
),
server_status=bool(message[offset + 25]),
)
Expand Down
6 changes: 3 additions & 3 deletions pyplumio/structures/product_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Final

from pyplumio.const import ProductType
from pyplumio.helpers.data_types import ByteString, PascalString, UnsignedShort
from pyplumio.helpers.data_types import UnsignedShort, VarBytes, VarString
from pyplumio.helpers.typing import EventDataType
from pyplumio.helpers.uid import decode_uid
from pyplumio.structures import StructureDecoder
Expand Down Expand Up @@ -48,7 +48,7 @@ def decode(
product_type, product_id = struct.unpack_from("<BH", message)
offset += 3

uid = ByteString.from_bytes(message, offset)
uid = VarBytes.from_bytes(message, offset)
offset += uid.size

logo = UnsignedShort.from_bytes(message, offset)
Expand All @@ -57,7 +57,7 @@ def decode(
image = UnsignedShort.from_bytes(message, offset)
offset += image.size

model_name = PascalString.from_bytes(message, offset)
model_name = VarString.from_bytes(message, offset)
offset += model_name.size

return (
Expand Down
18 changes: 9 additions & 9 deletions pyplumio/structures/regulator_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from typing import Final

from pyplumio.helpers.data_types import Boolean, DataType
from pyplumio.helpers.data_types import BitArray, DataType
from pyplumio.helpers.event_manager import EventManager
from pyplumio.helpers.typing import EventDataType
from pyplumio.structures import StructureDecoder
Expand All @@ -25,20 +25,20 @@ class RegulatorDataStructure(StructureDecoder):
"""Represents a regulator data structure."""

_offset: int = 0
_boolean_index: int = 0
_bitarray_index: int = 0

def _unpack_regulator_data(self, message: bytearray, data_type: DataType):
"""Unpack a regulator data sensor."""
if not isinstance(data_type, Boolean) and self._boolean_index > 0:
# Current data type is not boolean, but previous was, thus
# we skip single byte that was left from the boolean
# and reset boolean index.
if not isinstance(data_type, BitArray) and self._bitarray_index > 0:
# Current data type is not bitarray, but previous was, thus
# we skip a single byte that was left from the bitarray
# and reset bitarray index.
self._offset += 1
self._boolean_index = 0
self._bitarray_index = 0

data_type.unpack(message[self._offset :])
if isinstance(data_type, Boolean):
self._boolean_index = data_type.next(self._boolean_index)
if isinstance(data_type, BitArray):
self._bitarray_index = data_type.next(self._bitarray_index)

try:
return data_type.value
Expand Down
32 changes: 16 additions & 16 deletions tests/helpers/test_data_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ def test_int() -> None:
assert data_type.to_bytes() == buffer


def test_byte() -> None:
"""Test a byte data type."""
def test_unsigned_char() -> None:
"""Test an signed char data type."""
buffer = bytearray([0x3])
data_type = data_types.Byte.from_bytes(buffer)
data_type = data_types.UnsignedChar.from_bytes(buffer)
assert data_type.value == 3
assert data_type.size == 1
assert data_type.to_bytes() == buffer
Expand Down Expand Up @@ -115,10 +115,10 @@ def test_double() -> None:
assert data_type.to_bytes() == buffer


def test_boolean() -> None:
"""Test a boolean data type."""
def test_bitarray() -> None:
"""Test a bit array data type."""
buffer = bytearray([0x55])
data_type = data_types.Boolean.from_bytes(buffer)
data_type = data_types.BitArray.from_bytes(buffer)
for index, value in enumerate([1, 0, 1, 0, 1, 0, 1, 0]):
next_bit = data_type.next(index)
assert data_type.value == bool(value)
Expand Down Expand Up @@ -197,19 +197,19 @@ def test_string() -> None:
assert data_type.to_bytes() == buffer


def test_pascal_string() -> None:
"""Test a pascal string data type."""
buffer = b"\x04test"
data_type = data_types.PascalString.from_bytes(buffer)
assert data_type.value == "test"
def test_var_bytes() -> None:
"""Test a variable bytes data type."""
buffer = b"\x04\xDE\xAD\xBE\xEF"
data_type = data_types.VarBytes.from_bytes(buffer)
assert data_type.value == bytearray([0xDE, 0xAD, 0xBE, 0xEF])
assert data_type.size == 5
assert data_type.to_bytes() == buffer


def test_byte_string() -> None:
"""Test a byte string data type."""
buffer = b"\x04\xDE\xAD\xBE\xEF"
data_type = data_types.ByteString.from_bytes(buffer)
assert data_type.value == bytearray([0xDE, 0xAD, 0xBE, 0xEF])
def test_var_string() -> None:
"""Test a variable string data type."""
buffer = b"\x04test"
data_type = data_types.VarString.from_bytes(buffer)
assert data_type.value == "test"
assert data_type.size == 5
assert data_type.to_bytes() == buffer
Loading

0 comments on commit 42d0729

Please sign in to comment.