forked from GEOLYTIX/xyz
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
113 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,47 +1,138 @@ | ||
/** | ||
@module /utils/dbs | ||
@description | ||
## /utils/dbs | ||
Database connection and query management module that creates connection pools for multiple databases based on environment variables prefixed with 'DBS_'. | ||
The [node-postgres]{@link https://www.npmjs.com/package/pg} package is required to create a [new connection Pool]{@link https://node-postgres.com/apis/pool} for DBS connections. | ||
@requires pg | ||
@requires /utils/logger | ||
*/ | ||
|
||
const { Pool } = require('pg'); | ||
|
||
const logger = require('./logger'); | ||
|
||
const RETRY_LIMIT = process.env.RETRY_LIMIT ?? 3; | ||
|
||
const INITIAL_RETRY_DELAY = 1000; | ||
|
||
const dbs = {}; | ||
|
||
// Initialize database pools and create query functions | ||
Object.keys(process.env) | ||
|
||
// Filter keys which start with DBS | ||
.filter(key => key.startsWith('DBS_')) | ||
|
||
.forEach(key => { | ||
|
||
const id = key.split('_')[1] | ||
|
||
/** | ||
@type {Pool} @private | ||
*/ | ||
const pool = new Pool({ | ||
dbs: id, | ||
connectionString: process.env[key], | ||
keepAlive: true | ||
keepAlive: true, | ||
connectionTimeoutMillis: 5000, // 5 seconds | ||
idleTimeoutMillis: 30000, // 30 seconds | ||
max: 20 // Maximum number of clients in the pool | ||
}); | ||
|
||
dbs[key.split('_')[1]] = async (query, variables, timeout) => { | ||
// Handle pool errors | ||
pool.on('error', (err, client) => { | ||
logger({ | ||
err, | ||
message: 'Unexpected error on idle client', | ||
pool: id | ||
}); | ||
}); | ||
|
||
// Assigning clientQuery method to dbs property. | ||
dbs[id] = async (query, variables, timeout) => | ||
await clientQuery(pool, query, variables, timeout) | ||
}); | ||
|
||
try { | ||
// Export dbs constant | ||
module.exports = dbs; | ||
|
||
const client = await pool.connect() | ||
/** | ||
@function clientQuery | ||
@async | ||
@description | ||
The clientQuery method creates a client connection from the provided Pool and executes a query on this pool. | ||
@param {Pool} pool The node-postgres connection Pool for a Client connection. | ||
@param {string} query SQL query to execute | ||
@param {Array} [variables] Parameters for the SQL query | ||
@param {number} [timeout] Statement timeout in milliseconds | ||
@returns {Promise<Array|Error>} Query results or error object | ||
@throws {Error} Database connection or query errors | ||
*/ | ||
async function clientQuery(pool, query, variables, timeout) { | ||
|
||
if (timeout || process.env.STATEMENT_TIMEOUT) { | ||
await client.query(`SET statement_timeout = ${parseInt(timeout) || parseInt(process.env.STATEMENT_TIMEOUT)}`) | ||
} | ||
let retryCount = 0; | ||
let lastError; | ||
let client; | ||
|
||
const { rows } = await client.query(query, variables) | ||
while (retryCount < RETRY_LIMIT) { | ||
|
||
client.release() | ||
try { | ||
client = await pool.connect(); | ||
|
||
return rows | ||
timeout ??= process.env.STATEMENT_TIMEOUT | ||
|
||
} catch (err) { | ||
// Set statement timeout if specified | ||
if (timeout) { | ||
|
||
logger({ err, query, variables }) | ||
return err; | ||
await client.query(`SET statement_timeout = ${parseInt(timeout)}`); | ||
} | ||
|
||
const { rows } = await client.query(query, variables); | ||
|
||
return rows; | ||
|
||
} catch (err) { | ||
|
||
// Log the error with retry information | ||
logger({ | ||
err, | ||
query, | ||
variables, | ||
retry: retryCount + 1, | ||
pool: pool.options.dbs | ||
}); | ||
|
||
retryCount++; | ||
|
||
if (retryCount < RETRY_LIMIT) { | ||
// Exponential backoff | ||
const delay = INITIAL_RETRY_DELAY * Math.pow(2, retryCount - 1); | ||
await sleep(delay); | ||
} | ||
|
||
lastError = err | ||
|
||
} finally { | ||
if (client) { | ||
client.release(true); // Force release in case of errors | ||
} | ||
} | ||
}) | ||
} | ||
|
||
// If we've exhausted all retries, return the last error | ||
return lastError; | ||
}; | ||
|
||
module.exports = dbs | ||
/** | ||
@function sleep | ||
@description | ||
Helper function to pause execution | ||
@param {number} ms Time to sleep in milliseconds | ||
@returns {Promise<void>} | ||
*/ | ||
function sleep(ms) { | ||
return new Promise(resolve => setTimeout(resolve, ms)); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters