Skip to content

Commit

Permalink
batch insert for workflow remaining outputs
Browse files Browse the repository at this point in the history
  • Loading branch information
qianl15 committed Jan 4, 2024
1 parent e9e2a07 commit cd71b63
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 27 deletions.
79 changes: 55 additions & 24 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import {
WorkflowContextImpl,
WorkflowStatus,
StatusString,
BufferedResult,
} from './workflow';

import { IsolationLevel, Transaction, TransactionConfig } from './transaction';
import { Transaction, TransactionConfig } from './transaction';
import { CommunicatorConfig, Communicator } from './communicator';
import { TelemetryCollector } from './telemetry/collector';
import { Tracer } from './telemetry/traces';
Expand All @@ -36,7 +37,6 @@ import {
TypeORMDatabase,
UserDatabaseName,
KnexUserDatabase,
UserDatabaseClient,
} from './user_database';
import { MethodRegistrationBase, getRegisteredOperations, getOrCreateClassRegistration, MethodRegistration } from './decorators';
import { SpanStatusCode } from '@opentelemetry/api';
Expand Down Expand Up @@ -115,7 +115,7 @@ export class DBOSExecutor {
readonly communicatorInfoMap: Map<string, CommunicatorInfo> = new Map();
readonly registeredOperations: Array<MethodRegistrationBase> = [];
readonly pendingWorkflowMap: Map<string, Promise<unknown>> = new Map(); // Map from workflowUUID to workflow promise
readonly pendingAsyncWrites: Map<string, Promise<unknown>> = new Map(); // Map from workflowUUID to asynchronous write promise
readonly workflowResultBuffer: Map<string, Map<number, BufferedResult>> = new Map(); // Map from workflowUUID to its remaining result buffer.

readonly telemetryCollector: TelemetryCollector;
readonly flushBufferIntervalMs: number = 1000;
Expand Down Expand Up @@ -306,10 +306,6 @@ export class DBOSExecutor {
this.logger.info("Waiting for pending workflows to finish.");
await Promise.allSettled(this.pendingWorkflowMap.values());
}
if (this.pendingAsyncWrites.size > 0) {
this.logger.info("Waiting for pending buffer flushes to finish");
await Promise.allSettled(this.pendingAsyncWrites.values());
}
clearInterval(this.flushBufferID);
await this.flushWorkflowBuffers();
await this.systemDatabase.destroy();
Expand Down Expand Up @@ -435,23 +431,9 @@ export class DBOSExecutor {
this.tracer.endSpan(wCtxt.span);
}
// Asynchronously flush the result buffer.
const resultBufferFlush: Promise<void> = this.userDatabase
.transaction(
async (client: UserDatabaseClient) => {
if (wCtxt.resultBuffer.size > 0) {
await wCtxt.flushResultBuffer(client);
}
},
{ isolationLevel: IsolationLevel.ReadCommitted }
)
.catch((error) => {
(error as Error).message = `Error asynchronously flushing result buffer: ${(error as Error).message}`;
this.logger.error(error);
})
.finally(() => {
this.pendingAsyncWrites.delete(workflowUUID);
});
this.pendingAsyncWrites.set(workflowUUID, resultBufferFlush);
if (wCtxt.resultBuffer.size > 0) {
this.workflowResultBuffer.set(wCtxt.workflowUUID, wCtxt.resultBuffer);
}
return result;
};
const workflowPromise: Promise<R> = runWorkflow();
Expand Down Expand Up @@ -677,10 +659,59 @@ export class DBOSExecutor {
/**
 * Flush every in-memory workflow buffer, in order: workflow inputs, then the
 * buffered workflow results, then workflow statuses.
 * Does nothing until the executor has been initialized.
 */
async flushWorkflowBuffers() {
  if (!this.initialized) {
    return;
  }
  await this.systemDatabase.flushWorkflowInputsBuffer();
  await this.flushWorkflowResultBuffer();
  await this.systemDatabase.flushWorkflowStatusBuffer();
}

/**
 * Write buffered workflow outputs to `dbos.transaction_outputs` using batched,
 * multi-row INSERT statements.
 *
 * The shared buffer is copied and cleared up front, so results recorded while
 * this method is awaiting the database are left for a later flush. Each INSERT
 * covers at most `flushBatchSize` workflows (all buffered rows of a workflow
 * always travel in the same statement).
 *
 * Failures are logged, not thrown. Workflows whose batch was not written are
 * put back into the shared buffer so that a later flush can retry them.
 */
async flushWorkflowResultBuffer(): Promise<void> {
  const localBuffer = new Map(this.workflowResultBuffer);
  this.workflowResultBuffer.clear();
  const totalSize = localBuffer.size;
  const flushBatchSize = 50;
  try {
    let finishedCnt = 0;
    while (finishedCnt < totalSize) {
      let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot) VALUES ";
      let paramCnt = 1;
      const values: unknown[] = [];
      const batchUUIDs: string[] = [];
      for (const [workflowUUID, wfBuffer] of localBuffer) {
        for (const [funcID, recorded] of wfBuffer) {
          const output = recorded.output;
          const txnSnapshot = recorded.txn_snapshot;
          if (paramCnt > 1) {
            sqlStmt += ", ";
          }
          // Five placeholders per row; txn_id is always written as SQL null.
          sqlStmt += `($${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, null, $${paramCnt++})`;
          values.push(workflowUUID, funcID, JSON.stringify(output), JSON.stringify(null), txnSnapshot);
        }
        batchUUIDs.push(workflowUUID);
        finishedCnt++;
        if (batchUUIDs.length >= flushBatchSize) {
          // Cap at the batch size.
          break;
        }
      }

      // A batch made only of empty per-workflow buffers has no rows; sending
      // "INSERT ... VALUES " with nothing after it would be a syntax error.
      if (values.length > 0) {
        this.logger.debug(sqlStmt);
        await this.userDatabase.query(sqlStmt, ...values);
      }

      // Clean up after each batch succeeds, so that only unwritten workflows
      // remain in localBuffer if a later batch fails.
      batchUUIDs.forEach((value) => { localBuffer.delete(value); });
    }
  } catch (error) {
    // Only decorate real Error objects: assigning a property on a thrown
    // primitive would itself throw in strict mode and skip the restore below.
    if (error instanceof Error) {
      error.message = `Error flushing workflow result buffer: ${error.message}`;
    }
    this.logger.error(error);
    // If there is a failure in flushing the buffer, return items to the global buffer for retrying later.
    // An entry that was added to the global buffer in the meantime for the same workflow is kept as is.
    for (const [workflowUUID, wfBuffer] of localBuffer) {
      if (!this.workflowResultBuffer.has(workflowUUID)) {
        this.workflowResultBuffer.set(workflowUUID, wfBuffer);
      }
    }
  }
}

logRegisteredHTTPUrls() {
this.logger.info("HTTP endpoints supported:");
this.registeredOperations.forEach((registeredOperation) => {
Expand Down
2 changes: 0 additions & 2 deletions src/system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ export class PostgresSystemDatabase implements SystemDatabase {
}
sqlStmt += " ON CONFLICT (workflow_uuid) DO UPDATE SET status=EXCLUDED.status, output=EXCLUDED.output;";

// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await this.pool.query(sqlStmt, values);

// Clean up after each batch succeeds
Expand Down Expand Up @@ -225,7 +224,6 @@ export class PostgresSystemDatabase implements SystemDatabase {
}
sqlStmt += " ON CONFLICT (workflow_uuid) DO NOTHING;";

// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
await this.pool.query(sqlStmt, values);

// Clean up after each batch succeeds
Expand Down
2 changes: 1 addition & 1 deletion src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface PgTransactionId {
txid: string;
}

interface BufferedResult {
/**
 * One function result held in memory until it is written to the database.
 * Exported so that the executor can keep per-workflow maps of these entries
 * and insert them in batches.
 */
export interface BufferedResult {
// Recorded output of the function; the executor's batch flush serializes it with JSON.stringify.
output: unknown;
// Snapshot string that the batch flush passes through unchanged as the txn_snapshot column value.
txn_snapshot: string;
}
Expand Down

0 comments on commit cd71b63

Please sign in to comment.