Skip to content

Commit

Permalink
Consumer should throw an error if message exceeds fetchMaxBytes fixes #…
Browse files Browse the repository at this point in the history
  • Loading branch information
hyperlink committed Aug 15, 2017
1 parent 0eb61c5 commit a060f57
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 151 deletions.
15 changes: 15 additions & 0 deletions lib/errors/MessageSizeTooLargeError.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
var util = require('util');

var MessageSizeTooLarge = function (vars) {
Error.captureStackTrace(this, this);
if (typeof vars === 'object') {
this.message = `Found a message larger than the maximum fetch size of this consumer on topic ${vars.topic} partition ${vars.partition} at fetch offset ${vars.offset}. Increase the fetch size, or decrease the maximum message size the broker will allow.`;
} else {
this.message = vars;
}
};

util.inherits(MessageSizeTooLarge, Error);
MessageSizeTooLarge.prototype.name = 'MessageSizeTooLarge';

module.exports = MessageSizeTooLarge;
12 changes: 12 additions & 0 deletions lib/protocol/protocol.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var ERROR_CODE = protocol.ERROR_CODE;
var GROUP_ERROR = protocol.GROUP_ERROR;
var PartitionMetadata = protocol.PartitionMetadata;
const API_KEY_TO_NAME = _.invert(REQUEST_TYPE);
const MessageSizeTooLarge = require('../errors/MessageSizeTooLargeError');

var API_VERSION = 0;
var REPLICA_ID = -1;
Expand Down Expand Up @@ -177,6 +178,17 @@ function decodeMessageSet (topic, partition, messageSet, cb, maxTickMessages, hi
vars.value = null;
}

if (vars.attributes === 0 && vars.partial) {
cb(
new MessageSizeTooLarge({
topic: topic,
offset: vars.offset,
partition: partition
})
);
return;
}

if (!vars.partial && vars.offset !== null) {
messageCount++;
set.push(vars.offset);
Expand Down
Loading

0 comments on commit a060f57

Please sign in to comment.