Skip to content

Commit

Permalink
Gracefully handle a few more XNCP parsing failures
Browse files Browse the repository at this point in the history
  • Loading branch information
puddly committed Nov 27, 2024
1 parent 06eb3d9 commit 4217cee
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 1 deletion.
9 changes: 8 additions & 1 deletion bellows/ezsp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,14 @@ async def send_xncp_frame(
if status != t.EmberStatus.SUCCESS:
raise InvalidCommandError("XNCP is not supported")

rsp_frame = xncp.XncpCommand.from_bytes(data)
try:
rsp_frame = xncp.XncpCommand.from_bytes(data)
except ValueError:
raise InvalidCommandError(f"Invalid XNCP response: {data!r}")

if isinstance(rsp_frame.payload, xncp.Unknown):
raise InvalidCommandError(f"XNCP firmware does not support {payload}")

LOGGER.debug("Received XNCP frame: %s", rsp_frame)

if rsp_frame.status != t.EmberStatus.SUCCESS:
Expand Down
5 changes: 5 additions & 0 deletions bellows/ezsp/xncp.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,8 @@ class GetFlowControlTypeReq(XncpCommandPayload):
@register_command(XncpCommandId.GET_FLOW_CONTROL_TYPE_RSP)
class GetFlowControlTypeRsp(XncpCommandPayload):
flow_control_type: FlowControlType


@register_command(XncpCommandId.UNKNOWN)
class Unknown(XncpCommandPayload):
pass
31 changes: 31 additions & 0 deletions tests/test_xncp.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,37 @@ async def test_xncp_failure(ezsp_f: EZSP) -> None:
]


async def test_xncp_failure_multiprotocol(ezsp_f: EZSP) -> None:
"""Test XNCP failure with multiprotocol firmware."""
ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock(
return_value=[t.EmberStatus.SUCCESS, b""]
)

with pytest.raises(InvalidCommandError):
await ezsp_f.xncp_get_supported_firmware_features()

assert customFrame.mock_calls == [
call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize())
]


async def test_xncp_failure_unknown(ezsp_f: EZSP) -> None:
"""Test XNCP failure, unknown command."""
ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock(
return_value=[
t.EmberStatus.SUCCESS,
xncp.XncpCommand.from_payload(xncp.Unknown()).serialize(),
]
)

with pytest.raises(InvalidCommandError):
await ezsp_f.xncp_get_supported_firmware_features()

assert customFrame.mock_calls == [
call(xncp.XncpCommand.from_payload(xncp.GetSupportedFeaturesReq()).serialize())
]


async def test_xncp_get_supported_firmware_features(ezsp_f: EZSP) -> None:
"""Test XNCP get_supported_firmware_features."""
ezsp_f._mock_commands["customFrame"] = customFrame = AsyncMock(
Expand Down

0 comments on commit 4217cee

Please sign in to comment.