diff --git a/src/message-stream.ts b/src/message-stream.ts index f72f58409..96cfa4d4a 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -19,7 +19,7 @@ import {Gaxios} from 'gaxios'; import {ClientStub} from 'google-gax'; import {ClientDuplexStream, Metadata, ServiceError, status, StatusObject} from 'grpc'; import * as isStreamEnded from 'is-stream-ended'; -import {Duplex, PassThrough} from 'stream'; +import {PassThrough} from 'stream'; import {PullResponse, Subscriber} from './subscriber'; @@ -109,29 +109,6 @@ export class ChannelError extends Error implements ServiceError { } } -/** - * Ponyfill for destroying streams. - * - * @private - * - * @param {stream} stream The stream to destroy. - * @param {error?} err Error to emit. - */ -export function destroy(stream: Duplex, err?: Error): void { - const nativeDestroy = Duplex.prototype.destroy; - - if (typeof nativeDestroy === 'function') { - return nativeDestroy.call(stream, err); - } - - process.nextTick(() => { - if (err) { - stream.emit('error', err); - } - stream.emit('close'); - }); -} - /** * @typedef {object} MessageStreamOptions * @property {number} [highWaterMark=0] Configures the Buffer level for all @@ -197,7 +174,16 @@ export class MessageStream extends PassThrough { stream.cancel(); } - return destroy(this, err); + if (typeof super.destroy === 'function') { + return super.destroy(err); + } + + process.nextTick(() => { + if (err) { + this.emit('error', err); + } + this.emit('close'); + }); } /** * Adds a StreamingPull stream to the combined stream. @@ -329,7 +315,6 @@ export class MessageStream extends PassThrough { */ private _onStatus(stream: PullStream, status: StatusObject): void { if (this.destroyed) { - destroy(stream); return; } diff --git a/test/message-stream.ts b/test/message-stream.ts index 66d6c632f..acdfafd0d 100644 --- a/test/message-stream.ts +++ b/test/message-stream.ts @@ -41,21 +41,18 @@ interface StreamOptions { highWaterMark?: number; } -class FakeDuplex extends Duplex { - destroy(err?: Error): void { - if (super.destroy) { - return super.destroy(err); - } - destroy(this, err); - } -} - class FakePassThrough extends PassThrough { options: StreamOptions; constructor(options: StreamOptions) { super(options); this.options = options; } + destroy(err?: Error): void { + if (super.destroy) { + return super.destroy(err); + } + destroy(this, err); + } } class FakeGrpcStream extends Duplex { @@ -136,10 +133,9 @@ describe('MessageStream', () => { let messageStream; before(() => { - MessageStream = - proxyquire('../src/message-stream.js', { - 'stream': {Duplex: FakeDuplex, PassThrough: FakePassThrough} - }).MessageStream; + MessageStream = proxyquire('../src/message-stream.js', { + 'stream': {PassThrough: FakePassThrough} + }).MessageStream; }); beforeEach(() => { @@ -257,7 +253,7 @@ describe('MessageStream', () => { describe('destroy', () => { it('should noop if already destroyed', done => { - const stub = sandbox.stub(FakeDuplex.prototype, 'destroy') + const stub = sandbox.stub(FakePassThrough.prototype, 'destroy') .callsFake(function(this: Duplex) { if (this === messageStream) { done(); @@ -309,13 +305,13 @@ describe('MessageStream', () => { let destroy; before(() => { - destroy = FakeDuplex.prototype.destroy; + destroy = FakePassThrough.prototype.destroy; // tslint:disable-next-line no-any - FakeDuplex.prototype.destroy = (false as any); + FakePassThrough.prototype.destroy = (false as any); }); after(() => { - FakeDuplex.prototype.destroy = destroy; + FakePassThrough.prototype.destroy = destroy; }); it('should emit close', done => { @@ -446,20 +442,6 @@ describe('MessageStream', () => { }); describe('on status', () => { - it('should destroy the stream if the message stream is destroyed', - done => { - const [stream] = client.streams; - const stub = sandbox.stub(FakeDuplex.prototype, 'destroy') - .callsFake(function(this: Duplex) { - if (this === stream) { - done(); - } - }); - - messageStream.destroy(); - stream.emit('status', {}); - }); - it('should wait for end to fire before creating a new stream', done => { const [stream] = client.streams; const expectedCount = stream.listenerCount('end') + 1;