diff --git a/README.md b/README.md index b8f9ba4..65406dc 100644 --- a/README.md +++ b/README.md @@ -330,12 +330,12 @@ Run the server with the following command: ```bash docker run -it --rm --name rabbitmq -p 5552:5552 -p 5672:5672 -p 15672:15672 \ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost' \ - rabbitmq:3.12-management + rabbitmq:3.13.1-management ``` enable the plugin: ```bash -docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream +docker exec rabbitmq rabbitmq-plugins enable rabbitmq_stream rabbitmq_stream_management rabbitmq_amqp1_0 ``` and run the tests: diff --git a/rstream/client.py b/rstream/client.py index 976507a..877e577 100644 --- a/rstream/client.py +++ b/rstream/client.py @@ -240,12 +240,16 @@ async def start(self) -> None: self.add_handler(schema.Close, self._on_close) async def run_queue_listener_task(self, subscriber_name: str, handler: HT[FT]) -> None: - if subscriber_name not in self._frames: + task_name = f"run_delivery_handlers_{subscriber_name}" + if task_name not in self._tasks: self.start_task( - f"run_delivery_handlers_{subscriber_name}", + task_name, self._run_delivery_handlers(subscriber_name, handler), ) + async def stop_queue_listener_task(self, subscriber_name: str) -> None: + await self.stop_task(name=f"run_delivery_handlers_{subscriber_name}") + async def _run_delivery_handlers(self, subscriber_name: str, handler: HT[FT]): while self.is_connection_alive(): frame_entry = await self._frames[subscriber_name].get() @@ -361,7 +365,7 @@ async def close(self) -> None: await self.stop_task("listener") for subscriber_name in self._frames: - await self.stop_task(f"run_delivery_handlers_{subscriber_name}") + await self.stop_queue_listener_task(subscriber_name=subscriber_name) if self._conn is not None and connection_is_broken is False: await self._conn.close() diff --git a/rstream/consumer.py b/rstream/consumer.py index 32ade3f..d759e72 100644 --- a/rstream/consumer.py +++ b/rstream/consumer.py @@ -304,6 +304,8 @@ async def subscribe( async def unsubscribe(self, subscriber_name: str) -> None: logger.debug("unsubscribe(): UnSubscribing and removing handlers") subscriber = self._subscribers[subscriber_name] + + await subscriber.client.stop_queue_listener_task(subscriber_name=subscriber_name) subscriber.client.remove_handler( schema.Deliver, name=subscriber.reference, diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 3d81b33..750a350 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -242,66 +242,74 @@ async def test_offset_type_next(stream: str, consumer: Consumer, producer: Produ async def test_consume_with_resubscribe(stream: str, consumer: Consumer, producer: Producer) -> None: - captured: list[bytes] = [] + captured_by_first_consumer: list[bytes] = [] subscriber_name = await consumer.subscribe( - stream, callback=lambda message, message_context: captured.append(bytes(message)) + stream, callback=lambda message, message_context: captured_by_first_consumer.append(bytes(message)) ) await producer.send_wait(stream, b"one") - await wait_for(lambda: len(captured) >= 1) + await wait_for(lambda: len(captured_by_first_consumer) >= 1) + assert captured_by_first_consumer == [b"one"] await consumer.unsubscribe(subscriber_name) + + captured_by_second_consumer: list[bytes] = [] await consumer.subscribe( stream, - callback=lambda message, message_context: captured.append(bytes(message)), + callback=lambda message, message_context: captured_by_second_consumer.append(bytes(message)), offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None), ) await producer.send_wait(stream, b"two") - await wait_for(lambda: len(captured) >= 2) - assert captured == [b"one", b"two"] + await asyncio.sleep(1) + await wait_for(lambda: len(captured_by_second_consumer) >= 1) + assert captured_by_second_consumer == [b"two"] async def test_consume_with_resubscribe_msg(stream: str, consumer: Consumer, producer: Producer) -> None: - captured: list[bytes] = [] + captured_by_first_consumer: list[bytes] = [] subscriber_name = await consumer.subscribe( - stream, callback=lambda message, message_context: captured.append(bytes(message)) + stream, callback=lambda message, message_context: captured_by_first_consumer.append(bytes(message)) ) for i in range(100): await producer.send_wait(stream, b"one") - await wait_for(lambda: len(captured) >= 100) + await wait_for(lambda: len(captured_by_first_consumer) >= 100) await consumer.unsubscribe(subscriber_name) + + captured_by_second_consumer: list[bytes] = [] await consumer.subscribe( stream, subscriber_name=subscriber_name, - callback=lambda message, message_context: captured.append(bytes(message)), + callback=lambda message, message_context: captured_by_second_consumer.append(bytes(message)), offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None), ) for i in range(100): await producer.send_wait(stream, b"two") - await wait_for(lambda: len(captured) >= 200) + await wait_for(lambda: len(captured_by_second_consumer) >= 100) async def test_consume_superstream_with_resubscribe( super_stream: str, super_stream_consumer: SuperStreamConsumer, super_stream_producer: SuperStreamProducer ) -> None: - captured: list[bytes] = [] + captured_by_first_consumer: list[bytes] = [] await super_stream_consumer.subscribe( - callback=lambda message, message_context: captured.append(bytes(message)) + callback=lambda message, message_context: captured_by_first_consumer.append(bytes(message)) ) await super_stream_producer.send(b"one") - await wait_for(lambda: len(captured) >= 1) + await wait_for(lambda: len(captured_by_first_consumer) >= 1) await super_stream_consumer.unsubscribe() + + captured_by_second_consumer: list[bytes] = [] await super_stream_consumer.subscribe( - callback=lambda message, message_context: captured.append(bytes(message)), + callback=lambda message, message_context: captured_by_second_consumer.append(bytes(message)), offset_specification=ConsumerOffsetSpecification(OffsetType.NEXT, None), ) await super_stream_producer.send(b"two") - await wait_for(lambda: len(captured) >= 2) - assert captured == [b"one", b"two"] + await wait_for(lambda: len(captured_by_second_consumer) >= 1) + assert captured_by_second_consumer == [b"two"] async def test_consume_with_restart(stream: str, consumer: Consumer, producer: Producer) -> None: