diff --git a/src/system_database.ts b/src/system_database.ts index 9a8849db..c4dad0e8 100644 --- a/src/system_database.ts +++ b/src/system_database.ts @@ -559,14 +559,13 @@ export class PostgresSystemDatabase implements SystemDatabase { const client: PoolClient = await this.pool.connect(); await client.query("BEGIN ISOLATION LEVEL READ COMMITTED"); - const { rows } = await client.query(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID]); - if (rows.length > 0) { - await client.query("ROLLBACK"); - client.release(); - return; - } - try { + const { rows } = await client.query(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID]); + if (rows.length > 0) { + await client.query("ROLLBACK"); + client.release(); + return; + } await client.query( `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.notifications (destination_uuid, topic, message) VALUES ($1, $2, $3);`, [destinationUUID, topic, DBOSJSON.stringify(message)] @@ -576,7 +575,7 @@ export class PostgresSystemDatabase implements SystemDatabase { client.release(); const err: DatabaseError = error as DatabaseError; if (err.code === "23503") { - // Foreign key constraint violation + // Foreign key constraint violation (only expected for the INSERT query) throw new DBOSNonExistentWorkflowError(`Sent to non-existent destination workflow UUID: ${destinationUUID}`); } else { throw err; @@ -584,8 +583,11 @@ export class PostgresSystemDatabase implements SystemDatabase { } await this.recordNotificationOutput(client, workflowUUID, functionID, undefined); - await client.query("COMMIT"); - client.release(); + try { + await client.query("COMMIT"); + } finally { + client.release(); + } } async recv( @@ -631,56 +633,78 @@ export class PostgresSystemDatabase implements SystemDatabase { } // Transactionally consume and return the message if it's in the DB, otherwise return null. + let message: T | null = null; const client = await this.pool.connect(); - await client.query(`BEGIN ISOLATION LEVEL READ COMMITTED`); - const finalRecvRows = (await client.query( - `WITH oldest_entry AS ( + try { + await client.query(`BEGIN ISOLATION LEVEL READ COMMITTED`); + const finalRecvRows = (await client.query( + `WITH oldest_entry AS ( SELECT destination_uuid, topic, message, created_at_epoch_ms FROM ${DBOSExecutor.systemDBSchemaName}.notifications WHERE destination_uuid = $1 AND topic = $2 ORDER BY created_at_epoch_ms ASC LIMIT 1 - ) - - DELETE FROM ${DBOSExecutor.systemDBSchemaName}.notifications - USING oldest_entry - WHERE notifications.destination_uuid = oldest_entry.destination_uuid - AND notifications.topic = oldest_entry.topic - AND notifications.created_at_epoch_ms = oldest_entry.created_at_epoch_ms - RETURNING notifications.*;`, - [workflowUUID, topic])).rows; - let message: T | null = null; - if (finalRecvRows.length > 0) { - message = DBOSJSON.parse(finalRecvRows[0].message) as T; + ) + + DELETE FROM ${DBOSExecutor.systemDBSchemaName}.notifications + USING oldest_entry + WHERE notifications.destination_uuid = oldest_entry.destination_uuid + AND notifications.topic = oldest_entry.topic + AND notifications.created_at_epoch_ms = oldest_entry.created_at_epoch_ms + RETURNING notifications.*;`, + [workflowUUID, topic])).rows; + if (finalRecvRows.length > 0) { + message = DBOSJSON.parse(finalRecvRows[0].message) as T; + } + } catch (e) { + this.logger.error(e); + await client.query(`ROLLBACK`); + client.release(); + throw e; } + await this.recordNotificationOutput(client, workflowUUID, functionID, message); - await client.query(`COMMIT`); - client.release(); + try { + await client.query(`COMMIT`); + } finally { + client.release(); + } return message; } async setEvent(workflowUUID: string, functionID: number, key: string, message: T): Promise { const client: PoolClient = await this.pool.connect(); - await client.query("BEGIN ISOLATION LEVEL READ COMMITTED"); - let { rows } = await client.query(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID]); - if (rows.length > 0) { - await client.query("ROLLBACK"); + try { + await client.query("BEGIN ISOLATION LEVEL READ COMMITTED"); + let { rows } = await client.query(`SELECT output FROM ${DBOSExecutor.systemDBSchemaName}.operation_outputs WHERE workflow_uuid=$1 AND function_id=$2`, [workflowUUID, functionID]); + if (rows.length > 0) { + await client.query("ROLLBACK"); + client.release(); + return; + } + ({ rows } = await client.query( + `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_events (workflow_uuid, key, value) + VALUES ($1, $2, $3) + ON CONFLICT (workflow_uuid, key) + DO UPDATE SET value = $3 + RETURNING workflow_uuid;`, + [workflowUUID, key, DBOSJSON.stringify(message)] + )); + } catch (e) { + this.logger.error(e); + await client.query(`ROLLBACK`); client.release(); - return; + throw e; } - ({ rows } = await client.query( - `INSERT INTO ${DBOSExecutor.systemDBSchemaName}.workflow_events (workflow_uuid, key, value) - VALUES ($1, $2, $3) - ON CONFLICT (workflow_uuid, key) - DO UPDATE SET value = $3 - RETURNING workflow_uuid;`, - [workflowUUID, key, DBOSJSON.stringify(message)] - )); + await this.recordNotificationOutput(client, workflowUUID, functionID, undefined); - await client.query("COMMIT"); - client.release(); + try { + await client.query("COMMIT"); + } finally { + client.release(); + } } async getEvent(