From 0d68b10195952dee7897b15aab029c65eff78ddf Mon Sep 17 00:00:00 2001 From: Bulygin Evgeny Date: Wed, 9 Oct 2024 14:04:31 +0500 Subject: [PATCH] Fixed a bug when used `consumer_update_listener` callback from another subscription in single active consumer mode. --- rstream/consumer.py | 3 ++ tests/test_consumer.py | 65 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 68 insertions(+) diff --git a/rstream/consumer.py b/rstream/consumer.py index 8523c1d..5f46aba 100644 --- a/rstream/consumer.py +++ b/rstream/consumer.py @@ -414,6 +414,9 @@ async def _on_consumer_update_query_response( Callable[[bool, EventContext], Awaitable[OffsetSpecification]] ] = None, ) -> None: + if frame.subscription_id != subscriber.subscription_id: + return + # event the consumer is not active, we need to send a ConsumerUpdateResponse # by protocol definition. the offsetType can't be null so we use OffsetTypeNext as default if consumer_update_listener is None: diff --git a/tests/test_consumer.py b/tests/test_consumer.py index df25263..d9788f1 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -400,6 +400,71 @@ async def consumer_update_listener_with_custom_offset( await producer.close() +async def test_consume_with_multiple_sac_custom_consumer_update_listener_cb( + consumer: Consumer, producer: Producer +) -> None: + stream_name_1 = "stream1" + stream_name_2 = "stream2" + await producer.create_stream(stream=stream_name_1) + await producer.create_stream(stream=stream_name_2) + try: + # necessary to use send_batch, since in this case, upon delivery, rabbitmq will deliver + # this batch as a whole, and not one message at a time, like send_wait + for i in range(10): + await producer.send_wait(stream_name_1, AMQPMessage(body=f"{i}".encode())) + await producer.send_wait(stream_name_2, AMQPMessage(body=f"{i}".encode())) + + received_offsets_1 = [] + received_offsets_2 = [] + + async def consumer_cb1(message: bytes, message_context: MessageContext) -> None: + received_offsets_1.append(message_context.offset) + + async def consumer_cb2(message: bytes, message_context: MessageContext) -> None: + received_offsets_2.append(message_context.offset) + + async def consumer_update_listener_with_custom_offset_1( + is_active: bool, event_context: EventContext + ) -> OffsetSpecification: + if is_active: + return OffsetSpecification(offset_type=OffsetType.OFFSET, offset=5) + return OffsetSpecification(offset_type=OffsetType.FIRST, offset=0) + + async def consumer_update_listener_with_custom_offset_2( + is_active: bool, event_context: EventContext + ) -> OffsetSpecification: + if is_active: + return OffsetSpecification(offset_type=OffsetType.OFFSET, offset=7) + return OffsetSpecification(offset_type=OffsetType.FIRST, offset=0) + + async with consumer: + await consumer.subscribe( + stream=stream_name_1, + callback=consumer_cb1, + properties={"single-active-consumer": "true", "name": "sac_name1"}, + offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST), + consumer_update_listener=consumer_update_listener_with_custom_offset_1, + ) + await consumer.subscribe( + stream=stream_name_2, + callback=consumer_cb2, + properties={"single-active-consumer": "true", "name": "sac_name2"}, + offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST), + consumer_update_listener=consumer_update_listener_with_custom_offset_2, + ) + + await wait_for(lambda: len(received_offsets_1) >= 1) + await wait_for(lambda: len(received_offsets_2) >= 1) + + assert received_offsets_1[0] == 5 + assert received_offsets_2[0] == 7 + + finally: + await producer.delete_stream(stream=stream_name_1) + await producer.delete_stream(stream=stream_name_2) + await producer.close() + + async def test_consume_superstream_with_sac_all_active( super_stream: str, super_stream_consumer_for_sac1: SuperStreamConsumer,