Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace obsolete types #176

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions benchmarks/benchmark_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import asyncio
import csv
import timeit
from typing import Any, Coroutine, Dict, List, Tuple
from collections.abc import Coroutine
from typing import Any

from frequenz.channels import Anycast, Receiver, Sender

Expand Down Expand Up @@ -40,7 +41,7 @@ async def benchmark_anycast(
Returns:
int: Total number of messages received by all channels.
"""
channels: List[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)]
channels: list[Anycast[int]] = [Anycast(buffer_size) for _ in range(num_channels)]
senders = [
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
for bcast in channels
Expand Down Expand Up @@ -68,7 +69,7 @@ async def update_tracker_on_receive(chan: Receiver[int]) -> None:
return recv_trackers[0]


def time_async_task(task: Coroutine[Any, Any, int]) -> Tuple[float, Any]:
def time_async_task(task: Coroutine[Any, Any, int]) -> tuple[float, Any]:
"""Run a task and return the time taken and the result.

Args:
Expand All @@ -87,7 +88,7 @@ def run_one(
num_messages: int,
num_receivers: int,
buffer_size: int,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Run a single benchmark."""
runtime, total_msgs = time_async_task(
benchmark_anycast(num_channels, num_messages, num_receivers, buffer_size)
Expand Down
13 changes: 7 additions & 6 deletions benchmarks/benchmark_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import asyncio
import csv
import timeit
from collections.abc import Callable, Coroutine
from functools import partial
from typing import Any, Callable, Coroutine, Dict, List, Tuple
from typing import Any

from frequenz.channels import Broadcast, Receiver, Sender

Expand Down Expand Up @@ -60,8 +61,8 @@ async def benchmark_broadcast(
Returns:
int: Total number of messages received by all receivers.
"""
channels: List[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
senders: List[asyncio.Task[Any]] = [
channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
senders: list[asyncio.Task[Any]] = [
asyncio.create_task(send_msg(num_messages, bcast.new_sender()))
for bcast in channels
]
Expand Down Expand Up @@ -103,7 +104,7 @@ async def benchmark_single_task_broadcast(
Returns:
int: Total number of messages received by all receivers.
"""
channels: List[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
channels: list[Broadcast[int]] = [Broadcast("meter") for _ in range(num_channels)]
senders = [b.new_sender() for b in channels]
recv_tracker = 0

Expand All @@ -122,7 +123,7 @@ async def benchmark_single_task_broadcast(
return recv_tracker


def time_async_task(task: Coroutine[Any, Any, int]) -> Tuple[float, Any]:
def time_async_task(task: Coroutine[Any, Any, int]) -> tuple[float, Any]:
"""Run a task and return the time taken and the result.

Args:
Expand All @@ -144,7 +145,7 @@ def run_one(
num_receivers: int,
tasks_used: str,
interval_between_messages: float,
) -> Dict[str, Any]:
) -> dict[str, Any]:
"""Run a single benchmark."""
runtime, total_msgs = time_async_task(
benchmark_method(num_channels, num_messages, num_receivers)
Expand Down
1 change: 0 additions & 1 deletion src/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
This plugin extracts these code examples and validates them using pylint.
"""

from __future__ import annotations

import ast
import os
Expand Down
4 changes: 2 additions & 2 deletions src/frequenz/channels/_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from asyncio import Condition
from collections import deque
from typing import Deque, Generic, Type
from typing import Deque, Generic

from ._base_classes import Receiver as BaseReceiver
from ._base_classes import Sender as BaseSender
Expand Down Expand Up @@ -169,7 +169,7 @@ def __init__(self, chan: Anycast[T]) -> None:
chan: A reference to the channel that this receiver belongs to.
"""
self._chan = chan
self._next: T | Type[_Empty] = _Empty
self._next: T | type[_Empty] = _Empty

async def ready(self) -> bool:
"""Wait until the receiver is ready with a value or an error.
Expand Down
5 changes: 3 additions & 2 deletions src/frequenz/channels/_base_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Callable, Generic, Optional, TypeVar
from collections.abc import Callable
from typing import Generic, TypeVar

from ._exceptions import ReceiverStoppedError

Expand Down Expand Up @@ -145,7 +146,7 @@ class Peekable(ABC, Generic[T]):
"""

@abstractmethod
def peek(self) -> Optional[T]:
def peek(self) -> T | None:
"""Return the latest value that was sent to the channel.

Returns:
Expand Down
12 changes: 5 additions & 7 deletions src/frequenz/channels/_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import weakref
from asyncio import Condition
from collections import deque
from typing import Deque, Dict, Generic, Optional
from typing import Deque, Generic
from uuid import UUID, uuid4

from ._base_classes import Peekable as BasePeekable
Expand Down Expand Up @@ -88,9 +88,9 @@ def __init__(self, name: str, resend_latest: bool = False) -> None:
self._resend_latest = resend_latest

self.recv_cv: Condition = Condition()
self.receivers: Dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
self.receivers: dict[UUID, weakref.ReferenceType[Receiver[T]]] = {}
self.closed: bool = False
self._latest: Optional[T] = None
self._latest: T | None = None

async def close(self) -> None:
"""Close the Broadcast channel.
Expand All @@ -116,9 +116,7 @@ def new_sender(self) -> Sender[T]:
"""
return Sender(self)

def new_receiver(
self, name: Optional[str] = None, maxsize: int = 50
) -> Receiver[T]:
def new_receiver(self, name: str | None = None, maxsize: int = 50) -> Receiver[T]:
"""Create a new broadcast receiver.

Broadcast receivers have their own buffer, and when messages are not
Expand Down Expand Up @@ -346,7 +344,7 @@ def __init__(self, chan: Broadcast[T]) -> None:
"""
self._chan = chan

def peek(self) -> Optional[T]:
def peek(self) -> T | None:
"""Return the latest value that was sent to the channel.

Returns:
Expand Down
4 changes: 2 additions & 2 deletions src/frequenz/channels/util/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

import asyncio
from collections import deque
from typing import Any, Deque, Set
from typing import Any, Deque

from .._base_classes import Receiver, T
from .._exceptions import ReceiverStoppedError
Expand Down Expand Up @@ -44,7 +44,7 @@ def __init__(self, *args: Receiver[T]) -> None:
*args: sequence of channel receivers.
"""
self._receivers = {str(id): recv for id, recv in enumerate(args)}
self._pending: Set[asyncio.Task[Any]] = {
self._pending: set[asyncio.Task[Any]] = {
asyncio.create_task(recv.__anext__(), name=name)
for name, recv in self._receivers.items()
}
Expand Down
10 changes: 5 additions & 5 deletions src/frequenz/channels/util/_merge_named.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@

import asyncio
from collections import deque
from typing import Any, Deque, Set, Tuple
from typing import Any, Deque

from .._base_classes import Receiver, T
from .._exceptions import ReceiverStoppedError


class MergeNamed(Receiver[Tuple[str, T]]):
class MergeNamed(Receiver[tuple[str, T]]):
"""Merge messages coming from multiple named channels into a single stream.

When `MergeNamed` is no longer needed, then it should be stopped using
Expand All @@ -25,11 +25,11 @@ def __init__(self, **kwargs: Receiver[T]) -> None:
**kwargs: sequence of channel receivers.
"""
self._receivers = kwargs
self._pending: Set[asyncio.Task[Any]] = {
self._pending: set[asyncio.Task[Any]] = {
asyncio.create_task(recv.__anext__(), name=name)
for name, recv in self._receivers.items()
}
self._results: Deque[Tuple[str, T]] = deque(maxlen=len(self._receivers))
self._results: Deque[tuple[str, T]] = deque(maxlen=len(self._receivers))

def __del__(self) -> None:
"""Cleanup any pending tasks."""
Expand Down Expand Up @@ -81,7 +81,7 @@ async def ready(self) -> bool:
asyncio.create_task(self._receivers[name].__anext__(), name=name)
)

def consume(self) -> Tuple[str, T]:
def consume(self) -> tuple[str, T]:
"""Return the latest value once `ready` is complete.

Returns:
Expand Down
3 changes: 2 additions & 1 deletion src/frequenz/channels/util/_select.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"""

import asyncio
from typing import Any, AsyncIterator, Generic, TypeGuard, TypeVar
from collections.abc import AsyncIterator
from typing import Any, Generic, TypeGuard, TypeVar

from .._base_classes import Receiver
from .._exceptions import ReceiverStoppedError
Expand Down
1 change: 0 additions & 1 deletion tests/test_anycast.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

"""Tests for the Channel implementation."""

from __future__ import annotations

import asyncio

Expand Down
4 changes: 1 addition & 3 deletions tests/test_broadcast.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@

"""Tests for the Broadcast implementation."""

from __future__ import annotations

import asyncio
from typing import Tuple

import pytest

Expand Down Expand Up @@ -117,7 +115,7 @@ async def test_broadcast_overflow() -> None:
big_receiver = bcast.new_receiver("named-recv", big_recv_size)
small_receiver = bcast.new_receiver(None, small_recv_size)

async def drain_receivers() -> Tuple[int, int]:
async def drain_receivers() -> tuple[int, int]:
big_sum = 0
small_sum = 0
while len(big_receiver) > 0:
Expand Down
5 changes: 2 additions & 3 deletions tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"""Tests for the Merge implementation."""

import asyncio
from typing import List

from frequenz.channels import Anycast, Sender
from frequenz.channels.util import Merge
Expand All @@ -26,7 +25,7 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
senders = asyncio.create_task(send(chan1.new_sender(), chan2.new_sender()))

merge = Merge(chan1.new_receiver(), chan2.new_receiver())
results: List[int] = []
results: list[int] = []
async for item in merge:
results.append(item)
await senders
Expand All @@ -37,5 +36,5 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
# order, where N is the number of channels. This only works in this
# example because the `send` method sends values in immeidate
# succession.
assert set((results[idx : idx + 2])) == {ctr + 1, ctr + 101}
assert set(results[idx : idx + 2]) == {ctr + 1, ctr + 101}
assert results[-1] == 1000
5 changes: 2 additions & 3 deletions tests/test_mergenamed.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"""Tests for the MergeNamed implementation."""

import asyncio
from typing import List, Tuple

from frequenz.channels import Anycast, Sender
from frequenz.channels.util import MergeNamed
Expand All @@ -27,7 +26,7 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
recvs = {"chan1": chan1.new_receiver(), "chan2": chan2.new_receiver()}

merge = MergeNamed(**recvs)
results: List[Tuple[str, int]] = []
results: list[tuple[str, int]] = []
async for item in merge:
results.append(item)
await senders
Expand All @@ -38,7 +37,7 @@ async def send(ch1: Sender[int], ch2: Sender[int]) -> None:
# order, where N is the number of channels. This only works in this
# example because the `send` method sends values in immeidate
# succession.
assert set((results[idx : idx + 2])) == {
assert set(results[idx : idx + 2]) == {
("chan1", ctr + 1),
("chan2", ctr + 101),
}
Expand Down
1 change: 0 additions & 1 deletion tests/utils/test_file_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

"""Tests for `channel.FileWatcher`."""

from __future__ import annotations

import pathlib
from collections.abc import AsyncGenerator, Iterator, Sequence
Expand Down
1 change: 0 additions & 1 deletion tests/utils/test_timer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

"""Tests for the timer."""

from __future__ import annotations

import asyncio
import enum
Expand Down