diff --git a/examples/oauthbearer-default-flow.md b/examples/oauthbearer-default-flow.md new file mode 100644 index 00000000..06cb9aab --- /dev/null +++ b/examples/oauthbearer-default-flow.md @@ -0,0 +1,80 @@ +Producer, Consumer and HighLevelProducer: +```js +/* + * node-rdkafka - Node.js wrapper for RdKafka C/C++ library + * + * Copyright (c) 2016 Blizzard Entertainment + * + * This software may be modified and distributed under the terms + * of the MIT license. See the LICENSE.txt file for details. + */ + +var Kafka = require('../'); + +var token = "your_token"; + +var producer = new Kafka.Producer({ + //'debug' : 'all', + 'metadata.broker.list': 'localhost:9093', + 'security.protocol': 'SASL_SSL', + 'sasl.mechanisms': 'OAUTHBEARER', +}).setOauthBearerToken(token); + +//start the producer +producer.connect(); + +//refresh the token +producer.setOauthBearerToken(token); +``` + +AdminClient: +```js +/* + * node-rdkafka - Node.js wrapper for RdKafka C/C++ library + * + * Copyright (c) 2016 Blizzard Entertainment + * + * This software may be modified and distributed under the terms + * of the MIT license. See the LICENSE.txt file for details. + */ +var Kafka = require('../'); + +var token = "your_token"; + +var admin = Kafka.AdminClient.create({ + 'metadata.broker.list': 'localhost:9093', + 'security.protocol': 'SASL_SSL', + 'sasl.mechanisms': 'OAUTHBEARER', +}, token); + +//refresh the token +admin.refreshOauthBearerToken(token); +``` + +ConsumerStream: +```js +/* + * node-rdkafka - Node.js wrapper for RdKafka C/C++ library + * + * Copyright (c) 2016 Blizzard Entertainment + * + * This software may be modified and distributed under the terms + * of the MIT license. See the LICENSE.txt file for details. + */ +var Kafka = require('../'); + +var token = "your_token"; + +var stream = Kafka.KafkaConsumer.createReadStream({ + 'metadata.broker.list': 'localhost:9093', + 'group.id': 'myGroup', + 'security.protocol': 'SASL_SSL', + 'sasl.mechanisms': 'OAUTHBEARER' + }, {}, { + topics: 'test1', + initOauthBearerToken: token, + }); + +//refresh the token +stream.refreshOauthBearerToken(token.token); +``` diff --git a/index.d.ts b/index.d.ts index d7ce7e61..4cd46ec1 100644 --- a/index.d.ts +++ b/index.d.ts @@ -117,6 +117,7 @@ export interface ReadStreamOptions extends ReadableOptions { autoClose?: boolean; streamAsBatch?: boolean; connectOptions?: any; + initOauthBearerToken?: string; } export interface WriteStreamOptions extends WritableOptions { @@ -137,6 +138,7 @@ export interface ProducerStream extends Writable { export interface ConsumerStream extends Readable { consumer: KafkaConsumer; connect(options: ConsumerGlobalConfig): void; + refreshOauthBearerToken(tokenStr: string): void; close(cb?: () => void): void; } @@ -180,6 +182,8 @@ export abstract class Client extends EventEmitter { connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this; + setOauthBearerToken(tokenStr: string): this; + getClient(): any; connectedTime(): number; @@ -330,6 +334,8 @@ export interface NewTopic { } export interface IAdminClient { + refreshOauthBearerToken(tokenStr: string): void; + createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void; createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void; @@ -343,5 +349,5 @@ export interface IAdminClient { } export abstract class AdminClient { - static create(conf: GlobalConfig): IAdminClient; + static create(conf: GlobalConfig, initOauthBearerToken?: string): IAdminClient; } diff --git a/lib/admin.js b/lib/admin.js index 773dc957..bbe06084 100644 --- a/lib/admin.js +++ b/lib/admin.js @@ -25,9 +25,13 @@ var shallowCopy = require('./util').shallowCopy; * active handle with the brokers. * */ -function createAdminClient(conf) { +function createAdminClient(conf, initOauthBearerToken) { var client = new AdminClient(conf); + if (initOauthBearerToken) { + client.refreshOauthBearerToken(initOauthBearerToken); + } + // Wrap the error so we throw if it failed with some context LibrdKafkaError.wrap(client.connect(), true); @@ -105,6 +109,22 @@ AdminClient.prototype.disconnect = function() { this._isConnected = false; }; +/** + * Refresh OAuthBearer token, initially provided in factory method. + * Expiry is always set to maximum value, as the callback of librdkafka + * for token refresh is not used. + * + * @param {string} tokenStr - OAuthBearer token string + * @see connection.cc + */ +AdminClient.prototype.refreshOauthBearerToken = function (tokenStr) { + if (!tokenStr || typeof tokenStr !== 'string') { + throw new Error("OAuthBearer token is undefined/empty or not a string"); + } + + this._client.setToken(tokenStr); +}; + /** * Create a topic with a given config. * diff --git a/lib/client.js b/lib/client.js index 9cbd3f9a..354f5446 100644 --- a/lib/client.js +++ b/lib/client.js @@ -229,6 +229,25 @@ Client.prototype.connect = function(metadataOptions, cb) { }; +/** + * Set initial token before any connection is established for oauthbearer authentication flow. + * Expiry is always set to maximum value, as the callback of librdkafka + * for token refresh is not used. + * Call this method again to refresh the token. + * + * @param {string} tokenStr - OAuthBearer token string + * @see connection.cc + * @return {Client} - Returns itself. + */ +Client.prototype.setOauthBearerToken = function (tokenStr) { + if (!tokenStr || typeof tokenStr !== 'string') { + throw new Error("OAuthBearer token is undefined/empty or not a string"); + } + + this._client.setToken(tokenStr); + return this; +}; + /** * Get the native Kafka client. * diff --git a/lib/kafka-consumer-stream.js b/lib/kafka-consumer-stream.js index 0abb5358..ce1fbe95 100644 --- a/lib/kafka-consumer-stream.js +++ b/lib/kafka-consumer-stream.js @@ -112,6 +112,10 @@ function KafkaConsumerStream(consumer, options) { self.push(null); }); + if (options.initOauthBearerToken) { + this.consumer.setOauthBearerToken(options.initOauthBearerToken); + } + // Call connect. Handles potentially being connected already this.connect(this.connectOptions); @@ -123,6 +127,18 @@ function KafkaConsumerStream(consumer, options) { } +/** + * Refresh OAuthBearer token, initially provided in factory method. + * Expiry is always set to maximum value, as the callback of librdkafka + * for token refresh is not used. + * + * @param {string} tokenStr - OAuthBearer token string + * @see connection.cc + */ +KafkaConsumerStream.prototype.refreshOauthBearerToken = function (tokenStr) { + this.consumer.setOauthBearerToken(tokenStr); +}; + /** * Internal stream read method. This method reads message objects. * @param {number} size - This parameter is ignored for our cases. diff --git a/package-lock.json b/package-lock.json index a67e6cab..74b5ad03 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "node-rdkafka", - "version": "v3.0.0", + "version": "v3.0.1", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "node-rdkafka", - "version": "v3.0.0", + "version": "v3.0.1", "hasInstallScript": true, "license": "MIT", "dependencies": { @@ -184,9 +184,9 @@ } }, "node_modules/@types/linkify-it": { - "version": "3.0.2", - "resolved": "https://registry.npmjs.org/@types/linkify-it/-/linkify-it-3.0.2.tgz", - "integrity": "sha512-HZQYqbiFVWufzCwexrvh694SOim8z2d+xJl5UNamcvQFejLY/2YUtzXHYi3cHdI7PMlS8ejH2slRAOJQ32aNbA==", + "version": "3.0.5", + "resolved": "https://registry.npmjs.org/@types/linkify-it/-/linkify-it-3.0.5.tgz", + "integrity": "sha512-yg6E+u0/+Zjva+buc3EIb+29XEg4wltq7cSmd4Uc2EE/1nUVmxyzpX6gUXD0V8jIrG0r7YeOGVIbYRkxeooCtw==", "dev": true }, "node_modules/@types/markdown-it": { @@ -200,9 +200,9 @@ } }, "node_modules/@types/mdurl": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/@types/mdurl/-/mdurl-1.0.2.tgz", - "integrity": "sha512-eC4U9MlIcu2q0KQmXszyn5Akca/0jrQmwDRgpAMJai7qBWq4amIQhZyNau4VYGtCeALvW1/NtjzJJ567aZxfKA==", + "version": "1.0.5", + "resolved": "https://registry.npmjs.org/@types/mdurl/-/mdurl-1.0.5.tgz", + "integrity": "sha512-6L6VymKTzYSrEf4Nev4Xa1LCHKrlTlYCBMTlQKFuddo1CvQcE52I0mwfOJayueUC7MJuXOeHTcIU683lzd0cUA==", "dev": true }, "node_modules/abbrev": { @@ -298,12 +298,15 @@ "dev": true }, "node_modules/binary-extensions": { - "version": "2.2.0", - "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.2.0.tgz", - "integrity": "sha512-jDctJ/IVQbZoJykoeHbhXpOlNBqGNcwXJKJog42E5HDPUwQTSdjCHdihjj0DlnheQ7blbT6dHOafNAiS8ooQKA==", + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.3.0.tgz", + "integrity": "sha512-Ceh+7ox5qe7LJuLHoY0feh3pHuUDHAcRUeyL2VYghZwfpkNIy/+8Ocg0a3UuSoYzavmylwuLWQOf3hl0jjMMIw==", "dev": true, "engines": { "node": ">=8" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" } }, "node_modules/bindings": { @@ -635,6 +638,12 @@ } } }, + "node_modules/debug/node_modules/ms": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", + "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", + "dev": true + }, "node_modules/decamelize": { "version": "4.0.0", "resolved": "https://registry.npmjs.org/decamelize/-/decamelize-4.0.0.tgz", @@ -756,9 +765,9 @@ "dev": true }, "node_modules/escalade": { - "version": "3.1.1", - "resolved": "https://registry.npmjs.org/escalade/-/escalade-3.1.1.tgz", - "integrity": "sha512-k0er2gUkLf8O0zKJiAhmkTnJlTvINGv7ygDNPbeIsX/TJjGJZHuh9B2UxbsaEkmlEo9MfhrSzmhIlhRlI2GXnw==", + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/escalade/-/escalade-3.1.2.tgz", + "integrity": "sha512-ErCHMCae19vR8vQGe50xIsVomy19rg6gFu3+r3jkEO46suLMWBksvVyoGgQV+jOfl84ZSOSlmv6Gxa89PmTGmA==", "dev": true, "engines": { "node": ">=6" @@ -864,6 +873,20 @@ "integrity": "sha512-OO0pH2lK6a0hZnAdau5ItzHPI6pUlvI7jMVnxUQRtw4owF2wk8lOSabtGDCTP4Ggrg2MbGnWO9X8K1t4+fGMDw==", "dev": true }, + "node_modules/fsevents": { + "version": "2.3.3", + "resolved": "https://registry.npmjs.org/fsevents/-/fsevents-2.3.3.tgz", + "integrity": "sha512-5xoDfX+fL7faATnagmWPpbFtwh/R77WmMMqqHGS65C3vvB0YHrgF+B1YmZ3441tMj5n63k0212XNoJwzlhffQw==", + "dev": true, + "hasInstallScript": true, + "optional": true, + "os": [ + "darwin" + ], + "engines": { + "node": "^8.16.0 || ^10.6.0 || >=11.0.0" + } + }, "node_modules/get-caller-file": { "version": "2.0.5", "resolved": "https://registry.npmjs.org/get-caller-file/-/get-caller-file-2.0.5.tgz", @@ -918,9 +941,9 @@ } }, "node_modules/graceful-fs": { - "version": "4.2.10", - "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.10.tgz", - "integrity": "sha512-9ByhssR2fPVsNZj478qUUbKfmL0+t5BDVyjShtyZZLiK7ZDAArFFfopyOTj0M05wE2tJPisA4iTnnXl2YoPvOA==", + "version": "4.2.11", + "resolved": "https://registry.npmjs.org/graceful-fs/-/graceful-fs-4.2.11.tgz", + "integrity": "sha512-RbJ5/jmFcNNCcDV5o9eTnBLJ/HszWV0P73bc+Ff4nS/rJj+YaS6IGyiOL0VoBYX+l1Wrl3k63h/KrH+nhJ0XvQ==", "dev": true }, "node_modules/has-flag": { @@ -1214,9 +1237,9 @@ } }, "node_modules/jshint": { - "version": "2.13.5", - "resolved": "https://registry.npmjs.org/jshint/-/jshint-2.13.5.tgz", - "integrity": "sha512-dB2n1w3OaQ35PLcBGIWXlszjbPZwsgZoxsg6G8PtNf2cFMC1l0fObkYLUuXqTTdi6tKw4sAjfUseTdmDMHQRcg==", + "version": "2.13.6", + "resolved": "https://registry.npmjs.org/jshint/-/jshint-2.13.6.tgz", + "integrity": "sha512-IVdB4G0NTTeQZrBoM8C5JFVLjV2KtZ9APgybDA1MK73xb09qFs0jCXyQLnCOp1cSZZZbvhq/6mfXHUTaDkffuQ==", "dev": true, "dependencies": { "cli": "~1.0.0", @@ -1346,9 +1369,9 @@ } }, "node_modules/markdown-it-anchor": { - "version": "8.6.5", - "resolved": "https://registry.npmjs.org/markdown-it-anchor/-/markdown-it-anchor-8.6.5.tgz", - "integrity": "sha512-PI1qEHHkTNWT+X6Ip9w+paonfIQ+QZP9sCeMYi47oqhH+EsW8CrJ8J7CzV19QVOj6il8ATGbK2nTECj22ZHGvQ==", + "version": "8.6.7", + "resolved": "https://registry.npmjs.org/markdown-it-anchor/-/markdown-it-anchor-8.6.7.tgz", + "integrity": "sha512-FlCHFwNnutLgVTflOYHPW2pPcl2AACqVzExlkGQNsi4CJgqOHN7YTgDd4LuhgN1BFO3TS0vLAruV1Td6dwWPJA==", "dev": true, "peerDependencies": { "@types/markdown-it": "*", @@ -1365,9 +1388,9 @@ } }, "node_modules/marked": { - "version": "4.1.1", - "resolved": "https://registry.npmjs.org/marked/-/marked-4.1.1.tgz", - "integrity": "sha512-0cNMnTcUJPxbA6uWmCmjWz4NJRe/0Xfk2NhXCUHjew9qJzFN20krFnsUe7QynwqOwa5m1fZ4UDg0ycKFVC0ccw==", + "version": "4.3.0", + "resolved": "https://registry.npmjs.org/marked/-/marked-4.3.0.tgz", + "integrity": "sha512-PRsaiG84bK+AMvxziE/lCFss8juXjNaWzVbN5tXAm4XjeaS9NAHhop+PjQxz2A9h8Q4M/xGmzP8vqNwy6JeK0A==", "dev": true, "bin": { "marked": "bin/marked.js" @@ -1542,9 +1565,9 @@ } }, "node_modules/mocha": { - "version": "10.2.0", - "resolved": "https://registry.npmjs.org/mocha/-/mocha-10.2.0.tgz", - "integrity": "sha512-IDY7fl/BecMwFHzoqF2sg/SHHANeBoMMXFlS9r0OXKDssYE1M5O43wUY/9BVPeIvfH2zmEbBfseqN9gBQZzXkg==", + "version": "10.4.0", + "resolved": "https://registry.npmjs.org/mocha/-/mocha-10.4.0.tgz", + "integrity": "sha512-eqhGB8JKapEYcC4ytX/xrzKforgEc3j1pGlAXVy3eRwrtAy5/nIfT1SvgGzfN0XZZxeLq0aQWkOUAmqIJiv+bA==", "dev": true, "dependencies": { "ansi-colors": "4.1.1", @@ -1554,13 +1577,12 @@ "diff": "5.0.0", "escape-string-regexp": "4.0.0", "find-up": "5.0.0", - "glob": "7.2.0", + "glob": "8.1.0", "he": "1.2.0", "js-yaml": "4.1.0", "log-symbols": "4.1.0", "minimatch": "5.0.1", "ms": "2.1.3", - "nanoid": "3.3.3", "serialize-javascript": "6.0.0", "strip-json-comments": "3.1.1", "supports-color": "8.1.1", @@ -1575,10 +1597,15 @@ }, "engines": { "node": ">= 14.0.0" - }, - "funding": { - "type": "opencollective", - "url": "https://opencollective.com/mochajs" + } + }, + "node_modules/mocha/node_modules/brace-expansion": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", + "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", + "dev": true, + "dependencies": { + "balanced-match": "^1.0.0" } }, "node_modules/mocha/node_modules/escape-string-regexp": { @@ -1594,37 +1621,24 @@ } }, "node_modules/mocha/node_modules/glob": { - "version": "7.2.0", - "resolved": "https://registry.npmjs.org/glob/-/glob-7.2.0.tgz", - "integrity": "sha512-lmLf6gtyrPq8tTjSmrO94wBeQbFR3HbLHbuyD69wuyQkImp2hWqMGB47OX65FBkPffO641IP9jWa1z4ivqG26Q==", + "version": "8.1.0", + "resolved": "https://registry.npmjs.org/glob/-/glob-8.1.0.tgz", + "integrity": "sha512-r8hpEjiQEYlF2QU0df3dS+nxxSIreXQS1qRhMJM0Q5NDdR386C7jb7Hwwod8Fgiuex+k0GFjgft18yvxm5XoCQ==", "dev": true, "dependencies": { "fs.realpath": "^1.0.0", "inflight": "^1.0.4", "inherits": "2", - "minimatch": "^3.0.4", - "once": "^1.3.0", - "path-is-absolute": "^1.0.0" + "minimatch": "^5.0.1", + "once": "^1.3.0" }, "engines": { - "node": "*" + "node": ">=12" }, "funding": { "url": "https://github.com/sponsors/isaacs" } }, - "node_modules/mocha/node_modules/glob/node_modules/minimatch": { - "version": "3.1.2", - "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-3.1.2.tgz", - "integrity": "sha512-J7p63hRiAjw1NDEww1W7i37+ByIrOWO5XQQAzZ3VOcL0PNybwpfmV/N05zFAzwQ9USyEcX6t3UO+K5aqBQOIHw==", - "dev": true, - "dependencies": { - "brace-expansion": "^1.1.7" - }, - "engines": { - "node": "*" - } - }, "node_modules/mocha/node_modules/minimatch": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/minimatch/-/minimatch-5.0.1.tgz", @@ -1637,44 +1651,17 @@ "node": ">=10" } }, - "node_modules/mocha/node_modules/minimatch/node_modules/brace-expansion": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/brace-expansion/-/brace-expansion-2.0.1.tgz", - "integrity": "sha512-XnAIvQ8eM+kC6aULx6wuQiwVsnzsi9d3WxzV3FpWTGA19F621kwdbsAcFKXgKUHZWsy+mY6iL1sHTxWEFCytDA==", - "dev": true, - "dependencies": { - "balanced-match": "^1.0.0" - } - }, - "node_modules/mocha/node_modules/ms": { + "node_modules/ms": { "version": "2.1.3", "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", "dev": true }, - "node_modules/ms": { - "version": "2.1.2", - "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.2.tgz", - "integrity": "sha512-sGkPx+VjMtmA6MX27oA4FBFELFCZZ4S4XqeGOXCv68tT+jb3vk/RyaKWP0PTKyWtmLSM0b+adUTEvbs1PEaH2w==", - "dev": true - }, "node_modules/nan": { "version": "2.19.0", "resolved": "https://registry.npmjs.org/nan/-/nan-2.19.0.tgz", "integrity": "sha512-nO1xXxfh/RWNxfd/XPfbIfFk5vgLsAxUR9y5O0cHMJu/AW9U95JLXqthYHjEp+8gQ5p96K9jUp8nbVOxCdRbtw==" }, - "node_modules/nanoid": { - "version": "3.3.3", - "resolved": "https://registry.npmjs.org/nanoid/-/nanoid-3.3.3.tgz", - "integrity": "sha512-p1sjXuopFs0xg+fPASzQ28agW1oHD7xDsd9Xkf3T15H3c/cifrFHVwrh74PdoklAPi+i7MdRsE47vm2r6JoB+w==", - "dev": true, - "bin": { - "nanoid": "bin/nanoid.cjs" - }, - "engines": { - "node": "^10 || ^12 || ^13.7 || ^14 || >=15.0.1" - } - }, "node_modules/negotiator": { "version": "0.6.3", "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", @@ -1952,12 +1939,12 @@ } }, "node_modules/requizzle": { - "version": "0.2.3", - "resolved": "https://registry.npmjs.org/requizzle/-/requizzle-0.2.3.tgz", - "integrity": "sha512-YanoyJjykPxGHii0fZP0uUPEXpvqfBDxWV7s6GKAiiOsiqhX6vHNyW3Qzdmqp/iq/ExbhaGbVrjB4ruEVSM4GQ==", + "version": "0.2.4", + "resolved": "https://registry.npmjs.org/requizzle/-/requizzle-0.2.4.tgz", + "integrity": "sha512-JRrFk1D4OQ4SqovXOgdav+K8EAhSB/LJZqCz8tbX0KObcdeM15Ss59ozWMBWmmINMagCwmqn4ZNryUGpBsl6Jw==", "dev": true, "dependencies": { - "lodash": "^4.17.14" + "lodash": "^4.17.21" } }, "node_modules/retry": { diff --git a/package.json b/package.json index e81866ea..080a69ce 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "node-rdkafka", - "version": "v3.0.0", + "version": "v3.0.1", "description": "Node.js bindings for librdkafka", "librdkafka": "2.3.0", "main": "lib/index.js", @@ -45,4 +45,4 @@ "engines": { "node": ">=16" } -} \ No newline at end of file +} diff --git a/src/admin.cc b/src/admin.cc index 1453ad35..b5c7c8f0 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -49,6 +49,24 @@ Baton AdminClient::Connect() { return Baton(RdKafka::ERR__STATE, errstr); } + if (m_init_oauthToken) { + scoped_shared_write_lock lock(m_connection_lock); + if (m_init_oauthToken) { + std::list emptyList; + std::string token = m_init_oauthToken->token; + int64_t expiry = m_init_oauthToken->expiry; + // needed for initial connection only + m_init_oauthToken.reset(); + + RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry, + "", emptyList, errstr); + + if (err != RdKafka::ERR_NO_ERROR) { + return Baton(err, errstr); + } + } + } + if (rkqu == NULL) { rkqu = rd_kafka_queue_new(m_client->c_ptr()); } @@ -88,6 +106,7 @@ void AdminClient::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); + Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken); constructor.Reset( (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked()); diff --git a/src/connection.cc b/src/connection.cc index 9de3b3c1..cfb8ba23 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -7,6 +7,7 @@ * of the MIT license. See the LICENSE.txt file for details. */ +#include #include #include @@ -226,6 +227,45 @@ void Connection::ConfigureCallback(const std::string &string_key, const v8::Loca } // NAN METHODS +NAN_METHOD(Connection::NodeSetToken) +{ + if (info.Length() < 1 || !info[0]->IsString()) { + Nan::ThrowError("Token argument must be a string"); + return; + } + + Nan::Utf8String tk(info[0]); + std::string token = *tk; + // we always set expiry to maximum value in ms, as we don't use refresh callback, + // rdkafka continues sending a token even if it expired. Client code must + // handle token refreshing by calling 'setToken' again when needed. + int64_t expiry = (std::numeric_limits::max)() / 100000; + Connection* obj = ObjectWrap::Unwrap(info.This()); + RdKafka::Handle* handle = obj->m_client; + + if (!handle) { + scoped_shared_write_lock lock(obj->m_connection_lock); + obj->m_init_oauthToken = std::make_unique( + OauthBearerToken{token, expiry}); + info.GetReturnValue().Set(Nan::Null()); + return; + } + + { + scoped_shared_write_lock lock(obj->m_connection_lock); + std::string errstr; + std::list emptyList; + RdKafka::ErrorCode err = handle->oauthbearer_set_token(token, expiry, + "", emptyList, errstr); + + if (err != RdKafka::ERR_NO_ERROR) { + Nan::ThrowError(errstr.c_str()); + return; + } + } + + info.GetReturnValue().Set(Nan::Null()); +} NAN_METHOD(Connection::NodeGetMetadata) { Nan::HandleScope scope; diff --git a/src/connection.h b/src/connection.h index 8c4ac73f..8ee3f052 100644 --- a/src/connection.h +++ b/src/connection.h @@ -45,7 +45,13 @@ namespace NodeKafka { */ class Connection : public Nan::ObjectWrap { - public: + struct OauthBearerToken + { + std::string token; + int64_t expiry; + }; + +public: bool IsConnected(); bool IsClosing(); @@ -82,10 +88,13 @@ class Connection : public Nan::ObjectWrap { Conf* m_tconfig; std::string m_errstr; + std::unique_ptr m_init_oauthToken; + uv_rwlock_t m_connection_lock; RdKafka::Handle* m_client; + static NAN_METHOD(NodeSetToken); static NAN_METHOD(NodeConfigureCallbacks); static NAN_METHOD(NodeGetMetadata); static NAN_METHOD(NodeQueryWatermarkOffsets); diff --git a/src/kafka-consumer.cc b/src/kafka-consumer.cc index 019b0cb6..0f5e32ed 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -56,6 +56,24 @@ Baton KafkaConsumer::Connect() { return Baton(RdKafka::ERR__STATE, errstr); } + if (m_init_oauthToken) { + scoped_shared_write_lock lock(m_connection_lock); + if (m_init_oauthToken) { + std::list emptyList; + std::string token = m_init_oauthToken->token; + int64_t expiry = m_init_oauthToken->expiry; + // needed for initial connection only + m_init_oauthToken.reset(); + + RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry, + "", emptyList, errstr); + + if (err != RdKafka::ERR_NO_ERROR) { + return Baton(err, errstr); + } + } + } + if (m_partitions.size() > 0) { m_client->resume(m_partitions); } @@ -499,6 +517,7 @@ void KafkaConsumer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); + Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken); Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata); Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT Nan::SetPrototypeMethod(tpl, "offsetsForTimes", NodeOffsetsForTimes); diff --git a/src/producer.cc b/src/producer.cc index 04e75688..c8e3e632 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -66,6 +66,7 @@ void Producer::Init(v8::Local exports) { Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); + Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken); Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata); Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT Nan::SetPrototypeMethod(tpl, "poll", NodePoll); @@ -183,6 +184,25 @@ Baton Producer::Connect() { return Baton(RdKafka::ERR__STATE, errstr); } + if (m_init_oauthToken) { + scoped_shared_write_lock lock(m_connection_lock); + if (m_init_oauthToken) { + std::list emptyList; + std::string token = m_init_oauthToken->token; + int64_t expiry = m_init_oauthToken->expiry; + // needed for initial connection only + m_init_oauthToken.reset(); + + RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry, + "", emptyList, errstr); + + if (err != RdKafka::ERR_NO_ERROR) { + return Baton(err, errstr); + } + } + } + + return Baton(RdKafka::ERR_NO_ERROR); } diff --git a/test/binding.spec.js b/test/binding.spec.js index b82c7bc4..117fde1b 100644 --- a/test/binding.spec.js +++ b/test/binding.spec.js @@ -56,7 +56,7 @@ module.exports = { }); }, 'has necessary methods from superclass': function() { - var methods = ['connect', 'disconnect', 'configureCallbacks', 'getMetadata']; + var methods = ['connect', 'disconnect', 'setToken', 'configureCallbacks', 'getMetadata']; methods.forEach(function(m) { t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method'); }); diff --git a/test/consumer.spec.js b/test/consumer.spec.js index 40b52ee4..4fc3bd53 100644 --- a/test/consumer.spec.js +++ b/test/consumer.spec.js @@ -71,7 +71,7 @@ module.exports = { }); }, 'has necessary methods from superclass': function() { - var methods = ['connect', 'disconnect', 'configureCallbacks', 'getMetadata']; + var methods = ['connect', 'disconnect', 'setToken', 'configureCallbacks', 'getMetadata']; methods.forEach(function(m) { t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method'); });