Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: add type hints to protocol #997

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions aiokafka/protocol/abstract.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import abc
import io
from typing import Generic, Optional, TypeVar

from typing_extensions import TypeAlias

class AbstractType(metaclass=abc.ABCMeta):
T = TypeVar("T")
BytesIO: TypeAlias = io.BytesIO


class AbstractType(Generic[T], metaclass=abc.ABCMeta):
@classmethod
@abc.abstractmethod
def encode(cls, value): ...
def encode(self, value: Optional[T]) -> bytes: ...
Comment on lines 12 to +14
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why self if it's declared as classmethod?


@classmethod
@abc.abstractmethod
def decode(cls, data): ...
def decode(self, data: BytesIO) -> Optional[T]: ...

@classmethod
def repr(cls, value):
def repr(self, value: T) -> str:
return repr(value)
19 changes: 14 additions & 5 deletions aiokafka/protocol/message.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import io
import time
from binascii import crc32
from typing import Optional

from aiokafka.codec import (
gzip_decode,
Expand Down Expand Up @@ -47,7 +48,15 @@ class Message(Struct):
22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
)

def __init__(self, value, key=None, magic=0, attributes=0, crc=0, timestamp=None):
def __init__(
self,
value: Optional[bytes],
key: Optional[bytes] = None,
magic: int = 0,
attributes: int = 0,
crc: int = 0,
timestamp: Optional[int] = None,
):
assert value is None or isinstance(value, bytes), "value must be bytes"
assert key is None or isinstance(key, bytes), "key must be bytes"
assert magic > 0 or timestamp is None, "timestamp not supported in v0"
Expand All @@ -64,7 +73,7 @@ def __init__(self, value, key=None, magic=0, attributes=0, crc=0, timestamp=None
self.value = value

@property
def timestamp_type(self):
def timestamp_type(self) -> Optional[int]:
"""0 for CreateTime; 1 for LogAppendTime; None if unsupported.

Value is determined by broker; produced messages should always set to 0
Expand All @@ -77,7 +86,7 @@ def timestamp_type(self):
else:
return 0

def encode(self, recalc_crc=True):
def encode(self, recalc_crc: bool = True):
version = self.magic
if version == 1:
fields = (
Expand Down Expand Up @@ -125,15 +134,15 @@ def decode(cls, data):
msg._validated_crc = _validated_crc
return msg

def validate_crc(self):
def validate_crc(self) -> bool:
if self._validated_crc is None:
raw_msg = self.encode(recalc_crc=False)
self._validated_crc = crc32(raw_msg[4:])
if self.crc == self._validated_crc:
return True
return False

def is_compressed(self):
def is_compressed(self) -> bool:
return self.attributes & self.CODEC_MASK != 0

def decompress(self):
Expand Down
Loading