From ca749dec77fceb7fd69acf45460a486b4fe23efa Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Tue, 26 Feb 2019 12:36:04 -0500 Subject: [PATCH 1/3] fix(messageStream): remove call to destroy grpc stream --- src/message-stream.ts | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/message-stream.ts b/src/message-stream.ts index f72f58409..6cae4854f 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -197,7 +197,16 @@ export class MessageStream extends PassThrough { stream.cancel(); } - return destroy(this, err); + if (typeof super.destroy === 'function') { + return super.destroy(err); + } + + process.nextTick(() => { + if (err) { + this.emit('error', err); + } + this.emit('close'); + }); } /** * Adds a StreamingPull stream to the combined stream. @@ -329,7 +338,6 @@ export class MessageStream extends PassThrough { */ private _onStatus(stream: PullStream, status: StatusObject): void { if (this.destroyed) { - destroy(stream); return; } From aaa9f6e290f318b465a516ab9f51d083f69380c1 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Wed, 27 Feb 2019 15:35:45 -0500 Subject: [PATCH 2/3] fix unit tests --- src/message-stream.ts | 25 +------------------------ test/message-stream.ts | 29 +++++++---------------------- 2 files changed, 8 insertions(+), 46 deletions(-) diff --git a/src/message-stream.ts b/src/message-stream.ts index 6cae4854f..96cfa4d4a 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -19,7 +19,7 @@ import {Gaxios} from 'gaxios'; import {ClientStub} from 'google-gax'; import {ClientDuplexStream, Metadata, ServiceError, status, StatusObject} from 'grpc'; import * as isStreamEnded from 'is-stream-ended'; -import {Duplex, PassThrough} from 'stream'; +import {PassThrough} from 'stream'; import {PullResponse, Subscriber} from './subscriber'; @@ -109,29 +109,6 @@ export class ChannelError extends Error implements ServiceError { } } -/** - * Ponyfill for destroying streams. - * - * @private - * - * @param {stream} stream The stream to destroy. - * @param {error?} err Error to emit. - */ -export function destroy(stream: Duplex, err?: Error): void { - const nativeDestroy = Duplex.prototype.destroy; - - if (typeof nativeDestroy === 'function') { - return nativeDestroy.call(stream, err); - } - - process.nextTick(() => { - if (err) { - stream.emit('error', err); - } - stream.emit('close'); - }); -} - /** * @typedef {object} MessageStreamOptions * @property {number} [highWaterMark=0] Configures the Buffer level for all diff --git a/test/message-stream.ts b/test/message-stream.ts index 66d6c632f..1c7937031 100644 --- a/test/message-stream.ts +++ b/test/message-stream.ts @@ -136,10 +136,9 @@ describe('MessageStream', () => { let messageStream; before(() => { - MessageStream = - proxyquire('../src/message-stream.js', { - 'stream': {Duplex: FakeDuplex, PassThrough: FakePassThrough} - }).MessageStream; + MessageStream = proxyquire('../src/message-stream.js', { + 'stream': {PassThrough: FakePassThrough} + }).MessageStream; }); beforeEach(() => { @@ -257,7 +256,7 @@ describe('MessageStream', () => { describe('destroy', () => { it('should noop if already destroyed', done => { - const stub = sandbox.stub(FakeDuplex.prototype, 'destroy') + const stub = sandbox.stub(FakePassThrough.prototype, 'destroy') .callsFake(function(this: Duplex) { if (this === messageStream) { done(); @@ -309,13 +308,13 @@ describe('MessageStream', () => { let destroy; before(() => { - destroy = FakeDuplex.prototype.destroy; + destroy = FakePassThrough.prototype.destroy; // tslint:disable-next-line no-any - FakeDuplex.prototype.destroy = (false as any); + FakePassThrough.prototype.destroy = (false as any); }); after(() => { - FakeDuplex.prototype.destroy = destroy; + FakePassThrough.prototype.destroy = destroy; }); it('should emit close', done => { @@ -446,20 +445,6 @@ describe('MessageStream', () => { }); describe('on status', () => { - it('should destroy the stream if the message stream is destroyed', - done => { - const [stream] = client.streams; - const stub = sandbox.stub(FakeDuplex.prototype, 'destroy') - .callsFake(function(this: Duplex) { - if (this === stream) { - done(); - } - }); - - messageStream.destroy(); - stream.emit('status', {}); - }); - it('should wait for end to fire before creating a new stream', done => { const [stream] = client.streams; const expectedCount = stream.listenerCount('end') + 1; From 0cf44a0cd10f4981e1ab93a9bb2bf2a9999f3a40 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Wed, 27 Feb 2019 15:43:39 -0500 Subject: [PATCH 3/3] fix test for node6 --- test/message-stream.ts | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/test/message-stream.ts b/test/message-stream.ts index 1c7937031..acdfafd0d 100644 --- a/test/message-stream.ts +++ b/test/message-stream.ts @@ -41,21 +41,18 @@ interface StreamOptions { highWaterMark?: number; } -class FakeDuplex extends Duplex { - destroy(err?: Error): void { - if (super.destroy) { - return super.destroy(err); - } - destroy(this, err); - } -} - class FakePassThrough extends PassThrough { options: StreamOptions; constructor(options: StreamOptions) { super(options); this.options = options; } + destroy(err?: Error): void { + if (super.destroy) { + return super.destroy(err); + } + destroy(this, err); + } } class FakeGrpcStream extends Duplex {