From edf5d8058b01ca149cf276943c5fe458d6629b20 Mon Sep 17 00:00:00 2001 From: Harminder virk Date: Sat, 3 Aug 2019 19:33:10 +0530 Subject: [PATCH] feat: add connection manager to manage database connections --- adonis-typings/database.ts | 50 +++++++-- package.json | 1 + src/Connection/index.ts | 19 ++-- src/ConnectionManager/index.ts | 188 ++++++++++++++++++++++++++++++++ test/connection-manager.spec.ts | 99 +++++++++++++++++ test/connection.spec.ts | 42 ++++--- 6 files changed, 358 insertions(+), 41 deletions(-) create mode 100644 src/ConnectionManager/index.ts create mode 100644 test/connection-manager.spec.ts diff --git a/adonis-typings/database.ts b/adonis-typings/database.ts index 984ace30..b9e4e819 100644 --- a/adonis-typings/database.ts +++ b/adonis-typings/database.ts @@ -12,6 +12,7 @@ declare module '@ioc:Adonis/Addons/Database' { import * as knex from 'knex' import { Pool } from 'tarn' + import { EventEmitter } from 'events' import { DatabaseQueryBuilderContract } from '@ioc:Adonis/Addons/DatabaseQueryBuilder' @@ -190,32 +191,57 @@ declare module '@ioc:Adonis/Addons/Database' { */ export type DatabaseConfigContract = { connection: string } & { [key: string]: ConnectionConfigContract } + /** + * The shape of a connection within the connection manager + */ + type ConnectionManagerConnectionNode = { + name: string, + config: ConnectionConfigContract, + connection?: ConnectionContract, + state: 'idle' | 'open' | 'closed', + } + /** * Connection manager to manage one or more database * connections. */ - export interface ConnectionManagerContract { - connections: Map - add (connectionName: string, config: ConnectionConfigContract): void, - connect (connectionName: string): void, - get (connectionName: string): ConnectionContract, - has (connectionName: string): boolean, - close (connectionName: string): Promise, - closeAll (): Promise, + export interface ConnectionManagerContract extends EventEmitter { + connections: Map + + on (event: 'connect', callback: (connection: ConnectionContract) => void) + on (event: 'disconnect', callback: (connection: ConnectionContract) => void) + + add (connectionName: string, config: ConnectionConfigContract): void + connect (connectionName: string): void + get (connectionName: string): ConnectionManagerConnectionNode | undefined + has (connectionName: string): boolean + isConnected (connectionName: string): boolean + + close (connectionName: string, release?: boolean): Promise + closeAll (release?: boolean): Promise + release (connectionName: string): Promise } /** * Connection represents a single knex instance with inbuilt * pooling capabilities. */ - export interface ConnectionContract { + export interface ConnectionContract extends EventEmitter { client?: knex, pool: null | Pool, name: string, config: ConnectionConfigContract, - readonly EVENTS: ['open', 'close', 'close:error'], - open (): void, - close (): void, + + /** + * List of emitted events + */ + on (event: 'connect', callback: (connection: ConnectionContract) => void) + on (event: 'error', callback: (connection: ConnectionContract, error: Error) => void) + on (event: 'disconnect', callback: (connection: ConnectionContract) => void) + on (event: 'disconnect:error', callback: (connection: ConnectionContract, error: Error) => void) + + connect (): void, + disconnect (): Promise, } export interface DatabaseContract { diff --git a/package.json b/package.json index 07656ee1..5860bbce 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,7 @@ }, "homepage": "https://github.com/adonisjs/adonis-lucid#readme", "dependencies": { + "@poppinss/utils": "^1.0.4", "knex": "^0.19.1", "ts-essentials": "^3.0.0", "utility-types": "^3.7.0" diff --git a/src/Connection/index.ts b/src/Connection/index.ts index 26639728..b54f3ce5 100644 --- a/src/Connection/index.ts +++ b/src/Connection/index.ts @@ -26,11 +26,6 @@ export class Connection extends EventEmitter implements ConnectionContract { */ public client?: knex - /** - * List of events emitted by this class - */ - public readonly EVENTS: ['open', 'close', 'close:error'] - constructor (public name: string, public config: ConnectionConfigContract) { super() } @@ -46,7 +41,7 @@ export class Connection extends EventEmitter implements ConnectionContract { * when `min` resources inside the pool are set to `0`. */ if (this.pool!.numFree() === 0 && this.pool!.numUsed() === 0) { - this.close() + this.disconnect() } }) @@ -56,7 +51,7 @@ export class Connection extends EventEmitter implements ConnectionContract { */ this.pool!.on('poolDestroySuccess', () => { this.client = undefined - this.emit('close') + this.emit('disconnect', this) this.removeAllListeners() }) } @@ -71,13 +66,13 @@ export class Connection extends EventEmitter implements ConnectionContract { /** * Opens the connection by creating knex instance */ - public open () { + public connect () { try { this.client = knex(this.config) this._monitorPoolResources() - this.emit('open') + this.emit('connect', this) } catch (error) { - this.emit('error', error) + this.emit('error', error, this) throw error } } @@ -89,12 +84,12 @@ export class Connection extends EventEmitter implements ConnectionContract { * In case of error this method will emit `close:error` event followed * by the `close` event. */ - public async close (): Promise { + public async disconnect (): Promise { if (this.client) { try { await this.client!.destroy() } catch (error) { - this.emit('close:error', error) + this.emit('disconnect:error', error, this) } } } diff --git a/src/ConnectionManager/index.ts b/src/ConnectionManager/index.ts new file mode 100644 index 00000000..8afb788a --- /dev/null +++ b/src/ConnectionManager/index.ts @@ -0,0 +1,188 @@ +/* +* @adonisjs/lucid +* +* (c) Harminder Virk +* +* For the full copyright and license information, please view the LICENSE +* file that was distributed with this source code. +*/ + +/// + +import { EventEmitter } from 'events' +import { Exception } from '@poppinss/utils' + +import { + ConnectionConfigContract, + ConnectionContract, + ConnectionManagerContract, +} from '@ioc:Adonis/Addons/Database' + +import { Connection } from '../Connection' + +/** + * Connection class manages a given database connection. Internally it uses + * knex to build the database connection with appropriate database + * driver. + */ +export class ConnectionManager extends EventEmitter implements ConnectionManagerContract { + public connections: ConnectionManagerContract['connections'] = new Map() + + /** + * Monitors a given connection by listening for lifecycle events + */ + private _monitorConnection (connection: ConnectionContract) { + /** + * Listens for disconnect to set the connection state and cleanup + * memory + */ + connection.on('disconnect', ($connection) => { + const internalConnection = this.get($connection.name) + + /** + * This will be false, when connection was released at the + * time of closing + */ + if (!internalConnection) { + return + } + + this.emit('disconnect', $connection) + delete internalConnection.connection + internalConnection.state = 'closed' + }) + + /** + * Listens for connect to set the connection state to open + */ + connection.on('connect', ($connection) => { + const internalConnection = this.get($connection.name) + if (!internalConnection) { + return + } + + this.emit('connect', $connection) + internalConnection.state = 'open' + }) + } + + /** + * Add a named connection with it's configuration. Make sure to call `connect` + * before using the connection to make database queries. + */ + public add (connectionName: string, config: ConnectionConfigContract): void { + /** + * Raise an exception when someone is trying to re-add the same connection. We + * should not silently avoid this scanerio, since there is a valid use case + * in which the config has been changed and someone wants to re-add the + * connection with new config. In that case, they must + * + * 1. Close and release the old connection + * 2. Then add the new connection + */ + if (this.isConnected(connectionName)) { + throw new Exception( + `Attempt to add duplicate connection ${connectionName} failed`, + 500, + 'E_DUPLICATE_DB_CONNECTION', + ) + } + + this.connections.set(connectionName, { + name: connectionName, + config: config, + state: 'idle', + }) + } + + /** + * Connect to the database using config for a given named connection + */ + public connect (connectionName: string): void { + const connection = this.connections.get(connectionName) + if (!connection) { + throw new Exception( + `Cannot connect to unregisted connection ${connectionName}`, + 500, + 'E_MISSING_DB_CONNECTION_CONFIG', + ) + } + + /** + * Do not do anything when `connection` property already exists, since it will + * always be set to `undefined` for a closed connection + */ + if (connection.connection) { + return + } + + /** + * Create a new connection and monitor it's state + */ + connection.connection = new Connection(connection.name, connection.config) + this._monitorConnection(connection.connection) + connection.connection.connect() + } + + /** + * Returns the connection node for a given named connection + */ + public get (connectionName: string) { + return this.connections.get(connectionName) + } + + /** + * Returns a boolean telling if we have connection details for + * a given named connection. This method doesn't tell if + * connection is connected or not. + */ + public has (connectionName: string) { + return this.connections.has(connectionName) + } + + /** + * Returns a boolean telling if connection has been established + * with the database or not + */ + public isConnected (connectionName: string) { + if (!this.has(connectionName)) { + return false + } + + const connection = this.get(connectionName)! + return (!!connection.connection && connection.state === 'open') + } + + /** + * Closes a given connection and can optionally release it from the + * tracking list + */ + public async close (connectioName: string, release: boolean = false) { + if (this.isConnected(connectioName)) { + await this.get(connectioName)!.connection!.disconnect() + } + + if (release) { + await this.release(connectioName) + } + } + + /** + * Close all tracked connections + */ + public async closeAll (release: boolean = false) { + await Promise.all(Array.from(this.connections.keys()).map((name) => this.close(name, release))) + } + + /** + * Release a connection. This will disconnect the connection + * and will delete it from internal list + */ + public async release (connectionName: string) { + if (this.isConnected(connectionName)) { + await this.close(connectionName, true) + } else { + this.connections.delete(connectionName) + } + } +} diff --git a/test/connection-manager.spec.ts b/test/connection-manager.spec.ts new file mode 100644 index 00000000..24bd03e2 --- /dev/null +++ b/test/connection-manager.spec.ts @@ -0,0 +1,99 @@ +/* +* @adonisjs/lucid +* +* (c) Harminder Virk +* +* For the full copyright and license information, please view the LICENSE +* file that was distributed with this source code. +*/ + +/// + +import * as test from 'japa' + +import { getConfig, setup, cleanup } from '../test-helpers' +import { ConnectionManager } from '../src/ConnectionManager' + +test.group('ConnectionManager', (group) => { + group.before(async () => { + await setup() + }) + + group.after(async () => { + await cleanup() + }) + + test('do not connect until connect is called', (assert) => { + const manager = new ConnectionManager() + manager.add('primary', getConfig()) + + assert.isTrue(manager.has('primary')) + assert.isFalse(manager.isConnected('primary')) + }) + + test('connect and set its state to open', async (assert) => { + const manager = new ConnectionManager() + manager.add('primary', getConfig()) + manager.connect('primary') + + assert.equal(manager.get('primary')!.state, 'open') + assert.isTrue(manager.isConnected('primary')) + }) + + test('on disconnect set state to closed', async (assert) => { + const manager = new ConnectionManager() + manager.add('primary', getConfig()) + manager.connect('primary') + + await manager.connections.get('primary')!.connection!.disconnect() + assert.equal(manager.get('primary')!.state, 'closed') + assert.isFalse(manager.isConnected('primary')) + }) + + test('raise exception when attempt to add a duplication connection', async (assert) => { + const manager = new ConnectionManager() + manager.add('primary', getConfig()) + manager.connect('primary') + + const fn = () => manager.add('primary', getConfig()) + assert.throw(fn, 'E_DUPLICATE_DB_CONNECTION: Attempt to add duplicate connection primary failed') + }) + + test('patch config when connection is not in open state', async (assert) => { + const manager = new ConnectionManager() + manager.add('primary', getConfig()) + manager.connect('primary') + + await manager.close('primary') + + const fn = () => manager.add('primary', getConfig()) + assert.doesNotThrow(fn) + }) + + test('ignore multiple calls to `connect` on a single connection', async () => { + const manager = new ConnectionManager() + manager.add('primary', getConfig()) + manager.connect('primary') + + manager.on('connect', () => { + throw new Error('Never expected to be called') + }) + + manager.connect('primary') + }) + + test('releasing a connection must close it first', async (assert) => { + assert.plan(2) + + const manager = new ConnectionManager() + manager.add('primary', getConfig()) + manager.connect('primary') + + manager.on('disconnect', (connection) => { + assert.equal(connection.name, 'primary') + }) + + await manager.release('primary') + assert.isFalse(manager.has('primary')) + }) +}) diff --git a/test/connection.spec.ts b/test/connection.spec.ts index 76d91079..a3059219 100644 --- a/test/connection.spec.ts +++ b/test/connection.spec.ts @@ -12,30 +12,38 @@ import * as test from 'japa' import { MysqlConfigContract } from '@ioc:Adonis/Addons/Database' -import { getConfig } from '../test-helpers' +import { getConfig, setup, cleanup } from '../test-helpers' import { Connection } from '../src/Connection' -test.group('Connection', () => { - test('do not instantiate knex unless open is called', (assert) => { +test.group('Connection', (group) => { + group.before(async () => { + await setup() + }) + + group.after(async () => { + await cleanup() + }) + + test('do not instantiate knex unless connect is called', (assert) => { const connection = new Connection('primary', getConfig()) assert.isUndefined(connection.client) }) - test('instantiate knex when open is invoked', async (assert, done) => { + test('instantiate knex when connect is invoked', async (assert, done) => { const connection = new Connection('primary', getConfig()) - connection.on('open', () => { + connection.on('connect', () => { assert.isDefined(connection.client) assert.equal(connection.pool!.numUsed(), 0) done() }) - connection.open() + connection.connect() }) - test('on close destroy knex', async (assert) => { + test('on disconnect destroy knex', async (assert) => { const connection = new Connection('primary', getConfig()) - connection.open() - await connection.close() + connection.connect() + await connection.disconnect() assert.isUndefined(connection.client) }) @@ -47,31 +55,31 @@ test.group('Connection', () => { }, })) - connection.open() + connection.connect() await connection.client!.raw('select 1+1 as result') - connection.on('close', () => { + connection.on('disconnect', () => { assert.isUndefined(connection.client) done() }) }) - test('on close emit close event', async (assert, done) => { + test('on disconnect emit disconnect event', async (assert, done) => { const connection = new Connection('primary', getConfig()) - connection.open() + connection.connect() - connection.on('close', () => { + connection.on('disconnect', () => { assert.isUndefined(connection.client) done() }) - await connection.close() + await connection.disconnect() }) test('raise error when unable to make connection', (assert) => { const connection = new Connection('primary', Object.assign({}, getConfig(), { client: null })) - const fn = () => connection.open() + const fn = () => connection.connect() assert.throw(fn, /knex: Required configuration option/) }) }) @@ -84,7 +92,7 @@ if (process.env.DB === 'mysql') { config.connection.typeCast = false const connection = new Connection('primary', config) - connection.open() + connection.connect() assert.equal(connection.client!['_context'].client.constructor.name, 'Client_MySQL') assert.equal(connection.client!['_context'].client.config.connection.charset, 'utf-8')