Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename "value" to "message" and other minor breaking changes #281

Merged
merged 10 commits into from
Mar 6, 2024
16 changes: 8 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ async def main() -> None:
receiver = hello_channel.new_receiver()

await sender.send("Hello World!")
msg = await receiver.receive()
print(msg)
message = await receiver.receive()
print(message)


asyncio.run(main())
Expand Down Expand Up @@ -141,8 +141,8 @@ async def send(
await sender.send(f"{sender}: {counter}")
counter += 1
elif selected_from(selected, control_command):
print(f"{sender}: Received command: {selected.value.name}")
match selected.value:
print(f"{sender}: Received command: {selected.message.name}")
match selected.message:
case Command.STOP_SENDER:
print(f"{sender}: Stopping")
break
Expand All @@ -166,13 +166,13 @@ async def receive(
merged = merge(*receivers)
async for selected in select(merged, timer, control_command):
if selected_from(selected, merged):
message = selected.value
message = selected.message
print(f"receive: Received {message=}")
timer.reset()
print(f"{timer=}")
elif selected_from(selected, control_command):
print(f"receive: received command: {selected.value.name}")
match selected.value:
print(f"receive: received command: {selected.message.name}")
match selected.message:
case Command.PING:
print("receive: Ping received, reply with pong")
await control_reply.send(Reply(ReplyCommand.PONG, "receive"))
Expand All @@ -181,7 +181,7 @@ async def receive(
case _ as unknown:
assert_never(unknown)
elif selected_from(selected, timer):
drift = selected.value
drift = selected.message
print(
f"receive: No data received for {timer.interval + drift} seconds, "
"giving up"
Expand Down
17 changes: 15 additions & 2 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@

- Support classes are no longer nested inside `FileWatcher`. They are now top-level classes within the new `frequenz.channels.file_watcher` module (e.g., `frequenz.channels.util.FileWatcher.EventType` -> `frequenz.channels.file_watcher.EventType`, `frequenz.channels.util.FileWatcher.Event` -> `frequenz.channels.file_watcher.Event`).

* `Receiver`

- The `map()` function now takes a positional-only argument, if you were using `receiver.map(call=fun)` you should replace it with `receiver.map(func)`.

* `Selected`

- The `value` property was renamed to `message`.
- `was_stopped` is now a property, you need to replace `selected.was_stopped()` with `selected.was_stopped`.

* `Sender`

- The `send` method now takes a positional-only argument, if you were using `sender.send(msg=message)` you should replace it with `sender.send(message)`.

* `Timer` and support classes

- Moved from `frequenz.channels.util` to `frequenz.channels.timer`.
Expand Down Expand Up @@ -79,9 +92,9 @@

* `Peekable`

This class was removed because it was merely a shortcut to a receiver that caches the last value received. It did not fit the channel abstraction well and was infrequently used.
This class was removed because it was merely a shortcut to a receiver that caches the last message received. It did not fit the channel abstraction well and was infrequently used.

You can replace it with a task that receives and retains the last value.
You can replace it with a task that receives and retains the last message.

* `Broadcast.new_peekable()`

Expand Down
3 changes: 0 additions & 3 deletions docs/_scripts/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ def _slugify(text: str) -> str:
Returns:
The slugified text.
"""
# The type of the return value is not defined for the markdown library.
# Also for some reason `mypy` thinks the `toc` module doesn't have a
# `slugify_unicode` function, but it definitely does.
return toc.slugify_unicode(text, "-")


Expand Down
8 changes: 4 additions & 4 deletions docs/user-guide/receiving/synchronization/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ not work:
receiver1: Receiver[int] = channel1.new_receiver()
receiver2: Receiver[int] = channel2.new_receiver()

msg = await receiver1.receive()
print(f"Received from channel1: {msg}")
message = await receiver1.receive()
print(f"Received from channel1: {message}")

msg = await receiver2.receive()
print(f"Received from channel2: {msg}")
message = await receiver2.receive()
print(f"Received from channel2: {message}")
```

The problem is that if the first message is not available in `channel1` but in
Expand Down
4 changes: 2 additions & 2 deletions src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
* [merge][frequenz.channels.merge]: Merge messages coming from multiple receivers into
a single stream.

* [select][frequenz.channels.select]: Iterate over the values of all
[receivers][frequenz.channels.Receiver] as new values become available.
* [select][frequenz.channels.select]: Iterate over the messages of all
[receivers][frequenz.channels.Receiver] as new messages become available.

Exception classes:

Expand Down
114 changes: 57 additions & 57 deletions src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ class Anycast(Generic[_T]):
When the channel is not needed anymore, it should be closed with the
[`close()`][frequenz.channels.Anycast.close] method. This will prevent further
attempts to [`send()`][frequenz.channels.Sender.send] data. Receivers will still be
able to drain the pending values on the channel, but after that, subsequent
able to drain the pending messages on the channel, but after that, subsequent
[`receive()`][frequenz.channels.Receiver.receive] calls will raise a
[`ReceiverStoppedError`][frequenz.channels.ReceiverStoppedError] exception.

Expand All @@ -101,9 +101,9 @@ class Anycast(Generic[_T]):


async def send(sender: Sender[int]) -> None:
for msg in range(3):
print(f"sending {msg}")
await sender.send(msg)
for message in range(3):
print(f"sending {message}")
await sender.send(message)


async def main() -> None:
Expand All @@ -115,8 +115,8 @@ async def main() -> None:
async with asyncio.TaskGroup() as task_group:
task_group.create_task(send(sender))
for _ in range(3):
msg = await receiver.receive()
print(f"received {msg}")
message = await receiver.receive()
print(f"received {message}")
await asyncio.sleep(0.1) # sleep (or work) with the data


Expand Down Expand Up @@ -146,15 +146,15 @@ async def main() -> None:


async def send(name: str, sender: Sender[int], start: int, stop: int) -> None:
for msg in range(start, stop):
print(f"{name} sending {msg}")
await sender.send(msg)
for message in range(start, stop):
print(f"{name} sending {message}")
await sender.send(message)


async def recv(name: str, receiver: Receiver[int]) -> None:
try:
async for msg in receiver:
print(f"{name} received {msg}")
async for message in receiver:
print(f"{name} received {message}")
await asyncio.sleep(0.1) # sleep (or work) with the data
except ReceiverStoppedError:
pass
Expand All @@ -181,15 +181,15 @@ async def main() -> None:
sender_1 sending 11
sender_1 sending 12
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
consumes a message
sender_2 sending 20
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
consumes a message
receiver_1 received 10
receiver_1 received 11
sender_2 sending 21
Anycast channel [Anycast:numbers:_Sender] is full, blocking sender until a receiver
consumes a value
consumes a message
receiver_1 received 12
receiver_1 received 20
receiver_1 received 21
Expand Down Expand Up @@ -219,16 +219,16 @@ def __init__(self, *, name: str, limit: int = 10) -> None:
self._send_cv: Condition = Condition()
"""The condition to wait for free space in the channel's buffer.

If the channel's buffer is full, then the sender waits for values to
If the channel's buffer is full, then the sender waits for messages to
get consumed using this condition until there's some free space
available in the channel's buffer.
"""

self._recv_cv: Condition = Condition()
"""The condition to wait for values in the channel's buffer.
"""The condition to wait for messages in the channel's buffer.

If the channel's buffer is empty, then the receiver waits for values
using this condition until there's a value available in the channel's
If the channel's buffer is empty, then the receiver waits for messages
using this condition until there's a message available in the channel's
buffer.
"""

Expand All @@ -255,11 +255,11 @@ def is_closed(self) -> bool:

@property
def limit(self) -> int:
"""The maximum number of values that can be stored in the channel's buffer.
"""The maximum number of messages that can be stored in the channel's buffer.

If the length of channel's buffer reaches the limit, then the sender
blocks at the [send()][frequenz.channels.Sender.send] method until
a value is consumed.
a message is consumed.
"""
maxlen = self._deque.maxlen
assert maxlen is not None
Expand All @@ -271,7 +271,7 @@ async def close(self) -> None:
Any further attempts to [send()][frequenz.channels.Sender.send] data
will return `False`.

Receivers will still be able to drain the pending values on the channel,
Receivers will still be able to drain the pending messages on the channel,
but after that, subsequent
[receive()][frequenz.channels.Receiver.receive] calls will return `None`
immediately.
Expand Down Expand Up @@ -309,16 +309,16 @@ class _Sender(Sender[_T]):
method.
"""

def __init__(self, chan: Anycast[_T]) -> None:
def __init__(self, channel: Anycast[_T], /) -> None:
"""Initialize this sender.

Args:
chan: A reference to the channel that this sender belongs to.
channel: A reference to the channel that this sender belongs to.
"""
self._chan: Anycast[_T] = chan
self._channel: Anycast[_T] = channel
"""The channel that this sender belongs to."""

async def send(self, msg: _T) -> None:
async def send(self, message: _T, /) -> None:
"""Send a message across the channel.

To send, this method inserts the message into the Anycast channel's
Expand All @@ -327,47 +327,47 @@ async def send(self, msg: _T) -> None:
message will be received by exactly one receiver.

Args:
msg: The message to be sent.
message: The message to be sent.

Raises:
SenderError: If the underlying channel was closed.
A [ChannelClosedError][frequenz.channels.ChannelClosedError] is
set as the cause.
"""
# pylint: disable=protected-access
if self._chan._closed:
if self._channel._closed:
raise SenderError("The channel was closed", self) from ChannelClosedError(
self._chan
self._channel
)
if len(self._chan._deque) == self._chan._deque.maxlen:
if len(self._channel._deque) == self._channel._deque.maxlen:
_logger.warning(
"Anycast channel [%s] is full, blocking sender until a receiver "
"consumes a value",
"consumes a message",
self,
)
while len(self._chan._deque) == self._chan._deque.maxlen:
async with self._chan._send_cv:
await self._chan._send_cv.wait()
while len(self._channel._deque) == self._channel._deque.maxlen:
async with self._channel._send_cv:
await self._channel._send_cv.wait()
_logger.info(
"Anycast channel [%s] has space again, resuming the blocked sender",
self,
)
self._chan._deque.append(msg)
async with self._chan._recv_cv:
self._chan._recv_cv.notify(1)
self._channel._deque.append(message)
async with self._channel._recv_cv:
self._channel._recv_cv.notify(1)
# pylint: enable=protected-access

def __str__(self) -> str:
"""Return a string representation of this sender."""
return f"{self._chan}:{type(self).__name__}"
return f"{self._channel}:{type(self).__name__}"

def __repr__(self) -> str:
"""Return a string representation of this sender."""
return f"{type(self).__name__}({self._chan!r})"
return f"{type(self).__name__}({self._channel!r})"


class _Empty:
"""A sentinel value to indicate that a value has not been set."""
"""A sentinel to indicate that a message has not been set."""


class _Receiver(Receiver[_T]):
Expand All @@ -377,21 +377,21 @@ class _Receiver(Receiver[_T]):
method.
"""

def __init__(self, chan: Anycast[_T]) -> None:
def __init__(self, channel: Anycast[_T], /) -> None:
"""Initialize this receiver.

Args:
chan: A reference to the channel that this receiver belongs to.
channel: A reference to the channel that this receiver belongs to.
"""
self._chan: Anycast[_T] = chan
self._channel: Anycast[_T] = channel
"""The channel that this receiver belongs to."""

self._next: _T | type[_Empty] = _Empty

async def ready(self) -> bool:
"""Wait until the receiver is ready with a value or an error.
"""Wait until the receiver is ready with a message or an error.

Once a call to `ready()` has finished, the value should be read with
Once a call to `ready()` has finished, the message should be read with
a call to `consume()` (`receive()` or iterated over). The receiver will
remain ready (this method will return immediately) until it is
consumed.
Expand All @@ -404,31 +404,31 @@ async def ready(self) -> bool:
return True

# pylint: disable=protected-access
while len(self._chan._deque) == 0:
if self._chan._closed:
while len(self._channel._deque) == 0:
if self._channel._closed:
return False
async with self._chan._recv_cv:
await self._chan._recv_cv.wait()
self._next = self._chan._deque.popleft()
async with self._chan._send_cv:
self._chan._send_cv.notify(1)
async with self._channel._recv_cv:
await self._channel._recv_cv.wait()
self._next = self._channel._deque.popleft()
async with self._channel._send_cv:
self._channel._send_cv.notify(1)
# pylint: enable=protected-access
return True

def consume(self) -> _T:
"""Return the latest value once `ready()` is complete.
"""Return the latest message once `ready()` is complete.

Returns:
The next value that was received.
The next message that was received.

Raises:
ReceiverStoppedError: If the receiver stopped producing messages.
ReceiverError: If there is some problem with the receiver.
"""
if ( # pylint: disable=protected-access
self._next is _Empty and self._chan._closed
self._next is _Empty and self._channel._closed
):
raise ReceiverStoppedError(self) from ChannelClosedError(self._chan)
raise ReceiverStoppedError(self) from ChannelClosedError(self._channel)

assert (
self._next is not _Empty
Expand All @@ -442,8 +442,8 @@ def consume(self) -> _T:

def __str__(self) -> str:
"""Return a string representation of this receiver."""
return f"{self._chan}:{type(self).__name__}"
return f"{self._channel}:{type(self).__name__}"

def __repr__(self) -> str:
"""Return a string representation of this receiver."""
return f"{type(self).__name__}({self._chan!r})"
return f"{type(self).__name__}({self._channel!r})"
Loading
Loading