From a282b5486744ef3ba386405727f1b640542924e5 Mon Sep 17 00:00:00 2001 From: Ryan Witt Date: Wed, 21 Mar 2018 14:01:11 -0400 Subject: [PATCH] more changes for #18 --- CHANGELOG.md | 61 +++++++++ package.json | 3 +- src/cli.js | 3 +- src/enqueue.js | 18 +-- src/qrlCache.js | 26 ++++ src/worker.js | 4 +- test/fixtures/test-fifo01-x24.batch | 24 ++++ test/test.cli.js | 185 +++++++++++++++++++++++++++- 8 files changed, 304 insertions(+), 20 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 test/fixtures/test-fifo01-x24.batch diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..3031eff --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,61 @@ +# Changelog + +[v.1.3.0-alpha] (Unreleased) +----------------------------- + +## New Features + +### FIFO Option (#18) + +Added a `--fifo` and `--group-id ` option to `equeue` and `enqueue-batch` +- Causes any new queues to be created as FIFO queues +- Causes the `.fifo` suffix to be appended to any queue names that do not explicitly have them +- Causes failed queues to take the form `${name}_failed.fifo` +- Any commands with the same `--group-id` will be worked on in the order they were received by SQS (see [FIFO docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html)) +- If you don't set `--group-id` it defaults to a unique id per call to `qdone`, so this means messages sent by `enqueue-batch` will always be ordered as you sent them. +- There is NO option to set group id per-message in `enqueue-batch`. Adding this feature in the future will change the format of the batch input file. +- There is NO support right now for Content Deduplication, however a Unique Message Deduplication ID is generated for each command, so retry-able errors should not result in duplicate messages. + +Added a `--fifo` option to `worker` +- Causes the `.fifo` suffix to be appended to any queue names that do not explicitly have them +- When wildcard names are specified (e.g. `test_*` or `*`), worker only listens to queues with a `.fifo` suffix. +- Failed queues are still only included if `--include-failed` is set. +- Regardless of how many workers you have, FIFO commands with the same `--group-id` will only be executed by one worker at a time. +- There is NO support right now for only-once processing using the Receive Request Attempt ID + +## Bug Fixes + +- Fixed (#29) bug in `enqueue-batch` where SQS batches where command lines added up to > 256kb would not be split correctly and loop + +## Under the hood + +- Increased test coverage related to (#29) +- Added test coverage for (#18) +- Updated command line args libraries + + +[v1.2.0] (January 5, 2018) +--------------------------- + +## Bug Fixes + +- #22 fixes exception deleting failed queues in paired mode when fail queue does not exist + + +[v1.1.0]. (December 25, 2017) +----------------------------- + +## New Features + +- Add experimental support for using exports in node. Exports various functions from enqueue and worker for use from node. Doesn't change the public facing interface (which is command line only). + + +[v1.0.0]. (August 8, 2017) +-------------------------- + +## New Features + +- There is a new command called [`idle-queues`](https://github.com/suredone/qdone#idle-queues-usage) which can identify queues that have had no activity for a specified period of time, and delete them, if desired. +- Qdone's `worker` now [allows a child process to finish running](https://github.com/suredone/qdone#shutdown-behavior) before shutting down in response to a `SIGTERM` or `SIGINT`. +- Queues are now always resolved, and the `--always-resolve` option has been removed. +- Output to non TTYs is less chatty by default, but you can get the previous behavior by using `--verbose`, or silence output in a TTY by using `--quiet`. diff --git a/package.json b/package.json index bab7dd7..3fa1639 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,8 @@ }, "scripts": { "test": "standard && nyc mocha", - "coverage": "nyc report --reporter=text-lcov | coveralls" + "coverage": "nyc report --reporter=text-lcov | coveralls", + "standard": "standard" }, "author": "Ryan Witt", "license": "MIT", diff --git a/src/cli.js b/src/cli.js index 17ce580..0397055 100644 --- a/src/cli.js +++ b/src/cli.js @@ -170,7 +170,8 @@ exports.worker = function worker (argv) { { name: 'kill-after', alias: 'k', type: Number, defaultValue: 30, description: 'Kill job after this many seconds [default: 30]' }, { name: 'wait-time', alias: 'w', type: Number, defaultValue: 20, description: 'Listen at most this long on each queue [default: 20]' }, { name: 'include-failed', type: Boolean, description: 'When using \'*\' do not ignore fail queues.' }, - { name: 'drain', type: Boolean, description: 'Run until no more work is found and quit. NOTE: if used with --wait-time 0, this option will not drain queues.' } + { name: 'drain', type: Boolean, description: 'Run until no more work is found and quit. NOTE: if used with --wait-time 0, this option will not drain queues.' }, + { name: 'fifo', alias: 'f', type: Boolean, description: 'Automatically adds .fifo to queue names. Only listens to fifo queues when using \'*\'.' } ].concat(globalOptionDefinitions) const usageSections = [ diff --git a/src/enqueue.js b/src/enqueue.js index 5585ffc..f4460d7 100644 --- a/src/enqueue.js +++ b/src/enqueue.js @@ -14,10 +14,7 @@ function createFailQueue (fqueue, fqname, deadLetterTargetArn, options) { Attributes: {}, QueueName: fqname } - if (options.fifo) { - params.Attributes.FifoQueue = 'true' - params.QueueName = params.QueueName + '.fifo' - } + if (options.fifo) params.Attributes.FifoQueue = 'true' return sqs .createQueue(params) .promise() @@ -37,10 +34,7 @@ function createQueue (queue, qname, deadLetterTargetArn, options) { }, QueueName: qname } - if (options.fifo) { - params.Attributes.FifoQueue = 'true' - params.QueueName = params.QueueName + '.fifo' - } + if (options.fifo) params.Attributes.FifoQueue = 'true' return sqs .createQueue(params) .promise() @@ -149,7 +143,6 @@ function flushMessages (qrl, options) { if (data.Failed && data.Failed.length) { const err = new Error('One or more message failures: ' + JSON.stringify(data.Failed)) err.Failed = data.Failed - debug(err) throw err } // If we actually managed to flush any of them @@ -226,8 +219,9 @@ function getQrl (queue, qname, fqueue, fqname, options) { exports.enqueue = function enqueue (queue, command, options) { debug('enqueue(', queue, command, ')') + queue = qrlCache.normalizeQueueName(queue, options) const qname = options.prefix + queue - const fqueue = queue + options['fail-suffix'] + const fqueue = qrlCache.normalizeFailQueueName(queue, options) const fqname = options.prefix + fqueue // Now that we have the queue, send our message @@ -243,10 +237,10 @@ exports.enqueueBatch = function enqueueBatch (pairs, options) { debug('enqueueBatch(', pairs, ')') function unpackPair (pair) { - const queue = pair.queue + const queue = qrlCache.normalizeQueueName(pair.queue, options) const command = pair.command const qname = options.prefix + queue - const fqueue = queue + options['fail-suffix'] + const fqueue = qrlCache.normalizeFailQueueName(queue, options) const fqname = options.prefix + fqueue return { queue, qname, fqueue, fqname, command } } diff --git a/src/qrlCache.js b/src/qrlCache.js index 85469c2..a8227c4 100644 --- a/src/qrlCache.js +++ b/src/qrlCache.js @@ -12,6 +12,28 @@ function _get (qname) { return qcache[qname] } +// +// Normalizes a queue name to end with .fifo if options.fifo is set +// +const fifoSuffix = '.fifo' +exports.normalizeQueueName = function normalizeQueueName (qname, options) { + const sliced = qname.slice(0, -fifoSuffix.length) + const suffix = qname.slice(-fifoSuffix.length) + const base = suffix === fifoSuffix ? sliced : qname + return base + (options.fifo && qname.slice(-1) !== '*' ? fifoSuffix : '') +} + +// +// Normalizes fail queue name, appending both --fail-suffix and .fifo depending on options +// +exports.normalizeFailQueueName = function normalizeFailQueueName (qname, options) { + qname = exports.normalizeQueueName(qname, {fifo: false}) // strip .fifo if it is there + const sliced = qname.slice(0, -options['fail-suffix'].length) + const suffix = qname.slice(-options['fail-suffix'].length) + const base = suffix === options['fail-suffix'] ? sliced : qname // strip --fail-suffix if it is there + return (base + options['fail-suffix']) + (options.fifo ? fifoSuffix : '') +} + // // Clear cache // @@ -85,6 +107,10 @@ exports.getQnameUrlPairs = function getQnameUrlPairs (qnames, options) { .promise() .then(function (data) { debug('listQueues return', data) + if (options.fifo) { + // Remove non-fifo queues + data.QueueUrls = data.QueueUrls.filter(url => url.slice(-fifoSuffix.length) === fifoSuffix) + } return ingestQRLs(data.QueueUrls || []) }) : exports diff --git a/src/worker.js b/src/worker.js index 142f18f..d79b0fe 100644 --- a/src/worker.js +++ b/src/worker.js @@ -157,7 +157,7 @@ function pollForJobs (qname, qrl, options) { // exports.listen = function listen (queues, options) { if (options.verbose) console.error(chalk.blue('Resolving queues: ') + queues.join(' ')) - const qnames = queues.map(function (queue) { return options.prefix + queue }) + const qnames = queues.map(function (queue) { return options.prefix + qrlCache.normalizeQueueName(queue, options) }) return qrlCache .getQnameUrlPairs(qnames, options) .then(function (entries) { @@ -170,7 +170,7 @@ exports.listen = function listen (queues, options) { // Don't listen to fail queues... unless user wants to entries = entries .filter(function (entry) { - const suf = options['fail-suffix'] + const suf = options['fail-suffix'] + (options.fifo ? '.fifo' : '') return options['include-failed'] ? true : entry.qname.slice(-suf.length) !== suf }) diff --git a/test/fixtures/test-fifo01-x24.batch b/test/fixtures/test-fifo01-x24.batch new file mode 100644 index 0000000..ddfa16a --- /dev/null +++ b/test/fixtures/test-fifo01-x24.batch @@ -0,0 +1,24 @@ +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true +test.fifo true diff --git a/test/test.cli.js b/test/test.cli.js index f7ae5e8..6bc8d9b 100644 --- a/test/test.cli.js +++ b/test/test.cli.js @@ -106,7 +106,7 @@ describe('cli', function () { // Enqueue describe('qdone enqueue', function () { it('should print usage and exit 1 with error', - cliTest(['enqueue'], null, function (err, stdout, stderr) { + cliTest(['enqueue', '--verbose'], null, function (err, stdout, stderr) { expect(stdout).to.contain('usage: ') expect(stderr).to.contain('') expect(err).to.be.an('error') @@ -123,7 +123,7 @@ describe('cli', function () { describe('qdone enqueue onlyQueue', function () { it('should print usage and exit 1 with error', - cliTest(['enqueue', 'onlyQueue'], null, function (err, stdout, stderr) { + cliTest(['enqueue', '--verbose', 'onlyQueue'], null, function (err, stdout, stderr) { expect(stdout).to.contain('usage: ') expect(stderr).to.contain('') expect(err).to.be.an('error') @@ -160,7 +160,7 @@ describe('cli', function () { }) }) it('should print id of enqueued message and exit 0', - cliTest(['enqueue', 'testQueue', 'true'], function (result, stdout, stderr) { + cliTest(['enqueue', '--verbose', 'testQueue', 'true'], function (result, stdout, stderr) { expect(stderr).to.contain('Enqueued job da68f62c-0c07-4bee-bf5f-7e856EXAMPLE') })) }) @@ -198,7 +198,7 @@ describe('cli', function () { }) }) it('should print id of enqueued message and exit 0', - cliTest(['enqueue', '--fifo', '--group-id', 'gidtest', 'testQueue', 'true'], function (result, stdout, stderr) { + cliTest(['enqueue', '--verbose', '--fifo', '--group-id', 'gidtest', 'testQueue', 'true'], function (result, stdout, stderr) { expect(stderr).to.contain('Enqueued job da68f62c-0c07-4bee-bf5f-7e856EXAMPLE') })) }) @@ -267,6 +267,96 @@ describe('cli', function () { })) }) + describe('qdone enqueue --quiet testQueue true # (queue does not exist)', function () { + before(function () { + AWS.mock('SQS', 'getQueueUrl', function (params, callback) { + const err = new Error('Queue does not exist.') + err.code = 'AWS.SimpleQueueService.NonExistentQueue' + callback(err) + }) + AWS.mock('SQS', 'createQueue', function (params, callback) { + callback(null, {QueueUrl: `https://q.amazonaws.com/123456789101/${params.QueueName}`}) + }) + AWS.mock('SQS', 'getQueueAttributes', function (params, callback) { + callback(null, { + Attributes: { + ApproximateNumberOfMessages: '0', + ApproximateNumberOfMessagesDelayed: '0', + ApproximateNumberOfMessagesNotVisible: '0', + CreatedTimestamp: '1442426968', + DelaySeconds: '0', + LastModifiedTimestamp: '1442426968', + MaximumMessageSize: '262144', + MessageRetentionPeriod: '345600', + QueueArn: 'arn:aws:sqs:us-east-1:80398EXAMPLE:MyNewQueue', + ReceiveMessageWaitTimeSeconds: '0', + RedrivePolicy: `{'deadLetterTargetArn':'arn:aws:sqs:us-east-1:80398EXAMPLE:${params.QueueName}','maxReceiveCount':1000}`, + VisibilityTimeout: '30' + } + }) + }) + AWS.mock('SQS', 'sendMessage', function (params, callback) { + callback(null, { + MD5OfMessageAttributes: '00484c68...59e48f06', + MD5OfMessageBody: '51b0a325...39163aa0', + MessageId: 'da68f62c-0c07-4bee-bf5f-7e856EXAMPLE' + }) + }) + }) + it('should create queues, print nothing and exit 0', + cliTest(['enqueue', '--quiet', 'testQueue', 'true'], function (result, stdout, stderr) { + expect(stderr).to.equal('') + expect(stdout).to.equal('') + })) + }) + + describe('qdone enqueue testQueue true # (unhandled error on fail queue creation)', function () { + before(function () { + var code = 'AWS.SimpleQueueService.NonExistentQueue' + AWS.mock('SQS', 'getQueueUrl', function (params, callback) { + const err = new Error('Queue does not exist.') + err.code = code + code = 'AWS.SimpleQueueService.SomeOtherError' + callback(err) + }) + AWS.mock('SQS', 'createQueue', function (params, callback) { + // callback(null, {QueueUrl: `https://q.amazonaws.com/123456789101/${params.QueueName}`}) + const err = new Error('Some Other Error.') + err.code = 'AWS.SimpleQueueService.SomeOtherError' + callback(err) + }) + AWS.mock('SQS', 'getQueueAttributes', function (params, callback) { + callback(null, { + Attributes: { + ApproximateNumberOfMessages: '0', + ApproximateNumberOfMessagesDelayed: '0', + ApproximateNumberOfMessagesNotVisible: '0', + CreatedTimestamp: '1442426968', + DelaySeconds: '0', + LastModifiedTimestamp: '1442426968', + MaximumMessageSize: '262144', + MessageRetentionPeriod: '345600', + QueueArn: 'arn:aws:sqs:us-east-1:80398EXAMPLE:MyNewQueue', + ReceiveMessageWaitTimeSeconds: '0', + RedrivePolicy: `{'deadLetterTargetArn':'arn:aws:sqs:us-east-1:80398EXAMPLE:${params.QueueName}','maxReceiveCount':1000}`, + VisibilityTimeout: '30' + } + }) + }) + AWS.mock('SQS', 'sendMessage', function (params, callback) { + callback(null, { + MD5OfMessageAttributes: '00484c68...59e48f06', + MD5OfMessageBody: '51b0a325...39163aa0', + MessageId: 'da68f62c-0c07-4bee-bf5f-7e856EXAMPLE' + }) + }) + }) + it('should print traceback and exit 1 with error', + cliTest(['enqueue', '--verbose', 'testQueue', 'true'], null, function (err, stdout, stderr) { + expect(err).to.be.an('error') + })) + }) + // Enqueue batch describe('qdone enqueue-batch', function () { it('should print usage and exit 1 with error', @@ -338,6 +428,38 @@ describe('cli', function () { })) }) + describe('qdone enqueue-batch test/fixtures/test-unique01-x24.batch # (queue exists, some failures)', function () { + before(function () { + AWS.mock('SQS', 'getQueueUrl', function (params, callback) { + callback(null, {QueueUrl: `https://q.amazonaws.com/123456789101/${params.QueueName}`}) + }) + var messageId = 0 + AWS.mock('SQS', 'sendMessageBatch', function (params, callback) { + callback(null, { + Failed: params.Entries.slice(0, 2).map(message => ({ + MD5OfMessageAttributes: '00484c68...59e48f06', + MD5OfMessageBody: '51b0a325...39163aa0', + MessageId: 'da68f62c-0c07-4bee-bf5f-56EXAMPLE-' + messageId++ + })), + Successful: params.Entries.slice(2).map(message => ({ + MD5OfMessageAttributes: '00484c68...59e48f06', + MD5OfMessageBody: '51b0a325...39163aa0', + MessageId: 'da68f62c-0c07-4bee-bf5f-56EXAMPLE-' + messageId++ + })) + }) + }) + }) + it('should exit 1 and show which messages failed', + cliTest(['enqueue-batch', '--verbose', 'test/fixtures/test-unique01-x24.batch'], null, function (err, stdout, stderr) { + expect(stderr).to.contain('Error: One or more message failures') + expect(err).to.be.an('error') + // Expect some ids of failed messages + for (var messageId = 0; messageId < 2; messageId++) { + expect(stderr).to.contain('da68f62c-0c07-4bee-bf5f-56EXAMPLE-' + messageId) + } + })) + }) + describe('qdone enqueue-batch --quiet test/fixtures/test-unique01-x24.batch # (queue exists)', function () { before(function () { AWS.mock('SQS', 'getQueueUrl', function (params, callback) { @@ -362,6 +484,61 @@ describe('cli', function () { })) }) + describe('qdone enqueue-batch --fifo test/fixtures/test-fifo01-x24.batch # (queue does not exist)', function () { + before(function () { + AWS.mock('SQS', 'getQueueUrl', function (params, callback) { + const err = new Error('Queue does not exist.') + err.code = 'AWS.SimpleQueueService.NonExistentQueue' + callback(err) + }) + AWS.mock('SQS', 'createQueue', function (params, callback) { + expect(params.QueueName.slice(-'.fifo'.length) === '.fifo') + expect(params.FifoQueue === 'true') + callback(null, {QueueUrl: `https://q.amazonaws.com/123456789101/${params.QueueName}`}) + }) + AWS.mock('SQS', 'getQueueAttributes', function (params, callback) { + callback(null, { + Attributes: { + ApproximateNumberOfMessages: '0', + ApproximateNumberOfMessagesDelayed: '0', + ApproximateNumberOfMessagesNotVisible: '0', + CreatedTimestamp: '1442426968', + DelaySeconds: '0', + LastModifiedTimestamp: '1442426968', + MaximumMessageSize: '262144', + MessageRetentionPeriod: '345600', + QueueArn: 'arn:aws:sqs:us-east-1:80398EXAMPLE:MyNewQueue', + ReceiveMessageWaitTimeSeconds: '0', + RedrivePolicy: `{'deadLetterTargetArn':'arn:aws:sqs:us-east-1:80398EXAMPLE:${params.QueueName}','maxReceiveCount':1000}`, + VisibilityTimeout: '30' + } + }) + }) + var messageId = 0 + AWS.mock('SQS', 'sendMessageBatch', function (params, callback) { + callback(null, { + Failed: [], + Successful: params.Entries.map(message => ({ + MD5OfMessageAttributes: '00484c68...59e48f06', + MD5OfMessageBody: '51b0a325...39163aa0', + MessageId: 'da68f62c-0c07-4bee-bf5f-56EXAMPLE-' + messageId++ + })) + }) + }) + }) + it('should print id of enqueued messages, use 3 requests, print total count and exit 0', + cliTest(['enqueue-batch', '--verbose', '--fifo', 'test/fixtures/test-fifo01-x24.batch'], function (result, stdout, stderr) { + for (var messageId = 0; messageId < 24; messageId++) { + expect(stderr).to.contain('Enqueued job da68f62c-0c07-4bee-bf5f-56EXAMPLE-' + messageId) + } + expect(stderr).to.contain('Enqueued 24 jobs') + expect(stderr).to.contain('request 1') + expect(stderr).to.contain('request 2') + expect(stderr).to.contain('request 3') + expect(stderr).to.not.contain('request 4') + })) + }) + describe('qdone enqueue-batch test/fixtures/test-unique01-x24.batch # (queue does not exist)', function () { before(function () { AWS.mock('SQS', 'getQueueUrl', function (params, callback) {