add-source-side-claim-check-support #383

Merged: 7 commits, Sep 21, 2024. Changes from all commits.
4 changes: 2 additions & 2 deletions package-lock.json

Generated file; diff not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.0.24",
"version": "1.0.25",
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
5 changes: 3 additions & 2 deletions src/from/kinesis.js
@@ -1,9 +1,10 @@
import _ from 'highland';

import {
faulty, decompress, compress, claimcheck, options,
faulty, decompress, compress, options,
} from '../utils';
import { outSkip } from '../filters';
import { redeemClaimcheck } from '../queries';

export const fromKinesis = (event) =>

@@ -26,7 +27,7 @@ export const fromKinesis = (event) =>
})))
.tap((uow) => options().metrics?.adornKinesisMetrics(uow, event))
.filter(outSkip)
.through(claimcheck());
.through(redeemClaimcheck());

// test helper
export const toKinesisRecords = (events, approximateArrivalTimestamp) => ({
5 changes: 3 additions & 2 deletions src/from/sqs.js
@@ -1,9 +1,10 @@
import _ from 'highland';

import {
faulty, decompress, compress, claimcheck, options,
faulty, decompress, compress, options,
} from '../utils';
import { outSkip } from '../filters';
import { redeemClaimcheck } from '../queries';

// this from function is intended for use with intra-service messages
// as opposed to consuming inter-service events
@@ -30,7 +31,7 @@ export const fromSqsEvent = (event) => _(event.Records)
})))
.tap((uow) => options().metrics?.adornSqsMetrics(uow, event))
.filter(outSkip)
.through(claimcheck());
.through(redeemClaimcheck());

// test helper
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
8 changes: 4 additions & 4 deletions src/utils/claimcheck.js → src/queries/claimcheck.js
@@ -1,11 +1,11 @@
import { decompress } from './compression';
import { faulty } from './faults';
import { getObjectFromS3 } from '../queries/s3';
import { decompress } from '../utils/compression';
import { faulty } from '../utils/faults';
import { getObjectFromS3 } from './s3';

// claim-check pattern support
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/StoreInLibrary.html

export const claimcheck = (opt = {}) => (s) => s // eslint-disable-line import/prefer-default-export
export const redeemClaimcheck = (opt = {}) => (s) => s // eslint-disable-line import/prefer-default-export
.map(faulty((uow) => ({
...uow,
getClaimCheckRequest: uow.event.s3 ? {
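For readers new to the pattern: redeemClaimcheck is the consumer-side half. When an inbound event turns out to be a claim-check pointer (it carries an s3 field instead of a full payload), the full event is fetched from S3 and substituted back into the unit of work. The following is a minimal standalone sketch of that idea, not the library's implementation (which goes through its S3 connector, getObjectFromS3, and decompress); it uses the AWS SDK v3 client directly, and all names are invented.

import _ from 'highland';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';

const s3 = new S3Client({});

// Sketch only: pass non-claim-check events straight through; otherwise fetch the
// stored payload and replace the pointer event with the original event.
const redeemClaimcheckSketch = () => (s) => s
  .flatMap((uow) => _(
    !uow.event.s3
      ? Promise.resolve(uow)
      : s3.send(new GetObjectCommand({
        Bucket: uow.event.s3.bucket,
        Key: uow.event.s3.key,
      }))
        .then((res) => res.Body.transformToString())
        .then((body) => ({ ...uow, event: JSON.parse(body) })),
  ));

// Used the same way as redeemClaimcheck in the fromKinesis/fromSqsEvent changes above:
// .through(redeemClaimcheckSketch())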
1 change: 1 addition & 0 deletions src/queries/index.js
@@ -1,3 +1,4 @@
export * from './claimcheck';
export * from './dynamodb';
export * from './secretsmgr';
export * from './s3';
64 changes: 64 additions & 0 deletions src/sinks/claimcheck.js
@@ -0,0 +1,64 @@
import _ from 'highland';

import { putObjectToS3 } from './s3';
import { now } from '../utils/time';

// claim-check pattern support
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/StoreInLibrary.html

const formatKey = (event) => {
const d = new Date(now());
// region/claimchecks/YYYY/MM/DD/HH/id
return `${process.env.AWS_REGION}/claimchecks/${d.getUTCFullYear()}/${d.getUTCMonth()}/${d.getUTCDate()}/${d.getUTCHours()}/${event.id}`;
};

export const toClaimcheckEvent = (event, bucket) => ({
id: event.id,
type: event.type,
partitionKey: event.partitionKey,
timestamp: event.timestamp,
tags: event.tags,
s3: {
bucket,
key: formatKey(event),
},
});

export const toPutClaimcheckRequest = (event, Bucket) => ({
Bucket,
Key: formatKey(event),
Body: JSON.stringify(event),
});

// designed to run right after batchWithSize
export const storeClaimcheck = ({
id: pipelineId,
// usually the s3 event lake bucket
claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME,
putClaimcheckRequest = 'putClaimcheckRequest',
putClaimcheckResponse = 'putClaimcheckResponse',
parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8,
debug,
step,
...opt
}) => {
// if we don't have a bucket we can't claimcheck
if (!claimCheckBucketName) return (s) => s;

return (s) => s
.flatMap((batch) =>
_(batch)
.through(putObjectToS3({
debug,
id: pipelineId,
bucketName: claimCheckBucketName,
putRequestField: putClaimcheckRequest,
putResponseField: putClaimcheckResponse,
parallel,
step,
...opt,
}))
// reassemble the batch
.collect());
// minus any faults during putObject
};
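To make the new sink helpers concrete, here is a worked example of what they produce for one oversized event. The id, bucket, and resulting key mirror the unit test added below (now() stubbed to 1726854864001, i.e. 2024-09-20T17:54:24Z, with AWS_REGION=us-west-2); the other field values are made up. Note that formatKey uses getUTCMonth(), which is zero-based, so September appears as 8 in the key.

import { toClaimcheckEvent, toPutClaimcheckRequest } from '../sinks/claimcheck'; // same import path batch.js uses below

// Illustrative event; only the id and the resulting key come from the test below.
const fullEvent = {
  id: '3',
  type: 'thing-created',       // made-up value
  partitionKey: 'p1',          // made-up value
  timestamp: 1726854864001,
  tags: { account: 'dev' },    // made-up value
  body: 'x'.repeat(120),       // the large payload that stays out of the bus event
};

// toClaimcheckEvent(fullEvent, 'event-lake-s3') => the slim pointer published to the bus:
// {
//   id: '3', type: 'thing-created', partitionKey: 'p1', timestamp: 1726854864001, tags: { account: 'dev' },
//   s3: { bucket: 'event-lake-s3', key: 'us-west-2/claimchecks/2024/8/20/17/3' },
// }

// toPutClaimcheckRequest(fullEvent, 'event-lake-s3') => the S3 write that carries the full payload:
// { Bucket: 'event-lake-s3', Key: 'us-west-2/claimchecks/2024/8/20/17/3', Body: JSON.stringify(fullEvent) }

console.log(toClaimcheckEvent(fullEvent, 'event-lake-s3'), toPutClaimcheckRequest(fullEvent, 'event-lake-s3'));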
2 changes: 2 additions & 0 deletions src/sinks/eventbridge.js
@@ -8,6 +8,7 @@ import { debug as d } from '../utils/print';
import { adornStandardTags } from '../utils/tags';
import { compress } from '../utils/compression';
import { ratelimit } from '../utils/ratelimit';
import { storeClaimcheck } from './claimcheck';

export const publishToEventBridge = ({ // eslint-disable-line import/prefer-default-export
id: pipelineId,
@@ -74,6 +75,7 @@ export const publishToEventBridge = ({ // eslint-disable-line import/prefer-default-export
requestField: publishRequestField,
debug,
}))
.through(storeClaimcheck(opt))
.map(toBatchUow)
.map(toPublishRequest)

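From a service's perspective, enabling the source side of the pattern is a configuration change: give the publisher a claim-check bucket, and any event whose request entry exceeds the max request size is stored in S3 and replaced with a pointer before it reaches EventBridge. A hedged wiring sketch follows; the bucket and pipeline names are made up, it assumes the sink is re-exported from the package root, and it assumes claimCheckBucketName flows through ...opt to batchWithSize and storeClaimcheck (the CLAIMCHECK_BUCKET_NAME environment variable is the default both steps fall back to in this diff).

import { publishToEventBridge } from 'aws-lambda-stream'; // assumption: exported from the package root

// Hypothetical wiring. Alternatively, set process.env.CLAIMCHECK_BUCKET_NAME on the function.
const publish = publishToEventBridge({
  id: 'p1',                              // hypothetical pipeline id
  claimCheckBucketName: 'my-event-lake', // hypothetical bucket, usually the S3 event lake
});

// Used as the terminal step of a publish pipeline, e.g. (s) => s.through(publish)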
44 changes: 29 additions & 15 deletions src/utils/batch.js
@@ -1,5 +1,6 @@
import _ from 'highland';
import isFunction from 'lodash/isFunction';
import { toClaimcheckEvent, toPutClaimcheckRequest } from '../sinks/claimcheck';

// used after highland batch step
export const toBatchUow = (batch) => ({ batch });
@@ -47,7 +48,11 @@ export const compact = (rule) => {
})));
};

export const batchWithSize = (opt) => {
export const batchWithSize = ({
claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME,
putClaimcheckRequest = 'putClaimcheckRequest',
...opt
}) => {
let batched = [];
let sizes = [];

@@ -67,25 +72,34 @@ export const batchWithSize = (opt) => {
if (!x[opt.requestEntryField]) {
push(null, [x]);
} else {
const size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField]));
let size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField]));
if (size > opt.maxRequestSize) {
logMetrics([x], [size], opt);
const error = new Error(`Request size: ${size}, exceeded max: ${opt.maxRequestSize}`);
error.uow = x;
push(error);
} else {
const totalSize = sizes.reduce((a, c) => a + c, size);

if (totalSize <= opt.maxRequestSize && batched.length + 1 <= opt.batchSize) {
batched.push(x);
sizes.push(size);
if (claimCheckBucketName) {
// setup claim check
x[putClaimcheckRequest] = toPutClaimcheckRequest(x[opt.requestEntryField], claimCheckBucketName);
x[opt.requestEntryField] = toClaimcheckEvent(x[opt.requestEntryField], claimCheckBucketName);
size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField]));
} else {
logMetrics(batched, sizes, opt);
push(null, batched);
batched = [x];
sizes = [size];
const error = new Error(`Request size: ${size}, exceeded max: ${opt.maxRequestSize}`);
error.uow = x;
push(error);
next();
return;
}
}

const totalSize = sizes.reduce((a, c) => a + c, size);

if (totalSize <= opt.maxRequestSize && batched.length + 1 <= opt.batchSize) {
batched.push(x);
sizes.push(size);
} else {
logMetrics(batched, sizes, opt);
push(null, batched);
batched = [x];
sizes = [size];
}
}

next();
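Restated outside the streaming machinery, the new decision in batchWithSize is: measure the serialized request entry; if it exceeds maxRequestSize and a claim-check bucket is configured, swap the entry for a pointer event, stash the corresponding S3 put request on the uow, and re-measure; with no bucket configured, raise the same oversized-request fault as before. A simplified, hedged restatement follows (it omits logMetrics and the batching itself, which the real code interleaves, and is not the library's exact code).

import { toClaimcheckEvent, toPutClaimcheckRequest } from '../sinks/claimcheck'; // as imported in batch.js above

// Simplified illustration of the per-uow decision.
const maybeClaimcheck = (uow, {
  requestEntryField, maxRequestSize,
  claimCheckBucketName, putClaimcheckRequest = 'putClaimcheckRequest',
}) => {
  const entry = uow[requestEntryField];
  const size = Buffer.byteLength(JSON.stringify(entry));
  if (size <= maxRequestSize) return uow; // small enough, batch and publish as-is
  if (!claimCheckBucketName) {
    // same behavior as before this PR: surface an oversized-request fault
    const error = new Error(`Request size: ${size}, exceeded max: ${maxRequestSize}`);
    error.uow = uow;
    throw error;
  }
  return {
    ...uow,
    [putClaimcheckRequest]: toPutClaimcheckRequest(entry, claimCheckBucketName), // full payload destined for S3
    [requestEntryField]: toClaimcheckEvent(entry, claimCheckBucketName),         // slim pointer destined for the bus
  };
};

// e.g. maybeClaimcheck(uow, { requestEntryField: 'publishRequestEntry', maxRequestSize: 100, claimCheckBucketName: 'event-lake-s3' })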
1 change: 0 additions & 1 deletion src/utils/index.js
@@ -1,5 +1,4 @@
export * from './batch';
export * from './claimcheck';
export * from './compression';
export * from './encryption';
export * from './faults';
@@ -4,11 +4,11 @@ import sinon from 'sinon';
import _ from 'highland';

import debug from 'debug';
import { claimcheck } from '../../../src/utils/claimcheck';
import { redeemClaimcheck } from '../../../src/queries/claimcheck';

import Connector from '../../../src/connectors/s3';

describe('utils/claimcheck.js', () => {
describe('queries/claimcheck.js', () => {
afterEach(sinon.restore);

it('should get event', (done) => {
Expand Down Expand Up @@ -36,7 +36,7 @@ describe('utils/claimcheck.js', () => {
}];

_(uows)
.through(claimcheck())
.through(redeemClaimcheck())
.collect()
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));
@@ -69,7 +69,7 @@ describe('utils/claimcheck.js', () => {
});

it('should use a pipeline label to cache regional redirects configuration', () => {
claimcheck();
redeemClaimcheck();
const testClient = new Connector({ debug: debug('test'), bucketName: 'test-bucket', pipelineId: 'handler:claimcheck' }).client;
expect(testClient.config.followRegionRedirects).to.eq(true);
});
123 changes: 123 additions & 0 deletions test/unit/sinks/claimcheck.test.js
@@ -0,0 +1,123 @@
import 'mocha';
import { expect } from 'chai';
import sinon from 'sinon';
import _ from 'highland';

import { storeClaimcheck } from '../../../src/sinks/claimcheck';
import { batchWithSize } from '../../../src/utils/batch';
import * as time from '../../../src/utils/time';

import Connector from '../../../src/connectors/s3';

describe('sinks/claimcheck.js', () => {
afterEach(sinon.restore);

it('should handle oversized requests', (done) => {
sinon.stub(time, 'now').returns(new Date(1726854864001));
const spy = sinon.spy();
const uows = [
{
publishRequestEntry: { // size = 23
id: '1',
body: 'xxx',
},
},
{
publishRequestEntry: { // size = 33
id: '2',
body: 'xxxxxxxxxxxxx',
},
},
{
publishRequestEntry: { // size = 140
id: '3',
body: 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx',
},
},
];

_(uows)
.consume(batchWithSize({
batchSize: 10,
maxRequestSize: 100,
requestEntryField: 'publishRequestEntry',
claimCheckBucketName: 'event-lake-s3',
}))
.errors(spy)
.collect()
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));
expect(collected.length).to.equal(2);
expect(spy).to.not.have.been.called;
expect(collected[1]).to.deep.equal([
{
publishRequestEntry: { // size = 39
id: '3',
type: undefined,
partitionKey: undefined,
timestamp: undefined,
tags: undefined,
s3: {
bucket: 'event-lake-s3',
key: 'us-west-2/claimchecks/2024/8/20/17/3',
},
},
putClaimcheckRequest: {
Bucket: 'event-lake-s3',
Key: 'us-west-2/claimchecks/2024/8/20/17/3',
Body: '{\"id\":\"3\",\"body\":\"xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\"}',
},
},
]);
})
.done(done);
});

it('should store event', (done) => {
const uows = [
{}, // not too large
{ // too large
putClaimcheckRequest: {
Bucket: 'b1',
Key: 'us-west-2/claimchecks/2024/09/21/12/1',
Body: JSON.stringify({ id: '1' }),
},
},
{}, // not too large
];

const stub = sinon.stub(Connector.prototype, 'putObject').resolves({});

_(uows)
.batch() // batch in
.through(storeClaimcheck({
claimCheckBucketName: 'event-lake-s3',
}))
.collect()
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));

expect(collected.length).to.equal(1); // assert batch out

expect(stub).to.have.been.calledWith({
Bucket: 'b1',
Key: 'us-west-2/claimchecks/2024/09/21/12/1',
Body: JSON.stringify({ id: '1' }),
});

expect(collected[0]).to.deep.equal([
{},
{
putClaimcheckRequest: {
Bucket: 'b1',
Key: 'us-west-2/claimchecks/2024/09/21/12/1',
Body: JSON.stringify({ id: '1' }),
},
putClaimcheckResponse: {},
},
{},
]);
})
.done(done);
});
});