Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: improve subscriber error handling #440

Merged
merged 2 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 34 additions & 3 deletions src/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

import {CallOptions} from 'google-gax';
import {Metadata, ServiceError, status} from 'grpc';
import * as defer from 'p-defer';

import {Message, Subscriber} from './subscriber';
Expand All @@ -37,6 +38,28 @@ export interface BatchOptions {
maxMilliseconds?: number;
}

/**
* Error class used to signal a batch failure.
*
* @class
*
* @param {string} message The error message.
* @param {ServiceError} err The grpc service error.
*/
export class BatchError extends Error implements ServiceError {
ackIds: string[];
code?: status;
metadata?: Metadata;
constructor(err: ServiceError, ackIds: string[], rpc: string) {
super(`Failed to "${rpc}" for ${ackIds.length} message(s). Reason: ${
err.message}`);

this.ackIds = ackIds;
this.code = err.code;
this.metadata = err.metadata;
}
}

/**
* Class for buffering ack/modAck requests.
*
Expand Down Expand Up @@ -162,7 +185,11 @@ export class AckQueue extends MessageQueue {
const ackIds = batch.map(([ackId]) => ackId);
const reqOpts = {subscription: this._subscriber.name, ackIds};

await client.acknowledge(reqOpts, this._options.callOptions!);
try {
await client.acknowledge(reqOpts, this._options.callOptions!);
} catch (e) {
throw new BatchError(e, ackIds, 'acknowledge');
}
}
}

Expand Down Expand Up @@ -194,12 +221,16 @@ export class ModAckQueue extends MessageQueue {
return table;
}, {});

const modAckRequests = Object.keys(modAckTable).map(deadline => {
const modAckRequests = Object.keys(modAckTable).map(async (deadline) => {
const ackIds = modAckTable[deadline];
const ackDeadlineSeconds = Number(deadline);
const reqOpts = {subscription, ackIds, ackDeadlineSeconds};

return client.modifyAckDeadline(reqOpts, this._options.callOptions!);
try {
await client.modifyAckDeadline(reqOpts, this._options.callOptions!);
} catch (e) {
throw new BatchError(e, ackIds, 'modifyAckDeadline');
}
});

await Promise.all(modAckRequests);
Expand Down
56 changes: 44 additions & 12 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import {promisify} from '@google-cloud/promisify';
import {ClientStub} from 'google-gax';
import {ClientDuplexStream, Metadata, StatusObject} from 'grpc';
import {ClientDuplexStream, Metadata, ServiceError, status, StatusObject} from 'grpc';
import * as isStreamEnded from 'is-stream-ended';
import {Duplex, PassThrough} from 'stream';

Expand All @@ -27,10 +27,20 @@ import {PullResponse, Subscriber} from './subscriber';
*/
const KEEP_ALIVE_INTERVAL = 30000;

/*!
* Deadline Exceeded status code
*/
const DEADLINE: status = 4;

/*!
* Unknown status code
*/
const UNKNOWN: status = 2;

/*!
* codes to retry streams
*/
const RETRY_CODES: number[] = [
const RETRY_CODES: status[] = [
0, // ok
1, // canceled
2, // unknown
Expand All @@ -45,7 +55,7 @@ const RETRY_CODES: number[] = [
/*!
* default stream options
*/
const DEFAULT_OPTIONS = {
const DEFAULT_OPTIONS: MessageStreamOptions = {
highWaterMark: 0,
maxStreams: 5,
timeout: 300000,
Expand Down Expand Up @@ -73,16 +83,31 @@ type PullStream = ClientDuplexStream<StreamingPullRequest, PullResponse>&
*
* @param {object} status The gRPC status object.
*/
export class StatusError extends Error {
code: number;
metadata: Metadata;
export class StatusError extends Error implements ServiceError {
code?: status;
metadata?: Metadata;
constructor(status: StatusObject) {
super(status.details);
this.code = status.code;
this.metadata = status.metadata;
}
}

/**
* Error thrown when we fail to open a channel for the message stream.
*
* @class
*
* @param {Error} err The original error.
*/
export class ChannelError extends Error implements ServiceError {
code: status;
constructor(err: Error) {
super(`Failed to connect to channel. Reason: ${err.message}`);
this.code = err.message.includes('deadline') ? DEADLINE : UNKNOWN;
}
}

/**
* Ponyfill for destroying streams.
*
Expand Down Expand Up @@ -272,16 +297,18 @@ export class MessageStream extends PassThrough {
}
}
/**
* Sometimes a gRPC status will be emitted as both a status event and an
* error event. In order to cut back on emitted errors, we'll ignore any
* error events that come in AFTER the status has been received.
* gRPC will usually emit a status as a ServiceError via `error` event before
* it emits the status itself. In order to cut back on emitted errors, we'll
* wait a tick on error and ignore it if the status has been received.
*
* @private
*
* @param {stream} stream The stream that errored.
* @param {Error} err The error.
*/
private _onError(stream: PullStream, err: Error): void {
private async _onError(stream: PullStream, err: Error): Promise<void> {
await promisify(setImmediate)();

const code = (err as StatusError).code;
const receivedStatus = this._streams.get(stream) !== false;

Expand Down Expand Up @@ -349,8 +376,13 @@ export class MessageStream extends PassThrough {
* @param {object} client The gRPC client to wait for.
* @returns {Promise}
*/
private _waitForClientReady(client: ClientStub): Promise<void> {
private async _waitForClientReady(client: ClientStub): Promise<void> {
const deadline = Date.now() + this._options.timeout!;
return promisify(client.waitForReady).call(client, deadline);

try {
await promisify(client.waitForReady).call(client, deadline);
} catch (e) {
throw new ChannelError(e);
}
}
}
63 changes: 63 additions & 0 deletions test/message-queues.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@

import * as assert from 'assert';
import {EventEmitter} from 'events';
import {Metadata, ServiceError} from 'grpc';
import * as proxyquire from 'proxyquire';
import * as sinon from 'sinon';
import * as uuid from 'uuid';

import {BatchError} from '../src/message-queues';

class FakeClient {
async acknowledge(reqOpts, callOptions): Promise<void> {}
async modifyAckDeadline(reqOpts, callOptions): Promise<void> {}
Expand Down Expand Up @@ -296,6 +299,36 @@ describe('MessageQueues', () => {
const [, callOptions] = stub.lastCall.args;
assert.strictEqual(callOptions, fakeCallOptions);
});

it('should throw a BatchError if unable to ack', done => {
const messages = [
new FakeMessage(),
new FakeMessage(),
new FakeMessage(),
];

const ackIds = messages.map(message => message.ackId);

const fakeError: ServiceError = new Error('Err.');
fakeError.code = 2;
fakeError.metadata = new Metadata();

const expectedMessage =
`Failed to "acknowledge" for 3 message(s). Reason: Err.`;

sandbox.stub(subscriber.client, 'acknowledge').rejects(fakeError);

subscriber.on('error', (err: BatchError) => {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
assert.strictEqual(err.metadata, fakeError.metadata);
done();
});

messages.forEach(message => ackQueue.add(message));
ackQueue.flush();
});
});

describe('ModAckQueue', () => {
Expand Down Expand Up @@ -376,5 +409,35 @@ describe('MessageQueues', () => {
const [, callOptions] = stub.lastCall.args;
assert.strictEqual(callOptions, fakeCallOptions);
});

it('should throw a BatchError if unable to modAck', done => {
const messages = [
new FakeMessage(),
new FakeMessage(),
new FakeMessage(),
];

const ackIds = messages.map(message => message.ackId);

const fakeError: ServiceError = new Error('Err.');
fakeError.code = 2;
fakeError.metadata = new Metadata();

const expectedMessage =
`Failed to "modifyAckDeadline" for 3 message(s). Reason: Err.`;

sandbox.stub(subscriber.client, 'modifyAckDeadline').rejects(fakeError);

subscriber.on('error', (err: BatchError) => {
assert.strictEqual(err.message, expectedMessage);
assert.deepStrictEqual(err.ackIds, ackIds);
assert.strictEqual(err.code, fakeError.code);
assert.strictEqual(err.metadata, fakeError.metadata);
done();
});

messages.forEach(message => modAckQueue.add(message));
modAckQueue.flush();
});
});
});
33 changes: 31 additions & 2 deletions test/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -377,13 +377,15 @@ describe('MessageStream', () => {
});
});

it('should destroy the stream if unable to verify channel', done => {
it('should destroy the stream if unable to connect to channel', done => {
const stub = sandbox.stub(client, 'waitForReady');
const ms = new MessageStream(subscriber);
const fakeError = new Error('err');
const expectedMessage = `Failed to connect to channel. Reason: err`;

ms.on('error', err => {
assert.strictEqual(err, fakeError);
assert.strictEqual(err.code, 2);
assert.strictEqual(err.message, expectedMessage);
assert.strictEqual(ms.destroyed, true);
done();
});
Expand All @@ -394,6 +396,22 @@ describe('MessageStream', () => {
});
});

it('should give a deadline error if waitForReady times out', done => {
const stub = sandbox.stub(client, 'waitForReady');
const ms = new MessageStream(subscriber);
const fakeError = new Error('Failed to connect before the deadline');

ms.on('error', err => {
assert.strictEqual(err.code, 4);
done();
});

setImmediate(() => {
const [, callback] = stub.lastCall.args;
callback(fakeError);
});
});

it('should emit non-status errors', done => {
const fakeError = new Error('err');

Expand All @@ -405,6 +423,17 @@ describe('MessageStream', () => {
client.streams[0].emit('error', fakeError);
});

it('should ignore status errors', done => {
const [stream] = client.streams;
const status = {code: 0};

messageStream.on('error', done);
stream.emit('error', status);
stream.emit('status', status);

setImmediate(done);
});

it('should ignore errors that come in after the status', done => {
const [stream] = client.streams;

Expand Down