Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize Async Writes #238

Merged
merged 3 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 60 additions & 37 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ import {
WorkflowContextImpl,
WorkflowStatus,
StatusString,
BufferedResult,
} from './workflow';

import { IsolationLevel, Transaction, TransactionConfig } from './transaction';
import { Transaction, TransactionConfig } from './transaction';
import { CommunicatorConfig, Communicator } from './communicator';
import { TelemetryCollector } from './telemetry/collector';
import { Tracer } from './telemetry/traces';
Expand All @@ -36,7 +37,6 @@ import {
TypeORMDatabase,
UserDatabaseName,
KnexUserDatabase,
UserDatabaseClient,
} from './user_database';
import { MethodRegistrationBase, getRegisteredOperations, getOrCreateClassRegistration, MethodRegistration } from './decorators';
import { SpanStatusCode } from '@opentelemetry/api';
Expand Down Expand Up @@ -115,7 +115,7 @@ export class DBOSExecutor {
readonly communicatorInfoMap: Map<string, CommunicatorInfo> = new Map();
readonly registeredOperations: Array<MethodRegistrationBase> = [];
readonly pendingWorkflowMap: Map<string, Promise<unknown>> = new Map(); // Map from workflowUUID to workflow promise
readonly pendingAsyncWrites: Map<string, Promise<unknown>> = new Map(); // Map from workflowUUID to asynchronous write promise
readonly workflowResultBuffer: Map<string, Map<number, BufferedResult>> = new Map(); // Map from workflowUUID to its remaining result buffer.

readonly telemetryCollector: TelemetryCollector;
readonly flushBufferIntervalMs: number = 1000;
Expand Down Expand Up @@ -167,7 +167,7 @@ export class DBOSExecutor {

this.flushBufferID = setInterval(() => {
if (!this.debugMode) {
void this.flushWorkflowStatusBuffer();
void this.flushWorkflowBuffers();
}
}, this.flushBufferIntervalMs);
this.logger.debug("Started workflow status buffer worker");
Expand Down Expand Up @@ -306,12 +306,8 @@ export class DBOSExecutor {
this.logger.info("Waiting for pending workflows to finish.");
await Promise.allSettled(this.pendingWorkflowMap.values());
}
if (this.pendingAsyncWrites.size > 0) {
this.logger.info("Waiting for pending buffer flushes to finish");
await Promise.allSettled(this.pendingAsyncWrites.values());
}
clearInterval(this.flushBufferID);
await this.flushWorkflowStatusBuffer();
await this.flushWorkflowBuffers();
await this.systemDatabase.destroy();
await this.userDatabase.destroy();
await this.telemetryCollector.destroy();
Expand Down Expand Up @@ -395,16 +391,7 @@ export class DBOSExecutor {
args = await this.systemDatabase.initWorkflowStatus(internalStatus, args);
} else {
// For temporary workflows, instead asynchronously record inputs.
const setWorkflowInputs: Promise<void> = this.systemDatabase
.setWorkflowInputs<T>(workflowUUID, args)
.catch((error) => {
(error as Error).message = `Error asynchronously setting workflow inputs: ${(error as Error).message}`;
this.logger.error(error);
})
.finally(() => {
this.pendingAsyncWrites.delete(workflowUUID);
});
this.pendingAsyncWrites.set(workflowUUID, setWorkflowInputs);
this.systemDatabase.bufferWorkflowInputs(workflowUUID, args);
}

const runWorkflow = async () => {
Expand Down Expand Up @@ -444,23 +431,9 @@ export class DBOSExecutor {
this.tracer.endSpan(wCtxt.span);
}
// Asynchronously flush the result buffer.
const resultBufferFlush: Promise<void> = this.userDatabase
.transaction(
async (client: UserDatabaseClient) => {
if (wCtxt.resultBuffer.size > 0) {
await wCtxt.flushResultBuffer(client);
}
},
{ isolationLevel: IsolationLevel.ReadCommitted }
)
.catch((error) => {
(error as Error).message = `Error asynchronously flushing result buffer: ${(error as Error).message}`;
this.logger.error(error);
})
.finally(() => {
this.pendingAsyncWrites.delete(workflowUUID);
});
this.pendingAsyncWrites.set(workflowUUID, resultBufferFlush);
if (wCtxt.resultBuffer.size > 0) {
this.workflowResultBuffer.set(wCtxt.workflowUUID, wCtxt.resultBuffer);
}
return result;
};
const workflowPromise: Promise<R> = runWorkflow();
Expand Down Expand Up @@ -683,12 +656,62 @@ export class DBOSExecutor {
/**
* Periodically flush the workflow output buffer to the system database.
*/
async flushWorkflowStatusBuffer() {
async flushWorkflowBuffers() {
if (this.initialized) {
await this.systemDatabase.flushWorkflowInputsBuffer();
await this.flushWorkflowResultBuffer();
await this.systemDatabase.flushWorkflowStatusBuffer();
}
}

/**
 * Flush buffered transaction results to the user database.
 *
 * Takes a snapshot of the shared result buffer (so workflows can keep buffering
 * concurrently), then writes it out as multi-row INSERTs into
 * dbos.transaction_outputs, capped at `flushBatchSize` workflows per statement.
 * On failure, un-flushed entries are returned to the shared buffer for a later
 * retry, unless a newer entry for the same workflow UUID has arrived since.
 */
async flushWorkflowResultBuffer(): Promise<void> {
  const localBuffer = new Map(this.workflowResultBuffer);
  this.workflowResultBuffer.clear();
  const totalSize = localBuffer.size;
  const flushBatchSize = 50; // Max number of workflows written per INSERT statement.
  try {
    let finishedCnt = 0;
    while (finishedCnt < totalSize) {
      let sqlStmt = "INSERT INTO dbos.transaction_outputs (workflow_uuid, function_id, output, error, txn_id, txn_snapshot) VALUES ";
      let paramCnt = 1;
      const values: (string | number)[] = [];
      const batchUUIDs: string[] = [];
      for (const [workflowUUID, wfBuffer] of localBuffer) {
        // One row per buffered function result; error is always null here and
        // txn_id is left null (the transaction already committed elsewhere).
        for (const [funcID, recorded] of wfBuffer) {
          if (paramCnt > 1) {
            sqlStmt += ", ";
          }
          sqlStmt += `($${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, null, $${paramCnt++})`;
          values.push(workflowUUID, funcID, JSON.stringify(recorded.output), JSON.stringify(null), recorded.txn_snapshot);
        }
        batchUUIDs.push(workflowUUID);
        finishedCnt++;
        if (batchUUIDs.length >= flushBatchSize) {
          break; // Cap at the batch size.
        }
      }
      // Guard against a batch of only-empty buffers: running the statement with
      // zero VALUES tuples would be malformed SQL.
      if (values.length > 0) {
        this.logger.debug(sqlStmt);
        await this.userDatabase.query(sqlStmt, ...values);
      }
      // Remove each batch from the local snapshot only after it is durably written.
      batchUUIDs.forEach((uuid) => { localBuffer.delete(uuid); });
    }
  } catch (error) {
    (error as Error).message = `Error flushing workflow result buffer: ${(error as Error).message}`;
    this.logger.error(error);
    // If there is a failure in flushing the buffer, return items to the global buffer for retrying later.
    for (const [workflowUUID, wfBuffer] of localBuffer) {
      if (!this.workflowResultBuffer.has(workflowUUID)) {
        this.workflowResultBuffer.set(workflowUUID, wfBuffer);
      }
    }
  }
}

logRegisteredHTTPUrls() {
this.logger.info("HTTP endpoints supported:");
this.registeredOperations.forEach((registeredOperation) => {
Expand Down
28 changes: 20 additions & 8 deletions src/foundationdb/fdb_system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
workflowInputsDB: fdb.Database<string, string, unknown, unknown>;

readonly workflowStatusBuffer: Map<string, WorkflowStatusInternal> = new Map();
readonly workflowInputsBuffer: Map<string, any[]> = new Map();

constructor() {
fdb.setAPIVersion(710, 710);
Expand Down Expand Up @@ -106,7 +107,8 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
this.workflowStatusBuffer.set(workflowUUID, status);
}

async flushWorkflowStatusBuffer(): Promise<string[]> {
// TODO: support batching
async flushWorkflowStatusBuffer(): Promise<void> {
const localBuffer = new Map(this.workflowStatusBuffer);
this.workflowStatusBuffer.clear();
// eslint-disable-next-line @typescript-eslint/require-await
Expand All @@ -124,7 +126,7 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
});
}
});
return Array.from(localBuffer.keys());
return;
}

async recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise<void> {
Expand All @@ -145,14 +147,24 @@ export class FoundationDBSystemDatabase implements SystemDatabase {
return workflows.filter((i) => i[1].status === StatusString.PENDING && i[1].executorID === executorID).map((i) => i[0]);
}

async setWorkflowInputs<T extends any[]>(workflowUUID: string, args: T): Promise<void> {
await this.dbRoot.doTransaction(async (txn) => {
const inputsDB = txn.at(this.workflowInputsDB);
const inputs = await inputsDB.get(workflowUUID);
if (inputs === undefined) {
inputsDB.set(workflowUUID, args);
// Buffer a workflow's inputs in memory; flushWorkflowInputsBuffer() persists them
// later. A second call for the same UUID overwrites the earlier buffered entry.
bufferWorkflowInputs<T extends any[]>(workflowUUID: string, args: T): void {
  this.workflowInputsBuffer.set(workflowUUID, args);
}

// TODO: support batching
/**
 * Flush buffered workflow inputs to FoundationDB in a single transaction.
 * Inputs already recorded for a workflow UUID are never overwritten
 * (first write wins), so retried flushes are idempotent.
 */
async flushWorkflowInputsBuffer(): Promise<void> {
  // Snapshot and clear the shared buffer so concurrent writers are not blocked.
  const localBuffer = new Map(this.workflowInputsBuffer);
  this.workflowInputsBuffer.clear();
  // No eslint require-await suppression needed: the callback genuinely awaits txn.get().
  await this.workflowInputsDB.doTransaction(async (txn) => {
    for (const [workflowUUID, args] of localBuffer) {
      const existing = await txn.get(workflowUUID);
      if (existing === undefined) {
        txn.set(workflowUUID, args);
      }
    }
  });
}

async getWorkflowInputs<T extends any[]>(workflowUUID: string): Promise<T | null> {
Expand Down
100 changes: 81 additions & 19 deletions src/system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ export interface SystemDatabase {
checkWorkflowOutput<R>(workflowUUID: string): Promise<DBOSNull | R>;
initWorkflowStatus<T extends any[]>(bufferedStatus: WorkflowStatusInternal, args: T): Promise<T>;
bufferWorkflowOutput(workflowUUID: string, status: WorkflowStatusInternal): void;
flushWorkflowStatusBuffer(): Promise<Array<string>>;
flushWorkflowStatusBuffer(): Promise<void>;
recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise<void>;

getPendingWorkflows(executorID: string): Promise<Array<string>>;
setWorkflowInputs<T extends any[]>(workflowUUID: string, args: T) : Promise<void>;
bufferWorkflowInputs<T extends any[]>(workflowUUID: string, args: T) : void;
getWorkflowInputs<T extends any[]>(workflowUUID: string): Promise<T | null>;
flushWorkflowInputsBuffer(): Promise<void>;

checkOperationOutput<R>(workflowUUID: string, functionID: number): Promise<DBOSNull | R>;
recordOperationOutput<R>(workflowUUID: string, functionID: number, output: R): Promise<void>;
Expand Down Expand Up @@ -64,6 +65,8 @@ export class PostgresSystemDatabase implements SystemDatabase {
readonly workflowEventsMap: Record<string, () => void> = {};

readonly workflowStatusBuffer: Map<string, WorkflowStatusInternal> = new Map();
readonly workflowInputsBuffer: Map<string, any[]> = new Map();
readonly flushBatchSize = 100;

constructor(readonly pgPoolConfig: PoolConfig, readonly systemDatabaseName: string, readonly logger: Logger) {
const poolConfig = { ...pgPoolConfig };
Expand Down Expand Up @@ -129,21 +132,38 @@ export class PostgresSystemDatabase implements SystemDatabase {
/**
* Flush the workflow output buffer to the database.
*/
async flushWorkflowStatusBuffer() {
async flushWorkflowStatusBuffer(): Promise<void> {
const localBuffer = new Map(this.workflowStatusBuffer);
this.workflowStatusBuffer.clear();
const totalSize = localBuffer.size;
try {
const client: PoolClient = await this.pool.connect();
await client.query("BEGIN");
for (const [workflowUUID, status] of localBuffer) {
await client.query(
`INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_status (workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, request, output, executor_id) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (workflow_uuid)
DO UPDATE SET status=EXCLUDED.status, output=EXCLUDED.output;`,
[workflowUUID, status.status, status.name, status.authenticatedUser, status.assumedRole, JSON.stringify(status.authenticatedRoles), JSON.stringify(status.request), JSON.stringify(status.output), status.executorID]
);
let finishedCnt = 0;
while (finishedCnt < totalSize) {
let sqlStmt = `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_status (workflow_uuid, status, name, authenticated_user, assumed_role, authenticated_roles, request, output, executor_id) VALUES `;
let paramCnt = 1;
const values: any[] = [];
const batchUUIDs: string[] = [];
for (const [workflowUUID, status] of localBuffer) {
if (paramCnt > 1) {
sqlStmt += ", ";
}
sqlStmt += `($${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++}, $${paramCnt++})`;
values.push(workflowUUID, status.status, status.name, status.authenticatedUser, status.assumedRole, JSON.stringify(status.authenticatedRoles), JSON.stringify(status.request), JSON.stringify(status.output), status.executorID);
batchUUIDs.push(workflowUUID);
finishedCnt++;

if (batchUUIDs.length >= this.flushBatchSize) {
// Cap at the batch size.
break;
}
}
sqlStmt += " ON CONFLICT (workflow_uuid) DO UPDATE SET status=EXCLUDED.status, output=EXCLUDED.output;";

await this.pool.query(sqlStmt, values);

// Clean up after each batch succeeds
batchUUIDs.forEach((value) => { localBuffer.delete(value); });
}
await client.query("COMMIT");
client.release();
} catch (error) {
(error as Error).message = `Error flushing workflow buffer: ${(error as Error).message}`;
this.logger.error(error);
Expand All @@ -154,7 +174,7 @@ export class PostgresSystemDatabase implements SystemDatabase {
}
}
}
return Array.from(localBuffer.keys());
return;
}

async recordWorkflowError(workflowUUID: string, status: WorkflowStatusInternal): Promise<void> {
Expand All @@ -173,11 +193,53 @@ export class PostgresSystemDatabase implements SystemDatabase {
return rows.map(i => i.workflow_uuid);
}

async setWorkflowInputs<T extends any[]>(workflowUUID: string, args: T): Promise<void> {
await this.pool.query<workflow_inputs>(
`INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_inputs (workflow_uuid, inputs) VALUES($1, $2) ON CONFLICT (workflow_uuid) DO NOTHING`,
[workflowUUID, JSON.stringify(args)]
)
// Buffer a workflow's inputs in memory; flushWorkflowInputsBuffer() persists them
// later. A second call for the same UUID overwrites the earlier buffered entry.
bufferWorkflowInputs<T extends any[]>(workflowUUID: string, args: T): void {
  this.workflowInputsBuffer.set(workflowUUID, args);
}

/**
 * Flush buffered workflow inputs to the system database in multi-row INSERTs of
 * up to `flushBatchSize` workflows per statement. ON CONFLICT DO NOTHING keeps
 * first-written inputs authoritative, so retried flushes are idempotent.
 * On failure, un-flushed entries are returned to the shared buffer for a later
 * retry, unless a newer entry for the same workflow UUID has arrived since.
 */
async flushWorkflowInputsBuffer(): Promise<void> {
  // Snapshot and clear the shared buffer so concurrent writers are not blocked.
  const localBuffer = new Map(this.workflowInputsBuffer);
  this.workflowInputsBuffer.clear();
  const totalSize = localBuffer.size;
  try {
    let finishedCnt = 0;
    while (finishedCnt < totalSize) {
      let sqlStmt = `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_inputs (workflow_uuid, inputs) VALUES `;
      let paramCnt = 1;
      const values: string[] = [];
      const batchUUIDs: string[] = [];
      for (const [workflowUUID, args] of localBuffer) {
        if (paramCnt > 1) {
          sqlStmt += ", ";
        }
        sqlStmt += `($${paramCnt++}, $${paramCnt++})`;
        values.push(workflowUUID, JSON.stringify(args));
        batchUUIDs.push(workflowUUID);
        finishedCnt++;
        if (batchUUIDs.length >= this.flushBatchSize) {
          break; // Cap at the batch size.
        }
      }
      // DO NOTHING: never overwrite inputs that were already recorded.
      sqlStmt += " ON CONFLICT (workflow_uuid) DO NOTHING;";

      await this.pool.query(sqlStmt, values);

      // Remove each batch from the local snapshot only after it is durably written.
      batchUUIDs.forEach((uuid) => { localBuffer.delete(uuid); });
    }
  } catch (error) {
    (error as Error).message = `Error flushing workflow inputs buffer: ${(error as Error).message}`;
    this.logger.error(error);
    // If there is a failure in flushing the buffer, return items to the global buffer for retrying later.
    for (const [workflowUUID, args] of localBuffer) {
      if (!this.workflowInputsBuffer.has(workflowUUID)) {
        this.workflowInputsBuffer.set(workflowUUID, args);
      }
    }
  }
}

async getWorkflowInputs<T extends any[]>(workflowUUID: string): Promise<T | null> {
Expand Down
2 changes: 1 addition & 1 deletion src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export interface PgTransactionId {
txid: string;
}

interface BufferedResult {
/**
 * A transaction result buffered in memory for deferred, batched persistence
 * to the transaction-outputs table.
 */
export interface BufferedResult {
  output: unknown; // The transaction's recorded output (JSON-serialized when flushed).
  txn_snapshot: string; // Value written to the txn_snapshot column; presumably the transaction's snapshot identifier — confirm with the recording caller.
}
Expand Down
2 changes: 1 addition & 1 deletion tests/concurrency.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe("concurrency-tests", () => {
await ConcurrTestClass.promise2;

const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec();
await dbosExec.flushWorkflowStatusBuffer();
await dbosExec.flushWorkflowBuffers();
ConcurrTestClass.resolve();
await handle.getResult();

Expand Down
2 changes: 1 addition & 1 deletion tests/dbos.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ describe("dbos-tests", () => {

// Flush workflow output buffer so the retrieved handle can proceed and the status would transition to SUCCESS.
const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec();
await dbosExec.flushWorkflowStatusBuffer();
await dbosExec.flushWorkflowBuffers();
const retrievedHandle = testRuntime.retrieveWorkflow<string>(workflowUUID);
expect(retrievedHandle).not.toBeNull();
expect(retrievedHandle.getWorkflowUUID()).toBe(workflowUUID);
Expand Down
2 changes: 1 addition & 1 deletion tests/foundationdb/foundationdb.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ describe("foundationdb-dbos", () => {
await expect(invokedHandle.then((x) => x.getResult())).resolves.toBe(3);

const dbosExec = (testRuntime as TestingRuntimeImpl).getDBOSExec();
await dbosExec.flushWorkflowStatusBuffer();
await dbosExec.flushWorkflowBuffers();
await expect(retrievedHandle.getResult()).resolves.toBe(3);
await expect(retrievedHandle.getStatus()).resolves.toMatchObject({
status: StatusString.SUCCESS,
Expand Down
Loading
Loading