Skip to content

Commit

Permalink
fixing mypy issues
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielePalaia committed Apr 8, 2024
1 parent c52ad08 commit 32de0d5
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 10 deletions.
12 changes: 6 additions & 6 deletions docs/examples/reliable_client/BestPracticesClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async def make_producer(rabbitmq_data: dict) -> Producer | SuperStreamProducer:

else:

producer = SuperStreamProducer(
producer = SuperStreamProducer( # type: ignore
host=host,
username=username,
password=password,
Expand Down Expand Up @@ -131,7 +131,7 @@ async def make_consumer(rabbitmq_data: dict) -> Consumer | SuperStreamConsumer:

else:

consumer = SuperStreamConsumer(
consumer = SuperStreamConsumer( # type: ignore
host=host,
username=username,
password=password,
Expand Down Expand Up @@ -182,7 +182,7 @@ async def publish(rabbitmq_configuration: dict):
producers = int(rabbitmq_configuration["Producers"])
delay_sending_msg = int(rabbitmq_configuration["DelayDuringSendMs"])

producer = await make_producer(rabbitmq_configuration)
producer = await make_producer(rabbitmq_configuration) # type: ignore
await producer.start()

# create a stream if it doesn't already exist
Expand Down Expand Up @@ -217,7 +217,7 @@ async def publish(rabbitmq_configuration: dict):

else:
try:
await producer.send(message=amqp_message, on_publish_confirm=_on_publish_confirm_client)
await producer.send(message=amqp_message, on_publish_confirm=_on_publish_confirm_client) # type: ignore
except Exception as ex:
print("exception while sending " + str(ex))

Expand All @@ -242,7 +242,7 @@ async def consume(rabbitmq_configuration: dict):
consumers = int(rabbitmq_configuration["Consumers"])
stream_name = rabbitmq_configuration["StreamName"]

consumer = await make_consumer(rabbitmq_configuration)
consumer = await make_consumer(rabbitmq_configuration) # type: ignore

# create a stream if it doesn't already exist
if not is_super_stream_scenario:
Expand All @@ -260,7 +260,7 @@ async def consume(rabbitmq_configuration: dict):
offset_specification=offset_spec,
)
else:
await consumer.subscribe(callback=on_message, decoder=amqp_decoder, offset_specification=offset_spec)
await consumer.subscribe(callback=on_message, decoder=amqp_decoder, offset_specification=offset_spec) # type: ignore

await consumer.run()

Expand Down
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ mmh3 = "^4.0.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.0"
requests = "^2.31.0"
types-requests = "^2.31.0.20240406"

[tool.black]
line-length = 110
Expand Down
9 changes: 6 additions & 3 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,10 +454,12 @@ async def reconnect_stream(self, stream: str, offset: Optional[int] = None) -> N
if stream == self._subscribers[subscriber_id].stream:
curr_subscriber = self._subscribers[subscriber_id]
curr_subscriber_id = subscriber_id
del self._subscribers[curr_subscriber_id]
if curr_subscriber_id is not None:
del self._subscribers[curr_subscriber_id]

if stream in self._clients:
await self._clients[stream].free_available_id(curr_subscriber.subscription_id)
if curr_subscriber is not None:
await self._clients[stream].free_available_id(curr_subscriber.subscription_id)
await self._clients[stream].close()
del self._clients[stream]

Expand Down Expand Up @@ -519,7 +521,8 @@ async def _maybe_clean_up_during_lost_connection(self, stream: str):

if stream in self._clients:
await self._clients[stream].remove_stream(stream)
await self._clients[stream].free_available_id(curr_subscriber.subscription_id)
if curr_subscriber is not None:
await self._clients[stream].free_available_id(curr_subscriber.subscription_id)
if await self._clients[stream].get_stream_count() == 0:
await self._clients[stream].close()
del self._clients[stream]

0 comments on commit 32de0d5

Please sign in to comment.