Skip to content

Commit

Permalink
fix: don't do multiple drains per publish() in message queues unless …
Browse files Browse the repository at this point in the history
…requested (#1691)
  • Loading branch information
feywind authored Mar 6, 2023
1 parent 38d8455 commit d9b3a63
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
/build/
system-test/secrets.js
system-test/*key.json
samples/**/build
*.lock
.DS_Store
package-lock.json
Expand Down
4 changes: 2 additions & 2 deletions src/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ export class Publisher {
const flushResolver = () => {
resolve();

// flush() maybe called more than once, so remove these
// flush() may be called more than once, so remove these
// event listeners after we've completed flush().
q.removeListener('drain', flushResolver);
};
Expand All @@ -129,7 +129,7 @@ export class Publisher {
);

const allPublishes = Promise.all(
toDrain.map(q => promisify(q.publish).bind(q)())
toDrain.map(q => promisify(q.publishDrain).bind(q)())
);

allPublishes
Expand Down
53 changes: 49 additions & 4 deletions src/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,23 @@ export abstract class MessageQueue extends EventEmitter {
* @param {PublishCallback} callback The publish callback.
*/
abstract add(message: PubsubMessage, callback: PublishCallback): void;

/**
* Method to initiate publishing.
* Method to initiate publishing. Full drain behaviour depends on whether the
* queues are ordered or not.
*
* @abstract
*/
abstract publish(): void;

/**
* Method to finalize publishing. Does as many publishes as are needed
* to finish emptying the queues, and fires a drain event afterward.
*
* @abstract
*/
abstract publishDrain(): void;

/**
* Accepts a batch of messages and publishes them to the API.
*
Expand Down Expand Up @@ -156,12 +167,33 @@ export class Queue extends MessageQueue {
this.pending = setTimeout(() => this.publish(), maxMilliseconds!);
}
}

/**
* Cancels any pending publishes and calls _publish immediately.
*
* _Does_ attempt to further drain after one batch is sent.
*
* @emits Queue#drain when all messages are sent.
*/
publishDrain(callback?: PublishDone): void {
this._publishInternal(true, callback);
}

/**
* Cancels any pending publishes and calls _publish immediately.
*
* Does _not_ attempt to further drain after one batch is sent.
*/
publish(callback?: PublishDone): void {
this._publishInternal(false, callback);
}

/**
* Cancels any pending publishes and calls _publish immediately.
*
* @emits Queue#drain when all messages are sent.
*/
_publishInternal(fullyDrain: boolean, callback?: PublishDone): void {
const definedCallback = callback || (() => {});
const {messages, callbacks} = this.batch;

Expand All @@ -176,8 +208,12 @@ export class Queue extends MessageQueue {
if (err) {
definedCallback(err);
} else if (this.batch.messages.length) {
// Make another go-around, we're trying to drain the queues fully.
this.publish(callback);
// We only do the indefinite go-arounds when we're trying to do a
// final drain for flush(). In all other cases, we want to leave
// subsequent batches alone so that they can time out as needed.
if (fullyDrain) {
this._publishInternal(true, callback);
}
} else {
this.emit('drain');
definedCallback(null);
Expand Down Expand Up @@ -279,7 +315,7 @@ export class OrderedQueue extends MessageQueue {
*
* @returns {MessageBatch}
*/
createBatch() {
createBatch(): MessageBatch {
return new MessageBatch(this.batchOptions);
}
/**
Expand Down Expand Up @@ -333,6 +369,15 @@ export class OrderedQueue extends MessageQueue {
});
}

/**
* For ordered queues, this does exactly the same thing as `publish()`.
*
* @fires OrderedQueue#drain
*/
publishDrain(callback?: PublishDone): void {
this.publish(callback);
}

/**
* Tells the queue it is ok to continue publishing messages.
*/
Expand Down
6 changes: 6 additions & 0 deletions test/publisher/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ class FakeQueue extends EventEmitter {
publish(callback: (err: Error | null) => void) {
this._publish([], [], callback);
}
publishDrain(callback: (err: Error | null) => void) {
this.publish(callback);
}
_publish(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
messages: p.PubsubMessage[],
Expand All @@ -85,6 +88,9 @@ class FakeOrderedQueue extends FakeQueue {
publish(callback: (err: Error | null) => void) {
this._publish([], [], callback);
}
publishDrain(callback: (err: Error | null) => void) {
this.publish(callback);
}
_publish(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
messages: p.PubsubMessage[],
Expand Down
71 changes: 71 additions & 0 deletions test/publisher/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ class FakeMessageBatch {
created: number;
messages: p.PubsubMessage[];
options: b.BatchPublishOptions;
bytes: number;
constructor(options = {} as b.BatchPublishOptions) {
this.callbacks = [];
this.created = Date.now();
this.messages = [];
this.options = options;
this.bytes = 0;
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
add(message: p.PubsubMessage, callback: p.PublishCallback): void {}
Expand Down Expand Up @@ -332,6 +334,75 @@ describe('Message Queues', () => {
assert.strictEqual(messages, batch.messages);
assert.strictEqual(callbacks, batch.callbacks);
});

describe('publish chaining', () => {
let fakeMessages: p.PubsubMessage[];
let spies: p.PublishCallback[];
beforeEach(() => {
fakeMessages = [{}, {}] as p.PubsubMessage[];
spies = [sandbox.spy(), sandbox.spy()] as p.PublishCallback[];
});

it('should begin another publish(drain) if there are pending batches', () => {
const stub = sandbox.stub(queue, '_publish');
let once = false;
stub.callsFake((m, c, done) => {
if (!once) {
// Drop in a second batch before calling the callback.
const secondBatch = new FakeMessageBatch();
secondBatch.messages = fakeMessages;
secondBatch.callbacks = spies;
queue.batch = secondBatch;
}
once = true;

done!(null);
});

queue.batch = new FakeMessageBatch();
queue.batch.messages = fakeMessages;
queue.batch.callbacks = spies;
queue.publishDrain();

assert.strictEqual(stub.callCount, 2);
});

it('should not begin another publish(non-drain) if there are pending batches', () => {
const stub = sandbox.stub(queue, '_publish');
let once = false;
stub.callsFake((m, c, done) => {
if (!once) {
// Drop in a second batch before calling the callback.
const secondBatch = new FakeMessageBatch();
secondBatch.messages = fakeMessages;
secondBatch.callbacks = spies;
queue.batch = secondBatch;
}
once = true;

done!(null);
});

queue.batch = new FakeMessageBatch();
queue.batch.messages = fakeMessages;
queue.batch.callbacks = spies;
queue.publish();

assert.strictEqual(stub.callCount, 1);
});

it('should emit "drain" if there is nothing left to publish', () => {
const spy = sandbox.spy();
sandbox
.stub(queue, '_publish')
.callsFake((m, c, done) => done!(null));

queue.on('drain', spy);
queue.publish();

assert.strictEqual(spy.callCount, 1);
});
});
});
});

Expand Down

0 comments on commit d9b3a63

Please sign in to comment.