Create stream processors with AWS Lambda functions.
The event signature for many Lambda functions is an array containing a micro-batch of event.Records
. Functional Reactive Programming (FRP) is the cleanest approach for processing these streams. This library provides a light-weight framework for creating these stream processors. The underlying streaming library is Highland.js, replete with features like filter, map, reduce, backpressure, batching, parallel processing and more.
Support is provided for AWS EventBridge, Kinesis, DynamoDB Streams and more.
npm install aws-lambda-stream --save
aws-lambda-stream@v1 is using AWS SDK v3. If you are using AWS SDK v2, please use aws-lambda-stream@v0.51.0.
The following diagram depicts a typical scenario for using this library to implement event sourcing and cqrs patterns. State changes in the database of Service X trigger the publication of domain events and Service Y consumes the events and caches the desired data in materialized views to support its requirements.
The following examples show how to implement basic handler functions for consuming events from a Kinesis stream and a DynamoDB Stream. A key thing to note is that the code you see here is responsible for assembling the steps in the stream pipeline. The final step, toPromise
returns a Promise from the handler function. Then the promise starts consuming from the stream and the data starts flowing through the steps. The data is pulled through the steps, which provides natural backpressure (see blow). The promise will resolve once all the data has passed through all the stream steps or reject when an unhandled error is encountered.
This example processes a DynamoDB Stream and publishes domain events to an EventBridge bus, which routes the events to the likes of a Kinesis stream. The details are explained below.
import { fromDynamodb, publishToEventBridge as publish, toPromise } from 'aws-lambda-stream';
export const handler = async (event) =>
fromDynamodb(event)
.map(toEvent)
.through(publish({ batchSize: 25 }))
.through(toPromise);
This example processes a Kinesis stream and materializes the data in a single DynamoDB table. The details are explained below.
import { fromKinesis, update, toPromise } from 'aws-lambda-stream';
export const handler = async (event) =>
fromKinesis(event)
.filter(onEventType)
.map(toUpdateRequest)
.through(update({ parallel: 4 }))
.through(toPromise);
The first step of a stream processor transforms the incoming Records into a stream, like such: _(event.Records)
. The various from
functions, such as fromKinesis
, fromDynamodb
and fromEventBridge
, normialize the records into a standard Event
format. The output is a stream of UnitOfWork
objects.
Think of a uow
as an immutable object that represents the scope
of a set of variables
passing through the stream. More so than ever, we should not use global variables in stream processors. Your processor steps will add new variables to the uow
for use by downstream steps (see Mapping below). This scoping is crucial when we leverage the parallel processing and pipeline features discussed below.
interface UnitOfWork {
record: any;
event?: Event;
batch?: UnitOfWork[];
}
record
- the original recordevent
- the standardized eventbatch
- an array of uow that should succeed or fail together (see Batching and Grouping below)
The various streaming and messaging channels each have their own formats. We want to decouple the processoring logic from the choice of these technologies. Thus all published events conform to the following Event
format. This also provides for polymorphic-like processing. This standard format is also leveraged in the event-lake and micro-event-store features.
interface Event {
id: string;
type: string;
timestamp: number;
partitionKey?: string;
tags: { [key: string]: string | number };
<entity>: any;
raw?: any;
eem?: any;
s3?: { bucket: string, key: string };
}
id
- a unique deterministic valuetype
- generally the namespace, domain entity and action performedtimestamp
- epoch value when the action was performedpartitionKey
- generally the entity id or correlation id to ensure related events can be processed togethertags
- a generic place for routing information. A standard set of values is always included, such asaccount
,region
,stage
,source
,functionname
andpipeline
.<entity>
- a canonical entity that is specific to the event type. This is the contract that must be held backwards compatible. The name of this field is usually the lowerCamelCase name of the entity type, such asthing
forThing
.raw
- this is the raw data and format produced by the source of the event. This is included so that the event-lake can form a complete audit with no lost information. This is not guaranteed to be backwards compatible, so use at your own risk.eem
- envelope encryption metadata (see Encryption below)s3
- supports the claim-check pattern for large events
For a variety of reasons, we generally multiplex many event types through the same stream. I discuss this in detail in the following post: Stream Inversion & Topology. Thus, we use filter
steps with functions like onEventType
to focus in on the event types of interest and perform content based routing in general.
// all event types starting with `thing-`
const onEventType = uow =>
uow.event.type.match(/thing-*/);
Many stream processor steps map the incoming data to the format needed downstream. The results of the mapping are adorned to the uow
as a new variable. The uow
must be immutable, so we return a new uow
by cloning the original uow
with the spread operator and adorning the additional variable. There are various utils provided to assist (see below).
.map((uow) => ({
...uow,
variableName: {
// mapping logic here
}
}))
This is the function used in the Listener Function example above.
import { updateExpression, timestampCondition } from 'aws-lambda-stream';
const toUpdateRequest = (uow) => ({
...uow,
updateRequest: { // variable expected by `update` util
Key: {
pk: uow.event.thing.id,
sk: 'thing',
},
...updateExpression({
...uow.event.thing,
discriminator: 'thing',
timestamp: uow.event.timestamp,
}),
...timestampCondition(),
}
});
This is the function used in the Trigger Function example above.
const toEvent = (uow) => ({
...uow,
event: { // variable expected by the `publish` util
...event,
thing: uow.event.raw.new, // canonical
}
});
Note: It is best to perform mapping in a separate upstream step from the step that will perform the async-non-blocking-io to help maximize the potential for concurrent processing. (aka cooperative programming)
At the end of a stream processor there is usually a sink step that persists the results to a datastore or another stream. These external calls are wrapped in thin Connector
classes so that they can be easily mocked for unit testing.
These connectors are then wrapped with sink functions, such as update
and publish
, to integrate them into the streaming framework. For example, the promise returned from the connector is normalized to a stream, fault handling is provided and features such as parallel and batch are utilized.
These sink functions leverage currying to override default configuration settings, such as the batchSize and the number of parallel asyn-non-blocking-io executions.
Here is the example of using the update
function.
import { update, toPromise } from 'aws-lambda-stream';
...
.through(update({ parallel: 4 }))
.through(toPromise);
Here is the example of using the publish
function.
import { publishToEventBridge as publish, toPromise } from 'aws-lambda-stream';
...
.through(publish({ batchSize: 25 }))
.through(toPromise);
When there is an unhandled error in a Kinesis or DynamoDB stream processor, Lambda will continuously retry the function until the problem is either resolved or the event(s) in question expire(s). For transient errors, such as throttling, this may be the best course of action, because the problem may self-heal. However, if there is a poison event then we want to set it aside, by publishing a fault
event, so that the other events can be processed. I refer to this as the Stream Circuit Breaker pattern.
Here is the definition of a fault
event.
export const FAULT_EVENT_TYPE: string = 'fault';
interface FaultEvent extends Event {
err: {
name: string;
message: string;
stack: string;
};
uow: UnitOfWork;
}
err
- contains the error informationuow
- contains the state of the variables in theuow
when the error happened
When an error is thrown in a Highland.js stream, the error will skip over all the remaining steps until it is either caught by an errors step or it reaches the end of the stream and all processing stops with the error.
When you want to handle a poison event and raise a fault
event then simply catch the error, adorn the current uow
to the error and rethrow the error. Several utilities are provided to assist: throwFault
for standard try/catch, rejectWithFault
for promises, and faulty
and faultyAsyncStream
are function wrappers.
Here is an example of using throwFault
.
try {
...
} catch (err) {
throwFault(err);
}
export const throwFault = (uow) => (err) => {
// adorn the troubled uow
// for processing in the errors handler
err.uow = uow;
throw err;
};
Then we need to setup the faults
errors function and the flushFaults
stream. Fault handling is already included when using the pipelines
feature (see below).
import { faults, flushFaults, toPromise } from 'aws-lambda-stream';
...
.errors(faults)
.through(flushFaults(opt))
.through(toPromise);
The faults
function tests to see if the err
has a uow
adorned. If so then it buffers a fault
event. The flushFaults
stream will published all the buffered fault
events once all events in the batch have been processed. This ensures that the fault
events are not prematurely published in case an unhandled error occurs later in the batch.
As mentioned above, we are multiplexing many event types through a single stream for a variety of good reasons. Therefore, we want to maximize the utilization of each function invocation by acting on as many events as possible. However, we also want to maintain good clean separation of the processing logic for these different event types.
The Highland.js library allows us to fork streams, passing each fork/observer through a pipeline and merge the streams back together where they can share common tail logic like fault
handling.
Each pipeline is implemented and tested separately. Each is usually defined in its own module/file.
Here is an example of a pipeline. They are curried functions that first receive options during initialize
and then the forked stream during assemble
(see below). During assemble
they add the desired steps to the stream. Pipelines typically start with one or more filter
steps to indicate which events the steps apply to.
const pipeline1 = (options) => (stream) => stream
.filter(onEventType)
.tap(uow => options.debug('%j', uow))
.map(toUpdateRequest)
.through(update({ parallel: 4 }));
export default pipeline1;
Here is an example of a handler function that uses pipelines.
import { initialize, defaultOptions, fromKinesis, toPromise } from 'aws-lambda-stream';
import pipeline1 from './pipeline1';
import pipeline2 from './pipeline2';
const PIPELINES = {
pipeline1,
pipeline2,
};
const OPTIONS = { ...defaultOptions, ... };
export const handler = async (event) =>
initialize(PIPELINES, OPTIONS)
.assemble(fromKinesis(event))
.through(toPromise);
- First we
initialize
the pipelines with any options. - Then we
assemble
all pipelines into a forked stream. - And finally the processing of the events through the pipelines is started by
toPromise
. - The data fans out through all the pipelines and the processing concludes when all the units of work have flowed through and merged back together.
But take care to assemble a cohesive set of pipelines into a single function. For example, a listener function in a BFF service will typically consume events from Kinesis and the various pipelines will materialize
different entities from the events into a DynamoDB table to implement the CQRS pattern. Then the trigger function of the BFF service will consume events from the DynamoDB table, as mutations
are invoked in the graphql
function, and these pipelines will publish
events to the Kinesis stream to implement the Event Sourcing pattern. (see Flavors below)
Pipelines also help optimize utilization by giving a function more things to do while it waits on async-non-blocking-io calls (see Parallel below). Run
test/unit/pipelines/coop.test.js
to see an example of cooperative programming in action.
Many of the pipelines we write follow the exact same steps and only the filters and data mapping details are different. We can package these pipeline flavors into reusable pipelines that can be configured with rules
.
The following flavors are included and you can package your own into libaries.
materialize
- used inlistener
functions to materialize anentity
from anevent
into a DynamoDB single tablecdc
- used intrigger
functions topublish
events to Kinesis as entities are maintained in a DynamoDB single tablecollect
- used inlistener
functions to collect events in a micro event store for complex event processingcorrelate
- used intrigger
functions to correlate related events for complex event processingevaluate
- used intrigger
functions to run rules against correlated events and produce higher-order eventsexpire
- used intrigger
functions to emitexpired
events when a TTL is reacted- more to be ported soon
Here is an example of initializing pipelines from rules. Note that you can initialize one-off pipelines along side rule-driven pipelines.
import { initializeFrom } from 'aws-lambda-stream';
const PIPELINES = {
pipeline1,
pipeline2,
...initializeFrom(RULES),
};
Here are some example rules. The id
, flavor
, and eventType
fields are required. The remaining fields are defined by the specified pipeline flavor. You can define functions inline, but it is best to implement and unit test them separately.
import { materialize } from 'aws-lambda-stream';
const RULES = [
{
id: 'p1',
flavor: materialize,
eventType: /thing-(created|updated)/,
toUpdateRequest,
},
{
id: 'p2',
flavor: materialize,
eventType: 'thing-deleted',
toUpdateRequest: toSoftDeleteUpdateRequest,
},
{
id: 'p3',
flavor: materialize,
eventType: ['something-created', 'something-updated'],
toUpdateRequest: (uow) => ({ ... }),
},
];
id
- is a unqiue stringflavor
- the function that implements the pipeline flavoreventType
- a regex, string or array of strings used to filter on event typetoUpdateRequest
- is a mapping function expected by thematerialize
pipeline flavor
The debug library is used for logging. When using pipelines, each pipeline is given its own instance and it is passed in with the pipeline configuration options and it is attached to the uow
for easy access. They are named after the pipelines with a pl:
prefix.
This turns on debug for all pipelines.
cli> DEBUG=pl:*
This turns on debug for a specific pipeline.
cli> DEBUG=pl:pipeline2
Various print utilities are provided, such as: printStartPipeline
and printEndPipeline
.
Here are some highlights of utiltities that are available in this library or Highland.js.
Unlike imperative programming, functional reactive programming with streams provides natural backpressure because it is pull oriented. In other words, a slow downstream step will not pull the next upstream record until it is finished processing the current record. This helps us avoid overwhelming downstream services and systems.
However, this does not hold true for services, like DynamoDB, that return throttling errors. In these cases we can use the Highland.js rateLimit feature to provide explicit backpressure.
...
.rateLimit(2, 100) // 2 per 100ms
.through(update)
...
Asynchronous Non Blocking IO is probably the most important feature for optimizing throughput. The Highland.js parallel feature allows us to take full control. When using this feature, upstream steps will continue to be executed while up to N asyc requests are waiting for responses. This feature along with pipelines
allows us to optimize the utilization of every lambda invocation.
...
.map(makeSomeAsyncCall)
.parallel(8)
...
This is usually the first parameter I tweak when tuning a function. Environment variables, such as UPDATE_PARALLEL
and PARALLEL
are used for experimenting with different settings.
Here is a post on queuing theory that helps put this in perspective: What happens when you add another teller?
This feature is baked into the DynamoDB update
and Kinesis publish
utilities.
Many aws-sdk
operations support batching multiple requests into a single call. This can help increase throughput by reducing aggregate network latency.
The Highland.js batch feature allows us to easily collect a batch of requests. The toBatchUow
utility provided by this library formats these into a batch unit of work so that we can easily raise a fault
for a batch and resubmit
the batch.
...
.batch(10)
.map(toBatchUow)
.map(makeSomeAsyncCall)
...
However, be aware that most of the aws-sdk batch apis do not succeed or fail as a unit. Therefore you either have to selectively retry the failed requests and/or ensure that these calls are idempotent. Therefore I usually try to first optimize using the parellel
feature and then move onto batch
if needs be.
Another way to increase throughput is by grouping related events and thereby reducing the number external calls you will need to make. The Highland.js group feature allows us to easily group related records. The toGroupUows
utility provided by this library formats these into batched units of work so that we can easily raise a fault
for a group and resubmit
the group.
...
.group(uow => uow.event.partitionKey)
.flatMap(toGroupUows)
...
There are various other utilities in the utils folder.
now
- wrapsDate.now()
so that it can be easily mocked in unit teststtl
- calculatesttl
based on a start epoch and a number of days
fromEventBridge
- creates a stream from an EventBridge recordtoEventBridgeRecord
- is a test helper for creating an EventBridge record from a test event
Connector
- connector for the EventBridge SDKpublishToEventBridge
- stream steps for publishing events to EventBridge
Note: The default configuration, as defined in
utils/opt
, is to publisher to an AWS EventBridge custom bus and route the events to one or more streams or other targets. This approach, referred to as the Event Hub, provides maximum flexibility. The bus also routes all events to an Event Lake via AWS Firehose to create a perpetual audit of all events.
fromKinesis
- creates a stream from Kinesis recordstoKinesisRecords
- is a test helper for creating Kinesis records from test events
Connector
- connector for the Kinesis SDKpublishToKinesis
- stream steps for publishing events to Kinesis
fromDynamodb
- creates a stream from DynamoDB Stream recordstoDynamodbRecords
- is a test helper for creating DynamoDB Stream records from test events
Connector
- connector for the DynamoDB SDKupdate
- stream steps for updating rows in a single DynamoDB tableupdateExpression
- creates an expression from a plain old json object- see Mapping above
timestampCondition
- creates an expression for performing inverse oplocks
In addition:
single table
support is provided infromDynamodb
based on thediscriminator
fieldlatching
support is provided infromDynamodb
based on thelatched
fieldsoft delete
support is provided infromDynamodb
based on thedeleted
fieldglobal table
support is provided infromDynamodb
based on theaws:rep:updateregion
field- note this may not be needed in the latest version of global tables
Look for future blog posts on
dynamodb single tables
,latching
,soft-deletes
andoplock-based-joins
.
These features are intended for implementing intra-service logic.
fromS3
- creates a stream from an S3 notificationtoS3Records
- is a test helper for creating S3 records from test messages
Connector
- connector for the S3 SDKputObjectToS3
- stream steps to put an object in a bucket
These features are intended for implementing intra-service logic.
fromSns
- creates a stream from an SNS topictoSnsRecords
- is a test helper for creating SNS records from test messages
Connector
- connector for the SNS SDKpublishToSns
- stream steps to publish a message to a topic
These features are intended for implementing intra-service logic. They are frequently combined with the SNS and S3 support. See the unit tests for an example of S3 wrapped in SNS wrapped in SQS.
fromSqs
- creates a stream from an SQS queuetoSqsRecords
- is a test helper for creating SQS records from test messages
Connector
- connector for the SQS SDKsendToSqs
- stream steps to send a message to a queue
AWS Lambda provides various metrics that help us gauge the performance of our stream processor functions. For example, Iterator Age
tells us if we have an unhandled error or if we are not processing events fast enough. However, to tune our pipelines we need more fine-grained metrics. This is where the metrics feature comes into play.
Note: It will be counter intuitive at first, but your biggest performance gains will most likely come from minimizing wait time. So play close attention the
channel.wait
andio.wait
metrics.
To enable and control the metrics feature we use the process.env.METRICS
environment variable.
environment:
METRICS: true or any value
This will enable the essential metrics:
stream.batch.utilization
is the incoming batch size divided by the possibleprocess.env.BATCH_SIZE
. If the average is consistently above 70% you may be falling behind. see Little's Law and Chapter 4stream.uow.count
is the number of Units of Work (uow) flowing through the function per invocationstream.pipeline.utilization
is the percentage of work performed by a pipeline verses othersstream.channel.wait.time
is a pipeline specific version ofIterator Age
. High wait time may indicate a need for moreshards
orparallelization
.stream.pipeline.time
is the total time it takes a uow to flow through a pipeline. NOTE: this value includes thestream.channel.wait.time
to highlight the true processing latency
environment:
METRICS: emf,xray
emf
enables logging of the metrics using the CloudWatch Embedded Mertric Format- future enhancements may include similar support for
datadog
and other tools
- future enhancements may include similar support for
xray
enables AWS Xray support
environment:
METRICS: emf,xray,metrics:*
Including *
enables all detailed metrics. Or individual detailed metrics can be enabled:
METRICS: emf,xray,metrics:step
This will enable metrics for the IO steps within the pipelines:
stream.pipeline.io.wait.time
is the amount of time an IO step waits for parallel capacity. Consistently high wait times may indicate that theparallel
setting is too low or the function needs more IO capacity (aka higher functionmemorySize
)stream.pipeline.io.time
is the amount of time an IO step took once it finsihed waiting
METRICS: emf,xray,metrics:compact,metrics:step
This will add the metrics for the compact
feature if you are using it:
stream.pipeline.compact.count
is the number of uow that were compacted into one uow by partition key
METRICS: emf,xray,metrics:size
This will add detailed metrics regarding the publishing of events:
stream.pipeline.batchSize.count
is the number of events sent to the bus per batch requeststream.pipeline.eventSize.bytes
is the size of the events sent to the bus
Use process.env.NAMESPACE
to set the namespace for CloudWatch metrics
All metrics include the following tags:
account
is the name of your account, such as subsys1-nonprd, subsys3-prdregion
is self explanatory, such as us-east-1, us-west-2, etcstage
is the environment, such dev, qa, prdsource
is the service namefunctionname
is self explanatory
The pipeline
and step
specific metrics covered above include these tags as well:
pipeline
is the pipeline/rule idstep
is the step name within a pipeline, such as save, query, get, publish
The following links contain additional information:
- Highland.js documentation
- My Blog covers many topics such as System Wide Event Sourcing & CQRS
The following project templates are provided to help get your event platform up and running: https://github.com/jgilbert01/templates
- event-hub
- event-lake-s3
- event-fault-monitor
- and more...