diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..25c8fdb --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +node_modules +package-lock.json \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..c0db88c --- /dev/null +++ b/README.md @@ -0,0 +1,45 @@ +
+ +
+ +Service that acts as a central component to monitor BullMQ: + +- Exposes a BullMQ dashboard which is per default behind a login page +- Acts as a [Prometheus-Exporter](https://prometheus.io/docs/instrumenting/exporters/) to collect metrics about queues in BullMQ + +## Implementation + +The following section will provide a brief overview of the libraries and practices used in the implementation of this service. + + +### BullMQ Dashboard + +Implemented by using [@bull-board](https://github.com/felixmosh/bull-board) and secured using [passport](https://www.passportjs.org/). + + +### Prometheus Exporter + +Strongly influenced by [bull_exporter](https://github.com/UpHabit/bull_exporter). Which uses the old bull library. + +Implemented by using the [bullmq](https://docs.bullmq.io/) library (specifically the [QueueEvents](https://docs.bullmq.io/guide/events) class) and [prom-client](https://github.com/siimon/prom-client). + +For each queue a class extending the QueueEvents class is created. This class listens for the following events: `completed`. Whenever an eventListener is triggered, a [histogram](https://prometheus.io/docs/concepts/metric_types/#histogram) is updated with + +1. the duration between the start of the processing and the end of the job +2. the duration between the creation of the job and the end of its processing. + +Furthermore, a cron job is executed every n seconds which collects the current status of the queues (`completed`, `active`, `delayed`, `failed`, `waiting` jobs) and writes them to a [gauge](https://prometheus.io/docs/concepts/metric_types/#gauge). + +Thus, the following metrics are collected: + +| Metric | type | description | +|---------------------------|-----------|-------------| +| bullmq_processed_duration | histogram | Processing time for completed jobs | +| bullmq_completed_duration | histogram | Completion time for jobs | +| bullmq_completed | gauge | Total number of completed jobs | +| bullmq_active | gauge | Total number of active jobs (currently being processed) | +| bullmq_delayed | gauge | Total number of jobs that will run in the future | +| bullmq_failed | gauge | Total number of failed jobs | +| bullmq_waiting | gauge | Total number of jobs waiting to be processed | + +Each metric also has the attribute `queue` which indicated which queue the metric is associated with. \ No newline at end of file diff --git a/configs/config-local.json b/configs/config-local.json new file mode 100644 index 0000000..2a43632 --- /dev/null +++ b/configs/config-local.json @@ -0,0 +1,22 @@ +{ + "redis": { + "host": "localhost:49153/", + "username": "default", + "password": "redispw", + "ssl": false + }, + "cookieSecret": "myCookieSecret123!", + "cookieMaxAge": "1h", + "users": [ + { + "username": "admin", + "password": "password", + "role": "admin" + }, + { + "username": "user", + "password": "password", + "role": "user" + } + ] +} \ No newline at end of file diff --git a/docker/Dockerfile b/docker/Dockerfile new file mode 100644 index 0000000..24e23c4 --- /dev/null +++ b/docker/Dockerfile @@ -0,0 +1,59 @@ +ARG VERSION +ARG COMMIT +ARG NPM_REGISTRY +ARG BASE_REPO +ARG NODE_VERSION +ARG ALPINE_VERSION + +# STAGE 1: Build app +FROM $BASE_REPO/node:$NODE_VERSION-alpine$ALPINE_VERSION as builder + +ENV VERSION=${VERSION} +ENV COMMIT=${COMMIT} + +ARG NPM_REGISTRY +ARG UID=510 +ARG GID=510 + +RUN apk update && \ + apk upgrade + +RUN mkdir -p /usr/src/app/node_modules && \ + addgroup -g ${GID} app && \ + adduser -h /usr/src/app -G app -u ${UID} -D app && \ + chown -R ${UID}:${GID} /usr/src/app + +WORKDIR /usr/src/app +COPY --chown=${UID}:${GID} package*.json .npmrc ./ +USER app +COPY --chown=${UID}:${GID} . . +RUN npm install --registry=${NPM_REGISTRY} +RUN npm run build + +# STAGE 2: Run app +FROM $BASE_REPO/node:$NODE_VERSION-alpine$ALPINE_VERSION + +ENV VERSION=${VERSION} +ENV COMMIT=${COMMIT} + +ARG NPM_REGISTRY +ARG UID=510 +ARG GID=510 + +RUN apk update && \ + apk upgrade + +RUN mkdir -p /usr/src/app/node_modules && \ + addgroup -g ${GID} app && \ + adduser -h /usr/src/app -G app -u ${UID} -D app && \ + chown -R ${UID}:${GID} /usr/src/app + +WORKDIR /usr/src/app +COPY --chown=${UID}:${GID} package*.json .npmrc ./ +USER app +RUN npm ci --only=production --registry=${NPM_REGISTRY} +COPY --chown=${UID}:${GID} --from=builder /usr/src/app/dist ./dist + +# EXPOSE 8080 +CMD [ "node", "dist/server.js" ] + diff --git a/media/bullmq-monitor.png b/media/bullmq-monitor.png new file mode 100644 index 0000000..9b91589 Binary files /dev/null and b/media/bullmq-monitor.png differ diff --git a/package.json b/package.json new file mode 100644 index 0000000..ee805c1 --- /dev/null +++ b/package.json @@ -0,0 +1,58 @@ +{ + "name": "bullmq-prometheus", + "version": "1.0.0", + "description": "Service that can be used to monitor BullMQ by providing Prometheus metrics and a Bullmq dashboard secured behind a login wall.", + "main": "src/server.ts", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 0", + "lint": "eslint . --ext ts", + "prettier": "prettier -w src/**/*.ts", + "build": "tsc", + "run": "node dist/server.js", + "ts-node": "ts-node src/server.ts", + "nodemon": "nodemon", + "prepare": "node -e \"if (process.env.NODE_ENV !== 'production'){process.exit(1)} \" || husky install" + }, + "keywords": [], + "author": "", + "license": "ISC", + "dependencies": { + "@bull-board/api": "^4.2.2", + "@bull-board/express": "^4.2.2", + "bullmq": "^1.87.1", + "connect-ensure-login": "^0.1.1", + "ejs": "^3.1.8", + "express": "^4.18.1", + "express-session": "^1.17.3", + "ioredis": "^5.2.2", + "parse-duration": "^1.0.2", + "passport": "^0.6.0", + "passport-local": "^1.0.0", + "pino": "^8.5.0", + "pino-http": "^8.2.0", + "prom-client": "^14.0.1" + }, + "devDependencies": { + "@types/connect-ensure-login": "^0.1.7", + "@types/ejs": "^3.1.1", + "@types/express": "^4.17.13", + "@types/express-session": "^1.17.5", + "@types/passport": "^1.0.9", + "@types/passport-local": "^1.0.34", + "@typescript-eslint/eslint-plugin": "^5.33.0", + "@typescript-eslint/parser": "^5.33.0", + "eslint": "^8.21.0", + "husky": "^8.0.1", + "nodemon": "^2.0.19", + "prettier": "^2.7.1", + "ts-node": "^10.9.1", + "typescript": "^4.7.4" + }, + "nodemonConfig": { + "watch": [ + "src" + ], + "ext": "ts", + "exec": "npx ts-node src/server.ts" + } +} diff --git a/src/app.ts b/src/app.ts new file mode 100644 index 0000000..4a4b587 --- /dev/null +++ b/src/app.ts @@ -0,0 +1,71 @@ +import express from 'express'; +import Redis from 'ioredis'; +import config from './config'; +import { ConfigureRoutes as ConfigureDashboardRoutes, User } from './controllers/dashboard'; +import { ConfigureRoutes as ConfigureMetricsRoute } from './controllers/metrics'; +import logger from './logger'; +import { PrometheusMetricsCollector } from './monitor/promMetricsCollector'; +import { formatConnectionString, handleFutureShutdown } from './utils'; + +const log = logger.child({ pkg: "app" }) +export const app = express(); +app.disable('x-powered-by'); + +const pino = require('pino-http')() +app.use(pino) + +app.get('/health', async (req, res) => { + res.status(200).send('OK'); +}); + +const username = config.redis.username +const password = config.redis.password +const host = config.redis.host + +if (username === undefined || password === undefined || host === undefined) { + process.exit(125); +} + +const enableSsl = config.redis.ssl +const prefix = process.env.NODE_ENV?.toLowerCase() || 'local'; +const cookieSecret = config.cookieSecret +const cookieMaxAge = config.cookieMaxAge +const defaultUsers: Array = [ + { username: 'admin', password: 'secret', role: 'admin' }, + { username: 'user', password: 'secret', role: 'user' }, +]; + +const users = config.users || defaultUsers + +const redisConnString = formatConnectionString(host, username, password, enableSsl); + +export const metricsCollector = new PrometheusMetricsCollector('monitor', { + bullmqOpts: { + prefix: prefix, + }, + client: new Redis(redisConnString, { maxRetriesPerRequest: null }), + queues: [], +}); + +handleFutureShutdown(metricsCollector); + +const dashboardRouter = express.Router(); +app.use('/bullmq', dashboardRouter); + +metricsCollector + .discoverAllQueues() + .then((queues) => { + log.info(`Discovered ${queues.length} queues`); + ConfigureDashboardRoutes(dashboardRouter, { + basePath: '/bullmq', + queues: metricsCollector.monitoredQueues.map((q) => q.queue), + cookieSecret: cookieSecret, + cookieMaxAge: cookieMaxAge, + users: users, + }); + ConfigureMetricsRoute(app, metricsCollector); + }) + .catch((err) => { + console.error(err); + process.exit(125); + }); diff --git a/src/config.ts b/src/config.ts new file mode 100644 index 0000000..b421149 --- /dev/null +++ b/src/config.ts @@ -0,0 +1,19 @@ +import { readFileSync } from 'fs' + +export interface Config { + redis: { + host: string, + username: string, + password: string, + ssl: boolean + }, + cookieSecret: string, + cookieMaxAge: string, + users?: Array +} + +const prefix = process.env.NODE_ENV?.toLowerCase() || 'local'; +const jsonRaw = readFileSync(`./configs/config-${prefix}.json`) +const config = JSON.parse(jsonRaw.toString()) + +export default config as Config; \ No newline at end of file diff --git a/src/controllers/dashboard.ts b/src/controllers/dashboard.ts new file mode 100644 index 0000000..cf7a7fe --- /dev/null +++ b/src/controllers/dashboard.ts @@ -0,0 +1,115 @@ +import { Router } from 'express'; +import express from 'express'; +import session from 'express-session'; +import { ExpressAdapter } from '@bull-board/express'; +import { createBullBoard } from '@bull-board/api'; +import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; +import passport from 'passport'; +import { Strategy as LocalStrategy } from 'passport-local'; +import { ensureLoggedIn } from 'connect-ensure-login'; +import { Queue } from 'bullmq'; +import { renderLoginPage } from './views/login'; +import logger from '../logger'; +import parse from 'parse-duration'; + +export interface User { + username: string; + password: string; + role: string; +} + +export interface DashboardOptions { + basePath: string; + users: User[]; + cookieSecret: string; + cookieMaxAge: string; + queues: Array; +} + +const log = logger.child({ pkg: 'dashboard' }); +let users: Map = new Map(); + +passport.use( + new LocalStrategy(function (username, password, cb) { + const user = users.get(username); + if (user !== undefined && user.password === password) { + return cb(null, { user: user.username, role: user.role }); + } + return cb(null, false); + }), +); + +passport.serializeUser((user, cb) => { + cb(null, user); +}); + +passport.deserializeUser((user, cb) => { + cb(null, user as any); +}); + +const ensureRole = (options: any): express.RequestHandler => { + return (req, res, next) => { + const user = (req.session as any)?.passport.user; + if (user.role == options.role) return next(); + res.redirect(options.failureRedirect); + }; +}; + +export function ConfigureRoutes(app: Router, opts: DashboardOptions) { + const basePath = opts.basePath; + const cookieSecret = opts.cookieSecret; + const queues = opts.queues; + const cookieMaxAge = parse(opts.cookieMaxAge); + users = new Map(opts.users.map((u) => [u.username, u])); + + log.info(`Setting up routes for dashboard with basePath ${basePath == '' ? '/' : basePath}`); + const failedLoginRedirect = basePath + '/ui/login?invalid=true'; + const requireLoginRedirect = basePath + '/ui/login'; + + app.use(passport.initialize()); + app.use( + session({ + secret: cookieSecret, + saveUninitialized: true, + resave: true, + cookie: { maxAge: cookieMaxAge }, + }), + ); + app.use(express.urlencoded({ extended: false })); + app.use(passport.session()); + + app.get(`/ui/login`, (req, res) => { + res.send(renderLoginPage(req.query.invalid === 'true', requireLoginRedirect)); + }); + + app.post(`/ui/login`, passport.authenticate('local', { failureRedirect: failedLoginRedirect }), (req, res) => { + const user = (req.session as any)?.passport.user; + if (user.role == 'admin') return res.redirect(`${basePath}/ui/admin`); + return res.redirect(`${basePath}/ui`); + }); + + // readOnly bull board + const readOnlyAdapter = new ExpressAdapter(); + readOnlyAdapter.setBasePath(`${basePath}/ui`); + createBullBoard({ + queues: queues.map((q) => new BullMQAdapter(q, { readOnlyMode: true })), + serverAdapter: readOnlyAdapter, + }); + + app.use(`/ui`, ensureLoggedIn({ redirectTo: requireLoginRedirect }), readOnlyAdapter.getRouter()); + + // admin bull board + const adminAdapter = new ExpressAdapter(); + adminAdapter.setBasePath(`${basePath}/ui/admin`); + createBullBoard({ + queues: queues.map((q) => new BullMQAdapter(q)), + serverAdapter: adminAdapter, + }); + + app.use( + `/ui/admin`, + ensureLoggedIn({ redirectTo: requireLoginRedirect }), + ensureRole({ role: 'admin', failureRedirect: `${basePath}/ui` }), + adminAdapter.getRouter(), + ); +} diff --git a/src/controllers/metrics.ts b/src/controllers/metrics.ts new file mode 100644 index 0000000..800d105 --- /dev/null +++ b/src/controllers/metrics.ts @@ -0,0 +1,22 @@ +import { Router } from 'express'; + +export interface IFMetricsCollector { + collectSerialized: () => Promise; + discoverAllQueues: () => Promise; +} + +export function ConfigureRoutes(app: Router, metricsCollector: IFMetricsCollector) { + app.get(`/prometheus/metrics`, async (req, res) => { + const metrics = await metricsCollector.collectSerialized(); + res.header('content-type', 'text/plain'); + res.header('Pragma', 'no-cache'); + res.header('Cache-Control', 'no-store, no-cache, must-revalidate, proxy-revalidate'); + res.header('Content-Type-Options', 'nosniff'); + res.status(200).send(metrics); + }); + + app.post(`/discoverQueues`, async (req, res) => { + const queues = await metricsCollector.discoverAllQueues(); + res.status(200).json(queues); + }); +} diff --git a/src/controllers/views/login.ts b/src/controllers/views/login.ts new file mode 100644 index 0000000..2ebc3cf --- /dev/null +++ b/src/controllers/views/login.ts @@ -0,0 +1,92 @@ +import ejs from 'ejs'; + +const loginPage = ` + + + +`; + +export function renderLoginPage(invalid: boolean, loginPath: string) { + return ejs.render(loginPage, { invalid: invalid, loginPath: loginPath }); +} diff --git a/src/logger.ts b/src/logger.ts new file mode 100644 index 0000000..9a08339 --- /dev/null +++ b/src/logger.ts @@ -0,0 +1,4 @@ +import { Logger } from "pino" + +const logger = require('pino')() +export default logger as Logger \ No newline at end of file diff --git a/src/monitor/promMetricsCollector.ts b/src/monitor/promMetricsCollector.ts new file mode 100644 index 0000000..ecb877a --- /dev/null +++ b/src/monitor/promMetricsCollector.ts @@ -0,0 +1,150 @@ +import * as bullmq from 'bullmq'; +import Redis from 'ioredis'; +import * as prom_client from 'prom-client'; +import logger from '../logger'; +import { PrometheusMonitoredQueue } from './promQueue'; + +const log = logger.child({ pkg: 'metricsCollector' }); + +export interface MetricsCollectorOptions { + bullmqOpts: bullmq.QueueBaseOptions; + client: Redis; + queues?: Array; +} + +export interface PrometheusMetrics { + completedGauge: prom_client.Gauge; + activeGauge: prom_client.Gauge; + delayedGauge: prom_client.Gauge; + failedGauge: prom_client.Gauge; + waitingGauge: prom_client.Gauge; + completedDuration: prom_client.Histogram; + processedDuration: prom_client.Histogram; +} + +export class PrometheusMetricsCollector { + registry: prom_client.Registry; + monitoredQueues: Array = []; + + name: string; + bullmqOpts: bullmq.QueueBaseOptions; + defaultRedisClient: Redis; + + metrics: PrometheusMetrics | undefined; + + constructor(name: string, opts: MetricsCollectorOptions) { + this.registry = new prom_client.Registry(); + this.name = name; + this.bullmqOpts = opts.bullmqOpts ?? { connection: { maxRetriesPerRequest: null } }; + this.defaultRedisClient = opts.client; + this.registerMetrics(this.registry); + + if (opts.queues) { + this.registerQueues(opts.queues); + } else { + this.discoverAllQueues() + .then((queues) => { + log.info(`Discovered ${queues.length} queues`); + }) + .catch((err) => { + log.error(`Failed to discover queues: ${err}`); + process.exit(125); + }); + } + } + + registerMetrics(reg: prom_client.Registry, prefix = '') { + this.metrics = { + completedGauge: new prom_client.Gauge({ + name: `${prefix}bullmq_completed`, + help: 'Total number of completed jobs', + labelNames: ['queue'], + }), + activeGauge: new prom_client.Gauge({ + name: `${prefix}bullmq_active`, + help: 'Total number of active jobs (currently being processed)', + labelNames: ['queue'], + }), + failedGauge: new prom_client.Gauge({ + name: `${prefix}bullmq_failed`, + help: 'Total number of failed jobs', + labelNames: ['queue'], + }), + delayedGauge: new prom_client.Gauge({ + name: `${prefix}bullmq_delayed`, + help: 'Total number of jobs that will run in the future', + labelNames: ['queue'], + }), + waitingGauge: new prom_client.Gauge({ + name: `${prefix}bullmq_waiting`, + help: 'Total number of jobs waiting to be processed', + labelNames: ['queue'], + }), + processedDuration: new prom_client.Histogram({ + name: `${prefix}bullmq_processed_duration`, + help: 'Processing time for completed jobs (processing until completed)', + buckets: [5, 50, 100, 250, 500, 750, 1000, 2500], + labelNames: ['queue'], + }), + completedDuration: new prom_client.Histogram({ + name: `${prefix}bullmq_completed_duration`, + help: 'Completion time for jobs (created until completed)', + buckets: [5, 50, 100, 250, 500, 750, 1000, 2500, 5000, 10000], + labelNames: ['queue'], + }), + }; + + Object.values(this.metrics).forEach((metric) => reg.registerMetric(metric)); + } + + async discoverAllQueues() { + const keyPattern = new RegExp(`^${this.bullmqOpts.prefix}:([^:]+):(id|failed|active|waiting|stalled-check)$`); + const keyStream = await this.defaultRedisClient.scanStream({ + match: `${this.bullmqOpts.prefix}:*:*`, + }); + + const queues = new Set(); + for await (const keyChunk of keyStream) { + for (const key of keyChunk) { + const match = keyPattern.exec(key); + if (match && match[1]) { + queues.add(match[1]); + } + } + } + this.registerQueues(Array.from(queues)); + return Array.from(queues); + } + + registerQueues(queues: Array) { + this.monitoredQueues = queues.map( + (queueName) => + new PrometheusMonitoredQueue(queueName, this.metrics!, { + bullmqOpts: { + ...this.bullmqOpts, + connection: this.defaultRedisClient, + }, + name: queueName, + }), + ); + } + + async collect() { + return await this.registry.metrics(); + } + + async collectSerialized() { + return await this.collect(); + } + + async close() { + log.debug('Closing metrics collector'); + try { + const val = await this.defaultRedisClient.quit(); + log.debug('Successfully quit redis connection', 'response', val); + } catch (e: unknown) { + log.warn('Failed to quit redis connection', 'error', e); + } + return this.monitoredQueues.forEach(async (q) => await q.close()); + } +} diff --git a/src/monitor/promQueue.ts b/src/monitor/promQueue.ts new file mode 100644 index 0000000..8b1b8ff --- /dev/null +++ b/src/monitor/promQueue.ts @@ -0,0 +1,65 @@ +import * as prom_client from 'prom-client'; +import * as bullmq from 'bullmq'; +import { PrometheusMetrics } from './promMetricsCollector'; + +interface MonitoredQueueOptions { + bullmqOpts: bullmq.QueueBaseOptions; + name: string; + metricsPrefix?: string; +} + +async function sleep(ms: number) { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +/** + * @see https://github.com/taskforcesh/bullmq/blob/master/docs/gitbook/api/bullmq.queueeventslistener.md + */ +export class PrometheusMonitoredQueue extends bullmq.QueueEvents { + metrics: PrometheusMetrics; + queue: bullmq.Queue; + canceled = false; + + constructor(name: string, metrics: PrometheusMetrics, opts: MonitoredQueueOptions) { + super(name, opts.bullmqOpts); + this.queue = new bullmq.Queue(name, opts.bullmqOpts); + this.metrics = metrics; + this.on('completed', this.onCompleted); + this.loop(2000); + } + + async onCompleted(completedJob: { jobId: string }) { + const job = await this.queue.getJob(completedJob.jobId); + if (!job) { + return; + } + + const completedDuration = job.finishedOn! - job.timestamp!; // both cannot be null + const processedDuration = job.finishedOn! - job.processedOn!; // both cannot be null + this.metrics.completedDuration.labels({ queue: this.name }).observe(completedDuration); + this.metrics.processedDuration.labels({ queue: this.name }).observe(processedDuration); + } + + async loop(ms = 5000) { + while (this.canceled === false) { + await this.updateGauges(); + await sleep(ms); + } + console.log('Stopped updating gauges for ' + this.name); + } + + async updateGauges() { + const { completed, active, delayed, failed, waiting } = await this.queue.getJobCounts(); + this.metrics.activeGauge.labels({ queue: this.name }).set(active); + this.metrics.completedGauge.labels({ queue: this.name }).set(completed); + this.metrics.delayedGauge.labels({ queue: this.name }).set(delayed); + this.metrics.failedGauge.labels({ queue: this.name }).set(failed); + this.metrics.waitingGauge.labels({ queue: this.name }).set(waiting); + } + + async close() { + this.canceled = true; + await super.close(); + await this.queue.close(); + } +} diff --git a/src/server.ts b/src/server.ts new file mode 100644 index 0000000..0b8c798 --- /dev/null +++ b/src/server.ts @@ -0,0 +1,35 @@ +import fs from 'fs'; +import http from 'http'; +import https from 'https'; +import { app } from './app'; +import logger from './logger'; +import { handleShutdown } from './utils'; + +const log = logger.child({ pkg: "server" }) +const httpsPort = process.env.HTTPS_PORT || 8443; +const httpPort = process.env.HTTP_PORT || 8080; + +// comment this in if you want to change the loglevel programmatically +// instead of env variable. +// log.changeGlobalLevel('WARN'); + +if (["local", undefined].includes(process.env.NODE_ENV!)) { + // local environment serves http + const httpServer = http.createServer(app); + handleShutdown(httpServer); + httpServer.listen(httpPort, () => log.info(`Service listening on http://0.0.0.0:${httpPort}`)); + +} else if (process.env.NODE_ENV !== 'test') { + // live environment only serves https + const credentials = { + key: fs.readFileSync('/etc/tls/tls.key', 'utf8'), + cert: fs.readFileSync('/etc/tls/tls.crt', 'utf8'), + }; + const httpsServer = https.createServer(credentials, app); + handleShutdown(httpsServer); + httpsServer.listen(httpsPort, () => log.info(`Service listening on https://0.0.0.0:${httpsPort}`)); + +} else { + // ignoring in testing to prevent port occupation + log.info(`Running in test. Not listening...`); +} diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..85b94ab --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,61 @@ +interface Closeable { + close(cb?: (err?: Error) => void): void; +} + +export function handleShutdown(closable: Closeable) { + const exitCB = (err?: Error) => { + if (err) console.error(`Exit failed with ${err}`); + }; + process.on('SIGINT', () => { + closable.close(exitCB); + }); + process.on('SIGQUIT', () => { + closable.close(exitCB); + }); + process.on('SIGTERM', () => { + closable.close(exitCB); + }); +} + +interface FutureCloseable { + close(cb?: (err?: Error) => void): Promise; +} + +export function handleFutureShutdown(closable: FutureCloseable) { + const exitCB = (err?: Error) => { + if (err) console.error(`Exit failed with ${err}`); + }; + + process.on('SIGINT', async () => { + console.log('received SIGINT'); + try { + await closable.close(exitCB); + } catch (e: unknown) { + console.log(e); + } + console.log('closed'); + }); + process.on('SIGQUIT', async () => { + console.log('received SIGQUIT'); + try { + await closable.close(exitCB); + } catch (e: unknown) { + console.log(e); + } + console.log('closed'); + }); + process.on('SIGTERM', async () => { + console.log('received SIGTERM'); + try { + await closable.close(exitCB); + } catch (e: unknown) { + console.log(e); + } + console.log('closed'); + }); +} + +export function formatConnectionString(url: string, username?: string, password?: string, ssl = true): string { + const accessData = username && password ? `${username}:${password}@` : username ? `${username}@` : ''; + return `${ssl ? 'rediss' : 'redis'}://${accessData}${url}`; +} diff --git a/tsconfig.json b/tsconfig.json new file mode 100644 index 0000000..6b9c9ce --- /dev/null +++ b/tsconfig.json @@ -0,0 +1,22 @@ +{ + "include": ["src/**/*.ts"], + "compilerOptions": { + /* Visit https://aka.ms/tsconfig.json to read more about this file */ + + /* Basic Options */ + "resolveJsonModule": true, + "target": "es2020" /* Specify ECMAScript target version: 'ES3' (default), 'ES5', 'ES2015', 'ES2016', 'ES2017', 'ES2018', 'ES2019', 'ES2020', or 'ESNEXT'. */, + "module": "commonjs" /* Specify module code generation: 'none', 'commonjs', 'amd', 'system', 'umd', 'es2015', 'es2020', or 'ESNext'. */, + "declaration": true /* Generates corresponding '.d.ts' file. */, + "outDir": "dist" /* Redirect output structure to the directory. */, + "rootDir": "src" /* Specify the root directory of input files. Use to control the output directory structure with --outDir. */, + "strict": true /* Enable all strict type-checking options. */, + "esModuleInterop": true /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */, + "experimentalDecorators": true /* Enables experimental support for ES7 decorators. */, + "emitDecoratorMetadata": true /* Enables experimental support for emitting type metadata for decorators. */, + + /* Advanced Options */ + "skipLibCheck": true /* Skip type checking of declaration files. */, + "forceConsistentCasingInFileNames": true /* Disallow inconsistently-cased references to the same file. */ + } +}