Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub: refactor subscription connection #2627

Merged
merged 7 commits into from
Oct 2, 2017
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"google-auto-auth": "^0.7.1",
"google-gax": "^0.13.0",
"google-proto-files": "^0.12.0",
"grpc": "^1.6.0",

This comment was marked as spam.

This comment was marked as spam.

This comment was marked as spam.

"is": "^3.0.1",
"uuid": "^3.0.1"
},
Expand Down
165 changes: 127 additions & 38 deletions packages/pubsub/src/connection-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ var uuid = require('uuid');
var PKG = require('../package.json');
var v1 = require('./v1');

var CHANNEL_READY_EVENT = 'channel.ready';
var CHANNEL_ERROR_EVENT = 'channel.error';

// codes to retry streams
var RETRY_CODES = [
0, // ok
Expand Down Expand Up @@ -62,6 +65,7 @@ function ConnectionPool(subscription) {

this.isPaused = false;
this.isOpen = false;
this.isGettingChannelState = false;

this.failedConnectionAttempts = 0;
this.noConnectionsTime = 0;
Expand All @@ -72,7 +76,6 @@ function ConnectionPool(subscription) {
};

this.queue = [];
events.EventEmitter.call(this);

// grpc related fields we need since we're bypassing gax
this.metadata_ = new grpc.Metadata();
Expand All @@ -83,6 +86,7 @@ function ConnectionPool(subscription) {
'grpc/' + require('grpc/package.json').version
].join(' '));

events.EventEmitter.call(this);
this.open();
}

Expand Down Expand Up @@ -142,16 +146,25 @@ ConnectionPool.prototype.acquire = function(id, callback) {
* @param {?error} callback.error - An error returned while closing the pool.
*/
ConnectionPool.prototype.close = function(callback) {
var self = this;
var connections = Array.from(this.connections.values());

callback = callback || common.util.noop;

this.isOpen = false;
self.connections.clear();
this.queue.forEach(clearTimeout);
if (this.client) {
this.client.close();
}

this.connections.clear();
this.queue.forEach(clearTimeout);
this.queue.length = 0;

this.isOpen = false;
this.isGettingChannelState = false;

this.removeAllListeners('newListener')

This comment was marked as spam.

This comment was marked as spam.

.removeAllListeners(CHANNEL_READY_EVENT)
.removeAllListeners(CHANNEL_ERROR_EVENT);

this.failedConnectionAttempts = 0;
this.noConnectionsTime = 0;

Expand All @@ -177,28 +190,60 @@ ConnectionPool.prototype.createConnection = function() {
var connection = client.streamingPull(self.metadata_);
var errorImmediateHandle;

connection.on('error', function(err) {
// since this is a bidi stream it's possible that we recieve errors from
// reads or writes. We also want to try and cut down on the number of
// errors that we emit if other connections are still open. So by using
// setImmediate we're able to cancel the error message if it gets passed
// to the `status` event where we can check if the connection should be
// re-opened or if we should send the error to the user
errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err);
});
if (self.isPaused) {
connection.pause();
}

connection.on('metadata', function(metadata) {
if (!metadata.get('date').length) {
return;
}
self
.once(CHANNEL_ERROR_EVENT, onChannelError)
.once(CHANNEL_READY_EVENT, onChannelReady);

connection
.on('error', onConnectionError)
.on('data', onConnectionData)
.on('status', onConnectionStatus)
.write({
subscription: common.util.replaceProjectIdToken(
self.subscription.name, self.projectId),
streamAckDeadlineSeconds: self.settings.ackDeadline / 1000
});

self.connections.set(id, connection);

function onChannelError() {
self.removeListener(CHANNEL_READY_EVENT, onChannelReady);

connection.cancel();
}

function onChannelReady() {
self.removeListener(CHANNEL_ERROR_EVENT, onChannelError);

connection.isConnected = true;

This comment was marked as spam.

This comment was marked as spam.


self.noConnectionsTime = 0;
self.failedConnectionAttempts = 0;

self.emit('connected', connection);
});
}

connection.on('status', function(status) {
// since this is a bidi stream it's possible that we recieve errors from
// reads or writes. We also want to try and cut down on the number of
// errors that we emit if other connections are still open. So by using
// setImmediate we're able to cancel the error message if it gets passed
// to the `status` event where we can check if the connection should be
// re-opened or if we should send the error to the user
function onConnectionError(err) {
errorImmediateHandle = setImmediate(self.emit.bind(self), 'error', err);
}

function onConnectionData(data) {
arrify(data.receivedMessages).forEach(function(message) {
self.emit('message', self.createMessage(id, message));
});
}

function onConnectionStatus(status) {
clearImmediate(errorImmediateHandle);

connection.end();
Expand All @@ -219,25 +264,7 @@ ConnectionPool.prototype.createConnection = function() {
error.code = status.code;
self.emit('error', error);
}
});

connection.on('data', function(data) {
arrify(data.receivedMessages).forEach(function(message) {
self.emit('message', self.createMessage(id, message));
});
});

if (self.isPaused) {
connection.pause();
}

connection.write({
subscription: common.util.replaceProjectIdToken(
self.subscription.name, self.projectId),
streamAckDeadlineSeconds: self.settings.ackDeadline / 1000
});

self.connections.set(id, connection);
});
};

Expand Down Expand Up @@ -277,6 +304,59 @@ ConnectionPool.prototype.createMessage = function(connectionId, resp) {
};
};

/**
* Gets the channels connectivity state and emits channel events accordingly.
*
* @fires CHANNEL_ERROR_EVENT
* @fires CHANNEL_READY_EVENT
*/
ConnectionPool.prototype.getAndEmitChannelState = function() {
var self = this;

this.isGettingChannelState = true;

this.getClient(function(err, client) {
if (err) {
self.isGettingChannelState = false;
self.emit(CHANNEL_ERROR_EVENT);
self.emit('error', err);
return;
}

var READY_STATE = 2;

var channel = client.getChannel();
var connectivityState = channel.getConnectivityState(false);

if (connectivityState === READY_STATE) {
self.isGettingChannelState = false;
self.emit(CHANNEL_READY_EVENT);
return;
}

var elapsedTimeWithoutConnection = 0;
var now = Date.now();
var deadline;

if (self.noConnectionsTime) {
elapsedTimeWithoutConnection = now - self.noConnectionsTime;
}

deadline = now + (300000 - elapsedTimeWithoutConnection);

This comment was marked as spam.

This comment was marked as spam.


client.waitForReady(deadline, function(err) {
self.isGettingChannelState = false;

if (err) {
self.emit(CHANNEL_ERROR_EVENT, err);
return;
}

self.emit(CHANNEL_READY_EVENT);
});
});
};

/**
* Gets the Subscriber client. We need to bypass GAX until they allow deadlines
* to be optional.
Expand Down Expand Up @@ -312,6 +392,7 @@ ConnectionPool.prototype.getClient = function(callback) {
}

self.client = new Subscriber(address, credentials, {
'grpc.keepalive_time_ms': 300000,
'grpc.max_receive_message_length': 20000001,
'grpc.primary_user_agent': common.util.getUserAgentFromPackageJson(PKG)
});
Expand Down Expand Up @@ -375,6 +456,8 @@ ConnectionPool.prototype.isConnected = function() {
* Creates specified number of connections and puts pool in open state.
*/
ConnectionPool.prototype.open = function() {
var self = this;

var existing = this.connections.size;
var max = this.settings.maxConnections;

Expand All @@ -385,6 +468,12 @@ ConnectionPool.prototype.open = function() {
this.isOpen = true;
this.failedConnectionAttempts = 0;
this.noConnectionsTime = Date.now();

this.on('newListener', function(eventName) {
if (eventName === CHANNEL_READY_EVENT && !self.isGettingChannelState) {
self.getAndEmitChannelState();
}
});
};

/**
Expand Down
22 changes: 15 additions & 7 deletions packages/pubsub/src/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ function Subscription(pubsub, name, options) {
this.flushTimeoutHandle_ = null;
this.leaseTimeoutHandle_ = null;
this.userClosed_ = false;
this.isOpen = false;

events.EventEmitter.call(this);
this.messageListeners = 0;
Expand Down Expand Up @@ -328,12 +329,12 @@ Subscription.prototype.breakLease_ = function(message) {
Subscription.prototype.close = function(callback) {
this.userClosed_ = true;

var inventory = this.inventory_;
inventory.lease.length = inventory.bytes = 0;

clearTimeout(this.leaseTimeoutHandle_);
this.leaseTimeoutHandle_ = null;

clearTimeout(this.flushTimeoutHandle_);
this.flushTimeoutHandle_ = null;

this.flushQueues_();
this.closeConnection_(callback);
};
Expand All @@ -347,6 +348,8 @@ Subscription.prototype.close = function(callback) {
* @param {?error} err - An error returned from this request.
*/
Subscription.prototype.closeConnection_ = function(callback) {
this.isOpen = false;

if (this.connectionPool) {
this.connectionPool.close(callback || common.util.noop);
this.connectionPool = null;
Expand Down Expand Up @@ -520,6 +523,9 @@ Subscription.prototype.exists = function(callback) {
Subscription.prototype.flushQueues_ = function() {
var self = this;

clearTimeout(this.flushTimeoutHandle_);

This comment was marked as spam.

This comment was marked as spam.

this.flushTimeoutHandle_ = null;

var acks = this.inventory_.ack;
var nacks = this.inventory_.nack;

Expand Down Expand Up @@ -846,6 +852,8 @@ Subscription.prototype.openConnection_ = function() {
var self = this;
var pool = this.connectionPool = new ConnectionPool(this);

this.isOpen = true;

pool.on('error', function(err) {
self.emit('error', err);
});
Expand All @@ -859,8 +867,6 @@ Subscription.prototype.openConnection_ = function() {
});

pool.once('connected', function() {
clearTimeout(self.flushTimeoutHandle_);
self.flushTimeoutHandle_ = null;
self.flushQueues_();
});
};
Expand All @@ -874,14 +880,16 @@ Subscription.prototype.openConnection_ = function() {
Subscription.prototype.renewLeases_ = function() {
var self = this;

clearTimeout(this.leaseTimeoutHandle_);
this.leaseTimeoutHandle_ = null;

if (!this.inventory_.lease.length) {
return;
}

var ackIds = this.inventory_.lease;
this.ackDeadline = this.histogram.percentile(99);

var ackIds = this.inventory_.lease.slice();
var ackDeadlineSeconds = this.ackDeadline / 1000;

if (this.connectionPool) {
Expand Down Expand Up @@ -991,7 +999,7 @@ Subscription.prototype.setFlushTimeout_ = function() {
* @private
*/
Subscription.prototype.setLeaseTimeout_ = function() {
if (this.leaseTimeoutHandle_) {
if (this.leaseTimeoutHandle_ || !this.isOpen) {
return;
}

Expand Down
Loading