diff --git a/docker-compose.yml b/docker-compose.yml
index fd9d07be..abe29df2 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -18,3 +18,6 @@ kafka:
     KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
     KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
     KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+    KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+    KAFKA_DEFAULT_REPLICATION_FACTOR: 1
+    KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
diff --git a/e2e/producer-transaction.spec.js b/e2e/producer-transaction.spec.js
new file mode 100644
index 00000000..6799fda5
--- /dev/null
+++ b/e2e/producer-transaction.spec.js
@@ -0,0 +1,188 @@
+/*
+ * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
+ *
+ * Copyright (c) 2016 Blizzard Entertainment
+ *
+ * This software may be modified and distributed under the terms
+ * of the MIT license. See the LICENSE.txt file for details.
+ */
+
+/*jshint esversion: 6 */
+
+var Kafka = require('../');
+var t = require('assert');
+
+var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
+
+describe('Transactional Producer', function() {
+  var producer;
+  var consumer;
+  const topic = "test3";
+
+  describe('with consumer', function() {
+    beforeEach(function(done) {
+      producer = new Kafka.Producer({
+        'client.id': 'kafka-test-transactions',
+        'metadata.broker.list': kafkaBrokerList,
+        'dr_cb': true,
+        'debug': 'all',
+        'transactional.id': 'noderdkafka_transactions_test',
+        'enable.idempotence': true
+      });
+
+      consumer = new Kafka.KafkaConsumer({
+        'metadata.broker.list': kafkaBrokerList,
+        'group.id': 'node-rdkafka-consumer-flow-example',
+        'enable.auto.commit': true,
+        'isolation.level': 'read_committed'
+      });
+
+      producer.connect({}, function(err) {
+        t.ifError(err);
+        done();
+      });
+    });
+
+    afterEach(function(done) {
+      producer.disconnect(function() {
+        consumer.disconnect(function() {
+          done();
+        });
+      });
+    });
+
+    it('should get 100% deliverability if transaction is committed', function(done) {
+      this.timeout(3000);
+
+      var total = 0;
+      const max = 100;
+      const transactions_timeout_ms = 200;
+
+      consumer.on('ready', function(arg) {
+        consumer.subscribe([topic]);
+
+        // start consuming messages
+        consumer.consume();
+
+        producer.initTransactions(transactions_timeout_ms);
+        producer.beginTransaction();
+
+        setTimeout(function() {
+          for (total = 0; total < max; total++) {
+            producer.produce(topic, null, Buffer.from('message ' + total), null);
+          }
+          producer.commitTransaction(transactions_timeout_ms);
+        }, 1000);
+      });
+
+      var counter = 0;
+      consumer.on('data', function(m) {
+        counter++;
+
+        if (counter === max) {
+          done();
+        }
+      });
+
+      consumer.on('event.error', function(err) {
+        console.error('Error from consumer');
+        console.error(err);
+      });
+
+      consumer.connect();
+    });
+
+    it('no message should be delivered if transaction is aborted', function(done) {
+      this.timeout(3000);
+
+      var total = 0;
+      const max = 100;
+      var transactions_timeout_ms = 200;
+
+      var tt = setInterval(function() {
+        producer.poll();
+      }, 200);
+
+      consumer.on('ready', function(arg) {
+        consumer.subscribe([topic]);
+
+        // start consuming messages
+        consumer.consume();
+
+        producer.initTransactions(transactions_timeout_ms);
+        producer.beginTransaction();
+
+        setTimeout(function() {
+          for (total = 0; total < max; total++) {
+            producer.produce(topic, null, Buffer.from('message ' + total), null);
+          }
+          producer.abortTransaction(transactions_timeout_ms);
+        }, 1000);
+      });
+
+      var received = 0;
+      consumer.on('data', function(m) {
+        received++;
+      });
+
+      var delivery_reports = 0;
+      producer.on('delivery-report', function(err, report) {
+        delivery_reports++;
+        t.notStrictEqual(report, undefined);
+        t.notStrictEqual(err, undefined);
+        if (delivery_reports === max) {
+          clearInterval(tt);
+          t.strictEqual(received, 0);
+          done();
+        }
+      });
+
+      consumer.on('event.error', function(err) {
+        console.error('Error from consumer');
+        console.error(err);
+      });
+
+      consumer.connect();
+    });
+  });
+
+  describe('without consumer', function() {
+    beforeEach(function(done) {
+      producer = new Kafka.Producer({
+        'client.id': 'kafka-test',
+        'metadata.broker.list': kafkaBrokerList,
+        'dr_cb': true,
+        'debug': 'all',
+        'transactional.id': 'noderdkafka_transactions_test',
+        'enable.idempotence': true
+      });
+
+      producer.connect({}, function(err) {
+        t.ifError(err);
+        done();
+      });
+    });
+
+    afterEach(function(done) {
+      producer.disconnect(function() {
+        done();
+      });
+    });
+
+    it('should throw exception if Init not called', function(done) {
+      t.throws(() => { producer.beginTransaction(); }, {
+        isFatal: false,
+        isRetriable: false,
+        isTxnRequiresAbort: false
+      });
+
+      done();
+    });
+  });
+});
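A condensed sketch of the happy path the spec above exercises, outside the test harness. The broker address, topic, and transactional.id below are illustrative assumptions, not values this change prescribes:

```js
var Kafka = require('node-rdkafka');

// Transactions require a stable transactional.id; idempotence is also enabled.
var producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092',  // assumed dev broker
  'transactional.id': 'example-txn-id',      // hypothetical id
  'enable.idempotence': true,
  'dr_cb': true
});

producer.connect({}, function(err) {
  if (err) throw err;

  producer.initTransactions(5000);   // once per producer instance
  producer.beginTransaction();
  producer.produce('example-topic', null, Buffer.from('payload'), null);
  producer.commitTransaction(5000);  // or abortTransaction(5000) to roll back
  producer.disconnect(function() {});
});
```

The new methods are synchronous and throw a LibrdKafkaError on failure, so a real caller would wrap them in try/catch; see the error-handling sketch after lib/producer.js below.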
diff --git a/e2e/send_offset_transaction.spec.js b/e2e/send_offset_transaction.spec.js
new file mode 100644
index 00000000..d7fab8bc
--- /dev/null
+++ b/e2e/send_offset_transaction.spec.js
@@ -0,0 +1,89 @@
+/*
+ * node-rdkafka - Node.js wrapper for RdKafka C/C++ library
+ *
+ * Copyright (c) 2016 Blizzard Entertainment
+ *
+ * This software may be modified and distributed under the terms
+ * of the MIT license. See the LICENSE.txt file for details.
+ */
+
+var Kafka = require('../');
+var t = require('assert');
+
+var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';
+
+describe('Send offsets to transaction', function() {
+  var producer;
+  var consumer;
+
+  describe('with dr_cb', function() {
+    beforeEach(function(done) {
+      producer = new Kafka.Producer({
+        'client.id': 'kafka-test',
+        'metadata.broker.list': kafkaBrokerList,
+        'dr_cb': true,
+        'debug': 'all',
+        'transactional.id': 'noderdkafka_transactions_send_offset',
+        'enable.idempotence': true
+      });
+
+      consumer = new Kafka.KafkaConsumer({
+        'metadata.broker.list': kafkaBrokerList,
+        'group.id': 'node-rdkafka-consumer-send-offset',
+        'enable.auto.commit': false,
+        'isolation.level': 'read_committed'
+      });
+
+      producer.connect({}, function(err) {
+        t.ifError(err);
+        done();
+      });
+    });
+
+    afterEach(function(done) {
+      producer.disconnect(function() {
+        consumer.disconnect(function() {
+          done();
+        });
+      });
+    });
+
+    it('consumer offsets should get committed by sending to transaction', function(done) {
+      this.timeout(3000);
+
+      let transactions_timeout_ms = 200;
+      let topic = "test2";
+      let test_offset = [ { offset: 1000000, partition: 0, topic: 'test2' } ];
+
+      consumer.on('ready', function(arg) {
+        consumer.subscribe([topic]);
+
+        consumer.consume();
+
+        producer.initTransactions(transactions_timeout_ms);
+        producer.beginTransaction();
+
+        setTimeout(function() {
+          producer.produce(topic, null, Buffer.from('test message'), null);
+          producer.sendOffsetsToTransaction(test_offset, consumer, transactions_timeout_ms);
+          producer.commitTransaction(transactions_timeout_ms);
+        }, 1000);
+      });
+
+      consumer.once('data', function(m) {
+        let position = consumer.position();
+        consumer.committed(null, transactions_timeout_ms, function(err, committed) {
+          // Assert that the committed offsets the consumer sees match what
+          // was added to the transaction.
+          t.ifError(err);
+          t.deepStrictEqual(committed, test_offset);
+          done();
+        });
+      });
+
+      consumer.on('event.error', function(err) {
+        console.error(err);
+      });
+
+      consumer.connect();
+    });
+  });
+});
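The spec pins a hard-coded offset; in real use the offsets handed to sendOffsetsToTransaction come from the messages just consumed. A hedged sketch of a consume-transform-produce loop under that assumption (the topic names and the per-message transaction granularity are illustrative, not prescribed by this change):

```js
// Assumes `producer` is transactional, connected, and initTransactions()
// has already succeeded; `consumer` runs with enable.auto.commit=false.
consumer.on('data', function(m) {
  producer.beginTransaction();

  // Transform the input and produce to an output topic (hypothetical name).
  producer.produce('output-topic', null,
    Buffer.from(m.value.toString().toUpperCase()), null);

  // Commit the input offset inside the same transaction; the committed
  // offset is the *next* message to read, hence m.offset + 1.
  producer.sendOffsetsToTransaction(
    [{ topic: m.topic, partition: m.partition, offset: m.offset + 1 }],
    consumer, 5000);

  producer.commitTransaction(5000);
});
```

If the process dies between produce and commit, a read_committed consumer never sees the output message and the input offset is not advanced, so the input is simply reprocessed in a fresh transaction.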
diff --git a/index.d.ts b/index.d.ts
index 1444b472..db6f708e 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -147,7 +147,7 @@ type EventListenerMap = {
   'disconnected': (metrics: ClientMetrics) => void,
   'ready': (info: ReadyInfo, metadata: Metadata) => void,
   'connection.failure': (error: LibrdKafkaError, metrics: ClientMetrics) => void,
-  // event messages
+  // event messages
   'event.error': (error: LibrdKafkaError) => void,
   'event.stats': (eventData: any) => void,
   'event.log': (eventData: any) => void,
@@ -262,6 +262,12 @@ export class Producer extends Client {
   setPollInterval(interval: number): this;
 
   static createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;
+
+  initTransactions(timeout?: NumberNullUndefined): void;
+  beginTransaction(): void;
+  commitTransaction(timeout?: NumberNullUndefined): void;
+  abortTransaction(timeout?: NumberNullUndefined): void;
+  sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, timeout?: NumberNullUndefined): void;
 }
 
 export class HighLevelProducer extends Producer {
diff --git a/lib/client.js b/lib/client.js
index 9cbd3f9a..01355df7 100644
--- a/lib/client.js
+++ b/lib/client.js
@@ -485,6 +485,34 @@ Client.prototype._errorWrap = function(errorCode, intIsError) {
   return returnValue;
 };
 
+/**
+ * Wrap a potential RdKafka transaction error.
+ *
+ * This internal method takes the error object returned by a native
+ * transaction method and, if it carries an error code, turns it into a
+ * proper librdkafka error object and throws it; otherwise it returns true.
+ *
+ * It is intended to be used in a return statement.
+ *
+ * @private
+ * @param {object} errorObject - Error object returned from a native transaction method.
+ * @return {boolean} - Returns true unless it throws.
+ */
+Client.prototype._errorWrapTxn = function(errorObject) {
+  if (errorObject.code !== LibrdKafkaError.codes.ERR_NO_ERROR) {
+    var e = LibrdKafkaError.create(errorObject);
+
+    e.isFatal = errorObject.isFatal;
+    e.isRetriable = errorObject.isRetriable;
+    e.isTxnRequiresAbort = errorObject.isTxnRequiresAbort;
+    throw e;
+  } else {
+    return true;
+  }
+};
+
 /**
  * This callback is used to pass metadata or an error after a successful
  * connection
diff --git a/lib/producer.js b/lib/producer.js
index 14d1183a..07606fbf 100644
--- a/lib/producer.js
+++ b/lib/producer.js
@@ -289,3 +289,73 @@ Producer.prototype.disconnect = function(timeout, cb) {
     self._disconnect(cb);
   });
 };
+
+/**
+ * Initialize transactions.
+ *
+ * This is performed exactly once per transactional producer, before any
+ * other transactional method is called.
+ *
+ * @param {number} timeout - Number of milliseconds to try to initialize
+ * before giving up. Defaults to 5 seconds.
+ * @throws {LibrdKafkaError} - Throws if initialization fails.
+ */
+Producer.prototype.initTransactions = function(timeout) {
+  if (timeout === undefined || timeout === null) {
+    timeout = 5000;
+  }
+
+  this._errorWrapTxn(this._client.initTransactions(timeout));
+};
+
+/**
+ * Begin a transaction.
+ *
+ * 'initTransactions' must have been called successfully (once) before this
+ * function is called.
+ *
+ * @throws {LibrdKafkaError} - Throws if a transaction cannot be started.
+ */
+Producer.prototype.beginTransaction = function() {
+  this._errorWrapTxn(this._client.beginTransaction());
+};
+
+/**
+ * Commit the current transaction (as started with 'beginTransaction').
+ *
+ * @param {number} timeout - Number of milliseconds to try to commit before
+ * giving up. Defaults to 5 seconds.
+ * @throws {LibrdKafkaError} - Throws if the commit fails.
+ */
+Producer.prototype.commitTransaction = function(timeout) {
+  if (timeout === undefined || timeout === null) {
+    timeout = 5000;
+  }
+
+  this._errorWrapTxn(this._client.commitTransaction(timeout));
+};
+
+/**
+ * Abort the ongoing transaction.
+ *
+ * @param {number} timeout - Number of milliseconds to try to abort before
+ * giving up. Defaults to 5 seconds.
+ * @throws {LibrdKafkaError} - Throws if the abort fails.
+ */
+Producer.prototype.abortTransaction = function(timeout) {
+  if (timeout === undefined || timeout === null) {
+    timeout = 5000;
+  }
+
+  this._errorWrapTxn(this._client.abortTransaction(timeout));
+};
+
+/**
+ * Send the current offsets of the consumer to the ongoing transaction.
+ *
+ * @param {TopicPartitionOffset[]} offsets - Offsets to commit as part of the transaction.
+ * @param {KafkaConsumer} consumer - An instance of the consumer whose group the offsets belong to.
+ * @param {number} timeout - Number of milliseconds to try to send the offsets
+ * before giving up. Defaults to 5 seconds.
+ * @throws {LibrdKafkaError} - Throws if sending the offsets fails.
+ */
+Producer.prototype.sendOffsetsToTransaction = function(offsets, consumer, timeout) {
+  if (timeout === undefined || timeout === null) {
+    timeout = 5000;
+  }
+
+  this._errorWrapTxn(this._client.sendOffsetsToTransaction(offsets, consumer.getClient(), timeout));
+};
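Because these wrappers throw synchronously, recovery decisions belong to the caller, driven by the flags _errorWrapTxn copies onto the thrown error. A sketch of the standard librdkafka recovery pattern under that API (the function and variable names are assumptions for illustration):

```js
// Assumes a connected transactional `producer` on which initTransactions()
// has already succeeded.
function produceBatchInTransaction(producer, topic, payloads) {
  producer.beginTransaction();
  try {
    payloads.forEach(function(payload) {
      producer.produce(topic, null, payload, null);
    });
    producer.commitTransaction(5000);
  } catch (err) {
    if (err.isTxnRequiresAbort) {
      // The transaction is poisoned: abort it, then retry the whole batch
      // in a fresh transaction if desired.
      producer.abortTransaction(5000);
    } else if (err.isRetriable) {
      // Transient failure: the failed call itself may be retried.
    } else if (err.isFatal) {
      // The producer instance is unusable and must be recreated.
    }
    throw err;
  }
}
```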
diff --git a/src/errors.cc b/src/errors.cc
index 10ab3b19..220773fc 100644
--- a/src/errors.cc
+++ b/src/errors.cc
@@ -31,6 +31,20 @@ v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &err) {
   return RdKafkaError(err, RdKafka::err2str(err));
 }
 
+v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &err,
+  std::string errstr, bool isFatal, bool isRetriable, bool isTxnRequiresAbort) {
+  v8::Local<v8::Object> ret = RdKafkaError(err, errstr);
+
+  Nan::Set(ret, Nan::New("isFatal").ToLocalChecked(),
+    Nan::New<v8::Boolean>(isFatal));
+  Nan::Set(ret, Nan::New("isRetriable").ToLocalChecked(),
+    Nan::New<v8::Boolean>(isRetriable));
+  Nan::Set(ret, Nan::New("isTxnRequiresAbort").ToLocalChecked(),
+    Nan::New<v8::Boolean>(isTxnRequiresAbort));
+
+  return ret;
+}
+
 Baton::Baton(const RdKafka::ErrorCode &code) {
   m_err = code;
 }
@@ -45,6 +59,16 @@ Baton::Baton(void* data) {
   m_data = data;
 }
 
+Baton::Baton(const RdKafka::ErrorCode &code, std::string errstr, bool isFatal,
+             bool isRetriable, bool isTxnRequiresAbort) {
+  m_err = code;
+  m_errstr = errstr;
+  m_isFatal = isFatal;
+  m_isRetriable = isRetriable;
+  m_isTxnRequiresAbort = isTxnRequiresAbort;
+}
+
 v8::Local<v8::Object> Baton::ToObject() {
   if (m_errstr.empty()) {
     return RdKafkaError(m_err);
@@ -53,6 +77,10 @@ v8::Local<v8::Object> Baton::ToObject() {
   }
 }
 
+v8::Local<v8::Object> Baton::ToTxnObject() {
+  return RdKafkaError(m_err, m_errstr, m_isFatal, m_isRetriable, m_isTxnRequiresAbort);
+}
+
 RdKafka::ErrorCode Baton::err() {
   return m_err;
 }
diff --git a/src/errors.h b/src/errors.h
index d03585c5..bec0beb6 100644
--- a/src/errors.h
+++ b/src/errors.h
@@ -25,6 +25,8 @@ class Baton {
   explicit Baton(const RdKafka::ErrorCode &);
   explicit Baton(void* data);
   explicit Baton(const RdKafka::ErrorCode &, std::string);
+  explicit Baton(const RdKafka::ErrorCode &, std::string, bool isFatal,
+                 bool isRetriable, bool isTxnRequiresAbort);
 
   template <typename T>
   T data() {
     return static_cast<T>(m_data);
   }
@@ -34,11 +36,15 @@ class Baton {
   std::string errstr();
 
   v8::Local<v8::Object> ToObject();
+  v8::Local<v8::Object> ToTxnObject();
 
  private:
   void* m_data;
   std::string m_errstr;
   RdKafka::ErrorCode m_err;
+  bool m_isFatal;
+  bool m_isRetriable;
+  bool m_isTxnRequiresAbort;
 };
 
 v8::Local<v8::Object> RdKafkaError(const RdKafka::ErrorCode &);
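For orientation, the object that ToTxnObject() hands back to the JS layer (and that _errorWrapTxn inspects) has roughly the shape below; the code and message values are illustrative examples, not fixed constants:

```js
// Illustrative shape of the error object built by the new RdKafkaError overload.
var txnErrorObject = {
  code: -172,                  // numeric RdKafka::ErrorCode, e.g. ERR__STATE
  message: 'Local: Erroneous state',
  isFatal: false,              // true: the producer instance must be recreated
  isRetriable: false,          // true: the same call may be retried
  isTxnRequiresAbort: false    // true: the current transaction must be aborted
};
```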
diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc
index c0b95162..29911403 100644
--- a/src/kafka-consumer.cc
+++ b/src/kafka-consumer.cc
@@ -423,7 +423,7 @@ Baton KafkaConsumer::Consume(int timeout_ms) {
     RdKafka::Message * message = consumer->consume(timeout_ms);
     RdKafka::ErrorCode response_code = message->err();
     // we want to handle these errors at the call site
-    if (response_code != RdKafka::ERR_NO_ERROR &&
+    if (response_code != RdKafka::ERR_NO_ERROR &&
       response_code != RdKafka::ERR__PARTITION_EOF &&
       response_code != RdKafka::ERR__TIMED_OUT &&
       response_code != RdKafka::ERR__TIMED_OUT_QUEUE
diff --git a/src/kafka-consumer.h b/src/kafka-consumer.h
index a6fe4674..57b92904 100644
--- a/src/kafka-consumer.h
+++ b/src/kafka-consumer.h
@@ -35,6 +35,7 @@ namespace NodeKafka {
  */
 
 class KafkaConsumer : public Connection {
+  friend class Producer;
  public:
   static void Init(v8::Local<v8::Object>);
   static v8::Local<v8::Object> NewInstance(v8::Local<v8::Value>);
diff --git a/src/producer.cc b/src/producer.cc
index 896e8572..415491bc 100644
--- a/src/producer.cc
+++ b/src/producer.cc
@@ -11,6 +11,7 @@
 #include <string>
 
 #include "src/producer.h"
+#include "src/kafka-consumer.h"
 #include "src/workers.h"
 
 namespace NodeKafka {
@@ -78,6 +79,16 @@ void Producer::Init(v8::Local<v8::Object> exports) {
 
   Nan::SetPrototypeMethod(tpl, "flush", NodeFlush);
 
+  /*
+   * @brief Transaction-related methods
+   */
+
+  Nan::SetPrototypeMethod(tpl, "initTransactions", NodeInitTransactions);
+  Nan::SetPrototypeMethod(tpl, "beginTransaction", NodeBeginTransaction);
+  Nan::SetPrototypeMethod(tpl, "commitTransaction", NodeCommitTransaction);
+  Nan::SetPrototypeMethod(tpl, "abortTransaction", NodeAbortTransaction);
+  Nan::SetPrototypeMethod(tpl, "sendOffsetsToTransaction", NodeSendOffsetsToTransaction);
+
   // connect. disconnect. resume. pause. get meta data
   constructor.Reset((tpl->GetFunction(Nan::GetCurrentContext()))
     .ToLocalChecked());
@@ -339,6 +350,79 @@ void Producer::ConfigureCallback(const std::string &string_key, const v8::Local<v8::Function> &cb, bool add) {
   }
 }
 
+Baton rdkafkaErrorToBaton(RdKafka::Error* error) {
+  if (error == NULL) {
+    return Baton(RdKafka::ERR_NO_ERROR);
+  } else {
+    Baton result(error->code(), error->str(), error->is_fatal(),
+                 error->is_retriable(), error->txn_requires_abort());
+    delete error;
+    return result;
+  }
+}
+
+Baton Producer::InitTransactions(int32_t timeout_ms) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::Producer* producer = dynamic_cast<RdKafka::Producer*>(m_client);
+  RdKafka::Error* error = producer->init_transactions(timeout_ms);
+
+  return rdkafkaErrorToBaton(error);
+}
+
+Baton Producer::BeginTransaction() {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::Producer* producer = dynamic_cast<RdKafka::Producer*>(m_client);
+  RdKafka::Error* error = producer->begin_transaction();
+
+  return rdkafkaErrorToBaton(error);
+}
+
+Baton Producer::CommitTransaction(int32_t timeout_ms) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::Producer* producer = dynamic_cast<RdKafka::Producer*>(m_client);
+  RdKafka::Error* error = producer->commit_transaction(timeout_ms);
+
+  return rdkafkaErrorToBaton(error);
+}
+
+Baton Producer::AbortTransaction(int32_t timeout_ms) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::Producer* producer = dynamic_cast<RdKafka::Producer*>(m_client);
+  RdKafka::Error* error = producer->abort_transaction(timeout_ms);
+
+  return rdkafkaErrorToBaton(error);
+}
+
+Baton Producer::SendOffsetsToTransaction(
+  std::vector<RdKafka::TopicPartition*> &offsets,
+  NodeKafka::KafkaConsumer* consumer,
+  int timeout_ms) {
+  if (!IsConnected()) {
+    return Baton(RdKafka::ERR__STATE);
+  }
+
+  RdKafka::ConsumerGroupMetadata* group_metadata =
+    dynamic_cast<RdKafka::KafkaConsumer*>(consumer->m_client)->groupMetadata();
+
+  RdKafka::Producer* producer = dynamic_cast<RdKafka::Producer*>(m_client);
+  RdKafka::Error* error =
+    producer->send_offsets_to_transaction(offsets, group_metadata, timeout_ms);
+  delete group_metadata;
+
+  return rdkafkaErrorToBaton(error);
+}
+
 /* Node exposed methods */
 
 /**
@@ -499,7 +583,7 @@ NAN_METHOD(Producer::NodeProduce) {
           RdKafka::Headers::Header(key, value.c_str(), value.size()));
       }
     }
-}
+  }
 
   Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
 
@@ -653,4 +737,104 @@ NAN_METHOD(Producer::NodeDisconnect) {
   info.GetReturnValue().Set(Nan::Null());
 }
 
+NAN_METHOD(Producer::NodeInitTransactions) {
+  Nan::HandleScope scope;
+
+  if (info.Length() != 1) {
+    return Nan::ThrowError("Need to specify a timeout for transaction initialization");
+  }
+
+  int timeout_ms = Nan::To<int32_t>(info[0]).FromJust();
+
+  Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
+
+  if (!producer->IsConnected()) {
+    return Nan::ThrowError("Producer is disconnected");
+  }
+
+  Baton result = producer->InitTransactions(timeout_ms);
+  info.GetReturnValue().Set(result.ToTxnObject());
+}
+
+NAN_METHOD(Producer::NodeBeginTransaction) {
+  Nan::HandleScope scope;
+
+  Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
+
+  if (!producer->IsConnected()) {
+    return Nan::ThrowError("Producer is disconnected");
+  }
+
+  Baton result = producer->BeginTransaction();
+  info.GetReturnValue().Set(result.ToTxnObject());
+}
+
+NAN_METHOD(Producer::NodeCommitTransaction) {
+  Nan::HandleScope scope;
+
+  if (info.Length() != 1) {
+    return Nan::ThrowError("Need to specify a timeout for commit transaction");
+  }
+
+  int timeout_ms = Nan::To<int32_t>(info[0]).FromJust();
+
+  Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
+
+  if (!producer->IsConnected()) {
+    return Nan::ThrowError("Producer is disconnected");
+  }
+
+  Baton result = producer->CommitTransaction(timeout_ms);
+  info.GetReturnValue().Set(result.ToTxnObject());
+}
+
+NAN_METHOD(Producer::NodeAbortTransaction) {
+  Nan::HandleScope scope;
+
+  if (info.Length() != 1) {
+    return Nan::ThrowError("Need to specify a timeout for abort transaction");
+  }
+
+  int timeout_ms = Nan::To<int32_t>(info[0]).FromJust();
+
+  Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
+
+  if (!producer->IsConnected()) {
+    return Nan::ThrowError("Producer is disconnected");
+  }
+
+  Baton result = producer->AbortTransaction(timeout_ms);
+  info.GetReturnValue().Set(result.ToTxnObject());
+}
+
+NAN_METHOD(Producer::NodeSendOffsetsToTransaction) {
+  Nan::HandleScope scope;
+
+  if (info.Length() != 3) {
+    return Nan::ThrowError("Need to specify offsets, consumer and timeout for 'send offsets to transaction'");
+  }
+  if (info[0]->IsNull() || info[0]->IsUndefined()) {
+    return Nan::ThrowError("Topic partitions was not provided");
+  }
+  if (!info[0]->IsArray()) {
+    return Nan::ThrowError("Topic partitions must be an array");
+  }
+  if (!info[1]->IsObject()) {
+    return Nan::ThrowError("Second argument to 'send offsets to transaction' has to be a consumer object");
+  }
+
+  std::vector<RdKafka::TopicPartition*> toppars =
+    Conversion::TopicPartition::FromV8Array(info[0].As<v8::Array>());
+  NodeKafka::KafkaConsumer* consumer =
+    ObjectWrap::Unwrap<KafkaConsumer>(info[1].As<v8::Object>());
+  int timeout_ms = Nan::To<int32_t>(info[2]).FromJust();
+
+  Producer* producer = ObjectWrap::Unwrap<Producer>(info.This());
+
+  if (!producer->IsConnected()) {
+    return Nan::ThrowError("Producer is disconnected");
+  }
+
+  Baton result = producer->SendOffsetsToTransaction(toppars, consumer, timeout_ms);
+  info.GetReturnValue().Set(result.ToTxnObject());
+}
+
 }  // namespace NodeKafka
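NodeSendOffsetsToTransaction expects the offsets as a plain array of topic/partition/offset objects (the TopicPartitionOffset shape declared in index.d.ts), which Conversion::TopicPartition::FromV8Array converts into librdkafka TopicPartition instances. Through the JS wrapper the call looks like this (values are hypothetical):

```js
producer.sendOffsetsToTransaction(
  [
    { topic: 'test2', partition: 0, offset: 1000000 },
    { topic: 'test2', partition: 1, offset: 42 }
  ],
  consumer,  // a connected KafkaConsumer; the wrapper passes consumer.getClient()
  5000       // timeout_ms, forwarded to send_offsets_to_transaction()
);
```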
diff --git a/src/producer.h b/src/producer.h
index a30963d3..5674aa41 100644
--- a/src/producer.h
+++ b/src/producer.h
@@ -82,6 +82,16 @@ class Producer : public Connection {
   void ConfigureCallback(const std::string &string_key,
     const v8::Local<v8::Function> &cb, bool add) override;
 
+  Baton InitTransactions(int32_t timeout_ms);
+  Baton BeginTransaction();
+  Baton CommitTransaction(int32_t timeout_ms);
+  Baton AbortTransaction(int32_t timeout_ms);
+  Baton SendOffsetsToTransaction(
+    std::vector<RdKafka::TopicPartition*> &offsets,
+    NodeKafka::KafkaConsumer* consumer,
+    int timeout_ms
+  );
+
  protected:
   static Nan::Persistent<v8::Function> constructor;
   static void New(const Nan::FunctionCallbackInfo<v8::Value>&);
@@ -98,6 +108,11 @@ class Producer : public Connection {
 #if RD_KAFKA_VERSION > 0x00090200
   static NAN_METHOD(NodeFlush);
 #endif
+  static NAN_METHOD(NodeInitTransactions);
+  static NAN_METHOD(NodeBeginTransaction);
+  static NAN_METHOD(NodeCommitTransaction);
+  static NAN_METHOD(NodeAbortTransaction);
+  static NAN_METHOD(NodeSendOffsetsToTransaction);
 
   Callbacks::Delivery m_dr_cb;
   Callbacks::Partitioner m_partitioner_cb;
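As the 'should throw exception if Init not called' spec asserts, misuse such as calling beginTransaction() before initTransactions() surfaces librdkafka's erroneous-state error as a synchronous throw with all three flags false, so neither a retry nor an abort applies. A sketch of what a caller sees (variable names assumed):

```js
try {
  producer.beginTransaction(); // initTransactions() was never called
} catch (err) {
  // err is a LibrdKafkaError; isFatal, isRetriable and isTxnRequiresAbort
  // are all false here, so the fix is to correct the call order, not retry.
  console.error(err.message, err.isFatal, err.isRetriable, err.isTxnRequiresAbort);
}
```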