diff --git a/package-lock.json b/package-lock.json index e5a0b67..208e326 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "aws-lambda-stream", - "version": "1.0.9", + "version": "1.0.10", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "aws-lambda-stream", - "version": "1.0.9", + "version": "1.0.10", "license": "MIT", "dependencies": { "object-sizeof": "^2.6.0" diff --git a/package.json b/package.json index 051b105..9065a71 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "aws-lambda-stream", - "version": "1.0.9", + "version": "1.0.10", "description": "Create stream processors with AWS Lambda functions.", "keywords": [ "aws", diff --git a/src/connectors/cloudwatch.js b/src/connectors/cloudwatch.js index e82b7fd..537a2a3 100644 --- a/src/connectors/cloudwatch.js +++ b/src/connectors/cloudwatch.js @@ -7,16 +7,26 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, timeout = Number(process.env.CW_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, }) { this.debug = (msg) => debug('%j', msg); - this.cw = new CloudWatchClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - logger: defaultDebugLogger(debug), - }); + this.cw = Connector.getClient(pipelineId, debug, timeout); + } + + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new CloudWatchClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; } put({ Namespace, MetricData }) { diff --git a/src/connectors/dynamodb.js b/src/connectors/dynamodb.js index b13b2a9..c530025 100644 --- a/src/connectors/dynamodb.js +++ b/src/connectors/dynamodb.js @@ -22,29 +22,39 @@ class Connector { debug, tableName, convertEmptyValues, + pipelineId, removeUndefinedValues = true, timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, retryConfig = defaultRetryConfig, }) { this.debug = (msg) => debug('%j', msg); this.tableName = tableName || /* istanbul ignore next */ 'undefined'; - const dynamoClient = new DynamoDBClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), - logger: defaultDebugLogger(debug), - }); - this.db = DynamoDBDocumentClient.from(dynamoClient, { - marshallOptions: { - convertEmptyValues, - removeUndefinedValues, - }, - }); + this.db = Connector.getClient(pipelineId, debug, convertEmptyValues, removeUndefinedValues, timeout); this.retryConfig = retryConfig; } + static clients = {}; + + static getClient(pipelineId, debug, convertEmptyValues, removeUndefinedValues, timeout) { + if (!this.clients[pipelineId]) { + const dynamoClient = new DynamoDBClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), + logger: defaultDebugLogger(debug), + }); + this.clients[pipelineId] = DynamoDBDocumentClient.from(dynamoClient, { + marshallOptions: { + convertEmptyValues, + removeUndefinedValues, + }, + }); + } + return this.clients[pipelineId]; + } + update(inputParams) { const params = { TableName: this.tableName, diff --git a/src/connectors/eventbridge.js b/src/connectors/eventbridge.js index 05313c0..817497e 100644 --- a/src/connectors/eventbridge.js +++ b/src/connectors/eventbridge.js @@ -11,21 +11,31 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, timeout = Number(process.env.BUS_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, retryConfig = defaultRetryConfig, }) { this.debug = (msg) => debug('%j', msg); - this.bus = new EventBridgeClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), - logger: defaultDebugLogger(debug), - }); + this.bus = Connector.getClient(pipelineId, debug, timeout); this.retryConfig = retryConfig; } + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new EventBridgeClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; + } + putEvents(params) { return this._putEvents(params, []); } diff --git a/src/connectors/firehose.js b/src/connectors/firehose.js index 1ff0362..3a1aa7a 100644 --- a/src/connectors/firehose.js +++ b/src/connectors/firehose.js @@ -7,18 +7,28 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, deliveryStreamName = process.env.DELIVERY_STREAM_NAME, timeout = Number(process.env.FIREHOSE_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, }) { this.debug = (msg) => debug('%j', msg); this.deliveryStreamName = deliveryStreamName || 'undefined'; - this.stream = new FirehoseClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - logger: defaultDebugLogger(debug), - }); + this.stream = Connector.getClient(pipelineId, debug, timeout); + } + + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new FirehoseClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; } putRecordBatch(inputParams) { diff --git a/src/connectors/kinesis.js b/src/connectors/kinesis.js index 4bdbac3..97d9a84 100644 --- a/src/connectors/kinesis.js +++ b/src/connectors/kinesis.js @@ -12,23 +12,33 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, streamName = process.env.STREAM_NAME, timeout = Number(process.env.KINESIS_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, retryConfig = defaultRetryConfig, }) { this.debug = (msg) => debug('%j', msg); this.streamName = streamName || 'undefined'; - this.stream = new KinesisClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), - logger: defaultDebugLogger(debug), - }); + this.stream = Connector.getClient(pipelineId, debug, timeout); this.retryConfig = retryConfig; } + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new KinesisClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; + } + putRecords(inputParams) { const params = { StreamName: this.streamName, diff --git a/src/connectors/lambda.js b/src/connectors/lambda.js index 7d3682a..52ebab4 100644 --- a/src/connectors/lambda.js +++ b/src/connectors/lambda.js @@ -7,16 +7,26 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, timeout = Number(process.env.LAMBDA_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, }) { this.debug = (msg) => debug('%j', msg); - this.lambda = new LambdaClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - logger: defaultDebugLogger(debug), - }); + this.lambda = Connector.getClient(pipelineId, debug, timeout); + } + + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new LambdaClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; } invoke(params) { diff --git a/src/connectors/s3.js b/src/connectors/s3.js index 081e006..246a153 100644 --- a/src/connectors/s3.js +++ b/src/connectors/s3.js @@ -11,18 +11,28 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, bucketName = process.env.BUCKET_NAME, timeout = Number(process.env.S3_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, }) { this.debug = (msg) => debug('%j', msg); this.bucketName = bucketName || 'undefined'; - this.bucket = new S3Client({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - logger: defaultDebugLogger(debug), - }); + this.bucket = Connector.getClient(pipelineId, debug, timeout); + } + + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new S3Client({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; } putObject(inputParams) { diff --git a/src/connectors/secretsmgr.js b/src/connectors/secretsmgr.js index 7a6fd86..ecfb921 100644 --- a/src/connectors/secretsmgr.js +++ b/src/connectors/secretsmgr.js @@ -12,18 +12,28 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, secretId, timeout = Number(process.env.SECRETSMGR_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, }) { this.debug = /* istanbul ignore next */ (msg) => debug('%j', msg); this.secretId = secretId; - this.sm = new SecretsManagerClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - logger: defaultDebugLogger(debug), - }); + this.sm = Connector.getClient(pipelineId, debug, timeout); + } + + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new SecretsManagerClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; } async get() { diff --git a/src/connectors/sns.js b/src/connectors/sns.js index b43a662..bf69409 100644 --- a/src/connectors/sns.js +++ b/src/connectors/sns.js @@ -13,23 +13,33 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, topicArn = process.env.TOPIC_ARN, timeout = Number(process.env.SNS_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, retryConfig = defaultRetryConfig, }) { this.debug = (msg) => debug('%j', msg); this.topicArn = topicArn || 'undefined'; - this.topic = new SNSClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), - logger: defaultDebugLogger(debug), - }); + this.topic = Connector.getClient(pipelineId, debug, timeout); this.retryConfig = retryConfig; } + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new SNSClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; + } + publish(inputParams) { const params = { TopicArn: this.topicArn, diff --git a/src/connectors/sqs.js b/src/connectors/sqs.js index 80b4f6b..6f8ccdc 100644 --- a/src/connectors/sqs.js +++ b/src/connectors/sqs.js @@ -12,23 +12,33 @@ import { defaultDebugLogger } from '../utils/log'; class Connector { constructor({ debug, + pipelineId, queueUrl = process.env.QUEUE_URL, timeout = Number(process.env.SQS_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, retryConfig = defaultRetryConfig, }) { this.debug = (msg) => debug('%j', msg); this.queueUrl = queueUrl || 'undefined'; - this.queue = new SQSClient({ - requestHandler: new NodeHttpHandler({ - requestTimeout: timeout, - connectionTimeout: timeout, - }), - retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), - logger: defaultDebugLogger(debug), - }); + this.queue = Connector.getClient(pipelineId, debug, timeout); this.retryConfig = retryConfig; } + static clients = {}; + + static getClient(pipelineId, debug, timeout) { + if (!this.clients[pipelineId]) { + this.clients[pipelineId] = new SQSClient({ + requestHandler: new NodeHttpHandler({ + requestTimeout: timeout, + connectionTimeout: timeout, + }), + retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay), + logger: defaultDebugLogger(debug), + }); + } + return this.clients[pipelineId]; + } + sendMessageBatch(inputParams) { const params = { QueueUrl: this.queueUrl, diff --git a/src/queries/dynamodb.js b/src/queries/dynamodb.js index 346a112..a1553a6 100644 --- a/src/queries/dynamodb.js +++ b/src/queries/dynamodb.js @@ -7,6 +7,7 @@ import { rejectWithFault } from '../utils/faults'; import { debug as d } from '../utils/print'; export const batchGetDynamoDB = ({ + id: pipelineId, debug = d('dynamodb'), tableName = process.env.EVENT_TABLE_NAME || process.env.ENTITY_TABLE_NAME, batchGetRequestField = 'batchGetRequest', @@ -15,7 +16,9 @@ export const batchGetDynamoDB = ({ timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, decrypt = async (data) => data, } = {}) => { - const connector = new Connector({ debug, tableName, timeout }); + const connector = new Connector({ + pipelineId, debug, tableName, timeout, + }); const invoke = (uow) => { if (!uow[batchGetRequestField]) return _(Promise.resolve(uow)); @@ -53,6 +56,7 @@ export const batchGetDynamoDB = ({ }; export const queryAllDynamoDB = (/* istanbul ignore next */{ + id: pipelineId, debug = d('dynamodb'), tableName = process.env.EVENT_TABLE_NAME || process.env.ENTITY_TABLE_NAME, queryRequestField = 'queryRequest', @@ -61,7 +65,9 @@ export const queryAllDynamoDB = (/* istanbul ignore next */{ timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, decrypt = async (data) => data, } = {}) => { - const connector = new Connector({ debug, tableName, timeout }); + const connector = new Connector({ + pipelineId, debug, tableName, timeout, + }); const invoke = (uow) => { if (!uow[queryRequestField]) return _(Promise.resolve(uow)); @@ -140,6 +146,7 @@ export const toGetRequest = (uow, rule) => { }; export const scanSplitDynamoDB = ({ + id: pipelineId, debug = d('dynamodb'), tableName = process.env.EVENT_TABLE_NAME || process.env.ENTITY_TABLE_NAME || process.env.TABLE_NAME, scanRequestField = 'scanRequest', @@ -148,7 +155,9 @@ export const scanSplitDynamoDB = ({ timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, decrypt = async (data) => data, } = {}) => { - const connector = new Connector({ debug, tableName, timeout }); + const connector = new Connector({ + pipelineId, debug, tableName, timeout, + }); const scan = (uow) => { if (!uow[scanRequestField]) return _(Promise.resolve(uow)); @@ -206,6 +215,7 @@ export const scanSplitDynamoDB = ({ }; export const querySplitDynamoDB = ({ + id: pipelineId, debug = d('dynamodb'), tableName = process.env.EVENT_TABLE_NAME || process.env.ENTITY_TABLE_NAME || process.env.TABLE_NAME, querySplitRequestField = 'querySplitRequest', @@ -214,7 +224,9 @@ export const querySplitDynamoDB = ({ timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, decrypt = async (data) => data, } = {}) => { - const connector = new Connector({ debug, tableName, timeout }); + const connector = new Connector({ + pipelineId, debug, tableName, timeout, + }); const invoke = (uow) => { if (!uow[querySplitRequestField]) return _(Promise.resolve(uow)); diff --git a/src/queries/s3.js b/src/queries/s3.js index 023f420..a3caed5 100644 --- a/src/queries/s3.js +++ b/src/queries/s3.js @@ -22,13 +22,14 @@ export const toGetObjectRequest2 = (uow) => ({ }); export const getObjectFromS3 = ({ + id: pipelineId, debug = d('s3'), bucketName = process.env.BUCKET_NAME, getRequestField = 'getRequest', getResponseField = 'getResponse', parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8, } = {}) => { - const connector = new Connector({ debug, bucketName }); + const connector = new Connector({ pipelineId, debug, bucketName }); const getObject = (uow) => { if (!uow[getRequestField]) return _(Promise.resolve(uow)); @@ -46,6 +47,7 @@ export const getObjectFromS3 = ({ }; export const getObjectFromS3AsStream = ({ + id: pipelineId, debug = d('s3'), bucketName = process.env.BUCKET_NAME, getRequestField = 'getRequest', @@ -53,7 +55,7 @@ export const getObjectFromS3AsStream = ({ delimiter = '\n', splitFilter = () => true, } = {}) => { - const connector = new Connector({ debug, bucketName }); + const connector = new Connector({ pipelineId, debug, bucketName }); const getObject = (uow) => { if (!uow[getRequestField]) return _(Promise.resolve(uow)); @@ -91,13 +93,14 @@ export const splitS3Object = ({ }; export const listObjectsFromS3 = ({ + id: pipelineId, debug = d('s3'), bucketName = process.env.BUCKET_NAME, listRequestField = 'listRequest', listResponseField = 'listResponse', parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8, } = {}) => { - const connector = new Connector({ debug, bucketName }); + const connector = new Connector({ pipelineId, debug, bucketName }); const listObjects = (uow) => { /* istanbul ignore if */ @@ -116,12 +119,13 @@ export const listObjectsFromS3 = ({ }; export const pageObjectsFromS3 = ({ + id: pipelineId, debug = d('s3'), bucketName = process.env.BUCKET_NAME, listRequestField = 'listRequest', parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8, } = {}) => { - const connector = new Connector({ debug, bucketName }); + const connector = new Connector({ pipelineId, debug, bucketName }); const listObjects = (uow) => { let { ContinuationToken } = uow[listRequestField]; diff --git a/src/sinks/cloudwatch.js b/src/sinks/cloudwatch.js index aa8f527..b6200d2 100644 --- a/src/sinks/cloudwatch.js +++ b/src/sinks/cloudwatch.js @@ -6,11 +6,12 @@ import { rejectWithFault } from '../utils/faults'; import { debug as d } from '../utils/print'; export const putMetrics = ({ // eslint-disable-line import/prefer-default-export + id: pipelineId, debug = d('cw'), putField = 'putRequest', parallel = Number(process.env.CW_PARALLEL) || Number(process.env.PARALLEL) || 8, } = {}) => { - const connector = new Connector({ debug }); + const connector = new Connector({ pipelineId, debug }); const put = (uow) => { const p = connector.put(uow[putField]) diff --git a/src/sinks/dynamodb.js b/src/sinks/dynamodb.js index f013708..066e862 100644 --- a/src/sinks/dynamodb.js +++ b/src/sinks/dynamodb.js @@ -51,6 +51,7 @@ export const pkCondition = (fieldName = 'pk') => ({ }); export const updateDynamoDB = ({ + id: pipelineId, debug = d('dynamodb'), tableName = process.env.ENTITY_TABLE_NAME || process.env.EVENT_TABLE_NAME, updateRequestField = 'updateRequest', @@ -61,7 +62,7 @@ export const updateDynamoDB = ({ ...opt } = {}) => { const connector = new Connector({ - debug, tableName, timeout, removeUndefinedValues, + pipelineId, debug, tableName, timeout, removeUndefinedValues, }); const invoke = (uow) => { @@ -81,13 +82,16 @@ export const updateDynamoDB = ({ }; export const putDynamoDB = ({ + id: pipelineId, debug = d('dynamodb'), tableName = process.env.EVENT_TABLE_NAME || process.env.ENTITY_TABLE_NAME, putRequestField = 'putRequest', parallel = Number(process.env.UPDATE_PARALLEL) || Number(process.env.PARALLEL) || 4, timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000, } = {}) => { - const connector = new Connector({ debug, tableName, timeout }); + const connector = new Connector({ + pipelineId, debug, tableName, timeout, + }); const invoke = (uow) => { if (!uow[putRequestField]) return _(Promise.resolve(uow)); diff --git a/src/sinks/eventbridge.js b/src/sinks/eventbridge.js index 7e1c4be..69530ed 100644 --- a/src/sinks/eventbridge.js +++ b/src/sinks/eventbridge.js @@ -10,6 +10,7 @@ import { compress } from '../utils/compression'; import { ratelimit } from '../utils/ratelimit'; export const publishToEventBridge = ({ // eslint-disable-line import/prefer-default-export + id: pipelineId, debug = d('eventbridge'), busName = process.env.BUS_NAME || 'undefined', source = process.env.BUS_SRC || 'custom', // could change this to internal vs external/ingress/egress @@ -23,7 +24,7 @@ export const publishToEventBridge = ({ // eslint-disable-line import/prefer-defa retryConfig, ...opt } = {}) => { - const connector = new Connector({ debug, retryConfig }); + const connector = new Connector({ pipelineId, debug, retryConfig }); const toPublishRequestEntry = (uow) => ({ ...uow, diff --git a/src/sinks/firehose.js b/src/sinks/firehose.js index 08d9477..8f1b98d 100644 --- a/src/sinks/firehose.js +++ b/src/sinks/firehose.js @@ -8,6 +8,7 @@ import { debug as d } from '../utils/print'; import { compress } from '../utils/compression'; export const sendToFirehose = ({ + id: pipelineId, debug = d('firehose'), deliveryStreamName = process.env.DELIVERY_STREAM_NAME, eventField = 'event', @@ -16,7 +17,7 @@ export const sendToFirehose = ({ handleErrors = true, ...opt } = {}) => { - const connector = new Connector({ debug, deliveryStreamName }); + const connector = new Connector({ pipelineId, debug, deliveryStreamName }); const toInputParams = (batchUow) => ({ ...batchUow, diff --git a/src/sinks/kinesis.js b/src/sinks/kinesis.js index 2fecbc9..4a64c35 100644 --- a/src/sinks/kinesis.js +++ b/src/sinks/kinesis.js @@ -10,6 +10,7 @@ import { compress } from '../utils/compression'; import { ratelimit } from '../utils/ratelimit'; export const publishToKinesis = ({ + id: pipelineId, debug = d('kinesis'), streamName = process.env.STREAM_NAME, eventField = 'event', @@ -18,7 +19,7 @@ export const publishToKinesis = ({ handleErrors = true, ...opt } = {}) => { - const connector = new Publisher({ debug, streamName }); + const connector = new Publisher({ pipelineId, debug, streamName }); const toInputParams = (batchUow) => ({ ...batchUow, diff --git a/src/sinks/lambda.js b/src/sinks/lambda.js index 0d6b52f..78f127a 100644 --- a/src/sinks/lambda.js +++ b/src/sinks/lambda.js @@ -7,12 +7,13 @@ import { debug as d } from '../utils/print'; import { ratelimit } from '../utils/ratelimit'; export const invokeLambda = ({ // eslint-disable-line import/prefer-default-export + id: pipelineId, debug = d('lambda'), invokeField = 'invokeRequest', parallel = Number(process.env.LAMBDA_PARALLEL) || Number(process.env.PARALLEL) || 8, ...opt } = {}) => { - const connector = new Connector({ debug }); + const connector = new Connector({ pipelineId, debug }); const invoke = (uow) => { const p = connector.invoke(uow[invokeField]) diff --git a/src/sinks/s3.js b/src/sinks/s3.js index 0a48a7a..b464945 100644 --- a/src/sinks/s3.js +++ b/src/sinks/s3.js @@ -8,13 +8,14 @@ import { ratelimit } from '../utils/ratelimit'; export const putObjectToS3 = ({ debug = d('s3'), + id: pipelineId, bucketName = process.env.BUCKET_NAME, putRequestField = 'putRequest', putResponseField = 'putResponse', parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8, ...opt } = {}) => { - const connector = new Connector({ debug, bucketName }); + const connector = new Connector({ pipelineId, debug, bucketName }); const putObject = (uow) => { if (!uow[putRequestField]) return _(Promise.resolve(uow)); @@ -34,12 +35,13 @@ export const putObjectToS3 = ({ export const deleteObjectFromS3 = ({ debug, + id: pipelineId, bucketName = process.env.BUCKET_NAME, deleteRequestField = 'deleteRequest', deleteResponseField = 'deleteResponse', parallel = Number(process.env.S3_PARALLEL) || Number(process.env.PARALLEL) || 8, } = {}) => { - const connector = new Connector({ debug, bucketName }); + const connector = new Connector({ pipelineId, debug, bucketName }); const deleteObject = (uow) => { if (!uow[deleteRequestField]) return _(Promise.resolve(uow)); diff --git a/src/sinks/sns.js b/src/sinks/sns.js index 57aaaff..713a5a9 100644 --- a/src/sinks/sns.js +++ b/src/sinks/sns.js @@ -7,13 +7,14 @@ import { debug as d } from '../utils/print'; import { ratelimit } from '../utils/ratelimit'; export const publishToSns = ({ // eslint-disable-line import/prefer-default-export + id: pipelineId, debug = d('sns'), topicArn = process.env.TOPIC_ARN, messageField = 'message', parallel = Number(process.env.SNS_PARALLEL) || Number(process.env.PARALLEL) || 8, ...opt } = {}) => { - const connector = new Connector({ debug, topicArn }); + const connector = new Connector({ pipelineId, debug, topicArn }); const publish = (uow) => { const p = connector.publish(uow[messageField]) diff --git a/src/sinks/sqs.js b/src/sinks/sqs.js index c24602e..0de4fb9 100644 --- a/src/sinks/sqs.js +++ b/src/sinks/sqs.js @@ -8,6 +8,7 @@ import { rejectWithFault } from '../utils/faults'; import { debug as d } from '../utils/print'; export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export + id: pipelineId, debug = d('sqs'), queueUrl = process.env.QUEUE_URL, messageField = 'message', @@ -15,7 +16,7 @@ export const sendToSqs = ({ // eslint-disable-line import/prefer-default-export parallel = Number(process.env.SQS_PARALLEL) || Number(process.env.PARALLEL) || 8, ...opt } = {}) => { - const connector = new Connector({ debug, queueUrl }); + const connector = new Connector({ pipelineId, debug, queueUrl }); const toInputParams = (batchUow) => ({ ...batchUow, diff --git a/test/unit/connectors/cloudwatch.test.js b/test/unit/connectors/cloudwatch.test.js index 5fe66d3..86e1525 100644 --- a/test/unit/connectors/cloudwatch.test.js +++ b/test/unit/connectors/cloudwatch.test.js @@ -18,6 +18,15 @@ describe('connectors/cloudwatch.js', () => { mockCloudWatch.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should put', async () => { const spy = sinon.spy((_) => ({})); mockCloudWatch.on(PutMetricDataCommand).callsFake(spy); diff --git a/test/unit/connectors/dynamodb.test.js b/test/unit/connectors/dynamodb.test.js index 896dc89..004cfc9 100644 --- a/test/unit/connectors/dynamodb.test.js +++ b/test/unit/connectors/dynamodb.test.js @@ -27,6 +27,15 @@ describe('connectors/dynamodb.js', () => { mockDdb.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should update', async () => { const spy = sinon.spy((_) => ({})); mockDdb.on(UpdateCommand).callsFake(spy); diff --git a/test/unit/connectors/eventbridge.test.js b/test/unit/connectors/eventbridge.test.js index 21736a7..f2cd327 100644 --- a/test/unit/connectors/eventbridge.test.js +++ b/test/unit/connectors/eventbridge.test.js @@ -19,6 +19,15 @@ describe('connectors/eventbridge.js', () => { mockEventBridge.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should publish', async () => { const spy = sinon.spy((_) => ({ Entries: [{ EventId: '1' }], FailedEntryCount: 0 })); mockEventBridge.on(PutEventsCommand).callsFake(spy); diff --git a/test/unit/connectors/firehose.test.js b/test/unit/connectors/firehose.test.js index 7cd8a76..082d0bb 100644 --- a/test/unit/connectors/firehose.test.js +++ b/test/unit/connectors/firehose.test.js @@ -19,6 +19,15 @@ describe('connectors/firehose.js', () => { mockFirehose.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should put', async () => { const spy = sinon.spy((_) => ({})); mockFirehose.on(PutRecordBatchCommand).callsFake(spy); diff --git a/test/unit/connectors/kinesis.test.js b/test/unit/connectors/kinesis.test.js index 44c63e2..cf229ef 100644 --- a/test/unit/connectors/kinesis.test.js +++ b/test/unit/connectors/kinesis.test.js @@ -19,6 +19,15 @@ describe('connectors/kinesis.js', () => { mockKinesis.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should publish', async () => { const spy = sinon.spy((_) => ({ Records: [{ SequenceNumber: '1' }], FailedRecordCount: 0 })); mockKinesis.on(PutRecordsCommand).callsFake(spy); diff --git a/test/unit/connectors/lambda.test.js b/test/unit/connectors/lambda.test.js index 8e12c20..2d8fe4d 100644 --- a/test/unit/connectors/lambda.test.js +++ b/test/unit/connectors/lambda.test.js @@ -18,6 +18,15 @@ describe('connectors/lambda.js', () => { mockLambda.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should invoke', async () => { const spy = sinon.spy((_) => ({ StatusCode: 200, diff --git a/test/unit/connectors/s3.test.js b/test/unit/connectors/s3.test.js index 55ebaa3..1249e0c 100644 --- a/test/unit/connectors/s3.test.js +++ b/test/unit/connectors/s3.test.js @@ -24,6 +24,15 @@ describe('connectors/s3.js', () => { mockS3.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should put object', async () => { const spy = sinon.spy(() => ({})); mockS3.on(PutObjectCommand).callsFake(spy); diff --git a/test/unit/connectors/secretsmgr.test.js b/test/unit/connectors/secretsmgr.test.js index 2d11404..29d103d 100644 --- a/test/unit/connectors/secretsmgr.test.js +++ b/test/unit/connectors/secretsmgr.test.js @@ -18,6 +18,15 @@ describe('connectors/secretsmgr.js', () => { mockSecretsMgr.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should get the secret', async () => { const SecretString = Buffer.from(JSON.stringify({ MY_SECRET: '123456' })).toString('base64'); // use this string in the fixtures/.../secrets recording diff --git a/test/unit/connectors/sns.test.js b/test/unit/connectors/sns.test.js index 105b271..8c269a6 100644 --- a/test/unit/connectors/sns.test.js +++ b/test/unit/connectors/sns.test.js @@ -19,6 +19,15 @@ describe('connectors/sns.js', () => { mockSns.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should publish msg', async () => { const spy = sinon.spy((_) => ({})); mockSns.on(PublishCommand).callsFake(spy); diff --git a/test/unit/connectors/sqs.test.js b/test/unit/connectors/sqs.test.js index b9259aa..d38a638 100644 --- a/test/unit/connectors/sqs.test.js +++ b/test/unit/connectors/sqs.test.js @@ -19,6 +19,15 @@ describe('connectors/sqs.js', () => { mockSqs.restore(); }); + it('should reuse client per pipeline', () => { + const client1 = Connector.getClient('test1', debug('test')); + const client2 = Connector.getClient('test1', debug('test')); + const client3 = Connector.getClient('test2', debug('test')); + + expect(client1).to.eq(client2); + expect(client2).to.not.eq(client3); + }); + it('should send msg', async () => { const spy = sinon.spy(() => ({ Successful: [{ Id: '1' }] })); mockSqs.on(SendMessageBatchCommand).callsFake(spy);