Skip to content

Commit

Permalink
more changes for #18
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanwitt committed Mar 21, 2018
1 parent 5611b18 commit a282b54
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 20 deletions.
61 changes: 61 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Changelog

[v.1.3.0-alpha] (Unreleased)
-----------------------------

## New Features

### FIFO Option (#18)

Added a `--fifo` and `--group-id <string>` option to `equeue` and `enqueue-batch`
- Causes any new queues to be created as FIFO queues
- Causes the `.fifo` suffix to be appended to any queue names that do not explicitly have them
- Causes failed queues to take the form `${name}_failed.fifo`
- Any commands with the same `--group-id` will be worked on in the order they were received by SQS (see [FIFO docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/FIFO-queues.html))
- If you don't set `--group-id` it defaults to a unique id per call to `qdone`, so this means messages sent by `enqueue-batch` will always be ordered as you sent them.
- There is NO option to set group id per-message in `enqueue-batch`. Adding this feature in the future will change the format of the batch input file.
- There is NO support right now for Content Deduplication, however a Unique Message Deduplication ID is generated for each command, so retry-able errors should not result in duplicate messages.

Added a `--fifo` option to `worker`
- Causes the `.fifo` suffix to be appended to any queue names that do not explicitly have them
- When wildcard names are specified (e.g. `test_*` or `*`), worker only listens to queues with a `.fifo` suffix.
- Failed queues are still only included if `--include-failed` is set.
- Regardless of how many workers you have, FIFO commands with the same `--group-id` will only be executed by one worker at a time.
- There is NO support right now for only-once processing using the Receive Request Attempt ID

## Bug Fixes

- Fixed (#29) bug in `enqueue-batch` where SQS batches where command lines added up to > 256kb would not be split correctly and loop

## Under the hood

- Increased test coverage related to (#29)
- Added test coverage for (#18)
- Updated command line args libraries


[v1.2.0] (January 5, 2018)
---------------------------

## Bug Fixes

- #22 fixes exception deleting failed queues in paired mode when fail queue does not exist


[v1.1.0]. (December 25, 2017)
-----------------------------

## New Features

- Add experimental support for using exports in node. Exports various functions from enqueue and worker for use from node. Doesn't change the public facing interface (which is command line only).


[v1.0.0]. (August 8, 2017)
--------------------------

## New Features

- There is a new command called [`idle-queues`](https://github.com/suredone/qdone#idle-queues-usage) which can identify queues that have had no activity for a specified period of time, and delete them, if desired.
- Qdone's `worker` now [allows a child process to finish running](https://github.com/suredone/qdone#shutdown-behavior) before shutting down in response to a `SIGTERM` or `SIGINT`.
- Queues are now always resolved, and the `--always-resolve` option has been removed.
- Output to non TTYs is less chatty by default, but you can get the previous behavior by using `--verbose`, or silence output in a TTY by using `--quiet`.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
},
"scripts": {
"test": "standard && nyc mocha",
"coverage": "nyc report --reporter=text-lcov | coveralls"
"coverage": "nyc report --reporter=text-lcov | coveralls",
"standard": "standard"
},
"author": "Ryan Witt",
"license": "MIT",
Expand Down
3 changes: 2 additions & 1 deletion src/cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ exports.worker = function worker (argv) {
{ name: 'kill-after', alias: 'k', type: Number, defaultValue: 30, description: 'Kill job after this many seconds [default: 30]' },
{ name: 'wait-time', alias: 'w', type: Number, defaultValue: 20, description: 'Listen at most this long on each queue [default: 20]' },
{ name: 'include-failed', type: Boolean, description: 'When using \'*\' do not ignore fail queues.' },
{ name: 'drain', type: Boolean, description: 'Run until no more work is found and quit. NOTE: if used with --wait-time 0, this option will not drain queues.' }
{ name: 'drain', type: Boolean, description: 'Run until no more work is found and quit. NOTE: if used with --wait-time 0, this option will not drain queues.' },
{ name: 'fifo', alias: 'f', type: Boolean, description: 'Automatically adds .fifo to queue names. Only listens to fifo queues when using \'*\'.' }
].concat(globalOptionDefinitions)

const usageSections = [
Expand Down
18 changes: 6 additions & 12 deletions src/enqueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ function createFailQueue (fqueue, fqname, deadLetterTargetArn, options) {
Attributes: {},
QueueName: fqname
}
if (options.fifo) {
params.Attributes.FifoQueue = 'true'
params.QueueName = params.QueueName + '.fifo'
}
if (options.fifo) params.Attributes.FifoQueue = 'true'
return sqs
.createQueue(params)
.promise()
Expand All @@ -37,10 +34,7 @@ function createQueue (queue, qname, deadLetterTargetArn, options) {
},
QueueName: qname
}
if (options.fifo) {
params.Attributes.FifoQueue = 'true'
params.QueueName = params.QueueName + '.fifo'
}
if (options.fifo) params.Attributes.FifoQueue = 'true'
return sqs
.createQueue(params)
.promise()
Expand Down Expand Up @@ -149,7 +143,6 @@ function flushMessages (qrl, options) {
if (data.Failed && data.Failed.length) {
const err = new Error('One or more message failures: ' + JSON.stringify(data.Failed))
err.Failed = data.Failed
debug(err)
throw err
}
// If we actually managed to flush any of them
Expand Down Expand Up @@ -226,8 +219,9 @@ function getQrl (queue, qname, fqueue, fqname, options) {
exports.enqueue = function enqueue (queue, command, options) {
debug('enqueue(', queue, command, ')')

queue = qrlCache.normalizeQueueName(queue, options)
const qname = options.prefix + queue
const fqueue = queue + options['fail-suffix']
const fqueue = qrlCache.normalizeFailQueueName(queue, options)
const fqname = options.prefix + fqueue

// Now that we have the queue, send our message
Expand All @@ -243,10 +237,10 @@ exports.enqueueBatch = function enqueueBatch (pairs, options) {
debug('enqueueBatch(', pairs, ')')

function unpackPair (pair) {
const queue = pair.queue
const queue = qrlCache.normalizeQueueName(pair.queue, options)
const command = pair.command
const qname = options.prefix + queue
const fqueue = queue + options['fail-suffix']
const fqueue = qrlCache.normalizeFailQueueName(queue, options)
const fqname = options.prefix + fqueue
return { queue, qname, fqueue, fqname, command }
}
Expand Down
26 changes: 26 additions & 0 deletions src/qrlCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,28 @@ function _get (qname) {
return qcache[qname]
}

//
// Normalizes a queue name to end with .fifo if options.fifo is set
//
const fifoSuffix = '.fifo'
exports.normalizeQueueName = function normalizeQueueName (qname, options) {
const sliced = qname.slice(0, -fifoSuffix.length)
const suffix = qname.slice(-fifoSuffix.length)
const base = suffix === fifoSuffix ? sliced : qname
return base + (options.fifo && qname.slice(-1) !== '*' ? fifoSuffix : '')
}

//
// Normalizes fail queue name, appending both --fail-suffix and .fifo depending on options
//
exports.normalizeFailQueueName = function normalizeFailQueueName (qname, options) {
qname = exports.normalizeQueueName(qname, {fifo: false}) // strip .fifo if it is there
const sliced = qname.slice(0, -options['fail-suffix'].length)
const suffix = qname.slice(-options['fail-suffix'].length)
const base = suffix === options['fail-suffix'] ? sliced : qname // strip --fail-suffix if it is there
return (base + options['fail-suffix']) + (options.fifo ? fifoSuffix : '')
}

//
// Clear cache
//
Expand Down Expand Up @@ -85,6 +107,10 @@ exports.getQnameUrlPairs = function getQnameUrlPairs (qnames, options) {
.promise()
.then(function (data) {
debug('listQueues return', data)
if (options.fifo) {
// Remove non-fifo queues
data.QueueUrls = data.QueueUrls.filter(url => url.slice(-fifoSuffix.length) === fifoSuffix)
}
return ingestQRLs(data.QueueUrls || [])
})
: exports
Expand Down
4 changes: 2 additions & 2 deletions src/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ function pollForJobs (qname, qrl, options) {
//
exports.listen = function listen (queues, options) {
if (options.verbose) console.error(chalk.blue('Resolving queues: ') + queues.join(' '))
const qnames = queues.map(function (queue) { return options.prefix + queue })
const qnames = queues.map(function (queue) { return options.prefix + qrlCache.normalizeQueueName(queue, options) })
return qrlCache
.getQnameUrlPairs(qnames, options)
.then(function (entries) {
Expand All @@ -170,7 +170,7 @@ exports.listen = function listen (queues, options) {
// Don't listen to fail queues... unless user wants to
entries = entries
.filter(function (entry) {
const suf = options['fail-suffix']
const suf = options['fail-suffix'] + (options.fifo ? '.fifo' : '')
return options['include-failed'] ? true : entry.qname.slice(-suf.length) !== suf
})

Expand Down
24 changes: 24 additions & 0 deletions test/fixtures/test-fifo01-x24.batch
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
test.fifo true
Loading

0 comments on commit a282b54

Please sign in to comment.