diff --git a/Makefile b/Makefile index 9882c87a..36714cbf 100644 --- a/Makefile +++ b/Makefile @@ -15,6 +15,8 @@ FORMATTED_AREAS=\ aiokafka/record/ \ tests/test_codec.py \ tests/test_helpers.py \ + tests/test_protocol.py \ + tests/test_protocol_object_conversion.py \ tests/record/ .PHONY: setup diff --git a/aiokafka/protocol/fetch.py b/aiokafka/protocol/fetch.py index c63256d7..8f93754a 100644 --- a/aiokafka/protocol/fetch.py +++ b/aiokafka/protocol/fetch.py @@ -1,3 +1,5 @@ +from typing import List, Optional, Tuple + from .api import Request, Response from .types import Array, Bytes, Int8, Int16, Int32, Int64, Schema, String @@ -23,6 +25,8 @@ class FetchResponse_v0(Response): ) ) + topics: Optional[List[Tuple[str, List[Tuple[int, int, int, bytes]]]]] + class FetchResponse_v1(Response): API_KEY = 1 @@ -235,6 +239,8 @@ class FetchRequest_v0(Request): ), ) + min_bytes: Optional[int] + class FetchRequest_v1(Request): API_KEY = 1 diff --git a/tests/test_protocol.py b/tests/test_protocol.py index 1d81aea5..f023325f 100644 --- a/tests/test_protocol.py +++ b/tests/test_protocol.py @@ -1,13 +1,14 @@ import io import struct +from typing import Type import pytest from aiokafka.protocol.api import Request, RequestHeader_v0, Response -from aiokafka.protocol.commit import GroupCoordinatorRequest -from aiokafka.protocol.fetch import FetchRequest, FetchResponse +from aiokafka.protocol.commit import GroupCoordinatorRequest_v0 +from aiokafka.protocol.fetch import FetchRequest_v0, FetchResponse_v0 from aiokafka.protocol.message import Message, MessageSet, PartialMessage -from aiokafka.protocol.metadata import MetadataRequest +from aiokafka.protocol.metadata import MetadataRequest_v0 from aiokafka.protocol.types import ( CompactArray, CompactBytes, @@ -20,7 +21,7 @@ ) -def test_create_message(): +def test_create_message() -> None: payload = b"test" key = b"key" msg = Message(value=payload, key=key, magic=0, attributes=0, crc=0) @@ -30,7 +31,7 @@ def test_create_message(): assert msg.value == payload -def test_encode_message_v0(): +def test_encode_message_v0() -> None: message = Message(value=b"test", key=b"key", magic=0, attributes=0, crc=0) encoded = message.encode() expect = b"".join( @@ -46,7 +47,7 @@ def test_encode_message_v0(): assert encoded == expect -def test_encode_message_v1(): +def test_encode_message_v1() -> None: message = Message( value=b"test", key=b"key", magic=1, attributes=0, crc=0, timestamp=1234 ) @@ -65,7 +66,7 @@ def test_encode_message_v1(): assert encoded == expect -def test_decode_message(): +def test_decode_message() -> None: encoded = b"".join( [ struct.pack(">i", -1427009701), # CRC @@ -82,7 +83,7 @@ def test_decode_message(): assert decoded_message == msg -def test_decode_message_validate_crc(): +def test_decode_message_validate_crc() -> None: encoded = b"".join( [ struct.pack(">i", -1427009701), # CRC @@ -110,7 +111,7 @@ def test_decode_message_validate_crc(): assert decoded_message.validate_crc() is False -def test_encode_message_set(): +def test_encode_message_set() -> None: messages = [ Message(value=b"v1", key=b"k1", magic=0, attributes=0, crc=0), Message(value=b"v2", key=b"k2", magic=0, attributes=0, crc=0), @@ -140,7 +141,7 @@ def test_encode_message_set(): assert encoded == expect -def test_decode_message_set(): +def test_decode_message_set() -> None: encoded = b"".join( [ struct.pack(">q", 0), # MsgSet Offset @@ -180,7 +181,7 @@ def test_decode_message_set(): assert decoded_message2 == message2 -def test_encode_message_header(): +def test_encode_message_header() -> None: expect = b"".join( [ struct.pack(">h", 10), # API Key @@ -191,12 +192,12 @@ def test_encode_message_header(): ] ) - req = GroupCoordinatorRequest[0]("foo") + req = GroupCoordinatorRequest_v0("foo") header = RequestHeader_v0(req, correlation_id=4, client_id="client3") assert header.encode() == expect -def test_decode_message_set_partial(): +def test_decode_message_set_partial() -> None: encoded = b"".join( [ struct.pack(">q", 0), # Msg Offset @@ -235,7 +236,7 @@ def test_decode_message_set_partial(): assert decoded_message2 == PartialMessage() -def test_decode_fetch_response_partial(): +def test_decode_fetch_response_partial() -> None: encoded = b"".join( [ Int32.encode(1), # Num Topics (Array) @@ -283,7 +284,8 @@ def test_decode_fetch_response_partial(): b"ar", # Value (truncated) ] ) - resp = FetchResponse[0].decode(io.BytesIO(encoded)) + resp = FetchResponse_v0.decode(io.BytesIO(encoded)) + assert resp.topics is not None assert len(resp.topics) == 1 topic, partitions = resp.topics[0] assert topic == "foobar" @@ -294,18 +296,18 @@ def test_decode_fetch_response_partial(): assert m1[1] == (None, None, PartialMessage()) -def test_struct_unrecognized_kwargs(): +def test_struct_unrecognized_kwargs() -> None: # Structs should not allow unrecognized kwargs with pytest.raises(ValueError): - MetadataRequest[0](topicz="foo") + MetadataRequest_v0(topicz="foo") -def test_struct_missing_kwargs(): - fr = FetchRequest[0](max_wait_time=100) +def test_struct_missing_kwargs() -> None: + fr = FetchRequest_v0(max_wait_time=100) assert fr.min_bytes is None -def test_unsigned_varint_serde(): +def test_unsigned_varint_serde() -> None: pairs = { 0: [0], -1: [0xFF, 0xFF, 0xFF, 0xFF, 0x0F], @@ -326,7 +328,7 @@ def test_unsigned_varint_serde(): assert value == UnsignedVarInt32.decode(io.BytesIO(encoded)) -def test_compact_data_structs(): +def test_compact_data_structs() -> None: cs = CompactString() encoded = cs.encode(None) assert encoded == struct.pack("B", 0) @@ -366,7 +368,7 @@ def test_compact_data_structs(): @pytest.mark.parametrize("klass", Request.__subclasses__()) @pytest.mark.parametrize("attr_name", attr_names) -def test_request_type_conformance(klass, attr_name): +def test_request_type_conformance(klass: Type[Request], attr_name: str) -> None: assert hasattr(klass, attr_name) @@ -380,5 +382,5 @@ def test_request_type_conformance(klass, attr_name): @pytest.mark.parametrize("klass", Response.__subclasses__()) @pytest.mark.parametrize("attr_name", attr_names) -def test_response_type_conformance(klass, attr_name): +def test_response_type_conformance(klass: Type[Response], attr_name: str) -> None: assert hasattr(klass, attr_name) diff --git a/tests/test_protocol_object_conversion.py b/tests/test_protocol_object_conversion.py index 0529ccb7..e9987366 100644 --- a/tests/test_protocol_object_conversion.py +++ b/tests/test_protocol_object_conversion.py @@ -1,67 +1,84 @@ +from typing import Type, TypeVar, Union + import pytest from aiokafka.protocol.admin import Request, Response from aiokafka.protocol.types import Array, Int16, Schema, String +C = TypeVar("C", bound=Type[Union[Request, Response]]) -@pytest.mark.parametrize("superclass", (Request, Response)) -class TestObjectConversion: - def test_get_item(self, superclass): - class TestClass(superclass): + +def _make_test_class( + klass: Type[Union[Request, Response]], schema: Schema +) -> Type[Union[Request, Response]]: + if klass is Request: + + class RequestTestClass(Request): + API_KEY = 0 + API_VERSION = 0 + RESPONSE_TYPE = Response + SCHEMA = schema + + return RequestTestClass + else: + + class ResponseTestClass(Response): API_KEY = 0 API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC - SCHEMA = Schema(("myobject", Int16)) + SCHEMA = schema + + return ResponseTestClass + + +@pytest.mark.parametrize("superclass", (Request, Response)) +class TestObjectConversion: + def test_get_item(self, superclass: Type[Union[Request, Response]]) -> None: + TestClass = _make_test_class(superclass, Schema(("myobject", Int16))) tc = TestClass(myobject=0) assert tc.get_item("myobject") == 0 with pytest.raises(KeyError): tc.get_item("does-not-exist") - def test_with_empty_schema(self, superclass): - class TestClass(superclass): - API_KEY = 0 - API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC - SCHEMA = Schema() + def test_with_empty_schema( + self, superclass: Type[Union[Request, Response]] + ) -> None: + TestClass = _make_test_class(superclass, Schema()) tc = TestClass() tc.encode() assert tc.to_object() == {} - def test_with_basic_schema(self, superclass): - class TestClass(superclass): - API_KEY = 0 - API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC - SCHEMA = Schema(("myobject", Int16)) + def test_with_basic_schema( + self, superclass: Type[Union[Request, Response]] + ) -> None: + TestClass = _make_test_class(superclass, Schema(("myobject", Int16))) tc = TestClass(myobject=0) tc.encode() assert tc.to_object() == {"myobject": 0} - def test_with_basic_array_schema(self, superclass): - class TestClass(superclass): - API_KEY = 0 - API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC - SCHEMA = Schema(("myarray", Array(Int16))) + def test_with_basic_array_schema( + self, superclass: Type[Union[Request, Response]] + ) -> None: + TestClass = _make_test_class(superclass, Schema(("myarray", Array(Int16)))) tc = TestClass(myarray=[1, 2, 3]) tc.encode() assert tc.to_object()["myarray"] == [1, 2, 3] - def test_with_complex_array_schema(self, superclass): - class TestClass(superclass): - API_KEY = 0 - API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC - SCHEMA = Schema( + def test_with_complex_array_schema( + self, superclass: Type[Union[Request, Response]] + ) -> None: + TestClass = _make_test_class( + superclass, + Schema( ( "myarray", Array(("subobject", Int16), ("othersubobject", String("utf-8"))), ) - ) + ), + ) tc = TestClass(myarray=[[10, "hello"]]) tc.encode() @@ -70,18 +87,19 @@ class TestClass(superclass): assert obj["myarray"][0]["subobject"] == 10 assert obj["myarray"][0]["othersubobject"] == "hello" - def test_with_array_and_other(self, superclass): - class TestClass(superclass): - API_KEY = 0 - API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC - SCHEMA = Schema( + def test_with_array_and_other( + self, superclass: Type[Union[Request, Response]] + ) -> None: + TestClass = _make_test_class( + superclass, + Schema( ( "myarray", Array(("subobject", Int16), ("othersubobject", String("utf-8"))), ), ("notarray", Int16), - ) + ), + ) tc = TestClass(myarray=[[10, "hello"]], notarray=42) @@ -91,14 +109,18 @@ class TestClass(superclass): assert obj["myarray"][0]["othersubobject"] == "hello" assert obj["notarray"] == 42 - def test_with_nested_array(self, superclass): - class TestClass(superclass): - API_KEY = 0 - API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC - SCHEMA = Schema( - ("myarray", Array(("subarray", Array(Int16)), ("otherobject", Int16))) - ) + def test_with_nested_array( + self, superclass: Type[Union[Request, Response]] + ) -> None: + TestClass = _make_test_class( + superclass, + Schema( + ( + "myarray", + Array(("subarray", Array(Int16)), ("otherobject", Int16)), + ) + ), + ) tc = TestClass( myarray=[ @@ -115,12 +137,12 @@ class TestClass(superclass): assert obj["myarray"][1]["subarray"] == [2, 3] assert obj["myarray"][1]["otherobject"] == 4 - def test_with_complex_nested_array(self, superclass): - class TestClass(superclass): - API_KEY = 0 - API_VERSION = 0 - RESPONSE_TYPE = None # To satisfy the Request ABC - SCHEMA = Schema( + def test_with_complex_nested_array( + self, superclass: Type[Union[Request, Response]] + ) -> None: + TestClass = _make_test_class( + superclass, + Schema( ( "myarray", Array( @@ -135,7 +157,8 @@ class TestClass(superclass): ), ), ("notarray", String("utf-8")), - ) + ), + ) tc = TestClass( myarray=[ @@ -165,7 +188,7 @@ class TestClass(superclass): assert myarray[1]["subarray"][0]["otherinnertest"] == "hello again" -def test_with_metadata_response(): +def test_with_metadata_response() -> None: from aiokafka.protocol.metadata import MetadataResponse_v5 tc = MetadataResponse_v5(