-
Notifications
You must be signed in to change notification settings - Fork 591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pubsub: refactor subscription connection #2627
pubsub: refactor subscription connection #2627
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Small stuff and questions.
@@ -32,6 +32,9 @@ var uuid = require('uuid'); | |||
var PKG = require('../package.json'); | |||
var v1 = require('./v1'); | |||
|
|||
var CHANNEL_READY = 'channel.ready'; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
connection.pause(); | ||
} | ||
|
||
self.once(CHANNEL_ERROR, function(err) { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
} | ||
|
||
var elapsedTimeWithoutConnection = 0; | ||
var deadline, timeout; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
*/ | ||
ConnectionPool.prototype.waitForReady = function() { | ||
var self = this; | ||
var READY = 2; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
/** | ||
* | ||
*/ | ||
ConnectionPool.prototype.watchForReady = function() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
this.on('newListener', function(eventName) { | ||
if (eventName == CHANNEL_READY && !self.waiting) { | ||
clearImmediate(self.waitHandle); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
this.isOpen = false; | ||
this.waiting = false; | ||
|
||
this.removeAllListeners('newListener') |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
}); | ||
|
||
self.once(CHANNEL_READY, function() { | ||
connection.isConnected = true; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@stephenplusplus I think this is ready for another look through. |
@@ -59,6 +59,7 @@ | |||
"google-auto-auth": "^0.7.1", | |||
"google-gax": "^0.13.0", | |||
"google-proto-files": "^0.12.0", | |||
"grpc": "^1.6.0", |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pool.createConnection(); | ||
pool.emit(channelErrorEvent); | ||
|
||
assert.strictEqual(fakeConnection.canceled, true); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
pool.getClient = function(callback) { | ||
callback(fakeError); | ||
|
||
assert.strictEqual(pool.isGettingChannelState, false); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
getChannel: function() { | ||
return { | ||
getConnectivityState: function(shouldConnect) { | ||
assert.strictEqual(shouldConnect, false); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
elapsedTimeWithoutConnection = now - self.noConnectionsTime; | ||
} | ||
|
||
deadline = now + (300000 - elapsedTimeWithoutConnection); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -520,6 +523,9 @@ Subscription.prototype.exists = function(callback) { | |||
Subscription.prototype.flushQueues_ = function() { | |||
var self = this; | |||
|
|||
clearTimeout(this.flushTimeoutHandle_); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@stephenplusplus this should be good to go btw! |
Could this be merged ? seems it fixes the issue with acked messages. |
Relates to #2621, #2619, #2598, #2572