Wrapper around aws-sdk.
When dealing with the aws-sdk a lot, some calls become very repetitive and achieving code coverage becomes tiresome. This wrapper abstracts some of the repetitive logic.
Some examples of repetitive logic are:
- having to call .promise()
- handling of expected errors
- logging of unexpected errors
Install with npm:
$ npm install --save aws-sdk-wrap
Ensure required peer dependencies are available.
import AWS from 'aws-sdk-wrap';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';

AWS({
  services: {
    S3: S3Client,
    'S3:CMD': { PutObjectCommand }
  }
})
  .call('S3:PutObjectCommand', { /* ... */ })
  .then(/* ... */)
  .catch(/* ... */);
where the first parameter is the action, given as the service followed by the command name (S3:PutObjectCommand), and the second parameter holds the "params" passed into the call.
Services are lazily initialized on first access.
One can access an aws-sdk service directly by calling e.g. aws.get('S3').
The action is of the format path.to.service:functionName.
Gets the service from the underlying aws-sdk and initializes it with the available config iff the service is not initialized yet. Then calls the function with the passed params (which need to contain the appropriate parameters for the function).
The available call options are detailed below.
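As a rough illustration of the action format and of lazy initialization, the sketch below splits an action string and creates a service only when it is first requested. It is not the wrapper's actual implementation: parseAction, makeLazyRegistry and the factory map are hypothetical names introduced for this example.

```javascript
// Hypothetical sketch, not the aws-sdk-wrap implementation.
// Split "path.to.service:functionName" into its two parts.
const parseAction = (action) => {
  const idx = action.lastIndexOf(':');
  if (idx === -1) {
    throw new Error(`Invalid action: ${action}`);
  }
  return {
    servicePath: action.slice(0, idx).split('.'),
    functionName: action.slice(idx + 1)
  };
};

// Create each service on first access only and reuse it afterwards.
// "factories" maps a service name to a function that builds the service.
const makeLazyRegistry = (factories) => {
  const cache = new Map();
  return (name) => {
    if (!cache.has(name)) {
      cache.set(name, factories[name]());
    }
    return cache.get(name);
  };
};

const parsedExample = parseAction('path.to.service:functionName');
console.log(parsedExample.servicePath.join(' > ')); // path > to > service
console.log(parsedExample.functionName); // functionName
```

Splitting on the last colon keeps the service path free to contain dots, which matches the nested paths mentioned for get.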
Get the service from the underlying aws-sdk without initializing it. Possible to access nested paths.
Updates the global aws config of the passed aws-sdk via AWS.config.update. In most cases this should not be necessary to use.
Splits messages into groups and calls sqs.SendMessageBatch for every group. Batch sizes can be modified by the batchSize option. Failed calls will be retried up to the maxRetries option.
The available sendMessageBatch options are detailed below.
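The grouping step described above can be pictured with a small sketch. It only shows how a message list might be split by batchSize; splitIntoBatches is a hypothetical helper and not part of the wrapper, and the range check is an illustrative choice based on the stated limit of 10.

```javascript
// Hypothetical helper, not part of aws-sdk-wrap:
// split messages into groups of at most batchSize entries.
const splitIntoBatches = (messages, batchSize = 10) => {
  if (!Number.isInteger(batchSize) || batchSize < 1 || batchSize > 10) {
    throw new RangeError('batchSize must be an integer between 1 and 10');
  }
  const groups = [];
  for (let i = 0; i < messages.length; i += batchSize) {
    groups.push(messages.slice(i, i + batchSize));
  }
  return groups;
};

// 25 messages with the default batchSize end up in three groups.
const exampleMessages = Array.from({ length: 25 }, (_, i) => ({ id: i }));
console.log(splitIntoBatches(exampleMessages).map((g) => g.length).join(',')); // 10,10,5
```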
Returns the Dead Letter Queue Url configured for the passed queue url.
sqs.QueueProcessor({ queueUrls: String[], steps: Array, ingestSteps: String[] })
Initialize a queue processor lambda handler with steps. Steps need to be defined in the steps directory as separate STEPNAME.js files. Each queueUrl used by a step must be defined in queueUrls.
Each step should export:
- schema<Joi>: Joi schema
- handler<function(payload, event, stepContext): steps>: execution logic ingesting payload and event
- next: array of next possible steps
- queueUrl: the queue that the step is ingested into
- delay = 0 (optional): the amount of seconds that the message is delayed, defaults to undefined, i.e. the queue default
- retry = null (optional): Declare object that is then used to instantiate RetryError internally
- timeout = 900 (optional): Timeout for individual step. Should allow for extra overhead for message management / processing and account for concurrency.
- groupIdFunction = undefined (optional): Generator function for the groupId. Takes step payload as parameter
- deduplicationIdFunction = undefined (optional): Generator function for the deduplicationId. Takes step payload as parameter
- before<function(stepContext, payloads[]): steps> (optional): called before first step is executed
- after<function(stepContext): steps> (optional)
The schema needs to define the event name under name. New events that are to be re-queued into the queue need to be returned from the handler, before or after function as an array.
Exposes:
- ingest: Method used to seed queue. Note that messages generated inside a step should simply be returned from that step.
- handler: Lambda function handler that is triggered by sqs
- digraph: Visualize flow using viz-js.com.
Please see tests for example.
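To give a feel for what a flow visualization built from the next arrays can look like, here is a minimal sketch that turns step descriptions into Graphviz DOT text, which viz-js.com can render. The exact output of digraph is not described here, so this is only an illustration: toDot and the step names fetch, parse and store are hypothetical.

```javascript
// Hypothetical step descriptions: only the name and the next array matter here.
const exampleSteps = [
  { name: 'fetch', next: ['parse'] },
  { name: 'parse', next: ['store', 'fetch'] },
  { name: 'store', next: [] }
];

// Build a Graphviz DOT string with one node per step
// and one edge per (step, next step) pair.
const toDot = (steps) => {
  const lines = ['digraph G {'];
  steps.forEach(({ name, next }) => {
    lines.push(`  "${name}";`);
    next.forEach((target) => lines.push(`  "${name}" -> "${target}";`));
  });
  lines.push('}');
  return lines.join('\n');
};

console.log(toDot(exampleSteps));
```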
sqs.errors
- RetryError: Can be thrown from step logic or declared on step to trigger (code) retry logic
sqs.prepareMessage(msg: Object, opts: Object)
Prepare message object with options. Currently options include:
- delaySeconds (integer): used to set the delay for a specific message. Supersedes the corresponding batch option.
- groupId (string): group id for the message, can only be set for steps that do not define groupIdFunction
- deduplicationId (string): deduplication id for the message, can only be set for steps that do not define deduplicationIdFunction
- urgent (boolean): message is immediately enqueued if returned from before or handler, instead of at the very end
s3.putGzipObject({ bucket: String, key: String, data: Object })
Adds an object to an Amazon S3 bucket gzipped. Uses s3:putObject.
s3.getGzipJsonObject({ bucket: String, key: String, expectedErrorCodes: [String] })
Retrieves an object from Amazon S3, expecting it to be gzipped. Uses s3:getObject.
s3.headObject({ bucket: String, key: String, expectedErrorCodes: [String] })
Retrieves only the metadata from an object in an Amazon S3 bucket. Uses s3:headObject.
s3.deleteObject({ bucket: String, key: String })
Delete object from an Amazon S3 bucket at key. Uses s3:deleteObject.
s3.listObjects({ bucket: String, limit: Number, startAfter: String, stopAfter: String, prefix: String })
Lists object keys in an Amazon S3 bucket. Internally this pages until the limit is reached or no more keys are available. Uses s3:listObjectsV2.
- stopAfter: If provided, paging stops once the last key returned is greater than or equal to this parameter. Only appropriate keys are then returned.
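The paging behaviour described above can be sketched without touching S3. The example below pages over an in-memory, sorted key list until the limit is reached, keys run out, or stopAfter is passed. It is synchronous for brevity and not the wrapper's implementation: makeFetchPage and listKeys are hypothetical, and treating "appropriate keys" as keys less than or equal to stopAfter is an assumption.

```javascript
// Hypothetical stand-in for one s3:listObjectsV2 call over a sorted key list.
const makeFetchPage = (allKeys, pageSize) => (startAfter) => {
  const from = startAfter === undefined ? 0 : allKeys.findIndex((k) => k > startAfter);
  const start = from === -1 ? allKeys.length : from;
  const keys = allKeys.slice(start, start + pageSize);
  return { keys, isTruncated: start + pageSize < allKeys.length };
};

// Page until the limit is reached, no more keys exist, or stopAfter is passed.
const listKeys = ({ fetchPage, limit, startAfter, stopAfter }) => {
  const result = [];
  let cursor = startAfter;
  let more = true;
  while (more && result.length < limit) {
    const { keys, isTruncated } = fetchPage(cursor);
    result.push(...keys);
    more = isTruncated && keys.length !== 0;
    if (keys.length !== 0) {
      cursor = keys[keys.length - 1];
    }
    // Stop paging once the last key returned is >= stopAfter.
    if (stopAfter !== undefined && result.length !== 0
      && result[result.length - 1] >= stopAfter) {
      more = false;
    }
  }
  // Assumption: only keys up to and including stopAfter are "appropriate".
  const bounded = stopAfter === undefined ? result : result.filter((k) => k <= stopAfter);
  return bounded.slice(0, limit);
};

const demoFetchPage = makeFetchPage(['a', 'b', 'c', 'd', 'e', 'f'], 2);
console.log(listKeys({ fetchPage: demoFetchPage, limit: 10, stopAfter: 'c' }).join(',')); // a,b,c
```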
s3.decodeKey(key: String)
Returns a non-ASCII key representation for an encoded s3 key. Useful to obtain the not-encoded key representation after calling listObjects.
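As an illustration of what decoding such a key can involve, the sketch below assumes the key was URL-encoded with spaces written as "+". Both the encoding assumption and the name decodeKeySketch are introduced for this example only; the actual behaviour of s3.decodeKey may differ.

```javascript
// Assumption: the key is URL-encoded and spaces are written as "+".
// A literal plus sign would then arrive as "%2B" and survive the replace.
const decodeKeySketch = (key) => decodeURIComponent(key.replace(/\+/g, ' '));

console.log(decodeKeySketch('reports/caf%C3%A9+menu.txt')); // reports/café menu.txt
```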
dy.Model({ name: String, attributes: Object, indices: Object, onNotFound: Function, onUpdate: Function, onCreate: Function })
Options details:
- onNotFound (Function): Return value is returned from corresponding function.
- onUpdate (Function): Executed after an item is updated successfully.
- onCreate (Function): Executed after an item is created successfully.
Instantiates Model.
Internally uses dynamodb-toolbox
dy.Model().createOrModify(item: Object, opts: Object)
Creates entry if key does not exist. Otherwise updates the item.
Options include (all optional):
- conditions (Object|Array): Conditions that must be met for operation to succeed.
- expectedErrorCodes (Array): Provide string list of expected AWS error codes. Promise succeeds on expected error with error code as string.
- toReturn (Array): Fields to return on item.
Internally uses update
dy.Model().createOrReplace(item: Object, opts: Object)
Creates entry if key does not exist. Otherwise replaces entire entry if item exists.
Options include (all optional):
- conditions (Object|Array): Conditions that must be met for operation to succeed.
- expectedErrorCodes (Array): Provide string list of expected AWS error codes. Promise succeeds on expected error with error code as string.
- toReturn (Array): Fields to return on item.
Internally uses put
dy.Model().modify(item: Object, opts: Object)
Edits an existing item's attributes. Can only update an item if it exists.
Options include (all optional):
- conditions (Object|Array): Conditions that must be met for operation to succeed.
- onNotFound (Function): Overrides Model onNotFound function.
- expectedErrorCodes (Array): Provide string list of expected AWS error codes. Promise succeeds on expected error with error code as string.
- toReturn (Array): Fields to return on item.
Internally uses update
dy.Model().delete(key: Object, opts: Object)
Deletes an item. Can only delete an item if it exists.
Options include (all optional):
- conditions (Object|Array): Conditions that must be met for operation to succeed.
- onNotFound (Function): Overrides Model onNotFound function.
- expectedErrorCodes (Array): Provide string list of expected AWS error codes. Promise succeeds on expected error with error code as string.
- toReturn (Array): Fields to return on item.
Internally uses delete
dy.Model().getItem(key: String, opts: Object)
Returns entry or null if not found.
Options include (all optional):
- toReturn (Array): Fields to return.
- onNotFound (Function): Overrides Model onNotFound function.
Internally uses get
dy.Model().create(item: Object, opts: Object)
Creates entry if key does not exist. Otherwise errors.
Options include (all optional):
- conditions (Object|Array): Conditions that must be met for operation to succeed.
- onAlreadyExists (Function): Overrides Model onAlreadyExists function.
- expectedErrorCodes (Array): Provide string list of expected AWS error codes. Promise succeeds on expected error with error code as string.
- toReturn (Array): Fields to return on item.
Internally uses put
dy.Model().query(key: String, opts: Object)
Pages through table based on primary key values.
Options include (all optional):
- index (String): Index name.
- limit (Number): Maximum number of items to retrieve. If set to null, will exhaustively paginate.
- consistent (Boolean): Enables ConsistentRead.
- conditions (Object): Conditions that must be met for operation to succeed.
- filters (Object): Conditions to filter the query results after execution (still executed on AWS).
- toReturn (Array): Fields to return.
- cursor (String): Cursor to page through query results.
Internally uses query
dy.Model().replace(item: Object, opts: Object)
Replaces entire entry if item exists. Otherwise errors.
Options include (all optional):
- conditions (Object|Array): Conditions that must be met for operation to succeed.
- onNotFound (Function): Overrides Model onNotFound function.
- expectedErrorCodes (Array): Provide string list of expected AWS error codes. Promise succeeds on expected error with error code as string.
- toReturn (Array): Fields to return on item.
Internally uses put
dy.Model().scan(opts: Object)
Scans through every item in a table or secondary index.
Options include (all optional):
- index (String): Index name.
- limit (Number): Maximum number of items to retrieve.
- consistent (Boolean): Enables ConsistentRead.
- filters (Object): Conditions to filter the query results after execution (still executed on AWS).
- toReturn (Array): Fields to return.
- lastEvaluatedKey (Object): Primary key of first item to be evaluated by operation.
Internally uses scan
dy.Model().schema
Returns subset of cloudformation template.
Init Options
services
Type: Object
Default: N/A
AWS Services that should be available for this utility.
logger
Type: Logger
Default: null
Provide logger. E.g. logplease or lambda-rollbar.
When an unexpected error is raised, information is logged using .error(...).
config
Type: Object
Default: {}
AWS Config object used to initialize the service.
This only affects initialized services. To update the global AWS config use updateGlobalConfig.
configService
Type: Object
Default: {}
Declare service specific configurations. E.g. configService = { dynamodb: { ... } }.
onCall
Type: Function
Default: () => {}
Callback function that is called every time after the AWS service is called, containing all information about the call and response.
Call Options
expectedErrorCodes
Type: list
Default: []
Provide string list of expected AWS error codes. Promise succeeds on expected error with error code as string.
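The "succeeds on expected error" behaviour can be sketched as follows. This is an illustration rather than the wrapper's code: resolveExpectedError and callWithExpectedErrors are hypothetical names, and the sketch assumes the thrown error exposes its AWS error code as error.code.

```javascript
// Hypothetical sketch; assumes the error code is available as error.code.
// Return the code for an expected error, otherwise rethrow the error.
const resolveExpectedError = (error, expectedErrorCodes = []) => {
  if (expectedErrorCodes.includes(error.code)) {
    return error.code;
  }
  throw error;
};

// Wrap any promise-returning call so that expected errors resolve to their code.
const callWithExpectedErrors = async (fn, expectedErrorCodes = []) => {
  try {
    return await fn();
  } catch (error) {
    return resolveExpectedError(error, expectedErrorCodes);
  }
};

// Usage with a fake call that always fails with the code "NoSuchKey".
const failingCall = async () => {
  throw Object.assign(new Error('missing'), { code: 'NoSuchKey' });
};
callWithExpectedErrors(failingCall, ['NoSuchKey'])
  .then((result) => console.log(result)); // NoSuchKey
```

Because the promise resolves with a string in the expected case, callers can distinguish it from a normal response by checking the type of the result.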
meta
Type: object
Default: null
Provide additional debug information for failure case.
logger
Used to overwrite global logger. Set to null to prevent logging of errors.
SendMessageBatch Options
batchSize
Type: integer
Default: 10
Specify the size of each batch that will be sent. Should never exceed 10.
maxRetries
Type: integer
Default: 10
Number of times to retry any failed requests.
backoffFunction
Type: Function
Default: (count) => 30 * (count ** 2)
The length of time the function will wait after each failed request before retrying.
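To see how quickly the default grows, the sketch below evaluates it for the first few retries. Two points are assumptions made for this example: that count starts at 1 for the first retry, and the helper name backoffSchedule. The unit of the returned value is not stated here, so the numbers are shown without one.

```javascript
// Default taken from the option description above.
const defaultBackoff = (count) => 30 * (count ** 2);

// Waits for the first maxRetries retries, assuming count starts at 1.
const backoffSchedule = (maxRetries, backoffFunction = defaultBackoff) => Array
  .from({ length: maxRetries }, (_, i) => backoffFunction(i + 1));

console.log(backoffSchedule(4).join(',')); // 30,120,270,480
```

A custom backoffFunction can be passed in the same way, for example (count) => 100 * count for linear growth.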
delaySeconds
Type: integer
Default: null
Set DelaySeconds option.