Magnus Edenhill edited this page Sep 26, 2019 · 58 revisions

Frequently asked questions

Consumer

Are balanced consumer groups supported?

Yes, broker-based balanced consumer groups are supported and require Apache Kafka broker version >= 0.9.

  • C: rd_kafka_subscribe() et al.
  • C++: KafkaConsumer class.

What is a simple consumer?

The Consumer class is the legacy simple consumer that works with any broker >= 0.8. It does not feature any consumer group support but allows full flexibility with regard to which partitions to consume and their offsets.

  • C: rd_kafka_consume_start() et al.
  • C++: Consumer class

This API is deprecated but there is no plan to remove it. Please see the rd_kafka_assign() or KafkaConsumer::assign() API for a more modern alternative.

What is a high-level balanced consumer?

The KafkaConsumer class is the new high-level balanced consumer, which requires broker version >= 0.9. The subscribed topics and partitions are assigned to the members of a consumer group so that only one consumer in the group may consume messages from a given partition at any time.

  • C: rd_kafka_subscribe(), rd_kafka_assign(), et al.
  • C++: KafkaConsumer class

Why am I not seeing any messages?

If there are no stored offsets for a partition (and group in case of the KafkaConsumer) the consumer will default its starting offset to the topic configuration setting auto.offset.reset which defaults to latest - that is, it will start consuming at the current end of a partition.

If you are using the KafkaConsumer you probably do not have a per-topic configuration object and should instead use the default topic config; see default_topic_conf.

How do I manually set the start offset to consume?

The assign() API takes a list of partitions with an optional starting offset for each partition. See Manually setting the consumer start offset.

How do I set topic configuration properties with the KafkaConsumer?

Since you don't need to create any topic objects when using KafkaConsumer you might wonder how to set topic configuration. The default_topic_conf configuration property solves this: it takes a topic configuration object and applies it to all internally created topics as their default configuration.

If you need special configuration for a subset of your topics you will need to create them prior to calling subscribe() or assign() and pass their explicit configuration objects at topic creation time.

Sync offset commits are slow

If the current controller broker is also partition leader for one or more partitions that the consumer is fetching from, an outstanding FetchRequest will block subsequent OffsetCommitRequests queued on the same connection for up to fetch.wait.max.ms. This is only a problem if no new messages are arriving at the partition. The workaround is to decrease fetch.wait.max.ms (default 100ms).

Why committing each message is slow

Even when asynchronously committing offsets for each processed message, the OffsetCommit request-response round-trip times will quite quickly build up a queue of OffsetCommitRequests waiting in-queue or in-flight to be processed and responded to by the broker. When this queue grows large enough, the latency to make it through the queue exceeds the request timeout (socket.timeout.ms, default 60s) and these requests will start failing with Local: Timed out. Even if it does not come to that, it still risks slowing down the consume rate if the consumer is fetching messages from the same broker, since a blocking FetchRequest will block subsequently queued OffsetCommitRequests, and vice versa.

The underlying problem is that the consumption and processing rate is higher than the commit rate, which inevitably leads to these queue build-ups.

  • Do you need to commit per message? If using async commits and the commit fails for whatever reason (such as a timeout in the queue!) that commit is still lost. If you really need per-message commits you probably want sync commits as well, but that will really slow things down; see the FAQ item just above.
  • What about not committing every message, but every, say, 100? Or 1000? Whatever it takes to make the broker keep up with the commits. This is what the default auto offset committer does, but based on the auto.commit.interval.ms time interval.
  • There are actually two layers of offset commit: first there is the offset store (offset_store(), store_offsets(), etc) that locally stores (in memory) the offset to be committed, then there's the actual commit which commits the stored offset to the broker. An alternative is thus to explicitly store() each message's offset after processing and then rely on the auto offset committer to actually commit the offsets for you. Use enable.auto.offset.store=false to disable the automatic offset store (which by default is done prior to your processing).
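The second suggestion above, committing every N messages instead of every message, can be sketched as a small counter. This is only an illustration and not librdkafka code: commit_batcher_t, commit_batcher_init() and commit_batcher_should_commit() are hypothetical names, and the point where the application would issue its own commit call is marked by a comment.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper: counts messages processed since the last commit. */
typedef struct {
        size_t batch_size;   /* commit once this many messages have been processed */
        size_t uncommitted;  /* messages processed since the last commit */
} commit_batcher_t;

static void commit_batcher_init (commit_batcher_t *cb, size_t batch_size) {
        cb->batch_size  = batch_size > 0 ? batch_size : 1;
        cb->uncommitted = 0;
}

/* Call once per processed message.
 * Returns true (and resets the counter) when it is time to commit;
 * the application would then issue its commit call for the last processed offset. */
static bool commit_batcher_should_commit (commit_batcher_t *cb) {
        if (++cb->uncommitted < cb->batch_size)
                return false;
        cb->uncommitted = 0;
        return true;
}
```

With a batch size of 1000 the broker sees one commit per 1000 processed messages. The right value depends on how much reprocessing the application tolerates after a crash.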

My otherwise idle consumer is taking 1-2% CPU

librdkafka pre-fetches messages to an internal queue (so that consume() or consumer_poll() is instantaneous) and when there are no new messages available to fetch for the given topics+partitions the Fetch request will block on the broker for fetch.wait.max.ms. This property defaults to 100ms which will thus cause 10 wake-ups per second in the case where there are no new messages - which will increase the CPU usage somewhat. This increased processing typically does not matter (since the consumer will be idle anyway) but it can be mitigated by increasing fetch.wait.max.ms. Do note though that this will block that entire broker connection for that long, preventing metadata requests, commits, etc, from being sent until the Fetch request returns empty-handed after that long.

Explain the consumer's memory usage to me

Messages are fetched in batches from the broker (the batch size is determined by the producer), and since librdkafka strives for performance it avoids copying the message payload unless necessary. To that end a message_t object is allocated for each message, holding its metadata, but the message's payload/value is referenced from a single zero-copy receive buffer, and that buffer is only freed when all message objects pointing to it are destroyed. However, queued.max.messages.kbytes accounts for the size of the message objects (with referenced memory): when a message is destroyed (but the underlying buffer isn't), that space is subtracted from the queued.max.messages.kbytes meter, allowing new messages to be fetched. That is probably the reason why you are seeing an increase in memory usage when you destroy messages, which is of course counter-intuitive.

The default settings of librdkafka are optimized for the normal stream case and will aggressively prefetch messages, but if your application is pausing, seeking or processing messages slowly that pre-fetch buffer is not of much use and should be decreased. For this purpose, try adjusting queued.min.messages and fetch.message.max.bytes downwards.

You may also enable statistics and monitor the partition's fetchq metric.

How are partitions fetched?

Each broker connection is handled by its own thread, and each such thread has a partition fetcher that serves the assigned partitions for that broker. While there will only be at most one outstanding Fetch request for a single broker connection, other broker connections may have outstanding Fetch requests as well.

An assigned partition is included in the Fetch request if:

  • it has a valid fetch offset (i.e., not currently acquiring a committed offset or a logical offset lookup (e.g., earliest)), and
  • the partition's fetch queue, or the queue it has been forwarded to (a single global fetchq when using the high-level KafkaConsumer or having called rd_kafka_poll_set_consumer()), has fewer than queued.min.messages messages enqueued, and
  • the same queue has less than queued.max.messages.kbytes enqueued, and
  • the partition is not backed off (fetch.error.backoff.ms after reaching end of partition or an error).
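The four conditions above can be modelled as a single predicate. This is a simplified sketch and not the actual rd_kafka_toppar_fetch_decide() implementation: the struct types, field names and should_fetch() are hypothetical, and time is passed in as a plain millisecond counter.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical snapshot of the per-partition state the conditions refer to. */
typedef struct {
        bool    has_valid_fetch_offset; /* false while an offset lookup is in progress */
        int64_t queued_msgs;            /* messages in the partition's (or forwarded-to) queue */
        int64_t queued_kbytes;          /* kilobytes in that same queue */
        int64_t backoff_until_ms;       /* fetching is backed off until this time, 0 if not */
} partition_state_t;

/* Hypothetical holder for the two configuration properties involved. */
typedef struct {
        int64_t queued_min_messages;        /* queued.min.messages */
        int64_t queued_max_messages_kbytes; /* queued.max.messages.kbytes */
} fetch_limits_t;

/* Returns true if the partition should be included in the next Fetch request. */
static bool should_fetch (const partition_state_t *p,
                          const fetch_limits_t *lim, int64_t now_ms) {
        if (!p->has_valid_fetch_offset)
                return false;  /* still acquiring a committed or logical offset */
        if (p->queued_msgs >= lim->queued_min_messages)
                return false;  /* enough messages already queued */
        if (p->queued_kbytes >= lim->queued_max_messages_kbytes)
                return false;  /* enough bytes already queued */
        if (now_ms < p->backoff_until_ms)
                return false;  /* backed off after end of partition or an error */
        return true;
}
```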

The broker will add messages to the Fetch response in the order the partitions were requested until there are no more messages for the requested partitions, or the maximum requested size (fetch.message.max.bytes) or maximum response size is exceeded. To provide fairness, the order in which the partitions are added to the Fetch request is round-robin. As the client parses the Fetch response the messages are appended to the partition queues (or forwarded-to global queue, see above) in the order they were returned from the broker. This means that all messages for partition A are appended, then partition B, then partition C, and so on.

The per-partition fetch decision logic can be seen in detail in rd_kafka_toppar_fetch_decide().

What are partition queues? And why are some partitions slower than others?

Each fetched partition has its own queue of fetched messages. When this queue size reaches queued.min.messages or queued.max.messages.kbytes the fetcher will stop fetching that partition until the queue drops below those thresholds. When using the high-level consumer (KafkaConsumer) the partition queues are forwarded to a single queue so that you can read messages from all partitions by calling rd_kafka_consumer_poll() or KafkaConsumer::consume() in a single place. With the queues forwarded the queue thresholds are no longer checked per partition, but for the common queue, which means that all partitions stop fetching when the common queue size thresholds are exceeded.

If a set of partitions have the same rate of messages, but some of those partitions are on "faster" brokers (faster/closer/less-loaded broker) this shared queue behaviour may lead to these faster partitions occupying more space than the "slower" partitions.

One way around this partition rate imbalance is to consume messages from each partition separately. You do this by removing the forwarding of the partition queues (to the common queue) so that each partition queue is checked individually against the size thresholds. This generates a more balanced partition queue load since each partition will be fetched up to the same threshold, instead of a few partitions taking up much of that space.

For each assigned partition, use get_partition_queue() to get the partition's queue, then call forward(NULL) on that queue to disable forwarding, then poll each queue separately.

Do note that you still need to call consumer_poll() (or KafkaConsumer::consume()) to serve rebalance events, etc, but it will not return any messages now.

Also make sure to adjust queued.min.messages and queued.max.messages.kbytes to limit the number of messages and bytes, since these are now applied per partition rather than globally.

Producer

Why does rd_kafka_produce() always seem to succeed, even if there is a problem connecting to the Kafka brokers?

Because librdkafka has put your message on its outbound queue and will try to recover from the error for message.timeout.ms before reporting failure. Delivery failures are reported asynchronously; meanwhile your program can be notified (also asynchronously) about connection issues if it registers an error callback.

When and how should I call rd_kafka_poll()?

Since the producer is asynchronous it needs a way to inform the application when a message has been delivered or permanently fails delivery. It does this through a delivery report callback (DR callback), which is triggered from the rd_kafka_poll() call.

If you know that your producer application will have a steady stream of incoming messages to produce, you can call poll() from the produce loop, like so:

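   /* rk is the producer handle and rkt the topic handle; have_data(), payload
    * and len are placeholders for the application's own input. */
   while (have_data()) {
         /* Non-blocking: enqueues the message to be produced by a background thread */
         rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                          payload, len, NULL, 0, NULL);
         rd_kafka_poll(rk, 0);   /* Non-blocking: trigger any delivery report callbacks that are ready */
   }

   /* Wait for all outstanding messages to be delivered; triggers delivery report callbacks too */
   rd_kafka_flush(rk, -1);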

A common misconception is that the poll() call in the above example will trigger the delivery report callback for the message just produced, but since the producer is asynchronous the message will most likely not even have been sent to the broker yet. But the poll() call may trigger delivery reports from previous produce calls.

In the case where the application does not know the incoming data rate to produce, or there are intervals of no messages, it is recommended to run poll() in a separate loop to trigger delivery reports (and other callbacks) in a timely manner. If the application has a main loop that is not driven by input data that is a good place to call poll.

Else spawn a new thread that simply calls poll, like so:

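void *my_thread_main (void *arg) {
   rd_kafka_t *rk = arg;        /* producer handle passed in by the thread creator */

   while (!app_terminate)
      rd_kafka_poll(rk, 1000);  /* Use a timeout of 1s to avoid busy looping */

   return NULL;
}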

Note: a message is accounted for in the producer's maximum queue limit (queue.buffering.max.messages and queue.buffering.max.kbytes) until its delivery report callback has been triggered. To avoid ERR__QUEUE_FULL errors, make sure to call poll regularly.

Why is there no sync produce() interface?

Because synchronous producing is a performance killer and scales very poorly: it is effectively bound by network and broker latency. If the round-trip to produce a message is 2ms the maximum achievable rate is 500 messages per second. In scenarios where the broker or network is getting slower for whatever reason this rate decreases even more, possibly causing backpressure in the application that affects the upstream data source.
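The arithmetic behind the 500 messages per second figure is simply one second divided by the round-trip time, as the sketch below shows. The function name max_sync_rate_per_sec() is made up for illustration, and the model assumes exactly one message in flight at a time.

```c
/* Upper bound on messages per second when every produce call waits for
 * its own request-response round-trip before the next message is sent. */
static double max_sync_rate_per_sec (double round_trip_ms) {
        if (round_trip_ms <= 0.0)
                return 0.0;  /* no meaningful bound for a non-positive latency */
        return 1000.0 / round_trip_ms;
}
```

Doubling the round-trip time to 4ms halves the bound to 250 messages per second, which is why a slowing broker or network hits a synchronous producer so directly.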

It is thus better to design the application to use asynchronous sends and use the delivery report callback to take action when the message is finally delivered or fails permanently.

If you still think you need a sync producer, see How to implement a sync producer.

How do I get the offset of a produced message?

Use a delivery callback.

General

Is librdkafka compatible with my broker version?

Yes, it is. But you need to read this: Broker version compatibility

Is the library thread-safe?

Yes, librdkafka is completely thread-safe (unless otherwise noted in the API documentation). Any API, short of the destructor functions, may be called at any time from any thread. The common restrictions of object destruction still apply (e.g., you must not call rd_kafka_destroy() while another thread is calling rd_kafka_poll() or similar).

Should I use the C or C++ interface?

That is up to you. The library itself is implemented in C and the C++ interface is a thin layer on top of the C code. The C++ interface may lag behind the C interface functionally (but usually not by far).

What version should I use?

For production use: use the latest official release.

For testing and development use: use the latest master branch.

How do I properly terminate my Producer or Consumer without losing messages?

This is covered in Proper termination sequence.

Can librdkafka create, modify or delete topics?

Yes, the Admin API allows you to create and delete topics, create partitions, get and alter configuration for topics, brokers, and other cluster resources.

librdkafka also supports the deprecated automatic topic creation, which needs to be enabled with auto.create.topics.enable=true on the broker.

How are tombstones (deletes) represented in compacted topics?

A record tombstone is represented in Kafka compacted topics by a valid key and null payload.

For the producer: simply pass NULL for the value/payload pointer in the ..produce() call.

For the consumer: treat message->payload == NULL as tombstone.
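As a minimal sketch of the consumer-side check, the snippet below uses a stand-in struct rather than the real librdkafka message type so that it compiles on its own. msg_view_t and is_tombstone() are hypothetical names; in an application you would apply the same test to the fields of the consumed message object.

```c
#include <stdbool.h>
#include <stddef.h>

/* Stand-in for the key and payload fields of a consumed message. */
typedef struct {
        const void *key;      /* message key, NULL if the message has no key */
        size_t      key_len;
        const void *payload;  /* message value, NULL for a tombstone */
        size_t      len;
} msg_view_t;

/* A tombstone in a compacted topic has a valid key and a NULL payload. */
static bool is_tombstone (const msg_view_t *m) {
        return m->key != NULL && m->payload == NULL;
}
```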

Logging and logs

  1. Where does librdkafka log?
  • That's up to you, see rd_kafka_conf_set_log_cb. The default is a stderr logger.
  2. Even though I have log_level set to DEBUG I don't see anything on stdout/stderr?
  • log_level is 6 (info) by default (anything but debug) but is automatically raised to 7 (debug) when debugging is enabled. So it serves the reverse purpose: to filter out higher-leveled logs. There is really very little use for it.
  3. I'm not seeing any logs
  • librdkafka does not really log anything unless there are errors. Logs are emitted through the log_cb, which defaults to a stderr writer; there is also a built-in syslog writer you can configure with set_log_cb(). If you want librdkafka to log even though there are no errors you can enable debugging by setting the debug config property to e.g. topic,broker.
  4. Is there any provision to give your own log file path to librdkafka?
  • If you want to log to a file you need to implement your own log_cb, which is simple enough to do: implement the log_cb interface and pass your function reference to set_log_cb.
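For the last item, the body of a file-writing log callback could look roughly like the helper below. It is a sketch that deliberately avoids including the librdkafka headers: write_log_line() is a hypothetical function, and its level, fac and buf parameters are assumed to mirror the level, facility and message arguments a log callback receives (check the log_cb signature in rdkafka.h before relying on this). Your own log_cb would call it with a FILE * that the application opened on the desired path.

```c
#include <stdio.h>

/* Hypothetical helper: append one log line to an already opened file.
 * Returns the number of characters written, or a negative value on error. */
static int write_log_line (FILE *fp, int level, const char *fac, const char *buf) {
        int r = fprintf(fp, "%d|%s|%s\n", level, fac, buf);
        fflush(fp);  /* keep the file up to date in case the process crashes */
        return r;
}
```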

Why am I seeing Receive failed: Disconnected?

If the remote peer, typically the broker (but could also be an active TCP gateway of some kind), closes the connection you'll see a log message like this:

%3|1500588440.537|FAIL|rdkafka#producer-1| 10.255.84.150:9092/1: Receive failed: Disconnected

There are a number of possible reasons, in order of how common they are:

  • Broker's idle connection reaper closes the connection due to inactivity. This is controlled by the broker configuration property connections.max.idle.ms and defaults to 10 minutes. This is by far the most common reason for spontaneous disconnects.
  • The client sent an unsupported protocol request; see Broker version compatibility. This is considered a configuration error on the client. The broker should log an exception explaining why the connection was closed, see the broker logs.
  • The client sent a malformed protocol request; this is an indication of a bug in the client. The broker should log an exception explaining why the connection was closed, see the broker logs.
  • The broker is in an invalid state. The broker should log an exception explaining why the connection was closed, see the broker logs.
  • TCP gateway/load-balancer/firewall session timeout. Try enabling TCP keep-alives on the client by setting socket.keepalive.enable=true.

Since a TCP close can't signal why the remote peer closed the connection there is no way for the client to know what went wrong. If the disconnect logs are getting annoying and the admin deems they are caused by the idle connection reaper, the log.connection.close client configuration property can be set to false to silence all spontaneous disconnect logs.

NOTE: Whenever a connection is closed for whatever reason, librdkafka will automatically reconnect after reconnect.backoff.jitter.ms (default 500ms).

Connecting to IPv6 addresses: Connect to ipv6#[::1]:9092 failed: Connection refused

librdkafka will use the system resolver to resolve the broker hostname. On some systems, OSX in particular, the localhost entry in /etc/hosts resolves to both an IPv4 and an IPv6 address, so librdkafka will, in a round-robin fashion, attempt to connect to all addresses the hostname resolves to. If the broker is only listening on the IPv4 address then the client's connection attempt to the IPv6 address will fail.

To limit the address families the client connects to, set the broker.address.family configuration property to v4 or v6.

Topic configuration

  • If rd_kafka_topic_new() is called with a NULL rd_kafka_topic_conf_t * it will use the default topic configuration.
  • The default topic configuration is set by populating a rd_kafka_topic_conf_t object and then using rd_kafka_conf_set_default_topic_conf(global_conf, my_default_config) - this will replace any previously set default topic configuration.
  • As a convenience to the user, librdkafka allows default topic configuration to be specified on the global configuration object - this is handy in the case of configuration files since there is no need to split configuration between global and (default) topic, allowing you to set request.required.acks (et al.) with rd_kafka_conf_set() rather than rd_kafka_topic_conf_set(). This functionality is called topic fallthru configuration.
  • Internally, topic fallthru configuration will create a default topic config object. If you first call rd_kafka_conf_set("some.topic.property") and then call rd_kafka_conf_set_default_topic_conf(), your default topic config object will replace the implicitly created one, which you probably don't want.
  • The recommendation is to not use rd_kafka_conf_set_default_topic_conf() but rely on the topic fallthru configuration.
  • You can still use topic-specific configuration. Do note though that a topic's configuration is applied only the first time the topic is referenced, either by the application with rd_kafka_topic_new() or when the topic is created automatically internally because it was referenced in rd_kafka_subscribe() or similar.
  • Specific configuration passed to rd_kafka_topic_new() will override the default topic configuration.

How do I change the topic configuration?

You can only set the topic configuration once per client instance, namely in your first call to rd_kafka_topic_new().

Topic config, as supplied to (the first call to) rd_kafka_topic_new() for a specific topic is local to that topic for the remainder of that rd_kafka_t instance's lifetime. That means that only the properties from conf1 will be used in the following example:

rd_kafka_topic_new(rk, "topic1", conf1);
rd_kafka_topic_new(rk, "topic1", conf2);

Topics are local to their rd_kafka_t instance and not shared between them in any way.

Number of internal threads

librdkafka creates one main thread and one thread per broker, for each client instance.

See "Number of broker TCP connections" below.

Number of broker TCP connections

librdkafka will attempt to connect to all brokers specified in the bootstrap.servers (or metadata.broker.list) configuration property. Upon the first connection the broker will be queried for the full list of brokers in the cluster (this is called a Metadata request), and librdkafka will then connect to all brokers returned in the Metadata response - the full set of brokers in the cluster.

The initial bootstrap broker connections will only be used for Metadata queries, unless the hostname and port of a bootstrap broker exactly matches the hostname and port of a broker returned in the Metadata response (this is the advertised.listeners broker configuration property), in which case the bootstrap broker connection will be associated with that broker's broker id and used for the full protocol set (such as producing or consuming).

The worst case number of connections made by librdkafka is: cnt(bootstrap.servers) + cnt(brokers in Metadata response) and this happens if the bootstrap server hostnames and ports don't match the Metadata response.

The best case number of connections is cnt(brokers in Metadata response) if all the bootstrap.servers hostnames and ports exactly match the brokers in the Metadata response.
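The worst and best cases can be captured in one small function. This is an illustrative model only: broker_connection_count() is a hypothetical name, and the behaviour for a partial match (some bootstrap entries matching, others not) is an assumption extrapolated from the two cases described above.

```c
/* Estimated number of broker connections for one client instance.
 * bootstrap_cnt:       entries in bootstrap.servers
 * metadata_broker_cnt: brokers returned in the Metadata response
 * matching_bootstrap:  bootstrap entries whose hostname and port exactly
 *                      match a broker in the Metadata response */
static int broker_connection_count (int bootstrap_cnt, int metadata_broker_cnt,
                                    int matching_bootstrap) {
        /* Matching bootstrap connections are reused for the corresponding broker;
         * the remaining ones stay as separate Metadata-only connections. */
        return (bootstrap_cnt - matching_bootstrap) + metadata_broker_cnt;
}
```

For example, three bootstrap servers and a five-broker cluster give eight connections if no names match and five if all three match.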

librdkafka will attempt to connect to all brokers it knows about (through bootstrap or Metadata) and will reconnect and retry forever.

Zookeeper

Modern Kafka clients do not connect to Zookeeper. Historical versions of Kafka exposed Zookeeper to clients to retrieve the full broker list (Kafka 0.7) and the initial consumer groups were implemented in the clients using Zookeeper (Kafka 0.8). But since Kafka 0.8 (and 0.9 for consumer groups) there is no longer any need for clients to connect to Zookeeper: the metadata and consumer group functionality has moved to the Kafka broker. In fact, it is highly recommended not to make Zookeeper reachable from clients, for security reasons.

What's the meaning of rd in librdkafka?

librdkafka originally stems from librd, which was my contractor toolbox library of convenience functions. The rd stands for rapid development.

In hindsight librdkafka should have been named simply libkafka.