Skip to content

Commit

Permalink
hanle errors
Browse files Browse the repository at this point in the history
  • Loading branch information
maxdml committed Feb 1, 2025
1 parent d241a90 commit 84f2b81
Showing 1 changed file with 66 additions and 42 deletions.
108 changes: 66 additions & 42 deletions src/system_database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -559,14 +559,13 @@ export class PostgresSystemDatabase implements SystemDatabase {
const client: PoolClient = await this.pool.connect();

await client.query("BEGIN ISOLATION LEVEL READ COMMITTED");
const { rows } = await client.query<operation_outputs>(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID]);
if (rows.length > 0) {
await client.query("ROLLBACK");
client.release();
return;
}

try {
const { rows } = await client.query<operation_outputs>(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID]);
if (rows.length > 0) {
await client.query("ROLLBACK");
client.release();
return;
}
await client.query(
`INSERT INTO ${DBOSExecutor.systemDBSchemaName}.notifications (destination_uuid, topic, message) VALUES ($1, $2, $3);`,
[destinationUUID, topic, DBOSJSON.stringify(message)]
Expand All @@ -576,16 +575,19 @@ export class PostgresSystemDatabase implements SystemDatabase {
client.release();
const err: DatabaseError = error as DatabaseError;
if (err.code === "23503") {
// Foreign key constraint violation
// Foreign key constraint violation (only expected for the INSERT query)
throw new DBOSNonExistentWorkflowError(`Sent to non-existent destination workflow UUID: ${destinationUUID}`);
} else {
throw err;
}
}

await this.recordNotificationOutput(client, workflowUUID, functionID, undefined);
await client.query("COMMIT");
client.release();
try {
await client.query("COMMIT");
} finally {
client.release();
}
}

async recv<T>(
Expand Down Expand Up @@ -631,56 +633,78 @@ export class PostgresSystemDatabase implements SystemDatabase {
}

// Transactionally consume and return the message if it's in the DB, otherwise return null.
let message: T | null = null;
const client = await this.pool.connect();
await client.query(`BEGIN ISOLATION LEVEL READ COMMITTED`);
const finalRecvRows = (await client.query<notifications>(
`WITH oldest_entry AS (
try {
await client.query(`BEGIN ISOLATION LEVEL READ COMMITTED`);
const finalRecvRows = (await client.query<notifications>(
`WITH oldest_entry AS (
SELECT destination_uuid, topic, message, created_at_epoch_ms
FROM ${DBOSExecutor.systemDBSchemaName}.notifications
WHERE destination_uuid = $1
AND topic = $2
ORDER BY created_at_epoch_ms ASC
LIMIT 1
)
DELETE FROM ${DBOSExecutor.systemDBSchemaName}.notifications
USING oldest_entry
WHERE notifications.destination_uuid = oldest_entry.destination_uuid
AND notifications.topic = oldest_entry.topic
AND notifications.created_at_epoch_ms = oldest_entry.created_at_epoch_ms
RETURNING notifications.*;`,
[workflowUUID, topic])).rows;
let message: T | null = null;
if (finalRecvRows.length > 0) {
message = DBOSJSON.parse(finalRecvRows[0].message) as T;
)
DELETE FROM ${DBOSExecutor.systemDBSchemaName}.notifications
USING oldest_entry
WHERE notifications.destination_uuid = oldest_entry.destination_uuid
AND notifications.topic = oldest_entry.topic
AND notifications.created_at_epoch_ms = oldest_entry.created_at_epoch_ms
RETURNING notifications.*;`,
[workflowUUID, topic])).rows;
if (finalRecvRows.length > 0) {
message = DBOSJSON.parse(finalRecvRows[0].message) as T;
}
} catch (e) {
this.logger.error(e);
await client.query(`ROLLBACK`);
client.release();
throw e;
}

await this.recordNotificationOutput(client, workflowUUID, functionID, message);
await client.query(`COMMIT`);
client.release();
try {
await client.query(`COMMIT`);
} finally {
client.release();
}
return message;
}

async setEvent<T>(workflowUUID: string, functionID: number, key: string, message: T): Promise<void> {
const client: PoolClient = await this.pool.connect();

await client.query("BEGIN ISOLATION LEVEL READ COMMITTED");
let { rows } = await client.query<operation_outputs>(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID]);
if (rows.length > 0) {
await client.query("ROLLBACK");
try {
await client.query("BEGIN ISOLATION LEVEL READ COMMITTED");
let { rows } = await client.query<operation_outputs>(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID]);
if (rows.length > 0) {
await client.query("ROLLBACK");
client.release();
return;
}
({ rows } = await client.query(
`INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_events (workflow_uuid, key, value)
VALUES ($1, $2, $3)
ON CONFLICT (workflow_uuid, key)
DO UPDATE SET value = $3
RETURNING workflow_uuid;`,
[workflowUUID, key, DBOSJSON.stringify(message)]
));
} catch (e) {
this.logger.error(e);
await client.query(`ROLLBACK`);
client.release();
return;
throw e;
}
({ rows } = await client.query(
`INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_events (workflow_uuid, key, value)
VALUES ($1, $2, $3)
ON CONFLICT (workflow_uuid, key)
DO UPDATE SET value = $3
RETURNING workflow_uuid;`,
[workflowUUID, key, DBOSJSON.stringify(message)]
));

await this.recordNotificationOutput(client, workflowUUID, functionID, undefined);
await client.query("COMMIT");
client.release();
try {
await client.query("COMMIT");
} finally {
client.release();
}
}

async getEvent<T>(
Expand Down

0 comments on commit 84f2b81

Please sign in to comment.