Skip to content

Commit

Permalink
Support committing an array of topic partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
Stephen Parente authored and webmakersteve committed Feb 12, 2018
1 parent 5780247 commit cfc237b
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 53 deletions.
14 changes: 13 additions & 1 deletion e2e/consumer.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ describe('Consumer', function() {
'bootstrap.servers': kafkaBrokerList,
'group.id': grp,
'debug': 'all',
'rebalance_cb': true
'rebalance_cb': true,
'enable.auto.commit': false
};
});

Expand All @@ -42,6 +43,16 @@ describe('Consumer', function() {
eventListener(consumer);
});

it('should allow commit with an array', function(done) {
  // A list of topic partitions may be committed in one call
  var partitions = [{ topic: topic, partition: 0, offset: -1 }];
  consumer.commit(partitions);
  done();
});

it('should allow commit without an array', function(done) {
  // A single topic partition object is still accepted
  var partition = { topic: topic, partition: 0, offset: -1 };
  consumer.commit(partition);
  done();
});

afterEach(function(done) {
consumer.disconnect(function() {
done();
Expand Down Expand Up @@ -111,6 +122,7 @@ describe('Consumer', function() {
done();
});
});

it('after assign, before consume, position should return an array without offsets', function(done) {
consumer.assign([{topic:topic, partition:0}]);
var position = consumer.position();
Expand Down
123 changes: 74 additions & 49 deletions src/kafka-consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -199,25 +199,30 @@ Baton KafkaConsumer::Unassign() {
return Baton(RdKafka::ERR_NO_ERROR);
}

/**
 * Asynchronously commit the given list of topic partitions.
 *
 * Ownership of the TopicPartition pointers stays with the caller
 * (NodeCommit destroys them after this returns).
 *
 * @param toppars  partitions (with offsets already set) to commit
 * @return Baton wrapping the librdkafka error code
 */
Baton KafkaConsumer::Commit(std::vector<RdKafka::TopicPartition*> toppars) {
  if (!IsConnected()) {
    // Consistent with the single-partition overload below
    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  RdKafka::ErrorCode err = consumer->commitAsync(toppars);

  return Baton(err);
}

/**
 * Asynchronously commit a single topic partition.
 *
 * Ownership of `toppar` stays with the caller (NodeCommit deletes it).
 *
 * @param toppar  partition (with offset already set) to commit
 * @return Baton wrapping the librdkafka error code
 */
Baton KafkaConsumer::Commit(RdKafka::TopicPartition * toppar) {
  if (!IsConnected()) {
    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  // commitAsync only takes a vector, so wrap the single partition
  std::vector<RdKafka::TopicPartition*> offsets = {toppar};
  RdKafka::ErrorCode err = consumer->commitAsync(offsets);

  return Baton(err);
}
Expand All @@ -237,25 +242,31 @@ Baton KafkaConsumer::Commit() {
}

// Synchronous commit events
/**
 * Synchronously commit the given list of topic partitions.
 *
 * Blocks until the commit completes. Ownership of the TopicPartition
 * pointers stays with the caller — NodeCommitSync calls
 * RdKafka::TopicPartition::destroy(toppars) after this returns, which is
 * why the destroy below must remain commented out (it would double-free).
 *
 * @param toppars  partitions (with offsets already set) to commit
 * @return Baton wrapping the librdkafka error code
 */
Baton KafkaConsumer::CommitSync(std::vector<RdKafka::TopicPartition*> toppars) {
  if (!IsConnected()) {
    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  RdKafka::ErrorCode err = consumer->commitSync(toppars);
  // RdKafka::TopicPartition::destroy(toppars);  // caller owns toppars

  return Baton(err);
}

/**
 * Synchronously commit a single topic partition.
 *
 * Ownership of `toppar` stays with the caller (NodeCommitSync deletes it).
 *
 * @param toppar  partition (with offset already set) to commit
 * @return Baton wrapping the librdkafka error code
 */
Baton KafkaConsumer::CommitSync(RdKafka::TopicPartition * toppar) {
  if (!IsConnected()) {
    // Consistent with the vector overload above
    return Baton(RdKafka::ERR__STATE, "KafkaConsumer is not connected");
  }

  RdKafka::KafkaConsumer* consumer =
    dynamic_cast<RdKafka::KafkaConsumer*>(m_client);

  // commitSync only takes a vector, so wrap the single partition
  std::vector<RdKafka::TopicPartition*> offsets = {toppar};
  RdKafka::ErrorCode err = consumer->commitSync(offsets);

  return Baton(err);
}
Expand Down Expand Up @@ -771,6 +782,7 @@ NAN_METHOD(KafkaConsumer::NodeUnsubscribe) {

/**
 * JS binding: consumer.commit([toppar...] | toppar | null).
 *
 * Accepts null/undefined (commit everything outstanding), an array of
 * {topic, partition, offset} objects, or a single such object.
 * Returns the librdkafka error code as a Number.
 */
NAN_METHOD(KafkaConsumer::NodeCommit) {
  Nan::HandleScope scope;
  int error_code;

  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

  // NOTE(review): this guard was collapsed in the diff view; reconstructed
  // from the visible `return;`/`}` tail — confirm the exact error message.
  if (!consumer->IsConnected()) {
    Nan::ThrowError("Client is disconnected");
    return;
  }

  if (info[0]->IsNull() || info[0]->IsUndefined()) {
    // No argument: commit all outstanding offsets
    Baton b = consumer->Commit();
    error_code = static_cast<int>(b.err());
  } else if (info[0]->IsArray()) {
    std::vector<RdKafka::TopicPartition *> toppars =
      Conversion::TopicPartition::FromV8Array(info[0].As<v8::Array>());

    Baton b = consumer->Commit(toppars);
    error_code = static_cast<int>(b.err());

    // FromV8Array allocates the partitions; we own and free them here
    RdKafka::TopicPartition::destroy(toppars);
  } else if (info[0]->IsObject()) {
    RdKafka::TopicPartition * toppar =
      Conversion::TopicPartition::FromV8Object(info[0].As<v8::Object>());

    if (toppar == NULL) {
      Nan::ThrowError("Invalid topic partition provided");
      return;
    }

    Baton b = consumer->Commit(toppar);
    error_code = static_cast<int>(b.err());

    delete toppar;
  } else {
    Nan::ThrowError("First parameter must be an object or an array");
    return;
  }

  info.GetReturnValue().Set(Nan::New<v8::Number>(error_code));
}

/**
 * JS binding: consumer.commitSync([toppar...] | toppar | null).
 *
 * Same argument handling as NodeCommit, but blocks until the commit
 * completes. Returns the librdkafka error code as a Number.
 */
NAN_METHOD(KafkaConsumer::NodeCommitSync) {
  Nan::HandleScope scope;
  int error_code;

  KafkaConsumer* consumer = ObjectWrap::Unwrap<KafkaConsumer>(info.This());

  // NOTE(review): this guard was collapsed in the diff view; reconstructed
  // from the visible `return;`/`}` tail — confirm the exact error message.
  if (!consumer->IsConnected()) {
    Nan::ThrowError("Client is disconnected");
    return;
  }

  if (info[0]->IsNull() || info[0]->IsUndefined()) {
    // No argument: commit all outstanding offsets
    Baton b = consumer->CommitSync();
    error_code = static_cast<int>(b.err());
  } else if (info[0]->IsArray()) {
    std::vector<RdKafka::TopicPartition *> toppars =
      Conversion::TopicPartition::FromV8Array(info[0].As<v8::Array>());

    Baton b = consumer->CommitSync(toppars);
    error_code = static_cast<int>(b.err());

    // FromV8Array allocates the partitions; we own and free them here
    RdKafka::TopicPartition::destroy(toppars);
  } else if (info[0]->IsObject()) {
    RdKafka::TopicPartition * toppar =
      Conversion::TopicPartition::FromV8Object(info[0].As<v8::Object>());

    if (toppar == NULL) {
      Nan::ThrowError("Invalid topic partition provided");
      return;
    }

    Baton b = consumer->CommitSync(toppar);
    error_code = static_cast<int>(b.err());

    delete toppar;
  } else {
    Nan::ThrowError("First parameter must be an object or an array");
    return;
  }

  info.GetReturnValue().Set(Nan::New<v8::Number>(error_code));
Expand Down
8 changes: 5 additions & 3 deletions src/kafka-consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,16 @@ class KafkaConsumer : public Connection {
Baton Resume(std::vector<RdKafka::TopicPartition*> &);

// Asynchronous commit events
Baton Commit(std::string, int, int64_t);
Baton Commit(std::vector<RdKafka::TopicPartition*>);
Baton Commit(RdKafka::TopicPartition*);
Baton Commit();
Baton OffsetsStore(std::vector<RdKafka::TopicPartition*> &);

Baton OffsetsStore(std::vector<RdKafka::TopicPartition*> &);
Baton GetWatermarkOffsets(std::string, int32_t, int64_t*, int64_t*);

// Synchronous commit events
Baton CommitSync(std::string, int, int64_t);
Baton CommitSync(std::vector<RdKafka::TopicPartition*>);
Baton CommitSync(RdKafka::TopicPartition*);
Baton CommitSync();

Baton Committed(std::vector<RdKafka::TopicPartition*> &, int timeout_ms);
Expand Down
20 changes: 20 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Manual smoke-test script: connect a consumer with auto-commit disabled,
// commit an array of topic partitions, then disconnect.
var KafkaConsumer = require('./').KafkaConsumer;
var crypto = require('crypto');

var consumer = new KafkaConsumer({
  'bootstrap.servers': 'localhost:9092',
  // Random group id so repeated runs don't share committed offsets
  'group.id': 'kafka-mocha-grp-' + crypto.randomBytes(20).toString('hex'),
  'debug': 'all',
  'rebalance_cb': true,
  'enable.auto.commit': false
}, {});

var topic = 'test';

consumer.connect({ timeout: 2000 }, function(err, info) {
  // Previously `err` was ignored, so the commit ran against an
  // unconnected client when the broker was unreachable.
  if (err) {
    console.error('Failed to connect:', err);
    return;
  }

  // offset -1 is what the e2e spec uses; presumably a librdkafka
  // sentinel — confirm against the binding's offset handling
  consumer.commit([{ topic: topic, partition: 0, offset: -1 }]);

  consumer.disconnect(function() {
    // Nothing to clean up after disconnect
  });
});

0 comments on commit cfc237b

Please sign in to comment.