Skip to content

Commit

Permalink
Move utility receivers to their own public modules
Browse files Browse the repository at this point in the history
We move the `Event`, `FileWatcher` and `Timer` receivers to their own
separate public modules. Since they are not core components and users
might only need to use some of them, it is more clear to have them
separated.

Also in the case of the `FileWatcher`, it requires an extra dependency,
which we could potentially have as an optional dependency, so users that
don't need it don't need to install it.

In the future we might distribute these also as separate python packages
(wheels).

Signed-off-by: Leandro Lucarella <luca-frequenz@llucax.com>
  • Loading branch information
llucax committed Nov 7, 2023
1 parent b120fed commit bbb0a8c
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 76 deletions.
4 changes: 3 additions & 1 deletion .github/labeler.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
"part:receivers":
- "src/frequenz/channels/_merge.py"
- "src/frequenz/channels/_merge_named.py"
- "src/frequenz/channels/util/**"
- "src/frequenz/channels/event.py"
- "src/frequenz/channels/file_watcher.py"
- "src/frequenz/channels/timer.py"

"part:select":
- "src/frequenz/channels/_select.py"
16 changes: 11 additions & 5 deletions src/frequenz/channels/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@
* [select][frequenz.channels.select]: Iterate over the values of all
[receivers][frequenz.channels.Receiver] as new values become available.
* [util][frequenz.channels.util]: A module with extra utilities, like special
receivers that implement timers, file watchers, etc.
Exception classes:
* [Error][frequenz.channels.Error]: Base class for all errors in this
Expand Down Expand Up @@ -65,9 +62,19 @@
* [UnhandledSelectedError][frequenz.channels.UnhandledSelectedError]: An error
raised by [select][frequenz.channels.select] that was not handled by the
user.
Extra utility receivers:
* [Event][frequenz.channels.event.Event]: A receiver that generates a message when
an event is set.
* [FileWatcher][frequenz.channels.file_watcher.FileWatcher]: A receiver that
generates a message when a file is added, modified or deleted.
* [Timer][frequenz.channels.timer.Timer]: A receiver that generates a message after a
given amount of time.
"""

from . import util
from ._anycast import Anycast
from ._broadcast import Broadcast
from ._exceptions import ChannelClosedError, ChannelError, Error
Expand Down Expand Up @@ -103,5 +110,4 @@
"UnhandledSelectedError",
"select",
"selected_from",
"util",
]
4 changes: 2 additions & 2 deletions src/frequenz/channels/_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ async def select(*receivers: Receiver[Any]) -> AsyncIterator[Selected[Any]]:
import datetime
from typing import assert_never
from frequenz.channels import ReceiverStoppedError
from frequenz.channels.util import select, selected_from, Timer
from frequenz.channels import ReceiverStoppedError, select, selected_from
from frequenz.channels.timer import Timer
timer1 = Timer.periodic(datetime.timedelta(seconds=1))
timer2 = Timer.timeout(datetime.timedelta(seconds=0.5))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@
class Event(_receiver.Receiver[None]):
"""A receiver that can be made ready through an event.
The receiver (the [`ready()`][frequenz.channels.util.Event.ready] method) will wait
until [`set()`][frequenz.channels.util.Event.set] is called. At that point the
The receiver (the [`ready()`][frequenz.channels.event.Event.ready] method) will wait
until [`set()`][frequenz.channels.event.Event.set] is called. At that point the
receiver will wait again after the event is
[`consume()`][frequenz.channels.Receiver.consume]d.
The receiver can be completely stopped by calling
[`stop()`][frequenz.channels.util.Event.stop].
[`stop()`][frequenz.channels.event.Event.stop].
Example:
```python
import asyncio
from frequenz.channels import Receiver
from frequenz.channels.util import Event, select, selected_from
from frequenz.channels import Receiver, select, selected_from
from frequenz.channels.event import Event
other_receiver: Receiver[int] = ...
exit_event = Event()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from watchfiles import Change, awatch
from watchfiles.main import FileChange

from .._receiver import Receiver, ReceiverStoppedError
from ._receiver import Receiver, ReceiverStoppedError


class FileWatcher(Receiver["FileWatcher.Event"]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import asyncio
from datetime import timedelta

from .._receiver import Receiver, ReceiverStoppedError
from ._receiver import Receiver, ReceiverStoppedError


def _to_microseconds(time: float | timedelta) -> int:
Expand Down Expand Up @@ -270,7 +270,7 @@ class Timer(Receiver[timedelta]):
"""A timer receiver that triggers every `interval` time.
The timer has microseconds resolution, so the
[`interval`][frequenz.channels.util.Timer.interval] must be at least
[`interval`][frequenz.channels.timer.Timer.interval] must be at least
1 microsecond.
The message it produces is a [`timedelta`][datetime.timedelta] containing the drift
Expand All @@ -283,34 +283,34 @@ class Timer(Receiver[timedelta]):
as the timer uses [`asyncio`][asyncio]s loop monotonic clock.
If the timer is delayed too much, then it will behave according to the
[`missed_tick_policy`][frequenz.channels.util.Timer.missed_tick_policy]. Missing
[`missed_tick_policy`][frequenz.channels.timer.Timer.missed_tick_policy]. Missing
ticks might or might not trigger a message and the drift could be accumulated or not
depending on the chosen policy.
These are the currently built-in available policies:
* [`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift]
* [`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync]
* [`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed]
* [`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift]
* [`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync]
* [`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed]
For the most common cases, a specialized constructor is provided:
* [`periodic()`][frequenz.channels.util.Timer.periodic] (uses the
[`TriggerAllMissed`][frequenz.channels.util.TriggerAllMissed] or
[`SkipMissedAndResync`][frequenz.channels.util.SkipMissedAndResync] policy)
* [`timeout()`][frequenz.channels.util.Timer.timeout] (uses the
[`SkipMissedAndDrift`][frequenz.channels.util.SkipMissedAndDrift] policy)
* [`periodic()`][frequenz.channels.timer.Timer.periodic] (uses the
[`TriggerAllMissed`][frequenz.channels.timer.TriggerAllMissed] or
[`SkipMissedAndResync`][frequenz.channels.timer.SkipMissedAndResync] policy)
* [`timeout()`][frequenz.channels.timer.Timer.timeout] (uses the
[`SkipMissedAndDrift`][frequenz.channels.timer.SkipMissedAndDrift] policy)
The timer accepts an optional [`loop`][frequenz.channels.util.Timer.loop], which
The timer accepts an optional [`loop`][frequenz.channels.timer.Timer.loop], which
will be used to track the time. If `loop` is `None`, then the running loop will be
used (if there is no running loop most calls will raise
a [`RuntimeError`][RuntimeError]).
Starting the timer can be delayed if necessary by using `auto_start=False`
(for example until we have a running loop). A call to
[`reset()`][frequenz.channels.util.Timer.reset],
[`ready()`][frequenz.channels.util.Timer.ready],
[`receive()`][frequenz.channels.util.Timer.receive] or the async iterator interface
[`reset()`][frequenz.channels.timer.Timer.reset],
[`ready()`][frequenz.channels.timer.Timer.ready],
[`receive()`][frequenz.channels.timer.Timer.receive] or the async iterator interface
to await for a new message will start the timer.
Example: Periodic timer example
Expand All @@ -323,8 +323,7 @@ class Timer(Receiver[timedelta]):
it with other receivers, and even start it (semi) manually:
```python
from frequenz.channels.util import select, selected_from
from frequenz.channels import Broadcast
from frequenz.channels import Broadcast, select, selected_from
timer = Timer.timeout(timedelta(seconds=1.0), auto_start=False)
chan = Broadcast[int](name="input-chan")
Expand All @@ -346,8 +345,7 @@ class Timer(Receiver[timedelta]):
Example: Timeout example
```python
from frequenz.channels.util import select, selected_from
from frequenz.channels import Broadcast
from frequenz.channels import Broadcast, select, selected_from
def process_data(data: int):
print(f"Processing data: {data}")
Expand Down
36 changes: 0 additions & 36 deletions src/frequenz/channels/util/__init__.py

This file was deleted.

2 changes: 1 addition & 1 deletion tests/utils/test_event.py → tests/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import pytest as _pytest

from frequenz.channels import ReceiverStoppedError
from frequenz.channels.util import Event
from frequenz.channels.event import Event


async def test_event() -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from watchfiles import Change
from watchfiles.main import FileChange

from frequenz.channels.util import FileWatcher
from frequenz.channels.file_watcher import FileWatcher


class _FakeAwatch:
Expand Down Expand Up @@ -52,7 +52,7 @@ def fake_awatch() -> Iterator[_FakeAwatch]:
"""Fixture to mock the awatch function."""
fake = _FakeAwatch()
with mock.patch(
"frequenz.channels.util._file_watcher.awatch",
"frequenz.channels.file_watcher.awatch",
autospec=True,
side_effect=fake.fake_awatch,
):
Expand Down Expand Up @@ -89,7 +89,7 @@ async def test_file_watcher_filter_events(
# We need to reset the mock explicitly because hypothesis runs all the produced
# inputs in the same context.
with mock.patch(
"frequenz.channels.util._file_watcher.awatch", autospec=True
"frequenz.channels.file_watcher.awatch", autospec=True
) as awatch_mock:
file_watcher = FileWatcher(paths=[good_path], event_types=event_types)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
import pytest

from frequenz.channels import select, selected_from
from frequenz.channels.util import FileWatcher, Timer
from frequenz.channels.file_watcher import FileWatcher
from frequenz.channels.timer import Timer


@pytest.mark.integration
Expand Down
2 changes: 1 addition & 1 deletion tests/test_select_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class at a time.
select,
selected_from,
)
from frequenz.channels.util import Event
from frequenz.channels.event import Event


@pytest.mark.integration
Expand Down
2 changes: 1 addition & 1 deletion tests/utils/test_timer.py → tests/test_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import pytest
from hypothesis import strategies as st

from frequenz.channels.util import (
from frequenz.channels.timer import (
SkipMissedAndDrift,
SkipMissedAndResync,
Timer,
Expand Down

0 comments on commit bbb0a8c

Please sign in to comment.