Skip to content

Commit

Permalink
Merge pull request #754 from callmehiphop/pubsub-pull-bug
Browse files Browse the repository at this point in the history
pubsub: fixed subscription pulling bug
  • Loading branch information
stephenplusplus committed Jul 28, 2015
2 parents 7efd2a8 + b773348 commit 3f281e9
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 6 deletions.
8 changes: 4 additions & 4 deletions lib/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ function Subscription(pubsub, options) {
this.makeReq_ = pubsub.makeReq_.bind(pubsub);

this.autoAck = util.is(options.autoAck, 'boolean') ? options.autoAck : false;
this.closed = false;
this.closed = true;
this.interval = util.is(options.interval, 'number') ? options.interval : 10;
this.inProgressAckIds = {};
this.maxInProgress =
Expand Down Expand Up @@ -210,16 +210,16 @@ Subscription.prototype.listenForEvents_ = function() {

this.on('newListener', function(event) {
if (event === 'message') {
this.messageListeners++;
self.messageListeners++;
if (self.closed) {
self.closed = false;
self.startPulling_();
}
self.startPulling_();
}
});

this.on('removeListener', function(event) {
if (event === 'message' && --this.messageListeners === 0) {
if (event === 'message' && --self.messageListeners === 0) {
self.closed = true;
}
});
Expand Down
30 changes: 28 additions & 2 deletions test/pubsub/subscription.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ describe('Subscription', function() {
assert.strictEqual(sub.maxInProgress, 3);
});

it('should not be closed', function() {
assert.strictEqual(subscription.closed, false);
it('should be closed', function() {
assert.strictEqual(subscription.closed, true);
});

it('should default autoAck to false if not specified', function() {
Expand Down Expand Up @@ -145,6 +145,19 @@ describe('Subscription', function() {
assert.strictEqual(subscription.messageListeners, 0);
});

it('should only run a single pulling loop', function() {
var startPullingCallCount = 0;

subscription.startPulling_ = function() {
startPullingCallCount++;
};

subscription.on('message', util.noop);
subscription.on('message', util.noop);

assert.strictEqual(startPullingCallCount, 1);
});

it('should close when no more message listeners are bound', function() {
subscription.startPulling_ = util.noop;
subscription.on('message', util.noop);
Expand Down Expand Up @@ -478,6 +491,8 @@ describe('Subscription', function() {
assert.strictEqual(options.returnImmediately, false);
done();
};

subscription.closed = false;
subscription.startPulling_();
});

Expand All @@ -486,6 +501,8 @@ describe('Subscription', function() {
assert.strictEqual(options.maxResults, undefined);
done();
};

subscription.closed = false;
subscription.startPulling_();
});

Expand All @@ -494,6 +511,8 @@ describe('Subscription', function() {
assert.strictEqual(options.maxResults, 1);
done();
};

subscription.closed = false;
subscription.maxInProgress = 4;
subscription.inProgressAckIds = { id1: true, id2: true, id3: true };
subscription.startPulling_();
Expand All @@ -507,6 +526,8 @@ describe('Subscription', function() {
callback(error);
});
};

subscription.closed = false;
subscription
.once('error', function(err) {
assert.equal(err, error);
Expand All @@ -524,6 +545,8 @@ describe('Subscription', function() {
callback(error, null, resp);
});
};

subscription.closed = false;
subscription
.once('error', function(err, apiResponse) {
assert.equal(err, error);
Expand All @@ -537,6 +560,7 @@ describe('Subscription', function() {
subscription.pull = function(options, callback) {
callback(null, [{ hi: 'there' }]);
};

subscription
.once('message', function(msg) {
assert.deepEqual(msg, { hi: 'there' });
Expand Down Expand Up @@ -576,6 +600,8 @@ describe('Subscription', function() {
// above.
fn();
};

subscription.closed = false;
subscription.interval = INTERVAL;
subscription.startPulling_();
});
Expand Down

0 comments on commit 3f281e9

Please sign in to comment.