Skip to content

Commit

Permalink
Type annotations for errors (#988)
Browse files Browse the repository at this point in the history
  • Loading branch information
ods authored Mar 16, 2024
1 parent ef4c318 commit bd62ee0
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 23 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@ SCALA_VERSION?=2.13
KAFKA_VERSION?=2.8.1
DOCKER_IMAGE=aiolibs/kafka:$(SCALA_VERSION)_$(KAFKA_VERSION)
DIFF_BRANCH=origin/master
FORMATTED_AREAS=aiokafka/util.py aiokafka/structs.py aiokafka/codec.py tests/test_codec.py
FORMATTED_AREAS=\
aiokafka/codec.py \
aiokafka/errors.py \
aiokafka/structs.py \
aiokafka/util.py \
tests/test_codec.py

.PHONY: setup
setup:
Expand Down
44 changes: 22 additions & 22 deletions aiokafka/errors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import inspect
import sys
from typing import Any, Iterable, Type, TypeVar

__all__ = [
# aiokafka custom errors
Expand Down Expand Up @@ -83,7 +82,7 @@ class KafkaError(RuntimeError):
# whether metadata should be refreshed on error
invalid_metadata = False

def __str__(self):
def __str__(self) -> str:
if not self.args:
return self.__class__.__name__
return f"{self.__class__.__name__}: {super().__str__()}"
Expand Down Expand Up @@ -140,7 +139,7 @@ class IncompatibleBrokerVersion(KafkaError):


class CommitFailedError(KafkaError):
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(
"""Commit cannot be completed since the group has already
rebalanced and assigned the partitions to another member.
Expand Down Expand Up @@ -223,19 +222,21 @@ class ProducerFenced(KafkaError):

def __init__(
self,
msg="There is a newer producer using the same transactional_id or"
"transaction timeout occurred (check that processing time is "
"below transaction_timeout_ms)",
):
msg: str = (
"There is a newer producer using the same transactional_id or"
"transaction timeout occurred (check that processing time is "
"below transaction_timeout_ms)"
),
) -> None:
super().__init__(msg)


class BrokerResponseError(KafkaError):
errno = None
message = None
description = None
errno: int
message: str
description: str = ""

def __str__(self):
def __str__(self) -> str:
"""Add errno to standard KafkaError str"""
return f"[Error {self.errno}] {super().__str__()}"

Expand Down Expand Up @@ -859,18 +860,17 @@ class MemberIdRequired(BrokerResponseError):
)


def _iter_broker_errors():
for _, obj in inspect.getmembers(sys.modules[__name__]):
if (
inspect.isclass(obj)
and issubclass(obj, BrokerResponseError)
and obj != BrokerResponseError
):
yield obj
_T = TypeVar("_T", bound=type)


kafka_errors = {x.errno: x for x in _iter_broker_errors()}
def _iter_subclasses(cls: _T) -> Iterable[_T]:
for subclass in cls.__subclasses__():
yield subclass
yield from _iter_subclasses(subclass)


def for_code(error_code):
kafka_errors = {x.errno: x for x in _iter_subclasses(BrokerResponseError)}


def for_code(error_code: int) -> Type[BrokerResponseError]:
return kafka_errors.get(error_code, UnknownError)

0 comments on commit bd62ee0

Please sign in to comment.