consumer.consume return unassigned messages if rebalance happens during the call #1013

Closed

olejorgenb opened this issue Jan 6, 2021 · 2 comments

@olejorgenb
Description

When asking for a large number of messages using consumer.consume with a large timeout, unassigned messages are returned if a rebalance happens during that consume call.

How to reproduce

import typing as ty
import socket
import confluent_kafka as ck

def create_bare_ck_consumer(
        topic: str,
        uri: str,
        group: str,
        max_poll_interval_ms: int,
        auto_offset_reset: str = "latest",
):
    topics = [topic]

    params = {
        "bootstrap.servers": uri,
        "enable.auto.commit": False,
        "auto.offset.reset": auto_offset_reset,
        "session.timeout.ms": 30*1000,
        "client.id": f"{socket.gethostname()}@rdkafka-{ck.__version__}",
        "group.id": group,
        "max.poll.interval.ms": max_poll_interval_ms
        # "debug": "consumer,cgrp",  # https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#debug-contexts
    }

    kafka_consumer = ck.Consumer(
        **params,
    )

    def on_assigned(consumer: ck.Consumer, assigned):
        consumer.assign(assigned)
        print(f"We were assigned {len(assigned)} partitions.")
        print("Rebalance finished (on_assigned)")

    def on_revoked(*args):
        print("Rebalance started (on_revoked)")

    kafka_consumer.subscribe(
        topics,
        on_assigned,
        on_revoked,
    )

    return kafka_consumer

def main():
    print(f"{ck.version()=}")
    print(f"{ck.libversion()=}")

    consumer = create_bare_ck_consumer(
        topic="the-topic",
        uri="localhost:9092",
        group="the-group",
        max_poll_interval_ms=7 * 60 * 1000
    )
    try:
        msgs = []
        msgs_without_errors = []
        while len(msgs) == 0:
            print("Start consuming")
            msgs = consumer.consume(3000, timeout=10)
            print(f"Done consuming -> {len(msgs)}")

            msgs_without_errors: ty.List[ck.Message] = []

            for msg in msgs:
                if (error := msg.error()) is not None:
                    print(f"Got error when polling: {error.code()}")
                else:
                    msgs_without_errors.append(msg)

        assignment = consumer.assignment()
        print("Assignments:")
        for tp in assignment:
            print(tp.topic, tp.partition)

        for msg in msgs_without_errors:
            if ck.TopicPartition(msg.topic(), msg.partition()) not in assignment:
                print("Received unassigned message!", msg.topic(), msg.partition())
    finally:
        if consumer:
            consumer.close()


if __name__ == '__main__':
    main()

When starting two of the above processes in fairly rapid succession, the first is assigned all 128 partitions, and then a rebalance is triggered before consume has returned any messages. On my machine there needs to be a sufficient number of messages available to trigger the behavior.

The output of the first process is:

ck.version()=('1.5.0', 17104896)
ck.libversion()=('1.5.0', 17105151)
Start consuming
We were assigned 128 partitions.
Rebalance finished (on_assigned)
Rebalance started (on_revoked)
We were assigned 64 partitions.
Rebalance finished (on_assigned)
Done consuming -> 1654
Assignments:
the-topic 0
the-topic 1
the-topic 2
...
the-topic 61
the-topic 62
the-topic 63
Received unassigned message! the-topic 64
Received unassigned message! the-topic 64
Received unassigned message! the-topic 64
Received unassigned message! the-topic 64
Received unassigned message! the-topic 64
Received unassigned message! the-topic 65
Received unassigned message! the-topic 65
Received unassigned message! the-topic 65
Received unassigned message! the-topic 65
...

According to #435 (comment), the internal queue should be cleared on rebalance.

The same result happens if I remove the assign call in the on_assigned callback (it is handled internally now).

rkqu is not filtered after rd_kafka_consume_batch_queue completes, and I guess librdkafka doesn't touch that queue on assign:

n = (Py_ssize_t)rd_kafka_consume_batch_queue(rkqu,

If this is not considered a bug, the behavior should at least be documented. Or is it something I have overlooked?
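
In the meantime, a possible workaround (just a sketch based on the repro above, reusing its ck and ty imports; drop_unassigned is a helper name I made up, not library API) is to re-check the assignment after consume() returns and discard messages from partitions that are no longer assigned:

def drop_unassigned(
        consumer: ck.Consumer,
        msgs: ty.List[ck.Message],
) -> ty.List[ck.Message]:
    # Snapshot the assignment *after* consume() has returned, so a rebalance
    # that happened during the call is already reflected in it.
    assigned = {(tp.topic, tp.partition) for tp in consumer.assignment()}
    return [
        msg for msg in msgs
        if msg.error() is None
        and (msg.topic(), msg.partition()) in assigned
    ]

This only avoids processing messages this consumer no longer owns; the new owner of the revoked partitions will re-read them from the last committed offset, and another rebalance can still race with the assignment() call.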

PS: A similar issue was reported for node-rdkafka, another librdkafka wrapper: Blizzard/node-rdkafka#638 (comment)

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version: ck.version()=('1.5.0', 17104896), ck.libversion()=('1.5.0', 17105151)
  • Apache Kafka broker version: 2.4.1 (zookeeper: 3.6.0)
  • Client configuration: (see description)
  • Operating system: linux
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill
Contributor

Yeah, this is a known issue with the batch consume interface and a reason why we recommend not using it.
It should really be fixed.
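
For reference, a minimal sketch of consuming via the single-message poll() loop instead of the batch interface (the poll_batch helper and its parameters are illustrative, not from this thread):

import time
import typing as ty

import confluent_kafka as ck

def poll_batch(consumer: ck.Consumer, max_messages: int, timeout: float) -> ty.List[ck.Message]:
    # Collect up to max_messages within `timeout` seconds using poll(), which
    # returns at most one message per call and serves the rebalance callbacks
    # in between, so a rebalance takes effect between individual messages.
    msgs: ty.List[ck.Message] = []
    deadline = time.monotonic() + timeout
    while len(msgs) < max_messages:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        msg = consumer.poll(remaining)
        if msg is None:
            break
        msgs.append(msg)
    return msgs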

@jliunyu
Contributor

jliunyu commented Apr 21, 2021

The fix was merged.
