Skip to content

Commit

Permalink
refactor(ts): added ts style fix for src/publisher.ts (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
vijay-qlogic authored and JustinBeckwith committed Nov 22, 2018
1 parent 35e7e9b commit 9f67dc1
Showing 1 changed file with 32 additions and 36 deletions.
68 changes: 32 additions & 36 deletions src/publisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@
* limitations under the License.
*/

import * as arrify from 'arrify';
import {promisifyAll} from '@google-cloud/promisify';
import * as arrify from 'arrify';

const each = require('async-each');
import * as extend from 'extend';
import * as is from 'is';
import { Topic } from './topic';
import {Topic} from './topic';

/**
* A Publisher object allows you to publish messages to a specific topic.
Expand Down Expand Up @@ -48,6 +49,7 @@ import { Topic } from './topic';
* const publisher = topic.publisher();
*/
export class Publisher {
// tslint:disable-next-line variable-name
Promise?: PromiseConstructor;
topic: Topic;
inventory_;
Expand All @@ -58,16 +60,14 @@ export class Publisher {
this.Promise = topic.Promise;
}
options = extend(
true,
{
batching: {
maxBytes: Math.pow(1024, 2) * 5,
maxMessages: 1000,
maxMilliseconds: 100,
true, {
batching: {
maxBytes: Math.pow(1024, 2) * 5,
maxMessages: 1000,
maxMilliseconds: 100,
},
},
},
options
);
options);
/**
* The topic of this publisher.
*
Expand Down Expand Up @@ -157,20 +157,19 @@ export class Publisher {
attributes = {};
}
// Ensure the `attributes` object only has string values
for (const key in attributes) {
for (const key of Object.keys(attributes)) {
const value = attributes[key];
if (!is.string(value)) {
throw new TypeError(`All attributes must be in the form of a string.
\nInvalid value of type "${typeof value}" provided for "${key}".`);
}
}

const opts = this.settings.batching;
// if this message puts us over the maxBytes option, then let's ship
// what we have and add it to the next batch
if (
this.inventory_.bytes > 0 &&
this.inventory_.bytes + data.length > opts.maxBytes
) {
if (this.inventory_.bytes > 0 &&
this.inventory_.bytes + data.length > opts.maxBytes) {
this.publish_();
}
// add it to the queue!
Expand All @@ -184,10 +183,8 @@ export class Publisher {
}
// otherwise let's set a timeout to send the next batch
if (!this.timeoutHandle_) {
this.timeoutHandle_ = setTimeout(
this.publish_.bind(this),
opts.maxMilliseconds
);
this.timeoutHandle_ =
setTimeout(this.publish_.bind(this), opts.maxMilliseconds);
}
}
/**
Expand All @@ -205,24 +202,23 @@ export class Publisher {
this.timeoutHandle_ = null;
const reqOpts = {
topic: this.topic.name,
messages: messages,
messages,
};
this.topic.request(
{
client: 'PublisherClient',
method: 'publish',
reqOpts: reqOpts,
gaxOpts: this.settings.gaxOpts,
},
(err, resp) => {
const messageIds = arrify(resp && resp.messageIds);
each(callbacks, (callback, next) => {
const messageId = messageIds[callbacks.indexOf(callback)];
callback(err, messageId);
next();
{
client: 'PublisherClient',
method: 'publish',
reqOpts,
gaxOpts: this.settings.gaxOpts,
},
(err, resp) => {
const messageIds = arrify(resp && resp.messageIds);
each(callbacks, (callback, next) => {
const messageId = messageIds[callbacks.indexOf(callback)];
callback(err, messageId);
next();
});
});
}
);
}
/**
* Queues message to be sent to the server.
Expand All @@ -235,7 +231,7 @@ export class Publisher {
*/
queue_(data, attrs, callback) {
this.inventory_.queued.push({
data: data,
data,
attributes: attrs,
});
this.inventory_.bytes += data.length;
Expand Down

0 comments on commit 9f67dc1

Please sign in to comment.