diff --git a/lib/errors/MessageSizeTooLargeError.js b/lib/errors/MessageSizeTooLargeError.js
new file mode 100644
index 00000000..7af5a2c6
--- /dev/null
+++ b/lib/errors/MessageSizeTooLargeError.js
@@ -0,0 +1,15 @@
+var util = require('util');
+
+var MessageSizeTooLarge = function (vars) {
+  Error.captureStackTrace(this, this);
+  if (typeof vars === 'object') {
+    this.message = `Found a message larger than the maximum fetch size of this consumer on topic ${vars.topic} partition ${vars.partition} at fetch offset ${vars.offset}. Increase the fetch size, or decrease the maximum message size the broker will allow.`;
+  } else {
+    this.message = vars;
+  }
+};
+
+util.inherits(MessageSizeTooLarge, Error);
+MessageSizeTooLarge.prototype.name = 'MessageSizeTooLarge';
+
+module.exports = MessageSizeTooLarge;
diff --git a/lib/protocol/protocol.js b/lib/protocol/protocol.js
index 207c0212..fe9b05ca 100644
--- a/lib/protocol/protocol.js
+++ b/lib/protocol/protocol.js
@@ -11,6 +11,7 @@ var ERROR_CODE = protocol.ERROR_CODE;
 var GROUP_ERROR = protocol.GROUP_ERROR;
 var PartitionMetadata = protocol.PartitionMetadata;
 const API_KEY_TO_NAME = _.invert(REQUEST_TYPE);
+const MessageSizeTooLarge = require('../errors/MessageSizeTooLargeError');
 
 var API_VERSION = 0;
 var REPLICA_ID = -1;
@@ -177,6 +178,17 @@ function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, hi
       vars.value = null;
     }
 
+    if (vars.attributes === 0 && vars.partial) {
+      cb(
+        new MessageSizeTooLarge({
+          topic: topic,
+          offset: vars.offset,
+          partition: partition
+        })
+      );
+      return;
+    }
+
     if (!vars.partial && vars.offset !== null) {
       messageCount++;
       set.push(vars.offset);
diff --git a/test/test.consumerGroup.js b/test/test.consumerGroup.js
index 08d2e62c..460dcb16 100644
--- a/test/test.consumerGroup.js
+++ b/test/test.consumerGroup.js
@@ -3,6 +3,8 @@
 const sinon = require('sinon');
 const should = require('should');
 const ConsumerGroup = require('../lib/consumerGroup');
+const KafkaClient = require('../lib/kafkaClient');
+const HighLevelProducer = require('../lib/highLevelProducer');
 const host = process.env['KAFKA_TEST_HOST'] || '';
 const proxyquire = require('proxyquire').noCallThru();
 const EventEmitter = require('events').EventEmitter;
@@ -27,12 +29,15 @@ describe('ConsumerGroup', function () {
       };
 
       // eslint-disable-next-line no-new
-      new ConsumerGroup({
-        host: 'myhost',
-        id: 'myClientId',
-        batch: batch,
-        connectOnReady: false
-      }, 'SampleTopic');
+      new ConsumerGroup(
+        {
+          host: 'myhost',
+          id: 'myClientId',
+          batch: batch,
+          connectOnReady: false
+        },
+        'SampleTopic'
+      );
       sinon.assert.calledWithExactly(fakeClient, 'myhost', 'myClientId', undefined, batch, undefined);
     });
 
@@ -43,36 +48,45 @@ describe('ConsumerGroup', function () {
       };
 
       // eslint-disable-next-line no-new
-      new ConsumerGroup({
-        host: 'myhost',
-        id: 'myClientId',
-        zk: zkOptions,
-        connectOnReady: false
-      }, 'SampleTopic');
+      new ConsumerGroup(
+        {
+          host: 'myhost',
+          id: 'myClientId',
+          zk: zkOptions,
+          connectOnReady: false
+        },
+        'SampleTopic'
+      );
       sinon.assert.calledWithExactly(fakeClient, 'myhost', 'myClientId', zkOptions, undefined, undefined);
     });
 
     it('should setup SSL ConsumerGroup option ssl is true', function () {
       // eslint-disable-next-line no-new
-      new ConsumerGroup({
-        host: 'myhost',
-        id: 'myClientId',
-        ssl: true,
-        connectOnReady: false
-      }, 'SampleTopic');
+      new ConsumerGroup(
+        {
+          host: 'myhost',
+          id: 'myClientId',
+          ssl: true,
+          connectOnReady: false
+        },
+        'SampleTopic'
+      );
       sinon.assert.calledWithExactly(fakeClient, 'myhost', 'myClientId', undefined, undefined, {});
     });
 
     it('should pass SSL client options through ConsumerGroup option', function () {
       const ssl = { rejectUnauthorized: false };
 
       // eslint-disable-next-line no-new
-      new ConsumerGroup({
-        host: 'myhost',
-        id: 'myClientId',
-        ssl: ssl,
-        connectOnReady: false
-      }, 'SampleTopic');
+      new ConsumerGroup(
+        {
+          host: 'myhost',
+          id: 'myClientId',
+          ssl: ssl,
+          connectOnReady: false
+        },
+        'SampleTopic'
+      );
       sinon.assert.calledWithExactly(fakeClient, 'myhost', 'myClientId', undefined, undefined, ssl);
     });
 
@@ -91,10 +105,13 @@ describe('ConsumerGroup', function () {
       ['earliest', 'latest', 'none'].forEach(offset => {
         should.doesNotThrow(() => {
           // eslint-disable-next-line no-new
-          new ConsumerGroup({
-            outOfRangeOffset: offset,
-            connectOnReady: false
-          }, 'TestTopic');
+          new ConsumerGroup(
+            {
+              outOfRangeOffset: offset,
+              connectOnReady: false
+            },
+            'TestTopic'
+          );
         });
       });
     });
@@ -114,10 +131,13 @@ describe('ConsumerGroup', function () {
       ['earliest', 'latest', 'none'].forEach(offset => {
         should.doesNotThrow(() => {
           // eslint-disable-next-line no-new
-          new ConsumerGroup({
-            fromOffset: offset,
-            connectOnReady: false
-          }, 'TestTopic');
+          new ConsumerGroup(
+            {
+              fromOffset: offset,
+              connectOnReady: false
+            },
+            'TestTopic'
+          );
         });
       });
     });
@@ -139,10 +159,13 @@ describe('ConsumerGroup', function () {
         './client': fakeClient
       });
 
-      consumerGroup = new ConsumerGroup({
-        host: 'gibberish',
-        connectOnReady: false
-      }, 'TestTopic');
+      consumerGroup = new ConsumerGroup(
+        {
+          host: 'gibberish',
+          connectOnReady: false
+        },
+        'TestTopic'
+      );
     });
 
     beforeEach(function () {
@@ -230,14 +253,17 @@ describe('ConsumerGroup', function () {
         './client': fakeClient
       });
 
-      consumerGroup = new ConsumerGroup({
-        host: host,
-        connectOnReady: false,
-        sessionTimeout: 8000,
-        heartbeatInterval: 250,
-        retryMinTimeout: 250,
-        heartbeatTimeoutMs: 200
-      }, 'TestTopic');
+      consumerGroup = new ConsumerGroup(
+        {
+          host: host,
+          connectOnReady: false,
+          sessionTimeout: 8000,
+          heartbeatInterval: 250,
+          retryMinTimeout: 250,
+          heartbeatTimeoutMs: 200
+        },
+        'TestTopic'
+      );
     });
 
     afterEach(function () {
@@ -253,7 +279,7 @@ describe('ConsumerGroup', function () {
       });
 
       consumerGroup.options.outOfRangeOffset = 'none';
-      consumerGroup.emit('offsetOutOfRange', {topic: 'test-topic', partition: '0'});
+      consumerGroup.emit('offsetOutOfRange', { topic: 'test-topic', partition: '0' });
     });
 
     it('should emit error if fetchOffset fails', function (done) {
@@ -272,14 +298,16 @@ describe('ConsumerGroup', function () {
       sandbox.spy(consumerGroup, 'setOffset');
       sandbox.stub(consumerGroup.offset, 'fetch').yields(new Error('something went wrong'));
 
-      consumerGroup.topicPayloads = [{
-        topic: TOPIC_NAME,
-        partition: '0',
-        offset: 1
-      }];
+      consumerGroup.topicPayloads = [
+        {
+          topic: TOPIC_NAME,
+          partition: '0',
+          offset: 1
+        }
+      ];
 
       consumerGroup.options.outOfRangeOffset = 'latest';
-      consumerGroup.emit('offsetOutOfRange', {topic: TOPIC_NAME, partition: '0'});
+      consumerGroup.emit('offsetOutOfRange', { topic: TOPIC_NAME, partition: '0' });
     });
 
     it('should set offset to latest if outOfRangeOffset is latest', function (done) {
@@ -288,7 +316,11 @@ describe('ConsumerGroup', function () {
 
       sandbox.stub(consumerGroup, 'resume').callsFake(function () {
         sinon.assert.calledOnce(consumerGroup.pause);
-        sinon.assert.calledWithExactly(consumerGroup.offset.fetch, [{topic: TOPIC_NAME, partition: '0', time: -1}], sinon.match.func);
+        sinon.assert.calledWithExactly(
+          consumerGroup.offset.fetch,
+          [{ topic: TOPIC_NAME, partition: '0', time: -1 }],
+          sinon.match.func
+        );
         sinon.assert.calledWithExactly(consumerGroup.setOffset, TOPIC_NAME, '0', NEW_OFFSET);
         consumerGroup.topicPayloads[0].offset.should.be.eql(NEW_OFFSET);
         done();
@@ -305,14 +337,16 @@ describe('ConsumerGroup', function () {
         }
       });
 
-      consumerGroup.topicPayloads = [{
-        topic: TOPIC_NAME,
-        partition: '0',
-        offset: 1
-      }];
+      consumerGroup.topicPayloads = [
+        {
+          topic: TOPIC_NAME,
+          partition: '0',
+          offset: 1
+        }
+      ];
 
       consumerGroup.options.outOfRangeOffset = 'latest';
-      consumerGroup.emit('offsetOutOfRange', {topic: TOPIC_NAME, partition: '0'});
+      consumerGroup.emit('offsetOutOfRange', { topic: TOPIC_NAME, partition: '0' });
     });
 
     it('should set offset to earliest if outOfRangeOffset is earliest', function (done) {
@@ -320,7 +354,11 @@ describe('ConsumerGroup', function () {
       const NEW_OFFSET = 500;
 
       sandbox.stub(consumerGroup, 'resume').callsFake(function () {
-        sinon.assert.calledWithExactly(consumerGroup.offset.fetch, [{topic: TOPIC_NAME, partition: '0', time: -2}], sinon.match.func);
+        sinon.assert.calledWithExactly(
+          consumerGroup.offset.fetch,
+          [{ topic: TOPIC_NAME, partition: '0', time: -2 }],
+          sinon.match.func
+        );
         sinon.assert.calledWithExactly(consumerGroup.setOffset, TOPIC_NAME, '0', NEW_OFFSET);
         consumerGroup.topicPayloads[0].offset.should.be.eql(NEW_OFFSET);
         done();
@@ -336,14 +374,16 @@ describe('ConsumerGroup', function () {
         }
       });
 
-      consumerGroup.topicPayloads = [{
-        topic: TOPIC_NAME,
-        partition: '0',
-        offset: 1
-      }];
+      consumerGroup.topicPayloads = [
+        {
+          topic: TOPIC_NAME,
+          partition: '0',
+          offset: 1
+        }
+      ];
 
       consumerGroup.options.outOfRangeOffset = 'earliest';
-      consumerGroup.emit('offsetOutOfRange', {topic: TOPIC_NAME, partition: '0'});
+      consumerGroup.emit('offsetOutOfRange', { topic: TOPIC_NAME, partition: '0' });
     });
   });
 
@@ -352,14 +392,17 @@ describe('ConsumerGroup', function () {
     beforeEach(function () {
       sandbox = sinon.sandbox.create();
 
-      consumerGroup = new ConsumerGroup({
-        host: host,
-        connectOnReady: false,
-        sessionTimeout: 8000,
-        heartbeatInterval: 250,
-        retryMinTimeout: 250,
-        heartbeatTimeoutMs: 200
-      }, 'TestTopic');
+      consumerGroup = new ConsumerGroup(
+        {
+          host: host,
+          connectOnReady: false,
+          sessionTimeout: 8000,
+          heartbeatInterval: 250,
+          retryMinTimeout: 250,
+          heartbeatTimeoutMs: 200
+        },
+        'TestTopic'
+      );
     });
 
     afterEach(function (done) {
@@ -395,10 +438,13 @@ describe('ConsumerGroup', function () {
     let consumerGroup;
 
     beforeEach(function (done) {
-      consumerGroup = new ConsumerGroup({
-        host: host,
-        groupId: 'longFetchSimulation'
-      }, 'TestTopic');
+      consumerGroup = new ConsumerGroup(
+        {
+          host: host,
+          groupId: 'longFetchSimulation'
+        },
+        'TestTopic'
+      );
 
       consumerGroup.once('connect', done);
     });
@@ -420,10 +466,13 @@ describe('ConsumerGroup', function () {
     var consumerGroup, sandbox;
 
     beforeEach(function () {
-      consumerGroup = new ConsumerGroup({
-        host: host,
-        connectOnReady: false
-      }, 'TestTopic');
+      consumerGroup = new ConsumerGroup(
+        {
+          host: host,
+          connectOnReady: false
+        },
+        'TestTopic'
+      );
 
       sandbox = sinon.sandbox.create();
       sandbox.stub(consumerGroup, 'sendHeartbeat').returns({
@@ -488,10 +537,13 @@ describe('ConsumerGroup', function () {
   describe('#handleSyncGroup', function () {
     var consumerGroup, sandbox;
     beforeEach(function () {
-      consumerGroup = new ConsumerGroup({
-        host: host,
-        connectOnReady: false
-      }, 'TestTopic');
+      consumerGroup = new ConsumerGroup(
+        {
+          host: host,
+          connectOnReady: false
+        },
+        'TestTopic'
+      );
 
       sandbox = sinon.sandbox.create();
     });
@@ -533,10 +585,10 @@ describe('ConsumerGroup', function () {
 
           const topicPayloads = _(consumerGroup.topicPayloads);
 
-          topicPayloads.find({topic: 'TestTopic', partition: 0}).offset.should.be.eql(10);
-          topicPayloads.find({topic: 'TestTopic', partition: 2}).offset.should.be.eql(0);
-          topicPayloads.find({topic: 'TestTopic', partition: 3}).offset.should.be.eql(9);
-          topicPayloads.find({topic: 'TestTopic', partition: 4}).offset.should.be.eql(6);
+          topicPayloads.find({ topic: 'TestTopic', partition: 0 }).offset.should.be.eql(10);
+          topicPayloads.find({ topic: 'TestTopic', partition: 2 }).offset.should.be.eql(0);
+          topicPayloads.find({ topic: 'TestTopic', partition: 3 }).offset.should.be.eql(9);
+          topicPayloads.find({ topic: 'TestTopic', partition: 4 }).offset.should.be.eql(6);
           done(error);
         });
       });
@@ -593,10 +645,10 @@ describe('ConsumerGroup', function () {
 
           const topicPayloads = _(consumerGroup.topicPayloads);
 
-          topicPayloads.find({topic: 'TestTopic', partition: 0}).offset.should.be.eql(10);
-          topicPayloads.find({topic: 'TestTopic', partition: 2}).offset.should.be.eql(20);
-          topicPayloads.find({topic: 'TestTopic', partition: 3}).offset.should.be.eql(9);
-          topicPayloads.find({topic: 'TestTopic', partition: 4}).offset.should.be.eql(5000);
+          topicPayloads.find({ topic: 'TestTopic', partition: 0 }).offset.should.be.eql(10);
+          topicPayloads.find({ topic: 'TestTopic', partition: 2 }).offset.should.be.eql(20);
+          topicPayloads.find({ topic: 'TestTopic', partition: 3 }).offset.should.be.eql(9);
+          topicPayloads.find({ topic: 'TestTopic', partition: 4 }).offset.should.be.eql(5000);
           done(error);
         });
       });
@@ -659,10 +711,10 @@ describe('ConsumerGroup', function () {
 
          const topicPayloads = _(consumerGroup.topicPayloads);
 
-          topicPayloads.find({topic: 'TestTopic', partition: 0}).offset.should.be.eql(10);
-          topicPayloads.find({topic: 'TestTopic', partition: 2}).offset.should.be.eql(0);
-          topicPayloads.find({topic: 'TestTopic', partition: 3}).offset.should.be.eql(9);
-          topicPayloads.find({topic: 'TestTopic', partition: 4}).offset.should.be.eql(6);
+          topicPayloads.find({ topic: 'TestTopic', partition: 0 }).offset.should.be.eql(10);
+          topicPayloads.find({ topic: 'TestTopic', partition: 2 }).offset.should.be.eql(0);
+          topicPayloads.find({ topic: 'TestTopic', partition: 3 }).offset.should.be.eql(9);
+          topicPayloads.find({ topic: 'TestTopic', partition: 4 }).offset.should.be.eql(6);
           done(error);
         });
       });
@@ -704,22 +756,25 @@ describe('ConsumerGroup', function () {
 
           const topicPayloads = _(consumerGroup.topicPayloads);
 
-          topicPayloads.find({topic: 'TestTopic', partition: 3}).offset.should.be.eql(0);
-          topicPayloads.find({topic: 'TestTopic', partition: 2}).offset.should.be.eql(3);
-          topicPayloads.find({topic: 'TestTopic', partition: 0}).offset.should.be.eql(10);
-          topicPayloads.find({topic: 'TestTopic', partition: 4}).offset.should.be.eql(5000);
+          topicPayloads.find({ topic: 'TestTopic', partition: 3 }).offset.should.be.eql(0);
+          topicPayloads.find({ topic: 'TestTopic', partition: 2 }).offset.should.be.eql(3);
+          topicPayloads.find({ topic: 'TestTopic', partition: 0 }).offset.should.be.eql(10);
+          topicPayloads.find({ topic: 'TestTopic', partition: 4 }).offset.should.be.eql(5000);
          done(error);
        });
      });
    });
 
    it('should yield false when there are no partitions owned', function (done) {
-      consumerGroup.handleSyncGroup({
-        partitions: {}
-      }, function (error, ownsPartitions) {
-        ownsPartitions.should.be.false;
-        done(error);
-      });
+      consumerGroup.handleSyncGroup(
+        {
+          partitions: {}
+        },
+        function (error, ownsPartitions) {
+          ownsPartitions.should.be.false;
+          done(error);
+        }
+      );
     });
   });
 
@@ -727,10 +782,60 @@ describe('ConsumerGroup', function () {
     it('should throw error if migration option is used with KafkaClient', function () {
       should.throws(function () {
         // eslint-disable-next-line no-new
-        new ConsumerGroup({
-          kafkaHost: 'localhost:9092',
-          migrateHLC: true
-        }, 'TestTopic');
+        new ConsumerGroup(
+          {
+            kafkaHost: 'localhost:9092',
+            migrateHLC: true
+          },
+          'TestTopic'
+        );
+      });
+    });
+  });
+
+  function sendRandomByteMessage (bytes, topic, done) {
+    const crypto = require('crypto');
+    const buffer = crypto.randomBytes(bytes);
+
+    sendMessage(buffer, topic, done);
+
+    return buffer;
+  }
+
+  function sendMessage (message, topic, done) {
+    var client = new KafkaClient({ kafkaHost: '127.0.0.1:9092' });
+    var producer = new HighLevelProducer(client, { requireAcks: 1 });
+
+    client.on('connect', function () {
+      producer.send([{ topic: topic, messages: message, attributes: 0 }], done);
+    });
+  }
+
+  describe('fetchMaxBytes', function () {
+    let topic;
+    beforeEach(function (done) {
+      topic = uuid.v4();
+      sendRandomByteMessage(2048, topic, done);
+    });
+
+    it('should throw an error when message exceeds maximum fetch size', function (done) {
+      const consumerGroup = new ConsumerGroup(
+        {
+          kafkaHost: '127.0.0.1:9092',
+          groupId: uuid.v4(),
+          fetchMaxBytes: 1024,
+          fromOffset: 'earliest'
+        },
+        topic
+      );
+
+      consumerGroup.once('error', function (error) {
+        error.should.be.an.instanceOf(require('../lib/errors/MessageSizeTooLargeError'));
+        done();
+      });
+
+      consumerGroup.once('message', function (message) {
+        done(new Error('should not receive a message'));
       });
     });
   });
@@ -739,10 +844,13 @@ describe('ConsumerGroup', function () {
     var consumerGroup, sandbox;
 
     beforeEach(function () {
-      consumerGroup = new ConsumerGroup({
-        host: host,
-        connectOnReady: false
-      }, 'TestTopic');
+      consumerGroup = new ConsumerGroup(
+        {
+          host: host,
+          connectOnReady: false
+        },
+        'TestTopic'
+      );
 
       sandbox = sinon.sandbox.create();
       sandbox.stub(consumerGroup, 'connect');
@@ -807,48 +915,57 @@ describe('ConsumerGroup', function () {
       const client = new Client(host);
       const producer = new Producer(client);
 
-      async.series([
-        function (callback) {
-          client.createTopics([topic], true, callback);
-        },
-        function (callback) {
-          if (producer.ready) {
-            return callback();
+      async.series(
+        [
+          function (callback) {
+            client.createTopics([topic], true, callback);
+          },
+          function (callback) {
+            if (producer.ready) {
+              return callback();
+            }
+            producer.once('ready', callback);
+          },
+          function (callback) {
+            producer.send([{ topic: topic, messages: messages }], callback);
+          },
+          function (callback) {
+            producer.close(callback);
           }
-          producer.once('ready', callback);
-        },
-        function (callback) {
-          producer.send([{topic: topic, messages: messages}], callback);
-        },
-        function (callback) {
-          producer.close(callback);
-        }
-      ], done);
+        ],
+        done
+      );
     }
 
     function confirmMessages (done) {
      const left = messages.slice();
-      const consumerGroup = new ConsumerGroup({
-        fromOffset: 'earliest',
-        groupId: groupId,
-        sessionTimeout: 8000,
-        autoCommit: false
-      }, topic);
-
-      async.series([
-        function (callback) {
-          consumerGroup.on('message', function (data) {
-            _.pull(left, data.value);
-
-            if (left.length === 0) {
-              callback();
-            }
-          });
+      const consumerGroup = new ConsumerGroup(
+        {
+          fromOffset: 'earliest',
+          groupId: groupId,
+          sessionTimeout: 8000,
+          autoCommit: false
         },
-        function (callback) {
-          consumerGroup.close(callback);
-        }
-      ], done);
+        topic
+      );
+
+      async.series(
+        [
+          function (callback) {
+            consumerGroup.on('message', function (data) {
+              _.pull(left, data.value);
+
+              if (left.length === 0) {
+                callback();
+              }
+            });
+          },
+          function (callback) {
+            consumerGroup.close(callback);
+          }
+        ],
+        done
+      );
     }
 
     async.series([addMessages, confirmMessages, confirmMessages], done);