Support transactions #881

Closed · wants to merge 17 commits
3 changes: 3 additions & 0 deletions docker-compose.yml
@@ -18,3 +18,6 @@ kafka:
KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092'
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
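
These three settings exist because transactions depend on the broker-side transaction state log topic, whose stock replication settings (replication factor 3, minimum ISR 2) assume a multi-broker cluster; on the single-broker setup in this compose file they must be lowered to 1. The environment variables map onto the usual broker properties, roughly equivalent to:

# Broker-property equivalents of the compose settings above
transaction.state.log.replication.factor=1
default.replication.factor=1
transaction.state.log.min.isr=1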
188 changes: 188 additions & 0 deletions e2e/producer-transaction.spec.js
@@ -0,0 +1,188 @@
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

/*jshint esversion: 6 */

var Kafka = require('../');
var t = require('assert');

var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';

describe('Transactional Producer', function() {
var producer;
var consumer;
const topic = "test3";

describe('with consumer', function() {
beforeEach(function(done) {
producer = new Kafka.Producer({
'client.id': 'kafka-test-transactions',
'metadata.broker.list': kafkaBrokerList,
'dr_cb': true,
'debug': 'all',
'transactional.id': 'noderdkafka_transactions_test',
'enable.idempotence': true
});

consumer = new Kafka.KafkaConsumer({
'metadata.broker.list': kafkaBrokerList,
'group.id': 'node-rdkafka-consumer-flow-example',
'enable.auto.commit': true,
'isolation.level': 'read_committed'
});

producer.connect({}, function(err) {
t.ifError(err);
done();
});
});

afterEach(function(done) {
producer.disconnect(function() {
consumer.disconnect(function() {
done();
});
});
});

it('should get 100% deliverability if transaction is committed', function(done) {
this.timeout(3000);

var total = 0;
const max = 100;
const transactions_timeout_ms = 200;

consumer.on('ready', function(arg) {
consumer.subscribe([topic]);

//start consuming messages
consumer.consume();

producer.initTransactions(transactions_timeout_ms);
producer.beginTransaction();

setTimeout( function () {
for (total = 0; total < max; total++) {
producer.produce(topic, null, Buffer.from('message ' + total), null);
}
producer.commitTransaction(transactions_timeout_ms);
}, 1000);

});

var counter = 0;
consumer.on('data', function(m) {
counter++;

if (counter == max) {
done();
}
});

consumer.on('event.error', function(err) {
console.error('Error from consumer');
console.error(err);
});

consumer.connect();
});

it('no message should be delivered if transaction is aborted', function(done) {
this.timeout(3000);

var total = 0;
const max = 100;
var transactions_timeout_ms = 200;

var tt = setInterval(function() {
producer.poll();
}, 200);

consumer.on('ready', function(arg) {
consumer.subscribe([topic]);

//start consuming messages
consumer.consume();

producer.initTransactions(transactions_timeout_ms);
producer.beginTransaction();

setTimeout( function () {
for (total = 0; total < max; total++) {
producer.produce(topic, null, Buffer.from('message ' + total), null);
}
producer.abortTransaction(transactions_timeout_ms);
}, 1000);
});

var received = 0;
consumer.on('data', function(m) {
received++;
});

var delivery_reports = 0;
producer.on('delivery-report', function(err, report) {
delivery_reports++;
t.notStrictEqual(report, undefined);
t.notStrictEqual(err, undefined);
if (delivery_reports == max) {
clearInterval(tt);
t.strictEqual(received, 0);
done();
}
});

consumer.on('event.error', function(err) {
console.error('Error from consumer');
console.error(err);
});

consumer.connect();
});
});

describe('without consumer', function() {
beforeEach(function(done) {
producer = new Kafka.Producer({
'client.id': 'kafka-test',
'metadata.broker.list': kafkaBrokerList,
'dr_cb': true,
'debug': 'all',
'transactional.id': 'noderdkafka_transactions_test',
'enable.idempotence': true
});

producer.connect({}, function(err) {
t.ifError(err);
done();
});
});

afterEach(function(done) {
producer.disconnect(function() {
done();
});
});

it('should throw exception if Init not called', function(done) {

t.throws(() => { producer.beginTransaction(); },
{
isFatal: false,
isRetriable: false,
isTxnRequiresAbort: false
});

done();
});
});

});
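
The 'without consumer' test above shows that the transaction calls are synchronous and report failure by throwing a librdkafka error carrying the isFatal, isRetriable and isTxnRequiresAbort flags (set in _errorWrapTxn further down). A minimal sketch of how calling code might branch on those flags — the retry/abort policy here is illustrative, not part of this PR:

try {
  producer.commitTransaction(transactions_timeout_ms);
} catch (err) {
  if (err.isTxnRequiresAbort) {
    // This transaction can no longer be committed; abort it and start over.
    producer.abortTransaction(transactions_timeout_ms);
  } else if (err.isRetriable) {
    // Transient failure; the same call may be retried after a backoff.
    producer.commitTransaction(transactions_timeout_ms);
  } else if (err.isFatal) {
    // The producer instance is unusable and must be torn down.
    producer.disconnect();
  } else {
    throw err;
  }
}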


89 changes: 89 additions & 0 deletions e2e/send_offset_transaction.spec.js
@@ -0,0 +1,89 @@
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

var Kafka = require('../');
var t = require('assert');

var kafkaBrokerList = process.env.KAFKA_HOST || 'localhost:9092';

describe('Send offsets to transaction', function() {
var producer;
var consumer;

describe('with dr_cb', function() {
beforeEach(function(done) {
producer = new Kafka.Producer({
'client.id': 'kafka-test',
'metadata.broker.list': kafkaBrokerList,
'dr_cb': true,
'debug': 'all',
'transactional.id': 'noderdkafka_transactions_send_offset',
'enable.idempotence': true
});

consumer = new Kafka.KafkaConsumer({
'metadata.broker.list': kafkaBrokerList,
'group.id': 'node-rdkafka-consumer-send-offset',
'enable.auto.commit': false,
'isolation.level': 'read_committed'
});

producer.connect({}, function(err) {
t.ifError(err);
done();
});
});

afterEach(function(done) {
producer.disconnect(function() {
consumer.disconnect(function() {
done();
});
});
});

it('consumer offsets should get committed by sending to transaction', function(done) {
this.timeout(3000);

let transactions_timeout_ms = 200;
let topic = "test2";
let test_offset = [ { offset: 1000000, partition: 0, topic: 'test2' } ];

consumer.on('ready', function(arg) {
consumer.subscribe([topic]);

consumer.consume();

producer.initTransactions(transactions_timeout_ms);
producer.beginTransaction();

setTimeout(function() {
producer.produce(topic, null, Buffer.from('test message'), null);
producer.sendOffsetsToTransaction(test_offset, consumer, transactions_timeout_ms);
producer.commitTransaction(transactions_timeout_ms);
}, 1000);
});

consumer.once('data', function(m) {
var position = consumer.position();
consumer.committed(null, transactions_timeout_ms, function (err, committed) {
// Assert that what the consumer sees as committed offsets matches what was added to the transaction
t.ifError(err);
t.deepStrictEqual(test_offset, committed);
done();
});
});

consumer.on('event.error', function(err) {
console.error(err);
});

consumer.connect();
});
});
});
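
The test commits a fabricated offset (1000000 on test2) purely to keep the assertion cheap; in a real consume-transform-produce pipeline the offsets sent to the transaction come from the messages actually consumed. A rough sketch of that pattern with the same API — outputTopic and transform() are placeholders, and offset + 1 marks the next message to read:

consumer.on('data', function(m) {
  producer.beginTransaction();
  producer.produce(outputTopic, null, transform(m.value), m.key);
  // Commit the input offset inside the same transaction so the read
  // and the write succeed or fail together.
  producer.sendOffsetsToTransaction(
    [{ topic: m.topic, partition: m.partition, offset: m.offset + 1 }],
    consumer,
    transactions_timeout_ms);
  producer.commitTransaction(transactions_timeout_ms);
});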
8 changes: 7 additions & 1 deletion index.d.ts
@@ -147,7 +147,7 @@ type EventListenerMap = {
'disconnected': (metrics: ClientMetrics) => void,
'ready': (info: ReadyInfo, metadata: Metadata) => void,
'connection.failure': (error: LibrdKafkaError, metrics: ClientMetrics) => void,
// event messages
'event.error': (error: LibrdKafkaError) => void,
'event.stats': (eventData: any) => void,
'event.log': (eventData: any) => void,
@@ -262,6 +262,12 @@ export class Producer extends Client<KafkaProducerEvents> {
setPollInterval(interval: number): this;

static createWriteStream(conf: ProducerGlobalConfig, topicConf: ProducerTopicConfig, streamOptions: WriteStreamOptions): ProducerStream;

initTransactions(timeout?: NumberNullUndefined): void;
beginTransaction(): void;
commitTransaction(timeout?: NumberNullUndefined): void;
abortTransaction(timeout?: NumberNullUndefined): void;
sendOffsetsToTransaction(offsets: TopicPartitionOffset[], consumer: KafkaConsumer, timeout?: NumberNullUndefined): void;
}

export class HighLevelProducer extends Producer {
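
Taken together, the new declarations expose the synchronous transaction flow exercised by the e2e specs. A minimal happy path, assuming a connected producer created with transactional.id and enable.idempotence as in the tests (topic name and timeouts are illustrative):

producer.initTransactions(5000);   // fences older instances with the same transactional.id
producer.beginTransaction();
producer.produce('my-topic', null, Buffer.from('payload'), null);
producer.commitTransaction(5000);  // or abortTransaction(5000) to discard the batch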
28 changes: 28 additions & 0 deletions lib/client.js
@@ -485,6 +485,34 @@ Client.prototype._errorWrap = function(errorCode, intIsError) {
return returnValue;
};

/**
* Wrap a potential RdKafka transaction error.
*
* This internal method takes the error object returned from a native
* transaction method, throws a proper librdkafka error object if it
* represents an error, and returns true otherwise.
*
* It is intended to be used in a return statement.
*
* @private
* @param {object} errorObject - Error object returned from a native transaction method
* @return {boolean} - Returns true unless an error is thrown.
*/
Client.prototype._errorWrapTxn = function(errorObject) {
if (errorObject.code !== LibrdKafkaError.codes.ERR_NO_ERROR) {
var e = LibrdKafkaError.create(errorObject);

e.isFatal = errorObject.isFatal;
e.isRetriable = errorObject.isRetriable;
e.isTxnRequiresAbort = errorObject.isTxnRequiresAbort;
throw e;
}
else {
return true;
}
};
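
The producer-side wiring is outside this hunk, but the intended pattern is for each transaction method to pass the native return value straight through this helper. Roughly, as a sketch (the native binding call is an assumption, not shown in this diff):

// Sketch only: how a transaction method would use _errorWrapTxn.
Producer.prototype.beginTransaction = function() {
  // The native call returns an error object; the wrapper throws on anything
  // other than ERR_NO_ERROR and returns true otherwise.
  return this._errorWrapTxn(this._client.beginTransaction());
};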

/**
* This callback is used to pass metadata or an error after a successful
* connection