diff --git a/.changeset/empty-avocados-juggle.md b/.changeset/empty-avocados-juggle.md new file mode 100644 index 000000000..3ae4bbad4 --- /dev/null +++ b/.changeset/empty-avocados-juggle.md @@ -0,0 +1,5 @@ +--- +'@segment/analytics-node': major +--- + +Removing support for Node.js 14 and 16 as they are EOL diff --git a/.changeset/hot-eyes-run.md b/.changeset/hot-eyes-run.md new file mode 100644 index 000000000..04c7ca938 --- /dev/null +++ b/.changeset/hot-eyes-run.md @@ -0,0 +1,15 @@ +--- +'@segment/analytics-node': major +--- + +Support for Segment OAuth2 + +OAuth2 must be enabled from the Segment dashboard. You will need a PEM format +private/public key pair. Once you've uploaded your public key, you will need +the OAuth Client Id, the Key Id, and your private key. You can set these in +the new OAuthSettings type and provide it in your Analytics configuration. + +Note: This introduces a breaking change only if you have implemented a custom +HTTPClient. HTTPClientRequest `data: Record` has changed to +`body: string`. Processing data into a string now occurs before calls to +`makeRequest` diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 91e3862c5..e27e2ab49 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - node-version: [14, 16, 18] + node-version: [18] steps: - uses: actions/checkout@v3 - uses: actions/setup-node@v3 diff --git a/packages/node-integration-tests/src/cloudflare-tests/index.test.ts b/packages/node-integration-tests/src/cloudflare-tests/index.test.ts index f0aba3159..e49efe9c5 100644 --- a/packages/node-integration-tests/src/cloudflare-tests/index.test.ts +++ b/packages/node-integration-tests/src/cloudflare-tests/index.test.ts @@ -91,6 +91,68 @@ describe('Analytics in Cloudflare workers', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "__TEST__", + }, + ] + ` + ) + }) + + it('can send an oauth event', async () => { + const batches: any[] = [] + const oauths: any[] = [] + mockSegmentServer.on('batch', (batch) => { + batches.push(batch) + }) + + mockSegmentServer.on('token', (oauth) => { + oauths.push(oauth) + }) + + const worker = await unstable_dev( + joinPath(__dirname, 'workers', 'send-oauth-event.ts'), + { + experimental: { + disableExperimentalWarning: true, + }, + bundle: true, + } + ) + const response = await worker.fetch('http://localhost') + await response.text() + await worker.stop() + + console.log(batches) + + expect(oauths[0]).toHaveLength(756) + + expect(batches).toMatchInlineSnapshot( + batches.map(() => snapshotMatchers.getReqBody(1)), + ` + [ + { + "batch": [ + { + "_metadata": { + "jsRuntime": "cloudflare-worker", + }, + "context": { + "library": { + "name": "@segment/analytics-node", + "version": Any, + }, + }, + "event": "some-event", + "integrations": {}, + "messageId": Any, + "properties": {}, + "timestamp": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "type": "track", + "userId": "some-user", + }, + ], + "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "__TEST__", }, ] ` @@ -235,6 +297,7 @@ describe('Analytics in Cloudflare workers', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "__TEST__", }, ] ` @@ -314,6 +377,7 @@ describe('Analytics in Cloudflare workers', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "__TEST__", }, { "batch": [ @@ -359,6 +423,7 @@ describe('Analytics in Cloudflare workers', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "__TEST__", }, { "batch": [ @@ -404,6 +469,7 @@ describe('Analytics in Cloudflare workers', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "__TEST__", }, ] ` diff --git a/packages/node-integration-tests/src/cloudflare-tests/workers/send-oauth-event.ts b/packages/node-integration-tests/src/cloudflare-tests/workers/send-oauth-event.ts new file mode 100644 index 000000000..d758ae1c1 --- /dev/null +++ b/packages/node-integration-tests/src/cloudflare-tests/workers/send-oauth-event.ts @@ -0,0 +1,54 @@ +/// +import { Analytics, OAuthSettings } from '@segment/analytics-node' + +export default { + async fetch(_request: Request, _env: {}, _ctx: ExecutionContext) { + const settings: OAuthSettings = { + clientId: '__CLIENTID__', + // Has to be a valid key to encrypt the JWT, not used for anything else + clientKey: `-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDVll7uJaH322IN +PQsH2aOXZJ2r1q+6hpVK1R5JV1p41PUzn8pOxyXFHWB+53dUd4B8qywKS36XQjp0 +VmhR1tQ22znQ9ZCM6y4LGeOJBjAZiFZLcGQNNrDFC0WGWTrK1ZTS2K7p5qy4fIXG +laNkMXiGGCawkgcHAdOvPTy8m1d9a6YSetYVmBP/tEYN95jPyZFIoHQfkQPBPr9W +cWPpdEBzasHV5d957akjurPpleDiD5as66UW4dkWXvS7Wu7teCLCyDApcyJKTb2Z +SXybmWjhIZuctZMAx3wT/GgW3FbkGaW5KLQgBUMzjpL0fCtMatlqckMD92ll1FuK +R+HnXu05AgMBAAECggEBAK4o2il4GDUh9zbyQo9ZIPLuwT6AZXRED3Igi3ykNQp4 +I6S/s9g+vQaY6LkyBnSiqOt/K/8NBiFSiJWaa5/n+8zrP56qzf6KOlYk+wsdN5Vq +PWtwLrUzljpl8YAWPEFunNa8hwwE42vfZbnDBKNLT4qQIOQzfnVxQOoQlfj49gM2 +iSrblvsnQTyucFy3UyTeioHbh8q2Xqcxry5WUCOrFDd3IIwATTpLZGw0IPeuFJbJ +NfBizLEcyJaM9hujQU8PRCRd16MWX+bbYM6Mh4dkT40QXWsVnHBHwsgPdQgDgseF +Na4ajtHoC0DlwYCXpCm3IzJfKfq/LR2q8NDUgKeF4AECgYEA9nD4czza3SRbzhpZ +bBoK77CSNqCcMAqyuHB0hp/XX3yB7flF9PIPb2ReO8wwmjbxn+bm7PPz2Uwd2SzO +pU+FXmyKJr53Jxw/hmDWZCoh42gsGDlVqpmytzsj74KlaYiMyZmEGbD7t/FGfNGV +LdLDJaHIYxEimFviOTXKCeKvPAECgYEA3d8tv4jdp1uAuRZiU9Z/tfw5mJOi3oXF +8AdFFDwaPzcTorEAxjrt9X6IjPbLIDJNJtuXYpe+dG6720KyuNnhLhWW9oZEJTwT +dUgqZ2fTCOS9uH0jSn+ZFlgTWI6UDQXRwE7z8avlhMIrQVmPsttGTo7V6sQVtGRx +bNj2RSVekTkCgYAJvy4UYLPHS0jWPfSLcfw8vp8JyhBjVgj7gncZW/kIrcP1xYYe +yfQSU8XmV40UjFfCGz/G318lmP0VOdByeVKtCV3talsMEPHyPqI8E+6DL/uOebYJ +qUqINK6XKnOgWOY4kvnGillqTQCcry1XQp61PlDOmj7kB75KxPXYrj6AAQKBgQDa ++ixCv6hURuEyy77cE/YT/Q4zYnL6wHjtP5+UKwWUop1EkwG6o+q7wtiul90+t6ah +1VUCP9X/QFM0Qg32l0PBohlO0pFrVnG17TW8vSHxwyDkds1f97N19BOT8ZR5jebI +sKPfP9LVRnY+l1BWLEilvB+xBzqMwh2YWkIlWI6PMQKBgGi6TBnxp81lOYrxVRDj +/3ycRnVDmBdlQKFunvfzUBmG1mG/G0YHeVSUKZJGX7w2l+jnDwIA383FcUeA8X6A +l9q+amhtkwD/6fbkAu/xoWNl+11IFoxd88y2ByBFoEKB6UVLuCTSKwXDqzEZet7x +mDyRxq7ohIzLkw8b8buDeuXZ +-----END PRIVATE KEY-----`, + keyId: '__KEYID__', + maxRetries: 3, + authServer: 'http://localhost:3000', + scope: 'tracking_api:write', + } + + const analytics = new Analytics({ + writeKey: '__TEST__', + host: 'http://localhost:3000', + oauthSettings: settings, + }) + + analytics.track({ userId: 'some-user', event: 'some-event' }) + + await analytics.closeAndFlush() + return new Response('ok') + }, +} diff --git a/packages/node-integration-tests/src/server/mock-segment-workers.ts b/packages/node-integration-tests/src/server/mock-segment-workers.ts index 0c767c2c0..6d442ff0c 100644 --- a/packages/node-integration-tests/src/server/mock-segment-workers.ts +++ b/packages/node-integration-tests/src/server/mock-segment-workers.ts @@ -20,29 +20,52 @@ function isBatchRequest(req: IncomingMessage) { return false } +function isTokenRequest(req: IncomingMessage) { + if (req.url?.endsWith('/token')) { + return true + } + return false +} + type BatchHandler = (batch: any) => void +type TokenHandler = (token: any) => void export class MockSegmentServer { private server: ReturnType private port: number private onBatchHandlers: Set = new Set() + private onTokenHandlers: Set = new Set() constructor(port: number) { this.port = port this.server = createServer(async (req, res) => { - if (!isBatchRequest(req)) { + if (!isBatchRequest(req) && !isTokenRequest(req)) { res.writeHead(404, { 'Content-Type': 'application/json' }) res.end(JSON.stringify({ success: false })) return } const text = await getRequestText(req) - const batch = JSON.parse(text) - this.onBatchHandlers.forEach((handler) => { - handler(batch) - }) res.writeHead(200, { 'Content-Type': 'application/json' }) - res.end(JSON.stringify({ success: true })) + if (isBatchRequest(req)) { + const batch = JSON.parse(text) + this.onBatchHandlers.forEach((handler) => { + handler(batch) + }) + res.end(JSON.stringify({ success: true })) + } else if (isTokenRequest(req)) { + this.onTokenHandlers.forEach((handler) => { + handler(text) + }) + res.end( + JSON.stringify({ + access_token: '__TOKEN__', + token_type: 'Bearer', + scope: 'tracking_api:write', + expires_in: 86400, + }) + ) + } }) } @@ -68,11 +91,19 @@ export class MockSegmentServer { }) } - on(_event: 'batch', handler: BatchHandler) { - this.onBatchHandlers.add(handler) + on(_event: 'batch' | 'token', handler: BatchHandler) { + if (_event === 'batch') { + this.onBatchHandlers.add(handler) + } else if (_event === 'token') { + this.onTokenHandlers.add(handler) + } } - off(_event: 'batch', handler: BatchHandler) { - this.onBatchHandlers.delete(handler) + off(_event: 'batch' | 'token', handler: BatchHandler) { + if (_event === 'batch') { + this.onBatchHandlers.delete(handler) + } else if (_event === 'token') { + this.onTokenHandlers.delete(handler) + } } } diff --git a/packages/node/README.md b/packages/node/README.md index 3427d231c..564b0991a 100644 --- a/packages/node/README.md +++ b/packages/node/README.md @@ -147,4 +147,30 @@ export default { ``` +### OAuth 2 +In order to guarantee authorized communication between your server environment and Segment's Tracking API, you can [enable OAuth 2 in your Segment workspace](https://segment.com/docs/partners/enable-with-segment/). To support the non-interactive server environment, the OAuth workflow used is a signed client assertion JWT. You will need a public and private key pair where the public key is uploaded to the segment dashboard and the private key is kept in your server environment to be used by this SDK. Your server will verify its identity by signing a token request and will receive a token that is used to to authorize all communication with the Segment Tracking API. +You will also need to provide the OAuth Application ID and the public key's ID, both of which are provided in the Segment dashboard. You should ensure that you are implementing handling for Analytics SDK errors. Good logging will help distinguish any configuration issues. + +```ts +import { Analytics, OAuthSettings } from '@segment/analytics-node'; +import { readFileSync } from 'fs' + +const privateKey = readFileSync('private.pem', 'utf8') + +const settings: OAuthSettings = { + clientId: '', + clientKey: privateKey, + keyId: '', +} + +const analytics = new Analytics({ + writeKey: '', + oauthSettings: settings, +}) + +analytics.on('error', (err) => { console.error(err) }) + +analytics.track({ userId: 'foo', event: 'bar' }) + +``` diff --git a/packages/node/package.json b/packages/node/package.json index 88a62a1f2..993427471 100644 --- a/packages/node/package.json +++ b/packages/node/package.json @@ -17,7 +17,7 @@ "!*.tsbuildinfo" ], "engines": { - "node": ">=14" + "node": ">=18" }, "scripts": { ".": "yarn run -T turbo run --filter=@segment/analytics-node", @@ -39,6 +39,7 @@ "@segment/analytics-core": "1.4.1", "@segment/analytics-generic-utils": "1.1.1", "buffer": "^6.0.3", + "jose": "^5.1.0", "node-fetch": "^2.6.7", "tslib": "^2.4.1" }, diff --git a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts index ef5728cbb..52c9e6fc4 100644 --- a/packages/node/src/__tests__/graceful-shutdown-integration.test.ts +++ b/packages/node/src/__tests__/graceful-shutdown-integration.test.ts @@ -28,11 +28,11 @@ describe('Ability for users to exit without losing events', () => { }) const _helpers = { getFetchCalls: () => - makeReqSpy.mock.calls.map(([{ url, method, data, headers }]) => ({ + makeReqSpy.mock.calls.map(([{ url, method, body, headers }]) => ({ url, method, headers, - data, + body, })), makeTrackCall: (analytics = ajs, cb?: (...args: any[]) => void) => { analytics.track({ userId: 'foo', event: 'Thing Updated' }, cb) @@ -206,7 +206,7 @@ describe('Ability for users to exit without losing events', () => { expect(elapsedTime).toBeLessThan(100) const calls = _helpers.getFetchCalls() expect(calls.length).toBe(1) - expect(calls[0].data.batch.length).toBe(2) + expect(JSON.parse(calls[0].body).batch.length).toBe(2) }) test('should wait to flush if close is called and an event has not made it to the segment.io plugin yet', async () => { @@ -238,7 +238,7 @@ describe('Ability for users to exit without losing events', () => { expect(elapsedTime).toBeLessThan(TRACK_DELAY * 2) const calls = _helpers.getFetchCalls() expect(calls.length).toBe(1) - expect(calls[0].data.batch.length).toBe(2) + expect(JSON.parse(calls[0].body).batch.length).toBe(2) }) }) diff --git a/packages/node/src/__tests__/http-client.integration.test.ts b/packages/node/src/__tests__/http-client.integration.test.ts index 9545477d7..ee7572ceb 100644 --- a/packages/node/src/__tests__/http-client.integration.test.ts +++ b/packages/node/src/__tests__/http-client.integration.test.ts @@ -20,7 +20,6 @@ const helpers = { ) => { expect(url).toBe('https://api.segment.io/v1/batch') expect(options.headers).toEqual({ - Authorization: 'Basic Zm9vOg==', 'Content-Type': 'application/json', 'User-Agent': 'analytics-node-next/latest', }) diff --git a/packages/node/src/__tests__/http-integration.test.ts b/packages/node/src/__tests__/http-integration.test.ts index fc183f6b6..09618ae72 100644 --- a/packages/node/src/__tests__/http-integration.test.ts +++ b/packages/node/src/__tests__/http-integration.test.ts @@ -81,9 +81,6 @@ describe('Method Smoke Tests', () => { expect(pick(headers, 'authorization', 'user-agent', 'content-type')) .toMatchInlineSnapshot(` { - "authorization": [ - "Basic Zm9vOg==", - ], "content-type": [ "application/json", ], @@ -157,6 +154,7 @@ describe('Method Smoke Tests', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "foo", } ` ) @@ -194,6 +192,7 @@ describe('Method Smoke Tests', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "foo", } ` ) @@ -225,6 +224,7 @@ describe('Method Smoke Tests', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "foo", } ` ) @@ -262,6 +262,7 @@ describe('Method Smoke Tests', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "foo", } ` ) @@ -292,6 +293,7 @@ describe('Method Smoke Tests', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "foo", } ` ) @@ -329,6 +331,7 @@ describe('Method Smoke Tests', () => { }, ], "sentAt": StringMatching /\\^\\\\d\\{4\\}-\\\\d\\{2\\}-\\\\d\\{2\\}T\\\\d\\{2\\}:\\\\d\\{2\\}:\\\\d\\{2\\}\\\\\\.\\\\d\\{3\\}Z\\$/, + "writeKey": "foo", } ` ) diff --git a/packages/node/src/__tests__/integration.test.ts b/packages/node/src/__tests__/integration.test.ts index 8a2f702cd..f16bbb4ca 100644 --- a/packages/node/src/__tests__/integration.test.ts +++ b/packages/node/src/__tests__/integration.test.ts @@ -7,6 +7,8 @@ import { TestFetchClient, } from './test-helpers/create-test-analytics' +const isoDateRegEx = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{3}Z$/ + const writeKey = 'foo' jest.setTimeout(10000) const timestamp = new Date() @@ -247,7 +249,9 @@ describe('track', () => { const track = resolveCtx(analytics, 'track') analytics.track({ event: 'hello', userId: 'foo' }) await track - expect(makeReqSpy.mock.calls[0][0].data.sentAt).toBeInstanceOf(Date) + expect(JSON.parse(makeReqSpy.mock.calls[0][0].body).sentAt).toMatch( + isoDateRegEx + ) }) it('generates track events', async () => { const analytics = createTestAnalytics() diff --git a/packages/node/src/__tests__/oauth.integration.test.ts b/packages/node/src/__tests__/oauth.integration.test.ts new file mode 100644 index 000000000..5d826e92f --- /dev/null +++ b/packages/node/src/__tests__/oauth.integration.test.ts @@ -0,0 +1,334 @@ +import { HTTPResponse } from '../lib/http-client' +import { OAuthSettings } from '../lib/types' +import { + TestFetchClient, + createTestAnalytics, +} from './test-helpers/create-test-analytics' +import { createError } from './test-helpers/factories' +import { resolveCtx } from './test-helpers/resolve-ctx' + +// NOTE: Fake private key for illustrative purposes only +const privateKey = `-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDVll7uJaH322IN +PQsH2aOXZJ2r1q+6hpVK1R5JV1p41PUzn8pOxyXFHWB+53dUd4B8qywKS36XQjp0 +VmhR1tQ22znQ9ZCM6y4LGeOJBjAZiFZLcGQNNrDFC0WGWTrK1ZTS2K7p5qy4fIXG +laNkMXiGGCawkgcHAdOvPTy8m1d9a6YSetYVmBP/tEYN95jPyZFIoHQfkQPBPr9W +cWPpdEBzasHV5d957akjurPpleDiD5as66UW4dkWXvS7Wu7teCLCyDApcyJKTb2Z +SXybmWjhIZuctZMAx3wT/GgW3FbkGaW5KLQgBUMzjpL0fCtMatlqckMD92ll1FuK +R+HnXu05AgMBAAECggEBAK4o2il4GDUh9zbyQo9ZIPLuwT6AZXRED3Igi3ykNQp4 +I6S/s9g+vQaY6LkyBnSiqOt/K/8NBiFSiJWaa5/n+8zrP56qzf6KOlYk+wsdN5Vq +PWtwLrUzljpl8YAWPEFunNa8hwwE42vfZbnDBKNLT4qQIOQzfnVxQOoQlfj49gM2 +iSrblvsnQTyucFy3UyTeioHbh8q2Xqcxry5WUCOrFDd3IIwATTpLZGw0IPeuFJbJ +NfBizLEcyJaM9hujQU8PRCRd16MWX+bbYM6Mh4dkT40QXWsVnHBHwsgPdQgDgseF +Na4ajtHoC0DlwYCXpCm3IzJfKfq/LR2q8NDUgKeF4AECgYEA9nD4czza3SRbzhpZ +bBoK77CSNqCcMAqyuHB0hp/XX3yB7flF9PIPb2ReO8wwmjbxn+bm7PPz2Uwd2SzO +pU+FXmyKJr53Jxw/hmDWZCoh42gsGDlVqpmytzsj74KlaYiMyZmEGbD7t/FGfNGV +LdLDJaHIYxEimFviOTXKCeKvPAECgYEA3d8tv4jdp1uAuRZiU9Z/tfw5mJOi3oXF +8AdFFDwaPzcTorEAxjrt9X6IjPbLIDJNJtuXYpe+dG6720KyuNnhLhWW9oZEJTwT +dUgqZ2fTCOS9uH0jSn+ZFlgTWI6UDQXRwE7z8avlhMIrQVmPsttGTo7V6sQVtGRx +bNj2RSVekTkCgYAJvy4UYLPHS0jWPfSLcfw8vp8JyhBjVgj7gncZW/kIrcP1xYYe +yfQSU8XmV40UjFfCGz/G318lmP0VOdByeVKtCV3talsMEPHyPqI8E+6DL/uOebYJ +qUqINK6XKnOgWOY4kvnGillqTQCcry1XQp61PlDOmj7kB75KxPXYrj6AAQKBgQDa ++ixCv6hURuEyy77cE/YT/Q4zYnL6wHjtP5+UKwWUop1EkwG6o+q7wtiul90+t6ah +1VUCP9X/QFM0Qg32l0PBohlO0pFrVnG17TW8vSHxwyDkds1f97N19BOT8ZR5jebI +sKPfP9LVRnY+l1BWLEilvB+xBzqMwh2YWkIlWI6PMQKBgGi6TBnxp81lOYrxVRDj +/3ycRnVDmBdlQKFunvfzUBmG1mG/G0YHeVSUKZJGX7w2l+jnDwIA383FcUeA8X6A +l9q+amhtkwD/6fbkAu/xoWNl+11IFoxd88y2ByBFoEKB6UVLuCTSKwXDqzEZet7x +mDyRxq7ohIzLkw8b8buDeuXZ +-----END PRIVATE KEY-----` + +jest.setTimeout(10000) +const timestamp = new Date() + +class OauthFetchClient extends TestFetchClient {} + +const oauthTestClient = new OauthFetchClient() +const oauthFetcher = jest.spyOn(oauthTestClient, 'makeRequest') + +const tapiTestClient = new TestFetchClient() +const tapiFetcher = jest.spyOn(tapiTestClient, 'makeRequest') + +const getOAuthSettings = (): OAuthSettings => ({ + httpClient: oauthTestClient, + maxRetries: 3, + clientId: 'clientId', + clientKey: privateKey, + keyId: 'keyId', + scope: 'scope', + authServer: 'http://127.0.0.1:1234', +}) + +const createOAuthSuccess = async (body?: any): Promise => ({ + text: async () => JSON.stringify(body), + status: 200, + statusText: 'OK', +}) + +const createOAuthError = async ( + overrides: Partial = {} +): Promise => ({ + status: 400, + statusText: 'Foo', + text: async () => '', + ...overrides, +}) + +describe('OAuth Integration Success', () => { + it('track event with OAuth', async () => { + const analytics = createTestAnalytics({ + oauthSettings: getOAuthSettings(), + }) + const eventName = 'Test Event' + + oauthFetcher.mockReturnValue( + createOAuthSuccess({ access_token: 'token', expires_in: 100 }) + ) + + analytics.track({ + event: eventName, + anonymousId: 'unknown', + userId: 'known', + timestamp: timestamp, + }) + + const ctx1 = await resolveCtx(analytics, 'track') + + expect(ctx1.event.type).toEqual('track') + expect(ctx1.event.event).toEqual(eventName) + expect(ctx1.event.properties).toEqual({}) + expect(ctx1.event.anonymousId).toEqual('unknown') + expect(ctx1.event.userId).toEqual('known') + expect(ctx1.event.timestamp).toEqual(timestamp) + + expect(oauthFetcher).toHaveBeenCalledTimes(1) + + await analytics.closeAndFlush() + }) + it('track event with OAuth after retry', async () => { + const analytics = createTestAnalytics({ + oauthSettings: getOAuthSettings(), + }) + oauthFetcher + .mockReturnValueOnce(createOAuthError({ status: 425 })) + .mockReturnValueOnce( + createOAuthSuccess({ access_token: 'token', expires_in: 100 }) + ) + + const eventName = 'Test Event' + + analytics.track({ + event: eventName, + anonymousId: 'unknown', + userId: 'known', + timestamp: timestamp, + }) + + const ctx1 = await resolveCtx(analytics, 'track') + + expect(ctx1.event.type).toEqual('track') + expect(ctx1.event.event).toEqual(eventName) + expect(ctx1.event.properties).toEqual({}) + expect(ctx1.event.anonymousId).toEqual('unknown') + expect(ctx1.event.userId).toEqual('known') + expect(ctx1.event.timestamp).toEqual(timestamp) + + expect(oauthFetcher).toHaveBeenCalledTimes(2) + + await analytics.closeAndFlush() + }) + + it('delays appropriately on 429 error', async () => { + const analytics = createTestAnalytics({ + oauthSettings: getOAuthSettings(), + }) + const retryTime = Date.now() + 250 + oauthFetcher + .mockReturnValueOnce( + createOAuthError({ + status: 429, + headers: { 'X-RateLimit-Reset': retryTime }, + }) + ) + .mockReturnValue( + createOAuthSuccess({ access_token: 'token', expires_in: 100 }) + ) + + analytics.track({ + event: 'Test Event', + anonymousId: 'unknown', + userId: 'known', + timestamp: timestamp, + }) + const ctx1 = await resolveCtx(analytics, 'track') // forces exception to be thrown + expect(ctx1.event.type).toEqual('track') + await analytics.closeAndFlush() + expect(retryTime).toBeLessThan(Date.now()) + }) +}) + +describe('OAuth Failure', () => { + it('surfaces error after retries', async () => { + const analytics = createTestAnalytics({ + oauthSettings: getOAuthSettings(), + }) + + oauthFetcher.mockReturnValue(createOAuthError({ status: 500 })) + + const eventName = 'Test Event' + + try { + analytics.track({ + event: eventName, + anonymousId: 'unknown', + userId: 'known', + timestamp: timestamp, + }) + + const ctx1 = await resolveCtx(analytics, 'track') + + expect(ctx1.event.type).toEqual('track') + expect(ctx1.event.event).toEqual(eventName) + expect(ctx1.event.properties).toEqual({}) + expect(ctx1.event.anonymousId).toEqual('unknown') + expect(ctx1.event.userId).toEqual('known') + expect(ctx1.event.timestamp).toEqual(timestamp) + + expect(oauthFetcher).toHaveBeenCalledTimes(3) + + await analytics.closeAndFlush() + + throw new Error('fail') + } catch (err: any) { + expect(err.reason).toEqual(new Error('[500] Foo')) + expect(err.code).toMatch(/delivery_failure/) + } + }) + + it('surfaces error after failing immediately', async () => { + const logger = jest.fn() + const analytics = createTestAnalytics({ + oauthSettings: getOAuthSettings(), + }).on('error', (err) => { + logger(err) + }) + + oauthFetcher.mockReturnValue(createOAuthError({ status: 400 })) + + try { + analytics.track({ + event: 'Test Event', + anonymousId: 'unknown', + userId: 'known', + timestamp: timestamp, + }) + + const ctx1 = await resolveCtx(analytics, 'track') // forces exception to be thrown + expect(ctx1.event.type).toEqual('track') + await analytics.closeAndFlush() + + expect(logger).toHaveBeenCalledWith('foo') + throw new Error('fail') + } catch (err: any) { + expect(err.reason).toEqual(new Error('[400] Foo')) + expect(err.code).toMatch(/delivery_failure/) + } + }) + + it('handles a bad key', async () => { + const props = getOAuthSettings() + props.clientKey = 'Garbage' + const analytics = createTestAnalytics({ + oauthSettings: props, + }) + + try { + analytics.track({ + event: 'Test Event', + anonymousId: 'unknown', + userId: 'known', + timestamp: timestamp, + }) + await analytics.closeAndFlush() + const ctx1 = await resolveCtx(analytics, 'track') // forces exception to be thrown + expect(ctx1.event.type).toEqual('track') + throw new Error('fail') + } catch (err: any) { + expect(err.reason).toEqual( + new TypeError('"pkcs8" must be PKCS#8 formatted string') + ) + } + }) + + it('OAuth inherits Analytics custom client', async () => { + const oauthSettings = getOAuthSettings() + oauthSettings.httpClient = undefined + const analytics = createTestAnalytics({ + oauthSettings: oauthSettings, + httpClient: tapiTestClient, + }) + tapiFetcher.mockReturnValue(createOAuthError({ status: 415 })) + + try { + analytics.track({ + event: 'Test Event', + anonymousId: 'unknown', + userId: 'known', + timestamp: timestamp, + }) + + await resolveCtx(analytics, 'track') + await analytics.closeAndFlush() + + throw new Error('fail') + } catch (err: any) { + expect(err.reason).toEqual(new Error('[415] Foo')) + expect(err.code).toMatch(/delivery_failure/) + } + }) +}) + +describe('TAPI rejection', () => { + it('surfaces error', async () => { + const analytics = createTestAnalytics({ + oauthSettings: getOAuthSettings(), + httpClient: tapiTestClient, + }) + const eventName = 'Test Event' + + oauthFetcher.mockReturnValue( + createOAuthSuccess({ access_token: 'token', expires_in: 100 }) + ) + tapiFetcher.mockReturnValue( + createError({ + status: 400, + statusText: + '{"success":false,"message":"malformed JSON","code":"invalid_request"}', + }) + ) + + try { + analytics.track({ + event: eventName, + anonymousId: 'unknown', + userId: 'known', + timestamp: timestamp, + }) + + const ctx1 = await resolveCtx(analytics, 'track') + + expect(ctx1.event.type).toEqual('track') + expect(ctx1.event.event).toEqual(eventName) + expect(ctx1.event.properties).toEqual({}) + expect(ctx1.event.anonymousId).toEqual('unknown') + expect(ctx1.event.userId).toEqual('known') + expect(ctx1.event.timestamp).toEqual(timestamp) + + expect(oauthFetcher).toHaveBeenCalledTimes(1) + + await analytics.closeAndFlush() + throw new Error('fail') + } catch (err: any) { + expect(err.code).toBe('delivery_failure') + } + }) +}) diff --git a/packages/node/src/__tests__/test-helpers/assert-shape/http-request-event.ts b/packages/node/src/__tests__/test-helpers/assert-shape/http-request-event.ts index 684a32dc2..7c0b733ce 100644 --- a/packages/node/src/__tests__/test-helpers/assert-shape/http-request-event.ts +++ b/packages/node/src/__tests__/test-helpers/assert-shape/http-request-event.ts @@ -4,7 +4,7 @@ type HttpRequestEmitterEvent = NodeEmitterEvents['http_request'][0] export const assertHttpRequestEmittedEvent = ( event: HttpRequestEmitterEvent ) => { - const body = event.body + const body = JSON.parse(event.body) expect(Array.isArray(body.batch)).toBeTruthy() expect(body.batch.length).toBe(1) expect(typeof event.headers).toBe('object') diff --git a/packages/node/src/__tests__/test-helpers/assert-shape/segment-http-api.ts b/packages/node/src/__tests__/test-helpers/assert-shape/segment-http-api.ts index 5cb0ce741..f3540679c 100644 --- a/packages/node/src/__tests__/test-helpers/assert-shape/segment-http-api.ts +++ b/packages/node/src/__tests__/test-helpers/assert-shape/segment-http-api.ts @@ -6,7 +6,9 @@ import { HTTPClientRequest } from '../../../lib/http-client' */ export const httpClientOptionsBodyMatcher = { messageId: expect.stringMatching(/^node-next-\d*-\w*-\w*-\w*-\w*-\w*/), - timestamp: expect.any(Date), + timestamp: expect.stringMatching( + /^20\d{2}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d+Z/ + ), _metadata: expect.any(Object), context: { library: { @@ -18,21 +20,20 @@ export const httpClientOptionsBodyMatcher = { } export function assertHTTPRequestOptions( - { data, headers, method, url }: HTTPClientRequest, + { body, headers, method, url }: HTTPClientRequest, contexts: Context[] ) { expect(url).toBe('https://api.segment.io/v1/batch') expect(method).toBe('POST') expect(headers).toEqual({ - Authorization: 'Basic Og==', 'Content-Type': 'application/json', 'User-Agent': 'analytics-node-next/latest', }) - expect(data.batch).toHaveLength(contexts.length) + expect(JSON.parse(body).batch).toHaveLength(contexts.length) let idx = 0 for (const context of contexts) { - expect(data.batch[idx]).toEqual({ + expect(JSON.parse(body).batch[idx]).toEqual({ ...context.event, ...httpClientOptionsBodyMatcher, }) diff --git a/packages/node/src/__tests__/test-helpers/create-test-analytics.ts b/packages/node/src/__tests__/test-helpers/create-test-analytics.ts index 6d91535a2..df15e038b 100644 --- a/packages/node/src/__tests__/test-helpers/create-test-analytics.ts +++ b/packages/node/src/__tests__/test-helpers/create-test-analytics.ts @@ -1,6 +1,9 @@ -import { Analytics } from '../../app/analytics-node' -import { AnalyticsSettings } from '../../app/settings' -import { FetchHTTPClient, HTTPFetchFn } from '../../lib/http-client' +import { + Analytics, + AnalyticsSettings, + FetchHTTPClient, + HTTPFetchFn, +} from '../../index' import { createError, createSuccess } from './factories' export const createTestAnalytics = ( diff --git a/packages/node/src/__tests__/test-helpers/factories.ts b/packages/node/src/__tests__/test-helpers/factories.ts index b2c670bf0..5d9cb761e 100644 --- a/packages/node/src/__tests__/test-helpers/factories.ts +++ b/packages/node/src/__tests__/test-helpers/factories.ts @@ -1,6 +1,7 @@ export const createSuccess = (body?: any) => { return Promise.resolve({ json: () => Promise.resolve(body), + text: () => Promise.resolve(JSON.stringify(body)), ok: true, status: 200, statusText: 'OK', diff --git a/packages/node/src/__tests__/typedef-tests.ts b/packages/node/src/__tests__/typedef-tests.ts index 2f8e626f6..f847c26c8 100644 --- a/packages/node/src/__tests__/typedef-tests.ts +++ b/packages/node/src/__tests__/typedef-tests.ts @@ -110,7 +110,7 @@ export default { return axios({ url: options.url, method: options.method, - data: options.data, + data: options.body, headers: options.headers, timeout: options.httpRequestTimeout, }) diff --git a/packages/node/src/app/analytics-node.ts b/packages/node/src/app/analytics-node.ts index 229df134c..f956ca3a3 100644 --- a/packages/node/src/app/analytics-node.ts +++ b/packages/node/src/app/analytics-node.ts @@ -60,10 +60,12 @@ export class Analytics extends NodeEmitter implements CoreAnalytics { typeof settings.httpClient === 'function' ? new FetchHTTPClient(settings.httpClient) : settings.httpClient ?? new FetchHTTPClient(), + oauthSettings: settings.oauthSettings, }, this as NodeEmitter ) this._publisher = publisher + this.ready = this.register(plugin).then(() => undefined) this.emit('initialize', settings) diff --git a/packages/node/src/app/emitter.ts b/packages/node/src/app/emitter.ts index 1925d30b3..0a577c1c5 100644 --- a/packages/node/src/app/emitter.ts +++ b/packages/node/src/app/emitter.ts @@ -15,7 +15,7 @@ export type NodeEmitterEvents = CoreEmitterContract & { url: string method: string headers: Record - body: Record + body: string } ] drained: [] diff --git a/packages/node/src/app/settings.ts b/packages/node/src/app/settings.ts index 9b0f4136d..0de577b4f 100644 --- a/packages/node/src/app/settings.ts +++ b/packages/node/src/app/settings.ts @@ -1,5 +1,6 @@ import { ValidationError } from '@segment/analytics-core' import { HTTPClient, HTTPFetchFn } from '../lib/http-client' +import { OAuthSettings } from '../lib/types' export interface AnalyticsSettings { /** @@ -45,6 +46,10 @@ export interface AnalyticsSettings { * Default: an HTTP client that uses globalThis.fetch, with node-fetch as a fallback. */ httpClient?: HTTPFetchFn | HTTPClient + /** + * Set up OAuth2 authentication between the client and Segment's endpoints + */ + oauthSettings?: OAuthSettings } export const validateSettings = (settings: AnalyticsSettings) => { diff --git a/packages/node/src/index.common.ts b/packages/node/src/index.common.ts new file mode 100644 index 000000000..55d7ac05f --- /dev/null +++ b/packages/node/src/index.common.ts @@ -0,0 +1,24 @@ +export { Analytics } from './app/analytics-node' +export { Context } from './app/context' +export { + HTTPClient, + FetchHTTPClient, + HTTPFetchRequest, + HTTPResponse, + HTTPFetchFn, + HTTPClientRequest, +} from './lib/http-client' + +export { OAuthSettings } from './lib/types' + +export type { + Plugin, + GroupTraits, + UserTraits, + TrackParams, + IdentifyParams, + AliasParams, + GroupParams, + PageParams, +} from './app/types' +export type { AnalyticsSettings } from './app/settings' diff --git a/packages/node/src/index.ts b/packages/node/src/index.ts index e1facda3a..326adb60b 100644 --- a/packages/node/src/index.ts +++ b/packages/node/src/index.ts @@ -1,26 +1,5 @@ -export { Analytics } from './app/analytics-node' -export { Context } from './app/context' -export { - HTTPClient, - FetchHTTPClient, - HTTPFetchRequest, - HTTPResponse, - HTTPFetchFn, - HTTPClientRequest, -} from './lib/http-client' - -export type { - Plugin, - GroupTraits, - UserTraits, - TrackParams, - IdentifyParams, - AliasParams, - GroupParams, - PageParams, -} from './app/types' -export type { AnalyticsSettings } from './app/settings' +export * from './index.common' // export Analytics as both a named export and a default export (for backwards-compat. reasons) -import { Analytics } from './app/analytics-node' +import { Analytics } from './index.common' export default Analytics diff --git a/packages/node/src/lib/__tests__/token-manager.test.ts b/packages/node/src/lib/__tests__/token-manager.test.ts new file mode 100644 index 000000000..4a2ba0f59 --- /dev/null +++ b/packages/node/src/lib/__tests__/token-manager.test.ts @@ -0,0 +1,142 @@ +import { sleep } from '@segment/analytics-core' +import { TestFetchClient } from '../../__tests__/test-helpers/create-test-analytics' +import { HTTPResponse } from '../http-client' +import { TokenManager, TokenManagerSettings } from '../token-manager' + +// NOTE: Fake private key for illustrative purposes only +const privateKey = `-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQDVll7uJaH322IN +PQsH2aOXZJ2r1q+6hpVK1R5JV1p41PUzn8pOxyXFHWB+53dUd4B8qywKS36XQjp0 +VmhR1tQ22znQ9ZCM6y4LGeOJBjAZiFZLcGQNNrDFC0WGWTrK1ZTS2K7p5qy4fIXG +laNkMXiGGCawkgcHAdOvPTy8m1d9a6YSetYVmBP/tEYN95jPyZFIoHQfkQPBPr9W +cWPpdEBzasHV5d957akjurPpleDiD5as66UW4dkWXvS7Wu7teCLCyDApcyJKTb2Z +SXybmWjhIZuctZMAx3wT/GgW3FbkGaW5KLQgBUMzjpL0fCtMatlqckMD92ll1FuK +R+HnXu05AgMBAAECggEBAK4o2il4GDUh9zbyQo9ZIPLuwT6AZXRED3Igi3ykNQp4 +I6S/s9g+vQaY6LkyBnSiqOt/K/8NBiFSiJWaa5/n+8zrP56qzf6KOlYk+wsdN5Vq +PWtwLrUzljpl8YAWPEFunNa8hwwE42vfZbnDBKNLT4qQIOQzfnVxQOoQlfj49gM2 +iSrblvsnQTyucFy3UyTeioHbh8q2Xqcxry5WUCOrFDd3IIwATTpLZGw0IPeuFJbJ +NfBizLEcyJaM9hujQU8PRCRd16MWX+bbYM6Mh4dkT40QXWsVnHBHwsgPdQgDgseF +Na4ajtHoC0DlwYCXpCm3IzJfKfq/LR2q8NDUgKeF4AECgYEA9nD4czza3SRbzhpZ +bBoK77CSNqCcMAqyuHB0hp/XX3yB7flF9PIPb2ReO8wwmjbxn+bm7PPz2Uwd2SzO +pU+FXmyKJr53Jxw/hmDWZCoh42gsGDlVqpmytzsj74KlaYiMyZmEGbD7t/FGfNGV +LdLDJaHIYxEimFviOTXKCeKvPAECgYEA3d8tv4jdp1uAuRZiU9Z/tfw5mJOi3oXF +8AdFFDwaPzcTorEAxjrt9X6IjPbLIDJNJtuXYpe+dG6720KyuNnhLhWW9oZEJTwT +dUgqZ2fTCOS9uH0jSn+ZFlgTWI6UDQXRwE7z8avlhMIrQVmPsttGTo7V6sQVtGRx +bNj2RSVekTkCgYAJvy4UYLPHS0jWPfSLcfw8vp8JyhBjVgj7gncZW/kIrcP1xYYe +yfQSU8XmV40UjFfCGz/G318lmP0VOdByeVKtCV3talsMEPHyPqI8E+6DL/uOebYJ +qUqINK6XKnOgWOY4kvnGillqTQCcry1XQp61PlDOmj7kB75KxPXYrj6AAQKBgQDa ++ixCv6hURuEyy77cE/YT/Q4zYnL6wHjtP5+UKwWUop1EkwG6o+q7wtiul90+t6ah +1VUCP9X/QFM0Qg32l0PBohlO0pFrVnG17TW8vSHxwyDkds1f97N19BOT8ZR5jebI +sKPfP9LVRnY+l1BWLEilvB+xBzqMwh2YWkIlWI6PMQKBgGi6TBnxp81lOYrxVRDj +/3ycRnVDmBdlQKFunvfzUBmG1mG/G0YHeVSUKZJGX7w2l+jnDwIA383FcUeA8X6A +l9q+amhtkwD/6fbkAu/xoWNl+11IFoxd88y2ByBFoEKB6UVLuCTSKwXDqzEZet7x +mDyRxq7ohIzLkw8b8buDeuXZ +-----END PRIVATE KEY-----` + +const testClient = new TestFetchClient() +const fetcher = jest.spyOn(testClient, 'makeRequest') + +const createOAuthSuccess = async (body?: any): Promise => ({ + text: async () => JSON.stringify(body), + status: 200, + statusText: 'OK', +}) + +const createOAuthError = async ( + overrides: Partial = {} +): Promise => ({ + text: async () => '', + status: 400, + statusText: 'Foo', + ...overrides, +}) + +const getTokenManager = () => { + const oauthSettings: TokenManagerSettings = { + httpClient: testClient, + maxRetries: 3, + clientId: 'clientId', + clientKey: privateKey, + keyId: 'keyId', + scope: 'scope', + authServer: 'http://127.0.0.1:1234', + } + + return new TokenManager(oauthSettings) +} + +test( + 'OAuth Success', + async () => { + fetcher.mockReturnValueOnce( + createOAuthSuccess({ access_token: 'token', expires_in: 100 }) + ) + + const tokenManager = getTokenManager() + const token = await tokenManager.getAccessToken() + tokenManager.stopPoller() + + expect(tokenManager.isValidToken(token)).toBeTruthy() + expect(token.access_token).toBe('token') + expect(token.expires_in).toBe(100) + expect(fetcher).toHaveBeenCalledTimes(1) + }, + 30 * 1000 +) + +test('OAuth retry failure', async () => { + fetcher.mockReturnValue(createOAuthError({ status: 425 })) + + const tokenManager = getTokenManager() + + await expect(tokenManager.getAccessToken()).rejects.toThrowError('Foo') + tokenManager.stopPoller() + + expect(fetcher).toHaveBeenCalledTimes(3) +}) + +test('OAuth immediate failure', async () => { + fetcher.mockReturnValue(createOAuthError({ status: 400 })) + + const tokenManager = getTokenManager() + + await expect(tokenManager.getAccessToken()).rejects.toThrowError('Foo') + tokenManager.stopPoller() + + expect(fetcher).toHaveBeenCalledTimes(1) +}) + +test('OAuth rate limit', async () => { + fetcher + .mockReturnValueOnce( + createOAuthError({ + status: 429, + headers: { 'X-RateLimit-Reset': Date.now() + 250 }, + }) + ) + .mockReturnValueOnce( + createOAuthError({ + status: 429, + headers: { 'X-RateLimit-Reset': Date.now() + 500 }, + }) + ) + .mockReturnValue( + createOAuthSuccess({ access_token: 'token', expires_in: 100 }) + ) + + const tokenManager = getTokenManager() + + const tokenPromise = tokenManager.getAccessToken() + await sleep(25) + expect(fetcher).toHaveBeenCalledTimes(1) + await sleep(250) + expect(fetcher).toHaveBeenCalledTimes(2) + await sleep(250) + expect(fetcher).toHaveBeenCalledTimes(3) + + const token = await tokenPromise + expect(tokenManager.isValidToken(token)).toBeTruthy() + expect(token.access_token).toBe('token') + expect(token.expires_in).toBe(100) + expect(fetcher).toHaveBeenCalledTimes(3) +}) diff --git a/packages/node/src/lib/abort.ts b/packages/node/src/lib/abort.ts index b8d93120f..4e3a17f38 100644 --- a/packages/node/src/lib/abort.ts +++ b/packages/node/src/lib/abort.ts @@ -41,7 +41,7 @@ export class AbortSignal { * This polyfill is only neccessary to support versions of node < 14.17. * Can be removed once node 14 support is dropped. */ -class AbortController { +export class AbortController { signal = new AbortSignal() abort() { if (this.signal.aborted) return diff --git a/packages/node/src/lib/http-client.ts b/packages/node/src/lib/http-client.ts index 93c1c5a96..316b0936f 100644 --- a/packages/node/src/lib/http-client.ts +++ b/packages/node/src/lib/http-client.ts @@ -21,11 +21,23 @@ export interface HTTPFetchRequest { signal: any // AbortSignal type does not play nicely with node-fetch } +/** + * This interface is meant to be compatible with the Headers interface. + * @link https://developer.mozilla.org/en-US/docs/Web/API/Headers + */ +export interface HTTPHeaders { + get: (key: string) => string | null + has: (key: string) => boolean + entries: () => IterableIterator<[string, any]> +} + /** * This interface is meant to very minimally conform to the Response interface. * @link https://developer.mozilla.org/en-US/docs/Web/API/Response */ export interface HTTPResponse { + headers?: Record | HTTPHeaders + text?: () => Promise status: number statusText: string } @@ -49,10 +61,9 @@ export interface HTTPClientRequest { */ headers: Record /** - * JSON data to be sent with the request (will be stringified) - * @example { "batch": [ ... ]} + * Data to be sent with the request */ - data: Record + body: string /** * Specifies the timeout (in milliseconds) for an HTTP client to get an HTTP response from the server * @example 10000 @@ -84,7 +95,7 @@ export class FetchHTTPClient implements HTTPClient { url: options.url, method: options.method, headers: options.headers, - body: JSON.stringify(options.data), + body: options.body, signal: signal, } diff --git a/packages/node/src/lib/token-manager.ts b/packages/node/src/lib/token-manager.ts new file mode 100644 index 000000000..19d4ec841 --- /dev/null +++ b/packages/node/src/lib/token-manager.ts @@ -0,0 +1,327 @@ +import { uuid } from './uuid' +import { HTTPClient, HTTPClientRequest, HTTPResponse } from './http-client' +import { SignJWT, importPKCS8 } from 'jose' +import { backoff, sleep } from '@segment/analytics-core' +import { Emitter } from '@segment/analytics-generic-utils' +import type { + AccessToken, + OAuthSettings, + TokenManager as ITokenManager, +} from './types' + +const isAccessToken = (thing: unknown): thing is AccessToken => { + return Boolean( + thing && + typeof thing === 'object' && + 'access_token' in thing && + 'expires_in' in thing && + typeof thing.access_token === 'string' && + typeof thing.expires_in === 'number' + ) +} + +const isValidCustomResponse = ( + response: HTTPResponse +): response is HTTPResponse & Required> => { + return typeof response.text === 'function' +} + +function convertHeaders( + headers: HTTPResponse['headers'] +): Record { + const lowercaseHeaders: Record = {} + if (!headers) return {} + if (isHeaders(headers)) { + for (const [name, value] of headers.entries()) { + lowercaseHeaders[name.toLowerCase()] = value + } + return lowercaseHeaders + } + for (const [name, value] of Object.entries(headers)) { + lowercaseHeaders[name.toLowerCase()] = value as string + } + return lowercaseHeaders +} + +function isHeaders(thing: unknown): thing is HTTPResponse['headers'] { + if ( + typeof thing === 'object' && + thing !== null && + 'entries' in Object(thing) && + typeof Object(thing).entries === 'function' + ) { + return true + } + return false +} + +export interface TokenManagerSettings extends OAuthSettings { + httpClient: HTTPClient + maxRetries: number +} + +export class TokenManager implements ITokenManager { + private alg = 'RS256' as const + private grantType = 'client_credentials' as const + private clientAssertionType = + 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer' as const + private clientId: string + private clientKey: string + private keyId: string + private scope: string + private authServer: string + private httpClient: HTTPClient + private maxRetries: number + private clockSkewInSeconds = 0 + + private accessToken?: AccessToken + private tokenEmitter = new Emitter<{ + access_token: [{ token: AccessToken } | { error: unknown }] + }>() + private retryCount: number + private pollerTimer?: ReturnType + + constructor(props: TokenManagerSettings) { + this.keyId = props.keyId + this.clientId = props.clientId + this.clientKey = props.clientKey + this.authServer = props.authServer ?? 'https://oauth2.segment.io' + this.scope = props.scope ?? 'tracking_api:write' + this.httpClient = props.httpClient + this.maxRetries = props.maxRetries + this.tokenEmitter.on('access_token', (event) => { + if ('token' in event) { + this.accessToken = event.token + } + }) + this.retryCount = 0 + } + + stopPoller() { + clearTimeout(this.pollerTimer) + } + + async pollerLoop() { + let timeUntilRefreshInMs = 25 + let response: HTTPResponse + + try { + response = await this.requestAccessToken() + } catch (err) { + // Error without a status code - likely networking, retry + return this.handleTransientError({ error: err }) + } + + if (!isValidCustomResponse(response)) { + return this.handleInvalidCustomResponse() + } + + const headers = convertHeaders(response.headers) + if (headers['date']) { + this.updateClockSkew(Date.parse(headers['date'])) + } + + // Handle status codes! + if (response.status === 200) { + try { + const body = await response.text() + const token = JSON.parse(body) + + if (!isAccessToken(token)) { + throw new Error( + 'Response did not contain a valid access_token and expires_in' + ) + } + + // Success, we have a token! + token.expires_at = Math.round(Date.now() / 1000) + token.expires_in + this.tokenEmitter.emit('access_token', { token }) + + // Reset our failure count + this.retryCount = 0 + // Refresh the token after half the expiry time passes + timeUntilRefreshInMs = (token.expires_in / 2) * 1000 + return this.queueNextPoll(timeUntilRefreshInMs) + } catch (err) { + // Something went really wrong with the body, lets surface an error and try again? + return this.handleTransientError({ error: err, forceEmitError: true }) + } + } else if (response.status === 429) { + // Rate limited, wait for the reset time + return await this.handleRateLimited( + response, + headers, + timeUntilRefreshInMs + ) + } else if ([400, 401, 415].includes(response.status)) { + // Unrecoverable errors, stops the poller + return this.handleUnrecoverableErrors(response) + } else { + return this.handleTransientError({ + error: new Error(`[${response.status}] ${response.statusText}`), + }) + } + } + + private handleTransientError({ + error, + forceEmitError, + }: { + error: unknown + forceEmitError?: boolean + }) { + this.incrementRetries({ error, forceEmitError }) + + const timeUntilRefreshInMs = backoff({ + attempt: this.retryCount, + minTimeout: 25, + maxTimeout: 1000, + }) + this.queueNextPoll(timeUntilRefreshInMs) + } + + private handleInvalidCustomResponse() { + this.tokenEmitter.emit('access_token', { + error: new Error('HTTPClient does not implement response.text method'), + }) + } + + private async handleRateLimited( + response: HTTPResponse, + headers: Record, + timeUntilRefreshInMs: number + ) { + this.incrementRetries({ + error: new Error(`[${response.status}] ${response.statusText}`), + }) + + if (headers['x-ratelimit-reset']) { + const rateLimitResetTimestamp = parseInt(headers['x-ratelimit-reset'], 10) + if (isFinite(rateLimitResetTimestamp)) { + timeUntilRefreshInMs = + rateLimitResetTimestamp - Date.now() + this.clockSkewInSeconds * 1000 + } else { + timeUntilRefreshInMs = 5 * 1000 + } + // We want subsequent calls to get_token to be able to interrupt our + // Timeout when it's waiting for e.g. a long normal expiration, but + // not when we're waiting for a rate limit reset. Sleep instead. + await sleep(timeUntilRefreshInMs) + timeUntilRefreshInMs = 0 + } + + this.queueNextPoll(timeUntilRefreshInMs) + } + + private handleUnrecoverableErrors(response: HTTPResponse) { + this.retryCount = 0 + this.tokenEmitter.emit('access_token', { + error: new Error(`[${response.status}] ${response.statusText}`), + }) + this.stopPoller() + } + + private updateClockSkew(dateInMs: number) { + this.clockSkewInSeconds = (Date.now() - dateInMs) / 1000 + } + + private incrementRetries({ + error, + forceEmitError, + }: { + error: unknown + forceEmitError?: boolean + }) { + this.retryCount++ + if (forceEmitError || this.retryCount % this.maxRetries === 0) { + this.retryCount = 0 + this.tokenEmitter.emit('access_token', { error: error }) + } + } + + private queueNextPoll(timeUntilRefreshInMs: number) { + this.pollerTimer = setTimeout(() => this.pollerLoop(), timeUntilRefreshInMs) + if (this.pollerTimer.unref) { + this.pollerTimer.unref() + } + } + + /** + * Solely responsible for building the HTTP request and calling the token service. + */ + private async requestAccessToken(): Promise { + // Set issued at time to 5 seconds in the past to account for clock skew + const ISSUED_AT_BUFFER_IN_SECONDS = 5 + const MAX_EXPIRY_IN_SECONDS = 60 + // Final expiry time takes into account the issued at time, so need to subtract IAT buffer + const EXPIRY_IN_SECONDS = + MAX_EXPIRY_IN_SECONDS - ISSUED_AT_BUFFER_IN_SECONDS + const jti = uuid() + const currentUTCInSeconds = + Math.round(Date.now() / 1000) - this.clockSkewInSeconds + const jwtBody = { + iss: this.clientId, + sub: this.clientId, + aud: this.authServer, + iat: currentUTCInSeconds - ISSUED_AT_BUFFER_IN_SECONDS, + exp: currentUTCInSeconds + EXPIRY_IN_SECONDS, + jti, + } + + const key = await importPKCS8(this.clientKey, 'RS256') + const signedJwt = await new SignJWT(jwtBody) + .setProtectedHeader({ alg: this.alg, kid: this.keyId, typ: 'JWT' }) + .sign(key) + + const requestBody = `grant_type=${this.grantType}&client_assertion_type=${this.clientAssertionType}&client_assertion=${signedJwt}&scope=${this.scope}` + const accessTokenEndpoint = `${this.authServer}/token` + + const requestOptions: HTTPClientRequest = { + method: 'POST', + url: accessTokenEndpoint, + body: requestBody, + headers: { + 'Content-Type': 'application/x-www-form-urlencoded', + }, + httpRequestTimeout: 10000, + } + return this.httpClient.makeRequest(requestOptions) + } + + async getAccessToken(): Promise { + // Use the cached token if it is still valid, otherwise wait for a new token. + if (this.isValidToken(this.accessToken)) { + return this.accessToken + } + + // stop poller first in order to make sure that it's not sleeping if we need a token immediately + // Otherwise it could be hours before the expiration time passes normally + this.stopPoller() + + // startPoller needs to be called somewhere, either lazily when a token is first requested, or at instantiation. + // Doing it lazily currently + this.pollerLoop().catch(() => {}) + + return new Promise((resolve, reject) => { + this.tokenEmitter.once('access_token', (event) => { + if ('token' in event) { + resolve(event.token) + } else { + reject(event.error) + } + }) + }) + } + + clearToken() { + this.accessToken = undefined + } + + isValidToken(token?: AccessToken): token is AccessToken { + return ( + typeof token !== 'undefined' && + token !== null && + token.expires_in < Date.now() / 1000 + ) + } +} diff --git a/packages/node/src/lib/types.ts b/packages/node/src/lib/types.ts new file mode 100644 index 000000000..110d05480 --- /dev/null +++ b/packages/node/src/lib/types.ts @@ -0,0 +1,58 @@ +import { HTTPClient } from './http-client' +import { TokenManagerSettings } from './token-manager' + +export interface OAuthSettings { + /** + * The OAuth App ID from Access Management under Workspace Settings in the Segment Dashboard. + */ + clientId: string + /** + * The private key that matches the public key set in the OAuth app in the Segment Dashboard. + */ + clientKey: string + /** + * The ID for the matching public key as given in the Segment Dashboard after it is uploaded. + */ + keyId: string + /** + * The Authorization server. Defaults to https://oauth2.segment.io + * If your TAPI endpoint is not https://api.segment.io you will need to set this value. + * e.g. https://oauth2.eu1.segmentapis.com/ for TAPI endpoint https://events.eu1.segmentapis.com/ + */ + authServer?: string + /** + * The scope of permissions. Defaults to `tracking_api:write`. + * Must match a scope from the OAuth app settings in the Segment Dashboard. + */ + scope?: string + /** + * Custom number of retries before a recoverable error is reported. + * Defaults to the custom value set in the Analytics settings, or 3 if unset + */ + maxRetries?: number + /** + * Custom HTTP Client implementation. + * Defaults to the custom value set in the Analytics settings, or uses the default fetch client. + * Note: This would only be need to be set in a complex environment that may have different access + * rules for the TAPI and Auth endpoints. + */ + httpClient?: HTTPClient +} + +export type AccessToken = { + access_token: string + expires_in: number + expires_at?: number +} + +export interface TokenManager { + pollerLoop(): Promise + stopPoller(): void + getAccessToken(): Promise + clearToken(): void + isValidToken(token?: AccessToken): token is AccessToken +} + +export interface TokenManagerConstructor { + new (settings: TokenManagerSettings): TokenManager +} diff --git a/packages/node/src/plugins/segmentio/__tests__/methods.test.ts b/packages/node/src/plugins/segmentio/__tests__/methods.test.ts index 90fc7e0f4..2818318bc 100644 --- a/packages/node/src/plugins/segmentio/__tests__/methods.test.ts +++ b/packages/node/src/plugins/segmentio/__tests__/methods.test.ts @@ -52,7 +52,7 @@ test('alias', async () => { expect(makeReqSpy).toHaveBeenCalledTimes(1) validateMakeReqInputs(context) - const data = getLastRequest().data + const data = JSON.parse(getLastRequest().body) expect(data.batch).toHaveLength(1) expect(data.batch[0]).toEqual({ @@ -81,7 +81,7 @@ test('group', async () => { expect(makeReqSpy).toHaveBeenCalledTimes(1) validateMakeReqInputs(context) - const data = getLastRequest().data + const data = JSON.parse(getLastRequest().body) expect(data.batch).toHaveLength(1) expect(data.batch[0]).toEqual({ @@ -109,7 +109,8 @@ test('identify', async () => { expect(makeReqSpy).toHaveBeenCalledTimes(1) validateMakeReqInputs(context) - const data = getLastRequest().data + const data = JSON.parse(getLastRequest().body) + expect(data.batch).toHaveLength(1) expect(data.batch[0]).toEqual({ ...httpClientOptionsBodyMatcher, @@ -138,7 +139,7 @@ test('page', async () => { expect(makeReqSpy).toHaveBeenCalledTimes(1) validateMakeReqInputs(context) - const data = getLastRequest().data + const data = JSON.parse(getLastRequest().body) expect(data.batch).toHaveLength(1) expect(data.batch[0]).toEqual({ @@ -171,7 +172,7 @@ test('screen', async () => { expect(makeReqSpy).toHaveBeenCalledTimes(1) validateMakeReqInputs(context) - const data = getLastRequest().data + const data = JSON.parse(getLastRequest().body) expect(data.batch).toHaveLength(1) expect(data.batch[0]).toEqual({ @@ -202,7 +203,7 @@ test('track', async () => { expect(makeReqSpy).toHaveBeenCalledTimes(1) validateMakeReqInputs(context) - const data = getLastRequest().data + const data = JSON.parse(getLastRequest().body) expect(data.batch).toHaveLength(1) expect(data.batch[0]).toEqual({ diff --git a/packages/node/src/plugins/segmentio/publisher.ts b/packages/node/src/plugins/segmentio/publisher.ts index 6297f2c0f..5c63f9ee8 100644 --- a/packages/node/src/plugins/segmentio/publisher.ts +++ b/packages/node/src/plugins/segmentio/publisher.ts @@ -4,8 +4,9 @@ import { tryCreateFormattedUrl } from '../../lib/create-url' import { createDeferred } from '@segment/analytics-generic-utils' import { ContextBatch } from './context-batch' import { NodeEmitter } from '../../app/emitter' -import { b64encode } from '../../lib/base-64-encode' import { HTTPClient, HTTPClientRequest } from '../../lib/http-client' +import { OAuthSettings } from '../../lib/types' +import { TokenManager } from '../../lib/token-manager' function sleep(timeoutInMs: number): Promise { return new Promise((resolve) => setTimeout(resolve, timeoutInMs)) @@ -28,6 +29,7 @@ export interface PublisherProps { httpRequestTimeout?: number disable?: boolean httpClient: HTTPClient + oauthSettings?: OAuthSettings } /** @@ -40,13 +42,14 @@ export class Publisher { private _flushInterval: number private _flushAt: number private _maxRetries: number - private _auth: string private _url: string private _flushPendingItemsCount?: number private _httpRequestTimeout: number private _emitter: NodeEmitter private _disable: boolean private _httpClient: HTTPClient + private _writeKey: string + private _tokenManager: TokenManager | undefined constructor( { @@ -59,6 +62,7 @@ export class Publisher { httpRequestTimeout, httpClient, disable, + oauthSettings, }: PublisherProps, emitter: NodeEmitter ) { @@ -66,7 +70,6 @@ export class Publisher { this._maxRetries = maxRetries this._flushAt = Math.max(flushAt, 1) this._flushInterval = flushInterval - this._auth = b64encode(`${writeKey}:`) this._url = tryCreateFormattedUrl( host ?? 'https://api.segment.io', path ?? '/v1/batch' @@ -74,6 +77,15 @@ export class Publisher { this._httpRequestTimeout = httpRequestTimeout ?? 10000 this._disable = Boolean(disable) this._httpClient = httpClient + this._writeKey = writeKey + + if (oauthSettings) { + this._tokenManager = new TokenManager({ + ...oauthSettings, + httpClient: oauthSettings.httpClient ?? httpClient, + maxRetries: oauthSettings.maxRetries ?? maxRetries, + }) + } } private createBatch(): ContextBatch { @@ -99,7 +111,10 @@ export class Publisher { flush(pendingItemsCount: number): void { if (!pendingItemsCount) { - // if number of pending items is 0, there is nothing to flush + // if number of pending items is 0, there will never be anything else entering the batch, since the app is closed. + if (this._tokenManager) { + this._tokenManager.stopPoller() + } return } @@ -112,7 +127,14 @@ export class Publisher { // Any mismatch is because some globally pending items are in plugins. const isExpectingNoMoreItems = this._batch.length === pendingItemsCount if (isExpectingNoMoreItems) { - this.send(this._batch).catch(noop) + this.send(this._batch) + .catch(noop) + .finally(() => { + // stop poller so program can exit (). + if (this._tokenManager) { + this._tokenManager.stopPoller() + } + }) this.clearBatch() } } @@ -199,20 +221,34 @@ export class Publisher { return batch.resolveEvents() } + let authString = undefined + if (this._tokenManager) { + const token = await this._tokenManager.getAccessToken() + if (token && token.access_token) { + authString = `Bearer ${token.access_token}` + } + } + + const headers: Record = { + 'Content-Type': 'application/json', + 'User-Agent': 'analytics-node-next/latest', + ...(authString ? { Authorization: authString } : {}), + } + const request: HTTPClientRequest = { url: this._url, method: 'POST', - headers: { - 'Content-Type': 'application/json', - Authorization: `Basic ${this._auth}`, - 'User-Agent': 'analytics-node-next/latest', - }, - data: { batch: events, sentAt: new Date() }, + headers: headers, + body: JSON.stringify({ + batch: events, + writeKey: this._writeKey, + sentAt: new Date(), + }), httpRequestTimeout: this._httpRequestTimeout, } this._emitter.emit('http_request', { - body: request.data, + body: request.body, method: request.method, url: request.url, headers: request.headers, @@ -224,6 +260,17 @@ export class Publisher { // Successfully sent events, so exit! batch.resolveEvents() return + } else if ( + this._tokenManager && + (response.status === 400 || + response.status === 401 || + response.status === 403) + ) { + // Retry with a new OAuth token if we have OAuth data + this._tokenManager.clearToken() + failureReason = new Error( + `[${response.status}] ${response.statusText}` + ) } else if (response.status === 400) { // https://segment.com/docs/connections/sources/catalog/libraries/server/http-api/#max-request-size // Request either malformed or size exceeded - don't retry. diff --git a/packages/node/tsconfig.json b/packages/node/tsconfig.json index 8941846ae..e4aad78dd 100644 --- a/packages/node/tsconfig.json +++ b/packages/node/tsconfig.json @@ -3,8 +3,8 @@ "exclude": ["node_modules", "dist"], "compilerOptions": { "module": "ESNext", - "target": "es2020", // node 14 + "target": "es2022", // node 18 "moduleResolution": "node", - "lib": ["es2020"] // TODO: https://www.npmjs.com/package/@tsconfig/node14 + "lib": ["es2022"] // TODO: es2023 https://www.npmjs.com/package/@tsconfig/node18 } } diff --git a/yarn.lock b/yarn.lock index 6e33e03c6..1bd030636 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4022,6 +4022,7 @@ __metadata: "@types/node": ^16 axios: ^1.6.0 buffer: ^6.0.3 + jose: ^5.1.0 node-fetch: ^2.6.7 tslib: ^2.4.1 languageName: unknown @@ -14884,6 +14885,13 @@ __metadata: languageName: node linkType: hard +"jose@npm:^5.1.0": + version: 5.1.0 + resolution: "jose@npm:5.1.0" + checksum: 62e907e953fd83f869cf4ce2e08359f5678bf99a6a4cc50faf7997b4694bc5cd32f43b79c3ff86f0e5385346b465a408166ccd1bf4237ae8c705ee82c733acc0 + languageName: node + linkType: hard + "jpeg-js@npm:^0.4.1, jpeg-js@npm:^0.4.3": version: 0.4.4 resolution: "jpeg-js@npm:0.4.4"