From db1a80d6e83f60d6751a5e00309a4b42edb93de6 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 12:40:34 +0100 Subject: [PATCH 01/13] Remove `Bidirectional` channel Having bidirectional channels is not a recommended practice, as it could introduce deadlocks. It is a very niche use case, so it is better to remove it. Signed-off-by: Leandro Lucarella --- .github/ISSUE_TEMPLATE/bug.yml | 2 +- src/frequenz/channels/__init__.py | 5 - src/frequenz/channels/_bidirectional.py | 206 ------------------------ tests/test_bidirectional.py | 83 ---------- 4 files changed, 1 insertion(+), 295 deletions(-) delete mode 100644 src/frequenz/channels/_bidirectional.py delete mode 100644 tests/test_bidirectional.py diff --git a/.github/ISSUE_TEMPLATE/bug.yml b/.github/ISSUE_TEMPLATE/bug.yml index 3b12aa28..bd567007 100644 --- a/.github/ISSUE_TEMPLATE/bug.yml +++ b/.github/ISSUE_TEMPLATE/bug.yml @@ -50,7 +50,7 @@ body: - Documentation (part:docs) - Unit, integration and performance tests (part:tests) - Build script, CI, dependencies, etc. (part:tooling) - - Channels, `Broadcast`, `Bidirectional`, etc. (part:channels) + - Channels, `Broadcast`, `Anycast`, etc. (part:channels) - Select (part:select) - Utility receivers, `Merge`, etc. (part:receivers) validations: diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 3489452b..2089403f 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -12,9 +12,6 @@ senders and multiple receivers. A message sent through a sender will be received by exactly one receiver. -* [Bidirectional][frequenz.channels.Bidirectional]: A channel providing - a `client` and a `service` handle to send and receive bidirectionally. - * [Broadcast][frequenz.channels.Broadcast]: A channel to broadcast messages from multiple senders to multiple receivers. Each message sent through any of the senders is received by all of the receivers. @@ -64,7 +61,6 @@ from . import util from ._anycast import Anycast from ._base_classes import Peekable, Receiver, Sender -from ._bidirectional import Bidirectional from ._broadcast import Broadcast from ._exceptions import ( ChannelClosedError, @@ -78,7 +74,6 @@ __all__ = [ "Anycast", - "Bidirectional", "Broadcast", "ChannelClosedError", "ChannelError", diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py deleted file mode 100644 index d535705b..00000000 --- a/src/frequenz/channels/_bidirectional.py +++ /dev/null @@ -1,206 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""An abstraction to provide bi-directional communication between actors.""" - -from __future__ import annotations - -from typing import Generic, TypeVar - -from ._base_classes import Receiver, Sender, T, U -from ._broadcast import Broadcast -from ._exceptions import ChannelError, ReceiverError, SenderError - -V = TypeVar("V") -W = TypeVar("W") - - -class Bidirectional(Generic[T, U]): - """A wrapper class for simulating bidirectional channels.""" - - class Handle(Sender[V], Receiver[W]): - """A handle to a [Bidirectional][frequenz.channels.Bidirectional] instance. - - It can be used to send/receive values between the client and service. - """ - - def __init__( - self, - channel: Bidirectional[V, W] | Bidirectional[W, V], - sender: Sender[V], - receiver: Receiver[W], - ) -> None: - """Create a `Bidirectional.Handle` instance. - - Args: - channel: The underlying channel. - sender: A sender to send values with. - receiver: A receiver to receive values from. - """ - self._chan: Bidirectional[V, W] | Bidirectional[W, V] = channel - """The underlying channel.""" - - self._sender: Sender[V] = sender - """The sender to send values with.""" - - self._receiver: Receiver[W] = receiver - """The receiver to receive values from.""" - - async def send(self, msg: V) -> None: - """Send a value to the other side. - - Args: - msg: The value to send. - - Raises: - SenderError: if the underlying channel was closed. - A [ChannelClosedError][frequenz.channels.ChannelClosedError] - is set as the cause. - """ - try: - await self._sender.send(msg) - except SenderError as err: - # If this comes from a channel error, then we inject another - # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlying - # Broadcast channels we use. - if isinstance(err.__cause__, ChannelError): - this_chan_error = ChannelError( - f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", - self._chan, # pylint: disable=protected-access - ) - this_chan_error.__cause__ = err.__cause__ - err.__cause__ = this_chan_error - raise err - - async def ready(self) -> bool: - """Wait until the receiver is ready with a value or an error. - - Once a call to `ready()` has finished, the value should be read with - a call to `consume()` (`receive()` or iterated over). The receiver will - remain ready (this method will return immediately) until it is - consumed. - - Returns: - Whether the receiver is still active. - """ - return await self._receiver.ready() # pylint: disable=protected-access - - def consume(self) -> W: - """Return the latest value once `_ready` is complete. - - Returns: - The next value that was received. - - Raises: - ReceiverStoppedError: if there is some problem with the receiver. - ReceiverError: if there is some problem with the receiver. - """ - try: - return self._receiver.consume() # pylint: disable=protected-access - except ReceiverError as err: - # If this comes from a channel error, then we inject another - # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlying - # Broadcast channels we use. - if isinstance(err.__cause__, ChannelError): - this_chan_error = ChannelError( - f"Error in the underlying channel {err.__cause__.channel}: {err.__cause__}", - self._chan, # pylint: disable=protected-access - ) - this_chan_error.__cause__ = err.__cause__ - err.__cause__ = this_chan_error - raise err - - def __str__(self) -> str: - """Return a string representation of this handle.""" - return f"{type(self).__name__}:{self._chan}" - - def __repr__(self) -> str: - """Return a string representation of this handle.""" - return ( - f"{type(self).__name__}(channel={self._chan!r}, " - f"sender={self._sender!r}, receiver={self._receiver!r})" - ) - - def __init__(self, *, name: str) -> None: - """Create a `Bidirectional` instance. - - Args: - name: The name of the channel. This is used for logging, and it is used - in the string representation and to name the underlying channels. - """ - self._name: str = name - """The name for the client, used to name the channels.""" - - self._request_channel: Broadcast[T] = Broadcast(name=f"{self._name}:request") - """The channel to send requests.""" - - self._response_channel: Broadcast[U] = Broadcast(name=f"{self._name}:response") - """The channel to send responses.""" - - self._client_handle: Bidirectional.Handle[T, U] = Bidirectional.Handle( - self, - self._request_channel.new_sender(), - self._response_channel.new_receiver(), - ) - """The handle for the client side to send/receive values.""" - - self._service_handle: Bidirectional.Handle[U, T] = Bidirectional.Handle( - self, - self._response_channel.new_sender(), - self._request_channel.new_receiver(), - ) - """The handle for the service side to send/receive values.""" - - @property - def name(self) -> str: - """The name of this channel. - - This is for logging purposes, and it will be shown in the string representation - of this channel. - """ - return self._name - - @property - def is_closed(self) -> bool: - """Whether this channel is closed. - - Any further attempts to use this channel after it is closed will result in an - exception. - - As long as there is a way to send or receive data, the channel is considered - open, even if the other side is closed, so this returns `False` if only both - underlying channels are closed. - """ - return self._request_channel.is_closed and self._response_channel.is_closed - - @property - def client_handle(self) -> Bidirectional.Handle[T, U]: - """Get a `Handle` for the client side to use. - - Returns: - Object to send/receive messages with. - """ - return self._client_handle - - @property - def service_handle(self) -> Bidirectional.Handle[U, T]: - """Get a `Handle` for the service side to use. - - Returns: - Object to send/receive messages with. - """ - return self._service_handle - - def __str__(self) -> str: - """Return a string representation of this channel.""" - return f"{type(self).__name__}:{self._name}" - - def __repr__(self) -> str: - """Return a string representation of this channel.""" - return ( - f"{type(self).__name__}(name={self._name!r}):<" - f"request_channel={self._request_channel!r}, " - f"response_channel={self._response_channel!r}>" - ) diff --git a/tests/test_bidirectional.py b/tests/test_bidirectional.py deleted file mode 100644 index 30d59335..00000000 --- a/tests/test_bidirectional.py +++ /dev/null @@ -1,83 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Tests for the RequestResponse implementation.""" - -import asyncio - -import pytest - -from frequenz.channels import ( - Bidirectional, - ChannelClosedError, - ChannelError, - ReceiverError, - SenderError, -) - - -async def test_request_response() -> None: - """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") - - async def service(handle: Bidirectional.Handle[str, int]) -> None: - while True: - num = await handle.receive() - if num is None: - break - if num == 42: - break - if num >= 0: - await handle.send("positive") - else: - await handle.send("negative") - - service_task = asyncio.create_task( - service(req_resp.service_handle), - ) - - client_handle: Bidirectional.Handle[int, str] = req_resp.client_handle - - for ctr in range(-5, 5): - await client_handle.send(ctr) - ret = await client_handle.receive() - if ctr < 0: - assert ret == "negative" - else: - assert ret == "positive" - - await client_handle.send(42) # Stop the service task - await service_task - - -async def test_sender_error_chaining() -> None: - """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") - - await req_resp._response_channel.close() # pylint: disable=protected-access - - with pytest.raises(SenderError, match="The channel was closed") as exc_info: - await req_resp.service_handle.send("I'm closed!") - - err = exc_info.value - cause = err.__cause__ - assert isinstance(cause, ChannelError) - assert cause.args[0].startswith("Error in the underlying channel") - assert isinstance(cause.__cause__, ChannelClosedError) - - -async def test_consume_error_chaining() -> None: - """Ensure bi-directional communication is possible.""" - req_resp: Bidirectional[int, str] = Bidirectional(name="test_service") - - await req_resp._request_channel.close() # pylint: disable=protected-access - - await req_resp.service_handle.ready() - with pytest.raises(ReceiverError, match="Receiver .* was stopped") as exc_info: - _ = req_resp.service_handle.consume() - - err = exc_info.value - cause = err.__cause__ - assert isinstance(cause, ChannelError) - assert cause.args[0].startswith("Error in the underlying channel") - assert isinstance(cause.__cause__, ChannelClosedError) From 6f73235e44c9a984acc0ae7d3edfcb379f10c61b Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 12:47:35 +0100 Subject: [PATCH 02/13] Remove `Peekable` and associated methods/classes `Peekable` was a class that allowed users to peek at the latest value in a channel, without consuming anything. Even when useful in principle, it is a special case that breaks the whole channel abstraction, and can easily be emulated by just using a normal receiver and caching the last value. Also the following symbols are removed as they are not longer used: * 'Broadcast.new_peekable` * `Receiver.into_peekable` * `ReceiverInvalidatedError` Signed-off-by: Leandro Lucarella --- src/frequenz/channels/__init__.py | 12 +--- src/frequenz/channels/_base_classes.py | 34 --------- src/frequenz/channels/_broadcast.py | 99 +------------------------- src/frequenz/channels/_exceptions.py | 9 --- tests/test_broadcast.py | 27 ------- 5 files changed, 3 insertions(+), 178 deletions(-) diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 2089403f..2fddc726 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -18,9 +18,6 @@ Other base classes: -* [Peekable][frequenz.channels.Peekable]: An object to allow users to get - a peek at the latest value in the channel, without consuming anything. - * [Receiver][frequenz.channels.Receiver]: An object that can wait for and consume messages from a channel. @@ -52,22 +49,17 @@ * [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver stopped producing messages. - -* [ReceiverInvalidatedError][frequenz.channels.ReceiverInvalidatedError]: - A receiver is not longer valid (for example if it was converted into - a peekable. """ from . import util from ._anycast import Anycast -from ._base_classes import Peekable, Receiver, Sender +from ._base_classes import Receiver, Sender from ._broadcast import Broadcast from ._exceptions import ( ChannelClosedError, ChannelError, Error, ReceiverError, - ReceiverInvalidatedError, ReceiverStoppedError, SenderError, ) @@ -78,10 +70,8 @@ "ChannelClosedError", "ChannelError", "Error", - "Peekable", "Receiver", "ReceiverError", - "ReceiverInvalidatedError", "ReceiverStoppedError", "Sender", "SenderError", diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_base_classes.py index 55a13644..179027b0 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_base_classes.py @@ -122,40 +122,6 @@ def map(self, call: Callable[[T], U]) -> Receiver[U]: """ return _Map(self, call) - def into_peekable(self) -> Peekable[T]: - """Convert the `Receiver` implementation into a `Peekable`. - - Once this function has been called, the receiver will no longer be - usable, and calling `receive` on the receiver will raise an exception. - - Returns: - A `Peekable` that can be used to peek at the latest value in the - channel. - - Raises: - NotImplementedError: when a `Receiver` implementation doesn't have - a custom `into_peekable` implementation. - """ - raise NotImplementedError("This receiver does not implement `into_peekable`") - - -class Peekable(ABC, Generic[T]): - """A channel peekable. - - A Peekable provides a [peek()][frequenz.channels.Peekable] method that - allows the user to get a peek at the latest value in the channel, without - consuming anything. - """ - - @abstractmethod - def peek(self) -> T | None: - """Return the latest value that was sent to the channel. - - Returns: - The latest value received by the channel, and `None`, if nothing - has been sent to the channel yet. - """ - class _Map(Receiver[U], Generic[T, U]): """Apply a transform function on a channel receiver. diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index b7ca9d56..3c8ae94e 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -11,16 +11,10 @@ from collections import deque from typing import Generic -from ._base_classes import Peekable as BasePeekable from ._base_classes import Receiver as BaseReceiver from ._base_classes import Sender as BaseSender from ._base_classes import T -from ._exceptions import ( - ChannelClosedError, - ReceiverInvalidatedError, - ReceiverStoppedError, - SenderError, -) +from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError _logger = logging.Logger(__name__) @@ -176,18 +170,6 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[ recv.enqueue(self._latest) return recv - def new_peekable(self) -> Peekable[T]: - """Create a new Peekable for the broadcast channel. - - A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method - that allows the user to get a peek at the latest value in the channel, - without consuming anything. - - Returns: - A Peekable to peek into the broadcast channel with. - """ - return Peekable(self) - def __str__(self) -> str: """Return a string representation of this receiver.""" return f"{type(self).__name__}:{self._name}" @@ -295,13 +277,6 @@ def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: self._q: deque[T] = deque(maxlen=limit) """The receiver's internal message queue.""" - self._active: bool = True - """Whether the receiver is still active. - - If this receiver is converted into a Peekable, it will neither be - considered valid nor active. - """ - def enqueue(self, msg: T) -> None: """Put a message into this receiver's queue. @@ -343,10 +318,6 @@ async def ready(self) -> bool: if self._q: return True - # if it is not longer active, return immediately - if not self._active: - return False - # Use a while loop here, to handle spurious wakeups of condition variables. # # The condition also makes sure that if there are already messages ready to be @@ -360,15 +331,6 @@ async def ready(self) -> bool: return True # pylint: enable=protected-access - def _deactivate(self) -> None: - """Set the receiver as inactive and remove it from the channel.""" - self._active = False - # pylint: disable=protected-access - _hash = hash(self) - if _hash in self._chan._receivers: - del self._chan._receivers[_hash] - # pylint: enable=protected-access - def consume(self) -> T: """Return the latest value once `ready` is complete. @@ -377,34 +339,13 @@ def consume(self) -> T: Raises: ReceiverStoppedError: if there is some problem with the receiver. - ReceiverInvalidatedError: if the receiver was converted into - a peekable. """ - if not self._q and not self._active: - raise ReceiverInvalidatedError( - "This receiver was converted into a Peekable so it is not longer valid.", - self, - ) - if not self._q and self._chan._closed: # pylint: disable=protected-access raise ReceiverStoppedError(self) from ChannelClosedError(self._chan) assert self._q, "`consume()` must be preceded by a call to `ready()`" return self._q.popleft() - def into_peekable(self) -> Peekable[T]: - """Convert the `Receiver` implementation into a `Peekable`. - - Once this function has been called, the receiver will no longer be - usable, and calling [receive()][frequenz.channels.Receiver.receive] on - the receiver will raise an exception. - - Returns: - A `Peekable` instance. - """ - self._deactivate() - return Peekable(self._chan) - def __str__(self) -> str: """Return a string representation of this receiver.""" return f"{self._chan}:{type(self).__name__}" @@ -415,41 +356,5 @@ def __repr__(self) -> str: assert limit is not None return ( f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, " - f"{self._chan!r}):" + f"{self._chan!r}):" ) - - -class Peekable(BasePeekable[T]): - """A Peekable to peek into broadcast channels. - - A Peekable provides a [peek()][frequenz.channels.Peekable] method that - allows the user to get a peek at the latest value in the channel, without - consuming anything. - """ - - def __init__(self, chan: Broadcast[T]) -> None: - """Create a `Peekable` instance. - - Args: - chan: The broadcast channel this Peekable will try to peek into. - """ - self._chan: Broadcast[T] = chan - """The broadcast channel this Peekable will try to peek into.""" - - def peek(self) -> T | None: - """Return the latest value that was sent to the channel. - - Returns: - The latest value received by the channel, and `None`, if nothing - has been sent to the channel yet, or if the channel is closed. - """ - return self._chan._latest # pylint: disable=protected-access - - def __str__(self) -> str: - """Return a string representation of this receiver.""" - return f"{self._chan}:{type(self).__name__}" - - def __repr__(self) -> str: - """Return a string representation of this receiver.""" - return f"{type(self).__name__}({self._chan!r}):" diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index 921c6237..adfd8e68 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -107,12 +107,3 @@ def __init__(self, receiver: _base_classes.Receiver[T]): error happened. """ super().__init__(f"Receiver {receiver} was stopped", receiver) - - -class ReceiverInvalidatedError(ReceiverError[T]): - """The [Receiver][frequenz.channels.Receiver] was invalidated. - - This happens when the Receiver is converted - [into][frequenz.channels.Receiver.into_peekable] - a [Peekable][frequenz.channels.Peekable]. - """ diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 8d1f4aec..ec578e35 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -12,7 +12,6 @@ Broadcast, ChannelClosedError, Receiver, - ReceiverInvalidatedError, ReceiverStoppedError, Sender, SenderError, @@ -187,32 +186,6 @@ async def test_broadcast_no_resend_latest() -> None: assert await new_recv.receive() == 100 -async def test_broadcast_peek() -> None: - """Ensure we are able to peek into broadcast channels.""" - bcast: Broadcast[int] = Broadcast(name="peek-test") - receiver = bcast.new_receiver() - peekable = receiver.into_peekable() - sender = bcast.new_sender() - - with pytest.raises(ReceiverInvalidatedError): - await receiver.receive() - - assert peekable.peek() is None - - for val in range(0, 10): - await sender.send(val) - - assert peekable.peek() == 9 - assert peekable.peek() == 9 - - await sender.send(20) - - assert peekable.peek() == 20 - - await bcast.close() - assert peekable.peek() is None - - async def test_broadcast_async_iterator() -> None: """Check that the broadcast receiver works as an async iterator.""" bcast: Broadcast[int] = Broadcast(name="iter_test") From e1358ec9eb493a120760968d5782d5de9d3aa0c1 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 13:56:36 +0100 Subject: [PATCH 03/13] Don't import `TypeVar`s from other modules There is little gain in importing type variables from other modules, as it adds unncecessary dependencies between modules. Type variables are just an artifact to define stuff locally. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 5 +++-- src/frequenz/channels/_broadcast.py | 5 +++-- src/frequenz/channels/util/_merge.py | 6 ++++-- src/frequenz/channels/util/_merge_named.py | 6 ++++-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 21293b94..45e0011b 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -8,15 +8,16 @@ import logging from asyncio import Condition from collections import deque -from typing import Generic +from typing import Generic, TypeVar from ._base_classes import Receiver as BaseReceiver from ._base_classes import Sender as BaseSender -from ._base_classes import T from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError _logger = logging.getLogger(__name__) +T = TypeVar("T") + class Anycast(Generic[T]): """A channel for sending data across async tasks. diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 3c8ae94e..40d4a2e8 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -9,15 +9,16 @@ import weakref from asyncio import Condition from collections import deque -from typing import Generic +from typing import Generic, TypeVar from ._base_classes import Receiver as BaseReceiver from ._base_classes import Sender as BaseSender -from ._base_classes import T from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError _logger = logging.Logger(__name__) +T = TypeVar("T") + class Broadcast(Generic[T]): """A channel to broadcast messages to multiple receivers. diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index 5baeea31..a6e5780c 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -6,11 +6,13 @@ import asyncio import itertools from collections import deque -from typing import Any +from typing import Any, TypeVar -from .._base_classes import Receiver, T +from .._base_classes import Receiver from .._exceptions import ReceiverStoppedError +T = TypeVar("T") + class Merge(Receiver[T]): """Merge messages coming from multiple channels into a single stream. diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index 271f72da..24de1c04 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -5,11 +5,13 @@ import asyncio import itertools from collections import deque -from typing import Any +from typing import Any, TypeVar -from .._base_classes import Receiver, T +from .._base_classes import Receiver from .._exceptions import ReceiverStoppedError +T = TypeVar("T") + class MergeNamed(Receiver[tuple[str, T]]): """Merge messages coming from multiple named channels into a single stream. From cb68d99a730278e886361391a81fb0cf9f1dd935 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 14:43:48 +0100 Subject: [PATCH 04/13] Make all `TypeVar`s private Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 30 +++++++++---------- src/frequenz/channels/_base_classes.py | 30 +++++++++---------- src/frequenz/channels/_broadcast.py | 34 +++++++++++----------- src/frequenz/channels/_exceptions.py | 18 ++++++------ src/frequenz/channels/util/_merge.py | 12 ++++---- src/frequenz/channels/util/_merge_named.py | 12 ++++---- 6 files changed, 68 insertions(+), 68 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 45e0011b..0ccd782a 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -16,10 +16,10 @@ _logger = logging.getLogger(__name__) -T = TypeVar("T") +_T = TypeVar("_T") -class Anycast(Generic[T]): +class Anycast(Generic[_T]): """A channel for sending data across async tasks. Anycast channels support multiple senders and multiple receivers. A message sent @@ -88,7 +88,7 @@ def __init__(self, *, name: str, limit: int = 10) -> None: of the channel. """ - self._deque: deque[T] = deque(maxlen=limit) + self._deque: deque[_T] = deque(maxlen=limit) """The channel's buffer.""" self._send_cv: Condition = Condition() @@ -158,7 +158,7 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[T]: + def new_sender(self) -> Sender[_T]: """Create a new sender. Returns: @@ -166,7 +166,7 @@ def new_sender(self) -> Sender[T]: """ return Sender(self) - def new_receiver(self) -> Receiver[T]: + def new_receiver(self) -> Receiver[_T]: """Create a new receiver. Returns: @@ -186,23 +186,23 @@ def __repr__(self) -> str: ) -class Sender(BaseSender[T]): +class Sender(BaseSender[_T]): """A sender to send messages to an Anycast channel. Should not be created directly, but through the `Anycast.new_sender()` method. """ - def __init__(self, chan: Anycast[T]) -> None: + def __init__(self, chan: Anycast[_T]) -> None: """Create a channel sender. Args: chan: A reference to the channel that this sender belongs to. """ - self._chan: Anycast[T] = chan + self._chan: Anycast[_T] = chan """The channel that this sender belongs to.""" - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message across the channel. To send, this method inserts the message into the Anycast channel's @@ -254,23 +254,23 @@ class _Empty: """A sentinel value to indicate that a value has not been set.""" -class Receiver(BaseReceiver[T]): +class Receiver(BaseReceiver[_T]): """A receiver to receive messages from an Anycast channel. Should not be created directly, but through the `Anycast.new_receiver()` method. """ - def __init__(self, chan: Anycast[T]) -> None: + def __init__(self, chan: Anycast[_T]) -> None: """Create a channel receiver. Args: chan: A reference to the channel that this receiver belongs to. """ - self._chan: Anycast[T] = chan + self._chan: Anycast[_T] = chan """The channel that this receiver belongs to.""" - self._next: T | type[_Empty] = _Empty + self._next: _T | type[_Empty] = _Empty async def ready(self) -> bool: """Wait until the receiver is ready with a value or an error. @@ -299,7 +299,7 @@ async def ready(self) -> bool: # pylint: enable=protected-access return True - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready()` is complete. Returns: @@ -319,7 +319,7 @@ def consume(self) -> T: ), "`consume()` must be preceded by a call to `ready()`" # mypy doesn't understand that the assert above ensures that self._next is not # _Sentinel. So we have to use a type ignore here. - next_val: T = self._next # type: ignore[assignment] + next_val: _T = self._next # type: ignore[assignment] self._next = _Empty return next_val diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_base_classes.py index 179027b0..164c90fc 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_base_classes.py @@ -11,15 +11,15 @@ from ._exceptions import ReceiverStoppedError -T = TypeVar("T") -U = TypeVar("U") +_T = TypeVar("_T") +_U = TypeVar("_U") -class Sender(ABC, Generic[T]): +class Sender(ABC, Generic[_T]): """A channel Sender.""" @abstractmethod - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message to the channel. Args: @@ -30,10 +30,10 @@ async def send(self, msg: T) -> None: """ -class Receiver(ABC, Generic[T]): +class Receiver(ABC, Generic[_T]): """A channel Receiver.""" - async def __anext__(self) -> T: + async def __anext__(self) -> _T: """Await the next value in the async iteration over received values. Returns: @@ -63,7 +63,7 @@ async def ready(self) -> bool: """ @abstractmethod - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready()` is complete. `ready()` must be called before each call to `consume()`. @@ -76,7 +76,7 @@ def consume(self) -> T: ReceiverError: if there is some problem with the receiver. """ - def __aiter__(self) -> Receiver[T]: + def __aiter__(self) -> Receiver[_T]: """Initialize the async iterator over received values. Returns: @@ -84,7 +84,7 @@ def __aiter__(self) -> Receiver[T]: """ return self - async def receive(self) -> T: + async def receive(self) -> _T: """Receive a message from the channel. Returns: @@ -111,7 +111,7 @@ async def receive(self) -> T: raise ReceiverStoppedError(self) from exc return received - def map(self, call: Callable[[T], U]) -> Receiver[U]: + def map(self, call: Callable[[_T], _U]) -> Receiver[_U]: """Return a receiver with `call` applied on incoming messages. Args: @@ -123,7 +123,7 @@ def map(self, call: Callable[[T], U]) -> Receiver[U]: return _Map(self, call) -class _Map(Receiver[U], Generic[T, U]): +class _Map(Receiver[_U], Generic[_T, _U]): """Apply a transform function on a channel receiver. Has two generic types: @@ -132,17 +132,17 @@ class _Map(Receiver[U], Generic[T, U]): - The output type: return type of the transform method. """ - def __init__(self, receiver: Receiver[T], transform: Callable[[T], U]) -> None: + def __init__(self, receiver: Receiver[_T], transform: Callable[[_T], _U]) -> None: """Create a `Transform` instance. Args: receiver: The input receiver. transform: The function to run on the input data. """ - self._receiver: Receiver[T] = receiver + self._receiver: Receiver[_T] = receiver """The input receiver.""" - self._transform: Callable[[T], U] = transform + self._transform: Callable[[_T], _U] = transform """The function to run on the input data.""" async def ready(self) -> bool: @@ -160,7 +160,7 @@ async def ready(self) -> bool: # We need a noqa here because the docs have a Raises section but the code doesn't # explicitly raise anything. - def consume(self) -> U: # noqa: DOC502 + def consume(self) -> _U: # noqa: DOC502 """Return a transformed value once `ready()` is complete. Returns: diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 40d4a2e8..5085222b 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -17,10 +17,10 @@ _logger = logging.Logger(__name__) -T = TypeVar("T") +_T = TypeVar("_T") -class Broadcast(Generic[T]): +class Broadcast(Generic[_T]): """A channel to broadcast messages to multiple receivers. `Broadcast` channels can have multiple senders and multiple receivers. Each @@ -88,13 +88,13 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" - self._receivers: dict[int, weakref.ReferenceType[Receiver[T]]] = {} + self._receivers: dict[int, weakref.ReferenceType[Receiver[_T]]] = {} """The receivers attached to the channel, indexed by their hash().""" self._closed: bool = False """Whether the channel is closed.""" - self._latest: T | None = None + self._latest: _T | None = None """The latest value sent to the channel.""" self.resend_latest: bool = resend_latest @@ -143,7 +143,7 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[T]: + def new_sender(self) -> Sender[_T]: """Create a new broadcast sender. Returns: @@ -151,7 +151,7 @@ def new_sender(self) -> Sender[T]: """ return Sender(self) - def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[T]: + def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -165,7 +165,7 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[ Returns: A Receiver instance attached to the broadcast channel. """ - recv: Receiver[T] = Receiver(name, limit, self) + recv: Receiver[_T] = Receiver(name, limit, self) self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: recv.enqueue(self._latest) @@ -186,7 +186,7 @@ def __repr__(self) -> str: ) -class Sender(BaseSender[T]): +class Sender(BaseSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -194,16 +194,16 @@ class Sender(BaseSender[T]): method. """ - def __init__(self, chan: Broadcast[T]) -> None: + def __init__(self, chan: Broadcast[_T]) -> None: """Create a Broadcast sender. Args: chan: A reference to the broadcast channel this sender belongs to. """ - self._chan: Broadcast[T] = chan + self._chan: Broadcast[_T] = chan """The broadcast channel this sender belongs to.""" - async def send(self, msg: T) -> None: + async def send(self, msg: _T) -> None: """Send a message to all broadcast receivers. Args: @@ -242,7 +242,7 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._chan!r})" -class Receiver(BaseReceiver[T]): +class Receiver(BaseReceiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the @@ -250,7 +250,7 @@ class Receiver(BaseReceiver[T]): method. """ - def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: + def __init__(self, name: str | None, limit: int, chan: Broadcast[_T]) -> None: """Create a broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -272,13 +272,13 @@ def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None: Only used for debugging purposes. """ - self._chan: Broadcast[T] = chan + self._chan: Broadcast[_T] = chan """The broadcast channel that this receiver belongs to.""" - self._q: deque[T] = deque(maxlen=limit) + self._q: deque[_T] = deque(maxlen=limit) """The receiver's internal message queue.""" - def enqueue(self, msg: T) -> None: + def enqueue(self, msg: _T) -> None: """Put a message into this receiver's queue. To be called by broadcast senders. If the receiver's queue is already @@ -332,7 +332,7 @@ async def ready(self) -> bool: return True # pylint: enable=protected-access - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready` is complete. Returns: diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index adfd8e68..9d7b1fcb 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: from . import _base_classes -T = TypeVar("T") +_T = TypeVar("_T") class Error(RuntimeError): @@ -58,13 +58,13 @@ def __init__(self, channel: Any): super().__init__(f"Channel {channel} was closed", channel) -class SenderError(Error, Generic[T]): +class SenderError(Error, Generic[_T]): """An error produced in a [Sender][frequenz.channels.Sender]. All exceptions generated by senders inherit from this exception. """ - def __init__(self, message: str, sender: _base_classes.Sender[T]): + def __init__(self, message: str, sender: _base_classes.Sender[_T]): """Create an instance. Args: @@ -73,17 +73,17 @@ def __init__(self, message: str, sender: _base_classes.Sender[T]): happened. """ super().__init__(message) - self.sender: _base_classes.Sender[T] = sender + self.sender: _base_classes.Sender[_T] = sender """The sender where the error happened.""" -class ReceiverError(Error, Generic[T]): +class ReceiverError(Error, Generic[_T]): """An error produced in a [Receiver][frequenz.channels.Receiver]. All exceptions generated by receivers inherit from this exception. """ - def __init__(self, message: str, receiver: _base_classes.Receiver[T]): + def __init__(self, message: str, receiver: _base_classes.Receiver[_T]): """Create an instance. Args: @@ -92,14 +92,14 @@ def __init__(self, message: str, receiver: _base_classes.Receiver[T]): error happened. """ super().__init__(message) - self.receiver: _base_classes.Receiver[T] = receiver + self.receiver: _base_classes.Receiver[_T] = receiver """The receiver where the error happened.""" -class ReceiverStoppedError(ReceiverError[T]): +class ReceiverStoppedError(ReceiverError[_T]): """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" - def __init__(self, receiver: _base_classes.Receiver[T]): + def __init__(self, receiver: _base_classes.Receiver[_T]): """Create an instance. Args: diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index a6e5780c..6fba3f97 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -11,10 +11,10 @@ from .._base_classes import Receiver from .._exceptions import ReceiverStoppedError -T = TypeVar("T") +_T = TypeVar("_T") -class Merge(Receiver[T]): +class Merge(Receiver[_T]): """Merge messages coming from multiple channels into a single stream. Example: @@ -40,20 +40,20 @@ class Merge(Receiver[T]): `self.stop()` method. This will cleanup any internal pending async tasks. """ - def __init__(self, *args: Receiver[T]) -> None: + def __init__(self, *args: Receiver[_T]) -> None: """Create a `Merge` instance. Args: *args: sequence of channel receivers. """ - self._receivers: dict[str, Receiver[T]] = { + self._receivers: dict[str, Receiver[_T]] = { str(id): recv for id, recv in enumerate(args) } self._pending: set[asyncio.Task[Any]] = { asyncio.create_task(anext(recv), name=name) for name, recv in self._receivers.items() } - self._results: deque[T] = deque(maxlen=len(self._receivers)) + self._results: deque[_T] = deque(maxlen=len(self._receivers)) def __del__(self) -> None: """Cleanup any pending tasks.""" @@ -104,7 +104,7 @@ async def ready(self) -> bool: asyncio.create_task(anext(self._receivers[name]), name=name) ) - def consume(self) -> T: + def consume(self) -> _T: """Return the latest value once `ready` is complete. Returns: diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index 24de1c04..a81024bf 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -10,23 +10,23 @@ from .._base_classes import Receiver from .._exceptions import ReceiverStoppedError -T = TypeVar("T") +_T = TypeVar("_T") -class MergeNamed(Receiver[tuple[str, T]]): +class MergeNamed(Receiver[tuple[str, _T]]): """Merge messages coming from multiple named channels into a single stream. When `MergeNamed` is no longer needed, then it should be stopped using `self.stop()` method. This will cleanup any internal pending async tasks. """ - def __init__(self, **kwargs: Receiver[T]) -> None: + def __init__(self, **kwargs: Receiver[_T]) -> None: """Create a `MergeNamed` instance. Args: **kwargs: sequence of channel receivers. """ - self._receivers: dict[str, Receiver[T]] = kwargs + self._receivers: dict[str, Receiver[_T]] = kwargs """The sequence of channel receivers to get the messages to merge.""" self._pending: set[asyncio.Task[Any]] = { @@ -35,7 +35,7 @@ def __init__(self, **kwargs: Receiver[T]) -> None: } """The set of pending tasks to merge messages.""" - self._results: deque[tuple[str, T]] = deque(maxlen=len(self._receivers)) + self._results: deque[tuple[str, _T]] = deque(maxlen=len(self._receivers)) """The internal buffer of merged messages.""" def __del__(self) -> None: @@ -88,7 +88,7 @@ async def ready(self) -> bool: asyncio.create_task(anext(self._receivers[name]), name=name) ) - def consume(self) -> tuple[str, T]: + def consume(self) -> tuple[str, _T]: """Return the latest value once `ready` is complete. Returns: From f7c86cc40002b8a95ab05619a9e881ae92f65bbf Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 14:55:15 +0100 Subject: [PATCH 05/13] Split `_base_classes` into `_receiver` and `_sender` Also move the receiver exceptions from `_exceptions` to `_receiver` and the sender exceptions from `_exceptions` to `_sender`. This avoids circular imports. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/__init__.py | 12 +--- src/frequenz/channels/_anycast.py | 8 ++- src/frequenz/channels/_broadcast.py | 8 ++- src/frequenz/channels/_exceptions.py | 62 +------------------ .../{_base_classes.py => _receiver.py} | 55 ++++++++++------ src/frequenz/channels/_sender.py | 45 ++++++++++++++ src/frequenz/channels/util/_event.py | 6 +- src/frequenz/channels/util/_file_watcher.py | 3 +- src/frequenz/channels/util/_merge.py | 3 +- src/frequenz/channels/util/_merge_named.py | 3 +- src/frequenz/channels/util/_select.py | 3 +- src/frequenz/channels/util/_timer.py | 3 +- 12 files changed, 104 insertions(+), 107 deletions(-) rename src/frequenz/channels/{_base_classes.py => _receiver.py} (82%) create mode 100644 src/frequenz/channels/_sender.py diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 2fddc726..5208a9ec 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -53,16 +53,10 @@ from . import util from ._anycast import Anycast -from ._base_classes import Receiver, Sender from ._broadcast import Broadcast -from ._exceptions import ( - ChannelClosedError, - ChannelError, - Error, - ReceiverError, - ReceiverStoppedError, - SenderError, -) +from ._exceptions import ChannelClosedError, ChannelError, Error +from ._receiver import Receiver, ReceiverError, ReceiverStoppedError +from ._sender import Sender, SenderError __all__ = [ "Anycast", diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 0ccd782a..e520e072 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -10,9 +10,11 @@ from collections import deque from typing import Generic, TypeVar -from ._base_classes import Receiver as BaseReceiver -from ._base_classes import Sender as BaseSender -from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError +from ._exceptions import ChannelClosedError +from ._receiver import Receiver as BaseReceiver +from ._receiver import ReceiverStoppedError +from ._sender import Sender as BaseSender +from ._sender import SenderError _logger = logging.getLogger(__name__) diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 5085222b..9e7d373c 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -11,9 +11,11 @@ from collections import deque from typing import Generic, TypeVar -from ._base_classes import Receiver as BaseReceiver -from ._base_classes import Sender as BaseSender -from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError +from ._exceptions import ChannelClosedError +from ._receiver import Receiver as BaseReceiver +from ._receiver import ReceiverStoppedError +from ._sender import Sender as BaseSender +from ._sender import SenderError _logger = logging.Logger(__name__) diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index 9d7b1fcb..559b2df7 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -1,16 +1,9 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Exception classes.""" +"""Base exception classes.""" -from __future__ import annotations - -from typing import TYPE_CHECKING, Any, Generic, TypeVar - -if TYPE_CHECKING: - from . import _base_classes - -_T = TypeVar("_T") +from typing import Any class Error(RuntimeError): @@ -56,54 +49,3 @@ def __init__(self, channel: Any): channel: A reference to the channel that was closed. """ super().__init__(f"Channel {channel} was closed", channel) - - -class SenderError(Error, Generic[_T]): - """An error produced in a [Sender][frequenz.channels.Sender]. - - All exceptions generated by senders inherit from this exception. - """ - - def __init__(self, message: str, sender: _base_classes.Sender[_T]): - """Create an instance. - - Args: - message: An error message. - sender: The [Sender][frequenz.channels.Sender] where the error - happened. - """ - super().__init__(message) - self.sender: _base_classes.Sender[_T] = sender - """The sender where the error happened.""" - - -class ReceiverError(Error, Generic[_T]): - """An error produced in a [Receiver][frequenz.channels.Receiver]. - - All exceptions generated by receivers inherit from this exception. - """ - - def __init__(self, message: str, receiver: _base_classes.Receiver[_T]): - """Create an instance. - - Args: - message: An error message. - receiver: The [Receiver][frequenz.channels.Receiver] where the - error happened. - """ - super().__init__(message) - self.receiver: _base_classes.Receiver[_T] = receiver - """The receiver where the error happened.""" - - -class ReceiverStoppedError(ReceiverError[_T]): - """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" - - def __init__(self, receiver: _base_classes.Receiver[_T]): - """Create an instance. - - Args: - receiver: The [Receiver][frequenz.channels.Receiver] where the - error happened. - """ - super().__init__(f"Receiver {receiver} was stopped", receiver) diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_receiver.py similarity index 82% rename from src/frequenz/channels/_base_classes.py rename to src/frequenz/channels/_receiver.py index 164c90fc..d3308837 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_receiver.py @@ -1,35 +1,20 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Base classes for Channel Sender and Receiver.""" +"""Channel receiver and associated exceptions.""" from __future__ import annotations from abc import ABC, abstractmethod from collections.abc import Callable -from typing import Generic, TypeVar +from typing import Generic, Self, TypeVar -from ._exceptions import ReceiverStoppedError +from ._exceptions import Error _T = TypeVar("_T") _U = TypeVar("_U") -class Sender(ABC, Generic[_T]): - """A channel Sender.""" - - @abstractmethod - async def send(self, msg: _T) -> None: - """Send a message to the channel. - - Args: - msg: The message to be sent. - - Raises: - SenderError: if there was an error sending the message. - """ - - class Receiver(ABC, Generic[_T]): """A channel Receiver.""" @@ -76,7 +61,7 @@ def consume(self) -> _T: ReceiverError: if there is some problem with the receiver. """ - def __aiter__(self) -> Receiver[_T]: + def __aiter__(self) -> Self: """Initialize the async iterator over received values. Returns: @@ -123,6 +108,38 @@ def map(self, call: Callable[[_T], _U]) -> Receiver[_U]: return _Map(self, call) +class ReceiverError(Error, Generic[_T]): + """An error produced in a [Receiver][frequenz.channels.Receiver]. + + All exceptions generated by receivers inherit from this exception. + """ + + def __init__(self, message: str, receiver: Receiver[_T]): + """Create an instance. + + Args: + message: An error message. + receiver: The [Receiver][frequenz.channels.Receiver] where the + error happened. + """ + super().__init__(message) + self.receiver: Receiver[_T] = receiver + """The receiver where the error happened.""" + + +class ReceiverStoppedError(ReceiverError[_T]): + """The [Receiver][frequenz.channels.Receiver] stopped producing messages.""" + + def __init__(self, receiver: Receiver[_T]): + """Create an instance. + + Args: + receiver: The [Receiver][frequenz.channels.Receiver] where the + error happened. + """ + super().__init__(f"Receiver {receiver} was stopped", receiver) + + class _Map(Receiver[_U], Generic[_T, _U]): """Apply a transform function on a channel receiver. diff --git a/src/frequenz/channels/_sender.py b/src/frequenz/channels/_sender.py new file mode 100644 index 00000000..8cf00f86 --- /dev/null +++ b/src/frequenz/channels/_sender.py @@ -0,0 +1,45 @@ +# License: MIT +# Copyright © 2022 Frequenz Energy-as-a-Service GmbH + +"""Channel sender and associated exceptions.""" + +from abc import ABC, abstractmethod +from typing import Generic, TypeVar + +from ._exceptions import Error + +_T = TypeVar("_T") + + +class Sender(ABC, Generic[_T]): + """A channel Sender.""" + + @abstractmethod + async def send(self, msg: _T) -> None: + """Send a message to the channel. + + Args: + msg: The message to be sent. + + Raises: + SenderError: if there was an error sending the message. + """ + + +class SenderError(Error, Generic[_T]): + """An error produced in a [Sender][frequenz.channels.Sender]. + + All exceptions generated by senders inherit from this exception. + """ + + def __init__(self, message: str, sender: Sender[_T]): + """Create an instance. + + Args: + message: An error message. + sender: The [Sender][frequenz.channels.Sender] where the error + happened. + """ + super().__init__(message) + self.sender: Sender[_T] = sender + """The sender where the error happened.""" diff --git a/src/frequenz/channels/util/_event.py b/src/frequenz/channels/util/_event.py index ca43e262..536a80a5 100644 --- a/src/frequenz/channels/util/_event.py +++ b/src/frequenz/channels/util/_event.py @@ -6,10 +6,10 @@ import asyncio as _asyncio -from frequenz.channels import _base_classes, _exceptions +from frequenz.channels import _receiver -class Event(_base_classes.Receiver[None]): +class Event(_receiver.Receiver[None]): """A receiver that can be made ready through an event. The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait @@ -134,7 +134,7 @@ def consume(self) -> None: ReceiverStoppedError: If this receiver is stopped. """ if not self._is_set and self._is_stopped: - raise _exceptions.ReceiverStoppedError(self) + raise _receiver.ReceiverStoppedError(self) assert self._is_set, "calls to `consume()` must be follow a call to `ready()`" diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/util/_file_watcher.py index 5291a0f9..4713ba2e 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/util/_file_watcher.py @@ -14,8 +14,7 @@ from watchfiles import Change, awatch from watchfiles.main import FileChange -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from .._receiver import Receiver, ReceiverStoppedError class FileWatcher(Receiver["FileWatcher.Event"]): diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index 6fba3f97..7ff54f43 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -8,8 +8,7 @@ from collections import deque from typing import Any, TypeVar -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from .._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index a81024bf..425cd3cc 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -7,8 +7,7 @@ from collections import deque from typing import Any, TypeVar -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from .._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/util/_select.py index ac6e59ad..1eb9d3e8 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/util/_select.py @@ -12,8 +12,7 @@ from collections.abc import AsyncIterator from typing import Any, Generic, TypeGuard, TypeVar -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from .._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/util/_timer.py index 9b0fd712..81b56efc 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/util/_timer.py @@ -17,8 +17,7 @@ import asyncio from datetime import timedelta -from .._base_classes import Receiver -from .._exceptions import ReceiverStoppedError +from .._receiver import Receiver, ReceiverStoppedError def _to_microseconds(time: float | timedelta) -> int: From 805b70f638405cccb01d62498a2970227e112ad2 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 14:58:57 +0100 Subject: [PATCH 06/13] Make channels return the base sender and receiver types We don't really want end users to have access to the concrete types, and their custom methods. We want them to use the base types, so we can change the implementation without breaking their code and ensuring a common interface for all senders and receivers. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 4 ++-- src/frequenz/channels/_broadcast.py | 6 ++++-- tests/test_broadcast.py | 6 ++++++ 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index e520e072..1415cae3 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -160,7 +160,7 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[_T]: + def new_sender(self) -> BaseSender[_T]: """Create a new sender. Returns: @@ -168,7 +168,7 @@ def new_sender(self) -> Sender[_T]: """ return Sender(self) - def new_receiver(self) -> Receiver[_T]: + def new_receiver(self) -> BaseReceiver[_T]: """Create a new receiver. Returns: diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 9e7d373c..992c5aa7 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -145,7 +145,7 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> Sender[_T]: + def new_sender(self) -> BaseSender[_T]: """Create a new broadcast sender. Returns: @@ -153,7 +153,9 @@ def new_sender(self) -> Sender[_T]: """ return Sender(self) - def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]: + def new_receiver( + self, *, name: str | None = None, limit: int = 50 + ) -> BaseReceiver[_T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index ec578e35..5319d4bf 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -104,6 +104,10 @@ async def test_broadcast_after_close() -> None: async def test_broadcast_overflow() -> None: """Ensure messages sent to full broadcast receivers get dropped.""" + from frequenz.channels._broadcast import ( # pylint: disable=import-outside-toplevel + _Receiver, + ) + bcast: Broadcast[int] = Broadcast(name="meter_5") big_recv_size = 10 @@ -111,7 +115,9 @@ async def test_broadcast_overflow() -> None: sender = bcast.new_sender() big_receiver = bcast.new_receiver(name="named-recv", limit=big_recv_size) + assert isinstance(big_receiver, _Receiver) small_receiver = bcast.new_receiver(limit=small_recv_size) + assert isinstance(small_receiver, _Receiver) async def drain_receivers() -> tuple[int, int]: big_sum = 0 From 1708cb26b0f2cd5f390cf1cdeb65e1cc33ce2846 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 15:01:34 +0100 Subject: [PATCH 07/13] Make channel's sender and receiver implementation private We don't want to expose the implementation of the sender and receiver at all, so we make them private to the module. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 8 ++++---- src/frequenz/channels/_broadcast.py | 10 +++++----- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 1415cae3..8381b56a 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -166,7 +166,7 @@ def new_sender(self) -> BaseSender[_T]: Returns: A Sender instance attached to the Anycast channel. """ - return Sender(self) + return _Sender(self) def new_receiver(self) -> BaseReceiver[_T]: """Create a new receiver. @@ -174,7 +174,7 @@ def new_receiver(self) -> BaseReceiver[_T]: Returns: A Receiver instance attached to the Anycast channel. """ - return Receiver(self) + return _Receiver(self) def __str__(self) -> str: """Return a string representation of this channel.""" @@ -188,7 +188,7 @@ def __repr__(self) -> str: ) -class Sender(BaseSender[_T]): +class _Sender(BaseSender[_T]): """A sender to send messages to an Anycast channel. Should not be created directly, but through the `Anycast.new_sender()` @@ -256,7 +256,7 @@ class _Empty: """A sentinel value to indicate that a value has not been set.""" -class Receiver(BaseReceiver[_T]): +class _Receiver(BaseReceiver[_T]): """A receiver to receive messages from an Anycast channel. Should not be created directly, but through the `Anycast.new_receiver()` diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 992c5aa7..b1d5dda3 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -90,7 +90,7 @@ def __init__(self, *, name: str, resend_latest: bool = False) -> None: self._recv_cv: Condition = Condition() """The condition to wait for data in the channel's buffer.""" - self._receivers: dict[int, weakref.ReferenceType[Receiver[_T]]] = {} + self._receivers: dict[int, weakref.ReferenceType[_Receiver[_T]]] = {} """The receivers attached to the channel, indexed by their hash().""" self._closed: bool = False @@ -151,7 +151,7 @@ def new_sender(self) -> BaseSender[_T]: Returns: A Sender instance attached to the broadcast channel. """ - return Sender(self) + return _Sender(self) def new_receiver( self, *, name: str | None = None, limit: int = 50 @@ -169,7 +169,7 @@ def new_receiver( Returns: A Receiver instance attached to the broadcast channel. """ - recv: Receiver[_T] = Receiver(name, limit, self) + recv: _Receiver[_T] = _Receiver(name, limit, self) self._receivers[hash(recv)] = weakref.ref(recv) if self.resend_latest and self._latest is not None: recv.enqueue(self._latest) @@ -190,7 +190,7 @@ def __repr__(self) -> str: ) -class Sender(BaseSender[_T]): +class _Sender(BaseSender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -246,7 +246,7 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._chan!r})" -class Receiver(BaseReceiver[_T]): +class _Receiver(BaseReceiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the From b17803d52ca05f2b4e144e117a7f5a662bcd2953 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 15:04:37 +0100 Subject: [PATCH 08/13] Don't use aliases for the base sender and receiver classes Now that the sender and receiver implementations are private, there is no need to use aliases for the base classes. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_anycast.py | 14 ++++++-------- src/frequenz/channels/_broadcast.py | 16 ++++++---------- 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 8381b56a..549a283e 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -11,10 +11,8 @@ from typing import Generic, TypeVar from ._exceptions import ChannelClosedError -from ._receiver import Receiver as BaseReceiver -from ._receiver import ReceiverStoppedError -from ._sender import Sender as BaseSender -from ._sender import SenderError +from ._receiver import Receiver, ReceiverStoppedError +from ._sender import Sender, SenderError _logger = logging.getLogger(__name__) @@ -160,7 +158,7 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> BaseSender[_T]: + def new_sender(self) -> Sender[_T]: """Create a new sender. Returns: @@ -168,7 +166,7 @@ def new_sender(self) -> BaseSender[_T]: """ return _Sender(self) - def new_receiver(self) -> BaseReceiver[_T]: + def new_receiver(self) -> Receiver[_T]: """Create a new receiver. Returns: @@ -188,7 +186,7 @@ def __repr__(self) -> str: ) -class _Sender(BaseSender[_T]): +class _Sender(Sender[_T]): """A sender to send messages to an Anycast channel. Should not be created directly, but through the `Anycast.new_sender()` @@ -256,7 +254,7 @@ class _Empty: """A sentinel value to indicate that a value has not been set.""" -class _Receiver(BaseReceiver[_T]): +class _Receiver(Receiver[_T]): """A receiver to receive messages from an Anycast channel. Should not be created directly, but through the `Anycast.new_receiver()` diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index b1d5dda3..5be50b2a 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -12,10 +12,8 @@ from typing import Generic, TypeVar from ._exceptions import ChannelClosedError -from ._receiver import Receiver as BaseReceiver -from ._receiver import ReceiverStoppedError -from ._sender import Sender as BaseSender -from ._sender import SenderError +from ._receiver import Receiver, ReceiverStoppedError +from ._sender import Sender, SenderError _logger = logging.Logger(__name__) @@ -145,7 +143,7 @@ async def close(self) -> None: async with self._recv_cv: self._recv_cv.notify_all() - def new_sender(self) -> BaseSender[_T]: + def new_sender(self) -> Sender[_T]: """Create a new broadcast sender. Returns: @@ -153,9 +151,7 @@ def new_sender(self) -> BaseSender[_T]: """ return _Sender(self) - def new_receiver( - self, *, name: str | None = None, limit: int = 50 - ) -> BaseReceiver[_T]: + def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[_T]: """Create a new broadcast receiver. Broadcast receivers have their own buffer, and when messages are not @@ -190,7 +186,7 @@ def __repr__(self) -> str: ) -class _Sender(BaseSender[_T]): +class _Sender(Sender[_T]): """A sender to send messages to the broadcast channel. Should not be created directly, but through the @@ -246,7 +242,7 @@ def __repr__(self) -> str: return f"{type(self).__name__}({self._chan!r})" -class _Receiver(BaseReceiver[_T]): +class _Receiver(Receiver[_T]): """A receiver to receive messages from the broadcast channel. Should not be created directly, but through the From c3951f74780e1a80852ee739780b8c377739adbe Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 15:27:13 +0100 Subject: [PATCH 09/13] Move merge and select symbols to the top level These are core component to work with channels, so they are moved to the `frequenz.channels` top level module. Signed-off-by: Leandro Lucarella --- .github/labeler.yml | 15 +++--- src/frequenz/channels/__init__.py | 54 +++++++++++++++---- src/frequenz/channels/_anycast.py | 6 +-- src/frequenz/channels/_broadcast.py | 6 +-- src/frequenz/channels/{util => }/_merge.py | 2 +- .../channels/{util => }/_merge_named.py | 2 +- src/frequenz/channels/{util => }/_select.py | 36 ++++++------- src/frequenz/channels/util/__init__.py | 30 ----------- src/frequenz/channels/util/_timer.py | 2 +- tests/test_merge.py | 3 +- tests/test_mergenamed.py | 3 +- tests/{utils => }/test_select.py | 3 +- tests/{utils => }/test_select_integration.py | 7 +-- tests/utils/test_integration.py | 3 +- 14 files changed, 87 insertions(+), 85 deletions(-) rename src/frequenz/channels/{util => }/_merge.py (98%) rename src/frequenz/channels/{util => }/_merge_named.py (98%) rename src/frequenz/channels/{util => }/_select.py (91%) rename tests/{utils => }/test_select.py (93%) rename tests/{utils => }/test_select_integration.py (99%) diff --git a/.github/labeler.yml b/.github/labeler.yml index edfdd7a2..a29927fb 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -6,7 +6,7 @@ # For more details on the configuration please see: # https://github.com/marketplace/actions/labeler -"part:docs": +"part:docs": - "**/*.md" - "docs/**" - "examples/**" @@ -31,14 +31,13 @@ - noxfile.py "part:channels": - - any: - - "src/frequenz/channels/**" - - "!src/frequenz/channels/util/**" + - "src/frequenz/channels/_anycast.py" + - "src/frequenz/channels/_broadcast.py" "part:receivers": - - any: - - "src/frequenz/channels/util/**" - - "!src/frequenz/channels/util/_select.py" + - "src/frequenz/channels/_merge.py" + - "src/frequenz/channels/_merge_named.py" + - "src/frequenz/channels/util/**" "part:select": - - "src/frequenz/channels/util/_select.py" + - "src/frequenz/channels/_select.py" diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 5208a9ec..8559d3b4 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -6,6 +6,14 @@ This package contains [channel](https://en.wikipedia.org/wiki/Channel_(programming)) implementations. +Base classes: + +* [Receiver][frequenz.channels.Receiver]: An object that can wait for and + consume messages from a channel. + +* [Sender][frequenz.channels.Sender]: An object that can send messages to + a channel. + Channels: * [Anycast][frequenz.channels.Anycast]: A channel that supports multiple @@ -16,19 +24,17 @@ from multiple senders to multiple receivers. Each message sent through any of the senders is received by all of the receivers. -Other base classes: +Utilities to work with channels: -* [Receiver][frequenz.channels.Receiver]: An object that can wait for and - consume messages from a channel. - -* [Sender][frequenz.channels.Sender]: An object that can send messages to - a channel. +* [Merge][frequenz.channels.Merge] and [MergeNamed][frequenz.channels.MergeNamed]: + [Receivers][frequenz.channels.Receiver] that merge messages coming from multiple + receivers into a single stream. -Utilities: +* [select][frequenz.channels.select]: Iterate over the values of all + [receivers][frequenz.channels.Receiver] as new values become available. -* [util][frequenz.channels.util]: A module with utilities, like special - receivers that implement timers, file watchers, merge receivers, or wait for - messages in multiple channels. +* [util][frequenz.channels.util]: A module with extra utilities, like special + receivers that implement timers, file watchers, etc. Exception classes: @@ -49,13 +55,33 @@ * [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver stopped producing messages. + +* [SelectError][frequenz.channels.SelectError]: Base class for all errors + related to [select][frequenz.channels.select]. + +* [SelectErrorGroup][frequenz.channels.SelectErrorGroup]: A group of errors + raised by [select][frequenz.channels.select]. + +* [UnhandledSelectedError][frequenz.channels.UnhandledSelectedError]: An error + raised by [select][frequenz.channels.select] that was not handled by the + user. """ from . import util from ._anycast import Anycast from ._broadcast import Broadcast from ._exceptions import ChannelClosedError, ChannelError, Error +from ._merge import Merge +from ._merge_named import MergeNamed from ._receiver import Receiver, ReceiverError, ReceiverStoppedError +from ._select import ( + Selected, + SelectError, + SelectErrorGroup, + UnhandledSelectedError, + select, + selected_from, +) from ._sender import Sender, SenderError __all__ = [ @@ -64,10 +90,18 @@ "ChannelClosedError", "ChannelError", "Error", + "Merge", + "MergeNamed", "Receiver", "ReceiverError", "ReceiverStoppedError", + "SelectError", + "SelectErrorGroup", + "Selected", "Sender", "SenderError", + "UnhandledSelectedError", + "select", + "selected_from", "util", ] diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 549a283e..952aa12f 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -38,9 +38,9 @@ class Anycast(Generic[_T]): thread-safe. When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.util.select], - [Merge][frequenz.channels.util.Merge] or - [MergeNamed][frequenz.channels.util.MergeNamed]. + simultaneously using [select][frequenz.channels.select], + [Merge][frequenz.channels.Merge] or + [MergeNamed][frequenz.channels.MergeNamed]. Example: ``` python diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 5be50b2a..327d3a0c 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -32,9 +32,9 @@ class Broadcast(Generic[_T]): are thread-safe. Because of this, `Broadcast` channels are thread-safe. When there are multiple channel receivers, they can be awaited - simultaneously using [select][frequenz.channels.util.select], - [Merge][frequenz.channels.util.Merge] or - [MergeNamed][frequenz.channels.util.MergeNamed]. + simultaneously using [select][frequenz.channels.select], + [Merge][frequenz.channels.Merge] or + [MergeNamed][frequenz.channels.MergeNamed]. Example: ``` python diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/_merge.py similarity index 98% rename from src/frequenz/channels/util/_merge.py rename to src/frequenz/channels/_merge.py index 7ff54f43..00461ed9 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/_merge.py @@ -8,7 +8,7 @@ from collections import deque from typing import Any, TypeVar -from .._receiver import Receiver, ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/_merge_named.py similarity index 98% rename from src/frequenz/channels/util/_merge_named.py rename to src/frequenz/channels/_merge_named.py index 425cd3cc..fb08c867 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/_merge_named.py @@ -7,7 +7,7 @@ from collections import deque from typing import Any, TypeVar -from .._receiver import Receiver, ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/_select.py similarity index 91% rename from src/frequenz/channels/util/_select.py rename to src/frequenz/channels/_select.py index 1eb9d3e8..983b28e1 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/_select.py @@ -12,23 +12,23 @@ from collections.abc import AsyncIterator from typing import Any, Generic, TypeGuard, TypeVar -from .._receiver import Receiver, ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError _T = TypeVar("_T") class Selected(Generic[_T]): - """A result of a [`select()`][frequenz.channels.util.select] iteration. + """A result of a [`select()`][frequenz.channels.select] iteration. The selected receiver is consumed immediately and the received value is stored in the instance, unless there was an exception while receiving the value, in which case the exception is stored instead. `Selected` instances should be used in conjunction with the - [`selected_from()`][frequenz.channels.util.selected_from] function to determine + [`selected_from()`][frequenz.channels.selected_from] function to determine which receiver was selected. - Please see [`select()`][frequenz.channels.util.select] for an example. + Please see [`select()`][frequenz.channels.select] for an example. """ class _EmptyResult: @@ -45,9 +45,9 @@ def __init__(self, receiver: Receiver[_T]) -> None: The receiver is consumed immediately when creating the instance and the received value is stored in the instance for later use as - [`value`][frequenz.channels.util.Selected.value]. If there was an exception + [`value`][frequenz.channels.Selected.value]. If there was an exception while receiving the value, then the exception is stored in the instance instead - (as [`exception`][frequenz.channels.util.Selected.exception]). + (as [`exception`][frequenz.channels.Selected.exception]). Args: receiver: The receiver that was selected. @@ -139,16 +139,16 @@ def __repr__(self) -> str: def selected_from( selected: Selected[Any], receiver: Receiver[_T] ) -> TypeGuard[Selected[_T]]: - """Check if the given receiver was selected by [`select()`][frequenz.channels.util.select]. + """Check if the given receiver was selected by [`select()`][frequenz.channels.select]. This function is used in conjunction with the - [`Selected`][frequenz.channels.util.Selected] class to determine which receiver was + [`Selected`][frequenz.channels.Selected] class to determine which receiver was selected in `select()` iteration. It also works as a [type guard][typing.TypeGuard] to narrow the type of the `Selected` instance to the type of the receiver. - Please see [`select()`][frequenz.channels.util.select] for an example. + Please see [`select()`][frequenz.channels.select] for an example. Args: selected: The result of a `select()` iteration. @@ -163,21 +163,21 @@ def selected_from( class SelectError(BaseException): - """A base exception for [`select()`][frequenz.channels.util.select]. + """A base exception for [`select()`][frequenz.channels.select]. This exception is raised when a `select()` iteration fails. It is raised as a single exception when one receiver fails during normal operation (while calling `ready()` for example). It is raised as a group exception - ([`SelectErrorGroup`][frequenz.channels.util.SelectErrorGroup]) when a `select` loop + ([`SelectErrorGroup`][frequenz.channels.SelectErrorGroup]) when a `select` loop is cleaning up after it's done. """ class UnhandledSelectedError(SelectError, Generic[_T]): - """A receiver was not handled in a [`select()`][frequenz.channels.util.select] loop. + """A receiver was not handled in a [`select()`][frequenz.channels.select] loop. This exception is raised when a `select()` iteration finishes without a call to - [`selected_from()`][frequenz.channels.util.selected_from] for the selected receiver. + [`selected_from()`][frequenz.channels.selected_from] for the selected receiver. """ def __init__(self, selected: Selected[_T]) -> None: @@ -193,7 +193,7 @@ def __init__(self, selected: Selected[_T]) -> None: class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError): - """An exception group for [`select()`][frequenz.channels.util.select] operation. + """An exception group for [`select()`][frequenz.channels.select] operation. This exception group is raised when a `select()` loops fails while cleaning up running tests to check for ready receivers. @@ -242,8 +242,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: This function is used to iterate over the values of all receivers as they receive new values. It is used in conjunction with the - [`Selected`][frequenz.channels.util.Selected] class and the - [`selected_from()`][frequenz.channels.util.selected_from] function to determine + [`Selected`][frequenz.channels.Selected] class and the + [`selected_from()`][frequenz.channels.selected_from] function to determine which function to determine which receiver was selected in a select operation. An exhaustiveness check is performed at runtime to make sure all selected receivers @@ -257,8 +257,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: receivers from a select loop, there are a few alternatives. Depending on your use case, one or the other could work better for you: - * Use [`Merge`][frequenz.channels.util.Merge] or - [`MergeNamed`][frequenz.channels.util.MergeNamed]: this is useful when you + * Use [`Merge`][frequenz.channels.Merge] or + [`MergeNamed`][frequenz.channels.MergeNamed]: this is useful when you have and unknown number of receivers of the same type that can be handled as a group. * Use tasks to manage each receiver individually: this is better if there are no diff --git a/src/frequenz/channels/util/__init__.py b/src/frequenz/channels/util/__init__.py index 515e1ac2..a7cae62c 100644 --- a/src/frequenz/channels/util/__init__.py +++ b/src/frequenz/channels/util/__init__.py @@ -11,34 +11,12 @@ * [FileWatcher][frequenz.channels.util.FileWatcher]: A [receiver][frequenz.channels.Receiver] that watches for file events. -* [Merge][frequenz.channels.util.Merge]: - A [receiver][frequenz.channels.Receiver] that merge messages coming from - multiple receivers into a single stream. - -* [MergeNamed][frequenz.channels.util.MergeNamed]: - A [receiver][frequenz.channels.Receiver] that merge messages coming from - multiple receivers into a single named stream, allowing to identify the - origin of each message. - * [Timer][frequenz.channels.util.Timer]: A [receiver][frequenz.channels.Receiver] that ticks at certain intervals. - -* [select][frequenz.channels.util.select]: Iterate over the values of all - [receivers][frequenz.channels.Receiver] as new values become available. """ from ._event import Event from ._file_watcher import FileWatcher -from ._merge import Merge -from ._merge_named import MergeNamed -from ._select import ( - Selected, - SelectError, - SelectErrorGroup, - UnhandledSelectedError, - select, - selected_from, -) from ._timer import ( MissedTickPolicy, SkipMissedAndDrift, @@ -50,17 +28,9 @@ __all__ = [ "Event", "FileWatcher", - "Merge", - "MergeNamed", "MissedTickPolicy", - "SelectError", - "SelectErrorGroup", - "Selected", "SkipMissedAndDrift", "SkipMissedAndResync", "Timer", "TriggerAllMissed", - "UnhandledSelectedError", - "select", - "selected_from", ] diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/util/_timer.py index 81b56efc..f2896198 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/util/_timer.py @@ -319,7 +319,7 @@ class Timer(Receiver[timedelta]): print(f"The timer has triggered {drift=}") ``` - But you can also use a [`select`][frequenz.channels.util.select] to combine + But you can also use a [`select`][frequenz.channels.select] to combine it with other receivers, and even start it (semi) manually: ```python diff --git a/tests/test_merge.py b/tests/test_merge.py index c0d1c420..5f6b14a5 100644 --- a/tests/test_merge.py +++ b/tests/test_merge.py @@ -5,8 +5,7 @@ import asyncio -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import Merge +from frequenz.channels import Anycast, Merge, Sender async def test_merge() -> None: diff --git a/tests/test_mergenamed.py b/tests/test_mergenamed.py index 565264ef..03d272ea 100644 --- a/tests/test_mergenamed.py +++ b/tests/test_mergenamed.py @@ -5,8 +5,7 @@ import asyncio -from frequenz.channels import Anycast, Sender -from frequenz.channels.util import MergeNamed +from frequenz.channels import Anycast, MergeNamed, Sender async def test_mergenamed() -> None: diff --git a/tests/utils/test_select.py b/tests/test_select.py similarity index 93% rename from tests/utils/test_select.py rename to tests/test_select.py index a9a46921..9eb001c5 100644 --- a/tests/utils/test_select.py +++ b/tests/test_select.py @@ -7,8 +7,7 @@ import pytest -from frequenz.channels import Receiver, ReceiverStoppedError -from frequenz.channels.util import Selected, selected_from +from frequenz.channels import Receiver, ReceiverStoppedError, Selected, selected_from class TestSelected: diff --git a/tests/utils/test_select_integration.py b/tests/test_select_integration.py similarity index 99% rename from tests/utils/test_select_integration.py rename to tests/test_select_integration.py index a4984f62..19d69b95 100644 --- a/tests/utils/test_select_integration.py +++ b/tests/test_select_integration.py @@ -15,14 +15,15 @@ class at a time. import async_solipsism import pytest -from frequenz.channels import Receiver, ReceiverStoppedError -from frequenz.channels.util import ( - Event, +from frequenz.channels import ( + Receiver, + ReceiverStoppedError, Selected, UnhandledSelectedError, select, selected_from, ) +from frequenz.channels.util import Event @pytest.mark.integration diff --git a/tests/utils/test_integration.py b/tests/utils/test_integration.py index e61cb620..180494f8 100644 --- a/tests/utils/test_integration.py +++ b/tests/utils/test_integration.py @@ -9,7 +9,8 @@ import pytest -from frequenz.channels.util import FileWatcher, Timer, select, selected_from +from frequenz.channels import select, selected_from +from frequenz.channels.util import FileWatcher, Timer @pytest.mark.integration From 25b38bab51f04b69a646401835a37097c3077443 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 15:57:42 +0100 Subject: [PATCH 10/13] Move utility receivers to their own public modules We move the `Event`, `FileWatcher` and `Timer` receivers to their own separate public modules. Since they are not core components and users might only need to use some of them, it is more clear to have them separated. Also in the case of the `FileWatcher`, it requires an extra dependency, which we could potentially have as an optional dependency, so users that don't need it don't need to install it. In the future we might distribute these also as separate python packages (wheels). Signed-off-by: Leandro Lucarella --- .github/labeler.yml | 4 ++- src/frequenz/channels/__init__.py | 16 ++++++--- src/frequenz/channels/_select.py | 4 +-- .../channels/{util/_event.py => event.py} | 10 +++--- .../_file_watcher.py => file_watcher.py} | 2 +- .../channels/{util/_timer.py => timer.py} | 36 +++++++++---------- src/frequenz/channels/util/__init__.py | 36 ------------------- tests/{utils => }/test_event.py | 2 +- tests/{utils => }/test_file_watcher.py | 6 ++-- ...on.py => test_file_watcher_integration.py} | 3 +- tests/test_select_integration.py | 2 +- tests/{utils => }/test_timer.py | 2 +- 12 files changed, 47 insertions(+), 76 deletions(-) rename src/frequenz/channels/{util/_event.py => event.py} (92%) rename src/frequenz/channels/{util/_file_watcher.py => file_watcher.py} (98%) rename src/frequenz/channels/{util/_timer.py => timer.py} (95%) delete mode 100644 src/frequenz/channels/util/__init__.py rename tests/{utils => }/test_event.py (96%) rename tests/{utils => }/test_file_watcher.py (94%) rename tests/{utils/test_integration.py => test_file_watcher_integration.py} (97%) rename tests/{utils => }/test_timer.py (99%) diff --git a/.github/labeler.yml b/.github/labeler.yml index a29927fb..4507b9a8 100644 --- a/.github/labeler.yml +++ b/.github/labeler.yml @@ -37,7 +37,9 @@ "part:receivers": - "src/frequenz/channels/_merge.py" - "src/frequenz/channels/_merge_named.py" - - "src/frequenz/channels/util/**" + - "src/frequenz/channels/event.py" + - "src/frequenz/channels/file_watcher.py" + - "src/frequenz/channels/timer.py" "part:select": - "src/frequenz/channels/_select.py" diff --git a/src/frequenz/channels/__init__.py b/src/frequenz/channels/__init__.py index 8559d3b4..dbf60751 100644 --- a/src/frequenz/channels/__init__.py +++ b/src/frequenz/channels/__init__.py @@ -33,9 +33,6 @@ * [select][frequenz.channels.select]: Iterate over the values of all [receivers][frequenz.channels.Receiver] as new values become available. -* [util][frequenz.channels.util]: A module with extra utilities, like special - receivers that implement timers, file watchers, etc. - Exception classes: * [Error][frequenz.channels.Error]: Base class for all errors in this @@ -65,9 +62,19 @@ * [UnhandledSelectedError][frequenz.channels.UnhandledSelectedError]: An error raised by [select][frequenz.channels.select] that was not handled by the user. + +Extra utility receivers: + +* [Event][frequenz.channels.event.Event]: A receiver that generates a message when + an event is set. + +* [FileWatcher][frequenz.channels.file_watcher.FileWatcher]: A receiver that + generates a message when a file is added, modified or deleted. + +* [Timer][frequenz.channels.timer.Timer]: A receiver that generates a message after a + given amount of time. """ -from . import util from ._anycast import Anycast from ._broadcast import Broadcast from ._exceptions import ChannelClosedError, ChannelError, Error @@ -103,5 +110,4 @@ "UnhandledSelectedError", "select", "selected_from", - "util", ] diff --git a/src/frequenz/channels/_select.py b/src/frequenz/channels/_select.py index 983b28e1..9b182f1b 100644 --- a/src/frequenz/channels/_select.py +++ b/src/frequenz/channels/_select.py @@ -272,8 +272,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]: import datetime from typing import assert_never - from frequenz.channels import ReceiverStoppedError - from frequenz.channels.util import select, selected_from, Timer + from frequenz.channels import ReceiverStoppedError, select, selected_from + from frequenz.channels.timer import Timer timer1 = Timer.periodic(datetime.timedelta(seconds=1)) timer2 = Timer.timeout(datetime.timedelta(seconds=0.5)) diff --git a/src/frequenz/channels/util/_event.py b/src/frequenz/channels/event.py similarity index 92% rename from src/frequenz/channels/util/_event.py rename to src/frequenz/channels/event.py index 536a80a5..f546555e 100644 --- a/src/frequenz/channels/util/_event.py +++ b/src/frequenz/channels/event.py @@ -12,19 +12,19 @@ class Event(_receiver.Receiver[None]): """A receiver that can be made ready through an event. - The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait - until [`set()`][frequenz.channels.util.Event.set] is called. At that point the + The receiver (the [`ready()`][frequenz.channels.event.Event.ready] method) will wait + until [`set()`][frequenz.channels.event.Event.set] is called. At that point the receiver will wait again after the event is [`consume()`][frequenz.channels.Receiver.consume]d. The receiver can be completely stopped by calling - [`stop()`][frequenz.channels.util.Event.stop]. + [`stop()`][frequenz.channels.event.Event.stop]. Example: ```python import asyncio - from frequenz.channels import Receiver - from frequenz.channels.util import Event, select, selected_from + from frequenz.channels import Receiver, select, selected_from + from frequenz.channels.event import Event other_receiver: Receiver[int] = ... exit_event = Event() diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/file_watcher.py similarity index 98% rename from src/frequenz/channels/util/_file_watcher.py rename to src/frequenz/channels/file_watcher.py index 4713ba2e..5ddb6e95 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/file_watcher.py @@ -14,7 +14,7 @@ from watchfiles import Change, awatch from watchfiles.main import FileChange -from .._receiver import Receiver, ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError class FileWatcher(Receiver["FileWatcher.Event"]): diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/timer.py similarity index 95% rename from src/frequenz/channels/util/_timer.py rename to src/frequenz/channels/timer.py index f2896198..18db2beb 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/timer.py @@ -17,7 +17,7 @@ import asyncio from datetime import timedelta -from .._receiver import Receiver, ReceiverStoppedError +from ._receiver import Receiver, ReceiverStoppedError def _to_microseconds(time: float | timedelta) -> int: @@ -270,7 +270,7 @@ class Timer(Receiver[timedelta]): """A timer receiver that triggers every `interval` time. The timer has microseconds resolution, so the - [`interval`][frequenz.channels.util.Timer.interval] must be at least + [`interval`][frequenz.channels.timer.Timer.interval] must be at least 1 microsecond. The message it produces is a [`timedelta`][datetime.timedelta] containing the drift @@ -283,34 +283,34 @@ class Timer(Receiver[timedelta]): as the timer uses [`asyncio`][asyncio]s loop monotonic clock. If the timer is delayed too much, then it will behave according to the - [`missed_tick_policy`][frequenz.channels.util.Timer.missed_tick_policy]. Missing + [`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy]. Missing ticks might or might not trigger a message and the drift could be accumulated or not depending on the chosen policy. These are the currently built-in available policies: - * [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] - * [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] - * [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] + * [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] + * [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] + * [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] For the most common cases, a specialized constructor is provided: - * [`periodic()`][frequenz.channels.util.Timer.periodic] (uses the - [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] or - [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] policy) - * [`timeout()`][frequenz.channels.util.Timer.timeout] (uses the - [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] policy) + * [`periodic()`][frequenz.channels.timer.Timer.periodic] (uses the + [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] or + [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy) + * [`timeout()`][frequenz.channels.timer.Timer.timeout] (uses the + [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy) - The timer accepts an optional [`loop`][frequenz.channels.util.Timer.loop], which + The timer accepts an optional [`loop`][frequenz.channels.timer.Timer.loop], which will be used to track the time. If `loop` is `None`, then the running loop will be used (if there is no running loop most calls will raise a [`RuntimeError`][RuntimeError]). Starting the timer can be delayed if necessary by using `auto_start=False` (for example until we have a running loop). A call to - [`reset()`][frequenz.channels.util.Timer.reset], - [`ready()`][frequenz.channels.util.Timer.ready], - [`receive()`][frequenz.channels.util.Timer.receive] or the async iterator interface + [`reset()`][frequenz.channels.timer.Timer.reset], + [`ready()`][frequenz.channels.timer.Timer.ready], + [`receive()`][frequenz.channels.timer.Timer.receive] or the async iterator interface to await for a new message will start the timer. Example: Periodic timer example @@ -323,8 +323,7 @@ class Timer(Receiver[timedelta]): it with other receivers, and even start it (semi) manually: ```python - from frequenz.channels.util import select, selected_from - from frequenz.channels import Broadcast + from frequenz.channels import Broadcast, select, selected_from timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False) chan = Broadcast[int](name="input-chan") @@ -346,8 +345,7 @@ class Timer(Receiver[timedelta]): Example: Timeout example ```python - from frequenz.channels.util import select, selected_from - from frequenz.channels import Broadcast + from frequenz.channels import Broadcast, select, selected_from def process_data(data: int): print(f"Processing data: {data}") diff --git a/src/frequenz/channels/util/__init__.py b/src/frequenz/channels/util/__init__.py deleted file mode 100644 index a7cae62c..00000000 --- a/src/frequenz/channels/util/__init__.py +++ /dev/null @@ -1,36 +0,0 @@ -# License: MIT -# Copyright © 2022 Frequenz Energy-as-a-Service GmbH - -"""Channel utilities. - -A module with several utilities to work with channels: - -* [Event][frequenz.channels.util.Event]: - A [receiver][frequenz.channels.Receiver] that can be made ready through an event. - -* [FileWatcher][frequenz.channels.util.FileWatcher]: - A [receiver][frequenz.channels.Receiver] that watches for file events. - -* [Timer][frequenz.channels.util.Timer]: - A [receiver][frequenz.channels.Receiver] that ticks at certain intervals. -""" - -from ._event import Event -from ._file_watcher import FileWatcher -from ._timer import ( - MissedTickPolicy, - SkipMissedAndDrift, - SkipMissedAndResync, - Timer, - TriggerAllMissed, -) - -__all__ = [ - "Event", - "FileWatcher", - "MissedTickPolicy", - "SkipMissedAndDrift", - "SkipMissedAndResync", - "Timer", - "TriggerAllMissed", -] diff --git a/tests/utils/test_event.py b/tests/test_event.py similarity index 96% rename from tests/utils/test_event.py rename to tests/test_event.py index 0cda9d23..950720d0 100644 --- a/tests/utils/test_event.py +++ b/tests/test_event.py @@ -8,7 +8,7 @@ import pytest as _pytest from frequenz.channels import ReceiverStoppedError -from frequenz.channels.util import Event +from frequenz.channels.event import Event async def test_event() -> None: diff --git a/tests/utils/test_file_watcher.py b/tests/test_file_watcher.py similarity index 94% rename from tests/utils/test_file_watcher.py rename to tests/test_file_watcher.py index bed75bcb..2071b9da 100644 --- a/tests/utils/test_file_watcher.py +++ b/tests/test_file_watcher.py @@ -15,7 +15,7 @@ from watchfiles import Change from watchfiles.main import FileChange -from frequenz.channels.util import FileWatcher +from frequenz.channels.file_watcher import FileWatcher class _FakeAwatch: @@ -52,7 +52,7 @@ def fake_awatch() -> Iterator[_FakeAwatch]: """Fixture to mock the awatch function.""" fake = _FakeAwatch() with mock.patch( - "frequenz.channels.util._file_watcher.awatch", + "frequenz.channels.file_watcher.awatch", autospec=True, side_effect=fake.fake_awatch, ): @@ -89,7 +89,7 @@ async def test_file_watcher_filter_events( # We need to reset the mock explicitly because hypothesis runs all the produced # inputs in the same context. with mock.patch( - "frequenz.channels.util._file_watcher.awatch", autospec=True + "frequenz.channels.file_watcher.awatch", autospec=True ) as awatch_mock: file_watcher = FileWatcher(paths=[good_path], event_types=event_types) diff --git a/tests/utils/test_integration.py b/tests/test_file_watcher_integration.py similarity index 97% rename from tests/utils/test_integration.py rename to tests/test_file_watcher_integration.py index 180494f8..5c9ab935 100644 --- a/tests/utils/test_integration.py +++ b/tests/test_file_watcher_integration.py @@ -10,7 +10,8 @@ import pytest from frequenz.channels import select, selected_from -from frequenz.channels.util import FileWatcher, Timer +from frequenz.channels.file_watcher import FileWatcher +from frequenz.channels.timer import Timer @pytest.mark.integration diff --git a/tests/test_select_integration.py b/tests/test_select_integration.py index 19d69b95..6d676528 100644 --- a/tests/test_select_integration.py +++ b/tests/test_select_integration.py @@ -23,7 +23,7 @@ class at a time. select, selected_from, ) -from frequenz.channels.util import Event +from frequenz.channels.event import Event @pytest.mark.integration diff --git a/tests/utils/test_timer.py b/tests/test_timer.py similarity index 99% rename from tests/utils/test_timer.py rename to tests/test_timer.py index dd5e5109..73fe28f6 100644 --- a/tests/utils/test_timer.py +++ b/tests/test_timer.py @@ -14,7 +14,7 @@ import pytest from hypothesis import strategies as st -from frequenz.channels.util import ( +from frequenz.channels.timer import ( SkipMissedAndDrift, SkipMissedAndResync, Timer, From dd756d7321515c0fda4cfcedf134e0e119b28aae Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Tue, 7 Nov 2023 16:04:02 +0100 Subject: [PATCH 11/13] Move nested class out of the parent class Removing nested classes avoids having to use hacky constructions, like requiring the use of `from __future__ import annotations`, types as strings and confusing the `mkdocstrings` tools when extracting and cross-linking docs. Signed-off-by: Leandro Lucarella --- src/frequenz/channels/_select.py | 23 +++++++------- src/frequenz/channels/file_watcher.py | 44 ++++++++++++-------------- tests/test_file_watcher.py | 12 +++---- tests/test_file_watcher_integration.py | 14 +++----- 4 files changed, 43 insertions(+), 50 deletions(-) diff --git a/src/frequenz/channels/_select.py b/src/frequenz/channels/_select.py index 9b182f1b..a7ec4c90 100644 --- a/src/frequenz/channels/_select.py +++ b/src/frequenz/channels/_select.py @@ -17,6 +17,16 @@ _T = TypeVar("_T") +class _EmptyResult: + """A sentinel value to distinguish between None and empty result. + + We need a sentinel because a result can also be `None`. + """ + + def __repr__(self) -> str: + return "" + + class Selected(Generic[_T]): """A result of a [`select()`][frequenz.channels.select] iteration. @@ -31,15 +41,6 @@ class Selected(Generic[_T]): Please see [`select()`][frequenz.channels.select] for an example. """ - class _EmptyResult: - """A sentinel value to distinguish between None and empty result. - - We need a sentinel because a result can also be `None`. - """ - - def __repr__(self) -> str: - return "" - def __init__(self, receiver: Receiver[_T]) -> None: """Create a new instance. @@ -55,7 +56,7 @@ def __init__(self, receiver: Receiver[_T]) -> None: self._recv: Receiver[_T] = receiver """The receiver that was selected.""" - self._value: _T | Selected._EmptyResult = Selected._EmptyResult() + self._value: _T | _EmptyResult = _EmptyResult() """The value that was received. If there was an exception while receiving the value, then this will be `None`. @@ -86,7 +87,7 @@ def value(self) -> _T: """ if self._exception is not None: raise self._exception - assert not isinstance(self._value, Selected._EmptyResult) + assert not isinstance(self._value, _EmptyResult) return self._value @property diff --git a/src/frequenz/channels/file_watcher.py b/src/frequenz/channels/file_watcher.py index 5ddb6e95..64bc62e1 100644 --- a/src/frequenz/channels/file_watcher.py +++ b/src/frequenz/channels/file_watcher.py @@ -3,8 +3,6 @@ """A Channel receiver for watching for new, modified or deleted files.""" -from __future__ import annotations - import asyncio import pathlib from collections import abc @@ -17,29 +15,31 @@ from ._receiver import Receiver, ReceiverStoppedError -class FileWatcher(Receiver["FileWatcher.Event"]): - """A channel receiver that watches for file events.""" +class EventType(Enum): + """Available types of changes to watch for.""" + + CREATE = Change.added + """A new file was created.""" - class EventType(Enum): - """Available types of changes to watch for.""" + MODIFY = Change.modified + """An existing file was modified.""" - CREATE = Change.added - """A new file was created.""" + DELETE = Change.deleted + """An existing file was deleted.""" - MODIFY = Change.modified - """An existing file was modified.""" - DELETE = Change.deleted - """An existing file was deleted.""" +@dataclass(frozen=True) +class Event: + """A file change event.""" - @dataclass(frozen=True) - class Event: - """A file change event.""" + type: EventType + """The type of change that was observed.""" + path: pathlib.Path + """The path where the change was observed.""" - type: FileWatcher.EventType - """The type of change that was observed.""" - path: pathlib.Path - """The path where the change was observed.""" + +class FileWatcher(Receiver[Event]): + """A channel receiver that watches for file events.""" def __init__( self, @@ -53,7 +53,7 @@ def __init__( event_types: Types of events to watch for. Defaults to watch for all event types. """ - self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types) + self.event_types: frozenset[EventType] = frozenset(event_types) """The types of events to watch for.""" self._stop_event: asyncio.Event = asyncio.Event() @@ -133,9 +133,7 @@ def consume(self) -> Event: assert self._changes, "`consume()` must be preceded by a call to `ready()`" # Tuple of (Change, path) returned by watchfiles change, path_str = self._changes.pop() - return FileWatcher.Event( - type=FileWatcher.EventType(change), path=pathlib.Path(path_str) - ) + return Event(type=EventType(change), path=pathlib.Path(path_str)) def __str__(self) -> str: """Return a string representation of this receiver.""" diff --git a/tests/test_file_watcher.py b/tests/test_file_watcher.py index 2071b9da..c1a65838 100644 --- a/tests/test_file_watcher.py +++ b/tests/test_file_watcher.py @@ -15,7 +15,7 @@ from watchfiles import Change from watchfiles.main import FileChange -from frequenz.channels.file_watcher import FileWatcher +from frequenz.channels.file_watcher import Event, EventType, FileWatcher class _FakeAwatch: @@ -74,14 +74,14 @@ async def test_file_watcher_receive_updates( for change in changes: recv_changes = await file_watcher.receive() - event_type = FileWatcher.EventType(change[0]) + event_type = EventType(change[0]) path = pathlib.Path(change[1]) - assert recv_changes == FileWatcher.Event(type=event_type, path=path) + assert recv_changes == Event(type=event_type, path=path) -@hypothesis.given(event_types=st.sets(st.sampled_from(FileWatcher.EventType))) +@hypothesis.given(event_types=st.sets(st.sampled_from(EventType))) async def test_file_watcher_filter_events( - event_types: set[FileWatcher.EventType], + event_types: set[EventType], ) -> None: """Test the file watcher events filtering.""" good_path = "good-file" @@ -100,7 +100,7 @@ async def test_file_watcher_filter_events( pathlib.Path(good_path), stop_event=mock.ANY, watch_filter=filter_events ) ] - for event_type in FileWatcher.EventType: + for event_type in EventType: assert filter_events(event_type.value, good_path) == ( event_type in event_types ) diff --git a/tests/test_file_watcher_integration.py b/tests/test_file_watcher_integration.py index 5c9ab935..754aca5f 100644 --- a/tests/test_file_watcher_integration.py +++ b/tests/test_file_watcher_integration.py @@ -10,7 +10,7 @@ import pytest from frequenz.channels import select, selected_from -from frequenz.channels.file_watcher import FileWatcher +from frequenz.channels.file_watcher import Event, EventType, FileWatcher from frequenz.channels.timer import Timer @@ -33,12 +33,8 @@ async def test_file_watcher(tmp_path: pathlib.Path) -> None: if selected_from(selected, timer): filename.write_text(f"{selected.value}") elif selected_from(selected, file_watcher): - event_type = ( - FileWatcher.EventType.CREATE - if number_of_writes == 0 - else FileWatcher.EventType.MODIFY - ) - assert selected.value == FileWatcher.Event(type=event_type, path=filename) + event_type = EventType.CREATE if number_of_writes == 0 else EventType.MODIFY + assert selected.value == Event(type=event_type, path=filename) number_of_writes += 1 # After receiving a write 3 times, unsubscribe from the writes channel if number_of_writes == expected_number_of_writes: @@ -58,9 +54,7 @@ async def test_file_watcher_deletes(tmp_path: pathlib.Path) -> None: tmp_path: A tmp directory to run the file watcher on. Created by pytest. """ filename = tmp_path / "test-file" - file_watcher = FileWatcher( - paths=[str(tmp_path)], event_types={FileWatcher.EventType.DELETE} - ) + file_watcher = FileWatcher(paths=[str(tmp_path)], event_types={EventType.DELETE}) write_timer = Timer.timeout(timedelta(seconds=0.1)) deletion_timer = Timer.timeout(timedelta(seconds=0.25)) From 8e70c41ceec00f848de4f21e5afad2cf69fc131a Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 8 Nov 2023 14:12:58 +0100 Subject: [PATCH 12/13] Remove release notes Summary This is from the beta-2 release, so we clear it out until we are ready for the new release. Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 1 - 1 file changed, 1 deletion(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 08db0eae..7bd56aa8 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,7 +2,6 @@ ## Summary -The `Timer` now can be started with a delay. ## Upgrading From cc17298a35ed6c1b11c57b9b8a1be71e99df84c8 Mon Sep 17 00:00:00 2001 From: Leandro Lucarella Date: Wed, 8 Nov 2023 14:36:50 +0100 Subject: [PATCH 13/13] Update release notes Signed-off-by: Leandro Lucarella --- RELEASE_NOTES.md | 61 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 57 insertions(+), 4 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 7bd56aa8..fd869421 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,11 +11,9 @@ You should instantiate using `Anycast(name=..., limit=...)` (or `Anycast(name=...)` if the default `limit` is enough) instead of `Anycast(...)` or `Anycast(maxsize=...)`. -* `Bidirectional` - - - The `client_id` and `service_id` arguments were merged into a keyword-only `name`. + - `new_sender` and `new_receiver`: They now return a base `Sender` and `Receiver` class (respectively) instead of a channel-specific `Sender` or `Receiver` subclass. - You should instantiate using `Bidirectional(name=...)` instead of `Bidirectional(..., ...)` or `Bidirectional(client_id=..., service_id=...)`. + This means users now don't have access to the internals to the channel-specific `Sender` and `Receiver` subclasses. * `Broadcast` @@ -27,16 +25,71 @@ You should use `.new_receiver(name=name, limit=limit)` (or `.new_receiver()` if the defaults are enough) instead of `.new_receiver(name)` or `.new_receiver(name, maxsize)`. + - `new_sender` and `new_receiver` now return a base `Sender` and `Receiver` class (respectively) instead of a channel-specific `Sender` or `Receiver` subclass. + + This means users now don't have access to the internals to the channel-specific `Sender` and `Receiver` subclasses. + * `Event` - `__init__`: The `name` argument was made keyword-only. The default was changed to a more readable version of `id(self)`. You should instantiate using `Event(name=...)` instead of `Event(...)`. + - Moved from `frequenz.channels.util` to `frequenz.channels.event`. + +* `FileWatcher` + + - Moved from `frequenz.channels.util` to `frequenz.channels.file_watcher`. + + - Support classes are no longer nested inside `FileWatcher`. They are now top-level classes within the new `frequenz.channels.file_watcher` module (e.g., `frequenz.channels.util.FileWatcher.EventType` -> `frequenz.channels.file_watcher.EventType`, `frequenz.channels.util.FileWatcher.Event` -> `frequenz.channels.file_watcher.Event`). + +* `Timer` and support classes + + - Moved from `frequenz.channels.util` to `frequenz.channels.timer`. + * All exceptions that took `Any` as the `message` argument now take `str` instead. If you were passing a non-`str` value to an exception, you should convert it using `str(value)` before passing it to the exception. +* The following symbols were moved to the top-level `frequenz.channels` package: + + - `Merge` + - `MergeNamed` + - `Selected` + - `SelectError` + - `SelectErrorGroup` + - `UnhandledSelectedError` + - `select` + - `selected_from` + +### Removals + +* `Bidirectional` + + This channel was removed as it is not recommended practice and was a niche use case. If you need to use it, you can set up two channels or copy the `Bidirectional` class from the previous version to your project. + +* `Peekable` + + This class was removed because it was merely a shortcut to a receiver that caches the last value received. It did not fit the channel abstraction well and was infrequently used. + + You can replace it with a task that receives and retains the last value. + +* `Broadcast.new_peekable()` + + This was removed alongside `Peekable`. + +* `Receiver.into_peekable()` + + This was removed alongside `Peekable`. + +* `ReceiverInvalidatedError` + + This was removed alongside `Peekable` (it was only raised when using a `Receiver` that was converted into a `Peekable`). + +* `util` + + The entire `util` package was removed and its symbols were either moved to the top-level package or to their own public modules (as noted above). + ## New Features * `Anycast`