diff --git a/src/from/kinesis.js b/src/from/kinesis.js index 8fa5d31..30b4ba2 100644 --- a/src/from/kinesis.js +++ b/src/from/kinesis.js @@ -4,7 +4,7 @@ import { faulty, decompress, compress, options, } from '../utils'; import { outSkip } from '../filters'; -import { claimcheck } from '../queries'; +import { redeemClaimcheck } from '../queries'; export const fromKinesis = (event) => @@ -27,7 +27,7 @@ export const fromKinesis = (event) => }))) .tap((uow) => options().metrics?.adornKinesisMetrics(uow, event)) .filter(outSkip) - .through(claimcheck()); + .through(redeemClaimcheck()); // test helper export const toKinesisRecords = (events, approximateArrivalTimestamp) => ({ diff --git a/src/from/sqs.js b/src/from/sqs.js index 7e1e99d..e8635d6 100644 --- a/src/from/sqs.js +++ b/src/from/sqs.js @@ -4,7 +4,7 @@ import { faulty, decompress, compress, options, } from '../utils'; import { outSkip } from '../filters'; -import { claimcheck } from '../queries'; +import { redeemClaimcheck } from '../queries'; // this from function is intended for use with intra-service messages // as opposed to consuming inter-servic events @@ -31,7 +31,7 @@ export const fromSqsEvent = (event) => _(event.Records) }))) .tap((uow) => options().metrics?.adornSqsMetrics(uow, event)) .filter(outSkip) - .through(claimcheck()); + .through(redeemClaimcheck()); // test helper // https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html diff --git a/src/queries/claimcheck.js b/src/queries/claimcheck.js index c9a489e..f870f66 100644 --- a/src/queries/claimcheck.js +++ b/src/queries/claimcheck.js @@ -5,9 +5,7 @@ import { getObjectFromS3 } from './s3'; // claim-check pattern support // https://www.enterpriseintegrationpatterns.com/patterns/messaging/StoreInLibrary.html -// TODO move to query folder - -export const claimcheck = (opt = {}) => (s) => s // eslint-disable-line import/prefer-default-export +export const redeemClaimcheck = (opt = {}) => (s) => s // eslint-disable-line import/prefer-default-export .map(faulty((uow) => ({ ...uow, getClaimCheckRequest: uow.event.s3 ? { diff --git a/src/sinks/claimcheck.js b/src/sinks/claimcheck.js new file mode 100644 index 0000000..b943961 --- /dev/null +++ b/src/sinks/claimcheck.js @@ -0,0 +1,53 @@ +import _ from 'highland'; + +import { putObjectToS3 } from './s3'; + +// // claim-check pattern support +// // https://www.enterpriseintegrationpatterns.com/patterns/messaging/StoreInLibrary.html + +export const toClaimcheckEvent = (event, bucket) => ({ + id: event.id, + type: event.type, + partitionKey: event.partitionKey, + timestamp: event.timestamp, + tags: event.tags, + s3: { + bucket, + key: `claimchecks/${event.id}`, + }, +}); + +export const toPutClaimcheckRequest = (event, Bucket) => ({ + Bucket, + Key: `claimchecks/${event.id}`, + Body: JSON.stringify(event), +}); + +export const storeClaimcheck = ({ + id: pipelineId, + claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME, + putClaimcheckRequest = 'putClaimcheckRequest', + putClaimcheckResponse = 'putClaimcheckResponse', + parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8, + debug, + step, + ...opt +}) => { + // if we don't have a bucket we can't claimcheck + if (!claimCheckBucketName) return (s) => s; + + return (s) => s + .flatMap((batch) => + _(batch) + .through(putObjectToS3({ + debug, + id: pipelineId, + bucketName: claimCheckBucketName, + putRequestField: putClaimcheckRequest, + putResponseField: putClaimcheckResponse, + parallel, + step, + ...opt, + })) + .collect()); +}; diff --git a/src/sinks/eventbridge.js b/src/sinks/eventbridge.js index 8119055..9ab9390 100644 --- a/src/sinks/eventbridge.js +++ b/src/sinks/eventbridge.js @@ -8,6 +8,7 @@ import { debug as d } from '../utils/print'; import { adornStandardTags } from '../utils/tags'; import { compress } from '../utils/compression'; import { ratelimit } from '../utils/ratelimit'; +import { storeClaimcheck } from './claimcheck'; export const publishToEventBridge = ({ // eslint-disable-line import/prefer-default-export id: pipelineId, @@ -74,6 +75,7 @@ export const publishToEventBridge = ({ // eslint-disable-line import/prefer-defa requestField: publishRequestField, debug, })) + .through(storeClaimcheck(opt)) .map(toBatchUow) .map(toPublishRequest) diff --git a/src/utils/batch.js b/src/utils/batch.js index fcf0974..47e49c1 100644 --- a/src/utils/batch.js +++ b/src/utils/batch.js @@ -1,5 +1,6 @@ import _ from 'highland'; import isFunction from 'lodash/isFunction'; +import { toClaimcheckEvent, toPutClaimcheckRequest } from '../sinks/claimcheck'; // used after highland batch step export const toBatchUow = (batch) => ({ batch }); @@ -47,7 +48,11 @@ export const compact = (rule) => { }))); }; -export const batchWithSize = (opt) => { +export const batchWithSize = ({ + claimCheckBucketName = process.env.CLAIMCHECK_BUCKET_NAME, + putClaimcheckRequest = 'putClaimcheckRequest', + ...opt +}) => { let batched = []; let sizes = []; @@ -67,25 +72,34 @@ export const batchWithSize = (opt) => { if (!x[opt.requestEntryField]) { push(null, [x]); } else { - const size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField])); + let size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField])); if (size > opt.maxRequestSize) { logMetrics([x], [size], opt); - const error = new Error(`Request size: ${size}, exceeded max: ${opt.maxRequestSize}`); - error.uow = x; - push(error); - } else { - const totalSize = sizes.reduce((a, c) => a + c, size); - - if (totalSize <= opt.maxRequestSize && batched.length + 1 <= opt.batchSize) { - batched.push(x); - sizes.push(size); + if (claimCheckBucketName) { + // setup claim check + x[putClaimcheckRequest] = toPutClaimcheckRequest(x[opt.requestEntryField], claimCheckBucketName); + x[opt.requestEntryField] = toClaimcheckEvent(x[opt.requestEntryField], claimCheckBucketName); + size = Buffer.byteLength(JSON.stringify(x[opt.requestEntryField])); } else { - logMetrics(batched, sizes, opt); - push(null, batched); - batched = [x]; - sizes = [size]; + const error = new Error(`Request size: ${size}, exceeded max: ${opt.maxRequestSize}`); + error.uow = x; + push(error); + next(); + return; } } + + const totalSize = sizes.reduce((a, c) => a + c, size); + + if (totalSize <= opt.maxRequestSize && batched.length + 1 <= opt.batchSize) { + batched.push(x); + sizes.push(size); + } else { + logMetrics(batched, sizes, opt); + push(null, batched); + batched = [x]; + sizes = [size]; + } } next(); diff --git a/test/unit/queries/claimcheck.test.js b/test/unit/queries/claimcheck.test.js index 0c484f0..936ebd0 100644 --- a/test/unit/queries/claimcheck.test.js +++ b/test/unit/queries/claimcheck.test.js @@ -4,7 +4,7 @@ import sinon from 'sinon'; import _ from 'highland'; import debug from 'debug'; -import { claimcheck } from '../../../src/queries/claimcheck'; +import { redeemClaimcheck } from '../../../src/queries/claimcheck'; import Connector from '../../../src/connectors/s3'; @@ -36,7 +36,7 @@ describe('utils/claimcheck.js', () => { }]; _(uows) - .through(claimcheck()) + .through(redeemClaimcheck()) .collect() .tap((collected) => { // console.log(JSON.stringify(collected, null, 2)); @@ -69,7 +69,7 @@ describe('utils/claimcheck.js', () => { }); it('should use a pipeline label to cache regional redirects configuration', () => { - claimcheck(); + redeemClaimcheck(); const testClient = new Connector({ debug: debug('test'), bucketName: 'test-bucket', pipelineId: 'handler:claimcheck' }).client; expect(testClient.config.followRegionRedirects).to.eq(true); });