Skip to content

Commit

Permalink
Fixed a bug when used consumer_update_listener callback from anothe…
Browse files Browse the repository at this point in the history
…r subscription in single active consumer mode. (#210)
  • Loading branch information
nesb1 authored Oct 10, 2024
1 parent a338868 commit c9da907
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
3 changes: 3 additions & 0 deletions rstream/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ async def _on_consumer_update_query_response(
Callable[[bool, EventContext], Awaitable[OffsetSpecification]]
] = None,
) -> None:
if frame.subscription_id != subscriber.subscription_id:
return

# event the consumer is not active, we need to send a ConsumerUpdateResponse
# by protocol definition. the offsetType can't be null so we use OffsetTypeNext as default
if consumer_update_listener is None:
Expand Down
65 changes: 65 additions & 0 deletions tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,71 @@ async def consumer_update_listener_with_custom_offset(
await producer.close()


async def test_consume_with_multiple_sac_custom_consumer_update_listener_cb(
consumer: Consumer, producer: Producer
) -> None:
stream_name_1 = "stream1"
stream_name_2 = "stream2"
await producer.create_stream(stream=stream_name_1)
await producer.create_stream(stream=stream_name_2)
try:
# necessary to use send_wait here, because rmq will store every message in separate batch.
# In case of use send_batch rstream will filter messages on the client side bypassing some problems.
for i in range(10):
await producer.send_wait(stream_name_1, AMQPMessage(body=f"{i}".encode()))
await producer.send_wait(stream_name_2, AMQPMessage(body=f"{i}".encode()))

received_offsets_1 = []
received_offsets_2 = []

async def consumer_cb1(message: bytes, message_context: MessageContext) -> None:
received_offsets_1.append(message_context.offset)

async def consumer_cb2(message: bytes, message_context: MessageContext) -> None:
received_offsets_2.append(message_context.offset)

async def consumer_update_listener_with_custom_offset_1(
is_active: bool, event_context: EventContext
) -> OffsetSpecification:
if is_active:
return OffsetSpecification(offset_type=OffsetType.OFFSET, offset=5)
return OffsetSpecification(offset_type=OffsetType.FIRST, offset=0)

async def consumer_update_listener_with_custom_offset_2(
is_active: bool, event_context: EventContext
) -> OffsetSpecification:
if is_active:
return OffsetSpecification(offset_type=OffsetType.OFFSET, offset=7)
return OffsetSpecification(offset_type=OffsetType.FIRST, offset=0)

async with consumer:
await consumer.subscribe(
stream=stream_name_1,
callback=consumer_cb1,
properties={"single-active-consumer": "true", "name": "sac_name1"},
offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST),
consumer_update_listener=consumer_update_listener_with_custom_offset_1,
)
await consumer.subscribe(
stream=stream_name_2,
callback=consumer_cb2,
properties={"single-active-consumer": "true", "name": "sac_name2"},
offset_specification=ConsumerOffsetSpecification(OffsetType.FIRST),
consumer_update_listener=consumer_update_listener_with_custom_offset_2,
)

await wait_for(lambda: len(received_offsets_1) >= 1)
await wait_for(lambda: len(received_offsets_2) >= 1)

assert received_offsets_1[0] == 5
assert received_offsets_2[0] == 7

finally:
await producer.delete_stream(stream=stream_name_1)
await producer.delete_stream(stream=stream_name_2)
await producer.close()


async def test_consume_superstream_with_sac_all_active(
super_stream: str,
super_stream_consumer_for_sac1: SuperStreamConsumer,
Expand Down

0 comments on commit c9da907

Please sign in to comment.