
Proposal: Create a Readable stream based consumer #371

Closed
tizzo opened this issue May 16, 2016 · 17 comments

@tizzo

tizzo commented May 16, 2016

Two important features of Kafka are message durability and strict message ordering (within a topic partition). As others have noted, this module's consumers convert Kafka's fetch-based API into a firehose event emitter that loads every message from the topic as fast as it can, without regard for processing. New messages are fired regardless of whether the previous message has finished processing, and the problem is compounded by the fact that message decompression changes the order in which messages are delivered to event subscribers.

It is currently possible to monitor the number of messages received and to call pause on the consumer to prevent a subsequent fetch, but this puts quite a burden on calling code: it must constantly manage pause and resume in the message event callback (you can't fetch at all while the consumer is paused) and do the bookkeeping to buffer messages and maintain their order (actually impossible if you're using compression). This means that any system wanting to process messages in order, especially when the destination is much slower than the source, needs to implement some kind of queuing/buffering and do a lot of explicit pausing and un-pausing.

Node has a standard paradigm for processing I/O in sequence: The Almighty Stream API. The beauty of the stream API is that it is designed for strict ordering and for buffering between fast sources and slow destinations, which makes it a perfect fit for the Kafka protocol. An added bonus is that this module would behave more consistently with the core Node API: the stream provides a simple abstraction that handles all of the nasty details of an order-enforcing queue and gives a clear answer on how to maintain order as messages are decompressed (pipe through a transform stream when appropriate). Best of all, consumers who want a simple emitter can still bind to .on('data') (and we could easily provide some magic to emit .on('message') as well, so consumers don't need to update if they don't want to).

Another solvable issue with the current firehose paradigm is that it makes it difficult to commit only messages that were successfully and completely processed. The fetching logic updates the internal offset list before emitting the fire-and-forget message event, so it's hard to know whether your event processing failed, and you're likely to commit messages you never actually processed. Specifically, the Consumer's connect method binds the update-offsets method to the done event emitted by the Client upon a successful fetch. That makes the commit functionality essentially useless if you care about processing events from a Kafka topic in order and can't risk dropping messages. I'll admit you can manage this yourself if you keep track of the queue mentioned above and use the Client's sendOffsetCommitRequest method directly, but again, that means doing a fair bit of bookkeeping yourself, and it feels out of sync with the guarantees and use cases Kafka is designed for. It's especially annoying with compression, since you then also need to track messages received out of order. The current implementation does a nice job for fire-and-forget pub-sub style use cases, but much of what differentiates Kafka is the set of features beyond what you can do with Redis pub-sub. This change could also remove the need for the buggy maxTickMessages option.

I propose that we implement a Readable stream for the consumer classes. In the streams2+ paradigm, the stream's _read() method is called when a consumer wants data; the implementation should then keep calling this.push() with new data until this.push() returns false, at which point the stream's internal buffer is full and the implementation should stop fetching until _read() is called again. If the Readable stream's highWaterMark is coordinated with the Kafka client's fetchMaxBytes (which we can trivially do by default), this should allow us to easily ensure that messages are processed exactly in the correct order.

To address the problem of committing messages that have not been verifiably processed, we could either include a callback in the message payload that increments the offset for the topic in the consumer, and/or implement a writable stream that receives processed messages and updates the offset in the client. The auto-commit options and behaviors would remain the same but would no longer assume that every message emitted was properly processed (or would at least provide a mode that does not assume that).

So the proposed syntax might be something like this:

var commitStream = client.createCommitStream({
  // This would commit every 100 *successfully processed* messages.
  autoCommitMsgCount: 100,
  // This would commit the most recent *successfully processed* offset every second.
  autoCommitIntervalMs: 1000,
});
consumer.stream
  .pipe(through2.obj(function(message, enc, cb) {
    console.log(message.data);
    // Do some stuff with the data
    this.push(message);
    cb();
  }))
  .pipe(commitStream);

Alternatively, if we gave the message a commit callback it could look like this:

consumer.stream
  .pipe(through2.obj(function(message, enc, cb) {
    console.log(message.data);
    // Do some stuff with the data
    this.push(message);
    // We successfully processed this message.
    message.commit();
    cb();
  }));

I think the former commit model is probably the better one, as the writable stream can easily maintain state and ensure that messages aren't skipped, emitting an error if one is.

@tb01923
Contributor

tb01923 commented May 21, 2016

I am using this project as the underpinning of a lightweight (emphasis on light) streaming framework for Node.js (e.g. samza/storm/spark), and would like to be able to work this way. My focus is really on creating an expressive system for application engineers to use without their having to worry about the complexities underpinning the solution.

@sehod

sehod commented May 21, 2016

💯 agree that a consumer implementation based on Readable is needed.

@estliberitas
Contributor

cc @hyperlink - I'm not ready to work on this, but it sounds very interesting

@argh

argh commented Jun 2, 2016

Couldn't agree more. I ran into the exact issues described by @tizzo and suffered the pain of implementing a secondary in-memory queue as well. The manual pause/resume is actually not only inefficient and an implementation burden, but in high traffic conditions you can't rely on your own counters for backpressure because the internal consumer might have already buffered a huge number of messages. A stream API is clearly the right approach.

@pcothenet

pcothenet commented Jul 8, 2016

+1 for the commit method on the message.

I've been oscillating between this module and the much more lightweight (but not very well supported) https://github.com/confluentinc/kafka-rest-node. kafka-rest-node implements reads as a stream but lacks any option to wait until a message is processed before committing.

@Foo42

Foo42 commented Jul 14, 2016

I spiked out a stream-based reader and began a commit stream about 6 months back for a project at work. It seemed to work under basic testing. As you suggested, we use the return value of push to pause the consumer, etc. I've just returned to the code as we have an interest in productionising it. If it all works out (with the commit stream etc.) I'll report back, and maybe see if I can get agreement to publish our implementation.

@pcothenet

Where do we stand on that one? Has anyone started giving it a shot?

@Foo42

Foo42 commented Aug 5, 2016

I've got some code I've spiked out with both a readable Kafka topic stream and a writable Kafka commit stream. It seems to work, with back pressure / demand pausing and resuming the Kafka consumer. I'll try to get it cleaned up a bit and open a PR. It can be a starting point for discussion, anyway.

@gbarton

gbarton commented Sep 1, 2016

Any updates on this one? Having the commit on the message and guarantee of order per partition is exactly how I am trying to use kafka.

@tizzo
Author

tizzo commented Sep 1, 2016

I'm currently working on a PR - hoping to have some code to show in the next day or two.


@jwthomp

jwthomp commented Nov 11, 2016

Has this gone anywhere?

@itamarwe

itamarwe commented Dec 1, 2016

+1

@barrotsteindev

I'll be more than happy to try and give it a shot.

@lliss

lliss commented Jun 29, 2017

This is implemented in #695

@hyperlink
Collaborator

Published to npm as v2.1.0

@tizzo
Author

tizzo commented Aug 16, 2017

YEEEEESSSS

Thanks everyone for pushing this over the finish line and making my dream a reality.

[GIF: van damme dance]

[GIF: motivational dance]
