Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add-timestream-support #384

Merged
merged 2 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,072 changes: 1,651 additions & 421 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.0.25",
"version": "1.0.26",
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
Expand Down Expand Up @@ -69,6 +69,7 @@
"@aws-sdk/client-sns": "^3.450.0",
"@aws-sdk/client-sqs": "^3.450.0",
"@aws-sdk/client-sts": "^3.450.0",
"@aws-sdk/client-timestream-write": "^3.450.0",
"@aws-sdk/lib-dynamodb": "^3.450.0",
"@aws-sdk/util-dynamodb": "^3.450.0",
"@babel/cli": "^7.10.0",
Expand Down
52 changes: 52 additions & 0 deletions src/connectors/timestream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/* eslint import/no-extraneous-dependencies: ["error", {"devDependencies": true}] */
import { TimestreamWriteClient, WriteRecordsCommand } from '@aws-sdk/client-timestream-write';
import { NodeHttpHandler } from '@smithy/node-http-handler';
import Promise from 'bluebird';
import { omit, pick } from 'lodash';
import { defaultDebugLogger } from '../utils/log';

class Connector {
constructor({
debug,
pipelineId,
timeout = Number(process.env.CW_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
additionalClientOpts = {},
...opt
}) {
this.debug = (msg) => debug('%j', msg);
this.client = Connector.getClient(pipelineId, debug, timeout, additionalClientOpts); this.opt = opt;
}

static clients = {};

static getClient(pipelineId, debug, timeout, additionalClientOpts) {
const addlRequestHandlerOpts = pick(additionalClientOpts, ['requestHandler']);
const addlClientOpts = omit(additionalClientOpts, ['requestHandler']);

if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new TimestreamWriteClient({
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
...addlRequestHandlerOpts,
}),
logger: defaultDebugLogger(debug),
...addlClientOpts,
});
}
return this.clients[pipelineId];
}

writeRecords(params, ctx) {
return this._sendCommand(new WriteRecordsCommand(params), ctx);
}

_sendCommand(command, ctx) {
this.opt.metrics?.capture(this.client, command, 'timestream', this.opt, ctx);
return Promise.resolve(this.client.send(command))
.tap(this.debug)
.tapCatch(this.debug);
}
}

export default Connector;
33 changes: 33 additions & 0 deletions src/flavors/materializeTimestream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {
printStartPipeline, printEndPipeline,
faulty,
splitObject, compact,
} from '../utils';
import {
writeRecords,
} from '../sinks/timestream';
import {
filterOnEventType, filterOnContent,
} from '../filters';

export const materializeTimestream = (rule) => (s) => s // eslint-disable-line import/prefer-default-export
.filter(onEventType(rule))
.tap(printStartPipeline)

.filter(onContent(rule))

.through(compact(rule))
.through(splitObject(rule))

.map(toWriteRequest(rule))
.through(writeRecords(rule))

.tap(printEndPipeline);

const onEventType = (rule) => faulty((uow) => filterOnEventType(rule, uow));
const onContent = (rule) => faulty((uow) => filterOnContent(rule, uow));

const toWriteRequest = (rule) => faulty((uow) => ({
...uow,
writeRequest: rule.toWriteRequest(uow, rule),
}));
33 changes: 33 additions & 0 deletions src/sinks/timestream.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import _ from 'highland';

import Connector from '../connectors/timestream';

import { rejectWithFault } from '../utils/faults';
import { debug as d } from '../utils/print';

export const writeRecords = ({ // eslint-disable-line import/prefer-default-export
id: pipelineId,
debug = d('ts'),
writeRequestField = 'writeRequest',
writeResponseField = 'writeResponse',
parallel = Number(process.env.TIMESTREAM_PARALLEL) || Number(process.env.PARALLEL) || 8,
step = 'save',
...opt
} = {}) => {
const connector = new Connector({ pipelineId, debug, ...opt });

const write = (uow) => {
// istanbul ignore next
if (!uow[writeRequestField]) return _(Promise.resolve(uow));

const p = () => connector.writeRecords(uow[writeRequestField])
.then((writeResponse) => ({ ...uow, [writeResponseField]: writeResponse }))
.catch(rejectWithFault(uow));

return _(uow.metrics?.w(p, step) || p()); // wrap promise in a stream
};

return (s) => s
.map(write)
.parallel(parallel);
};
112 changes: 112 additions & 0 deletions test/unit/connectors/timestream.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import 'mocha';
import { expect } from 'chai';
import sinon from 'sinon';
import debug from 'debug';
import { TimestreamWriteClient, WriteRecordsCommand } from '@aws-sdk/client-timestream-write';
import { mockClient } from 'aws-sdk-client-mock';

import Connector from '../../../src/connectors/timestream';

describe('connectors/timestream.js', () => {
let mockCloudWatch;

beforeEach(() => {
mockCloudWatch = mockClient(TimestreamWriteClient);
});

afterEach(() => {
mockCloudWatch.restore();
});

it('should reuse client per pipeline', () => {
const client1 = Connector.getClient('test1', debug('test'));
const client2 = Connector.getClient('test1', debug('test'));
const client3 = Connector.getClient('test2', debug('test'));

expect(client1).to.eq(client2);
expect(client2).to.not.eq(client3);
});

it('should write', async () => {
const spy = sinon.spy((_) => ({
RecordsIngested: {
Total: 1,
},
}));
mockCloudWatch.on(WriteRecordsCommand).callsFake(spy);

const params = {
DatabaseName: 'd1',
TableName: 't1',
Records: [{
Dimensions: [
{
Name: 'account',
Value: 'dev',
}, {
Name: 'region',
Value: 'us-east-1',
}, {
Name: 'source',
Value: 'service-x',
}, {
Name: 'function',
Value: 'f1',
}, {
Name: 'pipeline',
Value: 'p1',
}, {
Name: 'type',
Value: 'thing-created',
},
],
MeasureName: 'domain.event',
MeasureValue: '1',
MeasureValueType: 'BIGINT',
Time: '1726940256001',
TimeUnit: 'MILLISECONDS',
}],
};
const data = await new Connector({ debug: debug('cw') })
.writeRecords(params);

expect(spy).to.have.been.calledOnce;
expect(spy).to.have.been.calledWith({
DatabaseName: 'd1',
TableName: 't1',
Records: [{
Dimensions: [
{
Name: 'account',
Value: 'dev',
}, {
Name: 'region',
Value: 'us-east-1',
}, {
Name: 'source',
Value: 'service-x',
}, {
Name: 'function',
Value: 'f1',
}, {
Name: 'pipeline',
Value: 'p1',
}, {
Name: 'type',
Value: 'thing-created',
},
],
MeasureName: 'domain.event',
MeasureValue: '1',
MeasureValueType: 'BIGINT',
Time: '1726940256001',
TimeUnit: 'MILLISECONDS',
}],
});
expect(data).to.deep.equal({
RecordsIngested: {
Total: 1,
},
});
});
});
109 changes: 109 additions & 0 deletions test/unit/flavors/materializeTimestream.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import 'mocha';
import { expect } from 'chai';
import sinon from 'sinon';

import {
initialize, initializeFrom,
ttl,
} from '../../../src';

import { toKinesisRecords, fromKinesis } from '../../../src/from/kinesis';

import Connector from '../../../src/connectors/timestream';

import { materializeTimestream } from '../../../src/flavors/materializeTimestream';

describe('flavors/materializeTimestream.js', () => {
beforeEach(() => {
sinon.stub(Connector.prototype, 'writeRecords').resolves({});
});

afterEach(sinon.restore);

it('should execute', (done) => {
const events = toKinesisRecords([
{
type: 'thing-submitted',
timestamp: 1548967022000,
thing: {
id: '1',
status: 's1',
},
},
{
type: 'thing-submitted',
timestamp: 1548967022000,
thing: {
id: '2',
status: 's1',
},
},
]);

initialize({
...initializeFrom(rules),
})
.assemble(fromKinesis(events), false)
.collect()
// .tap((collected) => console.log(JSON.stringify(collected, null, 2)))
.tap((collected) => {
expect(collected.length).to.equal(2);
expect(collected[0].pipeline).to.equal('m1');
expect(collected[0].event.type).to.equal('thing-submitted');
expect(collected[0].writeRequest).to.deep.equal({
DatabaseName: 'd1',
TableName: 't1',
Records: [
{
Dimensions: [
{
Name: 'type',
Value: 'thing-submitted',
},
{
Name: 'status',
Value: 's1',
},
],
MeasureName: 'domain.event',
MeasureValue: '1',
MeasureValueType: 'BIGINT',
Time: '1548967022000',
TimeUnit: 'MILLISECONDS',
},
],
});
})
.done(done);
});
});

const toWriteRequest = (uow) => ({
DatabaseName: 'd1',
TableName: 't1',
Records: [{
Dimensions: [
{
Name: 'type',
Value: uow.event.type,
}, {
Name: 'status',
Value: uow.event.thing.status,
},
],
MeasureName: 'domain.event',
MeasureValue: '1',
MeasureValueType: 'BIGINT',
Time: `${uow.event.timestamp}`,
TimeUnit: 'MILLISECONDS',
}],
});

const rules = [
{
id: 'm1',
flavor: materializeTimestream,
eventType: 'thing-submitted',
toWriteRequest,
},
];
2 changes: 1 addition & 1 deletion test/unit/sinks/cloudwatch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { putMetrics } from '../../../src/sinks/cloudwatch';

import Connector from '../../../src/connectors/cloudwatch';

describe('utils/cloudwatch.js', () => {
describe('sinks/cloudwatch.js', () => {
afterEach(sinon.restore);

it('should putMetrics', (done) => {
Expand Down
2 changes: 1 addition & 1 deletion test/unit/sinks/dynamodb.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {

import Connector from '../../../src/connectors/dynamodb';

describe('utils/dynamodb.js', () => {
describe('sinks/dynamodb.js', () => {
afterEach(sinon.restore);

it('should calculate ttl', () => {
Expand Down
2 changes: 1 addition & 1 deletion test/unit/sinks/eventbridge.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { publishToEventBridge as publish } from '../../../src/sinks/eventbridge'

import Connector from '../../../src/connectors/eventbridge';

describe('utils/eventbridge.js', () => {
describe('sinks/eventbridge.js', () => {
afterEach(sinon.restore);

it('should batch and publish', (done) => {
Expand Down
2 changes: 1 addition & 1 deletion test/unit/sinks/fetch.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import { fetch } from '../../../src/sinks/fetch';

import Connector from '../../../src/connectors/fetch';

describe('utils/fetch.js', () => {
describe('sinks/fetch.js', () => {
afterEach(sinon.restore);

it('should fetch', (done) => {
Expand Down
Loading