From 548da52f2a0df0fd9078488d1d7240db50306594 Mon Sep 17 00:00:00 2001 From: Tony Huang Date: Sun, 12 Feb 2017 00:17:19 +0800 Subject: [PATCH] Added event source mapping feature. --- bin/node-lambda | 3 + lib/event_sources.json.example | 8 ++ lib/main.js | 157 +++++++++++++++++++++++++++++++-- 3 files changed, 162 insertions(+), 6 deletions(-) create mode 100644 lib/event_sources.json.example diff --git a/bin/node-lambda b/bin/node-lambda index 9044c896..73692cbb 100755 --- a/bin/node-lambda +++ b/bin/node-lambda @@ -12,6 +12,7 @@ dotenv.load(); var AWS_ENVIRONMENT = process.env.AWS_ENVIRONMENT || ''; var CONFIG_FILE = process.env.CONFIG_FILE || ''; +var EVENT_SOURCE_FILE = process.env.EVENT_SOURCE_FILE || 'event_sources.json'; var EXCLUDE_GLOBS = process.env.EXCLUDE_GLOBS || ''; var AWS_ACCESS_KEY_ID = process.env.AWS_ACCESS_KEY_ID; var AWS_SECRET_ACCESS_KEY = process.env.AWS_SECRET_ACCESS_KEY; @@ -61,6 +62,8 @@ program .option('-A, --packageDirectory [' + PACKAGE_DIRECTORY + ']', 'Local Package Directory', PACKAGE_DIRECTORY) .option('-f, --configFile [' + CONFIG_FILE + ']', 'Path to file holding secret environment variables (e.g. "deploy.env")', CONFIG_FILE) + .option('-S, --eventSourceFile [' + EVENT_SOURCE_FILE + ']', + 'Path to file holding event source mapping variables (e.g. "event_sources.json")', EVENT_SOURCE_FILE) .option('-x, --excludeGlobs [' + EXCLUDE_GLOBS + ']', 'Space-separated glob pattern(s) for additional exclude files (e.g. "event.json dotenv.sample")', EXCLUDE_GLOBS) .option('-D, --prebuiltDirectory [' + PREBUILT_DIRECTORY + ']', 'Prebuilt directory', PREBUILT_DIRECTORY) diff --git a/lib/event_sources.json.example b/lib/event_sources.json.example new file mode 100644 index 00000000..8944af9f --- /dev/null +++ b/lib/event_sources.json.example @@ -0,0 +1,8 @@ +[ + { + "EventSourceArn": "your event source arn", + "StartingPosition": "LATEST", + "BatchSize": 100, + "Enabled": true + } +] \ No newline at end of file diff --git a/lib/main.js b/lib/main.js index 3f43f941..09d85ca6 100644 --- a/lib/main.js +++ b/lib/main.js @@ -32,6 +32,7 @@ Lambda.prototype.setup = function (program) { this._createSampleFile(program.eventFile, 'event.json'); this._createSampleFile('deploy.env', 'deploy.env'); this._createSampleFile(program.contextFile, 'context.json'); + this._createSampleFile('event_sources.json', 'event_sources.json'); console.log('Setup done. Edit the .env, deploy.env, ' + program.contextFile + ' and ' + program.eventFile + ' files as needed.'); }; @@ -146,10 +147,24 @@ Lambda.prototype._params = function (program, buffer) { Variables: config } } - + return params; }; +Lambda.prototype._eventSourceList = function (program) { + var eventSourceList = [] + if (program.eventSourceFile) { + try { + eventSourceList = fs.readJsonSync(program.eventSourceFile); + } catch(err) { + eventSourceList = []; + throw err; + } + } + + return eventSourceList; +}; + /** * @deprecated */ @@ -402,6 +417,93 @@ Lambda.prototype._buildAndArchive = function (program, archive_callback) { }); }; +Lambda.prototype._listEventSourceMappings = function (lambda, params, cb) { + return lambda.listEventSourceMappings(params, function (err, data) { + var eventSourceMappings = []; + if (!err && data && data.EventSourceMappings) { + eventSourceMappings = data.EventSourceMappings; + } + return cb(err, eventSourceMappings); + }); +}; + +Lambda.prototype._updateEventSources = function (lambda, functionName, existingEventSourceList, eventSourceList, cb) { + var updateEventSourceList = []; + // Checking new and update event sources + for (var i in eventSourceList) { + var isExisting = false; + for (var j in existingEventSourceList) { + if (eventSourceList[i]['EventSourceArn'] === existingEventSourceList[j]['EventSourceArn']) { + isExisting = true; + updateEventSourceList.push({ + 'type': 'update', + 'FunctionName': functionName, + 'Enabled': eventSourceList[i]['Enabled'], + 'BatchSize': eventSourceList[i]['BatchSize'], + 'UUID': existingEventSourceList[j]['UUID'] + }); + break; + } + } + + // If it is new source + if (!isExisting) { + updateEventSourceList.push({ + 'type': 'create', + 'FunctionName': functionName, + 'EventSourceArn': eventSourceList[i]['EventSourceArn'], + 'Enabled': eventSourceList[i]['Enabled'] ? eventSourceList[i]['Enabled'] : false, + 'BatchSize': eventSourceList[i]['BatchSize'] ? eventSourceList[i]['BatchSize'] : 100, + 'StartingPosition': eventSourceList[i]['StartingPosition'] ? eventSourceList[i]['StartingPosition'] : 'LATEST', + }); + } + } + + // Checking delete event sources + for (var i in existingEventSourceList) { + var isExisting = false; + for (var j in eventSourceList) { + if (eventSourceList[j]['EventSourceArn'] === existingEventSourceList[i]['EventSourceArn']) { + isExisting = true; + break; + } + } + + // If delete the source + if (!isExisting) { + updateEventSourceList.push({ + 'type': 'delete', + 'UUID': existingEventSourceList[i]['UUID'] + }); + } + } + + return async.map(updateEventSourceList, function (updateEventSource, _cb) { + switch(updateEventSource['type']) { + case 'create': + delete updateEventSource['type']; + lambda.createEventSourceMapping(updateEventSource, function (err, data) { + return _cb(err, data); + }); + break; + case 'update': + delete updateEventSource['type']; + lambda.updateEventSourceMapping(updateEventSource, function (err, data) { + return _cb(err, data); + }); + break; + case 'delete': + delete updateEventSource['type']; + lambda.deleteEventSourceMapping(updateEventSource, function (err, data) { + return _cb(err, data); + }); + break; + } + }, function(err, results) { + return cb(err, results); + }); +}; + Lambda.prototype.package = function (program) { var _this = this; if (!program.packageDirectory) { @@ -451,6 +553,9 @@ Lambda.prototype.deploy = function (program) { console.log('=> Reading zip file to memory'); var params = _this._params(program, buffer); + console.log('=> Reading event source file to memory'); + var eventSourceList = _this._eventSourceList(program); + async.map(regions, function (region, cb) { console.log('=> Uploading zip file to AWS Lambda ' + region + ' with parameters:'); console.log(params); @@ -478,20 +583,60 @@ Lambda.prototype.deploy = function (program) { apiVersion: '2015-03-31' }); + // Checking function return lambda.getFunction({ 'FunctionName': params.FunctionName }, function (err) { - if(err) { - return _this._uploadNew(lambda, params, cb); + if (err) { + return _this._uploadNew(lambda, params, function(err, results) { + if (err) { + throw err; + } else { + console.log('=> Zip file(s) done uploading. Results follow: '); + console.log(results); + + // Updating event source(s) + _this._updateEventSources(lambda, params.FunctionName, [], eventSourceList, function(err, results) { + cb(null, results); + }); + } + }); + } else { + _this._listEventSourceMappings(lambda, { + 'FunctionName': params.FunctionName + }, function(err, existingEventSourceList) { + if (err) { + throw err; + } else { + return async.parallel([ + function(_callback) { + _this._uploadExisting(lambda, params, function(err, results) { + if (err) { + throw err; + } else { + console.log('=> Zip file(s) done uploading. Results follow: '); + console.log(results); + _callback(err, results); + } + }) + }, + function(_callback) { + _this._updateEventSources(lambda, params.FunctionName, existingEventSourceList, eventSourceList, function(err, results) { + _callback(err, results) + }) + } + ], function(err, results) { + cb(err, results); + }); + } + }); } - - return _this._uploadExisting(lambda, params, cb); }); }, function (err, results) { if (err) { throw err; } else { - console.log('=> Zip file(s) done uploading. Results follow: '); + console.log('=> All tasks done. Results follow: '); console.log(results); } });