Skip to content

Commit

Permalink
feat: added QueueProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
simlu committed Sep 11, 2019
1 parent 6f68048 commit 05552d5
Show file tree
Hide file tree
Showing 21 changed files with 401 additions and 125 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,22 @@ Get the service from the underlying `aws-sdk` without initializing it. Possible
Updates the global aws config of the underlying `aws-sdk` via `AWS.config.update`.
In most cases this should not be necessary to use.

#### sendMessageBatch(msgs: Array, queueUrl: String, options: Object = {})
#### sqs.sendMessageBatch(msgs: Array, queueUrl: String, options: Object = {})

Splits `msgs` into groups and calls [sqs.SendMessageBatch](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#sendMessageBatch-property) for every group.
Batch sizes can be modified by the `batchSize` option. Failed calls will be retried up to the `maxRetries` option.
The available sendMessageBatch `options` are detailed below.

#### sqs.QueueProcessor({ queueUrl: String, stepsDir: String })

Initialize a queue processor lambda handler with steps. Steps need to be defined in the steps directory as separate `STEPNAME.js` files.

Each `step` needs to export `schema` (Joi schema), `handler` (execution logic ingesting payload and event) and `next` (array of next possible steps).

The schema needs to define the event name under `name`. New events that are to be re-queued into the queue need to be returned from the `handler` function as an array.

Please see tests for example.

### Init Options

#### logger
Expand Down
8 changes: 5 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@
"eslint-plugin-import": "2.18.2",
"eslint-plugin-json": "1.4.0",
"eslint-plugin-markdown": "1.0.0",
"eslint-plugin-mocha": "6.1.0",
"eslint-plugin-mocha": "6.1.1",
"js-gardener": "2.0.83",
"node-tdd": "2.5.2",
"node-tdd": "2.5.3",
"nyc": "14.1.1",
"semantic-release": "15.13.24"
},
Expand Down Expand Up @@ -101,8 +101,10 @@
],
"dependencies": {
"joi-strict": "1.0.9",
"lambda-async": "1.0.1",
"lodash.chunk": "4.2.0",
"lodash.get": "4.4.2",
"object-hash": "1.3.1"
"object-hash": "1.3.1",
"smart-fs": "1.9.11"
}
}
11 changes: 8 additions & 3 deletions src/module/sqs.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
const sendMessageBatch = require('./sqs/send-message-batch');
const QueueProcessor = require('./sqs/queue-processor');

module.exports = ({ call, getService, logger }) => ({
sendMessageBatch: sendMessageBatch({ call, getService, logger })
});
module.exports = ({ call, getService, logger }) => {
const smb = sendMessageBatch({ call, getService, logger });
return {
sendMessageBatch: smb,
QueueProcessor: QueueProcessor({ sendMessageBatch: smb })
};
};
55 changes: 55 additions & 0 deletions src/module/sqs/queue-processor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
const fs = require('smart-fs');
const path = require('path');
const Joi = require('joi-strict');
const { wrap } = require('lambda-async');

module.exports = ({ sendMessageBatch }) => ({ queueUrl, stepsDir }) => {
const steps = fs
.readdirSync(stepsDir)
.reduce((p, step) => Object.assign(p, {
[step.slice(0, -3)]: (() => {
const { schema, handler, next } = fs.smartRead(path.join(stepsDir, step));
return {
handler: (payload, event) => {
Joi.assert(payload, schema, `Invalid payload received for step: ${payload.name}`);
return handler(payload, event);
},
schema,
next
};
})()
}), {});

return wrap((event) => {
if (event.Records.length !== 1) {
throw new Error(
'Lambda SQS subscription is mis-configured! '
+ 'Please only process one event at a time for retry resilience.'
);
}
return Promise.all(event.Records.map(async (e) => {
const payload = JSON.parse(e.body);
if (!(payload instanceof Object && !Array.isArray(payload))) {
throw new Error(`Invalid Event Received: ${e.body}`);
}
const step = steps[payload.name];
if (payload.name === undefined) {
throw new Error('Received step event that is missing "name" property.');
}
if (step === undefined) {
throw new Error(`Invalid step provided: ${payload.name}`);
}
const messages = await step.handler(payload, e);
if (messages.length !== 0 && step.next.length === 0) {
throw new Error(`No output allowed for step: ${payload.name}`);
}
Joi.assert(
messages,
Joi.array().items(step.next.map((n) => steps[n].schema)),
`Unexpected/Invalid next step(s) returned for: ${payload.name}`
);
await sendMessageBatch(messages, queueUrl);
return payload;
}));
});
};
File renamed without changes.
87 changes: 87 additions & 0 deletions test/module/sqs/queue-processor.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
const expect = require('chai').expect;
const { describe } = require('node-tdd');
const index = require('../../../src/index');

describe('Testing QueueProcessor', {
useNock: true,
record: console,
envVarsFile: 'config.env.yml'
}, () => {
let aws;
let processor;
let executor;
before(() => {
aws = index({ logger: console });
processor = aws.sqs.QueueProcessor({
queueUrl: process.env.QUEUE_URL,
stepsDir: `${__filename}_steps`
});
executor = (records) => new Promise((resolve, reject) => {
processor({
Records: records.map((r) => ({ body: JSON.stringify(r) }))
}, {}, (err, resp) => {
if (err !== null) {
reject(err);
} else {
resolve(resp);
}
});
});
});

it('Testing step1 -> [step2]', async () => {
const result = await executor([{ name: 'step1' }]);
expect(result).to.deep.equal([{ name: 'step1' }]);
});

it('Testing step2 -> []', async () => {
const result = await executor([{ name: 'step2' }]);
expect(result).to.deep.equal([{ name: 'step2' }]);
});

it('Testing bad-output', async ({ capture }) => {
const result = await capture(() => executor([{ name: 'bad-output' }]));
expect(result.message).to.equal(
'Unexpected/Invalid next step(s) returned for: bad-output '
+ '[\n {\n "name" \u001b[31m[1]\u001b[0m: "unknown-step"\n }\n]'
+ '\n\u001b[31m\n[1] "name" must be one of [step2]\u001b[0m'
);
});

it('Testing disallowed-output', async ({ capture }) => {
const result = await capture(() => executor([{ name: 'disallowed-output' }]));
expect(result.message).to.equal('No output allowed for step: disallowed-output');
});

it('Testing unknown-step', async ({ capture }) => {
const result = await capture(() => executor([{ name: 'unknown-step' }]));
expect(result.message).to.equal('Invalid step provided: unknown-step');
});

it('Testing unnamed-step', async ({ capture }) => {
const result = await capture(() => executor([{}]));
expect(result.message).to.equal('Received step event that is missing "name" property.');
});

it('Testing invalid event format', async ({ capture }) => {
const result = await capture(() => executor([['element']]));
expect(result.message).to.equal('Invalid Event Received: ["element"]');
});

it('Testing invalid step payload', async ({ capture }) => {
const result = await capture(() => executor([{ name: 'step1', unexpected: 'value' }]));
expect(result.message).to.equal(
'Invalid payload received for step: step1 '
+ '{\n "name": "step1",\n "unexpected" \u001b[31m[1]\u001b[0m: "value"\n}\n\u001b[31m\n'
+ '[1] "unexpected" is not allowed\u001b[0m'
);
});

it('Testing multiple records', async ({ capture }) => {
const result = await capture(() => executor([{ name: 'step1' }, { name: 'step1' }]));
expect(result.message).to.equal(
'Lambda SQS subscription is mis-configured! '
+ 'Please only process one event at a time for retry resilience.'
);
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[
{
"scope": "https://sqs.us-west-2.amazonaws.com:443",
"method": "POST",
"path": "/",
"body": "Action=SendMessageBatch&QueueUrl=https%3A%2F%2Fsqs.us-west-2.amazonaws.com%2FXXXXXXXXXXXX%2FqueueUrl&SendMessageBatchRequestEntry.1.Id=9207814704912e326ed773e40027e802bf31d830&SendMessageBatchRequestEntry.1.MessageBody=%7B%22name%22%3A%22step2%22%7D&Version=2012-11-05",
"status": 200,
"response": "<?xml version=\"1.0\"?><SendMessageBatchResponse xmlns=\"http://queue.amazonaws.com/doc/2012-11-05/\"><SendMessageBatchResult><SendMessageBatchResultEntry><Id>9207814704912e326ed773e40027e802bf31d830</Id><MessageId>8085f0e3-a423-447e-9c89-3463acc149ef</MessageId><MD5OfMessageBody>4e85d0a6c2a615ac701dc2f1f05f58e6</MD5OfMessageBody></SendMessageBatchResultEntry></SendMessageBatchResult><ResponseMetadata><RequestId>8d1d8e98-dc7e-5fc2-9c83-db8791125e19</RequestId></ResponseMetadata></SendMessageBatchResponse>"
}
]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[]
9 changes: 9 additions & 0 deletions test/module/sqs/queue-processor.spec.js_steps/bad-output.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const Joi = require('joi-strict');

module.exports.schema = Joi.object().keys({
name: Joi.string().valid('bad-output')
});

module.exports.handler = async (payload, event) => [{ name: 'unknown-step' }];

module.exports.next = ['step2'];
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const Joi = require('joi-strict');

module.exports.schema = Joi.object().keys({
name: Joi.string().valid('disallowed-output')
});

module.exports.handler = async (payload, event) => [{ name: 'step2' }];

module.exports.next = [];
9 changes: 9 additions & 0 deletions test/module/sqs/queue-processor.spec.js_steps/step1.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const Joi = require('joi-strict');

module.exports.schema = Joi.object().keys({
name: Joi.string().valid('step1')
});

module.exports.handler = async (payload, event) => [{ name: 'step2' }];

module.exports.next = ['step2'];
9 changes: 9 additions & 0 deletions test/module/sqs/queue-processor.spec.js_steps/step2.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const Joi = require('joi-strict');

module.exports.schema = Joi.object().keys({
name: Joi.string().valid('step2')
});

module.exports.handler = async (payload, event) => [];

module.exports.next = [];
6 changes: 5 additions & 1 deletion test/module/sqs/send-message-batch.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@ const { describe } = require('node-tdd');
const index = require('../../../src/index');
const { SendMessageBatchError } = require('../../../src/resources/errors');

describe('Testing sendMessageBatch', { useNock: true, record: console }, () => {
describe('Testing sendMessageBatch', {
useNock: true,
record: console,
envVarsFile: 'config.env.yml'
}, () => {
let aws;
before(() => {
aws = index({ logger: console });
Expand Down
Loading

0 comments on commit 05552d5

Please sign in to comment.