From a1f5b12675118f6d7742c54e3420c38151aef4a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Tue, 20 Aug 2024 10:47:07 -0600 Subject: [PATCH] feat: export events tsv directly to postgres instance (#2048) (#2058) * fix: export events tsv directly to postgres instance * fix: remove unused function * chore: option to export events to either local file or client * chore: try new docker path * chore: divide remote and local paths * fix: try relative path for mkdir * ci: try chmod * ci: run mkdir first * ci: try with sudo * fix: file paths --------- Co-authored-by: Matthew Little --- .github/workflows/ci.yml | 3 ++ docker/docker-compose.dev.bitcoind.yml | 1 - docker/docker-compose.dev.postgres.yml | 5 +- .../docker-compose.dev.stacks-blockchain.yml | 1 - src/event-replay/connection-legacy.ts | 22 -------- src/event-replay/event-replay.ts | 24 ++++++--- src/event-replay/event-requests.ts | 52 +++++++++++-------- src/tests-event-replay/import-export-tests.ts | 45 +++++++++++++--- 8 files changed, 89 insertions(+), 64 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 39d80ecf6f..ee940ce5a5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -201,6 +201,9 @@ jobs: - name: Setup integration environment run: | sudo ufw disable + mkdir -p src/tests-event-replay/.tmp/local/ + sudo chown 999:999 src/tests-event-replay/.tmp/local/ + sudo chmod -R 777 src/tests-event-replay/.tmp/local/ docker compose -f docker/docker-compose.dev.postgres.yml up -d npm run devenv:logs -- --no-color &> docker-compose-logs.txt & diff --git a/docker/docker-compose.dev.bitcoind.yml b/docker/docker-compose.dev.bitcoind.yml index 3ed5851cb7..ace8c81aa5 100644 --- a/docker/docker-compose.dev.bitcoind.yml +++ b/docker/docker-compose.dev.bitcoind.yml @@ -1,4 +1,3 @@ -version: "3" services: bitcoind: image: "blockstack/bitcoind:v0.20.99.0" diff --git a/docker/docker-compose.dev.postgres.yml b/docker/docker-compose.dev.postgres.yml index 188c6075e7..316fbab99e 100644 --- a/docker/docker-compose.dev.postgres.yml +++ b/docker/docker-compose.dev.postgres.yml @@ -1,7 +1,6 @@ -version: '3.7' services: postgres: - image: "postgres:14" + image: "postgres:15" ports: - "5490:5432" environment: @@ -9,3 +8,5 @@ services: POSTGRES_PASSWORD: postgres POSTGRES_DB: stacks_blockchain_api POSTGRES_PORT: 5432 + volumes: + - ../src/tests-event-replay/.tmp/local/:/root/ diff --git a/docker/docker-compose.dev.stacks-blockchain.yml b/docker/docker-compose.dev.stacks-blockchain.yml index 2a353119ea..0fecb0a1bb 100644 --- a/docker/docker-compose.dev.stacks-blockchain.yml +++ b/docker/docker-compose.dev.stacks-blockchain.yml @@ -1,4 +1,3 @@ -version: '3.7' services: stacks-blockchain: image: 'hirosystems/stacks-api-e2e:stacks3.0-0a2c0e2' diff --git a/src/event-replay/connection-legacy.ts b/src/event-replay/connection-legacy.ts index 19564f9e5d..a26c285224 100644 --- a/src/event-replay/connection-legacy.ts +++ b/src/event-replay/connection-legacy.ts @@ -150,25 +150,3 @@ function getPgClientConfig({ return clientConfig; } } - -/** - * Creates a postgres pool client connection. If the connection fails due to a transient error, it is retried until successful. - * You'd expect that the pg lib to handle this, but it doesn't, see https://github.com/brianc/node-postgres/issues/1789 - */ -export async function connectWithRetry(pool: Pool): Promise { - for (let retryAttempts = 1; ; retryAttempts++) { - try { - const client = await pool.connect(); - return client; - } catch (error: any) { - // Check for transient errors, and retry after 1 second - const pgConnectionError = isPgConnectionError(error); - if (pgConnectionError) { - logger.warn(`${pgConnectionError}, will retry, attempt #${retryAttempts}`); - await timeout(1000); - } else { - throw error; - } - } - } -} diff --git a/src/event-replay/event-replay.ts b/src/event-replay/event-replay.ts index e3f7b93bc6..c5faae65b5 100644 --- a/src/event-replay/event-replay.ts +++ b/src/event-replay/event-replay.ts @@ -38,16 +38,24 @@ export async function exportEventsAsTsv( if (!filePath) { throw new Error(`A file path should be specified with the --file option`); } - const resolvedFilePath = path.resolve(filePath); - if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) { - throw new Error( - `A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file` - ); + const isLocal = filePath.startsWith('local:'); + if (isLocal) { + filePath = filePath.replace(/^local:/, ''); + if (!path.isAbsolute(filePath)) { + throw new Error(`The file path must be absolute`); + } + } else { + const resolvedFilePath = path.resolve(filePath); + if (fs.existsSync(resolvedFilePath) && overwriteFile !== true) { + throw new Error( + `A file already exists at ${resolvedFilePath}. Add --overwrite-file to truncate an existing file` + ); + } } - console.log(`Export event data to file: ${resolvedFilePath}`); - const writeStream = fs.createWriteStream(resolvedFilePath); + + console.log(`Exporting event data to ${filePath}`); console.log(`Export started...`); - await exportRawEventRequests(writeStream); + await exportRawEventRequests(filePath, isLocal); console.log('Export successful.'); } diff --git a/src/event-replay/event-requests.ts b/src/event-replay/event-requests.ts index eb789f523e..ad9d9e2dd9 100644 --- a/src/event-replay/event-requests.ts +++ b/src/event-replay/event-requests.ts @@ -1,30 +1,38 @@ -import { pipelineAsync } from '../helpers'; -import { Readable, Writable } from 'stream'; +import { pipeline } from 'node:stream/promises'; +import { Readable } from 'stream'; import { DbRawEventRequest } from '../datastore/common'; -import { PgServer } from '../datastore/connection'; -import { connectPgPool, connectWithRetry } from './connection-legacy'; +import { getConnectionArgs, getConnectionConfig, PgServer } from '../datastore/connection'; +import { connectPgPool } from './connection-legacy'; import * as pgCopyStreams from 'pg-copy-streams'; import * as PgCursor from 'pg-cursor'; +import { connectPostgres } from '@hirosystems/api-toolkit'; +import { createWriteStream } from 'node:fs'; -export async function exportRawEventRequests(targetStream: Writable): Promise { - const pool = await connectPgPool({ - usageName: 'export-raw-events', - pgServer: PgServer.primary, +export async function exportRawEventRequests(filePath: string, local: boolean): Promise { + const sql = await connectPostgres({ + usageName: `export-events`, + connectionArgs: getConnectionArgs(PgServer.primary), + connectionConfig: getConnectionConfig(PgServer.primary), }); - const client = await connectWithRetry(pool); - try { - const copyQuery = pgCopyStreams.to( - ` - COPY (SELECT id, receive_timestamp, event_path, payload FROM event_observer_requests ORDER BY id ASC) - TO STDOUT ENCODING 'UTF8' - ` - ); - const queryStream = client.query(copyQuery); - await pipelineAsync(queryStream, targetStream); - } finally { - client.release(); - await pool.end(); + const copyQuery = sql` + COPY ( + SELECT id, receive_timestamp, event_path, payload + FROM event_observer_requests + ORDER BY id ASC + )`; + if (local) { + await sql`${copyQuery} + TO '${sql.unsafe(filePath)}' + WITH (FORMAT TEXT, DELIMITER E'\t', ENCODING 'UTF8') + `; + } else { + const readableStream = await sql`${copyQuery} + TO STDOUT + WITH (FORMAT TEXT, DELIMITER E'\t', ENCODING 'UTF8') + `.readable(); + await pipeline(readableStream, createWriteStream(filePath)); } + await sql.end(); } export async function* getRawEventRequests( @@ -61,7 +69,7 @@ export async function* getRawEventRequests( `); onStatusUpdate?.('Importing raw event requests into temporary table...'); const importStream = client.query(pgCopyStreams.from(`COPY temp_raw_tsv FROM STDIN`)); - await pipelineAsync(readStream, importStream); + await pipeline(readStream, importStream); onStatusUpdate?.('Removing any duplicate raw event requests...'); await client.query(` INSERT INTO temp_event_observer_requests diff --git a/src/tests-event-replay/import-export-tests.ts b/src/tests-event-replay/import-export-tests.ts index cdb8f29a8f..c5d71d14d4 100644 --- a/src/tests-event-replay/import-export-tests.ts +++ b/src/tests-event-replay/import-export-tests.ts @@ -1,5 +1,6 @@ import { ChainID } from '@stacks/transactions'; import * as fs from 'fs'; +import * as path from 'path'; import { getRawEventRequests } from '../event-replay/event-requests'; import { PgWriteStore } from '../datastore/pg-write-store'; import { exportEventsAsTsv, importEventsFromTsv } from '../event-replay/event-replay'; @@ -25,7 +26,7 @@ describe('import/export tests', () => { await db?.close(); }); - test('event import and export cycle', async () => { + test('event import and export cycle - remote', async () => { // Import from mocknet TSV await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true); const chainTip = await db.getChainTip(db.sql); @@ -38,14 +39,42 @@ describe('import/export tests', () => { ); // Export into temp TSV - const tmpDir = 'src/tests-event-replay/.tmp'; + const tmpDir = 'src/tests-event-replay/.tmp/remote'; + fs.mkdirSync(tmpDir, { recursive: true }); + await exportEventsAsTsv(`${tmpDir}/export.tsv`); + + // Re-import with exported TSV and check that chain tip matches. try { - fs.mkdirSync(tmpDir); - } catch (error: any) { - if (error.code != 'EEXIST') throw error; + await importEventsFromTsv(`${tmpDir}/export.tsv`, 'archival', true, true); + const newChainTip = await db.getChainTip(db.sql); + expect(newChainTip.block_height).toBe(28); + expect(newChainTip.index_block_hash).toBe( + '0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533' + ); + expect(newChainTip.block_hash).toBe( + '0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2' + ); + } finally { + fs.rmSync(`${tmpDir}/export.tsv`); } - const tmpTsvPath = `${tmpDir}/export.tsv`; - await exportEventsAsTsv(tmpTsvPath, true); + }); + + test('event import and export cycle - local', async () => { + // Import from mocknet TSV + await importEventsFromTsv('src/tests-event-replay/tsv/mocknet.tsv', 'archival', true, true); + const chainTip = await db.getChainTip(db.sql); + expect(chainTip.block_height).toBe(28); + expect(chainTip.index_block_hash).toBe( + '0x76cd67a65c0dfd5ea450bb9efe30da89fa125bfc077c953802f718353283a533' + ); + expect(chainTip.block_hash).toBe( + '0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2' + ); + + // Export into temp TSV + const tmpDir = 'src/tests-event-replay/.tmp/local'; + fs.mkdirSync(tmpDir, { recursive: true }); + await exportEventsAsTsv('local:/root/export.tsv'); // Re-import with exported TSV and check that chain tip matches. try { @@ -59,7 +88,7 @@ describe('import/export tests', () => { '0x7682af212d3c1ef62613412f9b5a727269b4548f14eca2e3f941f7ad8b3c11b2' ); } finally { - fs.rmSync(tmpDir, { force: true, recursive: true }); + fs.rmSync(`${tmpDir}/export.tsv`); } });