diff --git a/src/dbos-executor.ts b/src/dbos-executor.ts index 5a766b9e1..f7432d626 100644 --- a/src/dbos-executor.ts +++ b/src/dbos-executor.ts @@ -17,9 +17,10 @@ import { WorkflowContextImpl, WorkflowStatus, StatusString, + BufferedResult, } from './workflow'; -import { IsolationLevel, Transaction, TransactionConfig } from './transaction'; +import { Transaction, TransactionConfig } from './transaction'; import { CommunicatorConfig, Communicator } from './communicator'; import { TelemetryCollector } from './telemetry/collector'; import { Tracer } from './telemetry/traces'; @@ -36,7 +37,6 @@ import { TypeORMDatabase, UserDatabaseName, KnexUserDatabase, - UserDatabaseClient, } from './user_database'; import { MethodRegistrationBase, getRegisteredOperations, getOrCreateClassRegistration, MethodRegistration } from './decorators'; import { SpanStatusCode } from '@opentelemetry/api'; @@ -115,7 +115,7 @@ export class DBOSExecutor { readonly communicatorInfoMap: Map = new Map(); readonly registeredOperations: Array = []; readonly pendingWorkflowMap: Map> = new Map(); // Map from workflowUUID to workflow promise - readonly pendingAsyncWrites: Map> = new Map(); // Map from workflowUUID to asynchronous write promise + readonly workflowResultBuffer: Map> = new Map(); // Map from workflowUUID to its remaining result buffer. readonly telemetryCollector: TelemetryCollector; readonly flushBufferIntervalMs: number = 1000; @@ -167,7 +167,7 @@ export class DBOSExecutor { this.flushBufferID = setInterval(() => { if (!this.debugMode) { - void this.flushWorkflowStatusBuffer(); + void this.flushWorkflowBuffers(); } }, this.flushBufferIntervalMs); this.logger.debug("Started workflow status buffer worker"); @@ -306,12 +306,8 @@ export class DBOSExecutor { this.logger.info("Waiting for pending workflows to finish."); await Promise.allSettled(this.pendingWorkflowMap.values()); } - if (this.pendingAsyncWrites.size > 0) { - this.logger.info("Waiting for pending buffer flushes to finish"); - await Promise.allSettled(this.pendingAsyncWrites.values()); - } clearInterval(this.flushBufferID); - await this.flushWorkflowStatusBuffer(); + await this.flushWorkflowBuffers(); await this.systemDatabase.destroy(); await this.userDatabase.destroy(); await this.telemetryCollector.destroy(); @@ -395,16 +391,7 @@ export class DBOSExecutor { args = await this.systemDatabase.initWorkflowStatus(internalStatus, args); } else { // For temporary workflows, instead asynchronously record inputs. - const setWorkflowInputs: Promise = this.systemDatabase - .setWorkflowInputs(workflowUUID, args) - .catch((error) => { - (error as Error).message = `Error asynchronously setting workflow inputs: ${(error as Error).message}`; - this.logger.error(error); - }) - .finally(() => { - this.pendingAsyncWrites.delete(workflowUUID); - }); - this.pendingAsyncWrites.set(workflowUUID, setWorkflowInputs); + this.systemDatabase.bufferWorkflowInputs(workflowUUID, args); } const runWorkflow = async () => { @@ -444,23 +431,9 @@ export class DBOSExecutor { this.tracer.endSpan(wCtxt.span); } // Asynchronously flush the result buffer. - const resultBufferFlush: Promise = this.userDatabase - .transaction( - async (client: UserDatabaseClient) => { - if (wCtxt.resultBuffer.size > 0) { - await wCtxt.flushResultBuffer(client); - } - }, - { isolationLevel: IsolationLevel.ReadCommitted } - ) - .catch((error) => { - (error as Error).message = `Error asynchronously flushing result buffer: ${(error as Error).message}`; - this.logger.error(error); - }) - .finally(() => { - this.pendingAsyncWrites.delete(workflowUUID); - }); - this.pendingAsyncWrites.set(workflowUUID, resultBufferFlush); + if (wCtxt.resultBuffer.size > 0) { + this.workflowResultBuffer.set(wCtxt.workflowUUID, wCtxt.resultBuffer); + } return result; }; const workflowPromise: Promise = runWorkflow(); @@ -683,12 +656,62 @@ export class DBOSExecutor { /** * Periodically flush the workflow output buffer to the system database. */ - async flushWorkflowStatusBuffer() { + async flushWorkflowBuffers() { if (this.initialized) { + await this.systemDatabase.flushWorkflowInputsBuffer(); + await this.flushWorkflowResultBuffer(); await this.systemDatabase.flushWorkflowStatusBuffer(); } } + async flushWorkflowResultBuffer(): Promise { + const localBuffer = new Map(this.workflowResultBuffer); + this.workflowResultBuffer.clear(); + const totalSize = localBuffer.size; + const flushBatchSize = 50; + try { + let finishedCnt = 0; + while (finishedCnt < totalSize) { + let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot) VALUES "; + let paramCnt = 1; + const values: any[] = []; + const batchUUIDs: string[] = []; + for (const [workflowUUID, wfBuffer] of localBuffer) { + for (const [funcID, recorded] of wfBuffer) { + const output = recorded.output; + const txnSnapshot = recorded.txn_snapshot; + if (paramCnt > 1) { + sqlStmt += ", "; + } + sqlStmt += `($${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, null, $${paramCnt++})`; + values.push(workflowUUID, funcID, JSON.stringify(output), JSON.stringify(null), txnSnapshot); + } + batchUUIDs.push(workflowUUID); + finishedCnt++; + if (batchUUIDs.length >= flushBatchSize) { + // Cap at the batch size. + break; + } + } + this.logger.debug(sqlStmt); + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + await this.userDatabase.query(sqlStmt, ...values); + + // Clean up after each batch succeeds + batchUUIDs.forEach((value) => { localBuffer.delete(value); }); + } + } catch (error) { + (error as Error).message = `Error flushing workflow result buffer: ${(error as Error).message}`; + this.logger.error(error); + // If there is a failure in flushing the buffer, return items to the global buffer for retrying later. + for (const [workflowUUID, wfBuffer] of localBuffer) { + if (!this.workflowResultBuffer.has(workflowUUID)) { + this.workflowResultBuffer.set(workflowUUID, wfBuffer) + } + } + } + } + logRegisteredHTTPUrls() { this.logger.info("HTTP endpoints supported:"); this.registeredOperations.forEach((registeredOperation) => { diff --git a/src/foundationdb/fdb_system_database.ts b/src/foundationdb/fdb_system_database.ts index 9bdd81a46..2dbd7fa65 100644 --- a/src/foundationdb/fdb_system_database.ts +++ b/src/foundationdb/fdb_system_database.ts @@ -30,6 +30,7 @@ export class FoundationDBSystemDatabase implements SystemDatabase { workflowInputsDB: fdb.Database; readonly workflowStatusBuffer: Map = new Map(); + readonly workflowInputsBuffer: Map = new Map(); constructor() { fdb.setAPIVersion(710, 710); @@ -106,7 +107,8 @@ export class FoundationDBSystemDatabase implements SystemDatabase { this.workflowStatusBuffer.set(workflowUUID, status); } - async flushWorkflowStatusBuffer(): Promise { + // TODO: support batching + async flushWorkflowStatusBuffer(): Promise { const localBuffer = new Map(this.workflowStatusBuffer); this.workflowStatusBuffer.clear(); // eslint-disable-next-line @typescript-eslint/require-await @@ -124,7 +126,7 @@ export class FoundationDBSystemDatabase implements SystemDatabase { }); } }); - return Array.from(localBuffer.keys()); + return; } async recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise { @@ -145,14 +147,24 @@ export class FoundationDBSystemDatabase implements SystemDatabase { return workflows.filter((i) => i[1].status === StatusString.PENDING && i[1].executorID === executorID).map((i) => i[0]); } - async setWorkflowInputs(workflowUUID: string, args: T): Promise { - await this.dbRoot.doTransaction(async (txn) => { - const inputsDB = txn.at(this.workflowInputsDB); - const inputs = await inputsDB.get(workflowUUID); - if (inputs === undefined) { - inputsDB.set(workflowUUID, args); + bufferWorkflowInputs(workflowUUID: string, args: T): void { + this.workflowInputsBuffer.set(workflowUUID, args); + } + + // TODO: support batching + async flushWorkflowInputsBuffer(): Promise { + const localBuffer = new Map(this.workflowInputsBuffer); + this.workflowInputsBuffer.clear(); + // eslint-disable-next-line @typescript-eslint/require-await + await this.workflowInputsDB.doTransaction(async (txn) => { + for (const [workflowUUID, args] of localBuffer) { + const inputs = await txn.get(workflowUUID); + if (inputs === undefined) { + txn.set(workflowUUID, args); + } } }); + return; } async getWorkflowInputs(workflowUUID: string): Promise { diff --git a/src/system_database.ts b/src/system_database.ts index f03d11843..f4a4db13d 100644 --- a/src/system_database.ts +++ b/src/system_database.ts @@ -17,12 +17,13 @@ export interface SystemDatabase { checkWorkflowOutput(workflowUUID: string): Promise; initWorkflowStatus(bufferedStatus: WorkflowStatusInternal, args: T): Promise; bufferWorkflowOutput(workflowUUID: string, status: WorkflowStatusInternal): void; - flushWorkflowStatusBuffer(): Promise>; + flushWorkflowStatusBuffer(): Promise; recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise; getPendingWorkflows(executorID: string): Promise>; - setWorkflowInputs(workflowUUID: string, args: T) : Promise; + bufferWorkflowInputs(workflowUUID: string, args: T) : void; getWorkflowInputs(workflowUUID: string): Promise; + flushWorkflowInputsBuffer(): Promise; checkOperationOutput(workflowUUID: string, functionID: number): Promise; recordOperationOutput(workflowUUID: string, functionID: number, output: R): Promise; @@ -64,6 +65,8 @@ export class PostgresSystemDatabase implements SystemDatabase { readonly workflowEventsMap: Record void> = {}; readonly workflowStatusBuffer: Map = new Map(); + readonly workflowInputsBuffer: Map = new Map(); + readonly flushBatchSize = 100; constructor(readonly pgPoolConfig: PoolConfig, readonly systemDatabaseName: string, readonly logger: Logger) { const poolConfig = { ...pgPoolConfig }; @@ -129,21 +132,38 @@ export class PostgresSystemDatabase implements SystemDatabase { /** * Flush the workflow output buffer to the database. */ - async flushWorkflowStatusBuffer() { + async flushWorkflowStatusBuffer(): Promise { const localBuffer = new Map(this.workflowStatusBuffer); this.workflowStatusBuffer.clear(); + const totalSize = localBuffer.size; try { - const client: PoolClient = await this.pool.connect(); - await client.query("BEGIN"); - for (const [workflowUUID, status] of localBuffer) { - await client.query( - `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_status (workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, request, output, executor_id) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (workflow_uuid) - DO UPDATE SET status=EXCLUDED.status, output=EXCLUDED.output;`, - [workflowUUID, status.status, status.name, status.authenticatedUser, status.assumedRole, JSON.stringify(status.authenticatedRoles), JSON.stringify(status.request), JSON.stringify(status.output), status.executorID] - ); + let finishedCnt = 0; + while (finishedCnt < totalSize) { + let sqlStmt = `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_status (workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, request, output, executor_id) VALUES `; + let paramCnt = 1; + const values: any[] = []; + const batchUUIDs: string[] = []; + for (const [workflowUUID, status] of localBuffer) { + if (paramCnt > 1) { + sqlStmt += ", "; + } + sqlStmt += `($${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++})`; + values.push(workflowUUID, status.status, status.name, status.authenticatedUser, status.assumedRole, JSON.stringify(status.authenticatedRoles), JSON.stringify(status.request), JSON.stringify(status.output), status.executorID); + batchUUIDs.push(workflowUUID); + finishedCnt++; + + if (batchUUIDs.length >= this.flushBatchSize) { + // Cap at the batch size. + break; + } + } + sqlStmt += " ON CONFLICT (workflow_uuid) DO UPDATE SET status=EXCLUDED.status, output=EXCLUDED.output;"; + + await this.pool.query(sqlStmt, values); + + // Clean up after each batch succeeds + batchUUIDs.forEach((value) => { localBuffer.delete(value); }); } - await client.query("COMMIT"); - client.release(); } catch (error) { (error as Error).message = `Error flushing workflow buffer: ${(error as Error).message}`; this.logger.error(error); @@ -154,7 +174,7 @@ export class PostgresSystemDatabase implements SystemDatabase { } } } - return Array.from(localBuffer.keys()); + return; } async recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise { @@ -173,11 +193,53 @@ export class PostgresSystemDatabase implements SystemDatabase { return rows.map(i => i.workflow_uuid); } - async setWorkflowInputs(workflowUUID: string, args: T): Promise { - await this.pool.query( - `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_inputs (workflow_uuid, inputs) VALUES($1, $2) ON CONFLICT (workflow_uuid) DO NOTHING`, - [workflowUUID, JSON.stringify(args)] - ) + bufferWorkflowInputs(workflowUUID: string, args: T): void { + this.workflowInputsBuffer.set(workflowUUID, args); + } + + async flushWorkflowInputsBuffer(): Promise { + const localBuffer = new Map(this.workflowInputsBuffer); + this.workflowInputsBuffer.clear(); + const totalSize = localBuffer.size; + try { + let finishedCnt = 0; + while (finishedCnt < totalSize) { + let sqlStmt = `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_inputs (workflow_uuid, inputs) VALUES `; + let paramCnt = 1; + const values: any[] = []; + const batchUUIDs: string[] = []; + for (const [workflowUUID, args] of localBuffer) { + if (paramCnt > 1) { + sqlStmt += ", "; + } + sqlStmt += `($${paramCnt++}, $${paramCnt++})`; + values.push(workflowUUID, JSON.stringify(args)); + batchUUIDs.push(workflowUUID); + finishedCnt++; + + if (batchUUIDs.length >= this.flushBatchSize) { + // Cap at the batch size. + break; + } + } + sqlStmt += " ON CONFLICT (workflow_uuid) DO NOTHING;"; + + await this.pool.query(sqlStmt, values); + + // Clean up after each batch succeeds + batchUUIDs.forEach((value) => { localBuffer.delete(value); }); + } + } catch (error) { + (error as Error).message = `Error flushing workflow inputs buffer: ${(error as Error).message}`; + this.logger.error(error); + // If there is a failure in flushing the buffer, return items to the global buffer for retrying later. + for (const [workflowUUID, args] of localBuffer) { + if (!this.workflowInputsBuffer.has(workflowUUID)) { + this.workflowInputsBuffer.set(workflowUUID, args) + } + } + } + return; } async getWorkflowInputs(workflowUUID: string): Promise { diff --git a/src/workflow.ts b/src/workflow.ts index 9a58c4524..567a63614 100644 --- a/src/workflow.ts +++ b/src/workflow.ts @@ -49,7 +49,7 @@ export interface PgTransactionId { txid: string; } -interface BufferedResult { +export interface BufferedResult { output: unknown; txn_snapshot: string; } diff --git a/tests/concurrency.test.ts b/tests/concurrency.test.ts index 487c15917..b485aca73 100644 --- a/tests/concurrency.test.ts +++ b/tests/concurrency.test.ts @@ -56,7 +56,7 @@ describe("concurrency-tests", () => { await ConcurrTestClass.promise2; const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec(); - await dbosExec.flushWorkflowStatusBuffer(); + await dbosExec.flushWorkflowBuffers(); ConcurrTestClass.resolve(); await handle.getResult(); diff --git a/tests/dbos.test.ts b/tests/dbos.test.ts index d65e1bd03..2199f118c 100644 --- a/tests/dbos.test.ts +++ b/tests/dbos.test.ts @@ -206,7 +206,7 @@ describe("dbos-tests", () => { // Flush workflow output buffer so the retrieved handle can proceed and the status would transition to SUCCESS. const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec(); - await dbosExec.flushWorkflowStatusBuffer(); + await dbosExec.flushWorkflowBuffers(); const retrievedHandle = testRuntime.retrieveWorkflow(workflowUUID); expect(retrievedHandle).not.toBeNull(); expect(retrievedHandle.getWorkflowUUID()).toBe(workflowUUID); diff --git a/tests/foundationdb/foundationdb.test.ts b/tests/foundationdb/foundationdb.test.ts index 109c93aab..660fe557b 100644 --- a/tests/foundationdb/foundationdb.test.ts +++ b/tests/foundationdb/foundationdb.test.ts @@ -73,7 +73,7 @@ describe("foundationdb-dbos", () => { await expect(invokedHandle.then((x) => x.getResult())).resolves.toBe(3); const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec(); - await dbosExec.flushWorkflowStatusBuffer(); + await dbosExec.flushWorkflowBuffers(); await expect(retrievedHandle.getResult()).resolves.toBe(3); await expect(retrievedHandle.getStatus()).resolves.toMatchObject({ status: StatusString.SUCCESS, diff --git a/tests/httpServer/server.test.ts b/tests/httpServer/server.test.ts index 74c30ea23..475c1e042 100644 --- a/tests/httpServer/server.test.ts +++ b/tests/httpServer/server.test.ts @@ -157,7 +157,7 @@ describe("httpserver-tests", () => { expect(response.text).toBe("hello 1"); const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec(); - await dbosExec.flushWorkflowStatusBuffer(); + await dbosExec.flushWorkflowBuffers(); // Retrieve the workflow with UUID. const retrievedHandle = testRuntime.retrieveWorkflow(workflowUUID); diff --git a/tests/oaoo.test.ts b/tests/oaoo.test.ts index 05fc47046..cbc9e90d7 100644 --- a/tests/oaoo.test.ts +++ b/tests/oaoo.test.ts @@ -157,7 +157,7 @@ describe("oaoo-tests", () => { ).resolves.toBe(1); // Retrieve output of the child workflow. - await dbosExec.flushWorkflowStatusBuffer(); + await dbosExec.flushWorkflowBuffers(); const retrievedHandle = testRuntime.retrieveWorkflow(workflowUUID + "-0"); await expect(retrievedHandle.getResult()).resolves.toBe(1); });