From b7733485d5326598ae83d1d8bffb52fe73da4dbe Mon Sep 17 00:00:00 2001 From: Dave Gramlich Date: Tue, 28 Jul 2015 14:59:04 -0400 Subject: [PATCH] pubsub: fixed subscription pulling bug Each time a message event was listened for, startPulling_ would be called increasing the total number of requests being made. This change added a check to prevent startPulling_ from being called when it didn't need to be --- lib/pubsub/subscription.js | 8 ++++---- test/pubsub/subscription.js | 30 ++++++++++++++++++++++++++++-- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/lib/pubsub/subscription.js b/lib/pubsub/subscription.js index faf4e68b50e..33a48803d89 100644 --- a/lib/pubsub/subscription.js +++ b/lib/pubsub/subscription.js @@ -134,7 +134,7 @@ function Subscription(pubsub, options) { this.makeReq_ = pubsub.makeReq_.bind(pubsub); this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false; - this.closed = false; + this.closed = true; this.interval = util.is(options.interval, 'number') ? options.interval : 10; this.inProgressAckIds = {}; this.maxInProgress = @@ -210,16 +210,16 @@ Subscription.prototype.listenForEvents_ = function() { this.on('newListener', function(event) { if (event === 'message') { - this.messageListeners++; + self.messageListeners++; if (self.closed) { self.closed = false; + self.startPulling_(); } - self.startPulling_(); } }); this.on('removeListener', function(event) { - if (event === 'message' && --this.messageListeners === 0) { + if (event === 'message' && --self.messageListeners === 0) { self.closed = true; } }); diff --git a/test/pubsub/subscription.js b/test/pubsub/subscription.js index 6f91a52cf40..066b3888233 100644 --- a/test/pubsub/subscription.js +++ b/test/pubsub/subscription.js @@ -75,8 +75,8 @@ describe('Subscription', function() { assert.strictEqual(sub.maxInProgress, 3); }); - it('should not be closed', function() { - assert.strictEqual(subscription.closed, false); + it('should be closed', function() { + assert.strictEqual(subscription.closed, true); }); it('should default autoAck to false if not specified', function() { @@ -145,6 +145,19 @@ describe('Subscription', function() { assert.strictEqual(subscription.messageListeners, 0); }); + it('should only run a single pulling loop', function() { + var startPullingCallCount = 0; + + subscription.startPulling_ = function() { + startPullingCallCount++; + }; + + subscription.on('message', util.noop); + subscription.on('message', util.noop); + + assert.strictEqual(startPullingCallCount, 1); + }); + it('should close when no more message listeners are bound', function() { subscription.startPulling_ = util.noop; subscription.on('message', util.noop); @@ -478,6 +491,8 @@ describe('Subscription', function() { assert.strictEqual(options.returnImmediately, false); done(); }; + + subscription.closed = false; subscription.startPulling_(); }); @@ -486,6 +501,8 @@ describe('Subscription', function() { assert.strictEqual(options.maxResults, undefined); done(); }; + + subscription.closed = false; subscription.startPulling_(); }); @@ -494,6 +511,8 @@ describe('Subscription', function() { assert.strictEqual(options.maxResults, 1); done(); }; + + subscription.closed = false; subscription.maxInProgress = 4; subscription.inProgressAckIds = { id1: true, id2: true, id3: true }; subscription.startPulling_(); @@ -507,6 +526,8 @@ describe('Subscription', function() { callback(error); }); }; + + subscription.closed = false; subscription .once('error', function(err) { assert.equal(err, error); @@ -524,6 +545,8 @@ describe('Subscription', function() { callback(error, null, resp); }); }; + + subscription.closed = false; subscription .once('error', function(err, apiResponse) { assert.equal(err, error); @@ -537,6 +560,7 @@ describe('Subscription', function() { subscription.pull = function(options, callback) { callback(null, [{ hi: 'there' }]); }; + subscription .once('message', function(msg) { assert.deepEqual(msg, { hi: 'there' }); @@ -576,6 +600,8 @@ describe('Subscription', function() { // above. fn(); }; + + subscription.closed = false; subscription.interval = INTERVAL; subscription.startPulling_(); });