Skip to content

Commit

Permalink
tests: fix Kafka tests from main branch
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Dec 9, 2024
1 parent 7b00966 commit 82e3969
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 15 deletions.
33 changes: 24 additions & 9 deletions faststream/_internal/broker/abc_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,34 @@
Optional,
)

from faststream._internal.publisher.proto import PublisherProto
from faststream._internal.state import BrokerState, Pointer
from faststream._internal.subscriber.proto import SubscriberProto
from faststream._internal.types import BrokerMiddleware, CustomCallable, MsgType
from faststream.specification.proto import EndpointSpecification
from faststream.specification.schema import PublisherSpec, SubscriberSpec

if TYPE_CHECKING:
from fast_depends.dependencies import Dependant

from faststream._internal.publisher.proto import PublisherProto
from faststream._internal.subscriber.proto import SubscriberProto

class FinalSubscriber(
EndpointSpecification[MsgType, SubscriberSpec],
SubscriberProto[MsgType],
):
pass


class FinalPublisher(
EndpointSpecification[MsgType, PublisherSpec],
PublisherProto[MsgType],
):
pass


class ABCBroker(Generic[MsgType]):
_subscribers: list["SubscriberProto[MsgType]"]
_publishers: list["PublisherProto[MsgType]"]
_subscribers: list[FinalSubscriber[MsgType]]
_publishers: list[FinalPublisher[MsgType]]

def __init__(
self,
Expand Down Expand Up @@ -61,9 +76,9 @@ def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
@abstractmethod
def subscriber(
self,
subscriber: "SubscriberProto[MsgType]",
subscriber: "FinalSubscriber[MsgType]",
is_running: bool = False,
) -> "SubscriberProto[MsgType]":
) -> "FinalSubscriber[MsgType]":
subscriber.add_prefix(self.prefix)
if not is_running:
self._subscribers.append(subscriber)
Expand All @@ -72,17 +87,17 @@ def subscriber(
@abstractmethod
def publisher(
self,
publisher: "PublisherProto[MsgType]",
publisher: "FinalPublisher[MsgType]",
is_running: bool = False,
) -> "PublisherProto[MsgType]":
) -> "FinalPublisher[MsgType]":
publisher.add_prefix(self.prefix)
if not is_running:
self._publishers.append(publisher)
return publisher

def setup_publisher(
self,
publisher: "PublisherProto[MsgType]",
publisher: "FinalPublisher[MsgType]",
**kwargs: Any,
) -> None:
"""Setup the Publisher to prepare it to starting."""
Expand Down
5 changes: 1 addition & 4 deletions faststream/rabbit/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,10 +290,7 @@ def __init__(
)


class RabbitRouter(
RabbitRegistrator,
BrokerRouter["IncomingMessage"]
):
class RabbitRouter(RabbitRegistrator, BrokerRouter["IncomingMessage"]):
"""Includable to RabbitBroker router."""

def __init__(
Expand Down
4 changes: 2 additions & 2 deletions tests/brokers/kafka/test_consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ async def handler(msg):

await asyncio.wait(
(
asyncio.create_task(br._producer._producer.send(queue, key=b"")),
asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")),
asyncio.create_task(event.wait()),
),
timeout=3,
Expand All @@ -426,7 +426,7 @@ async def handler(msg):

await asyncio.wait(
(
asyncio.create_task(br._producer._producer.send(queue, key=b"")),
asyncio.create_task(br._producer._producer.producer.send(queue, key=b"")),
asyncio.create_task(event.wait()),
),
timeout=3,
Expand Down

0 comments on commit 82e3969

Please sign in to comment.