Upgrade librdkafka to 1.5.2
iradul committed Dec 5, 2020
1 parent c681490 commit a5b1ef1
Showing 7 changed files with 67 additions and 22 deletions.
10 changes: 5 additions & 5 deletions README.md
@@ -16,7 +16,7 @@ I am looking for *your* help to make this project even better! If you're interes

The `node-rdkafka` library is a high-performance NodeJS client for [Apache Kafka](http://kafka.apache.org/) that wraps the native [librdkafka](https://github.com/edenhill/librdkafka) library. All the complexity of balancing writes across partitions and managing (possibly ever-changing) brokers should be encapsulated in the library.

-__This library currently uses `librdkafka` version `1.4.2`.__
+__This library currently uses `librdkafka` version `1.5.2`.__

## Reference Docs

@@ -59,7 +59,7 @@ Using Alpine Linux? Check out the [docs](https://github.com/Blizzard/node-rdkafk

### Windows

-Windows build **is not** compiled from `librdkafka` source but is rather linked against the appropriate version of [NuGet librdkafka.redist](https://www.nuget.org/packages/librdkafka.redist/) static binary that gets downloaded from `https://globalcdn.nuget.org/packages/librdkafka.redist.1.4.2.nupkg` during installation. This download link can be changed using the environment variable `NODE_RDKAFKA_NUGET_BASE_URL`, which defaults to `https://globalcdn.nuget.org/packages/` when it's not set.
+Windows build **is not** compiled from `librdkafka` source but is rather linked against the appropriate version of [NuGet librdkafka.redist](https://www.nuget.org/packages/librdkafka.redist/) static binary that gets downloaded from `https://globalcdn.nuget.org/packages/librdkafka.redist.1.5.2.nupkg` during installation. This download link can be changed using the environment variable `NODE_RDKAFKA_NUGET_BASE_URL`, which defaults to `https://globalcdn.nuget.org/packages/` when it's not set.

Requirements:
* [node-gyp for Windows](https://github.com/nodejs/node-gyp#on-windows) (the easiest way to get it: `npm install --global --production windows-build-tools`; if your node version is 6.x or below, please use `npm install --global --production windows-build-tools@3.1.0`)
@@ -96,7 +96,7 @@ var Kafka = require('node-rdkafka');

## Configuration

-You can pass many configuration options to `librdkafka`. A full list can be found in `librdkafka`'s [Configuration.md](https://github.com/edenhill/librdkafka/blob/v1.4.2/CONFIGURATION.md)
+You can pass many configuration options to `librdkafka`. A full list can be found in `librdkafka`'s [Configuration.md](https://github.com/edenhill/librdkafka/blob/v1.5.2/CONFIGURATION.md)

Configuration keys that have the suffix `_cb` are designated as callbacks. Some
of these keys are informational and you can choose to opt-in (for example, `dr_cb`). Others are callbacks designed to
@@ -131,7 +131,7 @@ You can also get the version of `librdkafka`
const Kafka = require('node-rdkafka');
console.log(Kafka.librdkafkaVersion);

-// #=> 1.4.2
+// #=> 1.5.2
```
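
For instance, a startup guard against an older bundled build (a minimal sketch; it assumes the usual `major.minor.patch` version string shown above):

```js
const Kafka = require('node-rdkafka');

// Fail fast if the bundled librdkafka predates 1.5.
const [major, minor] = Kafka.librdkafkaVersion.split('.').map(Number);
if (major < 1 || (major === 1 && minor < 5)) {
  throw new Error(`librdkafka >= 1.5 required, got ${Kafka.librdkafkaVersion}`);
}
```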

## Sending Messages
@@ -144,7 +144,7 @@ var producer = new Kafka.Producer({
});
```

-A `Producer` requires only `metadata.broker.list` (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the [Configuration.md](https://github.com/edenhill/librdkafka/blob/v1.4.2/CONFIGURATION.md) file described previously.
+A `Producer` requires only `metadata.broker.list` (the Kafka brokers) to be created. The values in this list are separated by commas. For other configuration options, see the [Configuration.md](https://github.com/edenhill/librdkafka/blob/v1.5.2/CONFIGURATION.md) file described previously.

The following example illustrates a list with several `librdkafka` options set.
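
A minimal sketch along those lines (the option values here are illustrative assumptions; the option names come from librdkafka's CONFIGURATION.md):

```js
const Kafka = require('node-rdkafka');

const producer = new Kafka.Producer({
  'metadata.broker.list': 'kafka1:9092,kafka2:9092', // comma-separated broker list
  'client.id': 'my-service',        // illustrative value
  'compression.codec': 'gzip',      // compress batches on the wire
  'queue.buffering.max.ms': 50,     // trade a little latency for larger batches
  'batch.num.messages': 10000,      // cap messages per MessageSet
  'dr_cb': true                     // opt in to delivery reports
});
```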

4 changes: 2 additions & 2 deletions ci/librdkafka-defs-generator.js
@@ -172,7 +172,7 @@ function updateErrorDefinitions(file) {
// validate body
const emptyCheck = body
.replace(/(( \/\*)|( ?\*)).*/g, '')
-.replace(/ ERR_\w+: -?\d+,\n/g, '')
+.replace(/ ERR_\w+: -?\d+,?\n/g, '')
.trim()
if (emptyCheck !== '') {
throw new Error(`Fail to parse ${file}. It contains these extra details:\n${emptyCheck}`);
@@ -184,7 +184,7 @@ function updateErrorDefinitions(file) {
.replace(/(\/\/.*\n)?LibrdKafkaError.codes = {[^}]+/g, `${getHeader(file)}\nLibrdKafkaError.codes = {\n${body}`)

fs.writeFileSync(error_js_file, error_js);
-fs.writeFileSync(path.resolve(__dirname, '../errors.d.ts'), `${getHeader(file)}\nexport const CODES: { ERRORS: {${body.replace(/[ \.]*(\*\/\n \w+: )(-?\d+),/g, ' (**$2**) $1number,')}}}`)
+fs.writeFileSync(path.resolve(__dirname, '../errors.d.ts'), `${getHeader(file)}\nexport const CODES: { ERRORS: {${body.replace(/[ \.]*(\*\/\n \w+: )(-?\d+),?/g, ' (**$2**) $1number,')}}}`)
}

(async function updateTypeDefs() {
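
The `,?` added to both regular expressions accounts for the generated error list now ending without a trailing comma (see `ERR_UNSTABLE_OFFSET_COMMIT: 88` in `lib/error.js` below). A minimal sketch of the validation step under that assumption:

```js
// With the old pattern / ERR_\w+: -?\d+,\n/ the comma-less final entry
// would survive the replace and trip the empty-body check above.
const body = ' ERR_INVALID_RECORD: 87,\n ERR_UNSTABLE_OFFSET_COMMIT: 88\n';
const leftover = body.replace(/ ERR_\w+: -?\d+,?\n/g, '').trim();
console.log(leftover === ''); // true — the body parses cleanly
```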
43 changes: 32 additions & 11 deletions config.d.ts
@@ -1,4 +1,4 @@
-// ====== Generated from librdkafka 1.4.2 file CONFIGURATION.md ======
+// ====== Generated from librdkafka 1.5.2 file CONFIGURATION.md ======
// Code that generated this is a derivative work of the code from Nam Nguyen
// https://gist.github.com/ntgn81/066c2c8ec5b4238f85d1e9168a04e3fb

@@ -104,6 +104,13 @@ export interface GlobalConfig {
*/
"topic.metadata.refresh.sparse"?: boolean;

+/**
+ * Apache Kafka topic creation is asynchronous and it takes some time for a new topic to propagate throughout the cluster to all brokers. If a client requests topic metadata after manual topic creation but before the topic has been fully propagated to the broker the client is requesting metadata from, the topic will seem to be non-existent and the client will mark the topic as such, failing queued produced messages with `ERR__UNKNOWN_TOPIC`. This setting delays marking a topic as non-existent until the configured propagation max time has passed. The maximum propagation time is calculated from the time the topic is first referenced in the client, e.g., on produce().
+ *
+ * @default 30000
+ */
+"topic.metadata.propagation.max.ms"?: number;

/**
* Topic blacklist, a comma-separated list of regular expressions for matching topic names that should be ignored in broker metadata information as if the topics did not exist.
*/
@@ -579,28 +586,28 @@ export interface ProducerGlobalConfig extends GlobalConfig {
/**
* Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
*
- * @default 0.5
+ * @default 5
*/
"queue.buffering.max.ms"?: any;

/**
* Alias for `queue.buffering.max.ms`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches (MessageSets) to transmit to brokers. A higher value allows larger and more effective (less overhead, improved compression) batches of messages to accumulate at the expense of increased message delivery latency.
*
- * @default 0.5
+ * @default 5
*/
"linger.ms"?: any;

/**
* How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true.
*
- * @default 2
+ * @default 2147483647
*/
"message.send.max.retries"?: number;

/**
* Alias for `message.send.max.retries`: How many times to retry sending a failing Message. **Note:** retrying may cause reordering unless `enable.idempotence` is set to true.
*
- * @default 2
+ * @default 2147483647
*/
"retries"?: number;

@@ -633,12 +640,19 @@ export interface ProducerGlobalConfig extends GlobalConfig {
"compression.type"?: 'none' | 'gzip' | 'snappy' | 'lz4' | 'zstd';

/**
- * Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by message.max.bytes.
+ * Maximum number of messages batched in one MessageSet. The total MessageSet size is also limited by batch.size and message.max.bytes.
*
* @default 10000
*/
"batch.num.messages"?: number;

+/**
+ * Maximum size (in bytes) of all messages batched in one MessageSet, including protocol framing overhead. This limit is applied after the first message has been added to the batch, regardless of the first message's size; this ensures that messages exceeding batch.size are still produced. The total MessageSet size is also limited by batch.num.messages and message.max.bytes.
+ *
+ * @default 1000000
+ */
+"batch.size"?: number;

/**
* Only provide delivery reports for failed messages.
*
@@ -739,16 +753,16 @@ export interface ConsumerGlobalConfig extends GlobalConfig {
"queued.min.messages"?: number;

/**
- * Maximum number of kilobytes per topic+partition in the local consumer queue. This value may be overshot by fetch.message.max.bytes. This property has higher priority than queued.min.messages.
+ * Maximum number of kilobytes of queued pre-fetched messages in the local consumer queue. If using the high-level consumer this setting applies to the single consumer queue, regardless of the number of partitions. When using the legacy simple consumer or when separate partition queues are used this setting applies per partition. This value may be overshot by fetch.message.max.bytes. This property has higher priority than queued.min.messages.
*
- * @default 1048576
+ * @default 65536
*/
"queued.max.messages.kbytes"?: number;

/**
- * Maximum time the broker may wait to fill the response with fetch.min.bytes.
+ * Maximum time the broker may wait to fill the Fetch response with fetch.min.bytes of messages.
*
- * @default 100
+ * @default 500
*/
"fetch.wait.max.ms"?: number;

@@ -829,6 +843,13 @@ export interface ConsumerGlobalConfig extends GlobalConfig {
* @default false
*/
"check.crcs"?: boolean;

+/**
+ * Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics. The broker must also be configured with `auto.create.topics.enable=true` for this configuration to take effect. Note: The default value (false) is different from the Java consumer (true). Requires broker version >= 0.11.0.0; for older broker versions, only the broker configuration applies.
+ *
+ * @default false
+ */
+"allow.auto.create.topics"?: boolean;
}

export interface TopicConfig {
@@ -856,7 +877,7 @@ export interface ProducerTopicConfig extends TopicConfig {
/**
* The ack timeout of the producer request in milliseconds. This value is only enforced by the broker and relies on `request.required.acks` being != 0.
*
- * @default 5000
+ * @default 30000
*/
"request.timeout.ms"?: number;

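
Taken together, the changed producer defaults (`linger.ms` now 5 ms, near-unlimited `retries`) and the new `batch.size`, `topic.metadata.propagation.max.ms`, and `allow.auto.create.topics` keys can be exercised as below (a hedged sketch — broker addresses and values are illustrative assumptions):

```js
const Kafka = require('node-rdkafka');

// Producer: rely on the new 1.5.x defaults, or pin them explicitly.
const producer = new Kafka.Producer({
  'metadata.broker.list': 'localhost:9092',
  'linger.ms': 5,                              // new default (was 0.5)
  'batch.size': 1000000,                       // new: byte cap per MessageSet
  'batch.num.messages': 10000,                 // message-count cap still applies
  'topic.metadata.propagation.max.ms': 30000   // tolerate slow topic propagation
});

// Consumer: note the lower queued.max.messages.kbytes default (65536) and that
// broker-side auto-creation also needs auto.create.topics.enable=true there.
const consumer = new Kafka.KafkaConsumer({
  'group.id': 'my-group',                      // illustrative value
  'metadata.broker.list': 'localhost:9092',
  'allow.auto.create.topics': false,
  'fetch.wait.max.ms': 500                     // new default (was 100)
}, {});
```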
2 changes: 1 addition & 1 deletion deps/librdkafka
14 changes: 13 additions & 1 deletion errors.d.ts
@@ -1,4 +1,4 @@
-// ====== Generated from librdkafka 1.4.2 file src-cpp/rdkafkacpp.h ======
+// ====== Generated from librdkafka 1.5.2 file src-cpp/rdkafkacpp.h ======
export const CODES: { ERRORS: {
/* Internal errors to rdkafka: */
/** Begin internal error codes (**-200**) */
@@ -303,4 +303,16 @@ export const CODES: { ERRORS: {
/** Static consumer fenced by other consumer with same
* group.instance.id (**82**) */
ERR_FENCED_INSTANCE_ID: number,
+/** Eligible partition leaders are not available (**83**) */
+ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE: number,
+/** Leader election not needed for topic partition (**84**) */
+ERR_ELECTION_NOT_NEEDED: number,
+/** No partition reassignment is in progress (**85**) */
+ERR_NO_REASSIGNMENT_IN_PROGRESS: number,
+/** Deleting offsets of a topic while the consumer group is subscribed to it (**86**) */
+ERR_GROUP_SUBSCRIBED_TO_TOPIC: number,
+/** Broker failed to validate record (**87**) */
+ERR_INVALID_RECORD: number,
+/** There are unstable offsets that need to be cleared (**88**) */
+ERR_UNSTABLE_OFFSET_COMMIT: number,
}}
14 changes: 13 additions & 1 deletion lib/error.js
@@ -27,7 +27,7 @@ LibrdKafkaError.wrap = errorWrap;
* @enum {number}
* @constant
*/
-// ====== Generated from librdkafka 1.4.2 file src-cpp/rdkafkacpp.h ======
+// ====== Generated from librdkafka 1.5.2 file src-cpp/rdkafkacpp.h ======
LibrdKafkaError.codes = {

/* Internal errors to rdkafka: */
@@ -333,6 +333,18 @@ LibrdKafkaError.codes = {
/** Static consumer fenced by other consumer with same
* group.instance.id. */
ERR_FENCED_INSTANCE_ID: 82,
+/** Eligible partition leaders are not available */
+ERR_ELIGIBLE_LEADERS_NOT_AVAILABLE: 83,
+/** Leader election not needed for topic partition */
+ERR_ELECTION_NOT_NEEDED: 84,
+/** No partition reassignment is in progress */
+ERR_NO_REASSIGNMENT_IN_PROGRESS: 85,
+/** Deleting offsets of a topic while the consumer group is subscribed to it */
+ERR_GROUP_SUBSCRIBED_TO_TOPIC: 86,
+/** Broker failed to validate record */
+ERR_INVALID_RECORD: 87,
+/** There are unstable offsets that need to be cleared */
+ERR_UNSTABLE_OFFSET_COMMIT: 88
};

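
The new codes surface through the module's exported `CODES.ERRORS` map like the existing ones, so callers can branch on them in error handlers (a minimal sketch; the consumer settings are illustrative assumptions):

```js
const Kafka = require('node-rdkafka');
const { ERRORS } = Kafka.CODES;

const consumer = new Kafka.KafkaConsumer({
  'group.id': 'my-group',                   // illustrative value
  'metadata.broker.list': 'localhost:9092'
}, {});

consumer.on('event.error', (err) => {
  // New in 1.5.x: offsets can be unstable while a transaction is in flight.
  if (err.code === ERRORS.ERR_UNSTABLE_OFFSET_COMMIT) {
    console.warn('Unstable offsets; retry the commit later:', err.message);
    return;
  }
  console.error('Kafka error:', err);
});
```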
2 changes: 1 addition & 1 deletion package.json
@@ -2,7 +2,7 @@
"name": "node-rdkafka",
"version": "v2.9.1",
"description": "Node.js bindings for librdkafka",
"librdkafka": "1.4.2",
"librdkafka": "1.5.2",
"main": "lib/index.js",
"scripts": {
"configure": "node-gyp configure",
