From 3dc593aefd24e8a286fca46d4dac0b41a2b0a7e6 Mon Sep 17 00:00:00 2001 From: Bruce Merry Date: Thu, 5 Sep 2024 11:37:34 +0200 Subject: [PATCH] Use _write_async_message for sensor updates Clients that don't keep up with sensor updates will now be disconnected, instead of causing the server to use an ever-increasing amount of RAM. This required some unintuitive changes to keep mypy happy (self: Self should normally never be needed). --- src/aiokatcp/connection.py | 9 ++++----- src/aiokatcp/server.py | 6 ++++-- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/src/aiokatcp/connection.py b/src/aiokatcp/connection.py index 4f4e7dd..6a6e2f4 100644 --- a/src/aiokatcp/connection.py +++ b/src/aiokatcp/connection.py @@ -35,7 +35,7 @@ from typing import Any, Callable, Iterable, Optional, TypeVar import decorator -from typing_extensions import Protocol +from typing_extensions import Protocol, Self from . import core @@ -44,7 +44,6 @@ _BLANK_RE = re.compile(rb"^[ \t]*[\r\n]?$") # typing.Protocol requires a contravariant typevar _C_contra = TypeVar("_C_contra", bound="Connection", contravariant=True) -_C = TypeVar("_C", bound="Connection") class ConvertCRProtocol(asyncio.StreamReaderProtocol): @@ -127,8 +126,8 @@ async def handle_message(self, conn: _C_contra, msg: core.Message) -> None: class Connection: def __init__( - self: _C, - owner: _ConnectionOwner[_C], + self, + owner: _ConnectionOwner[Self], reader: asyncio.StreamReader, writer: asyncio.StreamWriter, is_server: bool, @@ -193,7 +192,7 @@ async def drain(self) -> None: self.logger.warning("Connection closed while draining: %s", error) self._close_writer() - async def _run(self: _C) -> None: + async def _run(self: Self) -> None: while True: # If the output buffer gets too full, pause processing requests await self.drain() diff --git a/src/aiokatcp/server.py b/src/aiokatcp/server.py index fb96507..d7f8596 100644 --- a/src/aiokatcp/server.py +++ b/src/aiokatcp/server.py @@ -1,4 +1,4 @@ -# Copyright 2017, 2019, 2022 National Research Foundation (SARAO) +# Copyright 2017, 2019, 2022, 2024 National Research Foundation (SARAO) # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -64,6 +64,8 @@ class ClientConnection(connection.Connection): """Server's view of the connection from a single client.""" + owner: "DeviceServer" + def __init__( self, owner: "DeviceServer", @@ -99,7 +101,7 @@ def sensor_update(self, s: sensor.Sensor, reading: sensor.Reading) -> None: msg = core.Message.inform( "sensor-status", reading.timestamp, 1, s.name, reading.status, reading.value ) - self.write_message(msg) + self.owner._write_async_message(self, msg) class RequestContext: