Consumer
Kafka is quite different from traditional messaging systems such as RabbitMQ!
If you're new to Kafka, check out the following resources for more information about how consumption works in Kafka:
- The original blog post for the modern Java consumer - the .NET consumer works similarly (though not identically).
- A more recent blog post covering the consumer in detail
- Confluent docs
- Consumers typically work as part of a group. At any given time, one and only one consumer in a group will be assigned to read from each partition of a subscribed-to topic (assuming the group is not currently rebalancing). You use the `Subscribe` method to join a group (see the sketch after this list).
- You aren't required to use consumer groups. You can directly assign to specific partitions using the `Assign` method (and never call the `Subscribe` method).
- You must specify a `GroupId` in your configuration. You must do this even if you don't utilize any group functionality. We will fix this at some point.
- It's fine to call `Consume()` with a timeout of 0 - this is a non-blocking operation: you'll get a message if one is available, `null` if not.
- The `AutoOffsetReset` property dictates the behavior when a committed offset is not available for a partition or is invalid. If your consumer is not getting messages, try setting this to `AutoOffsetReset.Earliest` and see if that helps (a common misunderstanding!).
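As a concrete starting point, here is a minimal sketch of a group-based consumer; the broker address, group id and topic name are placeholders:

```csharp
using System;
using Confluent.Kafka;

class ConsumerExample
{
    static void Main()
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = "localhost:9092",        // placeholder broker address
            GroupId = "example-group",                  // required, even if you don't use group functionality
            AutoOffsetReset = AutoOffsetReset.Earliest  // where to start when no valid committed offset exists
        };

        using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
        consumer.Subscribe("example-topic");            // join the consumer group for this topic

        while (true)
        {
            // Consume with a timeout; null is returned if no message is available in time.
            // A timeout of TimeSpan.Zero makes this call non-blocking.
            var result = consumer.Consume(TimeSpan.FromSeconds(1));
            if (result == null) continue;

            Console.WriteLine($"{result.TopicPartitionOffset}: {result.Message.Value}");
        }
    }
}
```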
The Consumer API sits at a much higher level of abstraction than the Kafka protocol, which is used to communicate with the cluster. When you call `Consume`, you are pulling messages from a local in-memory queue - you are not directly sending requests to the cluster. Behind the scenes, the client orchestrates connecting to the required brokers, automatically correcting in the case of leader changes, etc.
Although your application consumes messages one by one, messages are pulled by the client from brokers in batches for efficiency. By default, caching of messages on the client is very aggressive (optimized for high throughput). You may want to reduce this in some scenarios, for example where memory is constrained. The configuration property you want is `QueuedMaxMessagesKbytes`.
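A sketch of dialing the client-side buffering down; the value here is illustrative, not a recommendation:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    // Caps the amount of pre-fetched message data buffered in the client's internal queue
    // (in kilobytes). Lower values reduce memory usage at some cost to throughput.
    QueuedMaxMessagesKbytes = 16384
};
```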
There are two types of error in Kafka - retryable and non-retryable. Generally, the client will automatically recover from any retryable error without the application developer needing to do anything, or even being informed.
Errors exposed via the error callback or in the log generally represent retryable errors and are for informational purposes only.
However, if an error has the `IsFatal` flag set to `true` (which should generally never happen), the consumer is in an unrecoverable state and must be re-created.
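A sketch of how the two kinds of error typically surface in application code (handler and error text are illustrative):

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group"
};

using var consumer = new ConsumerBuilder<Ignore, string>(config)
    // Retryable errors surface here (and in the log) for informational purposes only.
    .SetErrorHandler((_, e) => Console.WriteLine($"Informational error: {e.Reason}"))
    .Build();

consumer.Subscribe("example-topic");

try
{
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    // ... process result ...
}
catch (ConsumeException ex) when (ex.Error.IsFatal)
{
    // Unrecoverable: dispose this consumer and create a new one.
    Console.WriteLine($"Fatal error: {ex.Error.Reason}");
}
```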
The term 'Assign' is used for more than one purpose unfortunately. There are two related concepts:
- The assignment given to a particular consumer by the consumer group.
- The partitions assigned to be read from by the consumer.
With the Java client, these are always the same. The .NET client is more flexible - it allows you to override the assignment given to you by the group to be whatever you want. This is an advanced, uncommonly used feature. It might be useful, for example, to add a control-message partition to be read by every consumer in addition to the assigned partitions (sketched below).
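A hedged sketch of what that might look like, assuming the `SetPartitionsAssignedHandler` overload that returns the set of partitions to read from; the control topic and partition here are hypothetical:

```csharp
using System.Linq;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group"
};

// Hypothetical control-message partition that every consumer should read in addition
// to whatever the group assigns it.
var controlPartition = new TopicPartitionOffset("control-topic", new Partition(0), Offset.Beginning);

using var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsAssignedHandler((c, assigned) =>
    {
        // Returning a set of offsets from this handler overrides the group's assignment.
        return assigned
            .Select(tp => new TopicPartitionOffset(tp, Offset.Unset))
            .Concat(new[] { controlPartition });
    })
    .Build();

consumer.Subscribe("example-topic");
```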
The docs for the .NET client on the Confluent website go into a fair bit of detail about how to commit offsets:
https://docs.confluent.io/current/clients/dotnet.html#auto-offset-commit
TL;DR: you should probably be using `StoreOffsets`, set `EnableAutoOffsetStore` to `false`, and leave `EnableAutoCommit` set to `true` (the default).
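A minimal sketch of that recommendation, assuming a 1.x client where the store call is `StoreOffset(ConsumeResult)`; broker, group and topic names are placeholders:

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    EnableAutoCommit = true,        // the default: a background thread periodically commits stored offsets
    EnableAutoOffsetStore = false   // don't mark offsets as ready to commit until we say so
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic");

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result == null) continue;

    // ... process the message ...

    // Only after successful processing, mark the offset as eligible for the next auto-commit.
    consumer.StoreOffset(result);
}
```

This gives at-least-once behavior: a crash after processing but before the next background commit means the message may be delivered again.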
Question: I want to synchronously commit offsets after each consumed message. It's very slow. How do I make it fast?
librdkafka uses the same broker connection both for commits and for fetching messages, so a commit may be backed up behind a blocking long-poll Fetch request. The long-poll behavior of fetch requests is configured with the `fetch.wait.max.ms` property, which defaults to 100ms. You can decrease that value to decrease offset commit latency at the expense of a larger number of Fetch requests (depending on your traffic pattern). Also see https://github.com/edenhill/librdkafka/issues/1787
todo: is this information still current?
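As a sketch, the corresponding .NET config property is `FetchWaitMaxMs`; the value below is illustrative only:

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    // Shorter fetch long-polls mean a commit spends less time queued behind an in-flight
    // Fetch request, at the cost of more fetch requests to the brokers.
    FetchWaitMaxMs = 10
};
```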
It's possible to write your consume loop such that your application is guaranteed to process each message at most once, or at least once. It's also possible to write code that provides neither guarantee - which is what you get by default! You probably want to change this.
When v1.4 is released, you'll also be able to write stream processing applications (Kafka -> Kafka) with exactly-once semantics.
By default, `EnableAutoCommit` and `EnableAutoOffsetStore` are both set to `true`.
With auto commit enabled, the consumer automatically commits offsets to Kafka periodically in a background thread, provided they have been marked as ready to store. With auto offset store enabled, offsets are marked as ready to store immediately before a message is delivered to the application via `Consume`.
With this setup:
- your application may fail to process a message if it crashes between when the `Consume` method stores the message offset as ready for commit and when your application has finished processing the message. This will only occur if, in addition, a commit protocol request is successfully sent to the cluster by the background thread before the application crashes.
- your application may process messages more than once. This may occur because offsets are only committed periodically to the cluster - it's possible for your application to process a number of messages and crash before the corresponding background auto commit occurs.
For more information, refer to the Confluent docs.
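For contrast with the `StoreOffset` pattern shown earlier (which gives at-least-once behavior), here is a sketch of an at-most-once loop, where the offset is committed before processing; names and values are placeholders:

```csharp
using System;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    EnableAutoCommit = false   // take full manual control of commits
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic");

while (true)
{
    var result = consumer.Consume(TimeSpan.FromSeconds(1));
    if (result == null) continue;

    // Commit synchronously first: a crash during processing means the message is
    // never seen again (at most once), but it is never processed twice.
    consumer.Commit(result);

    // ... process the message ...
}
```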
The consumer automatically sends 'heartbeat' messages to the group coordinator every `HeartbeatIntervalMs` in a background thread. If the coordinator does not receive a heartbeat message within `SessionTimeoutMs`, it assumes the consumer has died and kicks it out of the consumer group. If the consumer has not in fact died, it will automatically ask to rejoin the group when connectivity is re-established.
`MaxPollIntervalMs` protects against the situation where the application processing logic has become unresponsive, but the consumer's background thread is still diligently sending heartbeats to the group coordinator, so the consumer remains in the group. You must call `Consume` at least once every `MaxPollIntervalMs`, or the consumer will be kicked out of the group.
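These are all exposed as `ConsumerConfig` properties; a sketch with illustrative values (check the configuration docs for the actual defaults):

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    HeartbeatIntervalMs = 3000,   // how often the background thread heartbeats the group coordinator
    SessionTimeoutMs = 30000,     // no heartbeat for this long => kicked out of the group
    MaxPollIntervalMs = 300000    // Consume must be called at least this often
};
```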
We haven't implemented these yet (and we want to take the time to do so properly). We do want to add them because they allow you to write more idiomatic C# code.
If you are trying to set up a `HostedService`, instead consider setting up a dedicated background thread (tied to the application lifetime) and running a standard synchronous consume loop in that. This is completely fine - just not idiomatic C# (everything is async these days). This approach will actually be measurably more performant than an async approach, because async comes with a fair bit of overhead (compared to the number of messages per second you can get out of the Kafka consumer!).
Alternatively, you could fake an async consume method using `await Task.Run(() => consumer.Consume(timeout))`. That has more overhead than approach #1, but will allow you to use the standard hosted service pattern (you'll still get hundreds of thousands of messages a second out of it).
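A sketch of the dedicated-thread approach inside a hosted service, assuming Microsoft.Extensions.Hosting; the class name, topic and config values are placeholders:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

public class KafkaConsumerService : BackgroundService
{
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Run the synchronous consume loop on a dedicated thread tied to the app lifetime,
        // rather than blocking a thread-pool thread.
        var thread = new Thread(() =>
        {
            var config = new ConsumerConfig
            {
                BootstrapServers = "localhost:9092",
                GroupId = "example-group"
            };

            using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
            consumer.Subscribe("example-topic");

            while (!stoppingToken.IsCancellationRequested)
            {
                var result = consumer.Consume(TimeSpan.FromSeconds(1));
                if (result == null) continue;
                // ... process the message ...
            }

            consumer.Close();   // leave the group cleanly on shutdown
        })
        { IsBackground = true };

        thread.Start();
        return Task.CompletedTask;
    }
}
```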