-
-
Notifications
You must be signed in to change notification settings - Fork 65
/
client.ts
148 lines (137 loc) · 3.67 KB
/
client.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
import { Connection, ConnectionState, ExecuteResult } from "./connection.ts";
import { ConnectionPool, PoolConnection } from "./pool.ts";
import { log } from "./logger.ts";
/**
 * Client Config
 *
 * Options accepted by `Client.connect()`. Every field is optional;
 * `connect()` supplies defaults for `hostname`, `username`, `port`,
 * `poolSize`, `timeout` and `idleTimeout` before applying the caller's values.
 */
export interface ClientConfig {
/** Database hostname (default: "127.0.0.1") */
hostname?: string;
/** Database UNIX domain socket path. When used, `hostname` and `port` are ignored. */
socketPath?: string;
/** Database username (default: "root") */
username?: string;
/** Database password */
password?: string;
/** Database port (default: 3306) */
port?: number;
/** Database name */
db?: string;
/** Whether to display packet debugging information */
debug?: boolean;
/** Connection read timeout in milliseconds (default: 30 seconds, i.e. 30 * 1000) */
timeout?: number;
/** Connection pool size (default: 1) */
poolSize?: number;
/** Connection pool idle timeout in milliseconds (default: 4 hours, i.e. 4 * 3600 * 1000) */
idleTimeout?: number;
/** Character set name */
charset?: string;
}
/**
 * Transaction processor
 *
 * Callback passed to `Client.transaction()`. It receives the connection on
 * which `BEGIN` was executed and must issue all of the transaction's
 * statements through that connection. The value it resolves to becomes the
 * result of `transaction()`; if it rejects, the transaction is rolled back
 * and the rejection is propagated to the caller.
 */
export interface TransactionProcessor<T> {
(connection: Connection): Promise<T>;
}
/**
 * MySQL client
 *
 * Facade over a `ConnectionPool`: every operation borrows a pooled
 * connection, runs on it, and afterwards either hands the connection back to
 * the pool or removes it from the pool if it ended up closed.
 */
export class Client {
  /** Effective configuration; replaced and frozen by `connect()`. */
  config: ClientConfig = {};

  /** Pool created by `connect()`; `undefined` before connecting and after `close()`. */
  private _pool?: ConnectionPool;

  /** Pool factory callback: opens one new pooled connection using the current config. */
  private async createConnection(): Promise<PoolConnection> {
    const connection = new PoolConnection(this.config);
    await connection.connect();
    return connection;
  }

  /**
   * Render a thrown value for logging. Anything can be thrown in JavaScript
   * (including `null`/`undefined`), so `.message` must not be read blindly.
   */
  private static describeError(error: unknown): string {
    if (typeof error === "object" && error !== null && "message" in error) {
      return String((error as { message: unknown }).message);
    }
    return String(error);
  }

  /** get pool info (`undefined` while the client is not connected) */
  get pool() {
    return this._pool?.info;
  }

  /**
   * connect to database
   *
   * Merges `config` over the built-in defaults, freezes the result and
   * creates the connection pool.
   * @param config config for client
   * @returns Client instance (this), to allow chaining
   */
  async connect(config: ClientConfig): Promise<Client> {
    this.config = {
      hostname: "127.0.0.1",
      username: "root",
      port: 3306,
      poolSize: 1,
      timeout: 30 * 1000,
      idleTimeout: 4 * 3600 * 1000,
      ...config,
    };
    // Freeze so pooled connections created later all see the same settings.
    Object.freeze(this.config);
    this._pool = new ConnectionPool(
      // NOTE(review): the default above is 1, so this fallback of 10 is only
      // reached when the caller passes a falsy poolSize (0 or an explicit
      // `undefined`). It disagrees with the documented default; kept as-is
      // because existing callers may rely on it — confirm the intended value.
      this.config.poolSize || 10,
      this.createConnection.bind(this),
    );
    return this;
  }

  /**
   * execute query sql
   * @param sql query sql string
   * @param params query params
   * @returns whatever the underlying connection's `query` resolves to
   * @throws Error("Unconnected") if `connect()` has not been called
   */
  async query(sql: string, params?: any[]): Promise<any> {
    return await this.useConnection(async (connection) => {
      return await connection.query(sql, params);
    });
  }

  /**
   * execute sql
   * @param sql sql string
   * @param params query params
   * @returns the execute result reported by the underlying connection
   * @throws Error("Unconnected") if `connect()` has not been called
   */
  async execute(sql: string, params?: any[]): Promise<ExecuteResult> {
    return await this.useConnection(async (connection) => {
      return await connection.execute(sql, params);
    });
  }

  /**
   * Borrow a connection from the pool, run `fn` with it, and always give the
   * connection back afterwards, whether `fn` resolved or rejected.
   * @param fn work to perform on the borrowed connection
   * @returns the value `fn` resolves to
   * @throws Error("Unconnected") if `connect()` has not been called
   */
  async useConnection<T>(fn: (conn: Connection) => Promise<T>): Promise<T> {
    if (!this._pool) {
      throw new Error("Unconnected");
    }
    const connection = await this._pool.pop();
    try {
      return await fn(connection);
    } finally {
      // A connection that was closed while in use must not be handed out again.
      if (connection.state === ConnectionState.CLOSED) {
        connection.removeFromPool();
      } else {
        connection.returnToPool();
      }
    }
  }

  /**
   * Execute a transaction process, and the transaction successfully
   * returns the return value of the transaction process.
   *
   * Runs `BEGIN`, the processor, then `COMMIT` on a single connection. If any
   * of those steps fails and the connection is still connected, `ROLLBACK`
   * is attempted and the original failure is rethrown.
   * @param processor transaction processor
   * @returns the value the processor resolves to
   * @throws whatever `BEGIN`, the processor or `COMMIT` threw
   */
  async transaction<T = any>(processor: TransactionProcessor<T>): Promise<T> {
    return await this.useConnection(async (connection) => {
      try {
        await connection.execute("BEGIN");
        const result = await processor(connection);
        await connection.execute("COMMIT");
        return result;
      } catch (error) {
        // Only a live connection can be rolled back.
        if (connection.state === ConnectionState.CONNECTED) {
          log.info(`ROLLBACK: ${Client.describeError(error)}`);
          try {
            await connection.execute("ROLLBACK");
          } catch (rollbackError) {
            // A failed rollback must not hide the error that triggered it;
            // record it and fall through to rethrow the original failure.
            log.info(`ROLLBACK failed: ${Client.describeError(rollbackError)}`);
          }
        }
        throw error;
      }
    });
  }

  /**
   * close connection
   *
   * Closes the pool (if any) and forgets it; later operations throw
   * "Unconnected" until `connect()` is called again.
   */
  async close(): Promise<void> {
    if (this._pool) {
      this._pool.close();
      this._pool = undefined;
    }
  }
}