Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add setToken API for OAuthBearer authentication flow #1075

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 80 additions & 0 deletions examples/oauthbearer-default-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
Producer, Consumer and HighLevelProducer:
andrewstanovsky marked this conversation as resolved.
Show resolved Hide resolved
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

var Kafka = require('../');

var token = "your_token";

var producer = new Kafka.Producer({
//'debug' : 'all',
'metadata.broker.list': 'localhost:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
}).setOauthBearerToken(token);

//start the producer
producer.connect();

//refresh the token
producer.setOauthBearerToken(token);
```

AdminClient:
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var Kafka = require('../');

var token = "your_token";

var admin = Kafka.AdminClient.create({
'metadata.broker.list': 'localhost:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
}, token);

//refresh the token
admin.refreshOauthBearerToken(token);
```

ConsumerStream:
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var Kafka = require('../');

var token = "your_token";

var stream = Kafka.KafkaConsumer.createReadStream({
'metadata.broker.list': 'localhost:9093',
'group.id': 'myGroup',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER'
}, {}, {
topics: 'test1',
initOauthBearerToken: token,
});

//refresh the token
stream.refreshOauthBearerToken(token.token);
```
8 changes: 7 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export interface ReadStreamOptions extends ReadableOptions {
autoClose?: boolean;
streamAsBatch?: boolean;
connectOptions?: any;
initOauthBearerToken?: string;
}

export interface WriteStreamOptions extends WritableOptions {
Expand All @@ -137,6 +138,7 @@ export interface ProducerStream extends Writable {
export interface ConsumerStream extends Readable {
consumer: KafkaConsumer;
connect(options: ConsumerGlobalConfig): void;
refreshOauthBearerToken(tokenStr: string): void;
close(cb?: () => void): void;
}

Expand Down Expand Up @@ -180,6 +182,8 @@ export abstract class Client<Events extends string> extends EventEmitter {

connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;

setOauthBearerToken(tokenStr: string): this;

getClient(): any;

connectedTime(): number;
Expand Down Expand Up @@ -330,6 +334,8 @@ export interface NewTopic {
}

export interface IAdminClient {
refreshOauthBearerToken(tokenStr: string): void;

createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

Expand All @@ -343,5 +349,5 @@ export interface IAdminClient {
}

export abstract class AdminClient {
static create(conf: GlobalConfig): IAdminClient;
static create(conf: GlobalConfig, initOauthBearerToken?: string): IAdminClient;
}
22 changes: 21 additions & 1 deletion lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ var shallowCopy = require('./util').shallowCopy;
* active handle with the brokers.
*
*/
function createAdminClient(conf) {
function createAdminClient(conf, initOauthBearerToken) {
var client = new AdminClient(conf);

if (initOauthBearerToken) {
client.refreshOauthBearerToken(initOauthBearerToken);
}

// Wrap the error so we throw if it failed with some context
LibrdKafkaError.wrap(client.connect(), true);

Expand Down Expand Up @@ -105,6 +109,22 @@ AdminClient.prototype.disconnect = function() {
this._isConnected = false;
};

/**
* Refresh OAuthBearer token, initially provided in factory method.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
*/
AdminClient.prototype.refreshOauthBearerToken = function (tokenStr) {
if (!tokenStr || typeof tokenStr !== 'string') {
throw new Error("OAuthBearer token is undefined/empty or not a string");
}

this._client.setToken(tokenStr);
};

/**
* Create a topic with a given config.
*
Expand Down
19 changes: 19 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,25 @@ Client.prototype.connect = function(metadataOptions, cb) {

};

/**
* Set initial token before any connection is established for oauthbearer authentication flow.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
* Call this method again to refresh the token.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
* @return {Client} - Returns itself.
*/
Client.prototype.setOauthBearerToken = function (tokenStr) {
if (!tokenStr || typeof tokenStr !== 'string') {
throw new Error("OAuthBearer token is undefined/empty or not a string");
}

this._client.setToken(tokenStr);
return this;
};

/**
* Get the native Kafka client.
*
Expand Down
16 changes: 16 additions & 0 deletions lib/kafka-consumer-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ function KafkaConsumerStream(consumer, options) {
self.push(null);
});

if (options.initOauthBearerToken) {
this.consumer.setOauthBearerToken(options.initOauthBearerToken);
}

// Call connect. Handles potentially being connected already
this.connect(this.connectOptions);

Expand All @@ -123,6 +127,18 @@ function KafkaConsumerStream(consumer, options) {

}

/**
* Refresh OAuthBearer token, initially provided in factory method.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
*/
KafkaConsumerStream.prototype.refreshOauthBearerToken = function (tokenStr) {
this.consumer.setOauthBearerToken(tokenStr);
};

/**
* Internal stream read method. This method reads message objects.
* @param {number} size - This parameter is ignored for our cases.
Expand Down
Loading
Loading