Skip to content

Commit

Permalink
fixing tests/ isort and black checker
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed May 22, 2024
1 parent 35c8246 commit 82571de
Show file tree
Hide file tree
Showing 12 changed files with 47 additions and 47 deletions.
4 changes: 2 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,13 @@ pytest-asyncio = "^0.15.1"
black = "^23.12.1"
requests = "^2.31.0"
mmh3 = "^4.0.0"
typing_extensions ="^4.11.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.0"
requests = "^2.31.0"
types-requests = "^2.31.0.20240406"
typing_extensions ="^4.11.0"

[tool.black]
line-length = 110
Expand Down
2 changes: 1 addition & 1 deletion rstream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
del metadata

from .amqp import AMQPMessage, amqp_decoder # noqa: E402
from ._pyamqp.message import Properties
from ._pyamqp.message import Properties # noqa: E402
from .compression import CompressionType # noqa: E402
from .constants import ( # noqa: E402
ConsumerOffsetSpecification,
Expand Down
5 changes: 1 addition & 4 deletions rstream/_pyamqp/_decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,10 +243,7 @@ def decode_payload(buffer: memoryview) -> Message:
elif descriptor == 116:
message["application_properties"] = value
elif descriptor == 117:
#try:
# cast(List, message["data"]).append(value)
#except KeyError:
message["data"] = value
message["body"] = value
elif descriptor == 118:
try:
cast(List, message["sequence"]).append(value)
Expand Down
3 changes: 0 additions & 3 deletions rstream/_pyamqp/_encode.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@
from typing_extensions import TypeAlias




if TYPE_CHECKING:
from .message import Header, Properties

Expand Down Expand Up @@ -934,7 +932,6 @@ def encode_payload(output: bytearray, payload: Message) -> bytes:
)

if payload[5]: # data
#for item_value in payload[5]:
encode_value(
output,
{
Expand Down
11 changes: 7 additions & 4 deletions rstream/_pyamqp/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
# license information.
# --------------------------------------------------------------------------

from dataclasses import dataclass
"""
isort:skip_file
"""

# TODO: fix mypy errors for _code/_definition/__defaults__ (issue #26500)
from typing import (
TYPE_CHECKING,
Expand Down Expand Up @@ -35,7 +38,7 @@ class MessageDict(TypedDict): # needed for use with spread operator
message_annotations: Optional[Dict[Union[str, bytes], Any]]
properties: Optional["Properties"]
application_properties: Optional[Dict[Union[str, bytes], Any]]
data: Optional[bytes]
body: Optional[bytes]
sequence: Optional[List[Any]]
value: Optional[Any]
footer: Optional[Dict[Any, Any]]
Expand Down Expand Up @@ -201,7 +204,7 @@ class Message(NamedTuple):
message_annotations: Optional[Dict[Union[str, bytes], Any]] = None
properties: Optional[Properties] = None
application_properties: Optional[Dict[Union[str, bytes], Any]] = None
data: Optional[bytes] = None
body: Optional[bytes] = None
sequence: Optional[List[Any]] = None
value: Optional[Any] = None
footer: Optional[Dict[Any, Any]] = None
Expand All @@ -214,7 +217,7 @@ class Message(NamedTuple):
(0x00000072, FIELD("message_annotations", FieldDefinition.annotations, False, None, False)),
(0x00000073, FIELD("properties", Properties, False, None, False)),
(0x00000074, FIELD("application_properties", AMQPTypes.map, False, None, False)),
(0x00000075, FIELD("data", AMQPTypes.binary, False, None, True)),
(0x00000075, FIELD("body", AMQPTypes.binary, False, None, True)),
(0x00000076, FIELD("sequence", AMQPTypes.list, False, None, False)),
(0x00000077, FIELD("value", None, False, None, False)),
(0x00000078, FIELD("footer", FieldDefinition.annotations, False, None, False)),
Expand Down
5 changes: 5 additions & 0 deletions rstream/_pyamqp/performatives.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@
# license information.
# --------------------------------------------------------------------------

"""
isort:skip_file
"""

import sys

# TODO: fix mypy errors for _code/_definition/__defaults__ (issue #26500)
from collections import namedtuple
from typing import NamedTuple, Optional
Expand Down
14 changes: 7 additions & 7 deletions rstream/amqp.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from __future__ import annotations

from typing import Any, Optional, Protocol, cast
from typing import Any, Optional, Protocol

# import uamqp

from ._pyamqp.message import Message
from ._pyamqp._encode import encode_payload
from ._pyamqp._decode import decode_payload
from ._pyamqp._encode import encode_payload
from ._pyamqp.message import Message

# import uamqp


class _MessageProtocol(Protocol):
Expand All @@ -27,7 +27,7 @@ def __bytes__(self) -> bytes:
return bytes(ret)

def __str__(self) -> str:
return str(self.data)
return str(self.body)


def amqp_decoder(data: bytes) -> AMQPMessage:
Expand All @@ -41,7 +41,7 @@ def amqp_decoder(data: bytes) -> AMQPMessage:
header=message.header,
delivery_annotations=message.delivery_annotations,
sequence=message.sequence,
data=message.data,
body=message.body,
)

return returned_amqp_message
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ skip = venv

[flake8]
exclude = .git, venv
max-line-length = 120
max-line-length = 150

[mypy]
python_version = 3.9
Expand Down
12 changes: 6 additions & 6 deletions tests/test_amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
# SPDX-License-Identifier: MIT

import pytest
import uamqp.message

from rstream import (
AMQPMessage,
Consumer,
MessageContext,
Producer,
Properties,
amqp_decoder,
)

Expand All @@ -17,9 +17,9 @@

async def test_amqp_message(stream: str, consumer: Consumer, producer: Producer) -> None:
amqp_message = AMQPMessage(
properties=uamqp.message.MessageProperties(subject=b"test-subject"),
annotations={b"test": 42},
body="test-body",
properties=Properties(subject=b"test-subject"),
message_annotations={b"test": 42},
body=b"test-body",
)
await producer.send_wait(stream, amqp_message)

Expand All @@ -34,6 +34,6 @@ def callback(msg: AMQPMessage, message_context: MessageContext):
await consumer.run()

assert isinstance(incoming_amqp_message, AMQPMessage)
assert list(incoming_amqp_message.get_data()) == list(amqp_message.get_data())
assert list(incoming_amqp_message.body) == list(amqp_message.body)
assert incoming_amqp_message.properties.subject == amqp_message.properties.subject
assert incoming_amqp_message.annotations == amqp_message.annotations
assert incoming_amqp_message.message_annotations == amqp_message.message_annotations
22 changes: 11 additions & 11 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from functools import partial

import pytest
import uamqp

from rstream import (
AMQPMessage,
Expand All @@ -18,6 +17,7 @@
OffsetType,
OnClosedErrorInfo,
Producer,
Properties,
RouteType,
SuperStreamConsumer,
SuperStreamProducer,
Expand Down Expand Up @@ -345,8 +345,8 @@ async def test_consume_superstream_with_sac_all_active(

for i in range(10000):
amqp_message = AMQPMessage(
body="a:{}".format(i),
properties=uamqp.message.MessageProperties(message_id=i),
body=bytes("a:{}".format(i), "utf-8"),
properties=Properties(message_id=str(i)),
)
await super_stream_producer_for_sac.send(amqp_message)

Expand Down Expand Up @@ -390,8 +390,8 @@ async def test_consume_superstream_with_sac_one_non_active(

for i in range(10000):
amqp_message = AMQPMessage(
body="a:{}".format(i),
properties=uamqp.message.MessageProperties(message_id=i),
body=bytes("a:{}".format(i), "utf-8"),
properties=Properties(message_id=str(i)),
)
await super_stream_producer_for_sac.send(amqp_message)

Expand Down Expand Up @@ -437,8 +437,8 @@ async def test_consume_superstream_with_callback_next(

for i in range(10000):
amqp_message = AMQPMessage(
body="a:{}".format(i),
properties=uamqp.message.MessageProperties(message_id=i),
body=bytes("a:{}".format(i), "utf-8"),
properties=Properties(message_id=str(i)),
)
await super_stream_producer_for_sac.send(amqp_message)

Expand Down Expand Up @@ -479,8 +479,8 @@ async def test_consume_superstream_with_callback_first(

for i in range(10000):
amqp_message = AMQPMessage(
body="a:{}".format(i),
properties=uamqp.message.MessageProperties(message_id=i),
body=bytes("a:{}".format(i), "utf-8"),
properties=Properties(message_id=str(i)),
)
await super_stream_producer_for_sac.send(amqp_message)

Expand Down Expand Up @@ -521,8 +521,8 @@ async def test_consume_superstream_with_callback_offset(

for i in range(10000):
amqp_message = AMQPMessage(
body="a:{}".format(i),
properties=uamqp.message.MessageProperties(message_id=i),
body=bytes("a:{}".format(i), "utf-8"),
properties=Properties(message_id=str(i)),
)
await super_stream_producer_for_sac.send(amqp_message)

Expand Down
12 changes: 4 additions & 8 deletions tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ async def test_publishing_sequence_superstream(
async def publish_with_ids(*ids):
for publishing_id in ids:
amqp_message = AMQPMessage(
body="a:{}".format(publishing_id),
body=bytes("a:{}".format(publishing_id), "utf-8"),
)

await super_stream_producer.send(amqp_message)
Expand All @@ -449,9 +449,7 @@ async def test_publishing_sequence_superstream_key_routing(

async def publish_with_ids(*ids):
for publishing_id in ids:
amqp_message = AMQPMessage(
body="a:{}".format(publishing_id),
)
amqp_message = AMQPMessage(body=bytes("a:{}".format(publishing_id), "utf-8"))
# will send to super_stream with routing key of 'key1'
await super_stream_key_routing_producer.send(amqp_message)

Expand Down Expand Up @@ -486,9 +484,7 @@ async def test_publishing_sequence_superstream_with_callback(

async def publish_with_ids(*ids):
for publishing_id in ids:
amqp_message = AMQPMessage(
body="a:{}".format(publishing_id),
)
amqp_message = AMQPMessage(body=bytes("a:{}".format(publishing_id), "utf-8"))
await super_stream_producer.send(
amqp_message,
on_publish_confirm=partial(
Expand Down Expand Up @@ -569,7 +565,7 @@ async def test_super_stream_producer_connection_broke(super_stream: str, consume
count = 0
while True:
amqp_message = AMQPMessage(
body="hello: {}".format(count),
body=bytes("hello: {}".format(count), "utf-8"),
application_properties={"id": "{}".format(count)},
)

Expand Down

0 comments on commit 82571de

Please sign in to comment.