diff --git a/.env b/.env index afacc013a..2500df8d7 100644 --- a/.env +++ b/.env @@ -60,6 +60,10 @@ PG_APPLICATION_NAME=stacks-blockchain-api # (with both Event Server and API endpoints). # STACKS_API_MODE= +# If this API instance should consume events from a Stacks node (`stacks` or `default`) or a Store-and-forward Redis service (`redis`). +# STACKS_API_EVENT_SOURCE= +# REDIS_URL= + # To avoid running unnecessary mempool stats during transaction influx, we use a debounce mechanism for the process. # This variable controls the duration it waits until there are no further mempool updates # MEMPOOL_STATS_DEBOUNCE_INTERVAL=1000 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 83b0024e1..6358c8639 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,7 +6,7 @@ on: - master - develop - beta - - nakamoto + - redis-stream tags-ignore: - "**" paths-ignore: diff --git a/.releaserc b/.releaserc index 42bc8fe1a..390e1b419 100644 --- a/.releaserc +++ b/.releaserc @@ -7,8 +7,8 @@ "prerelease": true }, { - "name": "nakamoto", - "channel": "nakamoto", + "name": "redis-stream", + "channel": "redis-stream", "prerelease": true } ], diff --git a/.vscode/launch.json b/.vscode/launch.json index 72a01bd2f..240a06e6e 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -25,7 +25,8 @@ "env": { "NODE_ENV": "development", "TS_NODE_SKIP_IGNORE": "true" - } + }, + "killBehavior": "polite", }, { "type": "node", diff --git a/migrations/1736275393880_stacks-node-event-sequence-id.js b/migrations/1736275393880_stacks-node-event-sequence-id.js new file mode 100644 index 000000000..512012f22 --- /dev/null +++ b/migrations/1736275393880_stacks-node-event-sequence-id.js @@ -0,0 +1,15 @@ +/* eslint-disable camelcase */ + +exports.shorthands = undefined; + +exports.up = pgm => { + pgm.addColumn('event_observer_requests', { + sequence_id: { + type: 'string' + } + }); +}; + +exports.down = pgm => { + pgm.dropColumn('event_observer_requests', 'sequence_id'); +}; diff --git a/package-lock.json b/package-lock.json index 229a95fb0..f30768b38 100644 --- a/package-lock.json +++ b/package-lock.json @@ -17,6 +17,7 @@ "@fastify/swagger": "8.15.0", "@fastify/type-provider-typebox": "4.0.0", "@hirosystems/api-toolkit": "1.6.2", + "@hirosystems/salt-n-pepper-client": "0.1.0", "@scure/base": "1.1.1", "@sinclair/typebox": "0.32.35", "@stacks/common": "6.10.0", @@ -131,6 +132,20 @@ "utf-8-validate": "5.0.7" } }, + "../salt-and-pepper/client": { + "name": "@hirosystems/salt-n-pepper-client", + "version": "0.1.0", + "extraneous": true, + "license": "GPL-3.0-only", + "dependencies": { + "@hirosystems/api-toolkit": "^1.7.2", + "redis": "^4.7.0" + }, + "devDependencies": { + "rimraf": "^6.0.1", + "typescript": "^5.7.2" + } + }, "node_modules/@actions/core": { "version": "1.10.0", "resolved": "https://registry.npmjs.org/@actions/core/-/core-1.10.0.tgz", @@ -1494,6 +1509,203 @@ "node": ">=12" } }, + "node_modules/@hirosystems/salt-n-pepper-client": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/@hirosystems/salt-n-pepper-client/-/salt-n-pepper-client-0.1.0.tgz", + "integrity": "sha512-GQE6as0V67YcdusZh9+ghzM1BALtdBlQ+QrLiQzl7Vrbu1ZXltpdTxgu/2jSQHp9a0VqRztDZyAlTN+NPLc+nA==", + "dependencies": { + "@hirosystems/api-toolkit": "^1.7.2", + "redis": "^4.7.0" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/@fastify/cors": { + "version": "8.5.0", + "resolved": "https://registry.npmjs.org/@fastify/cors/-/cors-8.5.0.tgz", + "integrity": "sha512-/oZ1QSb02XjP0IK1U0IXktEsw/dUBTxJOW7IpIeO8c/tNalw/KjoNSJv1Sf6eqoBPO+TDGkifq6ynFK3v68HFQ==", + "dependencies": { + "fastify-plugin": "^4.0.0", + "mnemonist": "0.39.6" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/@fastify/type-provider-typebox": { + "version": "3.6.0", + "resolved": "https://registry.npmjs.org/@fastify/type-provider-typebox/-/type-provider-typebox-3.6.0.tgz", + "integrity": "sha512-HTeOLvirfGg0u1KGao3iXn5rZpYNqlrOmyDnXSXAbWVPa+mDQTTBNs/x5uZzOB6vFAqr0Xcf7x1lxOamNSYKjw==", + "peerDependencies": { + "@sinclair/typebox": ">=0.26 <=0.32" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/@hirosystems/api-toolkit": { + "version": "1.7.2", + "resolved": "https://registry.npmjs.org/@hirosystems/api-toolkit/-/api-toolkit-1.7.2.tgz", + "integrity": "sha512-1xtVXFZTZBUJ2cj8r2jhAD51Hx5UMVspiPUkjEi/MgVetFA2Ne8mlIjCxbE+VO5aZAf2v6xTw35l7FDSIL7xaw==", + "dependencies": { + "@fastify/cors": "^8.0.0", + "@fastify/swagger": "^8.3.1", + "@fastify/type-provider-typebox": "^3.2.0", + "@sinclair/typebox": "^0.28.20", + "fastify": "^4.3.0", + "fastify-metrics": "^10.2.0", + "node-pg-migrate": "^6.2.2", + "pino": "^8.11.0", + "postgres": "^3.3.4" + }, + "bin": { + "api-toolkit-git-info": "bin/api-toolkit-git-info.js" + }, + "engines": { + "node": ">=18" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/@sinclair/typebox": { + "version": "0.28.20", + "resolved": "https://registry.npmjs.org/@sinclair/typebox/-/typebox-0.28.20.tgz", + "integrity": "sha512-QCF3BGfacwD+3CKhGsMeixnwOmX4AWgm61nKkNdRStyLVu0mpVFYlDSY8gVBOOED1oSwzbJauIWl/+REj8K5+w==" + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/@types/pg": { + "version": "8.11.10", + "resolved": "https://registry.npmjs.org/@types/pg/-/pg-8.11.10.tgz", + "integrity": "sha512-LczQUW4dbOQzsH2RQ5qoeJ6qJPdrcM/DcMLoqWQkMLMsq83J5lAX3LXjdkWdpscFy67JSOWDnh7Ny/sPFykmkg==", + "dependencies": { + "@types/node": "*", + "pg-protocol": "*", + "pg-types": "^4.0.1" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/cliui": { + "version": "7.0.4", + "resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz", + "integrity": "sha512-OcRE68cOsVMXp1Yvonl/fzkQOyjLSu/8bhPDfQt0e0/Eb283TKP20Fs2MqoPsr9SwA595rRCA+QMzYc9nBP+JQ==", + "dependencies": { + "string-width": "^4.2.0", + "strip-ansi": "^6.0.0", + "wrap-ansi": "^7.0.0" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/fastify-metrics": { + "version": "10.6.0", + "resolved": "https://registry.npmjs.org/fastify-metrics/-/fastify-metrics-10.6.0.tgz", + "integrity": "sha512-QIPncCnwBOEObMn+VaRhsBC1ox8qEsaiYF2sV/A1UbXj7ic70W8/HNn/hlEC2W8JQbBeZMx++o1um2fPfhsFDQ==", + "dependencies": { + "fastify-plugin": "^4.3.0", + "prom-client": "^14.2.0" + }, + "peerDependencies": { + "fastify": ">=4" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/node-pg-migrate": { + "version": "6.2.2", + "resolved": "https://registry.npmjs.org/node-pg-migrate/-/node-pg-migrate-6.2.2.tgz", + "integrity": "sha512-0WYLTXpWu2doeZhiwJUW/1u21OqAFU2CMQ8YZ8VBcJ0xrdqYAjtd8GGFe5A5DM4NJdIZsqJcLPDFqY0FQsmivw==", + "dependencies": { + "@types/pg": "^8.0.0", + "decamelize": "^5.0.0", + "mkdirp": "~1.0.0", + "yargs": "~17.3.0" + }, + "bin": { + "node-pg-migrate": "bin/node-pg-migrate" + }, + "engines": { + "node": ">=12.20.0" + }, + "peerDependencies": { + "pg": ">=4.3.0 <9.0.0" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/pg-types": { + "version": "4.0.2", + "resolved": "https://registry.npmjs.org/pg-types/-/pg-types-4.0.2.tgz", + "integrity": "sha512-cRL3JpS3lKMGsKaWndugWQoLOCoP+Cic8oseVcbr0qhPzYD5DWXK+RZ9LY9wxRf7RQia4SCwQlXk0q6FCPrVng==", + "dependencies": { + "pg-int8": "1.0.1", + "pg-numeric": "1.0.2", + "postgres-array": "~3.0.1", + "postgres-bytea": "~3.0.0", + "postgres-date": "~2.1.0", + "postgres-interval": "^3.0.0", + "postgres-range": "^1.1.1" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/postgres": { + "version": "3.4.5", + "resolved": "https://registry.npmjs.org/postgres/-/postgres-3.4.5.tgz", + "integrity": "sha512-cDWgoah1Gez9rN3H4165peY9qfpEo+SA61oQv65O3cRUE1pOEoJWwddwcqKE8XZYjbblOJlYDlLV4h67HrEVDg==", + "engines": { + "node": ">=12" + }, + "funding": { + "type": "individual", + "url": "https://github.com/sponsors/porsager" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/postgres-array": { + "version": "3.0.2", + "resolved": "https://registry.npmjs.org/postgres-array/-/postgres-array-3.0.2.tgz", + "integrity": "sha512-6faShkdFugNQCLwucjPcY5ARoW1SlbnrZjmGl0IrrqewpvxvhSLHimCVzqeuULCbG0fQv7Dtk1yDbG3xv7Veog==", + "engines": { + "node": ">=12" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/postgres-bytea": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/postgres-bytea/-/postgres-bytea-3.0.0.tgz", + "integrity": "sha512-CNd4jim9RFPkObHSjVHlVrxoVQXz7quwNFpz7RY1okNNme49+sVyiTvTRobiLV548Hx/hb1BG+iE7h9493WzFw==", + "dependencies": { + "obuf": "~1.1.2" + }, + "engines": { + "node": ">= 6" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/postgres-date": { + "version": "2.1.0", + "resolved": "https://registry.npmjs.org/postgres-date/-/postgres-date-2.1.0.tgz", + "integrity": "sha512-K7Juri8gtgXVcDfZttFKVmhglp7epKb1K4pgrkLxehjqkrgPhfG6OO8LHLkfaqkbpjNRnra018XwAr1yQFWGcA==", + "engines": { + "node": ">=12" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/postgres-interval": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/postgres-interval/-/postgres-interval-3.0.0.tgz", + "integrity": "sha512-BSNDnbyZCXSxgA+1f5UU2GmwhoI0aU5yMxRGO8CdFEcY2BQF9xm/7MqKnYoM1nJDk8nONNWDk9WeSmePFhQdlw==", + "engines": { + "node": ">=12" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/prom-client": { + "version": "14.2.0", + "resolved": "https://registry.npmjs.org/prom-client/-/prom-client-14.2.0.tgz", + "integrity": "sha512-sF308EhTenb/pDRPakm+WgiN+VdM/T1RaHj1x+MvAuT8UiQP8JmOEbxVqtkbfR4LrvOg5n7ic01kRBDGXjYikA==", + "dependencies": { + "tdigest": "^0.1.1" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/@hirosystems/salt-n-pepper-client/node_modules/yargs": { + "version": "17.3.1", + "resolved": "https://registry.npmjs.org/yargs/-/yargs-17.3.1.tgz", + "integrity": "sha512-WUANQeVgjLbNsEmGk20f+nlHgOqzRFpiGWVaBrYGYIGANIIu3lWjoyi0fNlFmJkvfhCZ6BXINe7/W2O2bV4iaA==", + "dependencies": { + "cliui": "^7.0.2", + "escalade": "^3.1.1", + "get-caller-file": "^2.0.5", + "require-directory": "^2.1.1", + "string-width": "^4.2.3", + "y18n": "^5.0.5", + "yargs-parser": "^21.0.0" + }, + "engines": { + "node": ">=12" + } + }, "node_modules/@humanwhocodes/config-array": { "version": "0.11.14", "resolved": "https://registry.npmjs.org/@humanwhocodes/config-array/-/config-array-0.11.14.tgz", @@ -2786,6 +2998,59 @@ "node": ">=14" } }, + "node_modules/@redis/bloom": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/bloom/-/bloom-1.2.0.tgz", + "integrity": "sha512-HG2DFjYKbpNmVXsa0keLHp/3leGJz1mjh09f2RLGGLQZzSHpkmZWuwJbAvo3QcRY8p80m5+ZdXZdYOSBLlp7Cg==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/client": { + "version": "1.6.0", + "resolved": "https://registry.npmjs.org/@redis/client/-/client-1.6.0.tgz", + "integrity": "sha512-aR0uffYI700OEEH4gYnitAnv3vzVGXCFvYfdpu/CJKvk4pHfLPEy/JSZyrpQ+15WhXe1yJRXLtfQ84s4mEXnPg==", + "dependencies": { + "cluster-key-slot": "1.1.2", + "generic-pool": "3.9.0", + "yallist": "4.0.0" + }, + "engines": { + "node": ">=14" + } + }, + "node_modules/@redis/graph": { + "version": "1.1.1", + "resolved": "https://registry.npmjs.org/@redis/graph/-/graph-1.1.1.tgz", + "integrity": "sha512-FEMTcTHZozZciLRl6GiiIB4zGm5z5F3F6a6FZCyrfxdKOhFlGkiAqlexWMBzCi4DcRoyiOsuLfW+cjlGWyExOw==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/json": { + "version": "1.0.7", + "resolved": "https://registry.npmjs.org/@redis/json/-/json-1.0.7.tgz", + "integrity": "sha512-6UyXfjVaTBTJtKNG4/9Z8PSpKE6XgSyEb8iwaqDcy+uKrd/DGYHTWkUdnQDyzm727V7p21WUMhsqz5oy65kPcQ==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/search": { + "version": "1.2.0", + "resolved": "https://registry.npmjs.org/@redis/search/-/search-1.2.0.tgz", + "integrity": "sha512-tYoDBbtqOVigEDMAcTGsRlMycIIjwMCgD8eR2t0NANeQmgK/lvxNAvYyb6bZDD4frHRhIHkJu2TBRvB0ERkOmw==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, + "node_modules/@redis/time-series": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/@redis/time-series/-/time-series-1.1.0.tgz", + "integrity": "sha512-c1Q99M5ljsIuc4YdaCwfUEXsofakb9c8+Zse2qxTadu8TalLXuAESzLvFAvNVbkmSlvlzIQOLpBCmWI9wTOt+g==", + "peerDependencies": { + "@redis/client": "^1.0.0" + } + }, "node_modules/@redocly/ajv": { "version": "8.11.0", "resolved": "https://registry.npmjs.org/@redocly/ajv/-/ajv-8.11.0.tgz", @@ -5772,6 +6037,14 @@ "node": ">=6" } }, + "node_modules/cluster-key-slot": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/cluster-key-slot/-/cluster-key-slot-1.1.2.tgz", + "integrity": "sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==", + "engines": { + "node": ">=0.10.0" + } + }, "node_modules/co": { "version": "4.6.0", "resolved": "https://registry.npmjs.org/co/-/co-4.6.0.tgz", @@ -8635,6 +8908,14 @@ "resolved": "https://registry.npmjs.org/signal-exit/-/signal-exit-3.0.7.tgz", "integrity": "sha512-wnD2ZE+l+SPC/uoS0vXeE9L1+0wuaMqKlfz9AMUo38JsyLSBWSFcHR1Rri62LZc12vLr1gb3jl7iwQhgwpAbGQ==" }, + "node_modules/generic-pool": { + "version": "3.9.0", + "resolved": "https://registry.npmjs.org/generic-pool/-/generic-pool-3.9.0.tgz", + "integrity": "sha512-hymDOu5B53XvN4QT9dBmZxPX4CWhBPPLguTZ9MMFeFa/Kg0xWVfylOVNlJji/E7yTZWFd/q9GO5TxDLq156D7g==", + "engines": { + "node": ">= 4" + } + }, "node_modules/gensync": { "version": "1.0.0-beta.2", "resolved": "https://registry.npmjs.org/gensync/-/gensync-1.0.0-beta.2.tgz", @@ -14774,6 +15055,19 @@ "node": ">= 12.13.0" } }, + "node_modules/redis": { + "version": "4.7.0", + "resolved": "https://registry.npmjs.org/redis/-/redis-4.7.0.tgz", + "integrity": "sha512-zvmkHEAdGMn+hMRXuMBtu4Vo5P6rHQjLoHftu+lBqq8ZTA3RCVC/WzD790bkKKiNFp7d5/9PcSD19fJyyRvOdQ==", + "dependencies": { + "@redis/bloom": "1.2.0", + "@redis/client": "1.6.0", + "@redis/graph": "1.1.1", + "@redis/json": "1.0.7", + "@redis/search": "1.2.0", + "@redis/time-series": "1.1.0" + } + }, "node_modules/redoc": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/redoc/-/redoc-2.1.5.tgz", @@ -15409,15 +15703,6 @@ "integrity": "sha512-Tpp60P6IUJDTuOq/5Z8cdskzJujfwqfOTkrwIwj7IRISpnkJnT6SyJ4PCPnGMoFjC9ddhal5KVIYtAt97ix05A==", "license": "MIT" }, - "node_modules/send/node_modules/encodeurl": { - "version": "1.0.2", - "resolved": "https://registry.npmjs.org/encodeurl/-/encodeurl-1.0.2.tgz", - "integrity": "sha512-TPJXq8JqFaVYm2CWmPvnP2Iyo4ZSM7/QKcSmuMLDObfpH5fi7RUGmd/rTDf+rut/saiDiQEeVTNgAmJEdAOx0w==", - "license": "MIT", - "engines": { - "node": ">= 0.8" - } - }, "node_modules/send/node_modules/encodeurl": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/encodeurl/-/encodeurl-1.0.2.tgz", diff --git a/package.json b/package.json index f7e24ee0e..822a564cd 100644 --- a/package.json +++ b/package.json @@ -95,6 +95,7 @@ "@fastify/swagger": "8.15.0", "@fastify/type-provider-typebox": "4.0.0", "@hirosystems/api-toolkit": "1.6.2", + "@hirosystems/salt-n-pepper-client": "0.1.0", "@scure/base": "1.1.1", "@sinclair/typebox": "0.32.35", "@stacks/common": "6.10.0", diff --git a/src/datastore/pg-store.ts b/src/datastore/pg-store.ts index 92859f119..701e58c73 100644 --- a/src/datastore/pg-store.ts +++ b/src/datastore/pg-store.ts @@ -4524,4 +4524,11 @@ export class PgStore extends BasePgStore { `; if (result.count) return result[0]; } + + async getLastEventObserverRequestSequenceId(): Promise { + const result = await this.sql<{ sequence_id: string }[]>` + SELECT sequence_id FROM event_observer_requests ORDER BY id DESC LIMIT 1 + `; + return result[0]?.sequence_id ?? undefined; + } } diff --git a/src/datastore/pg-write-store.ts b/src/datastore/pg-write-store.ts index bf4691e53..e2ba7afb2 100644 --- a/src/datastore/pg-write-store.ts +++ b/src/datastore/pg-write-store.ts @@ -170,8 +170,14 @@ export class PgWriteStore extends PgStore { return store; } - async storeRawEventRequest(eventPath: string, payload: any): Promise { + async storeRawEventRequest( + eventPath: string, + payload: any, + sequenceId?: string, + timestamp?: string + ): Promise { await this.sqlWriteTransaction(async sql => { + const receive_timestamp = timestamp ? sql`TO_TIMESTAMP(${timestamp})` : sql`NOW()`; const insertResult = await sql< { id: string; @@ -179,8 +185,8 @@ export class PgWriteStore extends PgStore { event_path: string; }[] >`INSERT INTO event_observer_requests( - event_path, payload - ) values(${eventPath}, ${payload}) + event_path, payload, receive_timestamp, sequence_id + ) values(${eventPath}, ${payload}, ${receive_timestamp}, ${sequenceId ?? sql`NULL`}) RETURNING id, receive_timestamp::text, event_path `; if (insertResult.length !== 1) { diff --git a/src/event-replay/parquet-based/importers/attachment-new-importer.ts b/src/event-replay/parquet-based/importers/attachment-new-importer.ts index 76e484c8e..964254307 100644 --- a/src/event-replay/parquet-based/importers/attachment-new-importer.ts +++ b/src/event-replay/parquet-based/importers/attachment-new-importer.ts @@ -4,7 +4,7 @@ import { Readable, Writable } from 'stream'; import { pipeline } from 'stream/promises'; import { PgWriteStore } from '../../../datastore/pg-write-store'; -import { parseAttachment } from '../../../event-stream/event-server'; +import { parseAttachment } from '../../../event-stream/event-message-handler'; import { logger } from '../../../logger'; import { CoreNodeAttachmentMessage } from '../../../event-stream/core-node-message'; import { DataStoreAttachmentSubdomainData } from '../../../datastore/common'; diff --git a/src/event-replay/parquet-based/importers/new-block-importer.ts b/src/event-replay/parquet-based/importers/new-block-importer.ts index 5837742ac..6bc0441b6 100644 --- a/src/event-replay/parquet-based/importers/new-block-importer.ts +++ b/src/event-replay/parquet-based/importers/new-block-importer.ts @@ -1,7 +1,6 @@ import { Readable, Writable, Transform } from 'stream'; import { pipeline } from 'stream/promises'; import { PgWriteStore } from '../../../datastore/pg-write-store'; -import { parseNewBlockMessage } from '../../../event-stream/event-server'; import { DbBlock, DbMicroblock, @@ -21,6 +20,7 @@ import { getApiConfiguredChainID } from '../../../helpers'; import { CoreNodeBlockMessage } from '../../../event-stream/core-node-message'; import { DatasetStore } from '../dataset/store'; import { batchIterate } from '@hirosystems/api-toolkit'; +import { parseNewBlockMessage } from '../../../event-stream/event-message-handler'; const chainID = getApiConfiguredChainID(); diff --git a/src/event-stream/event-message-handler.ts b/src/event-stream/event-message-handler.ts new file mode 100644 index 000000000..af7089fbb --- /dev/null +++ b/src/event-stream/event-message-handler.ts @@ -0,0 +1,1005 @@ +import { inspect } from 'util'; +import PQueue from 'p-queue'; +import * as prom from 'prom-client'; +import { BitVec, ChainID, assertNotNullish, getChainIDNetwork } from '../helpers'; +import { + CoreNodeBlockMessage, + CoreNodeEventType, + CoreNodeBurnBlockMessage, + CoreNodeDropMempoolTxMessage, + CoreNodeAttachmentMessage, + CoreNodeMicroblockMessage, + CoreNodeParsedTxMessage, + CoreNodeEvent, +} from './core-node-message'; +import { + DbEventBase, + DbSmartContractEvent, + DbStxEvent, + DbEventTypeId, + DbFtEvent, + DbAssetEventTypeId, + DbNftEvent, + DbBlock, + DataStoreBlockUpdateData, + DbStxLockEvent, + DbMinerReward, + DbBurnchainReward, + DbRewardSlotHolder, + DataStoreMicroblockUpdateData, + DataStoreTxEventData, + DbMicroblock, + DataStoreAttachmentData, + DbPoxSyntheticEvent, + DbTxStatus, + DbBnsSubdomain, + DbPoxSetSigners, +} from '../datastore/common'; +import { + getTxSenderAddress, + getTxSponsorAddress, + parseMessageTransaction, + CoreNodeMsgBlockData, + parseMicroblocksFromTxs, + isPoxPrintEvent, + newCoreNoreBlockEventCounts, +} from './reader'; +import { + decodeTransaction, + decodeClarityValue, + ClarityValueBuffer, + ClarityValueStringAscii, + ClarityValueTuple, + TxPayloadTypeID, +} from 'stacks-encoding-native-js'; +import { BnsContractIdentifier } from './bns/bns-constants'; +import { + parseNameFromContractEvent, + parseNameRenewalWithNoZonefileHashFromContractCall, + parseNamespaceFromContractEvent, + parseZoneFileTxt, + parseResolver, +} from './bns/bns-helpers'; +import { PgWriteStore } from '../datastore/pg-write-store'; +import { + createDbMempoolTxFromCoreMsg, + createDbTxFromCoreMsg, + getTxDbStatus, +} from '../datastore/helpers'; +import { decodePoxSyntheticPrintEvent } from './pox-event-parsing'; +import { logger } from '../logger'; +import * as zoneFileParser from 'zone-file'; +import { hexToBuffer, isProdEnv, stopwatch } from '@hirosystems/api-toolkit'; +import { POX_2_CONTRACT_NAME, POX_3_CONTRACT_NAME, POX_4_CONTRACT_NAME } from '../pox-helpers'; + +/// Parses and stores raw events received from a Stacks node. +export interface EventMessageHandler { + handleRawEventRequest( + eventPath: string, + payload: any, + db: PgWriteStore, + sequenceId?: string, + timestamp?: string + ): Promise | void; + handleBlockMessage( + chainId: ChainID, + msg: CoreNodeBlockMessage, + db: PgWriteStore + ): Promise | void; + handleMicroblockMessage( + chainId: ChainID, + msg: CoreNodeMicroblockMessage, + db: PgWriteStore + ): Promise | void; + handleMempoolTxs(rawTxs: string[], db: PgWriteStore): Promise | void; + handleBurnBlock(msg: CoreNodeBurnBlockMessage, db: PgWriteStore): Promise | void; + handleDroppedMempoolTxs( + msg: CoreNodeDropMempoolTxMessage, + db: PgWriteStore + ): Promise | void; + handleNewAttachment(msg: CoreNodeAttachmentMessage[], db: PgWriteStore): Promise | void; +} + +export const DummyEventMessageHandler: EventMessageHandler = { + handleRawEventRequest: () => {}, + handleBlockMessage: () => {}, + handleMicroblockMessage: () => {}, + handleBurnBlock: () => {}, + handleMempoolTxs: () => {}, + handleDroppedMempoolTxs: () => {}, + handleNewAttachment: () => {}, +}; + +function parseDataStoreTxEventData( + parsedTxs: CoreNodeParsedTxMessage[], + events: CoreNodeEvent[], + blockData: { + block_height: number; + index_block_hash: string; + block_time: number; + }, + chainId: ChainID +): DataStoreTxEventData[] { + const dbData: DataStoreTxEventData[] = parsedTxs.map(tx => { + const dbTx: DataStoreBlockUpdateData['txs'][number] = { + tx: createDbTxFromCoreMsg(tx), + stxEvents: [], + stxLockEvents: [], + ftEvents: [], + nftEvents: [], + contractLogEvents: [], + smartContracts: [], + names: [], + namespaces: [], + pox2Events: [], + pox3Events: [], + pox4Events: [], + }; + switch (tx.parsed_tx.payload.type_id) { + case TxPayloadTypeID.VersionedSmartContract: + case TxPayloadTypeID.SmartContract: + const contractId = `${tx.sender_address}.${tx.parsed_tx.payload.contract_name}`; + const clarityVersion = + tx.parsed_tx.payload.type_id == TxPayloadTypeID.VersionedSmartContract + ? tx.parsed_tx.payload.clarity_version + : null; + dbTx.smartContracts.push({ + tx_id: tx.core_tx.txid, + contract_id: contractId, + block_height: blockData.block_height, + clarity_version: clarityVersion, + source_code: tx.parsed_tx.payload.code_body, + abi: JSON.stringify(tx.core_tx.contract_abi), + canonical: true, + }); + break; + case TxPayloadTypeID.ContractCall: + // Name renewals can happen without a zonefile_hash. In that case, the BNS contract does NOT + // emit a `name-renewal` contract log, causing us to miss this event. This function catches + // those cases. + const name = parseNameRenewalWithNoZonefileHashFromContractCall(tx, chainId); + if (name) { + dbTx.names.push(name); + } + break; + default: + break; + } + return dbTx; + }); + + const poxEventLogs: Map = new Map(); + + for (const event of events) { + if (!event.committed) { + logger.debug(`Ignoring uncommitted tx event from tx ${event.txid}`); + continue; + } + const dbTx = dbData.find(entry => entry.tx.tx_id === event.txid); + if (!dbTx) { + throw new Error(`Unexpected missing tx during event parsing by tx_id ${event.txid}`); + } + + if (dbTx.tx.status !== DbTxStatus.Success) { + if (event.type === CoreNodeEventType.ContractEvent) { + let reprStr = '?'; + try { + reprStr = decodeClarityValue(event.contract_event.raw_value).repr; + } catch (e) { + logger.warn(`Failed to decode contract log event: ${event.contract_event.raw_value}`); + } + logger.debug( + `Ignoring tx event from unsuccessful tx ${event.txid}, status: ${dbTx.tx.status}, repr: ${reprStr}` + ); + } else { + logger.debug( + `Ignoring tx event from unsuccessful tx ${event.txid}, status: ${dbTx.tx.status}` + ); + } + continue; + } + + const dbEvent: DbEventBase = { + event_index: event.event_index, + tx_id: event.txid, + tx_index: dbTx.tx.tx_index, + block_height: blockData.block_height, + canonical: true, + }; + + switch (event.type) { + case CoreNodeEventType.ContractEvent: { + const entry: DbSmartContractEvent = { + ...dbEvent, + event_type: DbEventTypeId.SmartContractLog, + contract_identifier: event.contract_event.contract_identifier, + topic: event.contract_event.topic, + value: event.contract_event.raw_value, + }; + dbTx.contractLogEvents.push(entry); + + if (isPoxPrintEvent(event)) { + const network = getChainIDNetwork(chainId) === 'mainnet' ? 'mainnet' : 'testnet'; + const [, contractName] = event.contract_event.contract_identifier.split('.'); + // pox-1 is handled in custom node events + const processSyntheticEvent = [ + POX_2_CONTRACT_NAME, + POX_3_CONTRACT_NAME, + POX_4_CONTRACT_NAME, + ].includes(contractName); + if (processSyntheticEvent) { + const poxEventData = decodePoxSyntheticPrintEvent( + event.contract_event.raw_value, + network + ); + if (poxEventData !== null) { + logger.debug(`Synthetic pox event data for ${contractName}:`, poxEventData); + const dbPoxEvent: DbPoxSyntheticEvent = { + ...dbEvent, + ...poxEventData, + }; + poxEventLogs.set(dbPoxEvent, entry); + switch (contractName) { + case POX_2_CONTRACT_NAME: { + dbTx.pox2Events.push(dbPoxEvent); + break; + } + case POX_3_CONTRACT_NAME: { + dbTx.pox3Events.push(dbPoxEvent); + break; + } + case POX_4_CONTRACT_NAME: { + dbTx.pox4Events.push(dbPoxEvent); + break; + } + } + } + } + } + + // Check if we have new BNS names or namespaces. + const parsedTx = parsedTxs.find(entry => entry.core_tx.txid === event.txid); + if (!parsedTx) { + throw new Error(`Unexpected missing tx during BNS parsing by tx_id ${event.txid}`); + } + const name = parseNameFromContractEvent( + event, + parsedTx, + events, + blockData.block_height, + chainId + ); + if (name) { + dbTx.names.push(name); + } + const namespace = parseNamespaceFromContractEvent(event, parsedTx, blockData.block_height); + if (namespace) { + dbTx.namespaces.push(namespace); + } + break; + } + case CoreNodeEventType.StxLockEvent: { + const entry: DbStxLockEvent = { + ...dbEvent, + event_type: DbEventTypeId.StxLock, + locked_amount: BigInt(event.stx_lock_event.locked_amount), + unlock_height: Number(event.stx_lock_event.unlock_height), + locked_address: event.stx_lock_event.locked_address, + // if no contract name available, then we can correctly assume pox-v1 + contract_name: event.stx_lock_event.contract_identifier?.split('.')[1] ?? 'pox', + }; + dbTx.stxLockEvents.push(entry); + break; + } + case CoreNodeEventType.StxTransferEvent: { + const entry: DbStxEvent = { + ...dbEvent, + event_type: DbEventTypeId.StxAsset, + asset_event_type_id: DbAssetEventTypeId.Transfer, + sender: event.stx_transfer_event.sender, + recipient: event.stx_transfer_event.recipient, + amount: BigInt(event.stx_transfer_event.amount), + memo: event.stx_transfer_event.memo ? '0x' + event.stx_transfer_event.memo : undefined, + }; + dbTx.stxEvents.push(entry); + break; + } + case CoreNodeEventType.StxMintEvent: { + const entry: DbStxEvent = { + ...dbEvent, + event_type: DbEventTypeId.StxAsset, + asset_event_type_id: DbAssetEventTypeId.Mint, + recipient: event.stx_mint_event.recipient, + amount: BigInt(event.stx_mint_event.amount), + }; + dbTx.stxEvents.push(entry); + break; + } + case CoreNodeEventType.StxBurnEvent: { + const entry: DbStxEvent = { + ...dbEvent, + event_type: DbEventTypeId.StxAsset, + asset_event_type_id: DbAssetEventTypeId.Burn, + sender: event.stx_burn_event.sender, + amount: BigInt(event.stx_burn_event.amount), + }; + dbTx.stxEvents.push(entry); + break; + } + case CoreNodeEventType.FtTransferEvent: { + const entry: DbFtEvent = { + ...dbEvent, + event_type: DbEventTypeId.FungibleTokenAsset, + asset_event_type_id: DbAssetEventTypeId.Transfer, + sender: event.ft_transfer_event.sender, + recipient: event.ft_transfer_event.recipient, + asset_identifier: event.ft_transfer_event.asset_identifier, + amount: BigInt(event.ft_transfer_event.amount), + }; + dbTx.ftEvents.push(entry); + break; + } + case CoreNodeEventType.FtMintEvent: { + const entry: DbFtEvent = { + ...dbEvent, + event_type: DbEventTypeId.FungibleTokenAsset, + asset_event_type_id: DbAssetEventTypeId.Mint, + recipient: event.ft_mint_event.recipient, + asset_identifier: event.ft_mint_event.asset_identifier, + amount: BigInt(event.ft_mint_event.amount), + }; + dbTx.ftEvents.push(entry); + break; + } + case CoreNodeEventType.FtBurnEvent: { + const entry: DbFtEvent = { + ...dbEvent, + event_type: DbEventTypeId.FungibleTokenAsset, + asset_event_type_id: DbAssetEventTypeId.Burn, + sender: event.ft_burn_event.sender, + asset_identifier: event.ft_burn_event.asset_identifier, + amount: BigInt(event.ft_burn_event.amount), + }; + dbTx.ftEvents.push(entry); + break; + } + case CoreNodeEventType.NftTransferEvent: { + const entry: DbNftEvent = { + ...dbEvent, + event_type: DbEventTypeId.NonFungibleTokenAsset, + asset_event_type_id: DbAssetEventTypeId.Transfer, + recipient: event.nft_transfer_event.recipient, + sender: event.nft_transfer_event.sender, + asset_identifier: event.nft_transfer_event.asset_identifier, + value: event.nft_transfer_event.raw_value, + }; + dbTx.nftEvents.push(entry); + break; + } + case CoreNodeEventType.NftMintEvent: { + const entry: DbNftEvent = { + ...dbEvent, + event_type: DbEventTypeId.NonFungibleTokenAsset, + asset_event_type_id: DbAssetEventTypeId.Mint, + recipient: event.nft_mint_event.recipient, + asset_identifier: event.nft_mint_event.asset_identifier, + value: event.nft_mint_event.raw_value, + }; + dbTx.nftEvents.push(entry); + break; + } + case CoreNodeEventType.NftBurnEvent: { + const entry: DbNftEvent = { + ...dbEvent, + event_type: DbEventTypeId.NonFungibleTokenAsset, + asset_event_type_id: DbAssetEventTypeId.Burn, + sender: event.nft_burn_event.sender, + asset_identifier: event.nft_burn_event.asset_identifier, + value: event.nft_burn_event.raw_value, + }; + dbTx.nftEvents.push(entry); + break; + } + default: { + throw new Error(`Unexpected CoreNodeEventType: ${inspect(event)}`); + } + } + } + + // Normalize event indexes from per-block to per-transaction contiguous series. + for (const tx of dbData) { + const sortedEvents = [ + tx.contractLogEvents, + tx.ftEvents, + tx.nftEvents, + tx.stxEvents, + tx.stxLockEvents, + ] + .flat() + .sort((a, b) => a.event_index - b.event_index); + tx.tx.event_count = sortedEvents.length; + for (let i = 0; i < sortedEvents.length; i++) { + sortedEvents[i].event_index = i; + } + for (const poxEvent of [tx.pox2Events, tx.pox3Events, tx.pox4Events].flat()) { + const associatedLogEvent = poxEventLogs.get(poxEvent); + if (!associatedLogEvent) { + throw new Error(`Missing associated contract log event for pox event ${poxEvent.tx_id}`); + } + poxEvent.event_index = associatedLogEvent.event_index; + } + } + + return dbData; +} + +export function parseNewBlockMessage( + chainId: ChainID, + msg: CoreNodeBlockMessage, + isEventReplay: boolean +) { + const counts = newCoreNoreBlockEventCounts(); + + const parsedTxs: CoreNodeParsedTxMessage[] = []; + const blockData: CoreNodeMsgBlockData = { + ...msg, + }; + + if (!blockData.block_time) { + // If running in IBD mode, we use the parent burn block timestamp as the receipt date, + // otherwise, use the current timestamp. + const stacksBlockReceiptDate = isEventReplay + ? msg.burn_block_time + : Math.round(Date.now() / 1000); + blockData.block_time = stacksBlockReceiptDate; + } + + msg.transactions.forEach(item => { + const parsedTx = parseMessageTransaction(chainId, item, blockData, msg.events); + if (parsedTx) { + parsedTxs.push(parsedTx); + counts.tx_total += 1; + switch (parsedTx.parsed_tx.payload.type_id) { + case TxPayloadTypeID.Coinbase: + counts.txs.coinbase += 1; + break; + case TxPayloadTypeID.CoinbaseToAltRecipient: + counts.txs.coinbase_to_alt_recipient += 1; + break; + case TxPayloadTypeID.ContractCall: + counts.txs.contract_call += 1; + break; + case TxPayloadTypeID.NakamotoCoinbase: + counts.txs.nakamoto_coinbase += 1; + break; + case TxPayloadTypeID.PoisonMicroblock: + counts.txs.poison_microblock += 1; + break; + case TxPayloadTypeID.SmartContract: + counts.txs.smart_contract += 1; + break; + case TxPayloadTypeID.TenureChange: + counts.txs.tenure_change += 1; + break; + case TxPayloadTypeID.TokenTransfer: + counts.txs.token_transfer += 1; + break; + case TxPayloadTypeID.VersionedSmartContract: + counts.txs.versioned_smart_contract += 1; + break; + } + } + }); + for (const event of msg.events) { + counts.event_total += 1; + counts.events[event.type] += 1; + } + + const signerBitvec = msg.signer_bitvec + ? BitVec.consensusDeserializeToString(msg.signer_bitvec) + : null; + + // Stacks-core does not include the '0x' prefix in the signer signature hex strings + const signerSignatures = + msg.signer_signature?.map(s => (s.startsWith('0x') ? s : '0x' + s)) ?? null; + + // `anchored_cost` is not available in very old versions of stacks-core + const execCost = + msg.anchored_cost ?? + parsedTxs.reduce( + (acc, { core_tx: { execution_cost } }) => ({ + read_count: acc.read_count + execution_cost.read_count, + read_length: acc.read_length + execution_cost.read_length, + runtime: acc.runtime + execution_cost.runtime, + write_count: acc.write_count + execution_cost.write_count, + write_length: acc.write_length + execution_cost.write_length, + }), + { + read_count: 0, + read_length: 0, + runtime: 0, + write_count: 0, + write_length: 0, + } + ); + + if (typeof msg.tenure_height !== 'number' && msg.signer_bitvec) { + logger.warn( + `Nakamoto block ${msg.block_height} event payload has no tenure_height. Use stacks-core version 3.0.0.0.0-rc6 or newer!` + ); + } + + const dbBlock: DbBlock = { + canonical: true, + block_hash: msg.block_hash, + index_block_hash: msg.index_block_hash, + parent_index_block_hash: msg.parent_index_block_hash, + parent_block_hash: msg.parent_block_hash, + parent_microblock_hash: msg.parent_microblock, + parent_microblock_sequence: msg.parent_microblock_sequence, + block_height: msg.block_height, + burn_block_time: msg.burn_block_time, + burn_block_hash: msg.burn_block_hash, + burn_block_height: msg.burn_block_height, + miner_txid: msg.miner_txid, + execution_cost_read_count: execCost.read_count, + execution_cost_read_length: execCost.read_length, + execution_cost_runtime: execCost.runtime, + execution_cost_write_count: execCost.write_count, + execution_cost_write_length: execCost.write_length, + tx_count: msg.transactions.length, + block_time: blockData.block_time, + signer_bitvec: signerBitvec, + signer_signatures: signerSignatures, + tenure_height: msg.tenure_height ?? null, + }; + + logger.debug(`Received block ${msg.block_hash} (${msg.block_height}) from node`, dbBlock); + + const dbMinerRewards: DbMinerReward[] = []; + for (const minerReward of msg.matured_miner_rewards) { + const dbMinerReward: DbMinerReward = { + canonical: true, + block_hash: minerReward.from_stacks_block_hash, + index_block_hash: msg.index_block_hash, + from_index_block_hash: minerReward.from_index_consensus_hash, + mature_block_height: msg.block_height, + recipient: minerReward.recipient, + // If `miner_address` is null then it means pre-Stacks2.1 data, and the `recipient` can be accurately used + miner_address: minerReward.miner_address ?? minerReward.recipient, + coinbase_amount: BigInt(minerReward.coinbase_amount), + tx_fees_anchored: BigInt(minerReward.tx_fees_anchored), + tx_fees_streamed_confirmed: BigInt(minerReward.tx_fees_streamed_confirmed), + tx_fees_streamed_produced: BigInt(minerReward.tx_fees_streamed_produced), + }; + dbMinerRewards.push(dbMinerReward); + counts.miner_rewards += 1; + } + + logger.debug(`Received ${dbMinerRewards.length} matured miner rewards`); + + const dbMicroblocks = parseMicroblocksFromTxs({ + parentIndexBlockHash: msg.parent_index_block_hash, + txs: msg.transactions, + parentBurnBlock: { + height: msg.parent_burn_block_height, + hash: msg.parent_burn_block_hash, + time: msg.parent_burn_block_timestamp, + }, + }).map(mb => { + const microblock: DbMicroblock = { + ...mb, + canonical: true, + microblock_canonical: true, + block_height: msg.block_height, + parent_block_height: msg.block_height - 1, + parent_block_hash: msg.parent_block_hash, + index_block_hash: msg.index_block_hash, + block_hash: msg.block_hash, + }; + counts.microblocks += 1; + return microblock; + }); + + let poxSetSigners: DbPoxSetSigners | undefined; + if (msg.reward_set) { + assertNotNullish( + msg.cycle_number, + () => 'Cycle number must be present if reward set is present' + ); + let signers: DbPoxSetSigners['signers'] = []; + if (msg.reward_set.signers) { + signers = msg.reward_set.signers.map(signer => ({ + signing_key: '0x' + signer.signing_key, + weight: signer.weight, + stacked_amount: BigInt(signer.stacked_amt), + })); + logger.info( + `Received new pox set message, block=${msg.block_height}, cycle=${msg.cycle_number}, signers=${msg.reward_set.signers.length}` + ); + } + let rewardedAddresses: string[] = []; + if (msg.reward_set.rewarded_addresses) { + rewardedAddresses = msg.reward_set.rewarded_addresses; + logger.info( + `Received new pox set message, ${rewardedAddresses.length} rewarded BTC addresses` + ); + } + poxSetSigners = { + cycle_number: msg.cycle_number, + pox_ustx_threshold: BigInt(msg.reward_set.pox_ustx_threshold), + signers, + rewarded_addresses: rewardedAddresses, + }; + } + + const dbData: DataStoreBlockUpdateData = { + block: dbBlock, + microblocks: dbMicroblocks, + minerRewards: dbMinerRewards, + txs: parseDataStoreTxEventData(parsedTxs, msg.events, dbBlock, chainId), + pox_v1_unlock_height: msg.pox_v1_unlock_height, + pox_v2_unlock_height: msg.pox_v2_unlock_height, + pox_v3_unlock_height: msg.pox_v3_unlock_height, + poxSetSigners: poxSetSigners, + }; + + return { dbData, counts }; +} + +export function parseAttachment(msg: CoreNodeAttachmentMessage[]) { + const zoneFiles: { zonefile: string; zonefileHash: string; txId: string }[] = []; + const subdomains: DbBnsSubdomain[] = []; + for (const attachment of msg) { + if ( + attachment.contract_id === BnsContractIdentifier.mainnet || + attachment.contract_id === BnsContractIdentifier.testnet + ) { + const metadataCV = decodeClarityValue< + ClarityValueTuple<{ + op: ClarityValueStringAscii; + name: ClarityValueBuffer; + namespace: ClarityValueBuffer; + }> + >(attachment.metadata); + const op = metadataCV.data['op'].data; + const zonefile = Buffer.from(attachment.content.slice(2), 'hex').toString(); + const zonefileHash = attachment.content_hash; + zoneFiles.push({ + zonefile, + zonefileHash, + txId: attachment.tx_id, + }); + if (op === 'name-update') { + const name = hexToBuffer(metadataCV.data['name'].buffer).toString('utf8'); + const namespace = hexToBuffer(metadataCV.data['namespace'].buffer).toString('utf8'); + const zoneFileContents = zoneFileParser.parseZoneFile(zonefile); + const zoneFileTxt = zoneFileContents.txt; + // Case for subdomain + if (zoneFileTxt) { + for (let i = 0; i < zoneFileTxt.length; i++) { + const zoneFile = zoneFileTxt[i]; + const parsedTxt = parseZoneFileTxt(zoneFile.txt); + if (parsedTxt.owner === '') continue; //if txt has no owner , skip it + const subdomain: DbBnsSubdomain = { + name: name.concat('.', namespace), + namespace_id: namespace, + fully_qualified_subdomain: zoneFile.name.concat('.', name, '.', namespace), + owner: parsedTxt.owner, + zonefile_hash: parsedTxt.zoneFileHash, + zonefile: parsedTxt.zoneFile, + tx_id: attachment.tx_id, + tx_index: -1, + canonical: true, + parent_zonefile_hash: attachment.content_hash.slice(2), + parent_zonefile_index: 0, + block_height: Number.parseInt(attachment.block_height, 10), + zonefile_offset: 1, + resolver: zoneFileContents.uri ? parseResolver(zoneFileContents.uri) : '', + index_block_hash: attachment.index_block_hash, + }; + subdomains.push(subdomain); + } + } + } + } + } + return { zoneFiles, subdomains }; +} + +async function handleNewAttachmentMessage(msg: CoreNodeAttachmentMessage[], db: PgWriteStore) { + const attachments = msg + .map(message => { + if ( + message.contract_id === BnsContractIdentifier.mainnet || + message.contract_id === BnsContractIdentifier.testnet + ) { + const metadataCV = decodeClarityValue< + ClarityValueTuple<{ + op: ClarityValueStringAscii; + name: ClarityValueBuffer; + namespace: ClarityValueBuffer; + }> + >(message.metadata); + return { + op: metadataCV.data['op'].data, + zonefile: message.content.slice(2), + name: hexToBuffer(metadataCV.data['name'].buffer).toString('utf8'), + namespace: hexToBuffer(metadataCV.data['namespace'].buffer).toString('utf8'), + zonefileHash: message.content_hash, + txId: message.tx_id, + indexBlockHash: message.index_block_hash, + blockHeight: Number.parseInt(message.block_height, 10), + } as DataStoreAttachmentData; + } + }) + .filter((msg): msg is DataStoreAttachmentData => !!msg); + await db.updateAttachments(attachments); +} + +async function handleRawEventRequest( + eventPath: string, + payload: any, + db: PgWriteStore, + sequenceId?: string, + timestamp?: string +): Promise { + await db.storeRawEventRequest(eventPath, payload, sequenceId, timestamp); +} + +async function handleBurnBlockMessage( + burnBlockMsg: CoreNodeBurnBlockMessage, + db: PgWriteStore +): Promise { + logger.debug( + `Received burn block message hash ${burnBlockMsg.burn_block_hash}, height: ${burnBlockMsg.burn_block_height}, reward recipients: ${burnBlockMsg.reward_recipients.length}` + ); + const rewards = burnBlockMsg.reward_recipients.map((r, index) => { + const dbReward: DbBurnchainReward = { + canonical: true, + burn_block_hash: burnBlockMsg.burn_block_hash, + burn_block_height: burnBlockMsg.burn_block_height, + burn_amount: BigInt(burnBlockMsg.burn_amount), + reward_recipient: r.recipient, + reward_amount: BigInt(r.amt), + reward_index: index, + }; + return dbReward; + }); + const slotHolders = burnBlockMsg.reward_slot_holders.map((r, index) => { + const slotHolder: DbRewardSlotHolder = { + canonical: true, + burn_block_hash: burnBlockMsg.burn_block_hash, + burn_block_height: burnBlockMsg.burn_block_height, + address: r, + slot_index: index, + }; + return slotHolder; + }); + await db.updateBurnchainRewards({ + burnchainBlockHash: burnBlockMsg.burn_block_hash, + burnchainBlockHeight: burnBlockMsg.burn_block_height, + rewards: rewards, + }); + await db.updateBurnchainRewardSlotHolders({ + burnchainBlockHash: burnBlockMsg.burn_block_hash, + burnchainBlockHeight: burnBlockMsg.burn_block_height, + slotHolders: slotHolders, + }); +} + +async function handleMempoolTxsMessage(rawTxs: string[], db: PgWriteStore): Promise { + logger.debug(`Received ${rawTxs.length} mempool transactions`); + const receiptDate = Math.round(Date.now() / 1000); + const decodedTxs = rawTxs.map(str => { + const parsedTx = decodeTransaction(str); + const txSender = getTxSenderAddress(parsedTx); + const sponsorAddress = getTxSponsorAddress(parsedTx); + return { + txId: parsedTx.tx_id, + sender: txSender, + sponsorAddress, + txData: parsedTx, + rawTx: str, + }; + }); + const dbMempoolTxs = decodedTxs.map(tx => { + logger.debug(`Received mempool tx: ${tx.txId}`); + const dbMempoolTx = createDbMempoolTxFromCoreMsg({ + txId: tx.txId, + txData: tx.txData, + sender: tx.sender, + sponsorAddress: tx.sponsorAddress, + rawTx: tx.rawTx, + receiptDate: receiptDate, + }); + return dbMempoolTx; + }); + await db.updateMempoolTxs({ mempoolTxs: dbMempoolTxs }); +} + +async function handleDroppedMempoolTxsMessage( + msg: CoreNodeDropMempoolTxMessage, + db: PgWriteStore +): Promise { + logger.debug(`Received ${msg.dropped_txids.length} dropped mempool txs`); + const dbTxStatus = getTxDbStatus(msg.reason); + await db.dropMempoolTxs({ status: dbTxStatus, txIds: msg.dropped_txids }); +} + +async function handleMicroblockMessage( + chainId: ChainID, + msg: CoreNodeMicroblockMessage, + db: PgWriteStore +): Promise { + logger.debug(`Received microblock with ${msg.transactions.length} txs`); + const dbMicroblocks = parseMicroblocksFromTxs({ + parentIndexBlockHash: msg.parent_index_block_hash, + txs: msg.transactions, + parentBurnBlock: { + height: msg.burn_block_height, + hash: msg.burn_block_hash, + time: msg.burn_block_timestamp, + }, + }); + const stacksBlockReceiptDate = Math.round(Date.now() / 1000); + const parsedTxs: CoreNodeParsedTxMessage[] = []; + msg.transactions.forEach(tx => { + const blockData: CoreNodeMsgBlockData = { + parent_index_block_hash: msg.parent_index_block_hash, + + parent_burn_block_timestamp: msg.burn_block_timestamp, + parent_burn_block_height: msg.burn_block_height, + parent_burn_block_hash: msg.burn_block_hash, + + // These properties aren't known until the next anchor block that accepts this microblock. + burn_block_time: -1, + burn_block_height: -1, + index_block_hash: '', + block_hash: '', + block_time: stacksBlockReceiptDate, + + // These properties can be determined with a db query, they are set while the db is inserting them. + block_height: -1, + parent_block_hash: '', + }; + const parsedTx = parseMessageTransaction(chainId, tx, blockData, msg.events); + if (parsedTx) { + parsedTxs.push(parsedTx); + } + }); + parsedTxs.forEach(tx => { + logger.debug(`Received microblock mined tx: ${tx.core_tx.txid}`); + }); + const updateData: DataStoreMicroblockUpdateData = { + microblocks: dbMicroblocks, + txs: parseDataStoreTxEventData( + parsedTxs, + msg.events, + { + block_height: -1, // Will be filled during db insert + index_block_hash: '', + block_time: stacksBlockReceiptDate, + }, + chainId + ), + }; + await db.updateMicroblocks(updateData); +} + +async function handleBlockMessage( + chainId: ChainID, + msg: CoreNodeBlockMessage, + db: PgWriteStore +): Promise { + const ingestionTimer = stopwatch(); + const { dbData, counts } = parseNewBlockMessage(chainId, msg, db.isEventReplay); + await db.update(dbData); + const ingestionTime = ingestionTimer.getElapsed(); + logger.info( + counts, + `Ingested block ${msg.block_height} (${msg.block_hash}) in ${ingestionTime}ms` + ); +} + +export function newEventMessageHandler(): EventMessageHandler { + // Create a promise queue so that only one message is handled at a time. + const processorQueue = new PQueue({ concurrency: 1 }); + + let eventTimer: prom.Histogram<'event'> | undefined; + if (isProdEnv) { + eventTimer = new prom.Histogram({ + name: 'stacks_event_ingestion_timers', + help: 'Event ingestion timers', + labelNames: ['event'], + buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes + }); + } + + const observeEvent = async (event: string, fn: () => Promise) => { + const timer = stopwatch(); + try { + await fn(); + } finally { + const elapsedMs = timer.getElapsed(); + eventTimer?.observe({ event }, elapsedMs); + } + }; + + const handler: EventMessageHandler = { + handleRawEventRequest: ( + eventPath: string, + payload: any, + db: PgWriteStore, + sequenceId?: string, + timestamp?: string + ) => { + return processorQueue + .add(() => + observeEvent('raw_event', () => + handleRawEventRequest(eventPath, payload, db, sequenceId, timestamp) + ) + ) + .catch(e => { + logger.error(e, 'Error storing raw core node request data'); + throw e; + }); + }, + handleBlockMessage: (chainId: ChainID, msg: CoreNodeBlockMessage, db: PgWriteStore) => { + return processorQueue + .add(() => observeEvent('block', () => handleBlockMessage(chainId, msg, db))) + .catch(e => { + logger.error(e, 'Error processing core node block message'); + throw e; + }); + }, + handleMicroblockMessage: ( + chainId: ChainID, + msg: CoreNodeMicroblockMessage, + db: PgWriteStore + ) => { + return processorQueue + .add(() => observeEvent('microblock', () => handleMicroblockMessage(chainId, msg, db))) + .catch(e => { + logger.error(e, 'Error processing core node microblock message'); + throw e; + }); + }, + handleBurnBlock: (msg: CoreNodeBurnBlockMessage, db: PgWriteStore) => { + return processorQueue + .add(() => observeEvent('burn_block', () => handleBurnBlockMessage(msg, db))) + .catch(e => { + logger.error(e, 'Error processing core node burn block message'); + throw e; + }); + }, + handleMempoolTxs: (rawTxs: string[], db: PgWriteStore) => { + return processorQueue + .add(() => observeEvent('mempool_txs', () => handleMempoolTxsMessage(rawTxs, db))) + .catch(e => { + logger.error(e, 'Error processing core node mempool message'); + throw e; + }); + }, + handleDroppedMempoolTxs: (msg: CoreNodeDropMempoolTxMessage, db: PgWriteStore) => { + return processorQueue + .add(() => + observeEvent('dropped_mempool_txs', () => handleDroppedMempoolTxsMessage(msg, db)) + ) + .catch(e => { + logger.error(e, 'Error processing core node dropped mempool txs message'); + throw e; + }); + }, + handleNewAttachment: (msg: CoreNodeAttachmentMessage[], db: PgWriteStore) => { + return processorQueue + .add(() => observeEvent('new_attachment', () => handleNewAttachmentMessage(msg, db))) + .catch(e => { + logger.error(e, 'Error processing new attachment message'); + throw e; + }); + }, + }; + + return handler; +} diff --git a/src/event-stream/event-server.ts b/src/event-stream/event-server.ts index 64fbf4f45..681067020 100644 --- a/src/event-stream/event-server.ts +++ b/src/event-stream/event-server.ts @@ -1,728 +1,21 @@ -import { inspect } from 'util'; import * as net from 'net'; import Fastify, { FastifyRequest, FastifyServerOptions } from 'fastify'; -import PQueue from 'p-queue'; -import * as prom from 'prom-client'; -import { - BitVec, - ChainID, - assertNotNullish, - getChainIDNetwork, - getIbdBlockHeight, -} from '../helpers'; +import { ChainID, getIbdBlockHeight } from '../helpers'; import { CoreNodeBlockMessage, - CoreNodeEventType, CoreNodeBurnBlockMessage, CoreNodeDropMempoolTxMessage, CoreNodeAttachmentMessage, CoreNodeMicroblockMessage, - CoreNodeParsedTxMessage, - CoreNodeEvent, } from './core-node-message'; -import { - DbEventBase, - DbSmartContractEvent, - DbStxEvent, - DbEventTypeId, - DbFtEvent, - DbAssetEventTypeId, - DbNftEvent, - DbBlock, - DataStoreBlockUpdateData, - DbStxLockEvent, - DbMinerReward, - DbBurnchainReward, - DbRewardSlotHolder, - DataStoreMicroblockUpdateData, - DataStoreTxEventData, - DbMicroblock, - DataStoreAttachmentData, - DbPoxSyntheticEvent, - DbTxStatus, - DbBnsSubdomain, - DbPoxSetSigners, -} from '../datastore/common'; -import { - getTxSenderAddress, - getTxSponsorAddress, - parseMessageTransaction, - CoreNodeMsgBlockData, - parseMicroblocksFromTxs, - isPoxPrintEvent, - newCoreNoreBlockEventCounts, -} from './reader'; -import { - decodeTransaction, - decodeClarityValue, - ClarityValueBuffer, - ClarityValueStringAscii, - ClarityValueTuple, - TxPayloadTypeID, -} from 'stacks-encoding-native-js'; -import { BnsContractIdentifier } from './bns/bns-constants'; -import { - parseNameFromContractEvent, - parseNameRenewalWithNoZonefileHashFromContractCall, - parseNamespaceFromContractEvent, - parseZoneFileTxt, - parseResolver, -} from './bns/bns-helpers'; import { PgWriteStore } from '../datastore/pg-write-store'; -import { - createDbMempoolTxFromCoreMsg, - createDbTxFromCoreMsg, - getTxDbStatus, -} from '../datastore/helpers'; import { handleBnsImport } from '../import-v1'; -import { decodePoxSyntheticPrintEvent } from './pox-event-parsing'; import { logger } from '../logger'; -import * as zoneFileParser from 'zone-file'; -import { hexToBuffer, isProdEnv, PINO_LOGGER_CONFIG, stopwatch } from '@hirosystems/api-toolkit'; -import { POX_2_CONTRACT_NAME, POX_3_CONTRACT_NAME, POX_4_CONTRACT_NAME } from '../pox-helpers'; +import { isProdEnv, PINO_LOGGER_CONFIG } from '@hirosystems/api-toolkit'; +import { EventMessageHandler, newEventMessageHandler } from './event-message-handler'; const IBD_PRUNABLE_ROUTES = ['/new_mempool_tx', '/drop_mempool_tx', '/new_microblocks']; -async function handleRawEventRequest( - eventPath: string, - payload: any, - db: PgWriteStore -): Promise { - await db.storeRawEventRequest(eventPath, payload); -} - -async function handleBurnBlockMessage( - burnBlockMsg: CoreNodeBurnBlockMessage, - db: PgWriteStore -): Promise { - logger.debug( - `Received burn block message hash ${burnBlockMsg.burn_block_hash}, height: ${burnBlockMsg.burn_block_height}, reward recipients: ${burnBlockMsg.reward_recipients.length}` - ); - const rewards = burnBlockMsg.reward_recipients.map((r, index) => { - const dbReward: DbBurnchainReward = { - canonical: true, - burn_block_hash: burnBlockMsg.burn_block_hash, - burn_block_height: burnBlockMsg.burn_block_height, - burn_amount: BigInt(burnBlockMsg.burn_amount), - reward_recipient: r.recipient, - reward_amount: BigInt(r.amt), - reward_index: index, - }; - return dbReward; - }); - const slotHolders = burnBlockMsg.reward_slot_holders.map((r, index) => { - const slotHolder: DbRewardSlotHolder = { - canonical: true, - burn_block_hash: burnBlockMsg.burn_block_hash, - burn_block_height: burnBlockMsg.burn_block_height, - address: r, - slot_index: index, - }; - return slotHolder; - }); - await db.updateBurnchainRewards({ - burnchainBlockHash: burnBlockMsg.burn_block_hash, - burnchainBlockHeight: burnBlockMsg.burn_block_height, - rewards: rewards, - }); - await db.updateBurnchainRewardSlotHolders({ - burnchainBlockHash: burnBlockMsg.burn_block_hash, - burnchainBlockHeight: burnBlockMsg.burn_block_height, - slotHolders: slotHolders, - }); -} - -async function handleMempoolTxsMessage(rawTxs: string[], db: PgWriteStore): Promise { - logger.debug(`Received ${rawTxs.length} mempool transactions`); - // TODO: mempool-tx receipt date should be sent from the core-node - const receiptDate = Math.round(Date.now() / 1000); - const decodedTxs = rawTxs.map(str => { - const parsedTx = decodeTransaction(str); - const txSender = getTxSenderAddress(parsedTx); - const sponsorAddress = getTxSponsorAddress(parsedTx); - return { - txId: parsedTx.tx_id, - sender: txSender, - sponsorAddress, - txData: parsedTx, - rawTx: str, - }; - }); - const dbMempoolTxs = decodedTxs.map(tx => { - logger.debug(`Received mempool tx: ${tx.txId}`); - const dbMempoolTx = createDbMempoolTxFromCoreMsg({ - txId: tx.txId, - txData: tx.txData, - sender: tx.sender, - sponsorAddress: tx.sponsorAddress, - rawTx: tx.rawTx, - receiptDate: receiptDate, - }); - return dbMempoolTx; - }); - await db.updateMempoolTxs({ mempoolTxs: dbMempoolTxs }); -} - -async function handleDroppedMempoolTxsMessage( - msg: CoreNodeDropMempoolTxMessage, - db: PgWriteStore -): Promise { - logger.debug(`Received ${msg.dropped_txids.length} dropped mempool txs`); - const dbTxStatus = getTxDbStatus(msg.reason); - await db.dropMempoolTxs({ status: dbTxStatus, txIds: msg.dropped_txids }); -} - -async function handleMicroblockMessage( - chainId: ChainID, - msg: CoreNodeMicroblockMessage, - db: PgWriteStore -): Promise { - logger.debug(`Received microblock with ${msg.transactions.length} txs`); - const dbMicroblocks = parseMicroblocksFromTxs({ - parentIndexBlockHash: msg.parent_index_block_hash, - txs: msg.transactions, - parentBurnBlock: { - height: msg.burn_block_height, - hash: msg.burn_block_hash, - time: msg.burn_block_timestamp, - }, - }); - const stacksBlockReceiptDate = Math.round(Date.now() / 1000); - const parsedTxs: CoreNodeParsedTxMessage[] = []; - msg.transactions.forEach(tx => { - const blockData: CoreNodeMsgBlockData = { - parent_index_block_hash: msg.parent_index_block_hash, - - parent_burn_block_timestamp: msg.burn_block_timestamp, - parent_burn_block_height: msg.burn_block_height, - parent_burn_block_hash: msg.burn_block_hash, - - // These properties aren't known until the next anchor block that accepts this microblock. - burn_block_time: -1, - burn_block_height: -1, - index_block_hash: '', - block_hash: '', - block_time: stacksBlockReceiptDate, - - // These properties can be determined with a db query, they are set while the db is inserting them. - block_height: -1, - parent_block_hash: '', - }; - const parsedTx = parseMessageTransaction(chainId, tx, blockData, msg.events); - if (parsedTx) { - parsedTxs.push(parsedTx); - } - }); - parsedTxs.forEach(tx => { - logger.debug(`Received microblock mined tx: ${tx.core_tx.txid}`); - }); - const updateData: DataStoreMicroblockUpdateData = { - microblocks: dbMicroblocks, - txs: parseDataStoreTxEventData( - parsedTxs, - msg.events, - { - block_height: -1, // TODO: fill during initial db insert - index_block_hash: '', - block_time: stacksBlockReceiptDate, - }, - chainId - ), - }; - await db.updateMicroblocks(updateData); -} - -async function handleBlockMessage( - chainId: ChainID, - msg: CoreNodeBlockMessage, - db: PgWriteStore -): Promise { - const ingestionTimer = stopwatch(); - const { dbData, counts } = parseNewBlockMessage(chainId, msg, db.isEventReplay); - await db.update(dbData); - const ingestionTime = ingestionTimer.getElapsed(); - logger.info( - counts, - `Ingested block ${msg.block_height} (${msg.block_hash}) in ${ingestionTime}ms` - ); -} - -function parseDataStoreTxEventData( - parsedTxs: CoreNodeParsedTxMessage[], - events: CoreNodeEvent[], - blockData: { - block_height: number; - index_block_hash: string; - block_time: number; - }, - chainId: ChainID -): DataStoreTxEventData[] { - const dbData: DataStoreTxEventData[] = parsedTxs.map(tx => { - const dbTx: DataStoreBlockUpdateData['txs'][number] = { - tx: createDbTxFromCoreMsg(tx), - stxEvents: [], - stxLockEvents: [], - ftEvents: [], - nftEvents: [], - contractLogEvents: [], - smartContracts: [], - names: [], - namespaces: [], - pox2Events: [], - pox3Events: [], - pox4Events: [], - }; - switch (tx.parsed_tx.payload.type_id) { - case TxPayloadTypeID.VersionedSmartContract: - case TxPayloadTypeID.SmartContract: - const contractId = `${tx.sender_address}.${tx.parsed_tx.payload.contract_name}`; - const clarityVersion = - tx.parsed_tx.payload.type_id == TxPayloadTypeID.VersionedSmartContract - ? tx.parsed_tx.payload.clarity_version - : null; - dbTx.smartContracts.push({ - tx_id: tx.core_tx.txid, - contract_id: contractId, - block_height: blockData.block_height, - clarity_version: clarityVersion, - source_code: tx.parsed_tx.payload.code_body, - abi: JSON.stringify(tx.core_tx.contract_abi), - canonical: true, - }); - break; - case TxPayloadTypeID.ContractCall: - // Name renewals can happen without a zonefile_hash. In that case, the BNS contract does NOT - // emit a `name-renewal` contract log, causing us to miss this event. This function catches - // those cases. - const name = parseNameRenewalWithNoZonefileHashFromContractCall(tx, chainId); - if (name) { - dbTx.names.push(name); - } - break; - default: - break; - } - return dbTx; - }); - - const poxEventLogs: Map = new Map(); - - for (const event of events) { - if (!event.committed) { - logger.debug(`Ignoring uncommitted tx event from tx ${event.txid}`); - continue; - } - const dbTx = dbData.find(entry => entry.tx.tx_id === event.txid); - if (!dbTx) { - throw new Error(`Unexpected missing tx during event parsing by tx_id ${event.txid}`); - } - - if (dbTx.tx.status !== DbTxStatus.Success) { - if (event.type === CoreNodeEventType.ContractEvent) { - let reprStr = '?'; - try { - reprStr = decodeClarityValue(event.contract_event.raw_value).repr; - } catch (e) { - logger.warn(`Failed to decode contract log event: ${event.contract_event.raw_value}`); - } - logger.debug( - `Ignoring tx event from unsuccessful tx ${event.txid}, status: ${dbTx.tx.status}, repr: ${reprStr}` - ); - } else { - logger.debug( - `Ignoring tx event from unsuccessful tx ${event.txid}, status: ${dbTx.tx.status}` - ); - } - continue; - } - - const dbEvent: DbEventBase = { - event_index: event.event_index, - tx_id: event.txid, - tx_index: dbTx.tx.tx_index, - block_height: blockData.block_height, - canonical: true, - }; - - switch (event.type) { - case CoreNodeEventType.ContractEvent: { - const entry: DbSmartContractEvent = { - ...dbEvent, - event_type: DbEventTypeId.SmartContractLog, - contract_identifier: event.contract_event.contract_identifier, - topic: event.contract_event.topic, - value: event.contract_event.raw_value, - }; - dbTx.contractLogEvents.push(entry); - - if (isPoxPrintEvent(event)) { - const network = getChainIDNetwork(chainId) === 'mainnet' ? 'mainnet' : 'testnet'; - const [, contractName] = event.contract_event.contract_identifier.split('.'); - // pox-1 is handled in custom node events - const processSyntheticEvent = [ - POX_2_CONTRACT_NAME, - POX_3_CONTRACT_NAME, - POX_4_CONTRACT_NAME, - ].includes(contractName); - if (processSyntheticEvent) { - const poxEventData = decodePoxSyntheticPrintEvent( - event.contract_event.raw_value, - network - ); - if (poxEventData !== null) { - logger.debug(`Synthetic pox event data for ${contractName}:`, poxEventData); - const dbPoxEvent: DbPoxSyntheticEvent = { - ...dbEvent, - ...poxEventData, - }; - poxEventLogs.set(dbPoxEvent, entry); - switch (contractName) { - case POX_2_CONTRACT_NAME: { - dbTx.pox2Events.push(dbPoxEvent); - break; - } - case POX_3_CONTRACT_NAME: { - dbTx.pox3Events.push(dbPoxEvent); - break; - } - case POX_4_CONTRACT_NAME: { - dbTx.pox4Events.push(dbPoxEvent); - break; - } - } - } - } - } - - // Check if we have new BNS names or namespaces. - const parsedTx = parsedTxs.find(entry => entry.core_tx.txid === event.txid); - if (!parsedTx) { - throw new Error(`Unexpected missing tx during BNS parsing by tx_id ${event.txid}`); - } - const name = parseNameFromContractEvent( - event, - parsedTx, - events, - blockData.block_height, - chainId - ); - if (name) { - dbTx.names.push(name); - } - const namespace = parseNamespaceFromContractEvent(event, parsedTx, blockData.block_height); - if (namespace) { - dbTx.namespaces.push(namespace); - } - break; - } - case CoreNodeEventType.StxLockEvent: { - const entry: DbStxLockEvent = { - ...dbEvent, - event_type: DbEventTypeId.StxLock, - locked_amount: BigInt(event.stx_lock_event.locked_amount), - unlock_height: Number(event.stx_lock_event.unlock_height), - locked_address: event.stx_lock_event.locked_address, - // if no contract name available, then we can correctly assume pox-v1 - contract_name: event.stx_lock_event.contract_identifier?.split('.')[1] ?? 'pox', - }; - dbTx.stxLockEvents.push(entry); - break; - } - case CoreNodeEventType.StxTransferEvent: { - const entry: DbStxEvent = { - ...dbEvent, - event_type: DbEventTypeId.StxAsset, - asset_event_type_id: DbAssetEventTypeId.Transfer, - sender: event.stx_transfer_event.sender, - recipient: event.stx_transfer_event.recipient, - amount: BigInt(event.stx_transfer_event.amount), - memo: event.stx_transfer_event.memo ? '0x' + event.stx_transfer_event.memo : undefined, - }; - dbTx.stxEvents.push(entry); - break; - } - case CoreNodeEventType.StxMintEvent: { - const entry: DbStxEvent = { - ...dbEvent, - event_type: DbEventTypeId.StxAsset, - asset_event_type_id: DbAssetEventTypeId.Mint, - recipient: event.stx_mint_event.recipient, - amount: BigInt(event.stx_mint_event.amount), - }; - dbTx.stxEvents.push(entry); - break; - } - case CoreNodeEventType.StxBurnEvent: { - const entry: DbStxEvent = { - ...dbEvent, - event_type: DbEventTypeId.StxAsset, - asset_event_type_id: DbAssetEventTypeId.Burn, - sender: event.stx_burn_event.sender, - amount: BigInt(event.stx_burn_event.amount), - }; - dbTx.stxEvents.push(entry); - break; - } - case CoreNodeEventType.FtTransferEvent: { - const entry: DbFtEvent = { - ...dbEvent, - event_type: DbEventTypeId.FungibleTokenAsset, - asset_event_type_id: DbAssetEventTypeId.Transfer, - sender: event.ft_transfer_event.sender, - recipient: event.ft_transfer_event.recipient, - asset_identifier: event.ft_transfer_event.asset_identifier, - amount: BigInt(event.ft_transfer_event.amount), - }; - dbTx.ftEvents.push(entry); - break; - } - case CoreNodeEventType.FtMintEvent: { - const entry: DbFtEvent = { - ...dbEvent, - event_type: DbEventTypeId.FungibleTokenAsset, - asset_event_type_id: DbAssetEventTypeId.Mint, - recipient: event.ft_mint_event.recipient, - asset_identifier: event.ft_mint_event.asset_identifier, - amount: BigInt(event.ft_mint_event.amount), - }; - dbTx.ftEvents.push(entry); - break; - } - case CoreNodeEventType.FtBurnEvent: { - const entry: DbFtEvent = { - ...dbEvent, - event_type: DbEventTypeId.FungibleTokenAsset, - asset_event_type_id: DbAssetEventTypeId.Burn, - sender: event.ft_burn_event.sender, - asset_identifier: event.ft_burn_event.asset_identifier, - amount: BigInt(event.ft_burn_event.amount), - }; - dbTx.ftEvents.push(entry); - break; - } - case CoreNodeEventType.NftTransferEvent: { - const entry: DbNftEvent = { - ...dbEvent, - event_type: DbEventTypeId.NonFungibleTokenAsset, - asset_event_type_id: DbAssetEventTypeId.Transfer, - recipient: event.nft_transfer_event.recipient, - sender: event.nft_transfer_event.sender, - asset_identifier: event.nft_transfer_event.asset_identifier, - value: event.nft_transfer_event.raw_value, - }; - dbTx.nftEvents.push(entry); - break; - } - case CoreNodeEventType.NftMintEvent: { - const entry: DbNftEvent = { - ...dbEvent, - event_type: DbEventTypeId.NonFungibleTokenAsset, - asset_event_type_id: DbAssetEventTypeId.Mint, - recipient: event.nft_mint_event.recipient, - asset_identifier: event.nft_mint_event.asset_identifier, - value: event.nft_mint_event.raw_value, - }; - dbTx.nftEvents.push(entry); - break; - } - case CoreNodeEventType.NftBurnEvent: { - const entry: DbNftEvent = { - ...dbEvent, - event_type: DbEventTypeId.NonFungibleTokenAsset, - asset_event_type_id: DbAssetEventTypeId.Burn, - sender: event.nft_burn_event.sender, - asset_identifier: event.nft_burn_event.asset_identifier, - value: event.nft_burn_event.raw_value, - }; - dbTx.nftEvents.push(entry); - break; - } - default: { - throw new Error(`Unexpected CoreNodeEventType: ${inspect(event)}`); - } - } - } - - // Normalize event indexes from per-block to per-transaction contiguous series. - for (const tx of dbData) { - const sortedEvents = [ - tx.contractLogEvents, - tx.ftEvents, - tx.nftEvents, - tx.stxEvents, - tx.stxLockEvents, - ] - .flat() - .sort((a, b) => a.event_index - b.event_index); - tx.tx.event_count = sortedEvents.length; - for (let i = 0; i < sortedEvents.length; i++) { - sortedEvents[i].event_index = i; - } - for (const poxEvent of [tx.pox2Events, tx.pox3Events, tx.pox4Events].flat()) { - const associatedLogEvent = poxEventLogs.get(poxEvent); - if (!associatedLogEvent) { - throw new Error(`Missing associated contract log event for pox event ${poxEvent.tx_id}`); - } - poxEvent.event_index = associatedLogEvent.event_index; - } - } - - return dbData; -} - -async function handleNewAttachmentMessage(msg: CoreNodeAttachmentMessage[], db: PgWriteStore) { - const attachments = msg - .map(message => { - if ( - message.contract_id === BnsContractIdentifier.mainnet || - message.contract_id === BnsContractIdentifier.testnet - ) { - const metadataCV = decodeClarityValue< - ClarityValueTuple<{ - op: ClarityValueStringAscii; - name: ClarityValueBuffer; - namespace: ClarityValueBuffer; - }> - >(message.metadata); - return { - op: metadataCV.data['op'].data, - zonefile: message.content.slice(2), - name: hexToBuffer(metadataCV.data['name'].buffer).toString('utf8'), - namespace: hexToBuffer(metadataCV.data['namespace'].buffer).toString('utf8'), - zonefileHash: message.content_hash, - txId: message.tx_id, - indexBlockHash: message.index_block_hash, - blockHeight: Number.parseInt(message.block_height, 10), - } as DataStoreAttachmentData; - } - }) - .filter((msg): msg is DataStoreAttachmentData => !!msg); - await db.updateAttachments(attachments); -} - -export const DummyEventMessageHandler: EventMessageHandler = { - handleRawEventRequest: () => {}, - handleBlockMessage: () => {}, - handleMicroblockMessage: () => {}, - handleBurnBlock: () => {}, - handleMempoolTxs: () => {}, - handleDroppedMempoolTxs: () => {}, - handleNewAttachment: () => {}, -}; - -interface EventMessageHandler { - handleRawEventRequest(eventPath: string, payload: any, db: PgWriteStore): Promise | void; - handleBlockMessage( - chainId: ChainID, - msg: CoreNodeBlockMessage, - db: PgWriteStore - ): Promise | void; - handleMicroblockMessage( - chainId: ChainID, - msg: CoreNodeMicroblockMessage, - db: PgWriteStore - ): Promise | void; - handleMempoolTxs(rawTxs: string[], db: PgWriteStore): Promise | void; - handleBurnBlock(msg: CoreNodeBurnBlockMessage, db: PgWriteStore): Promise | void; - handleDroppedMempoolTxs( - msg: CoreNodeDropMempoolTxMessage, - db: PgWriteStore - ): Promise | void; - handleNewAttachment(msg: CoreNodeAttachmentMessage[], db: PgWriteStore): Promise | void; -} - -function createMessageProcessorQueue(): EventMessageHandler { - // Create a promise queue so that only one message is handled at a time. - const processorQueue = new PQueue({ concurrency: 1 }); - - let eventTimer: prom.Histogram<'event'> | undefined; - if (isProdEnv) { - eventTimer = new prom.Histogram({ - name: 'stacks_event_ingestion_timers', - help: 'Event ingestion timers', - labelNames: ['event'], - buckets: prom.exponentialBuckets(50, 3, 10), // 10 buckets, from 50 ms to 15 minutes - }); - } - - const observeEvent = async (event: string, fn: () => Promise) => { - const timer = stopwatch(); - try { - await fn(); - } finally { - const elapsedMs = timer.getElapsed(); - eventTimer?.observe({ event }, elapsedMs); - } - }; - - const handler: EventMessageHandler = { - handleRawEventRequest: (eventPath: string, payload: any, db: PgWriteStore) => { - return processorQueue - .add(() => observeEvent('raw_event', () => handleRawEventRequest(eventPath, payload, db))) - .catch(e => { - logger.error(e, 'Error storing raw core node request data'); - throw e; - }); - }, - handleBlockMessage: (chainId: ChainID, msg: CoreNodeBlockMessage, db: PgWriteStore) => { - return processorQueue - .add(() => observeEvent('block', () => handleBlockMessage(chainId, msg, db))) - .catch(e => { - logger.error(e, 'Error processing core node block message'); - throw e; - }); - }, - handleMicroblockMessage: ( - chainId: ChainID, - msg: CoreNodeMicroblockMessage, - db: PgWriteStore - ) => { - return processorQueue - .add(() => observeEvent('microblock', () => handleMicroblockMessage(chainId, msg, db))) - .catch(e => { - logger.error(e, 'Error processing core node microblock message'); - throw e; - }); - }, - handleBurnBlock: (msg: CoreNodeBurnBlockMessage, db: PgWriteStore) => { - return processorQueue - .add(() => observeEvent('burn_block', () => handleBurnBlockMessage(msg, db))) - .catch(e => { - logger.error(e, 'Error processing core node burn block message'); - throw e; - }); - }, - handleMempoolTxs: (rawTxs: string[], db: PgWriteStore) => { - return processorQueue - .add(() => observeEvent('mempool_txs', () => handleMempoolTxsMessage(rawTxs, db))) - .catch(e => { - logger.error(e, 'Error processing core node mempool message'); - throw e; - }); - }, - handleDroppedMempoolTxs: (msg: CoreNodeDropMempoolTxMessage, db: PgWriteStore) => { - return processorQueue - .add(() => - observeEvent('dropped_mempool_txs', () => handleDroppedMempoolTxsMessage(msg, db)) - ) - .catch(e => { - logger.error(e, 'Error processing core node dropped mempool txs message'); - throw e; - }); - }, - handleNewAttachment: (msg: CoreNodeAttachmentMessage[], db: PgWriteStore) => { - return processorQueue - .add(() => observeEvent('new_attachment', () => handleNewAttachmentMessage(msg, db))) - .catch(e => { - logger.error(e, 'Error processing new attachment message'); - throw e; - }); - }, - }; - - return handler; -} - export type EventStreamServer = net.Server & { serverAddress: net.AddressInfo; closeAsync: () => Promise; @@ -738,7 +31,7 @@ export async function startEventServer(opts: { serverPort?: number; }): Promise { const db = opts.datastore; - const messageHandler = opts.messageHandler ?? createMessageProcessorQueue(); + const messageHandler = opts.messageHandler ?? newEventMessageHandler(); let eventHost = opts.serverHost ?? process.env['STACKS_CORE_EVENT_HOST']; const eventPort = opts.serverPort ?? parseInt(process.env['STACKS_CORE_EVENT_PORT'] ?? '', 10); @@ -963,277 +256,3 @@ export async function startEventServer(opts: { }); return eventStreamServer; } - -export function parseNewBlockMessage( - chainId: ChainID, - msg: CoreNodeBlockMessage, - isEventReplay: boolean -) { - const counts = newCoreNoreBlockEventCounts(); - - const parsedTxs: CoreNodeParsedTxMessage[] = []; - const blockData: CoreNodeMsgBlockData = { - ...msg, - }; - - if (!blockData.block_time) { - // If running in IBD mode, we use the parent burn block timestamp as the receipt date, - // otherwise, use the current timestamp. - const stacksBlockReceiptDate = isEventReplay - ? msg.burn_block_time - : Math.round(Date.now() / 1000); - blockData.block_time = stacksBlockReceiptDate; - } - - msg.transactions.forEach(item => { - const parsedTx = parseMessageTransaction(chainId, item, blockData, msg.events); - if (parsedTx) { - parsedTxs.push(parsedTx); - counts.tx_total += 1; - switch (parsedTx.parsed_tx.payload.type_id) { - case TxPayloadTypeID.Coinbase: - counts.txs.coinbase += 1; - break; - case TxPayloadTypeID.CoinbaseToAltRecipient: - counts.txs.coinbase_to_alt_recipient += 1; - break; - case TxPayloadTypeID.ContractCall: - counts.txs.contract_call += 1; - break; - case TxPayloadTypeID.NakamotoCoinbase: - counts.txs.nakamoto_coinbase += 1; - break; - case TxPayloadTypeID.PoisonMicroblock: - counts.txs.poison_microblock += 1; - break; - case TxPayloadTypeID.SmartContract: - counts.txs.smart_contract += 1; - break; - case TxPayloadTypeID.TenureChange: - counts.txs.tenure_change += 1; - break; - case TxPayloadTypeID.TokenTransfer: - counts.txs.token_transfer += 1; - break; - case TxPayloadTypeID.VersionedSmartContract: - counts.txs.versioned_smart_contract += 1; - break; - } - } - }); - for (const event of msg.events) { - counts.event_total += 1; - counts.events[event.type] += 1; - } - - const signerBitvec = msg.signer_bitvec - ? BitVec.consensusDeserializeToString(msg.signer_bitvec) - : null; - - // Stacks-core does not include the '0x' prefix in the signer signature hex strings - const signerSignatures = - msg.signer_signature?.map(s => (s.startsWith('0x') ? s : '0x' + s)) ?? null; - - // `anchored_cost` is not available in very old versions of stacks-core - const execCost = - msg.anchored_cost ?? - parsedTxs.reduce( - (acc, { core_tx: { execution_cost } }) => ({ - read_count: acc.read_count + execution_cost.read_count, - read_length: acc.read_length + execution_cost.read_length, - runtime: acc.runtime + execution_cost.runtime, - write_count: acc.write_count + execution_cost.write_count, - write_length: acc.write_length + execution_cost.write_length, - }), - { - read_count: 0, - read_length: 0, - runtime: 0, - write_count: 0, - write_length: 0, - } - ); - - if (typeof msg.tenure_height !== 'number' && msg.signer_bitvec) { - logger.warn( - `Nakamoto block ${msg.block_height} event payload has no tenure_height. Use stacks-core version 3.0.0.0.0-rc6 or newer!` - ); - } - - const dbBlock: DbBlock = { - canonical: true, - block_hash: msg.block_hash, - index_block_hash: msg.index_block_hash, - parent_index_block_hash: msg.parent_index_block_hash, - parent_block_hash: msg.parent_block_hash, - parent_microblock_hash: msg.parent_microblock, - parent_microblock_sequence: msg.parent_microblock_sequence, - block_height: msg.block_height, - burn_block_time: msg.burn_block_time, - burn_block_hash: msg.burn_block_hash, - burn_block_height: msg.burn_block_height, - miner_txid: msg.miner_txid, - execution_cost_read_count: execCost.read_count, - execution_cost_read_length: execCost.read_length, - execution_cost_runtime: execCost.runtime, - execution_cost_write_count: execCost.write_count, - execution_cost_write_length: execCost.write_length, - tx_count: msg.transactions.length, - block_time: blockData.block_time, - signer_bitvec: signerBitvec, - signer_signatures: signerSignatures, - tenure_height: msg.tenure_height ?? null, - }; - - logger.debug(`Received block ${msg.block_hash} (${msg.block_height}) from node`, dbBlock); - - const dbMinerRewards: DbMinerReward[] = []; - for (const minerReward of msg.matured_miner_rewards) { - const dbMinerReward: DbMinerReward = { - canonical: true, - block_hash: minerReward.from_stacks_block_hash, - index_block_hash: msg.index_block_hash, - from_index_block_hash: minerReward.from_index_consensus_hash, - mature_block_height: msg.block_height, - recipient: minerReward.recipient, - // If `miner_address` is null then it means pre-Stacks2.1 data, and the `recipient` can be accurately used - miner_address: minerReward.miner_address ?? minerReward.recipient, - coinbase_amount: BigInt(minerReward.coinbase_amount), - tx_fees_anchored: BigInt(minerReward.tx_fees_anchored), - tx_fees_streamed_confirmed: BigInt(minerReward.tx_fees_streamed_confirmed), - tx_fees_streamed_produced: BigInt(minerReward.tx_fees_streamed_produced), - }; - dbMinerRewards.push(dbMinerReward); - counts.miner_rewards += 1; - } - - logger.debug(`Received ${dbMinerRewards.length} matured miner rewards`); - - const dbMicroblocks = parseMicroblocksFromTxs({ - parentIndexBlockHash: msg.parent_index_block_hash, - txs: msg.transactions, - parentBurnBlock: { - height: msg.parent_burn_block_height, - hash: msg.parent_burn_block_hash, - time: msg.parent_burn_block_timestamp, - }, - }).map(mb => { - const microblock: DbMicroblock = { - ...mb, - canonical: true, - microblock_canonical: true, - block_height: msg.block_height, - parent_block_height: msg.block_height - 1, - parent_block_hash: msg.parent_block_hash, - index_block_hash: msg.index_block_hash, - block_hash: msg.block_hash, - }; - counts.microblocks += 1; - return microblock; - }); - - let poxSetSigners: DbPoxSetSigners | undefined; - if (msg.reward_set) { - assertNotNullish( - msg.cycle_number, - () => 'Cycle number must be present if reward set is present' - ); - let signers: DbPoxSetSigners['signers'] = []; - if (msg.reward_set.signers) { - signers = msg.reward_set.signers.map(signer => ({ - signing_key: '0x' + signer.signing_key, - weight: signer.weight, - stacked_amount: BigInt(signer.stacked_amt), - })); - logger.info( - `Received new pox set message, block=${msg.block_height}, cycle=${msg.cycle_number}, signers=${msg.reward_set.signers.length}` - ); - } - let rewardedAddresses: string[] = []; - if (msg.reward_set.rewarded_addresses) { - rewardedAddresses = msg.reward_set.rewarded_addresses; - logger.info( - `Received new pox set message, ${rewardedAddresses.length} rewarded BTC addresses` - ); - } - poxSetSigners = { - cycle_number: msg.cycle_number, - pox_ustx_threshold: BigInt(msg.reward_set.pox_ustx_threshold), - signers, - rewarded_addresses: rewardedAddresses, - }; - } - - const dbData: DataStoreBlockUpdateData = { - block: dbBlock, - microblocks: dbMicroblocks, - minerRewards: dbMinerRewards, - txs: parseDataStoreTxEventData(parsedTxs, msg.events, dbBlock, chainId), - pox_v1_unlock_height: msg.pox_v1_unlock_height, - pox_v2_unlock_height: msg.pox_v2_unlock_height, - pox_v3_unlock_height: msg.pox_v3_unlock_height, - poxSetSigners: poxSetSigners, - }; - - return { dbData, counts }; -} - -export function parseAttachment(msg: CoreNodeAttachmentMessage[]) { - const zoneFiles: { zonefile: string; zonefileHash: string; txId: string }[] = []; - const subdomains: DbBnsSubdomain[] = []; - for (const attachment of msg) { - if ( - attachment.contract_id === BnsContractIdentifier.mainnet || - attachment.contract_id === BnsContractIdentifier.testnet - ) { - const metadataCV = decodeClarityValue< - ClarityValueTuple<{ - op: ClarityValueStringAscii; - name: ClarityValueBuffer; - namespace: ClarityValueBuffer; - }> - >(attachment.metadata); - const op = metadataCV.data['op'].data; - const zonefile = Buffer.from(attachment.content.slice(2), 'hex').toString(); - const zonefileHash = attachment.content_hash; - zoneFiles.push({ - zonefile, - zonefileHash, - txId: attachment.tx_id, - }); - if (op === 'name-update') { - const name = hexToBuffer(metadataCV.data['name'].buffer).toString('utf8'); - const namespace = hexToBuffer(metadataCV.data['namespace'].buffer).toString('utf8'); - const zoneFileContents = zoneFileParser.parseZoneFile(zonefile); - const zoneFileTxt = zoneFileContents.txt; - // Case for subdomain - if (zoneFileTxt) { - for (let i = 0; i < zoneFileTxt.length; i++) { - const zoneFile = zoneFileTxt[i]; - const parsedTxt = parseZoneFileTxt(zoneFile.txt); - if (parsedTxt.owner === '') continue; //if txt has no owner , skip it - const subdomain: DbBnsSubdomain = { - name: name.concat('.', namespace), - namespace_id: namespace, - fully_qualified_subdomain: zoneFile.name.concat('.', name, '.', namespace), - owner: parsedTxt.owner, - zonefile_hash: parsedTxt.zoneFileHash, - zonefile: parsedTxt.zoneFile, - tx_id: attachment.tx_id, - tx_index: -1, - canonical: true, - parent_zonefile_hash: attachment.content_hash.slice(2), - parent_zonefile_index: 0, // TODO need to figure out this field - block_height: Number.parseInt(attachment.block_height, 10), - zonefile_offset: 1, - resolver: zoneFileContents.uri ? parseResolver(zoneFileContents.uri) : '', - index_block_hash: attachment.index_block_hash, - }; - subdomains.push(subdomain); - } - } - } - } - } - return { zoneFiles, subdomains }; -} diff --git a/src/event-stream/event-stream.ts b/src/event-stream/event-stream.ts new file mode 100644 index 000000000..a60cf6d27 --- /dev/null +++ b/src/event-stream/event-stream.ts @@ -0,0 +1,85 @@ +import { StacksEventStream, StacksEventStreamType } from '@hirosystems/salt-n-pepper-client'; +import { EventMessageHandler, newEventMessageHandler } from './event-message-handler'; +import { PgWriteStore } from '../datastore/pg-write-store'; +import { ChainID } from '@stacks/common'; +import { + CoreNodeAttachmentMessage, + CoreNodeBlockMessage, + CoreNodeBurnBlockMessage, + CoreNodeDropMempoolTxMessage, + CoreNodeMicroblockMessage, +} from './core-node-message'; +import { handleBnsImport } from '../import-v1'; +import { logger } from 'src/logger'; + +export async function startEventStream(opts: { + datastore: PgWriteStore; + chainId: ChainID; + messageHandler?: EventMessageHandler; +}) { + const db = opts.datastore; + const messageHandler = opts.messageHandler ?? newEventMessageHandler(); + + const lastSequenceId = (await db.getLastEventObserverRequestSequenceId()) ?? '0'; + const eventStream = new StacksEventStream({ + redisUrl: process.env['REDIS_URL'], + eventStreamType: StacksEventStreamType.all, + lastMessageId: lastSequenceId, + }); + await eventStream.connect({ waitForReady: true }); + eventStream.start(async (messageId, timestamp, path, body) => { + logger.info(`${path}: received Stacks stream event`); + switch (path) { + case '/new_block': { + const blockMessage = body as CoreNodeBlockMessage; + await messageHandler.handleBlockMessage(opts.chainId, blockMessage, db); + if (blockMessage.block_height === 1) { + await handleBnsImport(db); + } + await messageHandler.handleRawEventRequest(path, body, db, messageId, timestamp); + break; + } + + case '/new_burn_block': { + const msg = body as CoreNodeBurnBlockMessage; + await messageHandler.handleBurnBlock(msg, db); + await messageHandler.handleRawEventRequest(path, body, db, messageId, timestamp); + break; + } + + case '/new_mempool_tx': { + const rawTxs = body as string[]; + await messageHandler.handleMempoolTxs(rawTxs, db); + await messageHandler.handleRawEventRequest(path, body, db, messageId, timestamp); + break; + } + + case '/drop_mempool_tx': { + const msg = body as CoreNodeDropMempoolTxMessage; + await messageHandler.handleDroppedMempoolTxs(msg, db); + await messageHandler.handleRawEventRequest(path, body, db, messageId, timestamp); + break; + } + + case '/attachments/new': { + const msg = body as CoreNodeAttachmentMessage[]; + await messageHandler.handleNewAttachment(msg, db); + await messageHandler.handleRawEventRequest(path, body, db, messageId, timestamp); + break; + } + + case '/new_microblocks': { + const msg = body as CoreNodeMicroblockMessage; + await messageHandler.handleMicroblockMessage(opts.chainId, msg, db); + await messageHandler.handleRawEventRequest(path, body, db, messageId, timestamp); + break; + } + + default: + logger.warn(`Unhandled stacks stream event: ${path}`); + break; + } + }); + + return eventStream; +} diff --git a/src/index.ts b/src/index.ts index a1c0b1ee0..5121a7ee8 100644 --- a/src/index.ts +++ b/src/index.ts @@ -28,6 +28,7 @@ import { timeout, } from '@hirosystems/api-toolkit'; import Fastify from 'fastify'; +import { startEventStream } from './event-stream/event-stream'; enum StacksApiMode { /** @@ -135,15 +136,27 @@ async function init(): Promise { if (apiMode === StacksApiMode.default || apiMode === StacksApiMode.writeOnly) { const configuredChainID = getApiConfiguredChainID(); - const eventServer = await startEventServer({ - datastore: dbWriteStore, - chainId: configuredChainID, - }); - registerShutdownConfig({ - name: 'Event Server', - handler: () => eventServer.closeAsync(), - forceKillable: false, - }); + if (process.env['STACKS_API_EVENT_SOURCE'] == 'redis') { + const eventStream = await startEventStream({ + datastore: dbWriteStore, + chainId: configuredChainID, + }); + registerShutdownConfig({ + name: 'Event Stream', + handler: () => eventStream.stop(), + forceKillable: false, + }); + } else { + const eventServer = await startEventServer({ + datastore: dbWriteStore, + chainId: configuredChainID, + }); + registerShutdownConfig({ + name: 'Event Server', + handler: () => eventServer.closeAsync(), + forceKillable: false, + }); + } const skipChainIdCheck = parseBoolean(process.env['SKIP_STACKS_CHAIN_ID_CHECK']); if (!skipChainIdCheck) { diff --git a/tests/api/synthetic-stx-txs.test.ts b/tests/api/synthetic-stx-txs.test.ts index 6d961f1da..6dc919232 100644 --- a/tests/api/synthetic-stx-txs.test.ts +++ b/tests/api/synthetic-stx-txs.test.ts @@ -4,7 +4,7 @@ import * as path from 'path'; import { DecodedTxResult, TxPayloadTypeID } from 'stacks-encoding-native-js'; import { CoreNodeBlockMessage } from '../../src/event-stream/core-node-message'; import { parseMessageTransaction } from '../../src/event-stream/reader'; -import { parseNewBlockMessage } from '../../src/event-stream/event-server'; +import { parseNewBlockMessage } from '../../src/event-stream/event-message-handler'; // Test processing of the psuedo-Stacks transactions, i.e. the ones that // originate on the Bitcoin chain, and have a `raw_tx == '0x00'. diff --git a/tests/utils/shared-setup.ts b/tests/utils/shared-setup.ts index 098b23629..2cda5a023 100644 --- a/tests/utils/shared-setup.ts +++ b/tests/utils/shared-setup.ts @@ -1,15 +1,12 @@ import { StacksCoreRpcClient } from '../../src/core-rpc/client'; import { loadDotEnv } from '../../src/helpers'; import { PgWriteStore } from '../../src/datastore/pg-write-store'; -import { - DummyEventMessageHandler, - EventStreamServer, - startEventServer, -} from '../../src/event-stream/event-server'; +import { EventStreamServer, startEventServer } from '../../src/event-stream/event-server'; import { ChainID } from '@stacks/common'; import * as isCI from 'is-ci'; import { migrate } from './test-helpers'; import { timeout } from '@hirosystems/api-toolkit'; +import { DummyEventMessageHandler } from '../../src/event-stream/event-message-handler'; interface GlobalTestEnv { db: PgWriteStore;