Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add sensor_reading and sensor_value helpers #86

Merged
merged 5 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 89 additions & 4 deletions src/aiokatcp/client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017, 2019, 2022 National Research Foundation (SARAO)
# Copyright 2017, 2019, 2022, 2024 National Research Foundation (SARAO)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
Expand Down Expand Up @@ -49,8 +49,10 @@
Set,
Tuple,
Type,
TypeVar,
Union,
cast,
overload,
)

from typing_extensions import Protocol
Expand All @@ -59,6 +61,7 @@
from .connection import FailReply, InvalidReply

logger = logging.getLogger(__name__)
_T = TypeVar("_T")


class _Handler(Protocol):
Expand All @@ -67,12 +70,12 @@ class _Handler(Protocol):

class _InformHandler(_Handler):
def __call__(self, _client: "Client", _msg: core.Message) -> None:
...
... # pragma: nocover


class _InformCallback(_Handler):
def __call__(self, _msg: core.Message) -> None:
...
... # pragma: nocover


class _PendingRequest:
Expand Down Expand Up @@ -221,7 +224,7 @@ async def handle_message(self, conn: connection.Connection, msg: core.Message) -
elif msg.mtype == core.Message.Type.INFORM:
req.informs.append(msg)
else:
self.logger.warning("Unknown message type %s", msg.mtype) # pragma: no cover
self.logger.warning("Unknown message type %s", msg.mtype) # pragma: nocover
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean this one looks as though we can probably explicitly test for it but it would be a bit contrived.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure it's possible, but also outside the scope of this PR.

elif msg.mtype == core.Message.Type.INFORM:
self.handle_inform(msg)
else:
Expand Down Expand Up @@ -615,6 +618,88 @@ async def request(self, name: str, *args: Any) -> Tuple[List[bytes], List[core.M
else:
raise InvalidReply(error.decode("utf-8", errors="replace"))

@overload
async def sensor_reading(self, sensor_name: str, sensor_type: None = None) -> sensor.Reading:
... # pragma: nocover

@overload
async def sensor_reading(self, sensor_name: str, sensor_type: Type[_T]) -> sensor.Reading[_T]:
... # pragma: nocover

async def sensor_reading(
self, sensor_name: str, sensor_type: Optional[type] = None
) -> sensor.Reading:
"""Request the reading of a single sensor from the server.

This is a wrapper around a ``?sensor-value`` request that decodes the
result. If you know the type of the sensor, it can be passed as a
parameter; if it is not specified, ``?sensor-list`` is used to
determine it. Note that this introduces a race condition (but an
unlikely one) where the sensor could be replaced by one of a different
type between the two requests.

If `sensor_type` is not given and the sensor has a discrete type, the
returned reading will contain a byte string rather than an enum.
Similarly, string sensors are returned as byte strings, but
`sensor_type` can be passed as `str` to override this.

This is not a high-performance interface. If you need to sample a
large number of sensors, better performance can be obtained with
hand-coded implementations, such as by pipelining multiple requests.

Raises
------
FailReply
If any of the requests fails e.g., because the sensor does not exist.
InvalidReply
If any of the requests is invalid. This generally indicates a bug, either
in this function or in the server.
"""
if sensor_type is None:
list_resp, value_resp = await asyncio.gather(
asyncio.create_task(self.request("sensor-list", sensor_name)),
asyncio.create_task(self.request("sensor-value", sensor_name)),
)
type_name = core.decode(str, list_resp[1][0].arguments[3])
if type_name == "discrete":
sensor_type = bytes
else:
sensor_type = SensorWatcher.SENSOR_TYPES[type_name]
else:
value_resp = await self.request("sensor-value", sensor_name)
value_informs = value_resp[1]
if len(value_informs) != 1:
raise FailReply(f"Server returned {len(value_informs)} sensors, but only 1 expected")
value_inform = value_informs[0]
timestamp = float(core.decode(core.Timestamp, value_inform.arguments[0]))
status = core.decode(sensor.Sensor.Status, value_inform.arguments[3])
value = core.decode(sensor_type, value_inform.arguments[4])
return sensor.Reading(value=value, status=status, timestamp=timestamp)

@overload
async def sensor_value(self, sensor_name: str, sensor_type: None = None) -> Any:
... # pragma: nocover

@overload
async def sensor_value(self, sensor_name: str, sensor_type: Type[_T]) -> _T:
... # pragma: nocover

async def sensor_value(self, sensor_name: str, sensor_type: Optional[type] = None) -> Any:
"""Request the value of a single sensor from the server.

See :meth:`sensor_reading` for more information. This is a thin
wrapper that just returns the value from the reading.

Raises
------
ValueError
if the sensor status indicates that the value is invalid.
"""
reading = await self.sensor_reading(sensor_name, sensor_type)
if not reading.status.valid_value():
raise ValueError(f"Reading for {sensor_name} has status {reading.status}")
return reading.value

def add_sensor_watcher(self, watcher: "AbstractSensorWatcher") -> None:
if self._sensor_monitor is None:
self._sensor_monitor = _SensorMonitor(self)
Expand Down
2 changes: 1 addition & 1 deletion src/aiokatcp/sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ def close(self) -> None:

@abc.abstractmethod
def _parameters(self) -> tuple:
pass # pragma: no cover
pass # pragma: nocover

def parameters(self) -> tuple:
"""Return the parameters with which the sensor was created."""
Expand Down
79 changes: 78 additions & 1 deletion tests/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2017, 2019-2020, 2022 National Research Foundation (SARAO)
# Copyright 2017, 2019-2020, 2022, 2024 National Research Foundation (SARAO)
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
Expand Down Expand Up @@ -41,6 +41,7 @@
from aiokatcp import (
AbstractSensorWatcher,
Client,
DeviceStatus,
FailReply,
InvalidReply,
Message,
Expand Down Expand Up @@ -235,6 +236,82 @@ async def test_request_with_informs(channel, event_loop) -> None:
)


async def test_sensor_reading_explicit_type(channel, event_loop) -> None:
await channel.wait_connected()
future = event_loop.create_task(channel.client.sensor_reading("device-status", DeviceStatus))
assert await channel.reader.readline() == b"?sensor-value[1] device-status\n"
channel.writer.write(b"#sensor-value[1] 1234567890.1 1 device-status nominal ok\n")
channel.writer.write(b"!sensor-value[1] ok 1\n")
result = await future
assert result == Reading(1234567890.1, Sensor.Status.NOMINAL, DeviceStatus.OK)


async def test_sensor_reading_int(channel, event_loop) -> None:
await channel.wait_connected()
future = event_loop.create_task(channel.client.sensor_reading("foo"))
assert await channel.reader.readline() == b"?sensor-list[1] foo\n"
channel.writer.write(b"#sensor-list[1] foo description\\_stuff unit integer\n")
channel.writer.write(b"!sensor-list[1] ok 1\n")
assert await channel.reader.readline() == b"?sensor-value[2] foo\n"
channel.writer.write(b"#sensor-value[2] 1234567890.1 1 device-status warn 7\n")
channel.writer.write(b"!sensor-value[2] ok 1\n")
result = await future
assert result == Reading(1234567890.1, Sensor.Status.WARN, 7)


async def test_sensor_reading_discrete(channel, event_loop) -> None:
await channel.wait_connected()
future = event_loop.create_task(channel.client.sensor_reading("foo"))
assert await channel.reader.readline() == b"?sensor-list[1] foo\n"
channel.writer.write(b"#sensor-list[1] foo description\\_stuff unit discrete hello world\n")
channel.writer.write(b"!sensor-list[1] ok 1\n")
assert await channel.reader.readline() == b"?sensor-value[2] foo\n"
channel.writer.write(b"#sensor-value[2] 1234567890.1 1 device-status warn hello\n")
channel.writer.write(b"!sensor-value[2] ok 1\n")
result = await future
assert result == Reading(1234567890.1, Sensor.Status.WARN, b"hello")


async def test_sensor_reading_missing(channel, event_loop) -> None:
await channel.wait_connected()
future = event_loop.create_task(channel.client.sensor_reading("foo", str))
assert await channel.reader.readline() == b"?sensor-value[1] foo\n"
channel.writer.write(b"!sensor-value[1] fail Unknown\\_sensor\\_'foo'\n")
with pytest.raises(FailReply):
await future


async def test_sensor_reading_wrong_count(channel, event_loop) -> None:
await channel.wait_connected()
future = event_loop.create_task(channel.client.sensor_reading("/foo/", str))
assert await channel.reader.readline() == b"?sensor-value[1] /foo/\n"
channel.writer.write(b"#sensor-value[1] 1234567890.1 1 foo1 nominal ok\n")
channel.writer.write(b"#sensor-value[1] 1234567890.2 1 foo2 nominal ok\n")
channel.writer.write(b"!sensor-value[1] ok 2\n")
with pytest.raises(FailReply, match="Server returned 2 sensors, but only 1 expected"):
await future


async def test_sensor_value_ok(channel, event_loop) -> None:
await channel.wait_connected()
future = event_loop.create_task(channel.client.sensor_value("foo", int))
assert await channel.reader.readline() == b"?sensor-value[1] foo\n"
channel.writer.write(b"#sensor-value[1] 1234567890.1 1 device-status warn 7\n")
channel.writer.write(b"!sensor-value[1] ok 1\n")
result = await future
assert result == 7


async def test_sensor_value_invalid_status(channel, event_loop) -> None:
await channel.wait_connected()
future = event_loop.create_task(channel.client.sensor_value("foo", int))
assert await channel.reader.readline() == b"?sensor-value[1] foo\n"
channel.writer.write(b"#sensor-value[1] 1234567890.1 1 device-status unknown 7\n")
channel.writer.write(b"!sensor-value[1] ok 1\n")
with pytest.raises(ValueError):
await future


async def test_inform(channel, caplog) -> None:
client = cast(DummyClient, channel.client)
await channel.wait_connected()
Expand Down
Loading