Skip to content

Commit

Permalink
store claimcheck
Browse files Browse the repository at this point in the history
  • Loading branch information
John Gilbert committed Sep 20, 2024
1 parent 3849f80 commit 22cf94f
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 25 deletions.
4 changes: 2 additions & 2 deletions src/from/kinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
faulty, decompress, compress, options,
} from '../utils';
import { outSkip } from '../filters';
import { claimcheck } from '../queries';
import { redeemClaimcheck } from '../queries';

export const fromKinesis = (event) =>

Expand All @@ -27,7 +27,7 @@ export const fromKinesis = (event) =>
})))
.tap((uow) => options().metrics?.adornKinesisMetrics(uow, event))
.filter(outSkip)
.through(claimcheck());
.through(redeemClaimcheck());

// test helper
export const toKinesisRecords = (events, approximateArrivalTimestamp) => ({
Expand Down
4 changes: 2 additions & 2 deletions src/from/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
faulty, decompress, compress, options,
} from '../utils';
import { outSkip } from '../filters';
import { claimcheck } from '../queries';
import { redeemClaimcheck } from '../queries';

// this from function is intended for use with intra-service messages
// as opposed to consuming inter-servic events
Expand All @@ -31,7 +31,7 @@ export const fromSqsEvent = (event) => _(event.Records)
})))
.tap((uow) => options().metrics?.adornSqsMetrics(uow, event))
.filter(outSkip)
.through(claimcheck());
.through(redeemClaimcheck());

// test helper
// https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
Expand Down
4 changes: 1 addition & 3 deletions src/queries/claimcheck.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ import { getObjectFromS3 } from './s3';
// claim-check pattern support
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/StoreInLibrary.html

// TODO move to query folder

export const claimcheck = (opt = {}) => (s) => s // eslint-disable-line import/prefer-default-export
export const redeemClaimcheck = (opt = {}) => (s) => s // eslint-disable-line import/prefer-default-export
.map(faulty((uow) => ({
...uow,
getClaimCheckRequest: uow.event.s3 ? {
Expand Down
53 changes: 53 additions & 0 deletions src/sinks/claimcheck.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import _ from 'highland';

import { putObjectToS3 } from './s3';

// // claim-check pattern support
// // https://www.enterpriseintegrationpatterns.com/patterns/messaging/StoreInLibrary.html

export const toClaimcheckEvent = (event, bucket) => ({
id: event.id,
type: event.type,
partitionKey: event.partitionKey,
timestamp: event.timestamp,
tags: event.tags,
s3: {
bucket,
key: `claimchecks/${event.id}`,
},
});

export const toPutClaimcheckRequest = (event, Bucket) => ({
Bucket,
Key: `claimchecks/${event.id}`,
Body: JSON.stringify(event),
});

export const storeClaimcheck = ({
id: pipelineId,
claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME,
putClaimcheckRequest = 'putClaimcheckRequest',
putClaimcheckResponse = 'putClaimcheckResponse',
parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8,
debug,
step,
...opt
}) => {
// if we don't have a bucket we can't claimcheck
if (!claimCheckBucketName) return (s) => s;

return (s) => s
.flatMap((batch) =>
_(batch)
.through(putObjectToS3({
debug,
id: pipelineId,
bucketName: claimCheckBucketName,
putRequestField: putClaimcheckRequest,
putResponseField: putClaimcheckResponse,
parallel,
step,
...opt,
}))
.collect());
};
2 changes: 2 additions & 0 deletions src/sinks/eventbridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { debug as d } from '../utils/print';
import { adornStandardTags } from '../utils/tags';
import { compress } from '../utils/compression';
import { ratelimit } from '../utils/ratelimit';
import { storeClaimcheck } from './claimcheck';

export const publishToEventBridge = ({ // eslint-disable-line import/prefer-default-export
id: pipelineId,
Expand Down Expand Up @@ -74,6 +75,7 @@ export const publishToEventBridge = ({ // eslint-disable-line import/prefer-defa
requestField: publishRequestField,
debug,
}))
.through(storeClaimcheck(opt))
.map(toBatchUow)
.map(toPublishRequest)

Expand Down
44 changes: 29 additions & 15 deletions src/utils/batch.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import _ from 'highland';
import isFunction from 'lodash/isFunction';
import { toClaimcheckEvent, toPutClaimcheckRequest } from '../sinks/claimcheck';

// used after highland batch step
export const toBatchUow = (batch) => ({ batch });
Expand Down Expand Up @@ -47,7 +48,11 @@ export const compact = (rule) => {
})));
};

export const batchWithSize = (opt) => {
export const batchWithSize = ({
claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME,
putClaimcheckRequest = 'putClaimcheckRequest',
...opt
}) => {
let batched = [];
let sizes = [];

Expand All @@ -67,25 +72,34 @@ export const batchWithSize = (opt) => {
if (!x[opt.requestEntryField]) {
push(null, [x]);
} else {
const size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField]));
let size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField]));
if (size > opt.maxRequestSize) {
logMetrics([x], [size], opt);
const error = new Error(`Request size: ${size}, exceeded max: ${opt.maxRequestSize}`);
error.uow = x;
push(error);
} else {
const totalSize = sizes.reduce((a, c) => a + c, size);

if (totalSize <= opt.maxRequestSize && batched.length + 1 <= opt.batchSize) {
batched.push(x);
sizes.push(size);
if (claimCheckBucketName) {
// setup claim check
x[putClaimcheckRequest] = toPutClaimcheckRequest(x[opt.requestEntryField], claimCheckBucketName);
x[opt.requestEntryField] = toClaimcheckEvent(x[opt.requestEntryField], claimCheckBucketName);
size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField]));
} else {
logMetrics(batched, sizes, opt);
push(null, batched);
batched = [x];
sizes = [size];
const error = new Error(`Request size: ${size}, exceeded max: ${opt.maxRequestSize}`);
error.uow = x;
push(error);
next();
return;
}
}

const totalSize = sizes.reduce((a, c) => a + c, size);

if (totalSize <= opt.maxRequestSize && batched.length + 1 <= opt.batchSize) {
batched.push(x);
sizes.push(size);
} else {
logMetrics(batched, sizes, opt);
push(null, batched);
batched = [x];
sizes = [size];
}
}

next();
Expand Down
6 changes: 3 additions & 3 deletions test/unit/queries/claimcheck.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import sinon from 'sinon';
import _ from 'highland';

import debug from 'debug';
import { claimcheck } from '../../../src/queries/claimcheck';
import { redeemClaimcheck } from '../../../src/queries/claimcheck';

import Connector from '../../../src/connectors/s3';

Expand Down Expand Up @@ -36,7 +36,7 @@ describe('utils/claimcheck.js', () => {
}];

_(uows)
.through(claimcheck())
.through(redeemClaimcheck())
.collect()
.tap((collected) => {
// console.log(JSON.stringify(collected, null, 2));
Expand Down Expand Up @@ -69,7 +69,7 @@ describe('utils/claimcheck.js', () => {
});

it('should use a pipeline label to cache regional redirects configuration', () => {
claimcheck();
redeemClaimcheck();
const testClient = new Connector({ debug: debug('test'), bucketName: 'test-bucket', pipelineId: 'handler:claimcheck' }).client;
expect(testClient.config.followRegionRedirects).to.eq(true);
});
Expand Down

0 comments on commit 22cf94f

Please sign in to comment.