From d215c3e5d1017c9e46f5658cb65789be0e212a77 Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Wed, 13 Sep 2023 16:34:47 +0200 Subject: [PATCH 1/2] Fix typos in code documentation and tests Signed-off-by: Daniel Zullo --- src/frequenz/channels/_anycast.py | 4 ++-- src/frequenz/channels/_bidirectional.py | 4 ++-- src/frequenz/channels/_broadcast.py | 2 +- src/frequenz/channels/util/_event.py | 2 +- src/frequenz/channels/util/_file_watcher.py | 2 +- src/frequenz/channels/util/_merge.py | 2 +- src/frequenz/channels/util/_merge_named.py | 2 +- src/frequenz/channels/util/_select.py | 2 +- src/frequenz/channels/util/_timer.py | 10 +++++----- tests/test_anycast.py | 4 ++-- tests/test_broadcast.py | 2 +- tests/test_merge.py | 2 +- tests/test_mergenamed.py | 2 +- tests/utils/test_select_integration.py | 4 ++-- tests/utils/test_timer.py | 18 +++++++++--------- 15 files changed, 31 insertions(+), 31 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index edc665ac..7237e440 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -111,7 +111,7 @@ def new_receiver(self) -> Receiver[T]: class Sender(BaseSender[T]): """A sender to send messages to an Anycast channel. - Should not be created directly, but through the `Anycast.ggetet_sender()` + Should not be created directly, but through the `Anycast.new_sender()` method. """ @@ -211,7 +211,7 @@ def consume(self) -> T: assert ( self._next is not _Empty - ), "`consume()` must be preceeded by a call to `ready()`" + ), "`consume()` must be preceded by a call to `ready()`" # mypy doesn't understand that the assert above ensures that self._next is not # _Sentinel. So we have to use a type ignore here. next_val: T = self._next # type: ignore[assignment] diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index d2cc8ac7..1edff009 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -57,7 +57,7 @@ async def send(self, msg: V) -> None: except SenderError as err: # If this comes from a channel error, then we inject another # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlaying + # channel to hide (at least partially) the underlying # Broadcast channels we use. if isinstance(err.__cause__, ChannelError): this_chan_error = ChannelError( @@ -98,7 +98,7 @@ def consume(self) -> W: except ReceiverError as err: # If this comes from a channel error, then we inject another # ChannelError having the information about the Bidirectional - # channel to hide (at least partially) the underlaying + # channel to hide (at least partially) the underlying # Broadcast channels we use. if isinstance(err.__cause__, ChannelError): this_chan_error = ChannelError( diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index ef814e22..1fdf12e8 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -311,7 +311,7 @@ def consume(self) -> T: if not self._q and self._chan.closed: raise ReceiverStoppedError(self) from ChannelClosedError(self._chan) - assert self._q, "`consume()` must be preceeded by a call to `ready()`" + assert self._q, "`consume()` must be preceded by a call to `ready()`" return self._q.popleft() def into_peekable(self) -> Peekable[T]: diff --git a/src/frequenz/channels/util/_event.py b/src/frequenz/channels/util/_event.py index a0c64372..c227663a 100644 --- a/src/frequenz/channels/util/_event.py +++ b/src/frequenz/channels/util/_event.py @@ -41,7 +41,7 @@ async def exit_after_10_seconds() -> None: if selected_from(selected, other_receiver): print(selected.value) else: - assert False, "Unknow receiver selected" + assert False, "Unknown receiver selected" ``` """ diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/util/_file_watcher.py index e7ec9ca0..5c8b13fa 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/util/_file_watcher.py @@ -129,7 +129,7 @@ def consume(self) -> Event: if not self._changes and self._awatch_stopped_exc is not None: raise ReceiverStoppedError(self) from self._awatch_stopped_exc - assert self._changes, "`consume()` must be preceeded by a call to `ready()`" + assert self._changes, "`consume()` must be preceded by a call to `ready()`" # Tuple of (Change, path) returned by watchfiles change, path_str = self._changes.pop() return FileWatcher.Event( diff --git a/src/frequenz/channels/util/_merge.py b/src/frequenz/channels/util/_merge.py index c1170ffe..f026c9f1 100644 --- a/src/frequenz/channels/util/_merge.py +++ b/src/frequenz/channels/util/_merge.py @@ -113,6 +113,6 @@ def consume(self) -> T: if not self._results and not self._pending: raise ReceiverStoppedError(self) - assert self._results, "`consume()` must be preceeded by a call to `ready()`" + assert self._results, "`consume()` must be preceded by a call to `ready()`" return self._results.popleft() diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index 7fa06585..8f43e09a 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -94,6 +94,6 @@ def consume(self) -> tuple[str, T]: if not self._results and not self._pending: raise ReceiverStoppedError(self) - assert self._results, "`consume()` must be preceeded by a call to `ready()`" + assert self._results, "`consume()` must be preceded by a call to `ready()`" return self._results.popleft() diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/util/_select.py index f2d0d084..0ce63aad 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/util/_select.py @@ -198,7 +198,7 @@ class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError): """An exception group for [`select()`][frequenz.channels.util.select] operation. This exception group is raised when a `select()` loops fails while cleaning up - runing tasts to check for ready receivers. + running tests to check for ready receivers. """ diff --git a/src/frequenz/channels/util/_timer.py b/src/frequenz/channels/util/_timer.py index 5cab86b6..f236755e 100644 --- a/src/frequenz/channels/util/_timer.py +++ b/src/frequenz/channels/util/_timer.py @@ -81,7 +81,7 @@ class TriggerAllMissed(MissedTickPolicy): Example: Assume a timer with interval 1 second, the tick `T0` happens exactly at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds - late), so it trigges immediately. The third tick, `T2`, happens at + late), so it triggers immediately. The third tick, `T2`, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also triggers immediately as well as the fifth tick, `T4`, which was also @@ -120,7 +120,7 @@ def calculate_next_tick_time( class SkipMissedAndResync(MissedTickPolicy): """A policy that drops all the missed ticks, triggers immediately and resyncs. - If ticks are missed, the timer will trigger immediately returing the drift + If ticks are missed, the timer will trigger immediately returning the drift and it will schedule to trigger again on the next multiple of `interval`, effectively skipping any missed ticks, but resyncing with the original start time. @@ -128,7 +128,7 @@ class SkipMissedAndResync(MissedTickPolicy): Example: Assume a timer with interval 1 second, the tick `T0` happens exactly at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds - late), so it trigges immediately. The third tick, `T2`, happens at + late), so it triggers immediately. The third tick, `T2`, happens at time 2.3 (0.3 seconds late), so it also triggers immediately. The fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also triggers immediately but the fifth tick, `T4`, which was also @@ -180,7 +180,7 @@ class SkipMissedAndDrift(MissedTickPolicy): Assume a timer with interval 1 second and `delay_tolerance=0.1`, the first tick, `T0`, happens exactly at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds late), so the timer triggers - immmediately but drifts a bit. The next tick, `T2.2`, happens at 2.3 seconds + immediately but drifts a bit. The next tick, `T2.2`, happens at 2.3 seconds (0.1 seconds late), so it also triggers immediately but it doesn't drift because the delay is under the `delay_tolerance`. The next tick, `T3.2`, triggers at 4.3 seconds (1.1 seconds late), so it also triggers @@ -200,7 +200,7 @@ def __init__(self, *, delay_tolerance: timedelta = timedelta(0)): """ Create an instance. - See the class documenation for more details. + See the class documentation for more details. Args: delay_tolerance: The maximum delay that is tolerated before diff --git a/tests/test_anycast.py b/tests/test_anycast.py index bb063ea5..9737a6ff 100644 --- a/tests/test_anycast.py +++ b/tests/test_anycast.py @@ -28,7 +28,7 @@ async def test_anycast() -> None: expected_sum = num_senders * num_receivers * (num_receivers + 1) / 2 # a list of `num_receivers` elements, where each element with get - # incremented by values the corrosponding receiver receives. Once the run + # incremented by values the corresponding receiver receives. Once the run # finishes, we will check if their sum equals `expected_sum`. recv_trackers = [0] * num_receivers @@ -46,7 +46,7 @@ async def update_tracker_on_receive(receiver_id: int, recv: Receiver[int]) -> No assert isinstance(err.__cause__, ChannelClosedError) return recv_trackers[receiver_id] += msg - # without the sleep, decomissioning receivers temporarily, all + # without the sleep, decommissioning receivers temporarily, all # messages go to the first receiver. await asyncio.sleep(0) diff --git a/tests/test_broadcast.py b/tests/test_broadcast.py index 936fe20d..aa8ea8cb 100644 --- a/tests/test_broadcast.py +++ b/tests/test_broadcast.py @@ -29,7 +29,7 @@ async def test_broadcast() -> None: expected_sum = num_senders * num_receivers * num_receivers * (num_receivers + 1) / 2 # a list of `num_receivers` elements, where each element with get - # incremented by values the corrosponding receiver receives. Once the run + # incremented by values the corresponding receiver receives. Once the run # finishes, we will check if their sum equals `expected_sum`. recv_trackers = [0] * num_receivers diff --git a/tests/test_merge.py b/tests/test_merge.py index e9516b29..9cc920f2 100644 --- a/tests/test_merge.py +++ b/tests/test_merge.py @@ -34,7 +34,7 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None: # It is hard to get messages from multiple channels in the same order, # so we use a `set` to check the next N messages are the same, in any # order, where N is the number of channels. This only works in this - # example because the `send` method sends values in immeidate + # example because the `send` method sends values in immediate # succession. assert set(results[idx : idx + 2]) == {ctr + 1, ctr + 101} assert results[-1] == 1000 diff --git a/tests/test_mergenamed.py b/tests/test_mergenamed.py index f51868e1..1c06bbe9 100644 --- a/tests/test_mergenamed.py +++ b/tests/test_mergenamed.py @@ -35,7 +35,7 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None: # It is hard to get messages from multiple channels in the same order, # so we use a `set` to check the next N messages are the same, in any # order, where N is the number of channels. This only works in this - # example because the `send` method sends values in immeidate + # example because the `send` method sends values in immediate # succession. assert set(results[idx : idx + 2]) == { ("chan1", ctr + 1), diff --git a/tests/utils/test_select_integration.py b/tests/utils/test_select_integration.py index 7d2ff997..e5f405d7 100644 --- a/tests/utils/test_select_integration.py +++ b/tests/utils/test_select_integration.py @@ -356,7 +356,7 @@ async def test_multiple_ready( self, start_run_multiple_ready: asyncio.Task[None], # pylint: disable=unused-argument ) -> None: - """Test that multiple ready receviers are handled properly. + """Test that multiple ready receivers are handled properly. Also test that the loop waits forever if there are no more receivers ready. """ @@ -401,7 +401,7 @@ async def test_multiple_ready( assert False, "Should not reach this point" except asyncio.TimeoutError: assert self.loop.time() == 15 - # This happened after time == 3, but the loop never resumes becuase + # This happened after time == 3, but the loop never resumes because # there is nothing ready, so we need to check it after the timeout. assert received == { self.recv1.name, diff --git a/tests/utils/test_timer.py b/tests/utils/test_timer.py index ecf774a5..dd5e5109 100644 --- a/tests/utils/test_timer.py +++ b/tests/utils/test_timer.py @@ -211,7 +211,7 @@ def test_policy_skip_missed_and_drift_examples() -> None: ) -async def test_timer_contruction_defaults() -> None: +async def test_timer_construction_defaults() -> None: """Test the construction of a periodic timer with default values.""" timer = Timer(timedelta(seconds=1.0), TriggerAllMissed()) assert timer.interval == timedelta(seconds=1.0) @@ -220,7 +220,7 @@ async def test_timer_contruction_defaults() -> None: assert timer.is_running is True -def test_timer_contruction_no_async() -> None: +def test_timer_construction_no_async() -> None: """Test the construction outside of async (using a custom loop).""" loop = async_solipsism.EventLoop() timer = Timer(timedelta(seconds=1.0), TriggerAllMissed(), loop=loop) @@ -230,13 +230,13 @@ def test_timer_contruction_no_async() -> None: assert timer.is_running is True -def test_timer_contruction_no_event_loop() -> None: +def test_timer_construction_no_event_loop() -> None: """Test the construction outside of async (without a custom loop) fails.""" with pytest.raises(RuntimeError, match="no running event loop"): Timer(timedelta(seconds=1.0), TriggerAllMissed()) -async def test_timer_contruction_auto_start() -> None: +async def test_timer_construction_auto_start() -> None: """Test the construction of a periodic timer with auto_start=False.""" policy = TriggerAllMissed() timer = Timer( @@ -251,7 +251,7 @@ async def test_timer_contruction_auto_start() -> None: assert timer.is_running is False -async def test_timer_contruction_custom_args() -> None: +async def test_timer_construction_custom_args() -> None: """Test the construction of a periodic timer with custom arguments.""" policy = TriggerAllMissed() timer = Timer( @@ -266,7 +266,7 @@ async def test_timer_contruction_custom_args() -> None: assert timer.is_running is True -async def test_timer_contruction_timeout_custom_args() -> None: +async def test_timer_construction_timeout_custom_args() -> None: """Test the construction of a timeout timer with custom arguments.""" timer = Timer.timeout( timedelta(seconds=5.0), @@ -280,7 +280,7 @@ async def test_timer_contruction_timeout_custom_args() -> None: assert timer.is_running is True -async def test_timer_contruction_periodic_defaults() -> None: +async def test_timer_construction_periodic_defaults() -> None: """Test the construction of a periodic timer.""" timer = Timer.periodic(timedelta(seconds=5.0)) assert timer.interval == timedelta(seconds=5.0) @@ -289,7 +289,7 @@ async def test_timer_contruction_periodic_defaults() -> None: assert timer.is_running is True -async def test_timer_contruction_periodic_custom_args() -> None: +async def test_timer_construction_periodic_custom_args() -> None: """Test the construction of a timeout timer with custom arguments.""" timer = Timer.periodic( timedelta(seconds=5.0), @@ -304,7 +304,7 @@ async def test_timer_contruction_periodic_custom_args() -> None: assert timer.is_running is True -async def test_timer_contruction_wrong_args() -> None: +async def test_timer_construction_wrong_args() -> None: """Test the construction of a timeout timer with wrong arguments.""" with pytest.raises( ValueError, From 17d91b79b34154ce8b7d3e5ac60f9bd4eac4be4b Mon Sep 17 00:00:00 2001 From: Daniel Zullo Date: Thu, 14 Sep 2023 11:53:28 +0200 Subject: [PATCH 2/2] Document class and module attributes Add or amend docstrings for all classes and modules where needed. Signed-off-by: Daniel Zullo --- src/frequenz/channels/_anycast.py | 29 +++++++++++++++++++- src/frequenz/channels/_base_classes.py | 8 +++--- src/frequenz/channels/_bidirectional.py | 13 +++++++++ src/frequenz/channels/_broadcast.py | 30 +++++++++++++++++++++ src/frequenz/channels/_exceptions.py | 3 +++ src/frequenz/channels/util/_file_watcher.py | 4 ++- src/frequenz/channels/util/_merge_named.py | 5 ++++ src/frequenz/channels/util/_select.py | 1 + tests/utils/test_file_watcher.py | 1 + 9 files changed, 89 insertions(+), 5 deletions(-) diff --git a/src/frequenz/channels/_anycast.py b/src/frequenz/channels/_anycast.py index 7237e440..98d03fb2 100644 --- a/src/frequenz/channels/_anycast.py +++ b/src/frequenz/channels/_anycast.py @@ -68,10 +68,34 @@ def __init__(self, maxsize: int = 10) -> None: maxsize: Size of the channel's buffer. """ self.limit: int = maxsize + """The maximum number of values that can be stored in the channel's buffer. + + If the length of channel's buffer reaches the limit, then the sender + blocks at the [send()][frequenz.channels.Sender.send] method until + a value is consumed. + """ + self.deque: Deque[T] = deque(maxlen=maxsize) + """The channel's buffer.""" + self.send_cv: Condition = Condition() + """The condition to wait for free space in the channel's buffer. + + If the channel's buffer is full, then the sender waits for values to + get consumed using this condition until there's some free space + available in the channel's buffer. + """ + self.recv_cv: Condition = Condition() + """The condition to wait for values in the channel's buffer. + + If the channel's buffer is empty, then the receiver waits for values + using this condition until there's a value available in the channel's + buffer. + """ + self.closed: bool = False + """Whether the channel is closed.""" async def close(self) -> None: """Close the channel. @@ -79,7 +103,7 @@ async def close(self) -> None: Any further attempts to [send()][frequenz.channels.Sender.send] data will return `False`. - Receivers will still be able to drain the pending items on the channel, + Receivers will still be able to drain the pending values on the channel, but after that, subsequent [receive()][frequenz.channels.Receiver.receive] calls will return `None` immediately. @@ -122,6 +146,7 @@ def __init__(self, chan: Anycast[T]) -> None: chan: A reference to the channel that this sender belongs to. """ self._chan = chan + """The channel that this sender belongs to.""" async def send(self, msg: T) -> None: """Send a message across the channel. @@ -169,6 +194,8 @@ def __init__(self, chan: Anycast[T]) -> None: chan: A reference to the channel that this receiver belongs to. """ self._chan = chan + """The channel that this receiver belongs to.""" + self._next: T | type[_Empty] = _Empty async def ready(self) -> bool: diff --git a/src/frequenz/channels/_base_classes.py b/src/frequenz/channels/_base_classes.py index 3518f116..638c3d3b 100644 --- a/src/frequenz/channels/_base_classes.py +++ b/src/frequenz/channels/_base_classes.py @@ -1,7 +1,7 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""Baseclasses for Channel Sender and Receiver.""" +"""Base classes for Channel Sender and Receiver.""" from __future__ import annotations @@ -169,11 +169,13 @@ def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None: Args: recv: The input receiver. - transform: The function to run on the input - data. + transform: The function to run on the input data. """ self._recv = recv + """The input receiver.""" + self._transform = transform + """The function to run on the input data.""" async def ready(self) -> bool: """Wait until the receiver is ready with a value or an error. diff --git a/src/frequenz/channels/_bidirectional.py b/src/frequenz/channels/_bidirectional.py index 1edff009..747567b5 100644 --- a/src/frequenz/channels/_bidirectional.py +++ b/src/frequenz/channels/_bidirectional.py @@ -38,8 +38,13 @@ def __init__( receiver: A receiver to receive values from. """ self._chan = channel + """The underlying channel.""" + self._sender = sender + """The sender to send values with.""" + self._receiver = receiver + """The receiver to receive values from.""" async def send(self, msg: V) -> None: """Send a value to the other side. @@ -117,21 +122,29 @@ def __init__(self, client_id: str, service_id: str) -> None: service_id: A name for the service end of the channels. """ self._client_id = client_id + """The name for the client, used to name the channels.""" + self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}") + """The channel to send requests.""" + self._response_channel: Broadcast[U] = Broadcast( f"resp_{service_id}_{client_id}" ) + """The channel to send responses.""" self._client_handle = Bidirectional.Handle( self, self._request_channel.new_sender(), self._response_channel.new_receiver(), ) + """The handle for the client side to send/receive values.""" + self._service_handle = Bidirectional.Handle( self, self._response_channel.new_sender(), self._request_channel.new_receiver(), ) + """The handle for the service side to send/receive values.""" @property def client_handle(self) -> Bidirectional.Handle[T, U]: diff --git a/src/frequenz/channels/_broadcast.py b/src/frequenz/channels/_broadcast.py index 1fdf12e8..27566ae6 100644 --- a/src/frequenz/channels/_broadcast.py +++ b/src/frequenz/channels/_broadcast.py @@ -85,12 +85,25 @@ def __init__(self, name: str, resend_latest: bool = False) -> None: to wait for the next message on the channel to arrive. """ self.name: str = name + """The name of the broadcast channel. + + Only used for debugging purposes. + """ + self._resend_latest = resend_latest + """Whether to resend the latest value to new receivers.""" self.recv_cv: Condition = Condition() + """The condition to wait for data in the channel's buffer.""" + self.receivers: dict[UUID, weakref.ReferenceType[Receiver[T]]] = {} + """The receivers attached to the channel, indexed by their UUID.""" + self.closed: bool = False + """Whether the channel is closed.""" + self._latest: T | None = None + """The latest value sent to the channel.""" async def close(self) -> None: """Close the Broadcast channel. @@ -167,6 +180,7 @@ def __init__(self, chan: Broadcast[T]) -> None: chan: A reference to the broadcast channel this sender belongs to. """ self._chan = chan + """The broadcast channel this sender belongs to.""" async def send(self, msg: T) -> None: """Send a message to all broadcast receivers. @@ -222,11 +236,26 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N belongs to. """ self._uuid = uuid + """The UUID to identify the receiver in the broadcast channel's list of receivers.""" + self._name = name + """The name to identify the receiver. + + Only used for debugging purposes. + """ + self._chan = chan + """The broadcast channel that this receiver belongs to.""" + self._q: Deque[T] = deque(maxlen=maxsize) + """The receiver's internal message queue.""" self._active = True + """Whether the receiver is still active. + + If this receiver is converted into a Peekable, it will neither be + considered valid nor active. + """ def enqueue(self, msg: T) -> None: """Put a message into this receiver's queue. @@ -343,6 +372,7 @@ def __init__(self, chan: Broadcast[T]) -> None: chan: The broadcast channel this Peekable will try to peek into. """ self._chan = chan + """The broadcast channel this Peekable will try to peek into.""" def peek(self) -> T | None: """Return the latest value that was sent to the channel. diff --git a/src/frequenz/channels/_exceptions.py b/src/frequenz/channels/_exceptions.py index 7147efee..2042003b 100644 --- a/src/frequenz/channels/_exceptions.py +++ b/src/frequenz/channels/_exceptions.py @@ -43,6 +43,7 @@ def __init__(self, message: Any, channel: Any): """ super().__init__(message) self.channel: Any = channel + """The channel where the error happened.""" class ChannelClosedError(ChannelError): @@ -73,6 +74,7 @@ def __init__(self, message: Any, sender: _base_classes.Sender[T]): """ super().__init__(message) self.sender: _base_classes.Sender[T] = sender + """The sender where the error happened.""" class ReceiverError(Error, Generic[T]): @@ -91,6 +93,7 @@ def __init__(self, message: Any, receiver: _base_classes.Receiver[T]): """ super().__init__(message) self.receiver: _base_classes.Receiver[T] = receiver + """The receiver where the error happened.""" class ReceiverStoppedError(ReceiverError[T]): diff --git a/src/frequenz/channels/util/_file_watcher.py b/src/frequenz/channels/util/_file_watcher.py index 5c8b13fa..1c87742a 100644 --- a/src/frequenz/channels/util/_file_watcher.py +++ b/src/frequenz/channels/util/_file_watcher.py @@ -1,7 +1,7 @@ # License: MIT # Copyright © 2022 Frequenz Energy-as-a-Service GmbH -"""A Channel receiver for watching for new (or modified) files.""" +"""A Channel receiver for watching for new, modified or deleted files.""" from __future__ import annotations @@ -55,6 +55,8 @@ def __init__( all event types. """ self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types) + """The types of events to watch for.""" + self._stop_event = asyncio.Event() self._paths = [ path if isinstance(path, pathlib.Path) else pathlib.Path(path) diff --git a/src/frequenz/channels/util/_merge_named.py b/src/frequenz/channels/util/_merge_named.py index 8f43e09a..d8ab9839 100644 --- a/src/frequenz/channels/util/_merge_named.py +++ b/src/frequenz/channels/util/_merge_named.py @@ -25,11 +25,16 @@ def __init__(self, **kwargs: Receiver[T]) -> None: **kwargs: sequence of channel receivers. """ self._receivers = kwargs + """The sequence of channel receivers to get the messages to merge.""" + self._pending: set[asyncio.Task[Any]] = { asyncio.create_task(recv.__anext__(), name=name) for name, recv in self._receivers.items() } + """The set of pending tasks to merge messages.""" + self._results: Deque[tuple[str, T]] = deque(maxlen=len(self._receivers)) + """The internal buffer of merged messages.""" def __del__(self) -> None: """Cleanup any pending tasks.""" diff --git a/src/frequenz/channels/util/_select.py b/src/frequenz/channels/util/_select.py index 0ce63aad..0196390c 100644 --- a/src/frequenz/channels/util/_select.py +++ b/src/frequenz/channels/util/_select.py @@ -192,6 +192,7 @@ def __init__(self, selected: Selected[_T]) -> None: recv = selected._recv # pylint: disable=protected-access super().__init__(f"Selected receiver {recv} was not handled in the if-chain") self.selected = selected + """The selected receiver that was not handled.""" class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError): diff --git a/tests/utils/test_file_watcher.py b/tests/utils/test_file_watcher.py index 789427b9..e3eade62 100644 --- a/tests/utils/test_file_watcher.py +++ b/tests/utils/test_file_watcher.py @@ -29,6 +29,7 @@ def __init__(self, changes: Sequence[FileChange] = ()) -> None: function. """ self.changes: Sequence[FileChange] = changes + """The sequence of file changes.""" async def fake_awatch( self, *paths: str, **kwargs: Any # pylint: disable=unused-argument