Skip to content

Commit

Permalink
pubsub: support auto-named subscriptions.
Browse files Browse the repository at this point in the history
  • Loading branch information
stephenplusplus committed Jul 29, 2015
1 parent a108687 commit 91760c8
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 116 deletions.
38 changes: 22 additions & 16 deletions lib/pubsub/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

'use strict';

var extend = require('extend');

/**
* @type {module:pubsub/subscription}
* @private
Expand Down Expand Up @@ -229,14 +231,14 @@ PubSub.prototype.createTopic = function(name, callback) {
*
* @param {module:pubsub/topic|string} - topic - The Topic to create a
* subscription to.
* @param {string} subName - The name of the subscription.
* @param {object=} options - Configuration object.
* @param {number} options.ackDeadlineSeconds - The maximum time after receiving
* a message that you must ack a message before it is redelivered.
* @param {boolean} options.autoAck - Automatically acknowledge the message once
* it's pulled. (default: false)
* @param {number} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {string} options.name - The name of the subscription.
* @param {boolean} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
Expand All @@ -250,18 +252,16 @@ PubSub.prototype.createTopic = function(name, callback) {
* //-
* // Subscribe to a topic. (Also see {module:pubsub/topic#subscribe}).
* //-
* var topic = 'messageCenter';
* var name = 'newMessages';
* var topicName = 'message-center';
*
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
* pubsub.subscribe(topicName, function(err, subscription, apiResponse) {});
*
* //-
* // Customize the subscription.
* //-
* pubsub.subscribe(topic, name, {
* ackDeadlineSeconds: 90,
* pubsub.subscribe(topic, {
* autoAck: true,
* interval: 30
* name: 'my-new-subscription'
* }, function(err, subscription, apiResponse) {});
*
* //-
Expand All @@ -273,18 +273,16 @@ PubSub.prototype.createTopic = function(name, callback) {
*
* var topic = anotherProject.topic('messageCenter');
*
* pubsub.subscribe(topic, name, function(err, subscription, apiResponse) {});
* pubsub.subscribe(topic, function(err, subscription, apiResponse) {});
*/
PubSub.prototype.subscribe = function(topic, subName, options, callback) {
PubSub.prototype.subscribe = function(topic, options, callback) {
var self = this;

if (!util.is(topic, 'string') && !(topic instanceof Topic)) {
throw new Error('A Topic is required for a new subscription.');
}

if (!util.is(subName, 'string')) {
throw new Error('A subscription name is required for a new subscription.');
}

if (!callback) {
if (util.is(options, 'function')) {
callback = options;
options = {};
}
Expand All @@ -303,14 +301,22 @@ PubSub.prototype.subscribe = function(topic, subName, options, callback) {
body.ackDeadlineSeconds = options.ackDeadlineSeconds;
}

var subscription = this.subscription(subName, options);
var newSubName = '';

this.makeReq_('PUT', subscription.name, null, body, function(err, result) {
if (options.name) {
newSubName = Subscription.formatName_(self.projectId, options.name);
delete options.name;
}

this.makeReq_('PUT', newSubName, null, body, function(err, result) {
if (err && !(err.code === 409 && options.reuseExisting)) {
callback(err, null, result);
return;
}

var opts = extend({}, result, options);
var subscription = self.subscription(result.name, opts);

callback(null, subscription, result);
});
};
Expand Down
12 changes: 6 additions & 6 deletions lib/pubsub/topic.js
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ Topic.prototype.getSubscriptions = function(options, callback) {
* once it's pulled. (default: false)
* @param {number=} options.interval - Interval in milliseconds to check for new
* messages. (default: 10)
* @param {string} options.name - The name of the subscription.
* @param {boolean=} options.reuseExisting - If the subscription already exists,
* reuse it. The options of the existing subscription are not changed. If
* false, attempting to create a subscription that already exists will fail.
Expand All @@ -282,17 +283,16 @@ Topic.prototype.getSubscriptions = function(options, callback) {
*
* @example
* // Without specifying any options.
* topic.subscribe('newMessages', function(err, subscription, apiResponse) {});
* topic.subscribe(function(err, subscription, apiResponse) {});
*
* // With options.
* topic.subscribe('newMessages', {
* ackDeadlineSeconds: 90,
* topic.subscribe({
* autoAck: true,
* interval: 30
* name: 'my-new-subscription'
* }, function(err, subscription, apiResponse) {});
*/
Topic.prototype.subscribe = function(subName, options, callback) {
this.pubsub.subscribe(this, subName, options, callback);
Topic.prototype.subscribe = function(options, callback) {
this.pubsub.subscribe(this, options, callback);
};

/**
Expand Down
25 changes: 18 additions & 7 deletions system-test/pubsub.js
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ describe('pubsub', function() {
var SUBSCRIPTIONS = [
{
name: SUB_NAMES[0],
options: { ackDeadlineSeconds: 30 }
ackDeadlineSeconds: 30
},
{
name: SUB_NAMES[1],
options: { ackDeadlineSeconds: 60 }
ackDeadlineSeconds: 60
}
];

Expand All @@ -162,9 +162,7 @@ describe('pubsub', function() {
topic = newTopic;

// Create subscriptions.
async.parallel(SUBSCRIPTIONS.map(function(sub) {
return topic.subscribe.bind(topic, sub.name, sub.options);
}), done);
async.each(SUBSCRIPTIONS, topic.subscribe.bind(topic), done);
});
});

Expand Down Expand Up @@ -227,11 +225,24 @@ describe('pubsub', function() {
});
});

it('should allow creation and deletion of a subscription', function(done) {
it('should create and delete a named subscription', function(done) {
var subName = generateSubName();
topic.subscribe(subName, function(err, sub) {
var fullSubName = Subscription.formatName_(pubsub.projectId, subName);

topic.subscribe({
name: subName
}, function(err, sub) {
assert.ifError(err);
assert(sub instanceof Subscription);
assert.strictEqual(sub.name, fullSubName);
sub.delete(done);
});
});

it('should automatically assign a subscription name', function(done) {
topic.subscribe(function(err, sub) {
assert.ifError(err);
assert.notStrictEqual(sub.name, '');
sub.delete(done);
});
});
Expand Down
Loading

0 comments on commit 91760c8

Please sign in to comment.