From 3013edb9fcc28269629db7780df45241b3dea9ca Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Mon, 25 Sep 2017 10:08:36 -0400 Subject: [PATCH 1/7] pubsub: refactor subscription connection --- packages/pubsub/src/connection-pool.js | 47 ++++++++++++++++---------- packages/pubsub/src/subscription.js | 15 ++++---- 2 files changed, 38 insertions(+), 24 deletions(-) diff --git a/packages/pubsub/src/connection-pool.js b/packages/pubsub/src/connection-pool.js index 33fc4cc3e9c..470023dfaf2 100644 --- a/packages/pubsub/src/connection-pool.js +++ b/packages/pubsub/src/connection-pool.js @@ -175,8 +175,20 @@ ConnectionPool.prototype.createConnection = function() { var id = uuid.v4(); var connection = client.streamingPull(self.metadata_); + var elapsedTimeWithoutConnection = 0; + var deadline; var errorImmediateHandle; + if (self.noConnectionsTime) { + elapsedTimeWithoutConnection = Date.now() - self.noConnectionsTime; + } + + deadline = 300000 - elapsedTimeWithoutConnection; + + if (self.isPaused) { + connection.pause(); + } + connection.on('error', function(err) { // since this is a bidi stream it's possible that we recieve errors from // reads or writes. We also want to try and cut down on the number of @@ -187,17 +199,6 @@ ConnectionPool.prototype.createConnection = function() { errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err); }); - connection.on('metadata', function(metadata) { - if (!metadata.get('date').length) { - return; - } - - connection.isConnected = true; - self.noConnectionsTime = 0; - self.failedConnectionAttempts = 0; - self.emit('connected', connection); - }); - connection.on('status', function(status) { clearImmediate(errorImmediateHandle); @@ -227,14 +228,23 @@ ConnectionPool.prototype.createConnection = function() { }); }); - if (self.isPaused) { - connection.pause(); - } + client.waitForReady(deadline, function(err) { + if (err) { + connection.cancel(); + return; + } - connection.write({ - subscription: common.util.replaceProjectIdToken( - self.subscription.name, self.projectId), - streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 + connection.isConnected = true; + self.noConnectionsTime = 0; + self.failedConnectionAttempts = 0; + + connection.write({ + subscription: common.util.replaceProjectIdToken( + self.subscription.name, self.projectId), + streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 + }); + + self.emit('connected', connection); }); self.connections.set(id, connection); @@ -312,6 +322,7 @@ ConnectionPool.prototype.getClient = function(callback) { } self.client = new Subscriber(address, credentials, { + 'grpc.keepalive_time_ms': 300000, 'grpc.max_receive_message_length': 20000001, 'grpc.primary_user_agent': common.util.getUserAgentFromPackageJson(PKG) }); diff --git a/packages/pubsub/src/subscription.js b/packages/pubsub/src/subscription.js index 42683f15c35..5f04aa2966a 100644 --- a/packages/pubsub/src/subscription.js +++ b/packages/pubsub/src/subscription.js @@ -181,6 +181,7 @@ function Subscription(pubsub, name, options) { this.flushTimeoutHandle_ = null; this.leaseTimeoutHandle_ = null; this.userClosed_ = false; + this.isOpen = false; events.EventEmitter.call(this); this.messageListeners = 0; @@ -331,9 +332,6 @@ Subscription.prototype.close = function(callback) { clearTimeout(this.leaseTimeoutHandle_); this.leaseTimeoutHandle_ = null; - clearTimeout(this.flushTimeoutHandle_); - this.flushTimeoutHandle_ = null; - this.flushQueues_(); this.closeConnection_(callback); }; @@ -347,6 +345,8 @@ Subscription.prototype.close = function(callback) { * @param {?error} err - An error returned from this request. */ Subscription.prototype.closeConnection_ = function(callback) { + this.isOpen = false; + if (this.connectionPool) { this.connectionPool.close(callback || common.util.noop); this.connectionPool = null; @@ -520,6 +520,9 @@ Subscription.prototype.exists = function(callback) { Subscription.prototype.flushQueues_ = function() { var self = this; + clearTimeout(this.flushTimeoutHandle_); + this.flushTimeoutHandle_ = null; + var acks = this.inventory_.ack; var nacks = this.inventory_.nack; @@ -846,6 +849,8 @@ Subscription.prototype.openConnection_ = function() { var self = this; var pool = this.connectionPool = new ConnectionPool(this); + this.isOpen = true; + pool.on('error', function(err) { self.emit('error', err); }); @@ -859,8 +864,6 @@ Subscription.prototype.openConnection_ = function() { }); pool.once('connected', function() { - clearTimeout(self.flushTimeoutHandle_); - self.flushTimeoutHandle_ = null; self.flushQueues_(); }); }; @@ -991,7 +994,7 @@ Subscription.prototype.setFlushTimeout_ = function() { * @private */ Subscription.prototype.setLeaseTimeout_ = function() { - if (this.leaseTimeoutHandle_) { + if (this.leaseTimeoutHandle_ || !this.isOpen) { return; } From ce1a803c536e6679eeb31251e8ecd537c84458c0 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Mon, 25 Sep 2017 13:08:55 -0400 Subject: [PATCH 2/7] refactored client.waitForReady usage --- packages/pubsub/src/connection-pool.js | 126 +++++++++++++++++++------ 1 file changed, 97 insertions(+), 29 deletions(-) diff --git a/packages/pubsub/src/connection-pool.js b/packages/pubsub/src/connection-pool.js index 470023dfaf2..0fd6630ae6f 100644 --- a/packages/pubsub/src/connection-pool.js +++ b/packages/pubsub/src/connection-pool.js @@ -32,6 +32,9 @@ var uuid = require('uuid'); var PKG = require('../package.json'); var v1 = require('./v1'); +var CHANNEL_READY = 'channel.ready'; +var CHANNEL_ERROR = 'channel.error'; + // codes to retry streams var RETRY_CODES = [ 0, // ok @@ -65,6 +68,7 @@ function ConnectionPool(subscription) { this.failedConnectionAttempts = 0; this.noConnectionsTime = 0; + this.waitHandle = null; this.settings = { maxConnections: subscription.maxConnections || 5, @@ -142,16 +146,25 @@ ConnectionPool.prototype.acquire = function(id, callback) { * @param {?error} callback.error - An error returned while closing the pool. */ ConnectionPool.prototype.close = function(callback) { - var self = this; var connections = Array.from(this.connections.values()); callback = callback || common.util.noop; - this.isOpen = false; - self.connections.clear(); - this.queue.forEach(clearTimeout); + if (this.client) { + this.client.close(); + } + this.connections.clear(); + this.queue.forEach(clearTimeout); this.queue.length = 0; + + this.isOpen = false; + this.waiting = false; + + this.removeAllListeners('newListener') + .removeAllListeners(CHANNEL_READY) + .removeAllListeners(CHANNEL_ERROR); + this.failedConnectionAttempts = 0; this.noConnectionsTime = 0; @@ -175,20 +188,25 @@ ConnectionPool.prototype.createConnection = function() { var id = uuid.v4(); var connection = client.streamingPull(self.metadata_); - var elapsedTimeWithoutConnection = 0; - var deadline; var errorImmediateHandle; - if (self.noConnectionsTime) { - elapsedTimeWithoutConnection = Date.now() - self.noConnectionsTime; - } - - deadline = 300000 - elapsedTimeWithoutConnection; - if (self.isPaused) { connection.pause(); } + self.once(CHANNEL_ERROR, function(err) { + connection.cancel(); + }); + + self.once(CHANNEL_READY, function() { + connection.isConnected = true; + + self.noConnectionsTime = 0; + self.failedConnectionAttempts = 0; + + self.emit('connected', connection); + }); + connection.on('error', function(err) { // since this is a bidi stream it's possible that we recieve errors from // reads or writes. We also want to try and cut down on the number of @@ -228,23 +246,10 @@ ConnectionPool.prototype.createConnection = function() { }); }); - client.waitForReady(deadline, function(err) { - if (err) { - connection.cancel(); - return; - } - - connection.isConnected = true; - self.noConnectionsTime = 0; - self.failedConnectionAttempts = 0; - - connection.write({ - subscription: common.util.replaceProjectIdToken( - self.subscription.name, self.projectId), - streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 - }); - - self.emit('connected', connection); + connection.write({ + subscription: common.util.replaceProjectIdToken( + self.subscription.name, self.projectId), + streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 }); self.connections.set(id, connection); @@ -396,6 +401,8 @@ ConnectionPool.prototype.open = function() { this.isOpen = true; this.failedConnectionAttempts = 0; this.noConnectionsTime = Date.now(); + + this.watchForReady(); }; /** @@ -470,4 +477,65 @@ ConnectionPool.prototype.shouldReconnect = function(status) { return true; }; +/** + * + */ +ConnectionPool.prototype.watchForReady = function() { + var self = this; + + this.on('newListener', function(eventName) { + if (eventName == CHANNEL_READY && !self.waiting) { + clearImmediate(self.waitHandle); + self.waitHandle = setImmediate(self.waitForReady.bind(self)); + } + }); +}; + +/** + * + */ +ConnectionPool.prototype.waitForReady = function() { + var self = this; + var READY = 2; + + this.getClient(function(err, client) { + if (err) { + self.emit('error', err); + return; + } + + var channel = client.getChannel(); + var state = channel.getConnectivityState(false); + + if (state === READY) { + self.removeAllListeners(CHANNEL_ERROR); + self.emit(CHANNEL_READY); + return; + } + + var elapsedTimeWithoutConnection = 0; + var deadline, timeout; + + if (self.noConnectionsTime) { + elapsedTimeWithoutConnection = Date.now() - self.noConnectionsTime; + } + + deadline = Date.now() + (300000 - elapsedTimeWithoutConnection); + self.waiting = true; + + client.waitForReady(deadline, function(err) { + self.waiting = false; + + if (err) { + self.removeAllListeners(CHANNEL_READY); + self.emit(CHANNEL_ERROR, err); + return; + } + + self.removeAllListeners(CHANNEL_ERROR); + self.emit(CHANNEL_READY); + }); + }); +}; + module.exports = ConnectionPool; From 856f57d3420cfbfa50472fbf9b2f0df292529a76 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Wed, 27 Sep 2017 16:29:14 -0400 Subject: [PATCH 3/7] lock down new grpc version --- packages/pubsub/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/pubsub/package.json b/packages/pubsub/package.json index 56a1fa9d135..3733c3b5321 100644 --- a/packages/pubsub/package.json +++ b/packages/pubsub/package.json @@ -59,6 +59,7 @@ "google-auto-auth": "^0.7.1", "google-gax": "^0.13.0", "google-proto-files": "^0.12.0", + "grpc": "^1.6.0", "is": "^3.0.1", "uuid": "^3.0.1" }, From 8c1074b6007372778e29a8aa73aa44f6a3de4821 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Wed, 27 Sep 2017 16:29:35 -0400 Subject: [PATCH 4/7] refactor per PR feedback --- packages/pubsub/src/connection-pool.js | 204 +++++++++++++------------ packages/pubsub/src/subscription.js | 7 +- 2 files changed, 113 insertions(+), 98 deletions(-) diff --git a/packages/pubsub/src/connection-pool.js b/packages/pubsub/src/connection-pool.js index 0fd6630ae6f..c1f9e7bb3e2 100644 --- a/packages/pubsub/src/connection-pool.js +++ b/packages/pubsub/src/connection-pool.js @@ -32,8 +32,8 @@ var uuid = require('uuid'); var PKG = require('../package.json'); var v1 = require('./v1'); -var CHANNEL_READY = 'channel.ready'; -var CHANNEL_ERROR = 'channel.error'; +var CHANNEL_READY_EVENT = 'channel.ready'; +var CHANNEL_ERROR_EVENT = 'channel.error'; // codes to retry streams var RETRY_CODES = [ @@ -65,10 +65,10 @@ function ConnectionPool(subscription) { this.isPaused = false; this.isOpen = false; + this.isGettingChannelState = false; this.failedConnectionAttempts = 0; this.noConnectionsTime = 0; - this.waitHandle = null; this.settings = { maxConnections: subscription.maxConnections || 5, @@ -76,7 +76,6 @@ function ConnectionPool(subscription) { }; this.queue = []; - events.EventEmitter.call(this); // grpc related fields we need since we're bypassing gax this.metadata_ = new grpc.Metadata(); @@ -87,6 +86,7 @@ function ConnectionPool(subscription) { 'grpc/' + require('grpc/package.json').version ].join(' ')); + events.EventEmitter.call(this); this.open(); } @@ -159,11 +159,11 @@ ConnectionPool.prototype.close = function(callback) { this.queue.length = 0; this.isOpen = false; - this.waiting = false; + this.isGettingChannelState = false; this.removeAllListeners('newListener') - .removeAllListeners(CHANNEL_READY) - .removeAllListeners(CHANNEL_ERROR); + .removeAllListeners(CHANNEL_READY_EVENT) + .removeAllListeners(CHANNEL_ERROR_EVENT); this.failedConnectionAttempts = 0; this.noConnectionsTime = 0; @@ -194,30 +194,56 @@ ConnectionPool.prototype.createConnection = function() { connection.pause(); } - self.once(CHANNEL_ERROR, function(err) { + self + .once(CHANNEL_ERROR_EVENT, onChannelError) + .once(CHANNEL_READY_EVENT, onChannelReady); + + connection + .on('error', onConnectionError) + .on('data', onConnectionData) + .on('status', onConnectionStatus) + .write({ + subscription: common.util.replaceProjectIdToken( + self.subscription.name, self.projectId), + streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 + }); + + self.connections.set(id, connection); + + function onChannelError() { + self.removeListener(CHANNEL_READY_EVENT, onChannelReady); + connection.cancel(); - }); + } + + function onChannelReady() { + self.removeListener(CHANNEL_ERROR_EVENT, onChannelError); - self.once(CHANNEL_READY, function() { connection.isConnected = true; self.noConnectionsTime = 0; self.failedConnectionAttempts = 0; self.emit('connected', connection); - }); + } - connection.on('error', function(err) { - // since this is a bidi stream it's possible that we recieve errors from - // reads or writes. We also want to try and cut down on the number of - // errors that we emit if other connections are still open. So by using - // setImmediate we're able to cancel the error message if it gets passed - // to the `status` event where we can check if the connection should be - // re-opened or if we should send the error to the user + // since this is a bidi stream it's possible that we recieve errors from + // reads or writes. We also want to try and cut down on the number of + // errors that we emit if other connections are still open. So by using + // setImmediate we're able to cancel the error message if it gets passed + // to the `status` event where we can check if the connection should be + // re-opened or if we should send the error to the user + function onConnectionError(err) { errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err); - }); + } - connection.on('status', function(status) { + function onConnectionData(data) { + arrify(data.receivedMessages).forEach(function(message) { + self.emit('message', self.createMessage(id, message)); + }); + } + + function onConnectionStatus(status) { clearImmediate(errorImmediateHandle); connection.end(); @@ -238,21 +264,7 @@ ConnectionPool.prototype.createConnection = function() { error.code = status.code; self.emit('error', error); } - }); - - connection.on('data', function(data) { - arrify(data.receivedMessages).forEach(function(message) { - self.emit('message', self.createMessage(id, message)); - }); - }); - - connection.write({ - subscription: common.util.replaceProjectIdToken( - self.subscription.name, self.projectId), - streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 - }); - - self.connections.set(id, connection); + } }); }; @@ -292,6 +304,59 @@ ConnectionPool.prototype.createMessage = function(connectionId, resp) { }; }; +/** + * Gets the channels connectivity state and emits channel events accordingly. + * + * @fires CHANNEL_ERROR_EVENT + * @fires CHANNEL_READY_EVENT + */ +ConnectionPool.prototype.getAndEmitChannelState = function() { + var self = this; + + this.isGettingChannelState = true; + + this.getClient(function(err, client) { + if (err) { + self.isGettingChannelState = false; + self.emit(CHANNEL_ERROR_EVENT); + self.emit('error', err); + return; + } + + var READY_STATE = 2; + + var channel = client.getChannel(); + var connectivityState = channel.getConnectivityState(false); + + if (connectivityState === READY_STATE) { + self.isGettingChannelState = false; + self.emit(CHANNEL_READY_EVENT); + return; + } + + var elapsedTimeWithoutConnection = 0; + var now = Date.now(); + var deadline; + + if (self.noConnectionsTime) { + elapsedTimeWithoutConnection = now - self.noConnectionsTime; + } + + deadline = now + (300000 - elapsedTimeWithoutConnection); + + client.waitForReady(deadline, function(err) { + self.isGettingChannelState = false; + + if (err) { + self.emit(CHANNEL_ERROR_EVENT, err); + return; + } + + self.emit(CHANNEL_READY_EVENT); + }); + }); +}; + /** * Gets the Subscriber client. We need to bypass GAX until they allow deadlines * to be optional. @@ -391,6 +456,8 @@ ConnectionPool.prototype.isConnected = function() { * Creates specified number of connections and puts pool in open state. */ ConnectionPool.prototype.open = function() { + var self = this; + var existing = this.connections.size; var max = this.settings.maxConnections; @@ -402,7 +469,11 @@ ConnectionPool.prototype.open = function() { this.failedConnectionAttempts = 0; this.noConnectionsTime = Date.now(); - this.watchForReady(); + this.on('newListener', function(eventName) { + if (eventName === CHANNEL_READY_EVENT && !self.isGettingChannelState) { + self.getAndEmitChannelState(); + } + }); }; /** @@ -477,65 +548,4 @@ ConnectionPool.prototype.shouldReconnect = function(status) { return true; }; -/** - * - */ -ConnectionPool.prototype.watchForReady = function() { - var self = this; - - this.on('newListener', function(eventName) { - if (eventName == CHANNEL_READY && !self.waiting) { - clearImmediate(self.waitHandle); - self.waitHandle = setImmediate(self.waitForReady.bind(self)); - } - }); -}; - -/** - * - */ -ConnectionPool.prototype.waitForReady = function() { - var self = this; - var READY = 2; - - this.getClient(function(err, client) { - if (err) { - self.emit('error', err); - return; - } - - var channel = client.getChannel(); - var state = channel.getConnectivityState(false); - - if (state === READY) { - self.removeAllListeners(CHANNEL_ERROR); - self.emit(CHANNEL_READY); - return; - } - - var elapsedTimeWithoutConnection = 0; - var deadline, timeout; - - if (self.noConnectionsTime) { - elapsedTimeWithoutConnection = Date.now() - self.noConnectionsTime; - } - - deadline = Date.now() + (300000 - elapsedTimeWithoutConnection); - self.waiting = true; - - client.waitForReady(deadline, function(err) { - self.waiting = false; - - if (err) { - self.removeAllListeners(CHANNEL_READY); - self.emit(CHANNEL_ERROR, err); - return; - } - - self.removeAllListeners(CHANNEL_ERROR); - self.emit(CHANNEL_READY); - }); - }); -}; - module.exports = ConnectionPool; diff --git a/packages/pubsub/src/subscription.js b/packages/pubsub/src/subscription.js index 5f04aa2966a..1ecccbb1b8b 100644 --- a/packages/pubsub/src/subscription.js +++ b/packages/pubsub/src/subscription.js @@ -329,6 +329,9 @@ Subscription.prototype.breakLease_ = function(message) { Subscription.prototype.close = function(callback) { this.userClosed_ = true; + var inventory = this.inventory_; + inventory.lease.length = inventory.bytes = 0; + clearTimeout(this.leaseTimeoutHandle_); this.leaseTimeoutHandle_ = null; @@ -877,14 +880,16 @@ Subscription.prototype.openConnection_ = function() { Subscription.prototype.renewLeases_ = function() { var self = this; + clearTimeout(this.leaseTimeoutHandle_); this.leaseTimeoutHandle_ = null; if (!this.inventory_.lease.length) { return; } - var ackIds = this.inventory_.lease; this.ackDeadline = this.histogram.percentile(99); + + var ackIds = this.inventory_.lease.slice(); var ackDeadlineSeconds = this.ackDeadline / 1000; if (this.connectionPool) { From 728575b113a3715e4f83554c4859c4ce7aed898f Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Wed, 27 Sep 2017 16:29:47 -0400 Subject: [PATCH 5/7] unit tests --- packages/pubsub/test/connection-pool.js | 313 +++++++++++++++++++++--- packages/pubsub/test/subscription.js | 91 +++++-- 2 files changed, 359 insertions(+), 45 deletions(-) diff --git a/packages/pubsub/test/connection-pool.js b/packages/pubsub/test/connection-pool.js index be58be32756..ff1ecdc6d27 100644 --- a/packages/pubsub/test/connection-pool.js +++ b/packages/pubsub/test/connection-pool.js @@ -41,6 +41,7 @@ function FakeConnection() { this.isConnected = false; this.isPaused = false; this.ended = false; + this.canceled = false; events.EventEmitter.call(this); } @@ -61,6 +62,10 @@ FakeConnection.prototype.resume = function() { this.isPaused = false; }; +FakeConnection.prototype.cancel = function() { + this.canceled = true; +}; + describe('ConnectionPool', function() { var ConnectionPool; var pool; @@ -122,6 +127,7 @@ describe('ConnectionPool', function() { assert(pool.connections instanceof Map); assert.strictEqual(pool.isPaused, false); assert.strictEqual(pool.isOpen, false); + assert.strictEqual(pool.isGettingChannelState, false); assert.strictEqual(pool.failedConnectionAttempts, 0); assert.strictEqual(pool.noConnectionsTime, 0); assert.strictEqual(pool.settings.maxConnections, 5); @@ -238,11 +244,23 @@ describe('ConnectionPool', function() { }); describe('close',function() { + it('should close the client', function(done) { + pool.client = { close: done }; + pool.close(); + }); + it('should set isOpen to false', function() { pool.close(); assert.strictEqual(pool.isOpen, false); }); + it('should set isGettingChannelState to false', function() { + pool.isGettingChannelState = true; + pool.close(); + + assert.strictEqual(pool.isGettingChannelState, false); + }); + it('should call end on all active connections', function() { var a = new FakeConnection(); var b = new FakeConnection(); @@ -290,6 +308,23 @@ describe('ConnectionPool', function() { assert.strictEqual(pool.noConnectionsTime, 0); }); + it('should remove event listeners', function() { + pool + .on('channel.ready', nope) + .on('channel.error', nope) + .on('newListener', nope); + + pool.close(); + + assert.strictEqual(pool.listenerCount('channel.ready'), 0); + assert.strictEqual(pool.listenerCount('channel.error'), 0); + assert.strictEqual(pool.listenerCount('newListener'), 0); + + function nope() { + throw new Error('Should not be called!'); + } + }); + it('should exec a callback when finished closing', function(done) { pool.close(done); }); @@ -307,13 +342,23 @@ describe('ConnectionPool', function() { describe('createConnection', function() { var fakeClient; var fakeConnection; + var fakeChannel; beforeEach(function() { fakeConnection = new FakeConnection(); + fakeChannel = { + getConnectivityState: function() { + return 2; + } + }; + fakeClient = { streamingPull: function() { return fakeConnection; + }, + getChannel: function() { + return fakeChannel; } }; @@ -347,6 +392,66 @@ describe('ConnectionPool', function() { pool.createConnection(); }); + describe('channel', function() { + var channelReadyEvent = 'channel.ready'; + var channelErrorEvent = 'channel.error'; + + describe('error', function() { + it('should remove the channel ready event listener', function() { + pool.createConnection(); + assert.strictEqual(pool.listenerCount(channelReadyEvent), 1); + + pool.emit(channelErrorEvent); + assert.strictEqual(pool.listenerCount(channelReadyEvent), 0); + }); + + it('should cancel the connection', function() { + pool.createConnection(); + pool.emit(channelErrorEvent); + + assert.strictEqual(fakeConnection.canceled, true); + }); + }); + + describe('success', function() { + it('should remove the channel error event', function() { + pool.createConnection(); + assert.strictEqual(pool.listenerCount(channelErrorEvent), 1); + + pool.emit(channelReadyEvent); + assert.strictEqual(pool.listenerCount(channelErrorEvent), 0); + }); + + it('should set the isConnected flag to true', function() { + pool.createConnection(); + pool.emit(channelReadyEvent); + + assert.strictEqual(fakeConnection.isConnected, true); + }); + + it('should reset internally used properties', function() { + pool.noConnectionsTime = Date.now(); + pool.failedConnectionAttempts = 10; + + pool.createConnection(); + pool.emit(channelReadyEvent); + + assert.strictEqual(pool.noConnectionsTime, 0); + assert.strictEqual(pool.failedConnectionAttempts, 0); + }); + + it('should emit a connected event', function(done) { + pool.on('connected', function(connection) { + assert.strictEqual(connection, fakeConnection); + done(); + }); + + pool.createConnection(); + pool.emit(channelReadyEvent); + }); + }); + }); + describe('connection', function() { var TOKENIZED_SUB_NAME = 'project/p/subscriptions/' + SUB_NAME; var fakeId; @@ -404,35 +509,6 @@ describe('ConnectionPool', function() { }); }); - describe('metadata events', function() { - it('should do nothing if the metadata is empty', function(done) { - var metadata = new grpc.Metadata(); - - pool.on('connected', done); // should not fire - pool.createConnection(); - - fakeConnection.emit('metadata', metadata); - done(); - }); - - it('should reset counters and fire connected', function(done) { - var metadata = new grpc.Metadata(); - - metadata.set('date', 'abc'); - - pool.on('connected', function(connection) { - assert.strictEqual(connection, fakeConnection); - assert(fakeConnection.isConnected); - assert.strictEqual(pool.noConnectionsTime, 0); - assert.strictEqual(pool.failedConnectionAttempts, 0); - done(); - }); - - pool.createConnection(); - fakeConnection.emit('metadata', metadata); - }); - }); - describe('status events', function() { beforeEach(function() { pool.connections.set('a', new FakeConnection()); @@ -674,6 +750,148 @@ describe('ConnectionPool', function() { }); }); + describe('getAndEmitChannelState', function() { + var channelErrorEvent = 'channel.error'; + var channelReadyEvent = 'channel.ready'; + var channelReadyState = 2; + var fakeChannelState; + var dateNow; + var fakeTimestamp; + + var fakeClient = { + getChannel: function() { + return { + getConnectivityState: function(shouldConnect) { + assert.strictEqual(shouldConnect, false); + return fakeChannelState; + } + }; + } + }; + + before(function() { + dateNow = global.Date.now; + }); + + beforeEach(function() { + fakeChannelState = 0; + fakeClient.waitForReady = fakeUtil.noop; + + pool.getClient = function(callback) { + callback(null, fakeClient); + }; + + fakeTimestamp = dateNow.call(global.Date); + pool.noConnectionsTime = 0; + + global.Date.now = function() { + return fakeTimestamp; + }; + }); + + after(function() { + global.Date.now = dateNow; + }); + + it('should set the isGettingChannelState flag to true', function() { + pool.getAndEmitChannelState(); + assert.strictEqual(pool.isGettingChannelState, true); + }); + + it('should emit any client errors', function(done) { + var channelErrorEmitted = false; + + pool.on(channelErrorEvent, function() { + channelErrorEmitted = true; + }); + + var fakeError = new Error('nope'); + var errorEmitted = false; + + pool.on('error', function(err) { + assert.strictEqual(err, fakeError); + errorEmitted = true; + }); + + pool.getClient = function(callback) { + callback(fakeError); + + assert.strictEqual(pool.isGettingChannelState, false); + assert.strictEqual(channelErrorEmitted, true); + assert.strictEqual(errorEmitted, true); + + done(); + }; + + pool.getAndEmitChannelState(); + }); + + it('should emit the ready event if the channel is ready', function(done) { + fakeChannelState = channelReadyState; + + pool.on(channelReadyEvent, function() { + assert.strictEqual(pool.isGettingChannelState, false); + done(); + }); + + pool.getAndEmitChannelState(); + }); + + it('should wait for the channel to be ready', function(done) { + var expectedDeadline = fakeTimestamp + 300000; + + fakeClient.waitForReady = function(deadline) { + assert.strictEqual(deadline, expectedDeadline); + done(); + }; + + pool.getAndEmitChannelState(); + }); + + it('should factor in the noConnectionsTime property', function(done) { + pool.noConnectionsTime = 10; + + var fakeElapsedTime = fakeTimestamp - pool.noConnectionsTime; + var expectedDeadline = fakeTimestamp + (300000 - fakeElapsedTime); + + fakeClient.waitForReady = function(deadline) { + assert.strictEqual(deadline, expectedDeadline); + done(); + }; + + pool.getAndEmitChannelState(); + }); + + it('should emit any waitForReady errors', function(done) { + var fakeError = new Error('err'); + + pool.on(channelErrorEvent, function(err) { + assert.strictEqual(err, fakeError); + assert.strictEqual(pool.isGettingChannelState, false); + done(); + }); + + fakeClient.waitForReady = function(deadline, callback) { + callback(fakeError); + }; + + pool.getAndEmitChannelState(); + }); + + it('should emit the ready event when ready', function(done) { + pool.on(channelReadyEvent, function() { + assert.strictEqual(pool.isGettingChannelState, false); + done(); + }); + + fakeClient.waitForReady = function(deadline, callback) { + callback(null); + }; + + pool.getAndEmitChannelState(); + }); + }); + describe('getClient', function() { var fakeCreds = {}; @@ -681,8 +899,13 @@ describe('ConnectionPool', function() { this.address = address; this.creds = creds; this.options = options; + this.closed = false; } + FakeSubscriber.prototype.close = function() { + this.closed = true; + }; + beforeEach(function() { pool.getCredentials = function(callback) { callback(null, fakeCreds); @@ -696,7 +919,7 @@ describe('ConnectionPool', function() { }); it('should return the cached client when available', function(done) { - var fakeClient = pool.client = {}; + var fakeClient = pool.client = new FakeSubscriber(); pool.getClient(function(err, client) { assert.ifError(err); @@ -758,6 +981,7 @@ describe('ConnectionPool', function() { assert.strictEqual(client.creds, fakeCreds); assert.deepEqual(client.options, { + 'grpc.keepalive_time_ms': 300000, 'grpc.max_receive_message_length': 20000001, 'grpc.primary_user_agent': fakeUserAgent }); @@ -949,6 +1173,35 @@ describe('ConnectionPool', function() { assert.strictEqual(pool.failedConnectionAttempts, 0); assert.strictEqual(pool.noConnectionsTime, Date.now()); }); + + it('should listen for newListener events', function() { + pool.removeAllListeners('newListener'); + pool.open(); + + assert.strictEqual(pool.listenerCount('newListener'), 1); + }); + + describe('newListener callback', function() { + beforeEach(function() { + pool.getAndEmitChannelState = function() { + throw new Error('Should not be called!'); + }; + }); + + it('should call getAndEmitChannelState', function(done) { + pool.getAndEmitChannelState = done; + pool.emit('newListener', 'channel.ready'); + }); + + it('should do nothing for unknown events', function() { + pool.emit('newListener', 'channel.error'); + }); + + it('should do nothing when already getting state', function() { + pool.isGettingChannelState = true; + pool.emit('newListener', 'channel.ready'); + }); + }); }); describe('pause', function() { diff --git a/packages/pubsub/test/subscription.js b/packages/pubsub/test/subscription.js index 63fa029b502..f27ac6e9be8 100644 --- a/packages/pubsub/test/subscription.js +++ b/packages/pubsub/test/subscription.js @@ -160,6 +160,7 @@ describe('Subscription', function() { assert.strictEqual(subscription.maxConnections, 5); assert.strictEqual(subscription.userClosed_, false); assert.strictEqual(subscription.messageListeners, 0); + assert.strictEqual(subscription.isOpen, false); assert.deepEqual(subscription.flowControl, { maxBytes: FAKE_FREE_MEM * 0.2, @@ -428,25 +429,36 @@ describe('Subscription', function() { }); describe('close', function() { + beforeEach(function() { + subscription.flushQueues_ = fakeUtil.noop; + subscription.closeConnection_ = fakeUtil.noop; + }); + it('should set the userClosed_ flag', function() { subscription.close(); assert.strictEqual(subscription.userClosed_, true); }); - it('should stop auto-leasing', function(done) { - subscription.leaseTimeoutHandle_ = setTimeout(done, 1); + it('should dump the inventory', function() { + subscription.inventory_ = { + lease: [0, 1, 2], + bytes: 123 + }; + subscription.close(); - assert.strictEqual(subscription.leaseTimeoutHandle_, null); - setImmediate(done); + assert.deepEqual(subscription.inventory_, { + lease: [], + bytes: 0 + }); }); - it('should stop any queued flushes', function(done) { - subscription.flushTimeoutHandle_ = setTimeout(done, 1); + it('should stop auto-leasing', function(done) { + subscription.leaseTimeoutHandle_ = setTimeout(done, 1); subscription.close(); - assert.strictEqual(subscription.flushTimeoutHandle_, null); + assert.strictEqual(subscription.leaseTimeoutHandle_, null); setImmediate(done); }); @@ -469,6 +481,11 @@ describe('Subscription', function() { fakeUtil.noop = function() {}; }); + it('should set isOpen to false', function() { + subscription.closeConnection_(); + assert.strictEqual(subscription.isOpen, false); + }); + describe('with connection pool', function() { beforeEach(function() { subscription.connectionPool = { @@ -1371,6 +1388,11 @@ describe('Subscription', function() { subscription.connectionPool.emit('error', error); }); + it('should set isOpen to true', function() { + subscription.openConnection_(); + assert.strictEqual(subscription.isOpen, true); + }); + it('should lease & emit messages from pool', function(done) { var message = {}; var leasedMessage = {}; @@ -1431,12 +1453,8 @@ describe('Subscription', function() { }); it('should flush the queue when connected', function(done) { - subscription.flushQueues_ = function() { - assert.strictEqual(subscription.flushTimeoutHandle_, null); - done(); - }; + subscription.flushQueues_ = done; - subscription.flushTimeoutHandle_ = setTimeout(done, 1); subscription.openConnection_(); subscription.connectionPool.emit('connected'); }); @@ -1444,9 +1462,10 @@ describe('Subscription', function() { describe('renewLeases_', function() { var fakeDeadline = 9999; + var fakeAckIds = ['abc', 'def']; beforeEach(function() { - subscription.inventory_.lease = ['abc', 'def']; + subscription.inventory_.lease = fakeAckIds; subscription.setLeaseTimeout_ = fakeUtil.noop; subscription.histogram.percentile = function() { @@ -1454,6 +1473,25 @@ describe('Subscription', function() { }; }); + it('should clean up the old timeout handle', function() { + var fakeHandle = 123; + var clearTimeoutCalled = false; + var _clearTimeout = global.clearTimeout; + + global.clearTimeout = function(handle) { + assert.strictEqual(handle, fakeHandle); + clearTimeoutCalled = true; + }; + + subscription.leaseTimeoutHandle_ = fakeHandle; + subscription.renewLeases_(); + + assert.strictEqual(subscription.leaseTimeoutHandle_, null); + assert.strictEqual(clearTimeoutCalled, true); + + global.clearTimeout = _clearTimeout; + }); + it('should update the ackDeadline', function() { subscription.request = subscription.setLeaseTimeout_ = fakeUtil.noop; @@ -1515,8 +1553,9 @@ describe('Subscription', function() { it('should write to the connection', function(done) { fakeConnection.write = function(reqOpts) { + assert.notStrictEqual(reqOpts.modifyDeadlineAckIds, fakeAckIds); assert.deepEqual(reqOpts, { - modifyDeadlineAckIds: ['abc', 'def'], + modifyDeadlineAckIds: fakeAckIds, modifyDeadlineSeconds: Array(2).fill(fakeDeadline / 1000) }); done(); @@ -1531,9 +1570,10 @@ describe('Subscription', function() { subscription.request = function(config, callback) { assert.strictEqual(config.client, 'subscriberClient'); assert.strictEqual(config.method, 'modifyAckDeadline'); + assert.notStrictEqual(config.reqOpts.ackIds, fakeAckIds); assert.deepEqual(config.reqOpts, { subscription: subscription.name, - ackIds: ['abc', 'def'], + ackIds: fakeAckIds, ackDeadlineSeconds: fakeDeadline / 1000 }); callback(); @@ -1674,6 +1714,10 @@ describe('Subscription', function() { globalMathRandom = global.Math.random; }); + beforeEach(function() { + subscription.isOpen = true; + }); + after(function() { global.setTimeout = globalSetTimeout; global.Math.random = globalMathRandom; @@ -1713,6 +1757,23 @@ describe('Subscription', function() { subscription.leaseTimeoutHandle_ = fakeTimeoutHandle; subscription.setLeaseTimeout_(); }); + + it('should not set a timeout if the sub is closed', function() { + subscription.renewLeases_ = function() { + throw new Error('Should not be called.'); + }; + + global.Math.random = function() { + throw new Error('Should not be called.'); + }; + + global.setTimeout = function() { + throw new Error('Should not be called.'); + }; + + subscription.isOpen = false; + subscription.setLeaseTimeout_(); + }); }); describe('setMetadata', function() { From dc1a372a4040ccc511dbdc02fb5b85d569ca2802 Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Thu, 28 Sep 2017 14:40:18 -0400 Subject: [PATCH 6/7] bump grpc version in common-grpc --- packages/common-grpc/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common-grpc/package.json b/packages/common-grpc/package.json index a6cc3dba30b..72059e1d6de 100644 --- a/packages/common-grpc/package.json +++ b/packages/common-grpc/package.json @@ -42,7 +42,7 @@ "dot-prop": "^2.4.0", "duplexify": "^3.5.0", "extend": "^3.0.0", - "grpc": "^1.3.1", + "grpc": "^1.6.0", "is": "^3.2.0", "modelo": "^4.2.0", "retry-request": "^2.0.0", From 8f88c29f0b970124bae056fc90bc2dd52fa867fe Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Thu, 28 Sep 2017 15:04:58 -0400 Subject: [PATCH 7/7] minor adjustments per pr feedback --- packages/pubsub/src/connection-pool.js | 10 +++++++--- packages/pubsub/test/connection-pool.js | 17 +++++++++++------ packages/pubsub/test/subscription.js | 20 ++++++++++++++++++++ 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/packages/pubsub/src/connection-pool.js b/packages/pubsub/src/connection-pool.js index c1f9e7bb3e2..a7d9b64f9c7 100644 --- a/packages/pubsub/src/connection-pool.js +++ b/packages/pubsub/src/connection-pool.js @@ -35,6 +35,10 @@ var v1 = require('./v1'); var CHANNEL_READY_EVENT = 'channel.ready'; var CHANNEL_ERROR_EVENT = 'channel.error'; +// if we can't establish a connection within 5 minutes, we need to back off +// and emit an error to the user. +var MAX_TIMEOUT = 300000; + // codes to retry streams var RETRY_CODES = [ 0, // ok @@ -342,7 +346,7 @@ ConnectionPool.prototype.getAndEmitChannelState = function() { elapsedTimeWithoutConnection = now - self.noConnectionsTime; } - deadline = now + (300000 - elapsedTimeWithoutConnection); + deadline = now + (MAX_TIMEOUT - elapsedTimeWithoutConnection); client.waitForReady(deadline, function(err) { self.isGettingChannelState = false; @@ -392,7 +396,7 @@ ConnectionPool.prototype.getClient = function(callback) { } self.client = new Subscriber(address, credentials, { - 'grpc.keepalive_time_ms': 300000, + 'grpc.keepalive_time_ms': MAX_TIMEOUT, 'grpc.max_receive_message_length': 20000001, 'grpc.primary_user_agent': common.util.getUserAgentFromPackageJson(PKG) }); @@ -539,7 +543,7 @@ ConnectionPool.prototype.shouldReconnect = function(status) { } var exceededRetryLimit = this.noConnectionsTime && - Date.now() - this.noConnectionsTime > 300000; + Date.now() - this.noConnectionsTime > MAX_TIMEOUT; if (exceededRetryLimit) { return false; diff --git a/packages/pubsub/test/connection-pool.js b/packages/pubsub/test/connection-pool.js index ff1ecdc6d27..2fb5097634f 100644 --- a/packages/pubsub/test/connection-pool.js +++ b/packages/pubsub/test/connection-pool.js @@ -757,15 +757,11 @@ describe('ConnectionPool', function() { var fakeChannelState; var dateNow; var fakeTimestamp; + var fakeChannel = {}; var fakeClient = { getChannel: function() { - return { - getConnectivityState: function(shouldConnect) { - assert.strictEqual(shouldConnect, false); - return fakeChannelState; - } - }; + return fakeChannel; } }; @@ -774,6 +770,10 @@ describe('ConnectionPool', function() { }); beforeEach(function() { + fakeChannel.getConnectivityState = function() { + return fakeChannelState; + }; + fakeChannelState = 0; fakeClient.waitForReady = fakeUtil.noop; @@ -829,6 +829,11 @@ describe('ConnectionPool', function() { it('should emit the ready event if the channel is ready', function(done) { fakeChannelState = channelReadyState; + fakeChannel.getConnectivityState = function(shouldConnect) { + assert.strictEqual(shouldConnect, false); + return fakeChannelState; + }; + pool.on(channelReadyEvent, function() { assert.strictEqual(pool.isGettingChannelState, false); done(); diff --git a/packages/pubsub/test/subscription.js b/packages/pubsub/test/subscription.js index f27ac6e9be8..c07bfa644a4 100644 --- a/packages/pubsub/test/subscription.js +++ b/packages/pubsub/test/subscription.js @@ -778,6 +778,26 @@ describe('Subscription', function() { subscription.flushQueues_(); }); + it('should cancel any pending flushes', function() { + var fakeHandle = 'abc'; + var cleared = false; + + var _clearTimeout = global.clearTimeout; + + global.clearTimeout = function(handle) { + assert.strictEqual(handle, fakeHandle); + cleared = true; + }; + + subscription.flushTimeoutHandle_ = fakeHandle; + subscription.flushQueues_(); + + assert.strictEqual(subscription.flushTimeoutHandle_, null); + assert.strictEqual(cleared, true); + + global.clearTimeout = _clearTimeout; + }); + describe('with connection pool', function() { var fakeConnection;