From 82571dec7699d896f9e1f01f0cf192057fd54ba2 Mon Sep 17 00:00:00 2001 From: Daniele Palaia Date: Wed, 22 May 2024 10:19:32 +0200 Subject: [PATCH] fixing tests/ isort and black checker --- poetry.lock | 4 ++-- pyproject.toml | 2 ++ rstream/__init__.py | 2 +- rstream/_pyamqp/_decode.py | 5 +---- rstream/_pyamqp/_encode.py | 3 --- rstream/_pyamqp/message.py | 11 +++++++---- rstream/_pyamqp/performatives.py | 5 +++++ rstream/amqp.py | 14 +++++++------- setup.cfg | 2 +- tests/test_amqp.py | 12 ++++++------ tests/test_consumer.py | 22 +++++++++++----------- tests/test_producer.py | 12 ++++-------- 12 files changed, 47 insertions(+), 47 deletions(-) diff --git a/poetry.lock b/poetry.lock index ca0fa2f..40d7806 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. [[package]] name = "appnope" @@ -805,4 +805,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "bd3ce85ab4e4fb564a1b55cd350fd909261649aba9e091c424ba7e28b20ecc0e" +content-hash = "057ddac490c0ea7b41d2a9eb42763c64831f4a87043bb2431a5f2856815a75a0" diff --git a/pyproject.toml b/pyproject.toml index ed2de86..6cf8b86 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,11 +23,13 @@ pytest-asyncio = "^0.15.1" black = "^23.12.1" requests = "^2.31.0" mmh3 = "^4.0.0" +typing_extensions ="^4.11.0" [tool.poetry.group.dev.dependencies] pytest = "^7.4.0" requests = "^2.31.0" types-requests = "^2.31.0.20240406" +typing_extensions ="^4.11.0" [tool.black] line-length = 110 diff --git a/rstream/__init__.py b/rstream/__init__.py index ccb3ad5..c8a221c 100644 --- a/rstream/__init__.py +++ b/rstream/__init__.py @@ -19,7 +19,7 @@ del metadata from .amqp import AMQPMessage, amqp_decoder # noqa: E402 -from ._pyamqp.message import Properties +from ._pyamqp.message import Properties # noqa: E402 from .compression import CompressionType # noqa: E402 from .constants import ( # noqa: E402 ConsumerOffsetSpecification, diff --git a/rstream/_pyamqp/_decode.py b/rstream/_pyamqp/_decode.py index 4a6d071..e0bdfad 100644 --- a/rstream/_pyamqp/_decode.py +++ b/rstream/_pyamqp/_decode.py @@ -243,10 +243,7 @@ def decode_payload(buffer: memoryview) -> Message: elif descriptor == 116: message["application_properties"] = value elif descriptor == 117: - #try: - # cast(List, message["data"]).append(value) - #except KeyError: - message["data"] = value + message["body"] = value elif descriptor == 118: try: cast(List, message["sequence"]).append(value) diff --git a/rstream/_pyamqp/_encode.py b/rstream/_pyamqp/_encode.py index 4e1bdc1..e8fe614 100644 --- a/rstream/_pyamqp/_encode.py +++ b/rstream/_pyamqp/_encode.py @@ -46,8 +46,6 @@ from typing_extensions import TypeAlias - - if TYPE_CHECKING: from .message import Header, Properties @@ -934,7 +932,6 @@ def encode_payload(output: bytearray, payload: Message) -> bytes: ) if payload[5]: # data - #for item_value in payload[5]: encode_value( output, { diff --git a/rstream/_pyamqp/message.py b/rstream/_pyamqp/message.py index 1c27e02..0189908 100644 --- a/rstream/_pyamqp/message.py +++ b/rstream/_pyamqp/message.py @@ -4,7 +4,10 @@ # license information. # -------------------------------------------------------------------------- -from dataclasses import dataclass +""" +isort:skip_file +""" + # TODO: fix mypy errors for _code/_definition/__defaults__ (issue #26500) from typing import ( TYPE_CHECKING, @@ -35,7 +38,7 @@ class MessageDict(TypedDict): # needed for use with spread operator message_annotations: Optional[Dict[Union[str, bytes], Any]] properties: Optional["Properties"] application_properties: Optional[Dict[Union[str, bytes], Any]] - data: Optional[bytes] + body: Optional[bytes] sequence: Optional[List[Any]] value: Optional[Any] footer: Optional[Dict[Any, Any]] @@ -201,7 +204,7 @@ class Message(NamedTuple): message_annotations: Optional[Dict[Union[str, bytes], Any]] = None properties: Optional[Properties] = None application_properties: Optional[Dict[Union[str, bytes], Any]] = None - data: Optional[bytes] = None + body: Optional[bytes] = None sequence: Optional[List[Any]] = None value: Optional[Any] = None footer: Optional[Dict[Any, Any]] = None @@ -214,7 +217,7 @@ class Message(NamedTuple): (0x00000072, FIELD("message_annotations", FieldDefinition.annotations, False, None, False)), (0x00000073, FIELD("properties", Properties, False, None, False)), (0x00000074, FIELD("application_properties", AMQPTypes.map, False, None, False)), - (0x00000075, FIELD("data", AMQPTypes.binary, False, None, True)), + (0x00000075, FIELD("body", AMQPTypes.binary, False, None, True)), (0x00000076, FIELD("sequence", AMQPTypes.list, False, None, False)), (0x00000077, FIELD("value", None, False, None, False)), (0x00000078, FIELD("footer", FieldDefinition.annotations, False, None, False)), diff --git a/rstream/_pyamqp/performatives.py b/rstream/_pyamqp/performatives.py index bc050b3..431a56f 100644 --- a/rstream/_pyamqp/performatives.py +++ b/rstream/_pyamqp/performatives.py @@ -4,7 +4,12 @@ # license information. # -------------------------------------------------------------------------- +""" +isort:skip_file +""" + import sys + # TODO: fix mypy errors for _code/_definition/__defaults__ (issue #26500) from collections import namedtuple from typing import NamedTuple, Optional diff --git a/rstream/amqp.py b/rstream/amqp.py index 8af7618..4ef5ca6 100644 --- a/rstream/amqp.py +++ b/rstream/amqp.py @@ -1,12 +1,12 @@ from __future__ import annotations -from typing import Any, Optional, Protocol, cast +from typing import Any, Optional, Protocol -# import uamqp - -from ._pyamqp.message import Message -from ._pyamqp._encode import encode_payload from ._pyamqp._decode import decode_payload +from ._pyamqp._encode import encode_payload +from ._pyamqp.message import Message + +# import uamqp class _MessageProtocol(Protocol): @@ -27,7 +27,7 @@ def __bytes__(self) -> bytes: return bytes(ret) def __str__(self) -> str: - return str(self.data) + return str(self.body) def amqp_decoder(data: bytes) -> AMQPMessage: @@ -41,7 +41,7 @@ def amqp_decoder(data: bytes) -> AMQPMessage: header=message.header, delivery_annotations=message.delivery_annotations, sequence=message.sequence, - data=message.data, + body=message.body, ) return returned_amqp_message diff --git a/setup.cfg b/setup.cfg index eddc97d..245fc16 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,7 +7,7 @@ skip = venv [flake8] exclude = .git, venv -max-line-length = 120 +max-line-length = 150 [mypy] python_version = 3.9 diff --git a/tests/test_amqp.py b/tests/test_amqp.py index 314c6c8..7bddc53 100644 --- a/tests/test_amqp.py +++ b/tests/test_amqp.py @@ -2,13 +2,13 @@ # SPDX-License-Identifier: MIT import pytest -import uamqp.message from rstream import ( AMQPMessage, Consumer, MessageContext, Producer, + Properties, amqp_decoder, ) @@ -17,9 +17,9 @@ async def test_amqp_message(stream: str, consumer: Consumer, producer: Producer) -> None: amqp_message = AMQPMessage( - properties=uamqp.message.MessageProperties(subject=b"test-subject"), - annotations={b"test": 42}, - body="test-body", + properties=Properties(subject=b"test-subject"), + message_annotations={b"test": 42}, + body=b"test-body", ) await producer.send_wait(stream, amqp_message) @@ -34,6 +34,6 @@ def callback(msg: AMQPMessage, message_context: MessageContext): await consumer.run() assert isinstance(incoming_amqp_message, AMQPMessage) - assert list(incoming_amqp_message.get_data()) == list(amqp_message.get_data()) + assert list(incoming_amqp_message.body) == list(amqp_message.body) assert incoming_amqp_message.properties.subject == amqp_message.properties.subject - assert incoming_amqp_message.annotations == amqp_message.annotations + assert incoming_amqp_message.message_annotations == amqp_message.message_annotations diff --git a/tests/test_consumer.py b/tests/test_consumer.py index e7e3793..d31fe3d 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -7,7 +7,6 @@ from functools import partial import pytest -import uamqp from rstream import ( AMQPMessage, @@ -18,6 +17,7 @@ OffsetType, OnClosedErrorInfo, Producer, + Properties, RouteType, SuperStreamConsumer, SuperStreamProducer, @@ -345,8 +345,8 @@ async def test_consume_superstream_with_sac_all_active( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) @@ -390,8 +390,8 @@ async def test_consume_superstream_with_sac_one_non_active( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) @@ -437,8 +437,8 @@ async def test_consume_superstream_with_callback_next( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) @@ -479,8 +479,8 @@ async def test_consume_superstream_with_callback_first( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) @@ -521,8 +521,8 @@ async def test_consume_superstream_with_callback_offset( for i in range(10000): amqp_message = AMQPMessage( - body="a:{}".format(i), - properties=uamqp.message.MessageProperties(message_id=i), + body=bytes("a:{}".format(i), "utf-8"), + properties=Properties(message_id=str(i)), ) await super_stream_producer_for_sac.send(amqp_message) diff --git a/tests/test_producer.py b/tests/test_producer.py index aa6c8d0..17caebd 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -426,7 +426,7 @@ async def test_publishing_sequence_superstream( async def publish_with_ids(*ids): for publishing_id in ids: amqp_message = AMQPMessage( - body="a:{}".format(publishing_id), + body=bytes("a:{}".format(publishing_id), "utf-8"), ) await super_stream_producer.send(amqp_message) @@ -449,9 +449,7 @@ async def test_publishing_sequence_superstream_key_routing( async def publish_with_ids(*ids): for publishing_id in ids: - amqp_message = AMQPMessage( - body="a:{}".format(publishing_id), - ) + amqp_message = AMQPMessage(body=bytes("a:{}".format(publishing_id), "utf-8")) # will send to super_stream with routing key of 'key1' await super_stream_key_routing_producer.send(amqp_message) @@ -486,9 +484,7 @@ async def test_publishing_sequence_superstream_with_callback( async def publish_with_ids(*ids): for publishing_id in ids: - amqp_message = AMQPMessage( - body="a:{}".format(publishing_id), - ) + amqp_message = AMQPMessage(body=bytes("a:{}".format(publishing_id), "utf-8")) await super_stream_producer.send( amqp_message, on_publish_confirm=partial( @@ -569,7 +565,7 @@ async def test_super_stream_producer_connection_broke(super_stream: str, consume count = 0 while True: amqp_message = AMQPMessage( - body="hello: {}".format(count), + body=bytes("hello: {}".format(count), "utf-8"), application_properties={"id": "{}".format(count)}, )