Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Document class and module attributes #185

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,42 @@ def __init__(self, maxsize: int = 10) -> None:
maxsize: Size of the channel's buffer.
"""
self.limit: int = maxsize
daniel-zullo-frequenz marked this conversation as resolved.
Show resolved Hide resolved
daniel-zullo-frequenz marked this conversation as resolved.
Show resolved Hide resolved
"""The maximum number of values that can be stored in the channel's buffer.

If the length of channel's buffer reaches the limit, then the sender
blocks at the [send()][frequenz.channels.Sender.send] method until
a value is consumed.
"""

self.deque: Deque[T] = deque(maxlen=maxsize)
"""The channel's buffer."""

self.send_cv: Condition = Condition()
"""The condition to wait for free space in the channel's buffer.

If the channel's buffer is full, then the sender waits for values to
get consumed using this condition until there's some free space
available in the channel's buffer.
"""

self.recv_cv: Condition = Condition()
"""The condition to wait for values in the channel's buffer.

If the channel's buffer is empty, then the receiver waits for values
using this condition until there's a value available in the channel's
buffer.
"""

self.closed: bool = False
"""Whether the channel is closed."""

async def close(self) -> None:
"""Close the channel.

Any further attempts to [send()][frequenz.channels.Sender.send] data
will return `False`.

Receivers will still be able to drain the pending items on the channel,
Receivers will still be able to drain the pending values on the channel,
but after that, subsequent
[receive()][frequenz.channels.Receiver.receive] calls will return `None`
immediately.
Expand Down Expand Up @@ -111,7 +135,7 @@ def new_receiver(self) -> Receiver[T]:
class Sender(BaseSender[T]):
"""A sender to send messages to an Anycast channel.

Should not be created directly, but through the `Anycast.ggetet_sender()`
Should not be created directly, but through the `Anycast.new_sender()`
method.
"""

Expand All @@ -122,6 +146,7 @@ def __init__(self, chan: Anycast[T]) -> None:
chan: A reference to the channel that this sender belongs to.
"""
self._chan = chan
"""The channel that this sender belongs to."""

async def send(self, msg: T) -> None:
"""Send a message across the channel.
Expand Down Expand Up @@ -169,6 +194,8 @@ def __init__(self, chan: Anycast[T]) -> None:
chan: A reference to the channel that this receiver belongs to.
"""
self._chan = chan
"""The channel that this receiver belongs to."""

self._next: T | type[_Empty] = _Empty

async def ready(self) -> bool:
Expand Down Expand Up @@ -211,7 +238,7 @@ def consume(self) -> T:

assert (
self._next is not _Empty
), "`consume()` must be preceeded by a call to `ready()`"
), "`consume()` must be preceded by a call to `ready()`"
# mypy doesn't understand that the assert above ensures that self._next is not
# _Sentinel. So we have to use a type ignore here.
next_val: T = self._next # type: ignore[assignment]
Expand Down
8 changes: 5 additions & 3 deletions src/frequenz/channels/_base_classes.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""Baseclasses for Channel Sender and Receiver."""
"""Base classes for Channel Sender and Receiver."""

from __future__ import annotations

Expand Down Expand Up @@ -169,11 +169,13 @@ def __init__(self, recv: Receiver[T], transform: Callable[[T], U]) -> None:

Args:
recv: The input receiver.
transform: The function to run on the input
data.
transform: The function to run on the input data.
"""
self._recv = recv
"""The input receiver."""
daniel-zullo-frequenz marked this conversation as resolved.
Show resolved Hide resolved

self._transform = transform
"""The function to run on the input data."""

async def ready(self) -> bool:
"""Wait until the receiver is ready with a value or an error.
Expand Down
17 changes: 15 additions & 2 deletions src/frequenz/channels/_bidirectional.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,13 @@ def __init__(
receiver: A receiver to receive values from.
"""
self._chan = channel
"""The underlying channel."""

self._sender = sender
"""The sender to send values with."""

self._receiver = receiver
"""The receiver to receive values from."""

async def send(self, msg: V) -> None:
"""Send a value to the other side.
Expand All @@ -57,7 +62,7 @@ async def send(self, msg: V) -> None:
except SenderError as err:
# If this comes from a channel error, then we inject another
# ChannelError having the information about the Bidirectional
# channel to hide (at least partially) the underlaying
# channel to hide (at least partially) the underlying
# Broadcast channels we use.
if isinstance(err.__cause__, ChannelError):
this_chan_error = ChannelError(
Expand Down Expand Up @@ -98,7 +103,7 @@ def consume(self) -> W:
except ReceiverError as err:
# If this comes from a channel error, then we inject another
# ChannelError having the information about the Bidirectional
# channel to hide (at least partially) the underlaying
# channel to hide (at least partially) the underlying
# Broadcast channels we use.
if isinstance(err.__cause__, ChannelError):
this_chan_error = ChannelError(
Expand All @@ -117,21 +122,29 @@ def __init__(self, client_id: str, service_id: str) -> None:
service_id: A name for the service end of the channels.
"""
self._client_id = client_id
"""The name for the client, used to name the channels."""
llucax marked this conversation as resolved.
Show resolved Hide resolved

self._request_channel: Broadcast[T] = Broadcast(f"req_{service_id}_{client_id}")
"""The channel to send requests."""

self._response_channel: Broadcast[U] = Broadcast(
f"resp_{service_id}_{client_id}"
)
"""The channel to send responses."""

self._client_handle = Bidirectional.Handle(
self,
self._request_channel.new_sender(),
self._response_channel.new_receiver(),
)
"""The handle for the client side to send/receive values."""

self._service_handle = Bidirectional.Handle(
self,
self._response_channel.new_sender(),
self._request_channel.new_receiver(),
)
"""The handle for the service side to send/receive values."""

@property
def client_handle(self) -> Bidirectional.Handle[T, U]:
Expand Down
32 changes: 31 additions & 1 deletion src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,25 @@ def __init__(self, name: str, resend_latest: bool = False) -> None:
to wait for the next message on the channel to arrive.
"""
self.name: str = name
"""The name of the broadcast channel.

Only used for debugging purposes.
"""

self._resend_latest = resend_latest
"""Whether to resend the latest value to new receivers."""

self.recv_cv: Condition = Condition()
"""The condition to wait for data in the channel's buffer."""

self.receivers: dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
"""The receivers attached to the channel, indexed by their UUID."""

self.closed: bool = False
"""Whether the channel is closed."""

self._latest: T | None = None
"""The latest value sent to the channel."""

async def close(self) -> None:
"""Close the Broadcast channel.
Expand Down Expand Up @@ -167,6 +180,7 @@ def __init__(self, chan: Broadcast[T]) -> None:
chan: A reference to the broadcast channel this sender belongs to.
"""
self._chan = chan
"""The broadcast channel this sender belongs to."""

async def send(self, msg: T) -> None:
"""Send a message to all broadcast receivers.
Expand Down Expand Up @@ -222,11 +236,26 @@ def __init__(self, uuid: UUID, name: str, maxsize: int, chan: Broadcast[T]) -> N
belongs to.
"""
self._uuid = uuid
"""The UUID to identify the receiver in the broadcast channel's list of receivers."""

self._name = name
"""The name to identify the receiver.

Only used for debugging purposes.
"""

self._chan = chan
"""The broadcast channel that this receiver belongs to."""

self._q: Deque[T] = deque(maxlen=maxsize)
"""The receiver's internal message queue."""

self._active = True
"""Whether the receiver is still active.

If this receiver is converted into a Peekable, it will neither be
considered valid nor active.
"""

def enqueue(self, msg: T) -> None:
"""Put a message into this receiver's queue.
Expand Down Expand Up @@ -311,7 +340,7 @@ def consume(self) -> T:
if not self._q and self._chan.closed:
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)

assert self._q, "`consume()` must be preceeded by a call to `ready()`"
assert self._q, "`consume()` must be preceded by a call to `ready()`"
return self._q.popleft()

def into_peekable(self) -> Peekable[T]:
Expand Down Expand Up @@ -343,6 +372,7 @@ def __init__(self, chan: Broadcast[T]) -> None:
chan: The broadcast channel this Peekable will try to peek into.
"""
self._chan = chan
"""The broadcast channel this Peekable will try to peek into."""

def peek(self) -> T | None:
"""Return the latest value that was sent to the channel.
Expand Down
3 changes: 3 additions & 0 deletions src/frequenz/channels/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self, message: Any, channel: Any):
"""
super().__init__(message)
self.channel: Any = channel
"""The channel where the error happened."""


class ChannelClosedError(ChannelError):
Expand Down Expand Up @@ -73,6 +74,7 @@ def __init__(self, message: Any, sender: _base_classes.Sender[T]):
"""
super().__init__(message)
self.sender: _base_classes.Sender[T] = sender
"""The sender where the error happened."""


class ReceiverError(Error, Generic[T]):
Expand All @@ -91,6 +93,7 @@ def __init__(self, message: Any, receiver: _base_classes.Receiver[T]):
"""
super().__init__(message)
self.receiver: _base_classes.Receiver[T] = receiver
"""The receiver where the error happened."""


class ReceiverStoppedError(ReceiverError[T]):
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/channels/util/_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ async def exit_after_10_seconds() -> None:
if selected_from(selected, other_receiver):
print(selected.value)
else:
assert False, "Unknow receiver selected"
assert False, "Unknown receiver selected"
```
"""

Expand Down
6 changes: 4 additions & 2 deletions src/frequenz/channels/util/_file_watcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH

"""A Channel receiver for watching for new (or modified) files."""
"""A Channel receiver for watching for new, modified or deleted files."""

from __future__ import annotations

Expand Down Expand Up @@ -55,6 +55,8 @@ def __init__(
all event types.
"""
self.event_types: frozenset[FileWatcher.EventType] = frozenset(event_types)
"""The types of events to watch for."""

self._stop_event = asyncio.Event()
self._paths = [
path if isinstance(path, pathlib.Path) else pathlib.Path(path)
Expand Down Expand Up @@ -129,7 +131,7 @@ def consume(self) -> Event:
if not self._changes and self._awatch_stopped_exc is not None:
raise ReceiverStoppedError(self) from self._awatch_stopped_exc

assert self._changes, "`consume()` must be preceeded by a call to `ready()`"
assert self._changes, "`consume()` must be preceded by a call to `ready()`"
# Tuple of (Change, path) returned by watchfiles
change, path_str = self._changes.pop()
return FileWatcher.Event(
Expand Down
2 changes: 1 addition & 1 deletion src/frequenz/channels/util/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,6 @@ def consume(self) -> T:
if not self._results and not self._pending:
raise ReceiverStoppedError(self)

assert self._results, "`consume()` must be preceeded by a call to `ready()`"
assert self._results, "`consume()` must be preceded by a call to `ready()`"

return self._results.popleft()
7 changes: 6 additions & 1 deletion src/frequenz/channels/util/_merge_named.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ def __init__(self, **kwargs: Receiver[T]) -> None:
**kwargs: sequence of channel receivers.
"""
self._receivers = kwargs
"""The sequence of channel receivers to get the messages to merge."""

self._pending: set[asyncio.Task[Any]] = {
asyncio.create_task(recv.__anext__(), name=name)
for name, recv in self._receivers.items()
}
"""The set of pending tasks to merge messages."""

self._results: Deque[tuple[str, T]] = deque(maxlen=len(self._receivers))
"""The internal buffer of merged messages."""

def __del__(self) -> None:
"""Cleanup any pending tasks."""
Expand Down Expand Up @@ -94,6 +99,6 @@ def consume(self) -> tuple[str, T]:
if not self._results and not self._pending:
raise ReceiverStoppedError(self)

assert self._results, "`consume()` must be preceeded by a call to `ready()`"
assert self._results, "`consume()` must be preceded by a call to `ready()`"

return self._results.popleft()
3 changes: 2 additions & 1 deletion src/frequenz/channels/util/_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,13 +192,14 @@ def __init__(self, selected: Selected[_T]) -> None:
recv = selected._recv # pylint: disable=protected-access
super().__init__(f"Selected receiver {recv} was not handled in the if-chain")
self.selected = selected
"""The selected receiver that was not handled."""


class SelectErrorGroup(BaseExceptionGroup[BaseException], SelectError):
"""An exception group for [`select()`][frequenz.channels.util.select] operation.

This exception group is raised when a `select()` loops fails while cleaning up
runing tasts to check for ready receivers.
running tests to check for ready receivers.
"""


Expand Down
10 changes: 5 additions & 5 deletions src/frequenz/channels/util/_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class TriggerAllMissed(MissedTickPolicy):
Example:
Assume a timer with interval 1 second, the tick `T0` happens exactly
at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds
late), so it trigges immediately. The third tick, `T2`, happens at
late), so it triggers immediately. The third tick, `T2`, happens at
time 2.3 (0.3 seconds late), so it also triggers immediately. The
fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
triggers immediately as well as the fifth tick, `T4`, which was also
Expand Down Expand Up @@ -120,15 +120,15 @@ def calculate_next_tick_time(
class SkipMissedAndResync(MissedTickPolicy):
"""A policy that drops all the missed ticks, triggers immediately and resyncs.

If ticks are missed, the timer will trigger immediately returing the drift
If ticks are missed, the timer will trigger immediately returning the drift
and it will schedule to trigger again on the next multiple of `interval`,
effectively skipping any missed ticks, but resyncing with the original start
time.

Example:
Assume a timer with interval 1 second, the tick `T0` happens exactly
at time 0, the second tick, `T1`, happens at time 1.2 (0.2 seconds
late), so it trigges immediately. The third tick, `T2`, happens at
late), so it triggers immediately. The third tick, `T2`, happens at
time 2.3 (0.3 seconds late), so it also triggers immediately. The
fourth tick, `T3`, happens at time 4.3 (1.3 seconds late), so it also
triggers immediately but the fifth tick, `T4`, which was also
Expand Down Expand Up @@ -180,7 +180,7 @@ class SkipMissedAndDrift(MissedTickPolicy):
Assume a timer with interval 1 second and `delay_tolerance=0.1`, the
first tick, `T0`, happens exactly at time 0, the second tick, `T1`,
happens at time 1.2 (0.2 seconds late), so the timer triggers
immmediately but drifts a bit. The next tick, `T2.2`, happens at 2.3 seconds
immediately but drifts a bit. The next tick, `T2.2`, happens at 2.3 seconds
(0.1 seconds late), so it also triggers immediately but it doesn't
drift because the delay is under the `delay_tolerance`. The next tick,
`T3.2`, triggers at 4.3 seconds (1.1 seconds late), so it also triggers
Expand All @@ -200,7 +200,7 @@ def __init__(self, *, delay_tolerance: timedelta = timedelta(0)):
"""
Create an instance.

See the class documenation for more details.
See the class documentation for more details.

Args:
delay_tolerance: The maximum delay that is tolerated before
Expand Down
Loading