diff --git a/packages/common-grpc/package.json b/packages/common-grpc/package.json index a6cc3dba30b..72059e1d6de 100644 --- a/packages/common-grpc/package.json +++ b/packages/common-grpc/package.json @@ -42,7 +42,7 @@ "dot-prop": "^2.4.0", "duplexify": "^3.5.0", "extend": "^3.0.0", - "grpc": "^1.3.1", + "grpc": "^1.6.0", "is": "^3.2.0", "modelo": "^4.2.0", "retry-request": "^2.0.0", diff --git a/packages/pubsub/package.json b/packages/pubsub/package.json index 56a1fa9d135..3733c3b5321 100644 --- a/packages/pubsub/package.json +++ b/packages/pubsub/package.json @@ -59,6 +59,7 @@ "google-auto-auth": "^0.7.1", "google-gax": "^0.13.0", "google-proto-files": "^0.12.0", + "grpc": "^1.6.0", "is": "^3.0.1", "uuid": "^3.0.1" }, diff --git a/packages/pubsub/src/connection-pool.js b/packages/pubsub/src/connection-pool.js index 33fc4cc3e9c..a7d9b64f9c7 100644 --- a/packages/pubsub/src/connection-pool.js +++ b/packages/pubsub/src/connection-pool.js @@ -32,6 +32,13 @@ var uuid = require('uuid'); var PKG = require('../package.json'); var v1 = require('./v1'); +var CHANNEL_READY_EVENT = 'channel.ready'; +var CHANNEL_ERROR_EVENT = 'channel.error'; + +// if we can't establish a connection within 5 minutes, we need to back off +// and emit an error to the user. +var MAX_TIMEOUT = 300000; + // codes to retry streams var RETRY_CODES = [ 0, // ok @@ -62,6 +69,7 @@ function ConnectionPool(subscription) { this.isPaused = false; this.isOpen = false; + this.isGettingChannelState = false; this.failedConnectionAttempts = 0; this.noConnectionsTime = 0; @@ -72,7 +80,6 @@ function ConnectionPool(subscription) { }; this.queue = []; - events.EventEmitter.call(this); // grpc related fields we need since we're bypassing gax this.metadata_ = new grpc.Metadata(); @@ -83,6 +90,7 @@ function ConnectionPool(subscription) { 'grpc/' + require('grpc/package.json').version ].join(' ')); + events.EventEmitter.call(this); this.open(); } @@ -142,16 +150,25 @@ ConnectionPool.prototype.acquire = function(id, callback) { * @param {?error} callback.error - An error returned while closing the pool. */ ConnectionPool.prototype.close = function(callback) { - var self = this; var connections = Array.from(this.connections.values()); callback = callback || common.util.noop; - this.isOpen = false; - self.connections.clear(); - this.queue.forEach(clearTimeout); + if (this.client) { + this.client.close(); + } + this.connections.clear(); + this.queue.forEach(clearTimeout); this.queue.length = 0; + + this.isOpen = false; + this.isGettingChannelState = false; + + this.removeAllListeners('newListener') + .removeAllListeners(CHANNEL_READY_EVENT) + .removeAllListeners(CHANNEL_ERROR_EVENT); + this.failedConnectionAttempts = 0; this.noConnectionsTime = 0; @@ -177,28 +194,60 @@ ConnectionPool.prototype.createConnection = function() { var connection = client.streamingPull(self.metadata_); var errorImmediateHandle; - connection.on('error', function(err) { - // since this is a bidi stream it's possible that we recieve errors from - // reads or writes. We also want to try and cut down on the number of - // errors that we emit if other connections are still open. So by using - // setImmediate we're able to cancel the error message if it gets passed - // to the `status` event where we can check if the connection should be - // re-opened or if we should send the error to the user - errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err); - }); + if (self.isPaused) { + connection.pause(); + } - connection.on('metadata', function(metadata) { - if (!metadata.get('date').length) { - return; - } + self + .once(CHANNEL_ERROR_EVENT, onChannelError) + .once(CHANNEL_READY_EVENT, onChannelReady); + + connection + .on('error', onConnectionError) + .on('data', onConnectionData) + .on('status', onConnectionStatus) + .write({ + subscription: common.util.replaceProjectIdToken( + self.subscription.name, self.projectId), + streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 + }); + + self.connections.set(id, connection); + + function onChannelError() { + self.removeListener(CHANNEL_READY_EVENT, onChannelReady); + + connection.cancel(); + } + + function onChannelReady() { + self.removeListener(CHANNEL_ERROR_EVENT, onChannelError); connection.isConnected = true; + self.noConnectionsTime = 0; self.failedConnectionAttempts = 0; + self.emit('connected', connection); - }); + } + + // since this is a bidi stream it's possible that we recieve errors from + // reads or writes. We also want to try and cut down on the number of + // errors that we emit if other connections are still open. So by using + // setImmediate we're able to cancel the error message if it gets passed + // to the `status` event where we can check if the connection should be + // re-opened or if we should send the error to the user + function onConnectionError(err) { + errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err); + } - connection.on('status', function(status) { + function onConnectionData(data) { + arrify(data.receivedMessages).forEach(function(message) { + self.emit('message', self.createMessage(id, message)); + }); + } + + function onConnectionStatus(status) { clearImmediate(errorImmediateHandle); connection.end(); @@ -219,25 +268,7 @@ ConnectionPool.prototype.createConnection = function() { error.code = status.code; self.emit('error', error); } - }); - - connection.on('data', function(data) { - arrify(data.receivedMessages).forEach(function(message) { - self.emit('message', self.createMessage(id, message)); - }); - }); - - if (self.isPaused) { - connection.pause(); } - - connection.write({ - subscription: common.util.replaceProjectIdToken( - self.subscription.name, self.projectId), - streamAckDeadlineSeconds: self.settings.ackDeadline / 1000 - }); - - self.connections.set(id, connection); }); }; @@ -277,6 +308,59 @@ ConnectionPool.prototype.createMessage = function(connectionId, resp) { }; }; +/** + * Gets the channels connectivity state and emits channel events accordingly. + * + * @fires CHANNEL_ERROR_EVENT + * @fires CHANNEL_READY_EVENT + */ +ConnectionPool.prototype.getAndEmitChannelState = function() { + var self = this; + + this.isGettingChannelState = true; + + this.getClient(function(err, client) { + if (err) { + self.isGettingChannelState = false; + self.emit(CHANNEL_ERROR_EVENT); + self.emit('error', err); + return; + } + + var READY_STATE = 2; + + var channel = client.getChannel(); + var connectivityState = channel.getConnectivityState(false); + + if (connectivityState === READY_STATE) { + self.isGettingChannelState = false; + self.emit(CHANNEL_READY_EVENT); + return; + } + + var elapsedTimeWithoutConnection = 0; + var now = Date.now(); + var deadline; + + if (self.noConnectionsTime) { + elapsedTimeWithoutConnection = now - self.noConnectionsTime; + } + + deadline = now + (MAX_TIMEOUT - elapsedTimeWithoutConnection); + + client.waitForReady(deadline, function(err) { + self.isGettingChannelState = false; + + if (err) { + self.emit(CHANNEL_ERROR_EVENT, err); + return; + } + + self.emit(CHANNEL_READY_EVENT); + }); + }); +}; + /** * Gets the Subscriber client. We need to bypass GAX until they allow deadlines * to be optional. @@ -312,6 +396,7 @@ ConnectionPool.prototype.getClient = function(callback) { } self.client = new Subscriber(address, credentials, { + 'grpc.keepalive_time_ms': MAX_TIMEOUT, 'grpc.max_receive_message_length': 20000001, 'grpc.primary_user_agent': common.util.getUserAgentFromPackageJson(PKG) }); @@ -375,6 +460,8 @@ ConnectionPool.prototype.isConnected = function() { * Creates specified number of connections and puts pool in open state. */ ConnectionPool.prototype.open = function() { + var self = this; + var existing = this.connections.size; var max = this.settings.maxConnections; @@ -385,6 +472,12 @@ ConnectionPool.prototype.open = function() { this.isOpen = true; this.failedConnectionAttempts = 0; this.noConnectionsTime = Date.now(); + + this.on('newListener', function(eventName) { + if (eventName === CHANNEL_READY_EVENT && !self.isGettingChannelState) { + self.getAndEmitChannelState(); + } + }); }; /** @@ -450,7 +543,7 @@ ConnectionPool.prototype.shouldReconnect = function(status) { } var exceededRetryLimit = this.noConnectionsTime && - Date.now() - this.noConnectionsTime > 300000; + Date.now() - this.noConnectionsTime > MAX_TIMEOUT; if (exceededRetryLimit) { return false; diff --git a/packages/pubsub/src/subscription.js b/packages/pubsub/src/subscription.js index 42683f15c35..1ecccbb1b8b 100644 --- a/packages/pubsub/src/subscription.js +++ b/packages/pubsub/src/subscription.js @@ -181,6 +181,7 @@ function Subscription(pubsub, name, options) { this.flushTimeoutHandle_ = null; this.leaseTimeoutHandle_ = null; this.userClosed_ = false; + this.isOpen = false; events.EventEmitter.call(this); this.messageListeners = 0; @@ -328,12 +329,12 @@ Subscription.prototype.breakLease_ = function(message) { Subscription.prototype.close = function(callback) { this.userClosed_ = true; + var inventory = this.inventory_; + inventory.lease.length = inventory.bytes = 0; + clearTimeout(this.leaseTimeoutHandle_); this.leaseTimeoutHandle_ = null; - clearTimeout(this.flushTimeoutHandle_); - this.flushTimeoutHandle_ = null; - this.flushQueues_(); this.closeConnection_(callback); }; @@ -347,6 +348,8 @@ Subscription.prototype.close = function(callback) { * @param {?error} err - An error returned from this request. */ Subscription.prototype.closeConnection_ = function(callback) { + this.isOpen = false; + if (this.connectionPool) { this.connectionPool.close(callback || common.util.noop); this.connectionPool = null; @@ -520,6 +523,9 @@ Subscription.prototype.exists = function(callback) { Subscription.prototype.flushQueues_ = function() { var self = this; + clearTimeout(this.flushTimeoutHandle_); + this.flushTimeoutHandle_ = null; + var acks = this.inventory_.ack; var nacks = this.inventory_.nack; @@ -846,6 +852,8 @@ Subscription.prototype.openConnection_ = function() { var self = this; var pool = this.connectionPool = new ConnectionPool(this); + this.isOpen = true; + pool.on('error', function(err) { self.emit('error', err); }); @@ -859,8 +867,6 @@ Subscription.prototype.openConnection_ = function() { }); pool.once('connected', function() { - clearTimeout(self.flushTimeoutHandle_); - self.flushTimeoutHandle_ = null; self.flushQueues_(); }); }; @@ -874,14 +880,16 @@ Subscription.prototype.openConnection_ = function() { Subscription.prototype.renewLeases_ = function() { var self = this; + clearTimeout(this.leaseTimeoutHandle_); this.leaseTimeoutHandle_ = null; if (!this.inventory_.lease.length) { return; } - var ackIds = this.inventory_.lease; this.ackDeadline = this.histogram.percentile(99); + + var ackIds = this.inventory_.lease.slice(); var ackDeadlineSeconds = this.ackDeadline / 1000; if (this.connectionPool) { @@ -991,7 +999,7 @@ Subscription.prototype.setFlushTimeout_ = function() { * @private */ Subscription.prototype.setLeaseTimeout_ = function() { - if (this.leaseTimeoutHandle_) { + if (this.leaseTimeoutHandle_ || !this.isOpen) { return; } diff --git a/packages/pubsub/test/connection-pool.js b/packages/pubsub/test/connection-pool.js index be58be32756..2fb5097634f 100644 --- a/packages/pubsub/test/connection-pool.js +++ b/packages/pubsub/test/connection-pool.js @@ -41,6 +41,7 @@ function FakeConnection() { this.isConnected = false; this.isPaused = false; this.ended = false; + this.canceled = false; events.EventEmitter.call(this); } @@ -61,6 +62,10 @@ FakeConnection.prototype.resume = function() { this.isPaused = false; }; +FakeConnection.prototype.cancel = function() { + this.canceled = true; +}; + describe('ConnectionPool', function() { var ConnectionPool; var pool; @@ -122,6 +127,7 @@ describe('ConnectionPool', function() { assert(pool.connections instanceof Map); assert.strictEqual(pool.isPaused, false); assert.strictEqual(pool.isOpen, false); + assert.strictEqual(pool.isGettingChannelState, false); assert.strictEqual(pool.failedConnectionAttempts, 0); assert.strictEqual(pool.noConnectionsTime, 0); assert.strictEqual(pool.settings.maxConnections, 5); @@ -238,11 +244,23 @@ describe('ConnectionPool', function() { }); describe('close',function() { + it('should close the client', function(done) { + pool.client = { close: done }; + pool.close(); + }); + it('should set isOpen to false', function() { pool.close(); assert.strictEqual(pool.isOpen, false); }); + it('should set isGettingChannelState to false', function() { + pool.isGettingChannelState = true; + pool.close(); + + assert.strictEqual(pool.isGettingChannelState, false); + }); + it('should call end on all active connections', function() { var a = new FakeConnection(); var b = new FakeConnection(); @@ -290,6 +308,23 @@ describe('ConnectionPool', function() { assert.strictEqual(pool.noConnectionsTime, 0); }); + it('should remove event listeners', function() { + pool + .on('channel.ready', nope) + .on('channel.error', nope) + .on('newListener', nope); + + pool.close(); + + assert.strictEqual(pool.listenerCount('channel.ready'), 0); + assert.strictEqual(pool.listenerCount('channel.error'), 0); + assert.strictEqual(pool.listenerCount('newListener'), 0); + + function nope() { + throw new Error('Should not be called!'); + } + }); + it('should exec a callback when finished closing', function(done) { pool.close(done); }); @@ -307,13 +342,23 @@ describe('ConnectionPool', function() { describe('createConnection', function() { var fakeClient; var fakeConnection; + var fakeChannel; beforeEach(function() { fakeConnection = new FakeConnection(); + fakeChannel = { + getConnectivityState: function() { + return 2; + } + }; + fakeClient = { streamingPull: function() { return fakeConnection; + }, + getChannel: function() { + return fakeChannel; } }; @@ -347,6 +392,66 @@ describe('ConnectionPool', function() { pool.createConnection(); }); + describe('channel', function() { + var channelReadyEvent = 'channel.ready'; + var channelErrorEvent = 'channel.error'; + + describe('error', function() { + it('should remove the channel ready event listener', function() { + pool.createConnection(); + assert.strictEqual(pool.listenerCount(channelReadyEvent), 1); + + pool.emit(channelErrorEvent); + assert.strictEqual(pool.listenerCount(channelReadyEvent), 0); + }); + + it('should cancel the connection', function() { + pool.createConnection(); + pool.emit(channelErrorEvent); + + assert.strictEqual(fakeConnection.canceled, true); + }); + }); + + describe('success', function() { + it('should remove the channel error event', function() { + pool.createConnection(); + assert.strictEqual(pool.listenerCount(channelErrorEvent), 1); + + pool.emit(channelReadyEvent); + assert.strictEqual(pool.listenerCount(channelErrorEvent), 0); + }); + + it('should set the isConnected flag to true', function() { + pool.createConnection(); + pool.emit(channelReadyEvent); + + assert.strictEqual(fakeConnection.isConnected, true); + }); + + it('should reset internally used properties', function() { + pool.noConnectionsTime = Date.now(); + pool.failedConnectionAttempts = 10; + + pool.createConnection(); + pool.emit(channelReadyEvent); + + assert.strictEqual(pool.noConnectionsTime, 0); + assert.strictEqual(pool.failedConnectionAttempts, 0); + }); + + it('should emit a connected event', function(done) { + pool.on('connected', function(connection) { + assert.strictEqual(connection, fakeConnection); + done(); + }); + + pool.createConnection(); + pool.emit(channelReadyEvent); + }); + }); + }); + describe('connection', function() { var TOKENIZED_SUB_NAME = 'project/p/subscriptions/' + SUB_NAME; var fakeId; @@ -404,35 +509,6 @@ describe('ConnectionPool', function() { }); }); - describe('metadata events', function() { - it('should do nothing if the metadata is empty', function(done) { - var metadata = new grpc.Metadata(); - - pool.on('connected', done); // should not fire - pool.createConnection(); - - fakeConnection.emit('metadata', metadata); - done(); - }); - - it('should reset counters and fire connected', function(done) { - var metadata = new grpc.Metadata(); - - metadata.set('date', 'abc'); - - pool.on('connected', function(connection) { - assert.strictEqual(connection, fakeConnection); - assert(fakeConnection.isConnected); - assert.strictEqual(pool.noConnectionsTime, 0); - assert.strictEqual(pool.failedConnectionAttempts, 0); - done(); - }); - - pool.createConnection(); - fakeConnection.emit('metadata', metadata); - }); - }); - describe('status events', function() { beforeEach(function() { pool.connections.set('a', new FakeConnection()); @@ -674,6 +750,153 @@ describe('ConnectionPool', function() { }); }); + describe('getAndEmitChannelState', function() { + var channelErrorEvent = 'channel.error'; + var channelReadyEvent = 'channel.ready'; + var channelReadyState = 2; + var fakeChannelState; + var dateNow; + var fakeTimestamp; + var fakeChannel = {}; + + var fakeClient = { + getChannel: function() { + return fakeChannel; + } + }; + + before(function() { + dateNow = global.Date.now; + }); + + beforeEach(function() { + fakeChannel.getConnectivityState = function() { + return fakeChannelState; + }; + + fakeChannelState = 0; + fakeClient.waitForReady = fakeUtil.noop; + + pool.getClient = function(callback) { + callback(null, fakeClient); + }; + + fakeTimestamp = dateNow.call(global.Date); + pool.noConnectionsTime = 0; + + global.Date.now = function() { + return fakeTimestamp; + }; + }); + + after(function() { + global.Date.now = dateNow; + }); + + it('should set the isGettingChannelState flag to true', function() { + pool.getAndEmitChannelState(); + assert.strictEqual(pool.isGettingChannelState, true); + }); + + it('should emit any client errors', function(done) { + var channelErrorEmitted = false; + + pool.on(channelErrorEvent, function() { + channelErrorEmitted = true; + }); + + var fakeError = new Error('nope'); + var errorEmitted = false; + + pool.on('error', function(err) { + assert.strictEqual(err, fakeError); + errorEmitted = true; + }); + + pool.getClient = function(callback) { + callback(fakeError); + + assert.strictEqual(pool.isGettingChannelState, false); + assert.strictEqual(channelErrorEmitted, true); + assert.strictEqual(errorEmitted, true); + + done(); + }; + + pool.getAndEmitChannelState(); + }); + + it('should emit the ready event if the channel is ready', function(done) { + fakeChannelState = channelReadyState; + + fakeChannel.getConnectivityState = function(shouldConnect) { + assert.strictEqual(shouldConnect, false); + return fakeChannelState; + }; + + pool.on(channelReadyEvent, function() { + assert.strictEqual(pool.isGettingChannelState, false); + done(); + }); + + pool.getAndEmitChannelState(); + }); + + it('should wait for the channel to be ready', function(done) { + var expectedDeadline = fakeTimestamp + 300000; + + fakeClient.waitForReady = function(deadline) { + assert.strictEqual(deadline, expectedDeadline); + done(); + }; + + pool.getAndEmitChannelState(); + }); + + it('should factor in the noConnectionsTime property', function(done) { + pool.noConnectionsTime = 10; + + var fakeElapsedTime = fakeTimestamp - pool.noConnectionsTime; + var expectedDeadline = fakeTimestamp + (300000 - fakeElapsedTime); + + fakeClient.waitForReady = function(deadline) { + assert.strictEqual(deadline, expectedDeadline); + done(); + }; + + pool.getAndEmitChannelState(); + }); + + it('should emit any waitForReady errors', function(done) { + var fakeError = new Error('err'); + + pool.on(channelErrorEvent, function(err) { + assert.strictEqual(err, fakeError); + assert.strictEqual(pool.isGettingChannelState, false); + done(); + }); + + fakeClient.waitForReady = function(deadline, callback) { + callback(fakeError); + }; + + pool.getAndEmitChannelState(); + }); + + it('should emit the ready event when ready', function(done) { + pool.on(channelReadyEvent, function() { + assert.strictEqual(pool.isGettingChannelState, false); + done(); + }); + + fakeClient.waitForReady = function(deadline, callback) { + callback(null); + }; + + pool.getAndEmitChannelState(); + }); + }); + describe('getClient', function() { var fakeCreds = {}; @@ -681,8 +904,13 @@ describe('ConnectionPool', function() { this.address = address; this.creds = creds; this.options = options; + this.closed = false; } + FakeSubscriber.prototype.close = function() { + this.closed = true; + }; + beforeEach(function() { pool.getCredentials = function(callback) { callback(null, fakeCreds); @@ -696,7 +924,7 @@ describe('ConnectionPool', function() { }); it('should return the cached client when available', function(done) { - var fakeClient = pool.client = {}; + var fakeClient = pool.client = new FakeSubscriber(); pool.getClient(function(err, client) { assert.ifError(err); @@ -758,6 +986,7 @@ describe('ConnectionPool', function() { assert.strictEqual(client.creds, fakeCreds); assert.deepEqual(client.options, { + 'grpc.keepalive_time_ms': 300000, 'grpc.max_receive_message_length': 20000001, 'grpc.primary_user_agent': fakeUserAgent }); @@ -949,6 +1178,35 @@ describe('ConnectionPool', function() { assert.strictEqual(pool.failedConnectionAttempts, 0); assert.strictEqual(pool.noConnectionsTime, Date.now()); }); + + it('should listen for newListener events', function() { + pool.removeAllListeners('newListener'); + pool.open(); + + assert.strictEqual(pool.listenerCount('newListener'), 1); + }); + + describe('newListener callback', function() { + beforeEach(function() { + pool.getAndEmitChannelState = function() { + throw new Error('Should not be called!'); + }; + }); + + it('should call getAndEmitChannelState', function(done) { + pool.getAndEmitChannelState = done; + pool.emit('newListener', 'channel.ready'); + }); + + it('should do nothing for unknown events', function() { + pool.emit('newListener', 'channel.error'); + }); + + it('should do nothing when already getting state', function() { + pool.isGettingChannelState = true; + pool.emit('newListener', 'channel.ready'); + }); + }); }); describe('pause', function() { diff --git a/packages/pubsub/test/subscription.js b/packages/pubsub/test/subscription.js index 63fa029b502..c07bfa644a4 100644 --- a/packages/pubsub/test/subscription.js +++ b/packages/pubsub/test/subscription.js @@ -160,6 +160,7 @@ describe('Subscription', function() { assert.strictEqual(subscription.maxConnections, 5); assert.strictEqual(subscription.userClosed_, false); assert.strictEqual(subscription.messageListeners, 0); + assert.strictEqual(subscription.isOpen, false); assert.deepEqual(subscription.flowControl, { maxBytes: FAKE_FREE_MEM * 0.2, @@ -428,25 +429,36 @@ describe('Subscription', function() { }); describe('close', function() { + beforeEach(function() { + subscription.flushQueues_ = fakeUtil.noop; + subscription.closeConnection_ = fakeUtil.noop; + }); + it('should set the userClosed_ flag', function() { subscription.close(); assert.strictEqual(subscription.userClosed_, true); }); - it('should stop auto-leasing', function(done) { - subscription.leaseTimeoutHandle_ = setTimeout(done, 1); + it('should dump the inventory', function() { + subscription.inventory_ = { + lease: [0, 1, 2], + bytes: 123 + }; + subscription.close(); - assert.strictEqual(subscription.leaseTimeoutHandle_, null); - setImmediate(done); + assert.deepEqual(subscription.inventory_, { + lease: [], + bytes: 0 + }); }); - it('should stop any queued flushes', function(done) { - subscription.flushTimeoutHandle_ = setTimeout(done, 1); + it('should stop auto-leasing', function(done) { + subscription.leaseTimeoutHandle_ = setTimeout(done, 1); subscription.close(); - assert.strictEqual(subscription.flushTimeoutHandle_, null); + assert.strictEqual(subscription.leaseTimeoutHandle_, null); setImmediate(done); }); @@ -469,6 +481,11 @@ describe('Subscription', function() { fakeUtil.noop = function() {}; }); + it('should set isOpen to false', function() { + subscription.closeConnection_(); + assert.strictEqual(subscription.isOpen, false); + }); + describe('with connection pool', function() { beforeEach(function() { subscription.connectionPool = { @@ -761,6 +778,26 @@ describe('Subscription', function() { subscription.flushQueues_(); }); + it('should cancel any pending flushes', function() { + var fakeHandle = 'abc'; + var cleared = false; + + var _clearTimeout = global.clearTimeout; + + global.clearTimeout = function(handle) { + assert.strictEqual(handle, fakeHandle); + cleared = true; + }; + + subscription.flushTimeoutHandle_ = fakeHandle; + subscription.flushQueues_(); + + assert.strictEqual(subscription.flushTimeoutHandle_, null); + assert.strictEqual(cleared, true); + + global.clearTimeout = _clearTimeout; + }); + describe('with connection pool', function() { var fakeConnection; @@ -1371,6 +1408,11 @@ describe('Subscription', function() { subscription.connectionPool.emit('error', error); }); + it('should set isOpen to true', function() { + subscription.openConnection_(); + assert.strictEqual(subscription.isOpen, true); + }); + it('should lease & emit messages from pool', function(done) { var message = {}; var leasedMessage = {}; @@ -1431,12 +1473,8 @@ describe('Subscription', function() { }); it('should flush the queue when connected', function(done) { - subscription.flushQueues_ = function() { - assert.strictEqual(subscription.flushTimeoutHandle_, null); - done(); - }; + subscription.flushQueues_ = done; - subscription.flushTimeoutHandle_ = setTimeout(done, 1); subscription.openConnection_(); subscription.connectionPool.emit('connected'); }); @@ -1444,9 +1482,10 @@ describe('Subscription', function() { describe('renewLeases_', function() { var fakeDeadline = 9999; + var fakeAckIds = ['abc', 'def']; beforeEach(function() { - subscription.inventory_.lease = ['abc', 'def']; + subscription.inventory_.lease = fakeAckIds; subscription.setLeaseTimeout_ = fakeUtil.noop; subscription.histogram.percentile = function() { @@ -1454,6 +1493,25 @@ describe('Subscription', function() { }; }); + it('should clean up the old timeout handle', function() { + var fakeHandle = 123; + var clearTimeoutCalled = false; + var _clearTimeout = global.clearTimeout; + + global.clearTimeout = function(handle) { + assert.strictEqual(handle, fakeHandle); + clearTimeoutCalled = true; + }; + + subscription.leaseTimeoutHandle_ = fakeHandle; + subscription.renewLeases_(); + + assert.strictEqual(subscription.leaseTimeoutHandle_, null); + assert.strictEqual(clearTimeoutCalled, true); + + global.clearTimeout = _clearTimeout; + }); + it('should update the ackDeadline', function() { subscription.request = subscription.setLeaseTimeout_ = fakeUtil.noop; @@ -1515,8 +1573,9 @@ describe('Subscription', function() { it('should write to the connection', function(done) { fakeConnection.write = function(reqOpts) { + assert.notStrictEqual(reqOpts.modifyDeadlineAckIds, fakeAckIds); assert.deepEqual(reqOpts, { - modifyDeadlineAckIds: ['abc', 'def'], + modifyDeadlineAckIds: fakeAckIds, modifyDeadlineSeconds: Array(2).fill(fakeDeadline / 1000) }); done(); @@ -1531,9 +1590,10 @@ describe('Subscription', function() { subscription.request = function(config, callback) { assert.strictEqual(config.client, 'subscriberClient'); assert.strictEqual(config.method, 'modifyAckDeadline'); + assert.notStrictEqual(config.reqOpts.ackIds, fakeAckIds); assert.deepEqual(config.reqOpts, { subscription: subscription.name, - ackIds: ['abc', 'def'], + ackIds: fakeAckIds, ackDeadlineSeconds: fakeDeadline / 1000 }); callback(); @@ -1674,6 +1734,10 @@ describe('Subscription', function() { globalMathRandom = global.Math.random; }); + beforeEach(function() { + subscription.isOpen = true; + }); + after(function() { global.setTimeout = globalSetTimeout; global.Math.random = globalMathRandom; @@ -1713,6 +1777,23 @@ describe('Subscription', function() { subscription.leaseTimeoutHandle_ = fakeTimeoutHandle; subscription.setLeaseTimeout_(); }); + + it('should not set a timeout if the sub is closed', function() { + subscription.renewLeases_ = function() { + throw new Error('Should not be called.'); + }; + + global.Math.random = function() { + throw new Error('Should not be called.'); + }; + + global.setTimeout = function() { + throw new Error('Should not be called.'); + }; + + subscription.isOpen = false; + subscription.setLeaseTimeout_(); + }); }); describe('setMetadata', function() {