Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp modules structure #235

Merged
merged 13 commits into from
Nov 9, 2023
12 changes: 1 addition & 11 deletions src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@

Other base classes:

* [Peekable][frequenz.channels.Peekable]: An object to allow users to get
a peek at the latest value in the channel, without consuming anything.

* [Receiver][frequenz.channels.Receiver]: An object that can wait for and
consume messages from a channel.

Expand Down Expand Up @@ -52,22 +49,17 @@

* [ReceiverStoppedError][frequenz.channels.ReceiverStoppedError]: A receiver
stopped producing messages.

* [ReceiverInvalidatedError][frequenz.channels.ReceiverInvalidatedError]:
A receiver is not longer valid (for example if it was converted into
a peekable.
"""

from . import util
from ._anycast import Anycast
from ._base_classes import Peekable, Receiver, Sender
from ._base_classes import Receiver, Sender
from ._broadcast import Broadcast
from ._exceptions import (
ChannelClosedError,
ChannelError,
Error,
ReceiverError,
ReceiverInvalidatedError,
ReceiverStoppedError,
SenderError,
)
Expand All @@ -78,10 +70,8 @@
"ChannelClosedError",
"ChannelError",
"Error",
"Peekable",
"Receiver",
"ReceiverError",
"ReceiverInvalidatedError",
"ReceiverStoppedError",
"Sender",
"SenderError",
Expand Down
34 changes: 0 additions & 34 deletions src/frequenz/channels/_base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,40 +122,6 @@ def map(self, call: Callable[[T], U]) -> Receiver[U]:
"""
return _Map(self, call)

def into_peekable(self) -> Peekable[T]:
"""Convert the `Receiver` implementation into a `Peekable`.

Once this function has been called, the receiver will no longer be
usable, and calling `receive` on the receiver will raise an exception.

Returns:
A `Peekable` that can be used to peek at the latest value in the
channel.

Raises:
NotImplementedError: when a `Receiver` implementation doesn't have
a custom `into_peekable` implementation.
"""
raise NotImplementedError("This receiver does not implement `into_peekable`")


class Peekable(ABC, Generic[T]):
"""A channel peekable.

A Peekable provides a [peek()][frequenz.channels.Peekable] method that
allows the user to get a peek at the latest value in the channel, without
consuming anything.
"""

@abstractmethod
def peek(self) -> T | None:
"""Return the latest value that was sent to the channel.

Returns:
The latest value received by the channel, and `None`, if nothing
has been sent to the channel yet.
"""


class _Map(Receiver[U], Generic[T, U]):
"""Apply a transform function on a channel receiver.
Expand Down
99 changes: 2 additions & 97 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,10 @@
from collections import deque
from typing import Generic

from ._base_classes import Peekable as BasePeekable
from ._base_classes import Receiver as BaseReceiver
from ._base_classes import Sender as BaseSender
from ._base_classes import T
from ._exceptions import (
ChannelClosedError,
ReceiverInvalidatedError,
ReceiverStoppedError,
SenderError,
)
from ._exceptions import ChannelClosedError, ReceiverStoppedError, SenderError

_logger = logging.Logger(__name__)

Expand Down Expand Up @@ -176,18 +170,6 @@ def new_receiver(self, *, name: str | None = None, limit: int = 50) -> Receiver[
recv.enqueue(self._latest)
return recv

def new_peekable(self) -> Peekable[T]:
shsms marked this conversation as resolved.
Show resolved Hide resolved
"""Create a new Peekable for the broadcast channel.

A Peekable provides a [peek()][frequenz.channels.Peekable.peek] method
that allows the user to get a peek at the latest value in the channel,
without consuming anything.

Returns:
A Peekable to peek into the broadcast channel with.
"""
return Peekable(self)

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{type(self).__name__}:{self._name}"
Expand Down Expand Up @@ -295,13 +277,6 @@ def __init__(self, name: str | None, limit: int, chan: Broadcast[T]) -> None:
self._q: deque[T] = deque(maxlen=limit)
"""The receiver's internal message queue."""

self._active: bool = True
"""Whether the receiver is still active.

If this receiver is converted into a Peekable, it will neither be
considered valid nor active.
"""

def enqueue(self, msg: T) -> None:
"""Put a message into this receiver's queue.

Expand Down Expand Up @@ -343,10 +318,6 @@ async def ready(self) -> bool:
if self._q:
return True

# if it is not longer active, return immediately
if not self._active:
return False

# Use a while loop here, to handle spurious wakeups of condition variables.
#
# The condition also makes sure that if there are already messages ready to be
Expand All @@ -360,15 +331,6 @@ async def ready(self) -> bool:
return True
# pylint: enable=protected-access

def _deactivate(self) -> None:
"""Set the receiver as inactive and remove it from the channel."""
self._active = False
# pylint: disable=protected-access
_hash = hash(self)
if _hash in self._chan._receivers:
del self._chan._receivers[_hash]
# pylint: enable=protected-access

def consume(self) -> T:
"""Return the latest value once `ready` is complete.

Expand All @@ -377,34 +339,13 @@ def consume(self) -> T:

Raises:
ReceiverStoppedError: if there is some problem with the receiver.
ReceiverInvalidatedError: if the receiver was converted into
a peekable.
"""
if not self._q and not self._active:
raise ReceiverInvalidatedError(
"This receiver was converted into a Peekable so it is not longer valid.",
self,
)

if not self._q and self._chan._closed: # pylint: disable=protected-access
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)

assert self._q, "`consume()` must be preceded by a call to `ready()`"
return self._q.popleft()

def into_peekable(self) -> Peekable[T]:
"""Convert the `Receiver` implementation into a `Peekable`.

Once this function has been called, the receiver will no longer be
usable, and calling [receive()][frequenz.channels.Receiver.receive] on
the receiver will raise an exception.

Returns:
A `Peekable` instance.
"""
self._deactivate()
return Peekable(self._chan)

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._chan}:{type(self).__name__}"
Expand All @@ -415,41 +356,5 @@ def __repr__(self) -> str:
assert limit is not None
return (
f"{type(self).__name__}(name={self._name!r}, limit={limit!r}, "
f"{self._chan!r}):<id={id(self)!r}, used={len(self._q)!r}, "
f"active={self._active!r}>"
f"{self._chan!r}):<id={id(self)!r}, used={len(self._q)!r}>"
)


class Peekable(BasePeekable[T]):
"""A Peekable to peek into broadcast channels.

A Peekable provides a [peek()][frequenz.channels.Peekable] method that
allows the user to get a peek at the latest value in the channel, without
consuming anything.
"""

def __init__(self, chan: Broadcast[T]) -> None:
"""Create a `Peekable` instance.

Args:
chan: The broadcast channel this Peekable will try to peek into.
"""
self._chan: Broadcast[T] = chan
"""The broadcast channel this Peekable will try to peek into."""

def peek(self) -> T | None:
"""Return the latest value that was sent to the channel.

Returns:
The latest value received by the channel, and `None`, if nothing
has been sent to the channel yet, or if the channel is closed.
"""
return self._chan._latest # pylint: disable=protected-access

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._chan}:{type(self).__name__}"

def __repr__(self) -> str:
"""Return a string representation of this receiver."""
return f"{type(self).__name__}({self._chan!r}):<latest={self.peek()!r}>"
9 changes: 0 additions & 9 deletions src/frequenz/channels/_exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,3 @@ def __init__(self, receiver: _base_classes.Receiver[T]):
error happened.
"""
super().__init__(f"Receiver {receiver} was stopped", receiver)


class ReceiverInvalidatedError(ReceiverError[T]):
"""The [Receiver][frequenz.channels.Receiver] was invalidated.

This happens when the Receiver is converted
[into][frequenz.channels.Receiver.into_peekable]
a [Peekable][frequenz.channels.Peekable].
"""
27 changes: 0 additions & 27 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
Broadcast,
ChannelClosedError,
Receiver,
ReceiverInvalidatedError,
ReceiverStoppedError,
Sender,
SenderError,
Expand Down Expand Up @@ -187,32 +186,6 @@ async def test_broadcast_no_resend_latest() -> None:
assert await new_recv.receive() == 100


async def test_broadcast_peek() -> None:
"""Ensure we are able to peek into broadcast channels."""
bcast: Broadcast[int] = Broadcast(name="peek-test")
receiver = bcast.new_receiver()
peekable = receiver.into_peekable()
sender = bcast.new_sender()

with pytest.raises(ReceiverInvalidatedError):
await receiver.receive()

assert peekable.peek() is None

for val in range(0, 10):
await sender.send(val)

assert peekable.peek() == 9
assert peekable.peek() == 9

await sender.send(20)

assert peekable.peek() == 20

await bcast.close()
assert peekable.peek() is None


async def test_broadcast_async_iterator() -> None:
"""Check that the broadcast receiver works as an async iterator."""
bcast: Broadcast[int] = Broadcast(name="iter_test")
Expand Down