Skip to content

Commit

Permalink
fix(messageStream): remove call to destroy grpc stream (#499)
Browse files Browse the repository at this point in the history
  • Loading branch information
callmehiphop authored and JustinBeckwith committed Feb 27, 2019
1 parent 355d8d7 commit 0ef82e0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 57 deletions.
37 changes: 11 additions & 26 deletions src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {Gaxios} from 'gaxios';
import {ClientStub} from 'google-gax';
import {ClientDuplexStream, Metadata, ServiceError, status, StatusObject} from 'grpc';
import * as isStreamEnded from 'is-stream-ended';
import {Duplex, PassThrough} from 'stream';
import {PassThrough} from 'stream';

import {PullResponse, Subscriber} from './subscriber';

Expand Down Expand Up @@ -109,29 +109,6 @@ export class ChannelError extends Error implements ServiceError {
}
}

/**
* Ponyfill for destroying streams.
*
* @private
*
* @param {stream} stream The stream to destroy.
* @param {error?} err Error to emit.
*/
export function destroy(stream: Duplex, err?: Error): void {
const nativeDestroy = Duplex.prototype.destroy;

if (typeof nativeDestroy === 'function') {
return nativeDestroy.call(stream, err);
}

process.nextTick(() => {
if (err) {
stream.emit('error', err);
}
stream.emit('close');
});
}

/**
* @typedef {object} MessageStreamOptions
* @property {number} [highWaterMark=0] Configures the Buffer level for all
Expand Down Expand Up @@ -197,7 +174,16 @@ export class MessageStream extends PassThrough {
stream.cancel();
}

return destroy(this, err);
if (typeof super.destroy === 'function') {
return super.destroy(err);
}

process.nextTick(() => {
if (err) {
this.emit('error', err);
}
this.emit('close');
});
}
/**
* Adds a StreamingPull stream to the combined stream.
Expand Down Expand Up @@ -329,7 +315,6 @@ export class MessageStream extends PassThrough {
*/
private _onStatus(stream: PullStream, status: StatusObject): void {
if (this.destroyed) {
destroy(stream);
return;
}

Expand Down
44 changes: 13 additions & 31 deletions test/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,21 +41,18 @@ interface StreamOptions {
highWaterMark?: number;
}

class FakeDuplex extends Duplex {
destroy(err?: Error): void {
if (super.destroy) {
return super.destroy(err);
}
destroy(this, err);
}
}

class FakePassThrough extends PassThrough {
options: StreamOptions;
constructor(options: StreamOptions) {
super(options);
this.options = options;
}
destroy(err?: Error): void {
if (super.destroy) {
return super.destroy(err);
}
destroy(this, err);
}
}

class FakeGrpcStream extends Duplex {
Expand Down Expand Up @@ -136,10 +133,9 @@ describe('MessageStream', () => {
let messageStream;

before(() => {
MessageStream =
proxyquire('../src/message-stream.js', {
'stream': {Duplex: FakeDuplex, PassThrough: FakePassThrough}
}).MessageStream;
MessageStream = proxyquire('../src/message-stream.js', {
'stream': {PassThrough: FakePassThrough}
}).MessageStream;
});

beforeEach(() => {
Expand Down Expand Up @@ -257,7 +253,7 @@ describe('MessageStream', () => {

describe('destroy', () => {
it('should noop if already destroyed', done => {
const stub = sandbox.stub(FakeDuplex.prototype, 'destroy')
const stub = sandbox.stub(FakePassThrough.prototype, 'destroy')
.callsFake(function(this: Duplex) {
if (this === messageStream) {
done();
Expand Down Expand Up @@ -309,13 +305,13 @@ describe('MessageStream', () => {
let destroy;

before(() => {
destroy = FakeDuplex.prototype.destroy;
destroy = FakePassThrough.prototype.destroy;
// tslint:disable-next-line no-any
FakeDuplex.prototype.destroy = (false as any);
FakePassThrough.prototype.destroy = (false as any);
});

after(() => {
FakeDuplex.prototype.destroy = destroy;
FakePassThrough.prototype.destroy = destroy;
});

it('should emit close', done => {
Expand Down Expand Up @@ -446,20 +442,6 @@ describe('MessageStream', () => {
});

describe('on status', () => {
it('should destroy the stream if the message stream is destroyed',
done => {
const [stream] = client.streams;
const stub = sandbox.stub(FakeDuplex.prototype, 'destroy')
.callsFake(function(this: Duplex) {
if (this === stream) {
done();
}
});

messageStream.destroy();
stream.emit('status', {});
});

it('should wait for end to fire before creating a new stream', done => {
const [stream] = client.streams;
const expectedCount = stream.listenerCount('end') + 1;
Expand Down

0 comments on commit 0ef82e0

Please sign in to comment.