Skip to content

Commit

Permalink
Merge pull request #366 from jgilbert01/singleton-per-pipeline
Browse files Browse the repository at this point in the history
Singleton sdk client per pipeline.
  • Loading branch information
petermyers authored Jun 27, 2024
2 parents 84b4759 + be8e6d9 commit 6505073
Show file tree
Hide file tree
Showing 33 changed files with 322 additions and 103 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.0.9",
"version": "1.0.10",
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
Expand Down
24 changes: 17 additions & 7 deletions src/connectors/cloudwatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,26 @@ import { defaultDebugLogger } from '../utils/log';
class Connector {
constructor({
debug,
pipelineId,
timeout = Number(process.env.CW_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
}) {
this.debug = (msg) => debug('%j', msg);
this.cw = new CloudWatchClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
this.cw = Connector.getClient(pipelineId, debug, timeout);
}

static clients = {};

static getClient(pipelineId, debug, timeout) {
if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new CloudWatchClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
}
return this.clients[pipelineId];
}

put({ Namespace, MetricData }) {
Expand Down
38 changes: 24 additions & 14 deletions src/connectors/dynamodb.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,39 @@ class Connector {
debug,
tableName,
convertEmptyValues,
pipelineId,
removeUndefinedValues = true,
timeout = Number(process.env.DYNAMODB_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
retryConfig = defaultRetryConfig,
}) {
this.debug = (msg) => debug('%j', msg);
this.tableName = tableName || /* istanbul ignore next */ 'undefined';
const dynamoClient = new DynamoDBClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
});
this.db = DynamoDBDocumentClient.from(dynamoClient, {
marshallOptions: {
convertEmptyValues,
removeUndefinedValues,
},
});
this.db = Connector.getClient(pipelineId, debug, convertEmptyValues, removeUndefinedValues, timeout);
this.retryConfig = retryConfig;
}

static clients = {};

static getClient(pipelineId, debug, convertEmptyValues, removeUndefinedValues, timeout) {
if (!this.clients[pipelineId]) {
const dynamoClient = new DynamoDBClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
});
this.clients[pipelineId] = DynamoDBDocumentClient.from(dynamoClient, {
marshallOptions: {
convertEmptyValues,
removeUndefinedValues,
},
});
}
return this.clients[pipelineId];
}

update(inputParams) {
const params = {
TableName: this.tableName,
Expand Down
26 changes: 18 additions & 8 deletions src/connectors/eventbridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,31 @@ import { defaultDebugLogger } from '../utils/log';
class Connector {
constructor({
debug,
pipelineId,
timeout = Number(process.env.BUS_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
retryConfig = defaultRetryConfig,
}) {
this.debug = (msg) => debug('%j', msg);
this.bus = new EventBridgeClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
});
this.bus = Connector.getClient(pipelineId, debug, timeout);
this.retryConfig = retryConfig;
}

static clients = {};

static getClient(pipelineId, debug, timeout) {
if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new EventBridgeClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
});
}
return this.clients[pipelineId];
}

putEvents(params) {
return this._putEvents(params, []);
}
Expand Down
24 changes: 17 additions & 7 deletions src/connectors/firehose.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,28 @@ import { defaultDebugLogger } from '../utils/log';
class Connector {
constructor({
debug,
pipelineId,
deliveryStreamName = process.env.DELIVERY_STREAM_NAME,
timeout = Number(process.env.FIREHOSE_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
}) {
this.debug = (msg) => debug('%j', msg);
this.deliveryStreamName = deliveryStreamName || 'undefined';
this.stream = new FirehoseClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
this.stream = Connector.getClient(pipelineId, debug, timeout);
}

static clients = {};

static getClient(pipelineId, debug, timeout) {
if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new FirehoseClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
}
return this.clients[pipelineId];
}

putRecordBatch(inputParams) {
Expand Down
26 changes: 18 additions & 8 deletions src/connectors/kinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,33 @@ import { defaultDebugLogger } from '../utils/log';
class Connector {
constructor({
debug,
pipelineId,
streamName = process.env.STREAM_NAME,
timeout = Number(process.env.KINESIS_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
retryConfig = defaultRetryConfig,
}) {
this.debug = (msg) => debug('%j', msg);
this.streamName = streamName || 'undefined';
this.stream = new KinesisClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
});
this.stream = Connector.getClient(pipelineId, debug, timeout);
this.retryConfig = retryConfig;
}

static clients = {};

static getClient(pipelineId, debug, timeout) {
if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new KinesisClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
});
}
return this.clients[pipelineId];
}

putRecords(inputParams) {
const params = {
StreamName: this.streamName,
Expand Down
24 changes: 17 additions & 7 deletions src/connectors/lambda.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,26 @@ import { defaultDebugLogger } from '../utils/log';
class Connector {
constructor({
debug,
pipelineId,
timeout = Number(process.env.LAMBDA_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
}) {
this.debug = (msg) => debug('%j', msg);
this.lambda = new LambdaClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
this.lambda = Connector.getClient(pipelineId, debug, timeout);
}

static clients = {};

static getClient(pipelineId, debug, timeout) {
if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new LambdaClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
}
return this.clients[pipelineId];
}

invoke(params) {
Expand Down
24 changes: 17 additions & 7 deletions src/connectors/s3.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,28 @@ import { defaultDebugLogger } from '../utils/log';
class Connector {
constructor({
debug,
pipelineId,
bucketName = process.env.BUCKET_NAME,
timeout = Number(process.env.S3_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
}) {
this.debug = (msg) => debug('%j', msg);
this.bucketName = bucketName || 'undefined';
this.bucket = new S3Client({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
this.bucket = Connector.getClient(pipelineId, debug, timeout);
}

static clients = {};

static getClient(pipelineId, debug, timeout) {
if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new S3Client({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
}
return this.clients[pipelineId];
}

putObject(inputParams) {
Expand Down
24 changes: 17 additions & 7 deletions src/connectors/secretsmgr.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,28 @@ import { defaultDebugLogger } from '../utils/log';
class Connector {
constructor({
debug,
pipelineId,
secretId,
timeout = Number(process.env.SECRETSMGR_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
}) {
this.debug = /* istanbul ignore next */ (msg) => debug('%j', msg);
this.secretId = secretId;
this.sm = new SecretsManagerClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
this.sm = Connector.getClient(pipelineId, debug, timeout);
}

static clients = {};

static getClient(pipelineId, debug, timeout) {
if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new SecretsManagerClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
logger: defaultDebugLogger(debug),
});
}
return this.clients[pipelineId];
}

async get() {
Expand Down
26 changes: 18 additions & 8 deletions src/connectors/sns.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,33 @@ import { defaultDebugLogger } from '../utils/log';
class Connector {
constructor({
debug,
pipelineId,
topicArn = process.env.TOPIC_ARN,
timeout = Number(process.env.SNS_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
retryConfig = defaultRetryConfig,
}) {
this.debug = (msg) => debug('%j', msg);
this.topicArn = topicArn || 'undefined';
this.topic = new SNSClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
});
this.topic = Connector.getClient(pipelineId, debug, timeout);
this.retryConfig = retryConfig;
}

static clients = {};

static getClient(pipelineId, debug, timeout) {
if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new SNSClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
});
}
return this.clients[pipelineId];
}

publish(inputParams) {
const params = {
TopicArn: this.topicArn,
Expand Down
Loading

0 comments on commit 6505073

Please sign in to comment.