Skip to content

Consuming from multiple topics partitions from a single thread

Emanuele Sabellico edited this page Jul 19, 2023 · 6 revisions

NOTE: This information relates to the legacy simple consumer. The new high-level KafkaConsumer automatically provides a single queue polling point for all assigned partitions

The problem

The standard consumer interface serves messages from one topic+partition (toppar) with one call to rd_kafka_consume*(). In the case an application needs to consume from multiple toppars one method is to create one application thread per toppar. This is sometimes the proper approach, such as when messages from different topics are handled by different parts of the application code, but when consuming from multiple partitions of the same topic multiple application threads complicate matters and requires the application to perform the usual lock-and-dispatch queue between threads.

The solution

To alleviate this librdkafka provides autonomous queues which can be used to re-route messages from multiple toppars to a single queue, thus providing a single queue to serve from a single application thread with a single call.

Create the queue

First create a queue which toppars (topic+partition) messages will be re-routed to:

rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);

Start consuming

Create the consumer in the usual way but start consuming by using the queue interface version of .._consume_start():

rd_kafka_consume_start_queue(rkt, partition, RD_KAFKA_OFFSET_STORED, rkqu);

Repeat this step for each topic+partition you need to consume from by reusing the queue rkqu in each step.

NOTE: Queues from one rd_kafka_t may not be used with another rd_kafka_t handle.

Consume messages

Then from your consumer loop simply call one of the ..consume.._queue() functions to consume messages from all topics+partitions+ re-routed to this queue.

  while (run) {
    rd_kafka_message_t *rkmessage;
    /* Consume message from queue */
    rkmessage = rd_kafka_consume_queue(rkqu, 1000);
    /* Handle message */
    ...
    rd_kafka_message_destroy(rkmessage);
  }

Stop consuming

To stop consuming a topic+partition use the standard rd_kafka_consume_stop() interface.

rd_kafka_consume_stop(rkt, partition);

Destroy queue

When all consumers using a queue have been stopped the queue itself may be destroyed.

rd_kafka_queue_destroy(rkqu)

Ordering

Message ordering is guaranteed within a topic+partition that is re-routed to the queue, but there is no definition of ordering between topic+partitions.

Documentation

See https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h for interface documentation.