Proposal: Create a Readable stream based consumer #371
Comments
I am using this project as the underpinning of a lightweight (emphasis on light) streaming framework (e.g. Samza/Storm/Spark) for Node.js, and would like to be able to work this way. My focus is really on creating an expressive system for application engineers to use without them having to worry about the complexities underpinning that solution.
💯 agree that a consumer implementation based on Readable is needed.
cc @hyperlink - I'm not ready to work on this, but it sounds very interesting.
Couldn't agree more. I ran into the exact issues described by @tizzo and suffered the pain of implementing a secondary in-memory queue as well. The manual pause/resume is not only inefficient and an implementation burden; in high-traffic conditions you can't rely on your own counters for backpressure, because the internal consumer might have already buffered a huge number of messages. A stream API is clearly the right approach.
+1 for the commit method on the message. I've been oscillating between this module and the much more lightweight (but not very well supported) https://github.com/confluentinc/kafka-rest-node. kafka-rest-node implements the read as a stream but lacks any option to wait until a message is processed before committing.
I spiked out a stream-based reader and began a commit stream about 6 months back for a project at work. It seemed to work under basic testing. As you suggested, we use the return value from push to pause the consumer, etc. I've just returned to the code as we have an interest in productionising it. If it all works out (with the commit stream etc.) I'll feed back, and maybe see if I can get agreement to publish our implementation.
Where do we stand on that one? Has anyone started giving it a shot?
I've got some code I've spiked out with both a readable Kafka topic stream and a writable Kafka commit stream. It seems to work, with backpressure/demand pausing and resuming the Kafka consumer. I'll try to get it cleaned up a bit and open a PR. It can be a starting point for discussion anyway.
Any updates on this one? Having the commit on the message and the guarantee of order per partition is exactly how I am trying to use Kafka.
I'm currently working on a PR - hoping to have some code to show in the …
Has this gone anywhere?
+1
I'll be more than happy to try and give it a shot.
This is how I do it: https://gist.github.com/tinder-dyakobian/6d06095acac7f7e22255b526cc2d3dd3
This is implemented in #695.
Published to npm as v2.1.0.
Two important features of Kafka are message durability and strict message ordering (within a topic partition). As others have noted, this module's consumers convert Kafka's fetch-based API into a firehose event emitter that loads every message from the topic as fast as it can, without regard for processing. That means new messages are fired regardless of whether the previous message has finished processing, and the problem is compounded by the fact that message decompression compromises the order in which messages are delivered to event subscribers.
It is currently possible to monitor the number of messages received and to pause the consumer to prevent a subsequent fetch, but this puts quite a burden on calling code: it has to constantly manage `pause` and `resume` in the `message` event callback (you can't fetch at all while the consumer is paused) and do the bookkeeping to buffer messages and maintain their order (actually impossible if you're using compression). This means that any system wanting to process messages in order, especially when the destination is much slower than the source, needs to implement some kind of queuing/buffering and do a lot of explicit pausing and un-pausing.
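For reference, a rough sketch of the kind of bookkeeping this forces on callers today, assuming the existing `Client`/`Consumer` API; `processMessage` and the queue thresholds are placeholders:

```js
var kafka = require('kafka-node');

// Sketch only: the manual buffering and pause/resume a caller has to write today.
var client = new kafka.Client('localhost:2181');
var consumer = new kafka.Consumer(client, [{ topic: 'my-topic', partition: 0 }], { autoCommit: false });

var queue = [];
var processing = false;

consumer.on('message', function (message) {
  queue.push(message);
  if (queue.length > 100) consumer.pause(); // crude backpressure based on our own counter
  drain();
});

function drain() {
  if (processing || queue.length === 0) return;
  processing = true;
  processMessage(queue.shift(), function () {
    processing = false;
    if (queue.length < 10) consumer.resume(); // resume once we have caught up
    drain();
  });
}

// Placeholder for a destination that is slower than the source.
function processMessage(message, callback) {
  setImmediate(callback);
}
```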
Node has a standard paradigm for processing I/O in sequence: The Almighty Stream API. The beauty of the stream API is that it is designed for strict ordering and for buffering between fast sources and slow destinations, which makes it a perfect fit for the Kafka protocol. An added bonus of this approach is that it makes this module behave more consistently with the core Node API, provides a simple abstraction that handles all the nasty details of an order-enforcing queue, and gives a clear answer on how to maintain order as messages are decompressed (pipe through a transform stream when appropriate). Best of all, consumers who want a simple emitter can still bind to `.on('data')` (and we could easily provide some magic to emit `.on('message')` as well, so consumers don't need to update if they don't want to).
Another solvable issue with the current consumer firehose paradigm is that it makes it difficult to commit only those messages that were successfully and completely processed. The fetching logic updates the internal offset list before emitting the fire-and-forget `message` event, so it's difficult to know whether your event processing failed, and you're likely to commit messages you didn't actually read and process successfully. One issue is that the `Consumer`'s connect method binds the update-offsets method to the `done` event emitted by the `Client` upon a successful fetch. That means the commit functionality is essentially useless if you care about processing events from a Kafka queue in order and can't risk dropping messages. I'll admit you can manage this yourself if you keep track of the queue mentioned above and use the `Client`'s `sendOffsetCommitRequest` method directly, but again, this means doing a fair bit of bookkeeping yourself, and it feels out of sync with the guarantees and use cases Kafka is designed for. It is especially annoying if you're using compression, as you then also need to keep track of messages received out of order. The current implementation does a nice job for fire-and-forget pub-sub style use cases, but much of what differentiates Kafka is the features beyond what you can do with Redis pub-sub. This change could also help remove the need for the buggy `maxTickMessages` option.
I propose that we implement a Readable stream for the consumer classes. In the streams2+ paradigm, a readable stream emits the `readable` event when it is ready to be read, and the stream's consumer calls `read()`. At that point the readable stream should keep calling `this.push()` with new data until `this.push()` returns false; the stream buffer is then full, and the stream should pause and wait for `read()` to be called again before performing the next fetch. If the Readable stream's `highWaterMark` is coordinated with the Kafka client's `fetchMaxBytes` (which we can trivially do by default), this should allow us to easily ensure that messages are processed in exactly the correct order.

To address the problem of committing messages that have not been verifiably processed, we could include a callback in the message payload that increments the offset for the topic in the consumer, and/or implement a writable stream that receives messages and updates the offset in the client. The auto-commit options and behaviors would remain the same but would no longer assume that every message emitted was properly processed (or would at least provide a mode that does not assume that).
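To make the idea concrete, here is a minimal sketch of such a wrapper, assuming the existing `Consumer` emitter with `pause()`/`resume()`; `ConsumerStream` is a hypothetical name, not a shipped API:

```js
var Readable = require('stream').Readable;
var util = require('util');

// Hypothetical shape of the proposal: a Readable in object mode whose demand
// (via _read) drives the underlying consumer's pause/resume. In a real
// implementation the buffer size would be coordinated with fetchMaxBytes.
function ConsumerStream(consumer, options) {
  options = options || {};
  Readable.call(this, { objectMode: true, highWaterMark: options.highWaterMark || 100 });
  this.consumer = consumer;

  var self = this;
  consumer.on('message', function (message) {
    // push() returns false when the stream buffer is full: stop fetching
    // until the stream machinery asks for more data.
    if (!self.push(message)) consumer.pause();
  });
  consumer.on('error', function (err) { self.emit('error', err); });
  consumer.pause(); // start paused; demand from _read() drives fetching
}
util.inherits(ConsumerStream, Readable);

ConsumerStream.prototype._read = function () {
  this.consumer.resume(); // downstream wants more data: allow the next fetch
};
```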
So the proposed syntax might be something like this:
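A hedged reconstruction of what that syntax could look like; the factory names `createReadStream`/`createCommitStream` and the `doSlowThing` worker are purely illustrative, not actual exports:

```js
var kafka = require('kafka-node');
var Transform = require('stream').Transform;

var client = new kafka.Client('localhost:2181');

// Illustrative factories: a Readable of messages and a Writable that commits offsets.
var consumerStream = kafka.createReadStream(client, [{ topic: 'my-topic' }], { autoCommit: false });
var commitStream = kafka.createCommitStream(client, 'my-group');

// Only pass a message downstream (and therefore commit it) once it has been processed.
var work = new Transform({
  objectMode: true,
  transform: function (message, encoding, callback) {
    doSlowThing(message, function (err) {
      if (err) return callback(err);
      callback(null, message);
    });
  }
});

consumerStream.pipe(work).pipe(commitStream);

// Placeholder for real processing work.
function doSlowThing(message, callback) { setImmediate(callback); }
```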
Alternatively, if we gave the message a commit callback it could look like this:
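A correspondingly hedged sketch of the per-message commit callback alternative; `message.commit()` is the hypothetical callback attached to the payload, not an existing API:

```js
var kafka = require('kafka-node');

var client = new kafka.Client('localhost:2181');
var consumerStream = kafka.createReadStream(client, [{ topic: 'my-topic' }], { autoCommit: false }); // illustrative factory

// Note: flowing mode via 'data' forgoes backpressure; shown only to illustrate the commit callback.
consumerStream.on('data', function (message) {
  doSlowThing(message, function (err) {
    if (err) return console.error('processing failed, not committing', err);
    message.commit(); // hypothetical: advances the committed offset for message.topic / message.partition
  });
});

// Placeholder for real processing work.
function doSlowThing(message, callback) { setImmediate(callback); }
```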
I think the former commit model (the writable commit stream) is probably the better one, as the writable stream can easily maintain state, ensure that messages aren't skipped, and emit an error if one is.
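For completeness, a minimal sketch of what that writable commit stream could look like, using the `Client`'s `sendOffsetCommitRequest` mentioned above (payload shape assumed); `CommitStream` and its offset-gap check are illustrative, not the implementation that landed in #695:

```js
var Writable = require('stream').Writable;
var util = require('util');

// Illustrative commit stream: verifies per-partition offsets arrive in order,
// then commits each one via the client.
function CommitStream(client, groupId) {
  Writable.call(this, { objectMode: true });
  this.client = client;
  this.groupId = groupId;
  this.nextOffset = {}; // next expected offset per "topic:partition"
}
util.inherits(CommitStream, Writable);

CommitStream.prototype._write = function (message, encoding, callback) {
  var key = message.topic + ':' + message.partition;
  if (key in this.nextOffset && message.offset !== this.nextOffset[key]) {
    // A skipped or out-of-order message means we cannot safely commit.
    return callback(new Error('Offset gap detected at ' + key + ', got ' + message.offset));
  }
  this.nextOffset[key] = message.offset + 1;
  this.client.sendOffsetCommitRequest(this.groupId, [{
    topic: message.topic,
    partition: message.partition,
    offset: message.offset,
    metadata: 'm'
  }], callback);
};
```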