Skip to content

Commit

Permalink
Optionally ignore telegram sending order in tests
Browse files Browse the repository at this point in the history
because we can't know which platform initialises first
  • Loading branch information
farmio committed Sep 19, 2024
1 parent 7c80c0c commit 732c527
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 42 deletions.
16 changes: 8 additions & 8 deletions tests/components/knx/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@ async def test_something(hass, knx):

## Asserting outgoing telegrams

All outgoing telegrams are pushed to an assertion queue. Assert them in order they were sent.
All outgoing telegrams are appended to an assertion list. Assert them in order they were sent or pass `ignore_order=True` to the assertion method.

- `knx.assert_no_telegram`
Asserts that no telegram was sent (assertion queue is empty).
Asserts that no telegram was sent (assertion list is empty).
- `knx.assert_telegram_count(count: int)`
Asserts that `count` telegrams were sent.
- `knx.assert_read(group_address: str, response: int | tuple[int, ...] | None = None)`
- `knx.assert_read(group_address: str, response: int | tuple[int, ...] | None = None, ignore_order: bool = False)`
Asserts that a GroupValueRead telegram was sent to `group_address`.
The telegram will be removed from the assertion queue.
The telegram will be removed from the assertion list.
Optionally inject incoming GroupValueResponse telegram after reception to clear the value reader waiting task. This can also be done manually with `knx.receive_response`.
- `knx.assert_response(group_address: str, payload: int | tuple[int, ...])`
- `knx.assert_response(group_address: str, payload: int | tuple[int, ...], ignore_order: bool = False)`
Asserts that a GroupValueResponse telegram with `payload` was sent to `group_address`.
The telegram will be removed from the assertion queue.
- `knx.assert_write(group_address: str, payload: int | tuple[int, ...])`
The telegram will be removed from the assertion list.
- `knx.assert_write(group_address: str, payload: int | tuple[int, ...], ignore_order: bool = False)`
Asserts that a GroupValueWrite telegram with `payload` was sent to `group_address`.
The telegram will be removed from the assertion queue.
The telegram will be removed from the assertion list.

Change some states or call some services and assert outgoing telegrams.

Expand Down
85 changes: 55 additions & 30 deletions tests/components/knx/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ def __init__(self, hass: HomeAssistant, mock_config_entry: MockConfigEntry) -> N
self.hass: HomeAssistant = hass
self.mock_config_entry: MockConfigEntry = mock_config_entry
self.xknx: XKNX
# outgoing telegrams will be put in the Queue instead of sent to the interface
# outgoing telegrams will be put in the List instead of sent to the interface
# telegrams to an InternalGroupAddress won't be queued here
self._outgoing_telegrams: asyncio.Queue = asyncio.Queue()
self._outgoing_telegrams: list[Telegram] = []

def assert_state(self, entity_id: str, state: str, **attributes) -> None:
"""Assert the state of an entity."""
Expand All @@ -76,7 +76,7 @@ async def setup_integration(
async def patch_xknx_start():
"""Patch `xknx.start` for unittests."""
self.xknx.cemi_handler.send_telegram = AsyncMock(
side_effect=self._outgoing_telegrams.put
side_effect=self._outgoing_telegrams.append
)
# after XKNX.__init__() to not overwrite it by the config entry again
# before StateUpdater starts to avoid slow down of tests
Expand Down Expand Up @@ -117,24 +117,22 @@ def fish_xknx(*args, **kwargs):
########################

def _list_remaining_telegrams(self) -> str:
"""Return a string containing remaining outgoing telegrams in test Queue. One per line."""
remaining_telegrams = []
while not self._outgoing_telegrams.empty():
remaining_telegrams.append(self._outgoing_telegrams.get_nowait())
return "\n".join(map(str, remaining_telegrams))
"""Return a string containing remaining outgoing telegrams in test List."""
return "\n".join(map(str, self._outgoing_telegrams))

async def assert_no_telegram(self) -> None:
"""Assert if every telegram in test Queue was checked."""
"""Assert if every telegram in test List was checked."""
await self.hass.async_block_till_done()
assert self._outgoing_telegrams.empty(), (
f"Found remaining unasserted Telegrams: {self._outgoing_telegrams.qsize()}\n"
remaining_telegram_count = len(self._outgoing_telegrams)
assert not remaining_telegram_count, (
f"Found remaining unasserted Telegrams: {remaining_telegram_count}\n"
f"{self._list_remaining_telegrams()}"
)

async def assert_telegram_count(self, count: int) -> None:
"""Assert outgoing telegram count in test Queue."""
"""Assert outgoing telegram count in test List."""
await self.hass.async_block_till_done()
actual_count = self._outgoing_telegrams.qsize()
actual_count = len(self._outgoing_telegrams)
assert actual_count == count, (
f"Outgoing telegrams: {actual_count} - Expected: {count}\n"
f"{self._list_remaining_telegrams()}"
Expand All @@ -149,52 +147,79 @@ async def assert_telegram(
group_address: str,
payload: int | tuple[int, ...] | None,
apci_type: type[APCI],
ignore_order: bool = False,
) -> None:
"""Assert outgoing telegram. One by one in timely order."""
"""Assert outgoing telegram. Optionally in timely order."""
await self.xknx.telegrams.join()
try:
telegram = self._outgoing_telegrams.get_nowait()
except asyncio.QueueEmpty as err:
if not self._outgoing_telegrams:
raise AssertionError(
f"No Telegram found. Expected: {apci_type.__name__} -"
f" {group_address} - {payload}"
) from err
)
_expected_ga = GroupAddress(group_address)

if ignore_order:
for telegram in self._outgoing_telegrams:
if (
telegram.destination_address == _expected_ga
and isinstance(telegram.payload, apci_type)
and (payload is None or telegram.payload.value.value == payload)
):
self._outgoing_telegrams.remove(telegram)
return
raise AssertionError(
f"Telegram not found. Expected: {apci_type.__name__} -"
f" {group_address} - {payload}"
f"\nUnasserted telegrams:\n{self._list_remaining_telegrams()}"
)

telegram = self._outgoing_telegrams.pop(0)
assert isinstance(
telegram.payload, apci_type
), f"APCI type mismatch in {telegram} - Expected: {apci_type.__name__}"

assert (
str(telegram.destination_address) == group_address
telegram.destination_address == _expected_ga
), f"Group address mismatch in {telegram} - Expected: {group_address}"

if payload is not None:
assert (
telegram.payload.value.value == payload # type: ignore[attr-defined]
), f"Payload mismatch in {telegram} - Expected: {payload}"

async def assert_read(
self, group_address: str, response: int | tuple[int, ...] | None = None
self,
group_address: str,
response: int | tuple[int, ...] | None = None,
ignore_order: bool = False,
) -> None:
"""Assert outgoing GroupValueRead telegram. One by one in timely order.
"""Assert outgoing GroupValueRead telegram. Optionally in timely order.
Optionally inject incoming GroupValueResponse telegram after reception.
"""
await self.assert_telegram(group_address, None, GroupValueRead)
await self.assert_telegram(group_address, None, GroupValueRead, ignore_order)
if response is not None:
await self.receive_response(group_address, response)

async def assert_response(
self, group_address: str, payload: int | tuple[int, ...]
self,
group_address: str,
payload: int | tuple[int, ...],
ignore_order: bool = False,
) -> None:
"""Assert outgoing GroupValueResponse telegram. One by one in timely order."""
await self.assert_telegram(group_address, payload, GroupValueResponse)
"""Assert outgoing GroupValueResponse telegram. Optionally in timely order."""
await self.assert_telegram(
group_address, payload, GroupValueResponse, ignore_order
)

async def assert_write(
self, group_address: str, payload: int | tuple[int, ...]
self,
group_address: str,
payload: int | tuple[int, ...],
ignore_order: bool = False,
) -> None:
"""Assert outgoing GroupValueWrite telegram. One by one in timely order."""
await self.assert_telegram(group_address, payload, GroupValueWrite)
"""Assert outgoing GroupValueWrite telegram. Optionally in timely order."""
await self.assert_telegram(
group_address, payload, GroupValueWrite, ignore_order
)

####################
# Incoming telegrams
Expand Down
4 changes: 2 additions & 2 deletions tests/components/knx/test_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ async def test_remove_device(
await knx.setup_integration({})
client = await hass_ws_client(hass)

await knx.assert_read("1/0/21", response=True) # test light
await knx.assert_read("1/0/45", response=True) # test switch
await knx.assert_read("1/0/21", response=True, ignore_order=True) # test light
await knx.assert_read("1/0/45", response=True, ignore_order=True) # test switch

assert hass_storage[KNX_CONFIG_STORAGE_KEY]["data"]["entities"].get("switch")
test_device = device_registry.async_get_device(
Expand Down
6 changes: 4 additions & 2 deletions tests/components/knx/test_light.py
Original file line number Diff line number Diff line change
Expand Up @@ -1230,8 +1230,10 @@ async def test_light_ui_load(
"""Test loading a light from storage."""
await knx.setup_integration({})

await knx.assert_read("1/0/21", response=True) # test light
await knx.assert_read("1/0/45", response=True) # test switch - unrelated here
await knx.assert_read("1/0/21", response=True, ignore_order=True)
# unrelated switch in config store
await knx.assert_read("1/0/45", response=True, ignore_order=True)

state = hass.states.get("light.test")
assert state.state is STATE_ON

Expand Down

0 comments on commit 732c527

Please sign in to comment.