Skip to content

Commit

Permalink
Add stop method for MergeNamed and Merge channels
Browse files Browse the repository at this point in the history
Both channels stores async tasks that listen on the receivers.
If they are removed without stopping the receivers tasks,
then user gets error: Task was destroyed but it is pending!

`stop` method should be called when channel is no longer needed.
Destructor can't be async, so we do that with extra method `stop`.

Signed-off-by: ela-kotulska-frequenz <elzbieta.kotulska@frequenz.com>
  • Loading branch information
ela-kotulska-frequenz committed Jan 26, 2023
1 parent 32a091a commit dd5d3a7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 8 deletions.
8 changes: 1 addition & 7 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,10 @@

## Summary

<!-- Here goes a general summary of what this release is about -->

## Upgrading

<!-- Here goes notes on how to upgrade from previous versions, including if there are any depractions and what they should be replaced with -->

## New Features

<!-- Here goes the main new features and examples or instructions on how to use them -->
* Add method to stop `Merge` and `MergeNamed`.

## Bug Fixes

<!-- Here goes notable bug fixes that are worth a special mention or explanation -->
10 changes: 10 additions & 0 deletions src/frequenz/channels/util/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ class Merge(Receiver[T]):
# do something with msg
pass
```
When `merge` is no longer needed, then it should be stopped using
`self.stop()` method. This will cleanup any internal pending async tasks.
"""

def __init__(self, *args: Receiver[T]) -> None:
Expand All @@ -44,6 +47,13 @@ def __del__(self) -> None:
for task in self._pending:
task.cancel()

async def stop(self) -> None:
"""Stop the `Merge` instance and cleanup any pending tasks."""
for task in self._pending:
task.cancel()
await asyncio.gather(*self._pending, return_exceptions=True)
self._pending = set()

async def ready(self) -> None:
"""Wait until the receiver is ready with a value.
Expand Down
13 changes: 12 additions & 1 deletion src/frequenz/channels/util/_merge_named.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@


class MergeNamed(Receiver[Tuple[str, T]]):
"""Merge messages coming from multiple named channels into a single stream."""
"""Merge messages coming from multiple named channels into a single stream.
When `MergeNamed` is no longer needed, then it should be stopped using
`self.stop()` method. This will cleanup any internal pending async tasks.
"""

def __init__(self, **kwargs: Receiver[T]) -> None:
"""Create a `MergeNamed` instance.
Expand All @@ -31,6 +35,13 @@ def __del__(self) -> None:
for task in self._pending:
task.cancel()

async def stop(self) -> None:
"""Stop the `MergeNamed` instance and cleanup any pending tasks."""
for task in self._pending:
task.cancel()
await asyncio.gather(*self._pending, return_exceptions=True)
self._pending = set()

async def ready(self) -> None:
"""Wait until there's a message in any of the channels.
Expand Down

0 comments on commit dd5d3a7

Please sign in to comment.