Skip to content

Commit

Permalink
Add message ordering samples (#220)
Browse files Browse the repository at this point in the history
* Initial commit.

* Add unit test.

* Add message ordering sample.

* Rename a sample.

* Address comments.
  • Loading branch information
jmdobry authored Sep 28, 2016
1 parent 1217387 commit ff7bda4
Show file tree
Hide file tree
Showing 13 changed files with 282 additions and 38 deletions.
1 change: 0 additions & 1 deletion bigquery/quickstart.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

// [START bigquery_quickstart]
// Imports and instantiates the Google Cloud client library
// for Google BigQuery
const bigquery = require('@google-cloud/bigquery')({
projectId: 'YOUR_PROJECT_ID'
});
Expand Down
1 change: 0 additions & 1 deletion datastore/quickstart.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

// [START datastore_quickstart]
// Imports and instantiates the Google Cloud client library
// for Google Cloud Datastore
const datastore = require('@google-cloud/datastore')({
projectId: 'YOUR_PROJECT_ID'
});
Expand Down
1 change: 0 additions & 1 deletion logging/quickstart.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

// [START logging_quickstart]
// Imports and instantiates the Google Cloud client library
// for Stackdriver Logging
const logging = require('@google-cloud/logging')({
projectId: 'YOUR_PROJECT_ID'
});
Expand Down
1 change: 1 addition & 0 deletions pubsub/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
"yargs": "^5.0.0"
},
"devDependencies": {
"async": "^2.0.1",
"mocha": "^3.0.2",
"node-uuid": "^1.4.7"
},
Expand Down
1 change: 0 additions & 1 deletion pubsub/quickstart.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

// [START pubsub_quickstart]
// Imports and instantiates the Google Cloud client library
// for Google Cloud Pub/Sub
const pubsub = require('@google-cloud/pubsub')({
projectId: 'YOUR_PROJECT_ID'
});
Expand Down
108 changes: 100 additions & 8 deletions pubsub/subscriptions.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@

'use strict';

const pubsubClient = require(`@google-cloud/pubsub`)();
const PubSub = require(`@google-cloud/pubsub`);

// [START pubsub_list_subscriptions]
function listSubscriptions (callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// Lists all subscriptions in the current project
pubsubClient.getSubscriptions((err, subscriptions) => {
if (err) {
Expand All @@ -43,6 +46,9 @@ function listSubscriptions (callback) {

// [START pubsub_list_topic_subscriptions]
function listTopicSubscriptions (topicName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing topic, e.g. "my-topic"
const topic = pubsubClient.topic(topicName);

Expand All @@ -62,6 +68,9 @@ function listTopicSubscriptions (topicName, callback) {

// [START pubsub_create_subscription]
function createSubscription (topicName, subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing topic, e.g. "my-topic"
const topic = pubsubClient.topic(topicName);

Expand All @@ -80,16 +89,18 @@ function createSubscription (topicName, subscriptionName, callback) {

// [START pubsub_create_push_subscription]
function createPushSubscription (topicName, subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing topic, e.g. "my-topic"
const topic = pubsubClient.topic(topicName);
const projectId = process.env.GCLOUD_PROJECT || 'YOU_PROJECT_ID';

// Creates a new push subscription, e.g. "my-new-subscription"
topic.subscribe(subscriptionName, {
pushConfig: {
// Set to an HTTPS endpoint of your choice. If necessary, register
// (authorize) the domain on which the server is hosted.
pushEndpoint: `https://${projectId}.appspot.com/push`
pushEndpoint: `https://${pubsubClient.projectId}.appspot.com/push`
}
}, (err, subscription) => {
if (err) {
Expand All @@ -105,6 +116,9 @@ function createPushSubscription (topicName, subscriptionName, callback) {

// [START pubsub_delete_subscription]
function deleteSubscription (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand All @@ -121,8 +135,11 @@ function deleteSubscription (subscriptionName, callback) {
}
// [END pubsub_delete_subscription]

// [START pubsub_get_subscription_metadata]
function getSubscriptionMetadata (subscriptionName, callback) {
// [START pubsub_get_subscription]
function getSubscription (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand All @@ -140,10 +157,13 @@ function getSubscriptionMetadata (subscriptionName, callback) {
callback();
});
}
// [END pubsub_get_subscription_metadata]
// [END pubsub_get_subscription]

// [START pubsub_pull_messages]
function pullMessages (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand All @@ -168,8 +188,73 @@ function pullMessages (subscriptionName, callback) {
}
// [END pubsub_pull_messages]

let subscribeCounterValue = 1;

function getSubscribeCounterValue () {
return subscribeCounterValue;
}

function setSubscribeCounterValue (value) {
subscribeCounterValue = value;
}

// [START pubsub_pull_ordered_messages]
const outstandingMessages = {};

function pullOrderedMessages (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

// Pulls messages. Set returnImmediately to false to block until messages are
// received.
subscription.pull({ returnImmediately: true }, (err, messages) => {
if (err) {
callback(err);
return;
}

// Pub/Sub messages are unordered, so here we manually order messages by
// their "counterId" attribute which was set when they were published.
messages.forEach((message) => {
outstandingMessages[message.attributes.counterId] = message;
});

const outstandingIds = Object.keys(outstandingMessages).map((counterId) => +counterId);
outstandingIds.sort();

outstandingIds.forEach((counterId) => {
const counter = getSubscribeCounterValue();
const message = outstandingMessages[counterId];

if (counterId < counter) {
// The message has already been processed
subscription.ack(message.ackId);
delete outstandingMessages[counterId];
} else if (counterId === counter) {
// Process the message
console.log(`* %d %j %j`, message.id, message.data, message.attributes);

setSubscribeCounterValue(counterId + 1);
subscription.ack(message.ackId);
delete outstandingMessages[counterId];
} else {
// Have not yet processed the message on which this message is dependent
return false;
}
});
callback();
});
}
// [END pubsub_pull_ordered_messages]

// [START pubsub_get_subscription_policy]
function getSubscriptionPolicy (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand All @@ -188,6 +273,9 @@ function getSubscriptionPolicy (subscriptionName, callback) {

// [START pubsub_set_subscription_policy]
function setSubscriptionPolicy (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand Down Expand Up @@ -222,6 +310,9 @@ function setSubscriptionPolicy (subscriptionName, callback) {

// [START pubsub_test_subscription_permissions]
function testSubscriptionPermissions (subscriptionName, callback) {
// Instantiates the client library
const pubsubClient = PubSub();

// References an existing subscription, e.g. "my-subscription"
const subscription = pubsubClient.subscription(subscriptionName);

Expand Down Expand Up @@ -253,8 +344,9 @@ const program = module.exports = {
createSubscription: createSubscription,
createPushSubscription: createPushSubscription,
deleteSubscription: deleteSubscription,
getSubscriptionMetadata: getSubscriptionMetadata,
getSubscription: getSubscription,
pullMessages: pullMessages,
pullOrderedMessages: pullOrderedMessages,
getSubscriptionPolicy: getSubscriptionPolicy,
setSubscriptionPolicy: setSubscriptionPolicy,
testSubscriptionPermissions: testSubscriptionPermissions,
Expand Down Expand Up @@ -283,7 +375,7 @@ cli
program.deleteSubscription(options.subscriptionName, makeHandler(false));
})
.command(`get <subscriptionName>`, `Gets the metadata for a subscription.`, {}, (options) => {
program.getSubscriptionMetadata(options.subscriptionName, makeHandler(false));
program.getSubscription(options.subscriptionName, makeHandler(false));
})
.command(`pull <subscriptionName>`, `Pulls messages for a subscription.`, {}, (options) => {
program.pullMessages(options.subscriptionName, makeHandler(false));
Expand Down
45 changes: 44 additions & 1 deletion pubsub/system-test/subscriptions.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

'use strict';

const async = require(`async`);
const pubsub = require(`@google-cloud/pubsub`)();
const uuid = require(`node-uuid`);
const path = require(`path`);
Expand Down Expand Up @@ -108,10 +109,52 @@ describe(`pubsub:subscriptions`, () => {
`* ${messageIds[0]} "${expected}" {}`;
assert.equal(output, expectedOutput);
done();
}, 5000);
}, 2000);
});
});

it(`should pull ordered messages`, (done) => {
const subscriptions = require('../subscriptions');
const expected = `Hello, world!`;
const publishedMessageIds = [];

async.waterfall([
(cb) => {
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '3' } }, cb);
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 0);
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 1);
assert.deepEqual(console.log.firstCall.args, [`* %d %j %j`, publishedMessageIds[1], expected, { counterId: '1' }]);
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '1' } }, cb);
},
(messageIds, apiResponse, cb) => {
pubsub.topic(topicName).publish({ data: expected, attributes: { counterId: '2' } }, cb);
},
(messageIds, apiResponse, cb) => {
publishedMessageIds.push(messageIds[0]);
setTimeout(() => subscriptions.pullOrderedMessages(subscriptionNameOne, cb), 2000);
},
(cb) => {
assert.equal(console.log.callCount, 3);
assert.deepEqual(console.log.secondCall.args, [`* %d %j %j`, publishedMessageIds[2], expected, { counterId: '2' }]);
assert.deepEqual(console.log.thirdCall.args, [`* %d %j %j`, publishedMessageIds[0], expected, { counterId: '3' }]);
cb();
}
], done);
});

it(`should set the IAM policy for a subscription`, (done) => {
run(`${cmd} set-policy ${subscriptionNameOne}`, cwd);
pubsub.subscription(subscriptionNameOne).iam.getPolicy((err, policy) => {
Expand Down
Loading

0 comments on commit ff7bda4

Please sign in to comment.