Skip to content

Commit

Permalink
Optimize Async Writes (#238)
Browse files Browse the repository at this point in the history
This PR groups individual async writes into larger batched inserts to
Postgres, reducing the number of round trips to the database and
improving efficiency. Specifically, this PR batches async writes of:
- Workflow inputs for temporary workflows
- Workflow final result/status
- Workflow unflushed and buffered transaction outputs

We can tune the batch size later and implement batching for FDB as well.
  • Loading branch information
qianl15 authored Jan 4, 2024
1 parent 1467abb commit e4986c1
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 70 deletions.
97 changes: 60 additions & 37 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import {
WorkflowContextImpl,
WorkflowStatus,
StatusString,
BufferedResult,
} from './workflow';

import { IsolationLevel, Transaction, TransactionConfig } from './transaction';
import { Transaction, TransactionConfig } from './transaction';
import { CommunicatorConfig, Communicator } from './communicator';
import { TelemetryCollector } from './telemetry/collector';
import { Tracer } from './telemetry/traces';
Expand All @@ -36,7 +37,6 @@ import {
TypeORMDatabase,
UserDatabaseName,
KnexUserDatabase,
UserDatabaseClient,
} from './user_database';
import { MethodRegistrationBase, getRegisteredOperations, getOrCreateClassRegistration, MethodRegistration } from './decorators';
import { SpanStatusCode } from '@opentelemetry/api';
Expand Down Expand Up @@ -115,7 +115,7 @@ export class DBOSExecutor {
readonly communicatorInfoMap: Map<string, CommunicatorInfo> = new Map();
readonly registeredOperations: Array<MethodRegistrationBase> = [];
readonly pendingWorkflowMap: Map<string, Promise<unknown>> = new Map(); // Map from workflowUUID to workflow promise
readonly pendingAsyncWrites: Map<string, Promise<unknown>> = new Map(); // Map from workflowUUID to asynchronous write promise
readonly workflowResultBuffer: Map<string, Map<number, BufferedResult>> = new Map(); // Map from workflowUUID to its remaining result buffer.

readonly telemetryCollector: TelemetryCollector;
readonly flushBufferIntervalMs: number = 1000;
Expand Down Expand Up @@ -167,7 +167,7 @@ export class DBOSExecutor {

this.flushBufferID = setInterval(() => {
if (!this.debugMode) {
void this.flushWorkflowStatusBuffer();
void this.flushWorkflowBuffers();
}
}, this.flushBufferIntervalMs);
this.logger.debug("Started workflow status buffer worker");
Expand Down Expand Up @@ -306,12 +306,8 @@ export class DBOSExecutor {
this.logger.info("Waiting for pending workflows to finish.");
await Promise.allSettled(this.pendingWorkflowMap.values());
}
if (this.pendingAsyncWrites.size > 0) {
this.logger.info("Waiting for pending buffer flushes to finish");
await Promise.allSettled(this.pendingAsyncWrites.values());
}
clearInterval(this.flushBufferID);
await this.flushWorkflowStatusBuffer();
await this.flushWorkflowBuffers();
await this.systemDatabase.destroy();
await this.userDatabase.destroy();
await this.telemetryCollector.destroy();
Expand Down Expand Up @@ -395,16 +391,7 @@ export class DBOSExecutor {
args = await this.systemDatabase.initWorkflowStatus(internalStatus, args);
} else {
// For temporary workflows, buffer the inputs to be recorded asynchronously.
const setWorkflowInputs: Promise<void> = this.systemDatabase
.setWorkflowInputs<T>(workflowUUID, args)
.catch((error) => {
(error as Error).message = `Error asynchronously setting workflow inputs: ${(error as Error).message}`;
this.logger.error(error);
})
.finally(() => {
this.pendingAsyncWrites.delete(workflowUUID);
});
this.pendingAsyncWrites.set(workflowUUID, setWorkflowInputs);
this.systemDatabase.bufferWorkflowInputs(workflowUUID, args);
}

const runWorkflow = async () => {
Expand Down Expand Up @@ -444,23 +431,9 @@ export class DBOSExecutor {
this.tracer.endSpan(wCtxt.span);
}
// Save any unflushed results; the background worker flushes them asynchronously.
const resultBufferFlush: Promise<void> = this.userDatabase
.transaction(
async (client: UserDatabaseClient) => {
if (wCtxt.resultBuffer.size > 0) {
await wCtxt.flushResultBuffer(client);
}
},
{ isolationLevel: IsolationLevel.ReadCommitted }
)
.catch((error) => {
(error as Error).message = `Error asynchronously flushing result buffer: ${(error as Error).message}`;
this.logger.error(error);
})
.finally(() => {
this.pendingAsyncWrites.delete(workflowUUID);
});
this.pendingAsyncWrites.set(workflowUUID, resultBufferFlush);
if (wCtxt.resultBuffer.size > 0) {
this.workflowResultBuffer.set(wCtxt.workflowUUID, wCtxt.resultBuffer);
}
return result;
};
const workflowPromise: Promise<R> = runWorkflow();
Expand Down Expand Up @@ -683,12 +656,62 @@ export class DBOSExecutor {
/**
* Periodically flush the workflow output buffer to the system database.
*/
async flushWorkflowStatusBuffer() {
/**
 * Flush all in-memory workflow buffers to the system/user databases.
 * Flushes, in order: buffered workflow inputs, buffered transaction
 * results, and buffered workflow statuses. No-op until the executor
 * has been initialized.
 */
async flushWorkflowBuffers() {
  if (!this.initialized) {
    return;
  }
  await this.systemDatabase.flushWorkflowInputsBuffer();
  await this.flushWorkflowResultBuffer();
  await this.systemDatabase.flushWorkflowStatusBuffer();
}

/**
 * Flush buffered transaction outputs of completed workflows to the user
 * database, batching up to `flushBatchSize` workflows per INSERT statement.
 * On failure, un-flushed entries are returned to the global buffer so the
 * periodic flush worker can retry them later.
 */
async flushWorkflowResultBuffer(): Promise<void> {
  // Snapshot and clear the shared buffer so new results can keep accumulating
  // while this flush is in flight.
  const localBuffer = new Map(this.workflowResultBuffer);
  this.workflowResultBuffer.clear();
  const totalSize = localBuffer.size;
  const flushBatchSize = 50; // TODO: tune the batch size.
  try {
    let finishedCnt = 0;
    while (finishedCnt < totalSize) {
      let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot) VALUES ";
      let paramCnt = 1;
      const values: any[] = [];
      const batchUUIDs: string[] = [];
      for (const [workflowUUID, wfBuffer] of localBuffer) {
        for (const [funcID, recorded] of wfBuffer) {
          const output = recorded.output;
          const txnSnapshot = recorded.txn_snapshot;
          if (paramCnt > 1) {
            sqlStmt += ", ";
          }
          // txn_id is always written as null here; only the snapshot is recorded.
          sqlStmt += `($${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, null, $${paramCnt++})`;
          values.push(workflowUUID, funcID, JSON.stringify(output), JSON.stringify(null), txnSnapshot);
        }
        batchUUIDs.push(workflowUUID);
        finishedCnt++;
        if (batchUUIDs.length >= flushBatchSize) {
          // Cap at the batch size.
          break;
        }
      }
      // Guard: a workflow whose buffered result map is empty contributes no
      // VALUES tuples. Without this check the INSERT would be syntactically
      // invalid, fail on every flush cycle, and be re-buffered forever.
      if (values.length > 0) {
        this.logger.debug(sqlStmt);
        // eslint-disable-next-line @typescript-eslint/no-unsafe-argument
        await this.userDatabase.query(sqlStmt, ...values);
      }

      // Clean up after each batch succeeds.
      batchUUIDs.forEach((value) => { localBuffer.delete(value); });
    }
  } catch (error) {
    (error as Error).message = `Error flushing workflow result buffer: ${(error as Error).message}`;
    this.logger.error(error);
    // If there is a failure in flushing the buffer, return items to the global
    // buffer for retrying later. Do not clobber any newer entry buffered for
    // the same workflow while this flush was in flight.
    for (const [workflowUUID, wfBuffer] of localBuffer) {
      if (!this.workflowResultBuffer.has(workflowUUID)) {
        this.workflowResultBuffer.set(workflowUUID, wfBuffer);
      }
    }
  }
}

logRegisteredHTTPUrls() {
this.logger.info("HTTP endpoints supported:");
this.registeredOperations.forEach((registeredOperation) => {
Expand Down
28 changes: 20 additions & 8 deletions src/foundationdb/fdb_system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
workflowInputsDB: fdb.Database<string, string, unknown, unknown>;

readonly workflowStatusBuffer: Map<string, WorkflowStatusInternal> = new Map();
readonly workflowInputsBuffer: Map<string, any[]> = new Map();

constructor() {
fdb.setAPIVersion(710, 710);
Expand Down Expand Up @@ -106,7 +107,8 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
this.workflowStatusBuffer.set(workflowUUID, status);
}

async flushWorkflowStatusBuffer(): Promise<string[]> {
// TODO: support batching
async flushWorkflowStatusBuffer(): Promise<void> {
const localBuffer = new Map(this.workflowStatusBuffer);
this.workflowStatusBuffer.clear();
// eslint-disable-next-line @typescript-eslint/require-await
Expand All @@ -124,7 +126,7 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
});
}
});
return Array.from(localBuffer.keys());
return;
}

async recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise<void> {
Expand All @@ -145,14 +147,24 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
return workflows.filter((i) => i[1].status === StatusString.PENDING && i[1].executorID === executorID).map((i) => i[0]);
}

async setWorkflowInputs<T extends any[]>(workflowUUID: string, args: T): Promise<void> {
await this.dbRoot.doTransaction(async (txn) => {
const inputsDB = txn.at(this.workflowInputsDB);
const inputs = await inputsDB.get(workflowUUID);
if (inputs === undefined) {
inputsDB.set(workflowUUID, args);
// Buffer workflow inputs in memory; they are written to FoundationDB later
// by flushWorkflowInputsBuffer(). A later call for the same UUID overwrites
// the buffered entry.
bufferWorkflowInputs<T extends any[]>(workflowUUID: string, args: T): void {
  this.workflowInputsBuffer.set(workflowUUID, args);
}

/**
 * Persist all buffered workflow inputs to FoundationDB in a single
 * transaction. Inputs are only written for workflows that do not already
 * have recorded inputs, so earlier writes win.
 */
// TODO: support batching
async flushWorkflowInputsBuffer(): Promise<void> {
  // Snapshot the buffer, then clear it so new inputs can keep accumulating.
  const pending = new Map(this.workflowInputsBuffer);
  this.workflowInputsBuffer.clear();
  await this.workflowInputsDB.doTransaction(async (txn) => {
    for (const [uuid, inputArgs] of pending) {
      // Keep any previously recorded inputs; only write when absent.
      const existing = await txn.get(uuid);
      if (existing === undefined) {
        txn.set(uuid, inputArgs);
      }
    }
  });
}

async getWorkflowInputs<T extends any[]>(workflowUUID: string): Promise<T | null> {
Expand Down
100 changes: 81 additions & 19 deletions src/system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ export interface SystemDatabase {
checkWorkflowOutput<R>(workflowUUID: string): Promise<DBOSNull | R>;
initWorkflowStatus<T extends any[]>(bufferedStatus: WorkflowStatusInternal, args: T): Promise<T>;
bufferWorkflowOutput(workflowUUID: string, status: WorkflowStatusInternal): void;
flushWorkflowStatusBuffer(): Promise<Array<string>>;
flushWorkflowStatusBuffer(): Promise<void>;
recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise<void>;

getPendingWorkflows(executorID: string): Promise<Array<string>>;
setWorkflowInputs<T extends any[]>(workflowUUID: string, args: T) : Promise<void>;
bufferWorkflowInputs<T extends any[]>(workflowUUID: string, args: T) : void;
getWorkflowInputs<T extends any[]>(workflowUUID: string): Promise<T | null>;
flushWorkflowInputsBuffer(): Promise<void>;

checkOperationOutput<R>(workflowUUID: string, functionID: number): Promise<DBOSNull | R>;
recordOperationOutput<R>(workflowUUID: string, functionID: number, output: R): Promise<void>;
Expand Down Expand Up @@ -64,6 +65,8 @@ export class PostgresSystemDatabase implements SystemDatabase {
readonly workflowEventsMap: Record<string, () => void> = {};

readonly workflowStatusBuffer: Map<string, WorkflowStatusInternal> = new Map();
readonly workflowInputsBuffer: Map<string, any[]> = new Map();
readonly flushBatchSize = 100;

constructor(readonly pgPoolConfig: PoolConfig, readonly systemDatabaseName: string, readonly logger: Logger) {
const poolConfig = { ...pgPoolConfig };
Expand Down Expand Up @@ -129,21 +132,38 @@ export class PostgresSystemDatabase implements SystemDatabase {
/**
* Flush the workflow output buffer to the database.
*/
async flushWorkflowStatusBuffer() {
async flushWorkflowStatusBuffer(): Promise<void> {
const localBuffer = new Map(this.workflowStatusBuffer);
this.workflowStatusBuffer.clear();
const totalSize = localBuffer.size;
try {
const client: PoolClient = await this.pool.connect();
await client.query("BEGIN");
for (const [workflowUUID, status] of localBuffer) {
await client.query(
`INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_status (workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, request, output, executor_id) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (workflow_uuid)
DO UPDATE SET status=EXCLUDED.status, output=EXCLUDED.output;`,
[workflowUUID, status.status, status.name, status.authenticatedUser, status.assumedRole, JSON.stringify(status.authenticatedRoles), JSON.stringify(status.request), JSON.stringify(status.output), status.executorID]
);
let finishedCnt = 0;
while (finishedCnt < totalSize) {
let sqlStmt = `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_status (workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, request, output, executor_id) VALUES `;
let paramCnt = 1;
const values: any[] = [];
const batchUUIDs: string[] = [];
for (const [workflowUUID, status] of localBuffer) {
if (paramCnt > 1) {
sqlStmt += ", ";
}
sqlStmt += `($${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++})`;
values.push(workflowUUID, status.status, status.name, status.authenticatedUser, status.assumedRole, JSON.stringify(status.authenticatedRoles), JSON.stringify(status.request), JSON.stringify(status.output), status.executorID);
batchUUIDs.push(workflowUUID);
finishedCnt++;

if (batchUUIDs.length >= this.flushBatchSize) {
// Cap at the batch size.
break;
}
}
sqlStmt += " ON CONFLICT (workflow_uuid) DO UPDATE SET status=EXCLUDED.status, output=EXCLUDED.output;";

await this.pool.query(sqlStmt, values);

// Clean up after each batch succeeds
batchUUIDs.forEach((value) => { localBuffer.delete(value); });
}
await client.query("COMMIT");
client.release();
} catch (error) {
(error as Error).message = `Error flushing workflow buffer: ${(error as Error).message}`;
this.logger.error(error);
Expand All @@ -154,7 +174,7 @@ export class PostgresSystemDatabase implements SystemDatabase {
}
}
}
return Array.from(localBuffer.keys());
return;
}

async recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise<void> {
Expand All @@ -173,11 +193,53 @@ export class PostgresSystemDatabase implements SystemDatabase {
return rows.map(i => i.workflow_uuid);
}

async setWorkflowInputs<T extends any[]>(workflowUUID: string, args: T): Promise<void> {
await this.pool.query<workflow_inputs>(
`INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_inputs (workflow_uuid, inputs) VALUES($1, $2) ON CONFLICT (workflow_uuid) DO NOTHING`,
[workflowUUID, JSON.stringify(args)]
)
// Buffer workflow inputs in memory; they are batch-inserted into Postgres
// by flushWorkflowInputsBuffer(). A later call for the same UUID overwrites
// the buffered entry.
bufferWorkflowInputs<T extends any[]>(workflowUUID: string, args: T): void {
  this.workflowInputsBuffer.set(workflowUUID, args);
}

/**
 * Flush buffered workflow inputs to Postgres, batching up to
 * `flushBatchSize` rows per INSERT. Existing rows win via
 * ON CONFLICT DO NOTHING. On failure, un-flushed entries are returned to
 * the global buffer so the periodic flush worker can retry them later.
 */
async flushWorkflowInputsBuffer(): Promise<void> {
  // Snapshot and clear the shared buffer so new inputs can keep accumulating.
  const pending = new Map(this.workflowInputsBuffer);
  this.workflowInputsBuffer.clear();
  const total = pending.size;
  try {
    let flushed = 0;
    while (flushed < total) {
      const placeholders: string[] = [];
      const params: any[] = [];
      const batch: string[] = [];
      let idx = 1;
      for (const [uuid, inputArgs] of pending) {
        placeholders.push(`($${idx++}, $${idx++})`);
        params.push(uuid, JSON.stringify(inputArgs));
        batch.push(uuid);
        flushed++;
        if (batch.length >= this.flushBatchSize) {
          // Cap at the batch size.
          break;
        }
      }
      const sqlStmt =
        `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_inputs (workflow_uuid, inputs) VALUES ` +
        placeholders.join(", ") +
        " ON CONFLICT (workflow_uuid) DO NOTHING;";

      await this.pool.query(sqlStmt, params);

      // Remove successfully written entries so a later failure retries only the rest.
      for (const uuid of batch) {
        pending.delete(uuid);
      }
    }
  } catch (error) {
    (error as Error).message = `Error flushing workflow inputs buffer: ${(error as Error).message}`;
    this.logger.error(error);
    // If there is a failure in flushing the buffer, return items to the global
    // buffer for retrying later, without overwriting entries buffered since
    // this flush began.
    for (const [uuid, inputArgs] of pending) {
      if (!this.workflowInputsBuffer.has(uuid)) {
        this.workflowInputsBuffer.set(uuid, inputArgs);
      }
    }
  }
}

async getWorkflowInputs<T extends any[]>(workflowUUID: string): Promise<T | null> {
Expand Down
2 changes: 1 addition & 1 deletion src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface PgTransactionId {
txid: string;
}

interface BufferedResult {
export interface BufferedResult {
output: unknown;
txn_snapshot: string;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/concurrency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe("concurrency-tests", () => {
await ConcurrTestClass.promise2;

const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec();
await dbosExec.flushWorkflowStatusBuffer();
await dbosExec.flushWorkflowBuffers();
ConcurrTestClass.resolve();
await handle.getResult();

Expand Down
2 changes: 1 addition & 1 deletion tests/dbos.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ describe("dbos-tests", () => {

// Flush workflow output buffer so the retrieved handle can proceed and the status would transition to SUCCESS.
const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec();
await dbosExec.flushWorkflowStatusBuffer();
await dbosExec.flushWorkflowBuffers();
const retrievedHandle = testRuntime.retrieveWorkflow<string>(workflowUUID);
expect(retrievedHandle).not.toBeNull();
expect(retrievedHandle.getWorkflowUUID()).toBe(workflowUUID);
Expand Down
2 changes: 1 addition & 1 deletion tests/foundationdb/foundationdb.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe("foundationdb-dbos", () => {
await expect(invokedHandle.then((x) => x.getResult())).resolves.toBe(3);

const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec();
await dbosExec.flushWorkflowStatusBuffer();
await dbosExec.flushWorkflowBuffers();
await expect(retrievedHandle.getResult()).resolves.toBe(3);
await expect(retrievedHandle.getStatus()).resolves.toMatchObject({
status: StatusString.SUCCESS,
Expand Down
Loading

0 comments on commit e4986c1

Please sign in to comment.