From b62724e8cba3c37476b82afbd1f1592a5abf1229 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Halvard=20M=C3=B8rstad?= Date: Sat, 22 Jun 2024 16:55:59 +0200 Subject: [PATCH] Implement SQLx interface (#168) --- .github/workflows/ci.yml | 107 ++- .github/workflows/publish-to-nest.land.yml | 23 - .github/workflows/publish.yml | 29 + .github/workflows/wait-for-mysql.sh | 11 - .gitignore | 3 +- cipher | Bin 390 -> 0 bytes compose.yml | 86 +++ deno.json | 35 + deps.ts | 10 - egg.json | 10 - .../caching_sha2_password.test.ts | 106 +++ lib/auth_plugins/caching_sha2_password.ts | 84 +++ lib/auth_plugins/mod.ts | 12 + lib/client.test.ts | 50 ++ lib/client.ts | 64 ++ lib/connection.test.ts | 460 +++++++++++++ lib/connection.ts | 623 ++++++++++++++++++ {src => lib}/constant/capabilities.ts | 7 +- {src => lib}/constant/charset.ts | 3 + lib/constant/mysql_types.ts | 35 + {src => lib}/constant/packet.ts | 5 +- {src => lib}/constant/server_status.ts | 4 +- {src => lib}/packets/builders/auth.ts | 14 +- .../packets/builders/client_capabilities.ts | 2 +- lib/packets/builders/query.ts | 16 + {src => lib}/packets/builders/tls.ts | 2 +- lib/packets/packet.ts | 155 +++++ {src => lib}/packets/parsers/authswitch.ts | 2 +- {src => lib}/packets/parsers/err.ts | 8 +- {src => lib}/packets/parsers/handshake.ts | 16 +- lib/packets/parsers/result.ts | 159 +++++ lib/pool.test.ts | 50 ++ lib/pool.ts | 159 +++++ lib/sqlx.ts | 377 +++++++++++ {src => lib/utils}/buffer.ts | 29 +- src/util.ts => lib/utils/bytes.ts | 0 .../crypt.ts => lib/utils/crypto.ts | 8 +- lib/utils/encoding.ts | 16 + lib/utils/errors.ts | 50 ++ lib/utils/events.ts | 68 ++ lib/utils/hash.ts | 53 ++ lib/utils/logger.ts | 12 + lib/utils/meta.ts | 4 + lib/utils/query.ts | 110 ++++ lib/utils/testing.ts | 87 +++ mod.ts | 18 +- package.json | 23 - src/auth.ts | 43 -- src/auth_plugin/caching_sha2_password.ts | 78 --- src/auth_plugin/index.ts | 4 - src/client.ts | 165 ----- src/connection.ts | 388 ----------- src/constant/errors.ts | 29 - src/constant/mysql_types.ts | 60 -- src/deferred.ts | 73 -- src/logger.ts | 51 -- src/packets/builders/query.ts | 11 - src/packets/packet.ts | 106 --- src/packets/parsers/result.ts | 125 ---- src/pool.ts | 128 ---- test.deps.ts | 6 - test.ts | 390 ----------- test.util.ts | 98 --- 63 files changed, 3005 insertions(+), 1955 deletions(-) delete mode 100644 .github/workflows/publish-to-nest.land.yml create mode 100644 .github/workflows/publish.yml delete mode 100755 .github/workflows/wait-for-mysql.sh delete mode 100644 cipher create mode 100644 compose.yml create mode 100644 deno.json delete mode 100644 deps.ts delete mode 100644 egg.json create mode 100644 lib/auth_plugins/caching_sha2_password.test.ts create mode 100644 lib/auth_plugins/caching_sha2_password.ts create mode 100644 lib/auth_plugins/mod.ts create mode 100644 lib/client.test.ts create mode 100644 lib/client.ts create mode 100644 lib/connection.test.ts create mode 100644 lib/connection.ts rename {src => lib}/constant/capabilities.ts (92%) rename {src => lib}/constant/charset.ts (99%) create mode 100644 lib/constant/mysql_types.ts rename {src => lib}/constant/packet.ts (55%) rename {src => lib}/constant/server_status.ts (93%) rename {src => lib}/packets/builders/auth.ts (85%) rename {src => lib}/packets/builders/client_capabilities.ts (92%) create mode 100644 lib/packets/builders/query.ts rename {src => lib}/packets/builders/tls.ts (91%) create mode 100644 lib/packets/packet.ts rename {src => lib}/packets/parsers/authswitch.ts (88%) rename {src => lib}/packets/parsers/err.ts (72%) rename {src => lib}/packets/parsers/handshake.ts (81%) create mode 100644 lib/packets/parsers/result.ts create mode 100644 lib/pool.test.ts create mode 100644 lib/pool.ts create mode 100644 lib/sqlx.ts rename {src => lib/utils}/buffer.ts (87%) rename src/util.ts => lib/utils/bytes.ts (100%) rename src/auth_plugin/crypt.ts => lib/utils/crypto.ts (78%) create mode 100644 lib/utils/encoding.ts create mode 100644 lib/utils/errors.ts create mode 100644 lib/utils/events.ts create mode 100644 lib/utils/hash.ts create mode 100644 lib/utils/logger.ts create mode 100644 lib/utils/meta.ts create mode 100644 lib/utils/query.ts create mode 100644 lib/utils/testing.ts delete mode 100644 package.json delete mode 100644 src/auth.ts delete mode 100644 src/auth_plugin/caching_sha2_password.ts delete mode 100644 src/auth_plugin/index.ts delete mode 100644 src/client.ts delete mode 100644 src/connection.ts delete mode 100644 src/constant/errors.ts delete mode 100644 src/constant/mysql_types.ts delete mode 100644 src/deferred.ts delete mode 100644 src/logger.ts delete mode 100644 src/packets/builders/query.ts delete mode 100644 src/packets/packet.ts delete mode 100644 src/packets/parsers/result.ts delete mode 100644 src/pool.ts delete mode 100644 test.deps.ts delete mode 100644 test.ts delete mode 100644 test.util.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3860f8a..939c8ff 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,69 +1,64 @@ name: ci -on: [push, pull_request] +on: + push: + branches: + - master + pull_request: + branches: + - master + +env: + DENO_VERSION: vx.x.x jobs: - fmt: + check: + name: Check format and lint + runs-on: ubuntu-latest + + steps: + - name: Clone repo + uses: actions/checkout@v4 + + - name: Install deno + uses: denoland/setup-deno@v1 + with: + deno-version: ${{env.DENO_VERSION}} + + - name: Check + run: deno task check + + tests: + name: Run tests runs-on: ubuntu-latest - continue-on-error: true + steps: - - uses: actions/checkout@v1 - - name: Install Deno 1.x + - name: Clone repo + uses: actions/checkout@v4 + + - name: Install deno uses: denoland/setup-deno@v1 with: - deno-version: v1.x - - name: Check fmt - run: deno fmt --check - test: + deno-version: ${{env.DENO_VERSION}} + + - name: Test + run: deno task test:ga + + publish: runs-on: ubuntu-latest - strategy: - fail-fast: false - matrix: - DENO_VERSION: - - v1.x - DB_VERSION: - - mysql:5.5 - - mysql:5.6 - - mysql:5.7 - - mysql:8 - - mysql:latest - - mariadb:5.5 - - mariadb:10.0 - - mariadb:10.1 - - mariadb:10.2 - - mariadb:10.3 - - mariadb:10.4 -# - mariadb:latest + + permissions: + contents: read + id-token: write steps: - - uses: actions/checkout@v1 - - name: Install Deno ${{ matrix.DENO_VERSION }} + - name: Checkout + uses: actions/checkout@v4 + + - name: Install deno uses: denoland/setup-deno@v1 with: - deno-version: ${{ matrix.DENO_VERSION }} - - name: Show Deno version - run: deno --version - - name: Start ${{ matrix.DB_VERSION }} - run: | - sudo mkdir -p /var/run/mysqld/tmp - sudo chmod -R 777 /var/run/mysqld - docker container run --name mysql --rm -d -p 3306:3306 \ - -v /var/run/mysqld:/var/run/mysqld \ - -v /var/run/mysqld/tmp:/tmp \ - -e MYSQL_ROOT_PASSWORD=root \ - ${{ matrix.DB_VERSION }} - ./.github/workflows/wait-for-mysql.sh - - name: Run tests (TCP) - run: | - deno test --allow-env --allow-net=127.0.0.1:3306 ./test.ts - - name: Run tests (--unstable) (UNIX domain socket) - run: | - SOCKPATH=/var/run/mysqld/mysqld.sock - if [[ "${{ matrix.DB_VERSION }}" == "mysql:5.5" ]]; then - SOCKPATH=/var/run/mysqld/tmp/mysql.sock - fi - echo "DROP USER 'root'@'localhost';" | docker exec -i mysql mysql -proot - DB_SOCKPATH=$SOCKPATH TEST_METHODS=unix \ - deno test --unstable --allow-env \ - --allow-read=/var/run/mysqld/ --allow-write=/var/run/mysqld/ \ - ./test.ts + deno-version: ${{env.DENO_VERSION}} + + - name: Publish (dry run) + run: deno publish --dry-run diff --git a/.github/workflows/publish-to-nest.land.yml b/.github/workflows/publish-to-nest.land.yml deleted file mode 100644 index 87cc581..0000000 --- a/.github/workflows/publish-to-nest.land.yml +++ /dev/null @@ -1,23 +0,0 @@ -name: "publish current release to https://nest.land" - -on: - release: - types: - - published - -jobs: - publishToNestDotLand: - runs-on: ubuntu-latest - - steps: - - name: Setup repo - uses: actions/checkout@v2 - - - name: "setup" # check: https://github.com/actions/virtual-environments/issues/1777 - uses: denolib/setup-deno@v2 - with: - deno-version: v1.4.6 - - - name: "check nest.land" - run: | - deno run --allow-net --allow-read --allow-run https://deno.land/x/cicd/publish-on-nest.land.ts ${{ secrets.GITHUB_TOKEN }} ${{ secrets.NESTAPIKEY }} ${{ github.repository }} diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml new file mode 100644 index 0000000..58837de --- /dev/null +++ b/.github/workflows/publish.yml @@ -0,0 +1,29 @@ +name: Publish + +on: + release: + types: [published] + +env: + DENO_VERSION: vx.x.x + +jobs: + publish: + runs-on: ubuntu-latest + + permissions: + contents: read + id-token: write + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Deno + uses: denoland/setup-deno@v1 + with: + deno-version: ${{env.DENO_VERSION}} + + - name: Publish + if: github.event_name == 'release' + run: deno publish diff --git a/.github/workflows/wait-for-mysql.sh b/.github/workflows/wait-for-mysql.sh deleted file mode 100755 index 13302dd..0000000 --- a/.github/workflows/wait-for-mysql.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/sh - -echo "Waiting for MySQL" -for i in `seq 1 30`; -do - echo '\q' | mysql -h 127.0.0.1 -uroot --password=root -P 3306 && exit 0 - >&2 echo "MySQL is waking up" - sleep 1 -done - -echo "Failed waiting for MySQL" && exit 1 diff --git a/.gitignore b/.gitignore index 520cb2b..17b78fb 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ mysql.log docs .DS_Store .idea - +dbtmp +tmp_test diff --git a/cipher b/cipher deleted file mode 100644 index 356b2f827f0c8ab871363d9d86f4a76a2f575f72..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 390 zcmWNMT}x8|0ESJ|C88ijOPM7DS9oCwIYq^rpho3|*fJ?t7pX+74-A|e$e7dlQTee_ zkJF4ZSGuS%zY;wj;d%d9=k~pLp67k6>@%znme02*W_;)!g$4B zX2^cjDg`oZ-70;;vGU>2IQDa_uy${2zPGZmqFXublO+>jkM$Pf@7x&r*2qkmk00Jh z6UzaLTJa22v)mEA+ImH+E&bZ+!Q{+ow3HyPHJ#p5GU!OpZfI61x7Nm*S&-SODX+4@ z64|vQq=s~K?JiR_uBYrjXeHz0k8*rcL9?%!^RxI;he}^$-T}s)b*Jl6u!dbu-m{`) z>`|2rePL1C&Nqt_E%B9vjv{S!d}i~`_4~fDYUM8v%Pf`Qjg9tUw_%giW;_4*?Idsi E2byixWB>pF diff --git a/compose.yml b/compose.yml new file mode 100644 index 0000000..0b984f8 --- /dev/null +++ b/compose.yml @@ -0,0 +1,86 @@ +services: + mysql: + image: mysql:latest + ports: + - 3313:3306 + pull_policy: always + restart: always + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: true + MYSQL_DATABASE: testdb + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "--user", "root"] + interval: 3s + timeout: 3s + retries: 10 + mysql5: + image: mysql:5 + platform: linux/amd64 + ports: + - 3311:3306 + pull_policy: always + restart: always + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: true + MYSQL_DATABASE: testdb + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "--user", "root"] + interval: 3s + timeout: 3s + retries: 10 + mysql8: + image: mysql:8 + ports: + - 3312:3306 + pull_policy: always + restart: always + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: true + MYSQL_DATABASE: testdb + healthcheck: + test: ["CMD", "mysqladmin", "ping", "-h", "localhost", "--user", "root"] + interval: 3s + timeout: 3s + retries: 10 + mariadb: + image: mariadb:latest + ports: + - 3316:3306 + pull_policy: always + restart: always + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: true + MARIADB_DATABASE: testdb + healthcheck: + test: ["CMD", "mariadb-admin", "ping", "-h", "127.0.0.1"] + interval: 3s + timeout: 3s + retries: 10 + mariadb10: + image: mariadb:10 + ports: + - 3314:3306 + pull_policy: always + restart: always + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: true + MARIADB_DATABASE: testdb + healthcheck: + test: ["CMD", "mariadb-admin", "ping", "-h", "127.0.0.1"] + interval: 3s + timeout: 3s + retries: 10 + mariadb11: + image: mariadb:11 + ports: + - 3315:3306 + pull_policy: always + restart: always + environment: + MARIADB_ALLOW_EMPTY_ROOT_PASSWORD: true + MARIADB_DATABASE: testdb + healthcheck: + test: ["CMD", "mariadb-admin", "ping", "-h", "127.0.0.1"] + interval: 3s + timeout: 3s + retries: 10 diff --git a/deno.json b/deno.json new file mode 100644 index 0000000..5f40acb --- /dev/null +++ b/deno.json @@ -0,0 +1,35 @@ +{ + "name": "@db/mysql", + "version": "2.12.2", + "exports": "./mod.ts", + "lock": false, + "tasks": { + "check": "deno task format:check && deno task lint:check && deno task type:check", + "lint:check": "deno lint", + "format:check": "deno fmt --check", + "type:check": "deno check mod.ts", + "doc:check": "deno doc --lint src", + "test": "deno task db:restart && deno test -A; deno task db:stop", + "test:ga": "deno task db:start && deno test -A && deno task db:stop", + "db:restart": "deno task db:stop && deno task db:start", + "db:start": "docker compose up -d --remove-orphans --wait && sleep 2", + "db:stop": "docker compose down --remove-orphans --volumes" + }, + "imports": { + "@halvardm/sqlx": "jsr:@halvardm/sqlx@0.0.0-13", + "@std/assert": "jsr:@std/assert@^0.221.0", + "@std/async": "jsr:@std/async@^0.221.0", + "@std/crypto": "jsr:@std/crypto@^0.221.0", + "@std/encoding": "jsr:@std/encoding@^0.221.0", + "@std/flags": "jsr:@std/flags@^0.221.0", + "@std/fmt": "jsr:@std/fmt@^0.221.0", + "@std/fs": "jsr:@std/fs@^0.222.1", + "@std/log": "jsr:@std/log@^0.221.0", + "@std/path": "jsr:@std/path@^0.222.1", + "@std/semver": "jsr:@std/semver@^0.220.1", + "@std/testing": "jsr:@std/testing@^0.221.0", + "@std/text": "jsr:@std/text@^0.222.1", + "@std/yaml": "jsr:@std/yaml@^0.223.0", + "@stdext/encoding": "jsr:@stdext/encoding@^0.0.2" + } +} diff --git a/deps.ts b/deps.ts deleted file mode 100644 index 93a948b..0000000 --- a/deps.ts +++ /dev/null @@ -1,10 +0,0 @@ -export type { Deferred } from "https://deno.land/std@0.104.0/async/mod.ts"; -export { deferred, delay } from "https://deno.land/std@0.104.0/async/mod.ts"; -export { format as byteFormat } from "https://deno.land/x/bytes_formater@v1.4.0/mod.ts"; -export { createHash } from "https://deno.land/std@0.104.0/hash/mod.ts"; -export { decode as base64Decode } from "https://deno.land/std@0.104.0/encoding/base64.ts"; -export type { - SupportedAlgorithm, -} from "https://deno.land/std@0.104.0/hash/mod.ts"; -export { replaceParams } from "https://deno.land/x/sql_builder@v1.9.1/util.ts"; -export * as log from "https://deno.land/std@0.104.0/log/mod.ts"; diff --git a/egg.json b/egg.json deleted file mode 100644 index e78ef5a..0000000 --- a/egg.json +++ /dev/null @@ -1,10 +0,0 @@ -{ - "name": "mysql", - "description": "MySQL driver for Deno", - "homepage": "https://github.com/manyuanrong/deno_mysql", - "files": [ - "./**/*.ts", - "README.md" - ], - "entry": "./mod.ts" -} diff --git a/lib/auth_plugins/caching_sha2_password.test.ts b/lib/auth_plugins/caching_sha2_password.test.ts new file mode 100644 index 0000000..dac791a --- /dev/null +++ b/lib/auth_plugins/caching_sha2_password.test.ts @@ -0,0 +1,106 @@ +import { assertEquals } from "@std/assert"; +import { PacketReader } from "../packets/packet.ts"; +import { + AuthPluginCachingSha2Password, + AuthStatusFlags, +} from "./caching_sha2_password.ts"; +import { ComQueryResponsePacket } from "../constant/packet.ts"; +import { BufferReader } from "../utils/buffer.ts"; + +Deno.test("AuthPluginCachingSha2Password", async (t) => { + await t.step("statusFlag FastPath", async () => { + const scramble = new Uint8Array([1, 2, 3]); + const password = "password"; + const authPlugin = new AuthPluginCachingSha2Password(scramble, password); + + assertEquals(authPlugin.scramble, scramble); + assertEquals(authPlugin.password, password); + assertEquals(authPlugin.done, false); + assertEquals(authPlugin.quickRead, false); + assertEquals(authPlugin.data, undefined); + + const bodyReader = new BufferReader( + new Uint8Array([0x00, AuthStatusFlags.FastPath]), + ); + await authPlugin.next( + new PacketReader( + { size: 2, no: 0 }, + bodyReader, + ComQueryResponsePacket.OK_Packet, + ), + ); + + assertEquals(authPlugin.done, false); + assertEquals(authPlugin.data, undefined); + assertEquals(authPlugin.quickRead, true); + + await authPlugin.next( + new PacketReader( + { size: 2, no: 0 }, + bodyReader, + ComQueryResponsePacket.OK_Packet, + ), + ); + + assertEquals(authPlugin.done, true); + }); + + await t.step("statusFlag FullAuth", async () => { + const scramble = new Uint8Array([1, 2, 3]); + const password = "password"; + const authPlugin = new AuthPluginCachingSha2Password(scramble, password); + + assertEquals(authPlugin.scramble, scramble); + assertEquals(authPlugin.password, password); + assertEquals(authPlugin.done, false); + assertEquals(authPlugin.quickRead, false); + assertEquals(authPlugin.data, undefined); + + let bodyReader = new BufferReader( + new Uint8Array([0x00, AuthStatusFlags.FullAuth]), + ); + await authPlugin.next( + new PacketReader( + { size: 2, no: 0 }, + bodyReader, + ComQueryResponsePacket.OK_Packet, + ), + ); + + assertEquals(authPlugin.done, false); + assertEquals(authPlugin.data, new Uint8Array([0x02])); + assertEquals(authPlugin.quickRead, false); + + const publicKey = `-----BEGIN PUBLIC KEY----- +MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCkFF85HndOJoTVsuYBNOu4N63s +bVMWfMVZ/ZXFVYeFE7H6Vp0jhu2d6JUUx9WCXV5JOt/mXoCirywhz2LM+f7kaBCh +0YIFh5JKS43a3COC9BJupj2dco/iWEmOFqRvCn/ErQNdmataqQlePq3SitusJwuj +PQogsoytp/nSKLsTLwIDA/+/ +-----END PUBLIC KEY-----`; + + const encodedPublicKey = new TextEncoder().encode(publicKey); + + bodyReader = new BufferReader(new Uint8Array([0x00, ...encodedPublicKey])); + await authPlugin.next( + new PacketReader( + { size: 2, no: 0 }, + bodyReader, + ComQueryResponsePacket.OK_Packet, + ), + ); + + assertEquals(authPlugin.done, false); + assertEquals(authPlugin.data?.length, 128); + assertEquals(authPlugin.quickRead, false); + + await authPlugin.next( + new PacketReader( + { size: 2, no: 0 }, + bodyReader, + ComQueryResponsePacket.OK_Packet, + ), + ); + + assertEquals(authPlugin.done, true); + }); +}); diff --git a/lib/auth_plugins/caching_sha2_password.ts b/lib/auth_plugins/caching_sha2_password.ts new file mode 100644 index 0000000..9a35c2b --- /dev/null +++ b/lib/auth_plugins/caching_sha2_password.ts @@ -0,0 +1,84 @@ +import { xor } from "../utils/bytes.ts"; +import type { PacketReader } from "../packets/packet.ts"; +import { encryptWithPublicKey } from "../utils/crypto.ts"; + +export const enum AuthStatusFlags { + FullAuth = 0x04, + FastPath = 0x03, +} + +export class AuthPluginCachingSha2Password { + readonly scramble: Uint8Array; + readonly password: string; + done: boolean = false; + quickRead: boolean = false; + data: Uint8Array | undefined = undefined; + + next: (packet: PacketReader) => Promise = this.authMoreResponse.bind( + this, + ); + + constructor(scramble: Uint8Array, password: string) { + this.scramble = scramble; + this.password = password; + } + + protected terminate() { + this.done = true; + return Promise.resolve(this); + } + + protected authMoreResponse(packet: PacketReader): Promise { + const REQUEST_PUBLIC_KEY = 0x02; + const statusFlag = packet.body.skip(1).readUint8(); + + switch (statusFlag) { + case AuthStatusFlags.FullAuth: { + this.data = new Uint8Array([REQUEST_PUBLIC_KEY]); + this.next = this.encryptWithKey.bind(this); + break; + } + case AuthStatusFlags.FastPath: { + this.quickRead = true; + this.next = this.terminate.bind(this); + break; + } + default: + this.done = true; + } + + return Promise.resolve(this); + } + + protected async encryptWithKey(packet: PacketReader): Promise { + const publicKey = this.parsePublicKey(packet); + const len = this.password.length; + const passwordBuffer: Uint8Array = new Uint8Array(len + 1); + for (let n = 0; n < len; n++) { + passwordBuffer[n] = this.password.charCodeAt(n); + } + passwordBuffer[len] = 0x00; + + const encryptedPassword = await this.encrypt( + passwordBuffer, + this.scramble, + publicKey, + ); + this.next = this.terminate.bind(this); + this.data = new Uint8Array(encryptedPassword); + return this; + } + + protected parsePublicKey(packet: PacketReader): string { + return packet.body.skip(1).readNullTerminatedString(); + } + + async encrypt( + password: Uint8Array, + scramble: Uint8Array, + key: string, + ): Promise { + const stage1 = xor(password, scramble); + return await encryptWithPublicKey(key, stage1); + } +} diff --git a/lib/auth_plugins/mod.ts b/lib/auth_plugins/mod.ts new file mode 100644 index 0000000..58a459d --- /dev/null +++ b/lib/auth_plugins/mod.ts @@ -0,0 +1,12 @@ +import { AuthPluginCachingSha2Password } from "./caching_sha2_password.ts"; + +export { AuthPluginCachingSha2Password }; + +export const AuthPluginName = { + CachingSha2Password: "caching_sha2_password", +} as const; +export type AuthPluginName = typeof AuthPluginName[keyof typeof AuthPluginName]; + +export const AuthPlugins = { + caching_sha2_password: AuthPluginCachingSha2Password, +} as const; diff --git a/lib/client.test.ts b/lib/client.test.ts new file mode 100644 index 0000000..ef1bfc6 --- /dev/null +++ b/lib/client.test.ts @@ -0,0 +1,50 @@ +import { MysqlClient } from "./client.ts"; +import { QUERIES, services } from "./utils/testing.ts"; +import { clientTest } from "@halvardm/sqlx/testing"; + +Deno.test("Client Test", async (t) => { + for (const service of services) { + await t.step(`Testing ${service.name}`, async (t) => { + await t.step(`TCP`, async (t) => { + await clientTest({ + t, + Client: MysqlClient, + connectionUrl: service.url, + connectionOptions: {}, + queries: QUERIES, + }); + }); + + // Enable once socket connection issue is fixed + // + // await t.step(`UNIX Socket`, async (t) => { + // await implementationTest({ + // t, + // Client: MysqlClient, + // // deno-lint-ignore no-explicit-any + // PoolClient: MysqlClientPool as any, + // connectionUrl: service.urlSocket, + // connectionOptions: {}, + // queries: { + // createTable: + // "CREATE TABLE IF NOT EXISTS sqlxtesttable (testcol TEXT)", + // dropTable: "DROP TABLE IF EXISTS sqlxtesttable", + // insertOneToTable: "INSERT INTO sqlxtesttable (testcol) VALUES (?)", + // insertManyToTable: + // "INSERT INTO sqlxtesttable (testcol) VALUES (?),(?),(?)", + // selectOneFromTable: + // "SELECT * FROM sqlxtesttable WHERE testcol = ? LIMIT 1", + // selectByMatchFromTable: + // "SELECT * FROM sqlxtesttable WHERE testcol = ?", + // selectManyFromTable: "SELECT * FROM sqlxtesttable", + // select1AsString: "SELECT '1' as result", + // select1Plus1AsNumber: "SELECT 1+1 as result", + // deleteByMatchFromTable: + // "DELETE FROM sqlxtesttable WHERE testcol = ?", + // deleteAllFromTable: "DELETE FROM sqlxtesttable", + // }, + // }); + // }); + }); + } +}); diff --git a/lib/client.ts b/lib/client.ts new file mode 100644 index 0000000..1a29909 --- /dev/null +++ b/lib/client.ts @@ -0,0 +1,64 @@ +import { SqlxClient } from "@halvardm/sqlx"; +import { MysqlConnection, type MysqlConnectionOptions } from "./connection.ts"; +import type { MysqlParameterType } from "./packets/parsers/result.ts"; +import { + MysqlClientCloseEvent, + MysqlClientEventTarget, +} from "./utils/events.ts"; +import { MysqlClientConnectEvent } from "../mod.ts"; +import { + type MysqlPrepared, + type MysqlQueryOptions, + type MySqlTransaction, + MysqlTransactionable, + type MysqlTransactionOptions, +} from "./sqlx.ts"; + +export interface MysqlClientOptions extends MysqlConnectionOptions { +} + +/** + * MySQL client + */ +export class MysqlClient extends MysqlTransactionable implements + SqlxClient< + MysqlClientEventTarget, + MysqlConnectionOptions, + MysqlConnection, + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction + > { + eventTarget: MysqlClientEventTarget; + connectionUrl: string; + connectionOptions: MysqlConnectionOptions; + + constructor( + connectionUrl: string | URL, + connectionOptions: MysqlClientOptions = {}, + ) { + const conn = new MysqlConnection(connectionUrl, connectionOptions); + super(conn); + this.connectionUrl = connectionUrl.toString(); + this.connectionOptions = connectionOptions; + this.eventTarget = new MysqlClientEventTarget(); + } + async connect(): Promise { + await this.connection.connect(); + this.eventTarget.dispatchEvent( + new MysqlClientConnectEvent({ connectable: this }), + ); + } + async close(): Promise { + this.eventTarget.dispatchEvent( + new MysqlClientCloseEvent({ connectable: this }), + ); + await this.connection.close(); + } + + async [Symbol.asyncDispose](): Promise { + await this.close(); + } +} diff --git a/lib/connection.test.ts b/lib/connection.test.ts new file mode 100644 index 0000000..25c1ff9 --- /dev/null +++ b/lib/connection.test.ts @@ -0,0 +1,460 @@ +import { assertEquals, assertInstanceOf } from "@std/assert"; +import { emptyDir } from "@std/fs"; +import { join } from "@std/path"; +import { MysqlConnection } from "./connection.ts"; +import { DIR_TMP_TEST } from "./utils/testing.ts"; +import { buildQuery } from "./packets/builders/query.ts"; +import { URL_TEST_CONNECTION } from "./utils/testing.ts"; +import { connectionConstructorTest } from "@halvardm/sqlx/testing"; + +Deno.test("Connection", async (t) => { + await emptyDir(DIR_TMP_TEST); + + const PATH_PEM_CA = join(DIR_TMP_TEST, "ca.pem"); + const PATH_PEM_CA2 = join(DIR_TMP_TEST, "ca2.pem"); + const PATH_PEM_CERT = join(DIR_TMP_TEST, "cert.pem"); + const PATH_PEM_KEY = join(DIR_TMP_TEST, "key.pem"); + + await Deno.writeTextFile(PATH_PEM_CA, "ca"); + await Deno.writeTextFile(PATH_PEM_CA2, "ca2"); + await Deno.writeTextFile(PATH_PEM_CERT, "cert"); + await Deno.writeTextFile(PATH_PEM_KEY, "key"); + + await t.step("can construct", async (t) => { + const connection = new MysqlConnection(URL_TEST_CONNECTION); + + assertInstanceOf(connection, MysqlConnection); + assertEquals(connection.connectionUrl, URL_TEST_CONNECTION); + + await t.step("can parse connection config simple", () => { + const url = new URL("mysql://user:pass@127.0.0.1:3306/db"); + + const c = new MysqlConnection(url.toString()); + + assertEquals(c.config, { + protocol: "mysql", + username: "user", + password: "pass", + hostname: "127.0.0.1", + port: 3306, + schema: "db", + socket: undefined, + tls: undefined, + parameters: {}, + }); + }); + await t.step("can parse connection config full", () => { + const url = new URL("mysql://user:pass@127.0.0.1:3306/db"); + url.searchParams.set("socket", "/tmp/mysql.sock"); + url.searchParams.set("ssl-mode", "VERIFY_IDENTITY"); + url.searchParams.set("ssl-ca", PATH_PEM_CA); + url.searchParams.set("ssl-capath", DIR_TMP_TEST); + url.searchParams.set("ssl-cert", PATH_PEM_CERT); + url.searchParams.set("ssl-cipher", "cipher"); + url.searchParams.set("ssl-crl", "crl.pem"); + url.searchParams.set("ssl-crlpath", "crlpath.pem"); + url.searchParams.set("ssl-key", PATH_PEM_KEY); + url.searchParams.set("tls-version", "TLSv1.2,TLSv1.3"); + url.searchParams.set("tls-versions", "[TLSv1.2,TLSv1.3]"); + url.searchParams.set("tls-ciphersuites", "ciphersuites"); + url.searchParams.set("auth-method", "AUTO"); + url.searchParams.set("get-server-public-key", "true"); + url.searchParams.set("server-public-key-path", "key.pem"); + url.searchParams.set("ssh", "usr@host:port"); + url.searchParams.set("uri", "mysql://user@127.0.0.1:3306"); + url.searchParams.set("ssh-password", "pass"); + url.searchParams.set("ssh-config-file", "config"); + url.searchParams.set("ssh-config-file", "config"); + url.searchParams.set("ssh-identity-file", "identity"); + url.searchParams.set("ssh-identity-pass", "identitypass"); + url.searchParams.set("connect-timeout", "10"); + url.searchParams.set("compression", "preferred"); + url.searchParams.set("compression-algorithms", "algo"); + url.searchParams.set("compression-level", "level"); + url.searchParams.set("connection-attributes", "true"); + + const c = new MysqlConnection(url.toString()); + + assertEquals(c.config, { + protocol: "mysql", + username: "user", + password: "pass", + hostname: "127.0.0.1", + port: 3306, + socket: "/tmp/mysql.sock", + schema: "db", + tls: { + mode: "VERIFY_IDENTITY", + caCerts: [ + "ca", + "ca2", + "cert", + "key", + ], + cert: "cert", + hostname: "127.0.0.1", + key: "key", + port: 3306, + }, + parameters: { + socket: "/tmp/mysql.sock", + sslMode: "VERIFY_IDENTITY", + sslCa: [PATH_PEM_CA], + sslCapath: [DIR_TMP_TEST], + sslCert: PATH_PEM_CERT, + sslCipher: "cipher", + sslCrl: "crl.pem", + sslCrlpath: "crlpath.pem", + sslKey: PATH_PEM_KEY, + tlsVersion: "TLSv1.2,TLSv1.3", + tlsVersions: "[TLSv1.2,TLSv1.3]", + tlsCiphersuites: "ciphersuites", + authMethod: "AUTO", + getServerPublicKey: true, + serverPublicKeyPath: "key.pem", + ssh: "usr@host:port", + uri: "mysql://user@127.0.0.1:3306", + sshPassword: "pass", + sshConfigFile: "config", + sshIdentityFile: "identity", + sshIdentityPass: "identitypass", + connectTimeout: 10, + compression: "preferred", + compressionAlgorithms: "algo", + compressionLevel: "level", + connectionAttributes: "true", + }, + }); + }); + + await connection.close(); + }); + + await connectionConstructorTest({ + t, + Connection: MysqlConnection, + connectionUrl: URL_TEST_CONNECTION, + connectionOptions: {}, + }); + + await t.step("can query database", async (t) => { + await using connection = new MysqlConnection(URL_TEST_CONNECTION); + await connection.connect(); + await t.step("can sendData", async () => { + const data = buildQuery("SELECT 1+1 AS result;"); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: [2], + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 129, + fieldLen: 3, + fieldType: 8, + name: "result", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse time", async () => { + const data = buildQuery(`SELECT CAST("09:04:10" AS time) as time`); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: ["09:04:10"], + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 128, + fieldLen: 10, + fieldType: 11, + name: "time", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse date", async () => { + const data = buildQuery( + `SELECT CAST("2024-04-15 09:04:10" AS date) as date`, + ); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: [new Date("2024-04-15T00:00:00.000Z")], + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 128, + fieldLen: 10, + fieldType: 10, + name: "date", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse bigint", async () => { + const data = buildQuery(`SELECT 9223372036854775807 as result`); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: [9223372036854775807n], + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 129, + fieldLen: 20, + fieldType: 8, + name: "result", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse decimal", async () => { + const data = buildQuery( + `SELECT 0.012345678901234567890123456789 as result`, + ); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: ["0.012345678901234567890123456789"], + fields: [ + { + catalog: "def", + decimals: 30, + defaultVal: "", + encoding: 63, + fieldFlag: 129, + fieldLen: 33, + fieldType: 246, + name: "result", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can parse empty string", async () => { + const data = buildQuery(`SELECT '' as result`); + for await (const result1 of connection.sendData(data)) { + assertEquals(result1, { + row: [""], + fields: [ + { + catalog: "def", + decimals: 31, + defaultVal: "", + encoding: 33, + fieldFlag: 1, + fieldLen: 0, + fieldType: 253, + name: "result", + originName: "", + originTable: "", + schema: "", + table: "", + }, + ], + }); + } + }); + + await t.step("can drop and create table", async () => { + const dropTableSql = buildQuery("DROP TABLE IF EXISTS test;"); + const dropTableReturned = connection.sendData(dropTableSql); + assertEquals(await dropTableReturned.next(), { + done: true, + value: { affectedRows: 0, lastInsertId: 0 }, + }); + const createTableSql = buildQuery( + "CREATE TABLE IF NOT EXISTS test (id INT);", + ); + const createTableReturned = connection.sendData(createTableSql); + assertEquals(await createTableReturned.next(), { + done: true, + value: { affectedRows: 0, lastInsertId: 0 }, + }); + const result = await Array.fromAsync(createTableReturned); + assertEquals(result, []); + }); + + await t.step("can insert to table", async () => { + const data = buildQuery("INSERT INTO test (id) VALUES (1),(2),(3);"); + const returned = connection.sendData(data); + assertEquals(await returned.next(), { + done: true, + value: { affectedRows: 3, lastInsertId: 0 }, + }); + const result = await Array.fromAsync(returned); + assertEquals(result, []); + }); + + await t.step("can select from table using sendData", async () => { + const data = buildQuery("SELECT * FROM test;"); + const returned = connection.sendData(data); + const result = await Array.fromAsync(returned); + assertEquals(result, [ + { + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 0, + fieldLen: 11, + fieldType: 3, + name: "id", + originName: "id", + originTable: "test", + schema: "testdb", + table: "test", + }, + ], + row: [ + 1, + ], + }, + { + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 0, + fieldLen: 11, + fieldType: 3, + name: "id", + originName: "id", + originTable: "test", + schema: "testdb", + table: "test", + }, + ], + row: [ + 2, + ], + }, + { + fields: [ + { + catalog: "def", + decimals: 0, + defaultVal: "", + encoding: 63, + fieldFlag: 0, + fieldLen: 11, + fieldType: 3, + name: "id", + originName: "id", + originTable: "test", + schema: "testdb", + table: "test", + }, + ], + row: [ + 3, + ], + }, + ]); + }); + + await t.step("can insert to table using executeRaw", async () => { + const data = buildQuery("INSERT INTO test (id) VALUES (4);"); + const result = await connection.executeRaw(data); + assertEquals(result, 1); + }); + + await t.step("can select from table using executeRaw", async () => { + const data = buildQuery("SELECT * FROM test;"); + const result = await connection.executeRaw(data); + assertEquals(result, undefined); + }); + + await t.step("can insert to table using queryManyObjectRaw", async () => { + const data = buildQuery("INSERT INTO test (id) VALUES (5);"); + const result = await Array.fromAsync(connection.queryManyObjectRaw(data)); + assertEquals(result, []); + }); + + await t.step("can select from table using queryManyObjectRaw", async () => { + const data = buildQuery("SELECT * FROM test;"); + const result = await Array.fromAsync(connection.queryManyObjectRaw(data)); + assertEquals(result, [ + { id: 1 }, + { id: 2 }, + { id: 3 }, + { id: 4 }, + { id: 5 }, + ]); + }); + + await t.step("can insert to table using queryManyArrayRaw", async () => { + const data = buildQuery("INSERT INTO test (id) VALUES (6);"); + const result = await Array.fromAsync(connection.queryManyArrayRaw(data)); + assertEquals(result, []); + }); + + await t.step("can select from table using queryManyArrayRaw", async () => { + const data = buildQuery("SELECT * FROM test;"); + const result = await Array.fromAsync(connection.queryManyArrayRaw(data)); + assertEquals(result, [ + [1], + [2], + [3], + [4], + [5], + [6], + ]); + }); + + await t.step("can drop table", async () => { + const data = buildQuery("DROP TABLE IF EXISTS test;"); + const returned = connection.sendData(data); + assertEquals(await returned.next(), { + done: true, + value: { affectedRows: 0, lastInsertId: 0 }, + }); + const result = await Array.fromAsync(returned); + assertEquals(result, []); + }); + }); + + await emptyDir(DIR_TMP_TEST); +}); diff --git a/lib/connection.ts b/lib/connection.ts new file mode 100644 index 0000000..03c6157 --- /dev/null +++ b/lib/connection.ts @@ -0,0 +1,623 @@ +import { + MysqlConnectionError, + MysqlProtocolError, + MysqlReadError, + MysqlResponseTimeoutError, +} from "./utils/errors.ts"; +import { buildAuth } from "./packets/builders/auth.ts"; +import { PacketReader, PacketWriter } from "./packets/packet.ts"; +import { parseError } from "./packets/parsers/err.ts"; +import { + AuthResult, + parseAuth, + parseHandshake, +} from "./packets/parsers/handshake.ts"; +import { + type FieldInfo, + getRowObject, + type MysqlParameterType, + parseField, + parseRowArray, +} from "./packets/parsers/result.ts"; +import { ComQueryResponsePacket } from "./constant/packet.ts"; +import { AuthPlugins } from "./auth_plugins/mod.ts"; +import { parseAuthSwitch } from "./packets/parsers/authswitch.ts"; +import auth from "./utils/hash.ts"; +import { ServerCapabilities } from "./constant/capabilities.ts"; +import { buildSSLRequest } from "./packets/builders/tls.ts"; +import { logger } from "./utils/logger.ts"; +import { + type ArrayRow, + type Row, + SqlxBase, + type SqlxConnection, + type SqlxConnectionOptions, + type SqlxQueryOptions, +} from "@halvardm/sqlx"; +import { resolve } from "@std/path"; +import { toCamelCase } from "@std/text"; +import { AuthPluginName } from "./auth_plugins/mod.ts"; + +/** + * Connection state + */ +export enum ConnectionState { + CONNECTING, + CONNECTED, + CLOSING, + CLOSED, +} + +export type ConnectionSendDataNext = { + row: ArrayRow; + fields: FieldInfo[]; +}; +export type ConnectionSendDataResult = { + affectedRows: number | undefined; + lastInsertId: number | undefined; +}; + +/** + * Tls mode for mysql connection + * + * @see {@link https://dev.mysql.com/doc/refman/8.0/en/connection-options.html#option_general_ssl-mode} + */ +export const TlsMode = { + Preferred: "PREFERRED", + Disabled: "DISABLED", + Required: "REQUIRED", + VerifyCa: "VERIFY_CA", + VerifyIdentity: "VERIFY_IDENTITY", +} as const; +export type TlsMode = typeof TlsMode[keyof typeof TlsMode]; + +export interface TlsOptions extends Deno.ConnectTlsOptions { + mode: TlsMode; +} + +/** + * Aditional connection parameters + * + * @see {@link https://dev.mysql.com/doc/refman/8.0/en/connecting-using-uri-or-key-value-pairs.html#connecting-using-uri} + */ +export interface ConnectionParameters { + socket?: string; + sslMode?: TlsMode; + sslCa?: string[]; + sslCapath?: string[]; + sslCert?: string; + sslCipher?: string; + sslCrl?: string; + sslCrlpath?: string; + sslKey?: string; + tlsVersion?: string; + tlsVersions?: string; + tlsCiphersuites?: string; + authMethod?: string; + getServerPublicKey?: boolean; + serverPublicKeyPath?: string; + ssh?: string; + uri?: string; + sshPassword?: string; + sshConfigFile?: string; + sshIdentityFile?: string; + sshIdentityPass?: string; + connectTimeout?: number; + compression?: string; + compressionAlgorithms?: string; + compressionLevel?: string; + connectionAttributes?: string; +} + +export interface ConnectionConfig { + protocol: string; + username: string; + password?: string; + hostname: string; + port: number; + socket?: string; + schema?: string; + /** + * Tls options + */ + tls?: Partial; + /** + * Aditional connection parameters + */ + parameters: ConnectionParameters; +} + +export interface MysqlConnectionOptions extends SqlxConnectionOptions { + tls?: Partial; +} + +/** Connection for mysql */ +export class MysqlConnection extends SqlxBase implements + SqlxConnection< + MysqlConnectionOptions + > { + state: ConnectionState = ConnectionState.CONNECTING; + capabilities: number = 0; + serverVersion: string = ""; + + protected _conn: Deno.Conn | null = null; + private _timedOut = false; + + readonly connectionUrl: string; + readonly connectionOptions: MysqlConnectionOptions; + readonly config: ConnectionConfig; + + get conn(): Deno.Conn { + if (!this._conn) { + throw new MysqlConnectionError("Not connected"); + } + if (this.state != ConnectionState.CONNECTED) { + if (this.state == ConnectionState.CLOSED) { + throw new MysqlConnectionError("Connection is closed"); + } else { + throw new MysqlConnectionError("Must be connected first"); + } + } + return this._conn; + } + + set conn(conn: Deno.Conn | null) { + this._conn = conn; + } + get connected(): boolean { + return this.state === ConnectionState.CONNECTED; + } + constructor( + connectionUrl: string | URL, + connectionOptions: MysqlConnectionOptions = {}, + ) { + super(); + this.connectionUrl = connectionUrl.toString().split("?")[0]; + this.connectionOptions = connectionOptions; + this.config = this.#parseConnectionConfig( + connectionUrl, + connectionOptions, + ); + } + + async connect(): Promise { + // TODO: implement connect timeout + if ( + this.config.tls?.mode && + this.config.tls?.mode !== TlsMode.Disabled && + this.config.tls?.mode !== TlsMode.VerifyIdentity + ) { + throw new Error("unsupported tls mode"); + } + + logger().info( + `connecting ${this.connectionUrl},${JSON.stringify(this.config)}`, + ); + + if (this.config.socket) { + this.conn = await Deno.connect({ + transport: "unix", + path: this.config.socket, + }); + } else { + this.conn = await Deno.connect({ + transport: "tcp", + hostname: this.config.hostname, + port: this.config.port, + }); + } + + try { + let receive = await this.#nextPacket(); + const handshakePacket = parseHandshake(receive.body); + + let handshakeSequenceNumber = receive.header.no; + + // Deno.startTls() only supports VERIFY_IDENTITY now. + let isSSL = false; + if ( + this.config.tls?.mode === TlsMode.VerifyIdentity + ) { + if ( + (handshakePacket.serverCapabilities & + ServerCapabilities.CLIENT_SSL) === 0 + ) { + throw new Error("Server does not support TLS"); + } + if ( + (handshakePacket.serverCapabilities & + ServerCapabilities.CLIENT_SSL) !== 0 + ) { + const tlsData = buildSSLRequest(handshakePacket, { + db: this.config.schema, + }); + await PacketWriter.write( + this.conn, + tlsData, + ++handshakeSequenceNumber, + ); + this.conn = await Deno.startTls(this.conn as Deno.TcpConn, { + hostname: this.config.hostname, + caCerts: this.config.tls?.caCerts, + }); + } + isSSL = true; + } + + const data = await buildAuth(handshakePacket, { + username: this.config.username, + password: this.config.password, + db: this.config.schema, + ssl: isSSL, + }); + + await PacketWriter.write(this._conn!, data, ++handshakeSequenceNumber); + + this.state = ConnectionState.CONNECTING; + this.serverVersion = handshakePacket.serverVersion; + this.capabilities = handshakePacket.serverCapabilities; + + receive = await this.#nextPacket(); + + const authResult = parseAuth(receive); + let authPlugin: AuthPluginName | undefined = undefined; + + switch (authResult) { + case AuthResult.AuthMoreRequired: { + authPlugin = handshakePacket.authPluginName as AuthPluginName; + break; + } + case AuthResult.MethodMismatch: { + const authSwitch = parseAuthSwitch(receive.body); + // If CLIENT_PLUGIN_AUTH capability is not supported, no new cipher is + // sent and we have to keep using the cipher sent in the init packet. + if ( + authSwitch.authPluginData === undefined || + authSwitch.authPluginData.length === 0 + ) { + authSwitch.authPluginData = handshakePacket.seed; + } + + let authData; + if (this.config.password) { + authData = await auth( + authSwitch.authPluginName, + this.config.password, + authSwitch.authPluginData, + ); + } else { + authData = Uint8Array.from([]); + } + + await PacketWriter.write( + this.conn, + authData, + receive.header.no + 1, + ); + + receive = await this.#nextPacket(); + const authSwitch2 = parseAuthSwitch(receive.body); + if (authSwitch2.authPluginName !== "") { + throw new Error( + "Do not allow to change the auth plugin more than once!", + ); + } + } + } + + if (authPlugin) { + switch (authPlugin) { + case AuthPluginName.CachingSha2Password: { + const plugin = new AuthPlugins[authPlugin]( + handshakePacket.seed, + this.config.password!, + ); + + while (!plugin.done) { + if (plugin.data) { + const sequenceNumber = receive.header.no + 1; + await PacketWriter.write( + this.conn, + plugin.data, + sequenceNumber, + ); + receive = await this.#nextPacket(); + } + if (plugin.quickRead) { + await this.#nextPacket(); + } + + await plugin.next(receive); + } + break; + } + default: + throw new Error("Unsupported auth plugin"); + } + } + + const header = receive.body.readUint8(); + if (header === 0xff) { + const error = parseError(receive.body, this); + logger().error(`connect error(${error.code}): ${error.message}`); + this.close(); + throw new Error(error.message); + } else { + logger().info(`connected to ${this.connectionUrl}`); + this.state = ConnectionState.CONNECTED; + } + } catch (error) { + // Call close() to avoid leaking socket. + this.close(); + throw error; + } + } + + close(): Promise { + if (this.state != ConnectionState.CLOSED) { + logger().info("close connection"); + this._conn?.close(); + this.state = ConnectionState.CLOSED; + } + return Promise.resolve(); + } + + /** + * Parses the connection url and options into a connection config + */ + #parseConnectionConfig( + connectionUrl: string | URL, + connectionOptions: MysqlConnectionOptions, + ): ConnectionConfig { + function parseParameters(url: URL): ConnectionParameters { + const parameters: ConnectionParameters = {}; + for (const [key, value] of url.searchParams) { + const pKey = toCamelCase(key); + if (pKey === "sslCa") { + if (!parameters.sslCa) { + parameters.sslCa = []; + } + parameters.sslCa.push(value); + } else if (pKey === "sslCapath") { + if (!parameters.sslCapath) { + parameters.sslCapath = []; + } + parameters.sslCapath.push(value); + } else if (pKey === "getServerPublicKey") { + parameters.getServerPublicKey = value === "true"; + } else if (pKey === "connectTimeout") { + parameters.connectTimeout = parseInt(value); + } else { + // deno-lint-ignore no-explicit-any + parameters[pKey as keyof ConnectionParameters] = value as any; + } + } + return parameters; + } + + function parseTlsOptions(config: ConnectionConfig): TlsOptions | undefined { + const baseTlsOptions: TlsOptions = { + port: config.port, + hostname: config.hostname, + mode: TlsMode.Preferred, + }; + + if (connectionOptions.tls) { + return { + ...baseTlsOptions, + ...connectionOptions.tls, + }; + } + + if (config.parameters.sslMode) { + const tlsOptions: TlsOptions = { + ...baseTlsOptions, + mode: config.parameters.sslMode, + }; + + const caCertPaths = new Set(); + + if (config.parameters.sslCa?.length) { + for (const caCert of config.parameters.sslCa) { + caCertPaths.add(resolve(caCert)); + } + } + + if (config.parameters.sslCapath?.length) { + for (const caPath of config.parameters.sslCapath) { + for (const f of Deno.readDirSync(caPath)) { + if (f.isFile && f.name.endsWith(".pem")) { + caCertPaths.add(resolve(caPath, f.name)); + } + } + } + } + + if (caCertPaths.size) { + tlsOptions.caCerts = []; + for (const caCert of caCertPaths) { + const content = Deno.readTextFileSync(caCert); + tlsOptions.caCerts.push(content); + } + // Due to some random bug in CI, we need to sort this for the test to pass consistently. + tlsOptions.caCerts.sort(); + } + + if (config.parameters.sslKey) { + tlsOptions.key = Deno.readTextFileSync( + resolve(config.parameters.sslKey), + ); + } + + if (config.parameters.sslCert) { + tlsOptions.cert = Deno.readTextFileSync( + resolve(config.parameters.sslCert), + ); + } + + return tlsOptions; + } + return undefined; + } + + const url = new URL(connectionUrl); + const parameters = parseParameters(url); + const config: ConnectionConfig = { + protocol: url.protocol.slice(0, -1), + username: url.username, + password: url.password || undefined, + hostname: url.hostname, + port: parseInt(url.port || "3306"), + schema: url.pathname.slice(1), + parameters: parameters, + socket: parameters.socket, + }; + + config.tls = parseTlsOptions(config); + + return config; + } + + async #nextPacket(): Promise { + if (!this._conn) { + throw new MysqlConnectionError("Not connected"); + } + + const timeoutTimer = this.config.parameters.connectTimeout + ? setTimeout( + this.#timeoutCallback, + this.config.parameters.connectTimeout, + ) + : null; + let packet: PacketReader | null; + try { + packet = await PacketReader.read(this._conn); + } catch (error) { + if (this._timedOut) { + // Connection has been closed by timeoutCallback. + throw new MysqlResponseTimeoutError("Connection read timed out"); + } + timeoutTimer && clearTimeout(timeoutTimer); + this.close(); + throw error; + } + timeoutTimer && clearTimeout(timeoutTimer); + + if (!packet) { + // Connection is half-closed by the remote host. + // Call close() to avoid leaking socket. + this.close(); + throw new MysqlReadError("Connection closed unexpectedly"); + } + if (packet.type === ComQueryResponsePacket.ERR_Packet) { + packet.body.skip(1); + const error = parseError(packet.body, this); + throw new Error(error.message); + } + return packet; + } + + #timeoutCallback = () => { + logger().info("connection read timed out"); + this._timedOut = true; + this.close(); + }; + + async *sendData( + data: Uint8Array, + options?: SqlxQueryOptions, + ): AsyncGenerator< + ConnectionSendDataNext, + ConnectionSendDataResult | undefined + > { + try { + await PacketWriter.write(this.conn, data, 0); + let receive = await this.#nextPacket(); + logger().debug(`packet type: ${receive.type.toString()}`); + if (receive.type === ComQueryResponsePacket.OK_Packet) { + receive.body.skip(1); + return { + affectedRows: receive.body.readEncodedLen(), + lastInsertId: receive.body.readEncodedLen(), + }; + } else if (receive.type !== ComQueryResponsePacket.Result) { + throw new MysqlProtocolError(receive.type.toString()); + } + let fieldCount = receive.body.readEncodedLen(); + const fields: FieldInfo[] = []; + while (fieldCount--) { + const packet = await this.#nextPacket(); + if (packet) { + const field = parseField(packet.body); + fields.push(field); + } + } + + if (!(this.capabilities & ServerCapabilities.CLIENT_DEPRECATE_EOF)) { + // EOF(mysql < 5.7 or mariadb < 10.2) + receive = await this.#nextPacket(); + if (receive.type !== ComQueryResponsePacket.EOF_Packet) { + throw new MysqlProtocolError(receive.type.toString()); + } + } + + receive = await this.#nextPacket(); + + while (receive.type !== ComQueryResponsePacket.EOF_Packet) { + const row = parseRowArray(receive.body, fields, options); + yield { + row, + fields, + }; + receive = await this.#nextPacket(); + } + } catch (error) { + this.close(); + throw error; + } + } + + async executeRaw( + data: Uint8Array, + options?: SqlxQueryOptions, + ): Promise { + const gen = this.sendData(data, options); + let result = await gen.next(); + if (result.done) { + return result.value?.affectedRows; + } + + const debugRest = []; + debugRest.push(result); + while (!result.done) { + result = await gen.next(); + debugRest.push(result); + logger().debug(`executeRaw overflow: ${JSON.stringify(debugRest)}`); + } + logger().debug(`executeRaw overflow: ${JSON.stringify(debugRest)}`); + return undefined; + } + + async *queryManyObjectRaw = Row>( + data: Uint8Array, + options?: SqlxQueryOptions, + ): AsyncIterableIterator { + for await (const res of this.sendData(data, options)) { + yield getRowObject(res.fields, res.row) as T; + } + } + + async *queryManyArrayRaw = ArrayRow>( + data: Uint8Array, + options?: SqlxQueryOptions, + ): AsyncIterableIterator { + for await (const res of this.sendData(data, options)) { + const row = res.row as T; + yield row as T; + } + } + + async [Symbol.asyncDispose](): Promise { + await this.close(); + } +} diff --git a/src/constant/capabilities.ts b/lib/constant/capabilities.ts similarity index 92% rename from src/constant/capabilities.ts rename to lib/constant/capabilities.ts index a411d79..d5cab54 100644 --- a/src/constant/capabilities.ts +++ b/lib/constant/capabilities.ts @@ -1,4 +1,7 @@ -enum ServerCapabilities { +/** + * MySQL Server Capabilities + */ +export enum ServerCapabilities { CLIENT_LONG_PASSWORD = 0x00000001, CLIENT_FOUND_ROWS = 0x00000002, CLIENT_LONG_FLAG = 0x00000004, @@ -23,5 +26,3 @@ enum ServerCapabilities { CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA = 0x00200000, CLIENT_DEPRECATE_EOF = 0x01000000, } - -export default ServerCapabilities; diff --git a/src/constant/charset.ts b/lib/constant/charset.ts similarity index 99% rename from src/constant/charset.ts rename to lib/constant/charset.ts index 40447c9..f450832 100644 --- a/src/constant/charset.ts +++ b/lib/constant/charset.ts @@ -1,3 +1,6 @@ +/** + * MySQL Charset + */ export enum Charset { BIG5_CHINESE_CI = 1, LATIN2_CZECH_CS = 2, diff --git a/lib/constant/mysql_types.ts b/lib/constant/mysql_types.ts new file mode 100644 index 0000000..997fba8 --- /dev/null +++ b/lib/constant/mysql_types.ts @@ -0,0 +1,35 @@ +/** + * MySQL data types + */ +export const MysqlDataType = { + Decimal: 0x00, + Tiny: 0x01, + Short: 0x02, + Long: 0x03, + Float: 0x04, + Double: 0x05, + Null: 0x06, + Timestamp: 0x07, + LongLong: 0x08, + Int24: 0x09, + Date: 0x0a, + Time: 0x0b, + DateTime: 0x0c, + Year: 0x0d, + NewDate: 0x0e, + VarChar: 0x0f, + Bit: 0x10, + Timestamp2: 0x11, + DateTime2: 0x12, + Time2: 0x13, + NewDecimal: 0xf6, + Enum: 0xf7, + Set: 0xf8, + TinyBlob: 0xf9, + MediumBlob: 0xfa, + LongBlob: 0xfb, + Blob: 0xfc, + VarString: 0xfd, + String: 0xfe, + Geometry: 0xff, +} as const; diff --git a/src/constant/packet.ts b/lib/constant/packet.ts similarity index 55% rename from src/constant/packet.ts rename to lib/constant/packet.ts index 715e411..0cd49fe 100644 --- a/src/constant/packet.ts +++ b/lib/constant/packet.ts @@ -1,4 +1,7 @@ -export enum PacketType { +/** + * PacketType + */ +export enum ComQueryResponsePacket { OK_Packet = 0x00, EOF_Packet = 0xfe, ERR_Packet = 0xff, diff --git a/src/constant/server_status.ts b/lib/constant/server_status.ts similarity index 93% rename from src/constant/server_status.ts rename to lib/constant/server_status.ts index 146889f..d38c5d2 100644 --- a/src/constant/server_status.ts +++ b/lib/constant/server_status.ts @@ -1,4 +1,6 @@ -/** @ignore */ +/** + * Server status flags + */ export enum ServerStatus { IN_TRANSACTION = 0x0001, AUTO_COMMIT = 0x0002, diff --git a/src/packets/builders/auth.ts b/lib/packets/builders/auth.ts similarity index 85% rename from src/packets/builders/auth.ts rename to lib/packets/builders/auth.ts index 194c485..3cff7ac 100644 --- a/src/packets/builders/auth.ts +++ b/lib/packets/builders/auth.ts @@ -1,15 +1,15 @@ -import auth from "../../auth.ts"; -import { BufferWriter } from "../../buffer.ts"; -import ServerCapabilities from "../../constant/capabilities.ts"; +import auth from "../../utils/hash.ts"; +import { BufferWriter } from "../../utils/buffer.ts"; +import { ServerCapabilities } from "../../constant/capabilities.ts"; import { Charset } from "../../constant/charset.ts"; import type { HandshakeBody } from "../parsers/handshake.ts"; import { clientCapabilities } from "./client_capabilities.ts"; /** @ignore */ -export function buildAuth( +export async function buildAuth( packet: HandshakeBody, - params: { username: string; password?: string; db?: string; ssl?: boolean }, -): Uint8Array { + params: { username: string; password?: string; db?: string; ssl: boolean }, +): Promise { const clientParam: number = clientCapabilities(packet, params); if (packet.serverCapabilities & ServerCapabilities.CLIENT_PLUGIN_AUTH) { @@ -21,7 +21,7 @@ export function buildAuth( .skip(23) .writeNullTerminatedString(params.username); if (params.password) { - const authData = auth( + const authData = await auth( packet.authPluginName, params.password, packet.seed, diff --git a/src/packets/builders/client_capabilities.ts b/lib/packets/builders/client_capabilities.ts similarity index 92% rename from src/packets/builders/client_capabilities.ts rename to lib/packets/builders/client_capabilities.ts index 842fdcb..b03125e 100644 --- a/src/packets/builders/client_capabilities.ts +++ b/lib/packets/builders/client_capabilities.ts @@ -1,4 +1,4 @@ -import ServerCapabilities from "../../constant/capabilities.ts"; +import { ServerCapabilities } from "../../constant/capabilities.ts"; import type { HandshakeBody } from "../parsers/handshake.ts"; export function clientCapabilities( diff --git a/lib/packets/builders/query.ts b/lib/packets/builders/query.ts new file mode 100644 index 0000000..0ba9d75 --- /dev/null +++ b/lib/packets/builders/query.ts @@ -0,0 +1,16 @@ +import { replaceParams } from "../../utils/query.ts"; +import { BufferWriter } from "../../utils/buffer.ts"; +import { encode } from "../../utils/encoding.ts"; +import type { MysqlParameterType } from "../parsers/result.ts"; + +/** @ignore */ +export function buildQuery( + sql: string, + params: MysqlParameterType[] = [], +): Uint8Array { + const data = encode(replaceParams(sql, params)); + const writer = new BufferWriter(new Uint8Array(data.length + 1)); + writer.write(0x03); + writer.writeBuffer(data); + return writer.buffer; +} diff --git a/src/packets/builders/tls.ts b/lib/packets/builders/tls.ts similarity index 91% rename from src/packets/builders/tls.ts rename to lib/packets/builders/tls.ts index 487301a..5963c01 100644 --- a/src/packets/builders/tls.ts +++ b/lib/packets/builders/tls.ts @@ -1,4 +1,4 @@ -import { BufferWriter } from "../../buffer.ts"; +import { BufferWriter } from "../../utils/buffer.ts"; import { Charset } from "../../constant/charset.ts"; import type { HandshakeBody } from "../parsers/handshake.ts"; import { clientCapabilities } from "./client_capabilities.ts"; diff --git a/lib/packets/packet.ts b/lib/packets/packet.ts new file mode 100644 index 0000000..9af5894 --- /dev/null +++ b/lib/packets/packet.ts @@ -0,0 +1,155 @@ +import { dump } from "@stdext/encoding/hex"; +import { BufferReader, BufferWriter } from "../utils/buffer.ts"; +import { MysqlWriteError } from "../utils/errors.ts"; +import { logger } from "../utils/logger.ts"; +import { ComQueryResponsePacket } from "../constant/packet.ts"; + +/** @ignore */ +interface PacketHeader { + size: number; + no: number; +} + +/** + * Helper for sending a packet through the connection + */ +export class PacketWriter { + header: PacketHeader; + body: Uint8Array; + + constructor(body: Uint8Array, no: number) { + this.body = body; + this.header = { size: body.length, no }; + } + + /** + * Send the packet through the connection + * + * @param conn The connection + */ + async write(conn: Deno.Conn) { + const body = this.body; + + const data = new BufferWriter(new Uint8Array(4 + body.length)); + data.writeUints(3, this.header.size); + data.write(this.header.no); + data.writeBuffer(body); + logger().debug(`send: ${data.length}B \n${dump(data.buffer)}\n`); + try { + let wrote = 0; + do { + wrote += await conn.write(data.buffer.subarray(wrote)); + } while (wrote < data.length); + } catch (error) { + throw new MysqlWriteError(error.message); + } + } + + /** + * Send a packet through the connection + * + * @param conn The connection + * @param body The packet body + * @param no The packet number + * @returns SendPacket instance + */ + static async write( + conn: Deno.Conn, + body: Uint8Array, + no: number, + ): Promise { + const packet = new PacketWriter(body, no); + await packet.write(conn); + return packet; + } +} + +/** + * Helper for receiving a packet through the connection + */ +export class PacketReader { + header: PacketHeader; + body: BufferReader; + type: ComQueryResponsePacket; + + constructor( + header: PacketHeader, + body: BufferReader, + type: ComQueryResponsePacket, + ) { + this.header = header; + this.body = body; + this.type = type; + } + + /** + * Read a subarray from the connection + * + * @param conn The connection + * @param buffer The buffer to read into + * @returns The number of bytes read + */ + static async #readSubarray( + conn: Deno.Conn, + buffer: Uint8Array, + ): Promise { + const size = buffer.length; + let haveRead = 0; + while (haveRead < size) { + const nread = await conn.read(buffer.subarray(haveRead)); + if (nread === null) return null; + haveRead += nread; + } + return haveRead; + } + + /** + * Read a subarray from the connection + * + * @param conn + * @returns The PacketReader instance or null if nothing could be read + */ + static async read(conn: Deno.Conn): Promise { + const headerReader = new BufferReader(new Uint8Array(4)); + let readCount = 0; + let nread = await this.#readSubarray(conn, headerReader.buffer); + if (nread === null) return null; + readCount = nread; + const bodySize = headerReader.readUints(3); + const header = { + size: bodySize, + no: headerReader.readUint8(), + }; + const bodyReader = new BufferReader(new Uint8Array(bodySize)); + nread = await this.#readSubarray(conn, bodyReader.buffer); + if (nread === null) return null; + readCount += nread; + + let type: ComQueryResponsePacket; + switch (bodyReader.buffer[0]) { + case ComQueryResponsePacket.OK_Packet: + type = ComQueryResponsePacket.OK_Packet; + break; + case ComQueryResponsePacket.ERR_Packet: + type = ComQueryResponsePacket.ERR_Packet; + break; + case ComQueryResponsePacket.EOF_Packet: + type = ComQueryResponsePacket.EOF_Packet; + break; + default: + type = ComQueryResponsePacket.Result; + break; + } + + logger().debug(() => { + const data = new Uint8Array(readCount); + data.set(headerReader.buffer); + data.set(bodyReader.buffer, 4); + return `receive: ${readCount}B, size = ${header.size}, no = ${header.no} \n${ + dump(data) + }\n`; + }); + + return new PacketReader(header, bodyReader, type); + } +} diff --git a/src/packets/parsers/authswitch.ts b/lib/packets/parsers/authswitch.ts similarity index 88% rename from src/packets/parsers/authswitch.ts rename to lib/packets/parsers/authswitch.ts index ac8b728..698b186 100644 --- a/src/packets/parsers/authswitch.ts +++ b/lib/packets/parsers/authswitch.ts @@ -1,4 +1,4 @@ -import { BufferReader } from "../../buffer.ts"; +import type { BufferReader } from "../../utils/buffer.ts"; /** @ignore */ export interface authSwitchBody { diff --git a/src/packets/parsers/err.ts b/lib/packets/parsers/err.ts similarity index 72% rename from src/packets/parsers/err.ts rename to lib/packets/parsers/err.ts index 8589446..dac14ef 100644 --- a/src/packets/parsers/err.ts +++ b/lib/packets/parsers/err.ts @@ -1,6 +1,6 @@ -import type { BufferReader } from "../../buffer.ts"; -import type { Connection } from "../../connection.ts"; -import ServerCapabilities from "../../constant/capabilities.ts"; +import type { BufferReader } from "../../utils/buffer.ts"; +import type { MysqlConnection } from "../../connection.ts"; +import { ServerCapabilities } from "../../constant/capabilities.ts"; /** @ignore */ export interface ErrorPacket { @@ -13,7 +13,7 @@ export interface ErrorPacket { /** @ignore */ export function parseError( reader: BufferReader, - conn: Connection, + conn: MysqlConnection, ): ErrorPacket { const code = reader.readUint16(); const packet: ErrorPacket = { diff --git a/src/packets/parsers/handshake.ts b/lib/packets/parsers/handshake.ts similarity index 81% rename from src/packets/parsers/handshake.ts rename to lib/packets/parsers/handshake.ts index 959e028..c89ef1a 100644 --- a/src/packets/parsers/handshake.ts +++ b/lib/packets/parsers/handshake.ts @@ -1,7 +1,7 @@ -import { BufferReader, BufferWriter } from "../../buffer.ts"; -import ServerCapabilities from "../../constant/capabilities.ts"; -import { PacketType } from "../../constant/packet.ts"; -import { ReceivePacket } from "../packet.ts"; +import { type BufferReader, BufferWriter } from "../../utils/buffer.ts"; +import { ServerCapabilities } from "../../constant/capabilities.ts"; +import { ComQueryResponsePacket } from "../../constant/packet.ts"; +import type { PacketReader } from "../packet.ts"; /** @ignore */ export interface HandshakeBody { @@ -73,13 +73,13 @@ export enum AuthResult { MethodMismatch, AuthMoreRequired, } -export function parseAuth(packet: ReceivePacket): AuthResult { +export function parseAuth(packet: PacketReader): AuthResult { switch (packet.type) { - case PacketType.EOF_Packet: + case ComQueryResponsePacket.EOF_Packet: return AuthResult.MethodMismatch; - case PacketType.Result: + case ComQueryResponsePacket.Result: return AuthResult.AuthMoreRequired; - case PacketType.OK_Packet: + case ComQueryResponsePacket.OK_Packet: return AuthResult.AuthPassed; default: return AuthResult.AuthPassed; diff --git a/lib/packets/parsers/result.ts b/lib/packets/parsers/result.ts new file mode 100644 index 0000000..2b889e1 --- /dev/null +++ b/lib/packets/parsers/result.ts @@ -0,0 +1,159 @@ +import type { BufferReader } from "../../utils/buffer.ts"; +import { MysqlDataType } from "../../constant/mysql_types.ts"; +import type { ArrayRow, Row, SqlxQueryOptions } from "@halvardm/sqlx"; + +export type MysqlParameterType = + | null + | string + | number + | boolean + | bigint + | Date + // deno-lint-ignore no-explicit-any + | Array + | object + | undefined; + +/** + * Field information + */ +export interface FieldInfo { + catalog: string; + schema: string; + table: string; + originTable: string; + name: string; + originName: string; + encoding: number; + fieldLen: number; + fieldType: number; + fieldFlag: number; + decimals: number; + defaultVal: string; +} + +/** + * Parses the field + */ +export function parseField(reader: BufferReader): FieldInfo { + const catalog = reader.readLenCodeString()!; + const schema = reader.readLenCodeString()!; + const table = reader.readLenCodeString()!; + const originTable = reader.readLenCodeString()!; + const name = reader.readLenCodeString()!; + const originName = reader.readLenCodeString()!; + reader.skip(1); + const encoding = reader.readUint16()!; + const fieldLen = reader.readUint32()!; + const fieldType = reader.readUint8()!; + const fieldFlag = reader.readUint16()!; + const decimals = reader.readUint8()!; + reader.skip(1); + const defaultVal = reader.readLenCodeString()!; + return { + catalog, + schema, + table, + originName, + fieldFlag, + originTable, + fieldLen, + name, + fieldType, + encoding, + decimals, + defaultVal, + }; +} + +/** + * Parse the row as an array + */ +export function parseRowArray( + reader: BufferReader, + fields: FieldInfo[], + options?: SqlxQueryOptions, +): ArrayRow { + const row: MysqlParameterType[] = []; + for (const field of fields) { + const val = reader.readLenCodeString(); + const parsedVal = val === null ? null : convertType(field, val, options); + row.push(parsedVal); + } + return row; +} + +/** + * Parses the row as an object + */ +export function parseRowObject( + reader: BufferReader, + fields: FieldInfo[], +): Row { + const rowArray = parseRowArray(reader, fields); + return getRowObject(fields, rowArray); +} + +export function getRowObject( + fields: FieldInfo[], + row: ArrayRow, +): Row { + const obj: Row = {}; + for (const [i, field] of fields.entries()) { + const name = field.name; + obj[name] = row[i]; + } + return obj; +} + +/** + * Converts the value to the correct type + */ +function convertType( + field: FieldInfo, + val: string, + options?: SqlxQueryOptions, +): MysqlParameterType { + if (options?.transformOutput) { + // deno-lint-ignore no-explicit-any + return options.transformOutput(val) as any; + } + const { fieldType } = field; + switch (fieldType) { + case MysqlDataType.Decimal: + case MysqlDataType.Double: + case MysqlDataType.Float: + case MysqlDataType.DateTime2: + return parseFloat(val); + case MysqlDataType.NewDecimal: + return val; // #42 MySQL's decimal type cannot be accurately represented by the Number. + case MysqlDataType.Tiny: + case MysqlDataType.Short: + case MysqlDataType.Long: + case MysqlDataType.Int24: + return parseInt(val); + case MysqlDataType.LongLong: + if ( + Number(val) < Number.MIN_SAFE_INTEGER || + Number(val) > Number.MAX_SAFE_INTEGER + ) { + return BigInt(val); + } else { + return parseInt(val); + } + case MysqlDataType.VarChar: + case MysqlDataType.VarString: + case MysqlDataType.String: + case MysqlDataType.Time: + case MysqlDataType.Time2: + return val; + case MysqlDataType.Date: + case MysqlDataType.Timestamp: + case MysqlDataType.DateTime: + case MysqlDataType.NewDate: + case MysqlDataType.Timestamp2: + return new Date(val); + default: + return val; + } +} diff --git a/lib/pool.test.ts b/lib/pool.test.ts new file mode 100644 index 0000000..0309a62 --- /dev/null +++ b/lib/pool.test.ts @@ -0,0 +1,50 @@ +import { MysqlClientPool } from "./pool.ts"; +import { QUERIES, services } from "./utils/testing.ts"; +import { clientPoolTest } from "@halvardm/sqlx/testing"; + +Deno.test("Pool Test", async (t) => { + for (const service of services) { + await t.step(`Testing ${service.name}`, async (t) => { + await t.step(`TCP`, async (t) => { + await clientPoolTest({ + t, + Client: MysqlClientPool, + connectionUrl: service.url, + connectionOptions: {}, + queries: QUERIES, + }); + }); + + // Enable once socket connection issue is fixed + // + // await t.step(`UNIX Socket`, async (t) => { + // await implementationTest({ + // t, + // Client: MysqlClient, + // // deno-lint-ignore no-explicit-any + // PoolClient: MysqlClientPool as any, + // connectionUrl: service.urlSocket, + // connectionOptions: {}, + // queries: { + // createTable: + // "CREATE TABLE IF NOT EXISTS sqlxtesttable (testcol TEXT)", + // dropTable: "DROP TABLE IF EXISTS sqlxtesttable", + // insertOneToTable: "INSERT INTO sqlxtesttable (testcol) VALUES (?)", + // insertManyToTable: + // "INSERT INTO sqlxtesttable (testcol) VALUES (?),(?),(?)", + // selectOneFromTable: + // "SELECT * FROM sqlxtesttable WHERE testcol = ? LIMIT 1", + // selectByMatchFromTable: + // "SELECT * FROM sqlxtesttable WHERE testcol = ?", + // selectManyFromTable: "SELECT * FROM sqlxtesttable", + // select1AsString: "SELECT '1' as result", + // select1Plus1AsNumber: "SELECT 1+1 as result", + // deleteByMatchFromTable: + // "DELETE FROM sqlxtesttable WHERE testcol = ?", + // deleteAllFromTable: "DELETE FROM sqlxtesttable", + // }, + // }); + // }); + }); + } +}); diff --git a/lib/pool.ts b/lib/pool.ts new file mode 100644 index 0000000..0228ccf --- /dev/null +++ b/lib/pool.ts @@ -0,0 +1,159 @@ +import { + SqlxBase, + type SqlxClientPool, + type SqlxClientPoolOptions, + SqlxDeferredStack, + SqlxError, + type SqlxPoolClient, +} from "@halvardm/sqlx"; +import { + type MysqlPrepared, + type MysqlQueryOptions, + type MySqlTransaction, + MysqlTransactionable, + type MysqlTransactionOptions, +} from "./sqlx.ts"; +import { MysqlConnection, type MysqlConnectionOptions } from "./connection.ts"; +import type { MysqlParameterType } from "./packets/parsers/result.ts"; +import { + MysqlPoolAcquireEvent, + MysqlPoolCloseEvent, + MysqlPoolConnectEvent, + MysqlPoolReleaseEvent, +} from "./utils/events.ts"; +import { MysqlClientEventTarget } from "./utils/events.ts"; + +export interface MysqlClientPoolOptions + extends MysqlConnectionOptions, SqlxClientPoolOptions { +} + +export class MysqlPoolClient extends MysqlTransactionable + implements + SqlxPoolClient< + MysqlConnectionOptions, + MysqlConnection, + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction + > { + /** + * Must be set by the client pool on creation + * @inheritdoc + */ + release(): Promise { + throw new Error("Method not implemented."); + } + + async [Symbol.asyncDispose](): Promise { + await this.release(); + } +} + +export class MysqlClientPool extends SqlxBase implements + SqlxClientPool< + MysqlConnectionOptions, + MysqlConnection, + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared, + MysqlTransactionOptions, + MySqlTransaction, + MysqlPoolClient, + SqlxDeferredStack + > { + readonly connectionUrl: string; + readonly connectionOptions: MysqlClientPoolOptions; + readonly eventTarget: EventTarget; + readonly deferredStack: SqlxDeferredStack; + readonly queryOptions: MysqlQueryOptions; + + #connected: boolean = false; + + get connected(): boolean { + return this.#connected; + } + + constructor( + connectionUrl: string | URL, + connectionOptions: MysqlClientPoolOptions = {}, + ) { + super(); + this.connectionUrl = connectionUrl.toString(); + this.connectionOptions = connectionOptions; + this.queryOptions = connectionOptions; + this.eventTarget = new MysqlClientEventTarget(); + this.deferredStack = new SqlxDeferredStack( + connectionOptions, + ); + } + + async connect(): Promise { + for (let i = 0; i < this.deferredStack.maxSize; i++) { + const conn = new MysqlConnection( + this.connectionUrl, + this.connectionOptions, + ); + const client = new MysqlPoolClient( + conn, + this.queryOptions, + ); + client.release = () => this.release(client); + + if (!this.connectionOptions.lazyInitialization) { + await client.connection.connect(); + this.eventTarget.dispatchEvent( + new MysqlPoolConnectEvent({ connectable: client }), + ); + } + + this.deferredStack.push(client); + } + + this.#connected = true; + } + + async close(): Promise { + this.#connected = false; + + for (const client of this.deferredStack.elements) { + this.eventTarget.dispatchEvent( + new MysqlPoolCloseEvent({ connectable: client }), + ); + await client.connection.close(); + } + } + + async acquire(): Promise { + const client = await this.deferredStack.pop(); + if (!client.connected) { + await client.connection.connect(); + } + + this.eventTarget.dispatchEvent( + new MysqlPoolAcquireEvent({ connectable: client }), + ); + return client; + } + + async release(client: MysqlPoolClient): Promise { + this.eventTarget.dispatchEvent( + new MysqlPoolReleaseEvent({ connectable: client }), + ); + try { + this.deferredStack.push(client); + } catch (e) { + if (e instanceof SqlxError && e.message === "Max pool size reached") { + await client.connection.close(); + throw e; + } else { + throw e; + } + } + } + + async [Symbol.asyncDispose](): Promise { + await this.close(); + } +} diff --git a/lib/sqlx.ts b/lib/sqlx.ts new file mode 100644 index 0000000..0dc5679 --- /dev/null +++ b/lib/sqlx.ts @@ -0,0 +1,377 @@ +import { + type ArrayRow, + type Row, + SqlxBase, + SqlxPreparable, + SqlxPreparedQueriable, + SqlxQueriable, + type SqlxQueryOptions, + SqlxTransactionable, + type SqlxTransactionOptions, + SqlxTransactionQueriable, +} from "@halvardm/sqlx"; +import type { MysqlConnection, MysqlConnectionOptions } from "./connection.ts"; +import { buildQuery } from "./packets/builders/query.ts"; +import type { MysqlParameterType } from "./packets/parsers/result.ts"; +import { MysqlTransactionError } from "./utils/errors.ts"; + +export interface MysqlQueryOptions extends SqlxQueryOptions { +} + +export interface MysqlTransactionOptions extends SqlxTransactionOptions { + beginTransactionOptions: { + withConsistentSnapshot?: boolean; + readWrite?: "READ WRITE" | "READ ONLY"; + }; + commitTransactionOptions: { + chain?: boolean; + release?: boolean; + }; + rollbackTransactionOptions: { + chain?: boolean; + release?: boolean; + savepoint?: string; + }; +} + +export class MysqlQueriable extends SqlxBase implements + SqlxQueriable< + MysqlConnectionOptions, + MysqlConnection, + MysqlParameterType, + MysqlQueryOptions + > { + readonly connection: MysqlConnection; + readonly queryOptions: MysqlQueryOptions; + + get connected(): boolean { + return this.connection.connected; + } + + constructor( + connection: MysqlConnection, + queryOptions: MysqlQueryOptions = {}, + ) { + super(); + this.connection = connection; + this.queryOptions = queryOptions; + } + + execute( + sql: string, + params?: MysqlParameterType[] | undefined, + _options?: MysqlQueryOptions | undefined, + ): Promise { + const data = buildQuery(sql, params); + return this.connection.executeRaw(data); + } + query = Row>( + sql: string, + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return Array.fromAsync(this.queryMany(sql, params, options)); + } + async queryOne = Row>( + sql: string, + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + const res = await this.query(sql, params, options); + return res[0]; + } + async *queryMany = Row>( + sql: string, + params?: MysqlParameterType[], + options?: MysqlQueryOptions | undefined, + ): AsyncGenerator { + const data = buildQuery(sql, params); + for await ( + const res of this.connection.queryManyObjectRaw(data, options) + ) { + yield res; + } + } + + queryArray< + T extends ArrayRow = ArrayRow, + >( + sql: string, + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return Array.fromAsync(this.queryManyArray(sql, params, options)); + } + async queryOneArray< + T extends ArrayRow = ArrayRow, + >( + sql: string, + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + const res = await this.queryArray(sql, params, options); + return res[0]; + } + async *queryManyArray< + T extends ArrayRow = ArrayRow, + >( + sql: string, + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): AsyncGenerator { + const data = buildQuery(sql, params); + for await ( + const res of this.connection.queryManyArrayRaw(data, options) + ) { + yield res; + } + } + sql = Row>( + strings: TemplateStringsArray, + ...parameters: MysqlParameterType[] + ): Promise { + return this.query(strings.join("?"), parameters); + } + sqlArray< + T extends ArrayRow = ArrayRow, + >( + strings: TemplateStringsArray, + ...parameters: MysqlParameterType[] + ): Promise { + return this.queryArray(strings.join("?"), parameters); + } +} + +/** + * Prepared statement + * + * @todo implement prepared statements properly + */ +export class MysqlPrepared extends SqlxBase implements + SqlxPreparedQueriable< + MysqlConnectionOptions, + MysqlConnection, + MysqlParameterType, + MysqlQueryOptions + > { + readonly sql: string; + readonly queryOptions: MysqlQueryOptions; + + #queriable: MysqlQueriable; + + connection: MysqlConnection; + + get connected(): boolean { + return this.connection.connected; + } + + constructor( + connection: MysqlConnection, + sql: string, + options: MysqlQueryOptions = {}, + ) { + super(); + this.connection = connection; + this.sql = sql; + this.queryOptions = options; + this.#queriable = new MysqlQueriable(connection, this.queryOptions); + } + + execute( + params?: MysqlParameterType[] | undefined, + _options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queriable.execute(this.sql, params); + } + query = Row>( + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queriable.query(this.sql, params, options); + } + queryOne = Row>( + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queriable.queryOne(this.sql, params, options); + } + queryMany = Row>( + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): AsyncGenerator { + return this.#queriable.queryMany(this.sql, params, options); + } + queryArray< + T extends ArrayRow = ArrayRow, + >( + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queriable.queryArray(this.sql, params, options); + } + queryOneArray< + T extends ArrayRow = ArrayRow, + >( + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): Promise { + return this.#queriable.queryOneArray(this.sql, params, options); + } + queryManyArray< + T extends ArrayRow = ArrayRow, + >( + params?: MysqlParameterType[] | undefined, + options?: MysqlQueryOptions | undefined, + ): AsyncGenerator { + return this.#queriable.queryManyArray(this.sql, params, options); + } +} + +export class MysqlPreparable extends MysqlQueriable implements + SqlxPreparable< + MysqlConnectionOptions, + MysqlConnection, + MysqlParameterType, + MysqlQueryOptions, + MysqlPrepared + > { + prepare(sql: string, options?: MysqlQueryOptions | undefined): MysqlPrepared { + return new MysqlPrepared(this.connection, sql, options); + } +} + +export class MySqlTransaction extends MysqlPreparable + implements + SqlxTransactionQueriable< + MysqlConnectionOptions, + MysqlConnection, + MysqlParameterType, + MysqlQueryOptions, + MysqlTransactionOptions + > { + #inTransaction: boolean = true; + get inTransaction(): boolean { + return this.connected && this.#inTransaction; + } + + get connected(): boolean { + if (!this.#inTransaction) { + throw new MysqlTransactionError( + "Transaction is not active, create a new one using beginTransaction", + ); + } + + return super.connected; + } + + async commitTransaction( + options?: MysqlTransactionOptions["commitTransactionOptions"], + ): Promise { + try { + let sql = "COMMIT"; + + if (options?.chain === true) { + sql += " AND CHAIN"; + } else if (options?.chain === false) { + sql += " AND NO CHAIN"; + } + + if (options?.release === true) { + sql += " RELEASE"; + } else if (options?.release === false) { + sql += " NO RELEASE"; + } + await this.execute(sql); + } catch (e) { + this.#inTransaction = false; + throw e; + } + } + async rollbackTransaction( + options?: MysqlTransactionOptions["rollbackTransactionOptions"], + ): Promise { + try { + let sql = "ROLLBACK"; + + if (options?.savepoint) { + sql += ` TO ${options.savepoint}`; + await this.execute(sql); + return; + } + + if (options?.chain === true) { + sql += " AND CHAIN"; + } else if (options?.chain === false) { + sql += " AND NO CHAIN"; + } + + if (options?.release === true) { + sql += " RELEASE"; + } else if (options?.release === false) { + sql += " NO RELEASE"; + } + + await this.execute(sql); + } catch (e) { + this.#inTransaction = false; + throw e; + } + } + async createSavepoint(name: string = `\t_bm.\t`): Promise { + await this.execute(`SAVEPOINT ${name}`); + } + async releaseSavepoint(name: string = `\t_bm.\t`): Promise { + await this.execute(`RELEASE SAVEPOINT ${name}`); + } +} + +/** + * Represents a queriable class that can be used to run transactions. + */ +export class MysqlTransactionable extends MysqlPreparable + implements + SqlxTransactionable< + MysqlConnectionOptions, + MysqlConnection, + MysqlParameterType, + MysqlQueryOptions, + MysqlTransactionOptions, + MySqlTransaction + > { + async beginTransaction( + options?: MysqlTransactionOptions["beginTransactionOptions"], + ): Promise { + let sql = "START TRANSACTION"; + if (options?.withConsistentSnapshot) { + sql += ` WITH CONSISTENT SNAPSHOT`; + } + + if (options?.readWrite) { + sql += ` ${options.readWrite}`; + } + + await this.execute(sql); + + return new MySqlTransaction(this.connection, this.queryOptions); + } + + async transaction( + fn: (t: MySqlTransaction) => Promise, + options?: MysqlTransactionOptions, + ): Promise { + const transaction = await this.beginTransaction( + options?.beginTransactionOptions, + ); + + try { + const result = await fn(transaction); + await transaction.commitTransaction(options?.commitTransactionOptions); + return result; + } catch (error) { + await transaction.rollbackTransaction( + options?.rollbackTransactionOptions, + ); + throw error; + } + } +} diff --git a/src/buffer.ts b/lib/utils/buffer.ts similarity index 87% rename from src/buffer.ts rename to lib/utils/buffer.ts index 5c3e48b..d1ab5ed 100644 --- a/src/buffer.ts +++ b/lib/utils/buffer.ts @@ -1,17 +1,8 @@ -const encoder = new TextEncoder(); -const decoder = new TextDecoder(); +import { decode, encode } from "./encoding.ts"; -/** @ignore */ -export function encode(input: string) { - return encoder.encode(input); -} - -/** @ignore */ -export function decode(input: BufferSource) { - return decoder.decode(input); -} - -/** @ignore */ +/** + * Buffer reader utility class + */ export class BufferReader { private pos: number = 0; constructor(readonly buffer: Uint8Array) {} @@ -96,7 +87,9 @@ export class BufferReader { } } -/** @ignore */ +/** + * Buffer writer utility class + */ export class BufferWriter { private pos: number = 0; constructor(readonly buffer: Uint8Array) {} @@ -132,14 +125,6 @@ export class BufferWriter { return this; } - writeInt16LE(num: number) {} - - writeIntLE(num: number, len: number) { - const int = new Int32Array(1); - int[0] = 40; - console.log(int); - } - writeUint16(num: number): BufferWriter { return this.writeUints(2, num); } diff --git a/src/util.ts b/lib/utils/bytes.ts similarity index 100% rename from src/util.ts rename to lib/utils/bytes.ts diff --git a/src/auth_plugin/crypt.ts b/lib/utils/crypto.ts similarity index 78% rename from src/auth_plugin/crypt.ts rename to lib/utils/crypto.ts index 8eb2339..af4f947 100644 --- a/src/auth_plugin/crypt.ts +++ b/lib/utils/crypto.ts @@ -1,6 +1,6 @@ -import { base64Decode } from "../../deps.ts"; +import { decodeBase64 } from "@std/encoding/base64"; -async function encryptWithPublicKey( +export async function encryptWithPublicKey( key: string, data: Uint8Array, ): Promise { @@ -10,7 +10,7 @@ async function encryptWithPublicKey( key = key.substring(pemHeader.length, key.length - pemFooter.length); const importedKey = await crypto.subtle.importKey( "spki", - base64Decode(key), + decodeBase64(key), { name: "RSA-OAEP", hash: "SHA-256" }, false, ["encrypt"], @@ -24,5 +24,3 @@ async function encryptWithPublicKey( data, ); } - -export { encryptWithPublicKey }; diff --git a/lib/utils/encoding.ts b/lib/utils/encoding.ts new file mode 100644 index 0000000..c535c0a --- /dev/null +++ b/lib/utils/encoding.ts @@ -0,0 +1,16 @@ +const encoder = new TextEncoder(); +const decoder = new TextDecoder(); + +/** + * Shorthand for `new TextEncoder().encode(input)`. + */ +export function encode(input: string) { + return encoder.encode(input); +} + +/** + * Shorthand for `new TextDecoder().decode(input)`. + */ +export function decode(input: BufferSource) { + return decoder.decode(input); +} diff --git a/lib/utils/errors.ts b/lib/utils/errors.ts new file mode 100644 index 0000000..be73e89 --- /dev/null +++ b/lib/utils/errors.ts @@ -0,0 +1,50 @@ +import { isSqlxError, SqlxError } from "@halvardm/sqlx"; + +export class MysqlError extends SqlxError { + constructor(msg: string) { + super(msg); + } +} + +export class MysqlConnectionError extends MysqlError { + constructor(msg: string) { + super(msg); + } +} + +export class MysqlWriteError extends MysqlError { + constructor(msg: string) { + super(msg); + } +} + +export class MysqlReadError extends MysqlError { + constructor(msg: string) { + super(msg); + } +} + +export class MysqlResponseTimeoutError extends MysqlError { + constructor(msg: string) { + super(msg); + } +} + +export class MysqlProtocolError extends MysqlError { + constructor(msg: string) { + super(msg); + } +} + +export class MysqlTransactionError extends MysqlError { + constructor(msg: string) { + super(msg); + } +} + +/** + * Check if an error is a MysqlError + */ +export function isMysqlError(err: unknown): err is MysqlError { + return isSqlxError(err) && err instanceof MysqlError; +} diff --git a/lib/utils/events.ts b/lib/utils/events.ts new file mode 100644 index 0000000..ba89720 --- /dev/null +++ b/lib/utils/events.ts @@ -0,0 +1,68 @@ +import { + type SqlxClientEventType, + SqlxConnectableCloseEvent, + SqlxConnectableConnectEvent, + type SqlxConnectableEventInit, + SqlxEventTarget, + SqlxPoolConnectableAcquireEvent, + SqlxPoolConnectableReleaseEvent, + type SqlxPoolConnectionEventType, +} from "@halvardm/sqlx"; +import type { MysqlConnectionOptions } from "../connection.ts"; +import type { MysqlConnection } from "../connection.ts"; +import type { MysqlClient } from "../client.ts"; +import type { MysqlPoolClient } from "../pool.ts"; + +export class MysqlClientEventTarget extends SqlxEventTarget< + MysqlConnectionOptions, + MysqlConnection, + SqlxClientEventType, + MysqlClientEventInit, + MysqlClientEvents +> { +} +export class MysqlPoolClientEventTarget extends SqlxEventTarget< + MysqlConnectionOptions, + MysqlConnection, + SqlxPoolConnectionEventType, + MysqlPoolEventInit, + MysqlPoolEvents +> { +} + +export type MysqlClientEventInit = SqlxConnectableEventInit< + MysqlClient +>; + +export type MysqlPoolEventInit = SqlxConnectableEventInit< + MysqlPoolClient +>; + +export class MysqlClientConnectEvent + extends SqlxConnectableConnectEvent {} + +export class MysqlClientCloseEvent + extends SqlxConnectableCloseEvent {} +export class MysqlPoolConnectEvent + extends SqlxConnectableConnectEvent {} + +export class MysqlPoolCloseEvent + extends SqlxConnectableCloseEvent {} + +export class MysqlPoolAcquireEvent + extends SqlxPoolConnectableAcquireEvent { +} + +export class MysqlPoolReleaseEvent + extends SqlxPoolConnectableReleaseEvent { +} + +export type MysqlClientEvents = + | MysqlClientConnectEvent + | MysqlClientCloseEvent; + +export type MysqlPoolEvents = + | MysqlClientConnectEvent + | MysqlClientCloseEvent + | MysqlPoolAcquireEvent + | MysqlPoolReleaseEvent; diff --git a/lib/utils/hash.ts b/lib/utils/hash.ts new file mode 100644 index 0000000..b3f8aa5 --- /dev/null +++ b/lib/utils/hash.ts @@ -0,0 +1,53 @@ +import { crypto, type DigestAlgorithm } from "@std/crypto"; +import { xor } from "./bytes.ts"; +import { MysqlError } from "./errors.ts"; +import { encode } from "./encoding.ts"; + +async function hash( + algorithm: DigestAlgorithm, + data: Uint8Array, +): Promise { + return new Uint8Array(await crypto.subtle.digest(algorithm, data)); +} + +async function mysqlNativePassword( + password: string, + seed: Uint8Array, +): Promise { + const pwd1 = await hash("SHA-1", encode(password)); + const pwd2 = await hash("SHA-1", pwd1); + + let seedAndPwd2 = new Uint8Array(seed.length + pwd2.length); + seedAndPwd2.set(seed); + seedAndPwd2.set(pwd2, seed.length); + seedAndPwd2 = await hash("SHA-1", seedAndPwd2); + + return xor(seedAndPwd2, pwd1); +} + +async function cachingSha2Password( + password: string, + seed: Uint8Array, +): Promise { + const stage1 = await hash("SHA-256", encode(password)); + const stage2 = await hash("SHA-256", stage1); + const stage3 = await hash("SHA-256", Uint8Array.from([...stage2, ...seed])); + return xor(stage1, stage3); +} + +export default function auth( + authPluginName: string, + password: string, + seed: Uint8Array, +) { + switch (authPluginName) { + case "mysql_native_password": + // Native password authentication only need and will need 20-byte challenge. + return mysqlNativePassword(password, seed.slice(0, 20)); + + case "caching_sha2_password": + return cachingSha2Password(password, seed); + default: + throw new MysqlError("Not supported"); + } +} diff --git a/lib/utils/logger.ts b/lib/utils/logger.ts new file mode 100644 index 0000000..28830cd --- /dev/null +++ b/lib/utils/logger.ts @@ -0,0 +1,12 @@ +import { getLogger, type Logger } from "@std/log"; +import { MODULE_NAME } from "./meta.ts"; + +/** + * Used for internal module logging, + * do not import this directly outside of this module. + * + * @see {@link https://deno.land/std/log/mod.ts} + */ +export function logger(): Logger { + return getLogger(MODULE_NAME); +} diff --git a/lib/utils/meta.ts b/lib/utils/meta.ts new file mode 100644 index 0000000..4b71791 --- /dev/null +++ b/lib/utils/meta.ts @@ -0,0 +1,4 @@ +import meta from "../../deno.json" with { type: "json" }; + +export const MODULE_NAME = meta.name; +export const VERSION = meta.version; diff --git a/lib/utils/query.ts b/lib/utils/query.ts new file mode 100644 index 0000000..1d4e5fc --- /dev/null +++ b/lib/utils/query.ts @@ -0,0 +1,110 @@ +import type { MysqlParameterType } from "../packets/parsers/result.ts"; + +/** + * Replaces parameters in a SQL query with the given values. + * + * Taken from https://github.com/manyuanrong/sql-builder/blob/master/util.ts + */ +export function replaceParams( + sql: string, + params: MysqlParameterType[], +): string { + if (!params) return sql; + let paramIndex = 0; + sql = sql.replace( + /('[^'\\]*(?:\\.[^'\\]*)*')|("[^"\\]*(?:\\.[^"\\]*)*")|(\?\?)|(\?)/g, + (str) => { + if (paramIndex >= params.length) return str; + // ignore + if (/".*"/g.test(str) || /'.*'/g.test(str)) { + return str; + } + // identifier + if (str === "??") { + const val = params[paramIndex++]; + if (val instanceof Array) { + return `(${ + val.map((item) => replaceParams("??", [item])).join(",") + })`; + } else if (val === "*") { + return val; + } else if (typeof val === "string" && val.includes(".")) { + // a.b => `a`.`b` + const _arr = val.split("."); + return replaceParams(_arr.map(() => "??").join("."), _arr); + } else if ( + typeof val === "string" && + (val.includes(" as ") || val.includes(" AS ")) + ) { + // a as b => `a` AS `b` + const newVal = val.replace(" as ", " AS "); + const _arr = newVal.split(" AS "); + return replaceParams(_arr.map(() => "??").join(" AS "), _arr); + } else { + return ["`", val, "`"].join(""); + } + } + // value + const val = params[paramIndex++]; + if (val === null) return "NULL"; + switch (typeof val) { + // deno-lint-ignore no-fallthrough + case "object": + if (val instanceof Date) return `"${formatDate(val)}"`; + if ((val as unknown) instanceof Array) { + return `(${ + (val as Array).map((item) => replaceParams("?", [item])) + .join(",") + })`; + } + case "string": + return `"${escapeString(val as string)}"`; + case "undefined": + return "NULL"; + case "number": + case "boolean": + default: + return val.toString(); + } + }, + ); + return sql; +} + +/** + * Formats date to a 'YYYY-MM-DD HH:MM:SS.SSS' string. + */ +function formatDate(date: Date) { + date.toISOString(); + const year = date.getFullYear(); + const month = (date.getMonth() + 1).toString().padStart(2, "0"); + const days = date + .getDate() + .toString() + .padStart(2, "0"); + const hours = date + .getHours() + .toString() + .padStart(2, "0"); + const minutes = date + .getMinutes() + .toString() + .padStart(2, "0"); + const seconds = date + .getSeconds() + .toString() + .padStart(2, "0"); + // Date does not support microseconds precision, so we only keep the milliseconds part. + const milliseconds = date + .getMilliseconds() + .toString() + .padStart(3, "0"); + return `${year}-${month}-${days} ${hours}:${minutes}:${seconds}.${milliseconds}`; +} + +/** + * Escapes a string for use in a SQL query. + */ +function escapeString(str: string) { + return str.replaceAll("\\", "\\\\").replaceAll('"', '\\"'); +} diff --git a/lib/utils/testing.ts b/lib/utils/testing.ts new file mode 100644 index 0000000..013ceaf --- /dev/null +++ b/lib/utils/testing.ts @@ -0,0 +1,87 @@ +import { resolve } from "@std/path"; +import { ConsoleHandler, setup } from "@std/log"; +import { MODULE_NAME } from "./meta.ts"; +import { parse } from "@std/yaml"; +import type { BaseQueriableTestOptions } from "@halvardm/sqlx/testing"; + +type DockerCompose = { + services: { + [key: string]: { + image: string; + ports: string[]; + environment: Record; + volumes: string[]; + }; + }; +}; + +type ServiceParsed = { + name: string; + port: string; + database: string; + // socket: string; + url: string; + // urlSocket: string; +}; + +setup({ + handlers: { + console: new ConsoleHandler("DEBUG"), + }, + loggers: { + // configure default logger available via short-hand methods above + default: { + level: "WARN", + handlers: ["console"], + }, + [MODULE_NAME]: { + level: "WARN", + handlers: ["console"], + }, + }, +}); + +export const DIR_TMP_TEST = resolve(Deno.cwd(), "tmp_test"); + +const composeParsed = parse( + Deno.readTextFileSync(resolve(Deno.cwd(), "compose.yml")), + { "onWarning": console.warn }, +) as DockerCompose; + +export const services: ServiceParsed[] = Object.entries(composeParsed.services) + .map( + ([key, value]) => { + const port = value.ports[0].split(":")[0]; + const database = Object.entries(value.environment).find(([e]) => + e.includes("DATABASE") + )?.[1] as string; + // const socket = resolve(value.volumes[0].split(":")[0])+"/mysqld.sock"; + const url = `mysql://root@0.0.0.0:${port}/${database}`; + // const urlSocket = `${url}?socket=${socket}`; + return { + name: key, + port, + database, + // socket, + url, + // urlSocket, + }; + }, + ); + +export const URL_TEST_CONNECTION = services.find((s) => s.name === "mysql") + ?.url as string; + +export const QUERIES: BaseQueriableTestOptions["queries"] = { + createTable: "CREATE TABLE IF NOT EXISTS sqlxtesttable (testcol TEXT)", + dropTable: "DROP TABLE IF EXISTS sqlxtesttable", + insertOneToTable: "INSERT INTO sqlxtesttable (testcol) VALUES (?)", + insertManyToTable: "INSERT INTO sqlxtesttable (testcol) VALUES (?),(?),(?)", + selectOneFromTable: "SELECT * FROM sqlxtesttable WHERE testcol = ? LIMIT 1", + selectByMatchFromTable: "SELECT * FROM sqlxtesttable WHERE testcol = ?", + selectManyFromTable: "SELECT * FROM sqlxtesttable", + select1AsString: "SELECT '1' as result", + select1Plus1AsNumber: "SELECT 1+1 as result", + deleteByMatchFromTable: "DELETE FROM sqlxtesttable WHERE testcol = ?", + deleteAllFromTable: "DELETE FROM sqlxtesttable", +}; diff --git a/mod.ts b/mod.ts index 193240d..97922ec 100644 --- a/mod.ts +++ b/mod.ts @@ -1,12 +1,6 @@ -export type { ClientConfig } from "./src/client.ts"; -export { Client } from "./src/client.ts"; -export type { TLSConfig } from "./src/client.ts"; -export { TLSMode } from "./src/client.ts"; - -export type { ExecuteResult } from "./src/connection.ts"; -export { Connection } from "./src/connection.ts"; - -export type { LoggerConfig } from "./src/logger.ts"; -export { configLogger } from "./src/logger.ts"; - -export { log } from "./deps.ts"; +export * from "./lib/client.ts"; +export * from "./lib/connection.ts"; +export * from "./lib/pool.ts"; +export * from "./lib/utils/errors.ts"; +export * from "./lib/utils/events.ts"; +export * from "./lib/utils/meta.ts"; diff --git a/package.json b/package.json deleted file mode 100644 index 1a9fcac..0000000 --- a/package.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "name": "deno_mysql", - "version": "1.0.0", - "description": "[![Build Status](https://www.travis-ci.org/manyuanrong/deno_mysql.svg?branch=master)](https://www.travis-ci.org/manyuanrong/deno_mysql)", - "main": "index.js", - "scripts": { - "docs": "typedoc --theme minimal --ignoreCompilerErrors --excludePrivate --excludeExternals --entryPoint client.ts --mode file ./src --out ./docs" - }, - "repository": { - "type": "git", - "url": "git+https://github.com/manyuanrong/deno_mysql.git" - }, - "keywords": [], - "author": "", - "license": "ISC", - "bugs": { - "url": "https://github.com/manyuanrong/deno_mysql/issues" - }, - "homepage": "https://github.com/manyuanrong/deno_mysql#readme", - "devDependencies": { - "typedoc": "^0.14.2" - } -} diff --git a/src/auth.ts b/src/auth.ts deleted file mode 100644 index deafa1d..0000000 --- a/src/auth.ts +++ /dev/null @@ -1,43 +0,0 @@ -import { createHash, SupportedAlgorithm } from "../deps.ts"; -import { xor } from "./util.ts"; -import { encode } from "./buffer.ts"; - -function hash(algorithm: SupportedAlgorithm, data: Uint8Array): Uint8Array { - return new Uint8Array(createHash(algorithm).update(data).digest()); -} - -function mysqlNativePassword(password: string, seed: Uint8Array): Uint8Array { - const pwd1 = hash("sha1", encode(password)); - const pwd2 = hash("sha1", pwd1); - - let seedAndPwd2 = new Uint8Array(seed.length + pwd2.length); - seedAndPwd2.set(seed); - seedAndPwd2.set(pwd2, seed.length); - seedAndPwd2 = hash("sha1", seedAndPwd2); - - return xor(seedAndPwd2, pwd1); -} - -function cachingSha2Password(password: string, seed: Uint8Array): Uint8Array { - const stage1 = hash("sha256", encode(password)); - const stage2 = hash("sha256", stage1); - const stage3 = hash("sha256", Uint8Array.from([...stage2, ...seed])); - return xor(stage1, stage3); -} - -export default function auth( - authPluginName: string, - password: string, - seed: Uint8Array, -) { - switch (authPluginName) { - case "mysql_native_password": - // Native password authentication only need and will need 20-byte challenge. - return mysqlNativePassword(password, seed.slice(0, 20)); - - case "caching_sha2_password": - return cachingSha2Password(password, seed); - default: - throw new Error("Not supported"); - } -} diff --git a/src/auth_plugin/caching_sha2_password.ts b/src/auth_plugin/caching_sha2_password.ts deleted file mode 100644 index 1e8cbbe..0000000 --- a/src/auth_plugin/caching_sha2_password.ts +++ /dev/null @@ -1,78 +0,0 @@ -import { xor } from "../util.ts"; -import { ReceivePacket } from "../packets/packet.ts"; -import { encryptWithPublicKey } from "./crypt.ts"; - -interface handler { - done: boolean; - quickRead?: boolean; - next?: (packet: ReceivePacket) => any; - data?: Uint8Array; -} - -let scramble: Uint8Array, password: string; - -async function start( - scramble_: Uint8Array, - password_: string, -): Promise { - scramble = scramble_; - password = password_; - return { done: false, next: authMoreResponse }; -} - -async function authMoreResponse(packet: ReceivePacket): Promise { - const enum AuthStatusFlags { - FullAuth = 0x04, - FastPath = 0x03, - } - const REQUEST_PUBLIC_KEY = 0x02; - const statusFlag = packet.body.skip(1).readUint8(); - let authMoreData, done = true, next, quickRead = false; - if (statusFlag === AuthStatusFlags.FullAuth) { - authMoreData = new Uint8Array([REQUEST_PUBLIC_KEY]); - done = false; - next = encryptWithKey; - } - if (statusFlag === AuthStatusFlags.FastPath) { - done = false; - quickRead = true; - next = terminate; - } - return { done, next, quickRead, data: authMoreData }; -} - -async function encryptWithKey(packet: ReceivePacket): Promise { - const publicKey = parsePublicKey(packet); - const len = password.length; - const passwordBuffer: Uint8Array = new Uint8Array(len + 1); - for (let n = 0; n < len; n++) { - passwordBuffer[n] = password.charCodeAt(n); - } - passwordBuffer[len] = 0x00; - - const encryptedPassword = await encrypt(passwordBuffer, scramble, publicKey); - return { - done: false, - next: terminate, - data: new Uint8Array(encryptedPassword), - }; -} - -function parsePublicKey(packet: ReceivePacket): string { - return packet.body.skip(1).readNullTerminatedString(); -} - -async function encrypt( - password: Uint8Array, - scramble: Uint8Array, - key: string, -): Promise { - const stage1 = xor(password, scramble); - return await encryptWithPublicKey(key, stage1); -} - -function terminate() { - return { done: true }; -} - -export { start }; diff --git a/src/auth_plugin/index.ts b/src/auth_plugin/index.ts deleted file mode 100644 index 198e023..0000000 --- a/src/auth_plugin/index.ts +++ /dev/null @@ -1,4 +0,0 @@ -import * as caching_sha2_password from "./caching_sha2_password.ts"; -export default { - caching_sha2_password, -}; diff --git a/src/client.ts b/src/client.ts deleted file mode 100644 index 7d91489..0000000 --- a/src/client.ts +++ /dev/null @@ -1,165 +0,0 @@ -import { Connection, ConnectionState, ExecuteResult } from "./connection.ts"; -import { ConnectionPool, PoolConnection } from "./pool.ts"; -import { log } from "./logger.ts"; - -/** - * Client Config - */ -export interface ClientConfig { - /** Database hostname */ - hostname?: string; - /** Database UNIX domain socket path. When used, `hostname` and `port` are ignored. */ - socketPath?: string; - /** Database username */ - username?: string; - /** Database password */ - password?: string; - /** Database port */ - port?: number; - /** Database name */ - db?: string; - /** Whether to display packet debugging information */ - debug?: boolean; - /** Connection read timeout (default: 30 seconds) */ - timeout?: number; - /** Connection pool size (default: 1) */ - poolSize?: number; - /** Connection pool idle timeout in microseconds (default: 4 hours) */ - idleTimeout?: number; - /** charset */ - charset?: string; - /** tls config */ - tls?: TLSConfig; -} - -export enum TLSMode { - DISABLED = "disabled", - VERIFY_IDENTITY = "verify_identity", -} -/** - * TLS Config - */ -export interface TLSConfig { - /** mode of tls. only support disabled and verify_identity now*/ - mode?: TLSMode; - /** A list of root certificates (must be PEM format) that will be used in addition to the - * default root certificates to verify the peer's certificate. */ - caCerts?: string[]; -} - -/** Transaction processor */ -export interface TransactionProcessor { - (connection: Connection): Promise; -} - -/** - * MySQL client - */ -export class Client { - config: ClientConfig = {}; - private _pool?: ConnectionPool; - - private async createConnection(): Promise { - let connection = new PoolConnection(this.config); - await connection.connect(); - return connection; - } - - /** get pool info */ - get pool() { - return this._pool?.info; - } - - /** - * connect to database - * @param config config for client - * @returns Client instance - */ - async connect(config: ClientConfig): Promise { - this.config = { - hostname: "127.0.0.1", - username: "root", - port: 3306, - poolSize: 1, - timeout: 30 * 1000, - idleTimeout: 4 * 3600 * 1000, - ...config, - }; - Object.freeze(this.config); - this._pool = new ConnectionPool( - this.config.poolSize || 10, - this.createConnection.bind(this), - ); - return this; - } - - /** - * execute query sql - * @param sql query sql string - * @param params query params - */ - async query(sql: string, params?: any[]): Promise { - return await this.useConnection(async (connection) => { - return await connection.query(sql, params); - }); - } - - /** - * execute sql - * @param sql sql string - * @param params query params - */ - async execute(sql: string, params?: any[]): Promise { - return await this.useConnection(async (connection) => { - return await connection.execute(sql, params); - }); - } - - async useConnection(fn: (conn: Connection) => Promise) { - if (!this._pool) { - throw new Error("Unconnected"); - } - const connection = await this._pool.pop(); - try { - return await fn(connection); - } finally { - if (connection.state == ConnectionState.CLOSED) { - connection.removeFromPool(); - } else { - connection.returnToPool(); - } - } - } - - /** - * Execute a transaction process, and the transaction successfully - * returns the return value of the transaction process - * @param processor transation processor - */ - async transaction(processor: TransactionProcessor): Promise { - return await this.useConnection(async (connection) => { - try { - await connection.execute("BEGIN"); - const result = await processor(connection); - await connection.execute("COMMIT"); - return result; - } catch (error) { - if (connection.state == ConnectionState.CONNECTED) { - log.info(`ROLLBACK: ${error.message}`); - await connection.execute("ROLLBACK"); - } - throw error; - } - }); - } - - /** - * close connection - */ - async close() { - if (this._pool) { - this._pool.close(); - this._pool = undefined; - } - } -} diff --git a/src/connection.ts b/src/connection.ts deleted file mode 100644 index 3f2b23f..0000000 --- a/src/connection.ts +++ /dev/null @@ -1,388 +0,0 @@ -import { ClientConfig, TLSMode } from "./client.ts"; -import { - ConnnectionError, - ProtocolError, - ReadError, - ResponseTimeoutError, -} from "./constant/errors.ts"; -import { log } from "./logger.ts"; -import { buildAuth } from "./packets/builders/auth.ts"; -import { buildQuery } from "./packets/builders/query.ts"; -import { ReceivePacket, SendPacket } from "./packets/packet.ts"; -import { parseError } from "./packets/parsers/err.ts"; -import { - AuthResult, - parseAuth, - parseHandshake, -} from "./packets/parsers/handshake.ts"; -import { FieldInfo, parseField, parseRow } from "./packets/parsers/result.ts"; -import { PacketType } from "./constant/packet.ts"; -import authPlugin from "./auth_plugin/index.ts"; -import { parseAuthSwitch } from "./packets/parsers/authswitch.ts"; -import auth from "./auth.ts"; -import ServerCapabilities from "./constant/capabilities.ts"; -import { buildSSLRequest } from "./packets/builders/tls.ts"; - -/** - * Connection state - */ -export enum ConnectionState { - CONNECTING, - CONNECTED, - CLOSING, - CLOSED, -} - -/** - * Result for execute sql - */ -export type ExecuteResult = { - affectedRows?: number; - lastInsertId?: number; - fields?: FieldInfo[]; - rows?: any[]; - iterator?: any; -}; - -/** Connection for mysql */ -export class Connection { - state: ConnectionState = ConnectionState.CONNECTING; - capabilities: number = 0; - serverVersion: string = ""; - - private conn?: Deno.Conn = undefined; - private _timedOut = false; - - get remoteAddr(): string { - return this.config.socketPath - ? `unix:${this.config.socketPath}` - : `${this.config.hostname}:${this.config.port}`; - } - - constructor(readonly config: ClientConfig) {} - - private async _connect() { - // TODO: implement connect timeout - if ( - this.config.tls?.mode && - this.config.tls.mode !== TLSMode.DISABLED && - this.config.tls.mode !== TLSMode.VERIFY_IDENTITY - ) { - throw new Error("unsupported tls mode"); - } - const { hostname, port = 3306, socketPath, username = "", password } = - this.config; - log.info(`connecting ${this.remoteAddr}`); - this.conn = !socketPath - ? await Deno.connect({ - transport: "tcp", - hostname, - port, - }) - : await Deno.connect({ - transport: "unix", - path: socketPath, - } as any); - - try { - let receive = await this.nextPacket(); - const handshakePacket = parseHandshake(receive.body); - - let handshakeSequenceNumber = receive.header.no; - - // Deno.startTls() only supports VERIFY_IDENTITY now. - let isSSL = false; - if ( - this.config.tls?.mode === TLSMode.VERIFY_IDENTITY - ) { - if ( - (handshakePacket.serverCapabilities & - ServerCapabilities.CLIENT_SSL) === 0 - ) { - throw new Error("Server does not support TLS"); - } - if ( - (handshakePacket.serverCapabilities & - ServerCapabilities.CLIENT_SSL) !== 0 - ) { - const tlsData = buildSSLRequest(handshakePacket, { - db: this.config.db, - }); - await new SendPacket(tlsData, ++handshakeSequenceNumber).send( - this.conn, - ); - this.conn = await Deno.startTls(this.conn, { - hostname, - caCerts: this.config.tls?.caCerts, - }); - } - isSSL = true; - } - - const data = buildAuth(handshakePacket, { - username, - password, - db: this.config.db, - ssl: isSSL, - }); - - await new SendPacket(data, ++handshakeSequenceNumber).send(this.conn); - - this.state = ConnectionState.CONNECTING; - this.serverVersion = handshakePacket.serverVersion; - this.capabilities = handshakePacket.serverCapabilities; - - receive = await this.nextPacket(); - - const authResult = parseAuth(receive); - let handler; - - switch (authResult) { - case AuthResult.AuthMoreRequired: - const adaptedPlugin = - (authPlugin as any)[handshakePacket.authPluginName]; - handler = adaptedPlugin; - break; - case AuthResult.MethodMismatch: - const authSwitch = parseAuthSwitch(receive.body); - // If CLIENT_PLUGIN_AUTH capability is not supported, no new cipher is - // sent and we have to keep using the cipher sent in the init packet. - if ( - authSwitch.authPluginData === undefined || - authSwitch.authPluginData.length === 0 - ) { - authSwitch.authPluginData = handshakePacket.seed; - } - - let authData; - if (password) { - authData = auth( - authSwitch.authPluginName, - password, - authSwitch.authPluginData, - ); - } else { - authData = Uint8Array.from([]); - } - - await new SendPacket(authData, receive.header.no + 1).send(this.conn); - - receive = await this.nextPacket(); - const authSwitch2 = parseAuthSwitch(receive.body); - if (authSwitch2.authPluginName !== "") { - throw new Error( - "Do not allow to change the auth plugin more than once!", - ); - } - } - - let result; - if (handler) { - result = await handler.start(handshakePacket.seed, password!); - while (!result.done) { - if (result.data) { - const sequenceNumber = receive.header.no + 1; - await new SendPacket(result.data, sequenceNumber).send(this.conn); - receive = await this.nextPacket(); - } - if (result.quickRead) { - await this.nextPacket(); - } - if (result.next) { - result = await result.next(receive); - } - } - } - - const header = receive.body.readUint8(); - if (header === 0xff) { - const error = parseError(receive.body, this); - log.error(`connect error(${error.code}): ${error.message}`); - this.close(); - throw new Error(error.message); - } else { - log.info(`connected to ${this.remoteAddr}`); - this.state = ConnectionState.CONNECTED; - } - - if (this.config.charset) { - await this.execute(`SET NAMES ${this.config.charset}`); - } - } catch (error) { - // Call close() to avoid leaking socket. - this.close(); - throw error; - } - } - - /** Connect to database */ - async connect(): Promise { - await this._connect(); - } - - private async nextPacket(): Promise { - if (!this.conn) { - throw new ConnnectionError("Not connected"); - } - - const timeoutTimer = this.config.timeout - ? setTimeout( - this._timeoutCallback, - this.config.timeout, - ) - : null; - let packet: ReceivePacket | null; - try { - packet = await new ReceivePacket().parse(this.conn!); - } catch (error) { - if (this._timedOut) { - // Connection has been closed by timeoutCallback. - throw new ResponseTimeoutError("Connection read timed out"); - } - timeoutTimer && clearTimeout(timeoutTimer); - this.close(); - throw error; - } - timeoutTimer && clearTimeout(timeoutTimer); - - if (!packet) { - // Connection is half-closed by the remote host. - // Call close() to avoid leaking socket. - this.close(); - throw new ReadError("Connection closed unexpectedly"); - } - if (packet.type === PacketType.ERR_Packet) { - packet.body.skip(1); - const error = parseError(packet.body, this); - throw new Error(error.message); - } - return packet!; - } - - private _timeoutCallback = () => { - log.info("connection read timed out"); - this._timedOut = true; - this.close(); - }; - - /** Close database connection */ - close(): void { - if (this.state != ConnectionState.CLOSED) { - log.info("close connection"); - this.conn?.close(); - this.state = ConnectionState.CLOSED; - } - } - - /** - * excute query sql - * @param sql query sql string - * @param params query params - */ - async query(sql: string, params?: any[]): Promise { - const result = await this.execute(sql, params); - if (result && result.rows) { - return result.rows; - } else { - return result; - } - } - - /** - * execute sql - * @param sql sql string - * @param params query params - * @param iterator whether to return an ExecuteIteratorResult or ExecuteResult - */ - async execute( - sql: string, - params?: any[], - iterator = false, - ): Promise { - if (this.state != ConnectionState.CONNECTED) { - if (this.state == ConnectionState.CLOSED) { - throw new ConnnectionError("Connection is closed"); - } else { - throw new ConnnectionError("Must be connected first"); - } - } - const data = buildQuery(sql, params); - try { - await new SendPacket(data, 0).send(this.conn!); - let receive = await this.nextPacket(); - if (receive.type === PacketType.OK_Packet) { - receive.body.skip(1); - return { - affectedRows: receive.body.readEncodedLen(), - lastInsertId: receive.body.readEncodedLen(), - }; - } else if (receive.type !== PacketType.Result) { - throw new ProtocolError(); - } - let fieldCount = receive.body.readEncodedLen(); - const fields: FieldInfo[] = []; - while (fieldCount--) { - const packet = await this.nextPacket(); - if (packet) { - const field = parseField(packet.body); - fields.push(field); - } - } - - const rows = []; - if (!(this.capabilities & ServerCapabilities.CLIENT_DEPRECATE_EOF)) { - // EOF(mysql < 5.7 or mariadb < 10.2) - receive = await this.nextPacket(); - if (receive.type !== PacketType.EOF_Packet) { - throw new ProtocolError(); - } - } - - if (!iterator) { - while (true) { - receive = await this.nextPacket(); - if (receive.type === PacketType.EOF_Packet) { - break; - } else { - const row = parseRow(receive.body, fields); - rows.push(row); - } - } - return { rows, fields }; - } - - return { - fields, - iterator: this.buildIterator(fields), - }; - } catch (error) { - this.close(); - throw error; - } - } - - private buildIterator(fields: FieldInfo[]): any { - const next = async () => { - const receive = await this.nextPacket(); - - if (receive.type === PacketType.EOF_Packet) { - return { done: true }; - } - - const value = parseRow(receive.body, fields); - - return { - done: false, - value, - }; - }; - - return { - [Symbol.asyncIterator]: () => { - return { - next, - }; - }, - }; - } -} diff --git a/src/constant/errors.ts b/src/constant/errors.ts deleted file mode 100644 index bd79fdb..0000000 --- a/src/constant/errors.ts +++ /dev/null @@ -1,29 +0,0 @@ -export class ConnnectionError extends Error { - constructor(msg?: string) { - super(msg); - } -} - -export class WriteError extends ConnnectionError { - constructor(msg?: string) { - super(msg); - } -} - -export class ReadError extends ConnnectionError { - constructor(msg?: string) { - super(msg); - } -} - -export class ResponseTimeoutError extends ConnnectionError { - constructor(msg?: string) { - super(msg); - } -} - -export class ProtocolError extends ConnnectionError { - constructor(msg?: string) { - super(msg); - } -} diff --git a/src/constant/mysql_types.ts b/src/constant/mysql_types.ts deleted file mode 100644 index dd6a62c..0000000 --- a/src/constant/mysql_types.ts +++ /dev/null @@ -1,60 +0,0 @@ -/** @ignore */ -export const MYSQL_TYPE_DECIMAL = 0x00; -/** @ignore */ -export const MYSQL_TYPE_TINY = 0x01; -/** @ignore */ -export const MYSQL_TYPE_SHORT = 0x02; -/** @ignore */ -export const MYSQL_TYPE_LONG = 0x03; -/** @ignore */ -export const MYSQL_TYPE_FLOAT = 0x04; -/** @ignore */ -export const MYSQL_TYPE_DOUBLE = 0x05; -/** @ignore */ -export const MYSQL_TYPE_NULL = 0x06; -/** @ignore */ -export const MYSQL_TYPE_TIMESTAMP = 0x07; -/** @ignore */ -export const MYSQL_TYPE_LONGLONG = 0x08; -/** @ignore */ -export const MYSQL_TYPE_INT24 = 0x09; -/** @ignore */ -export const MYSQL_TYPE_DATE = 0x0a; -/** @ignore */ -export const MYSQL_TYPE_TIME = 0x0b; -/** @ignore */ -export const MYSQL_TYPE_DATETIME = 0x0c; -/** @ignore */ -export const MYSQL_TYPE_YEAR = 0x0d; -/** @ignore */ -export const MYSQL_TYPE_NEWDATE = 0x0e; -/** @ignore */ -export const MYSQL_TYPE_VARCHAR = 0x0f; -/** @ignore */ -export const MYSQL_TYPE_BIT = 0x10; -/** @ignore */ -export const MYSQL_TYPE_TIMESTAMP2 = 0x11; -/** @ignore */ -export const MYSQL_TYPE_DATETIME2 = 0x12; -/** @ignore */ -export const MYSQL_TYPE_TIME2 = 0x13; -/** @ignore */ -export const MYSQL_TYPE_NEWDECIMAL = 0xf6; -/** @ignore */ -export const MYSQL_TYPE_ENUM = 0xf7; -/** @ignore */ -export const MYSQL_TYPE_SET = 0xf8; -/** @ignore */ -export const MYSQL_TYPE_TINY_BLOB = 0xf9; -/** @ignore */ -export const MYSQL_TYPE_MEDIUM_BLOB = 0xfa; -/** @ignore */ -export const MYSQL_TYPE_LONG_BLOB = 0xfb; -/** @ignore */ -export const MYSQL_TYPE_BLOB = 0xfc; -/** @ignore */ -export const MYSQL_TYPE_VAR_STRING = 0xfd; -/** @ignore */ -export const MYSQL_TYPE_STRING = 0xfe; -/** @ignore */ -export const MYSQL_TYPE_GEOMETRY = 0xff; diff --git a/src/deferred.ts b/src/deferred.ts deleted file mode 100644 index 0b3e95b..0000000 --- a/src/deferred.ts +++ /dev/null @@ -1,73 +0,0 @@ -import { Deferred, deferred } from "../deps.ts"; - -/** @ignore */ -export class DeferredStack { - private _queue: Deferred[] = []; - private _size = 0; - - constructor( - readonly _maxSize: number, - private _array: T[] = [], - private readonly creator: () => Promise, - ) { - this._size = _array.length; - } - - get size(): number { - return this._size; - } - - get maxSize(): number { - return this._maxSize; - } - - get available(): number { - return this._array.length; - } - - async pop(): Promise { - if (this._array.length) { - return this._array.pop()!; - } else if (this._size < this._maxSize) { - this._size++; - let item: T; - try { - item = await this.creator(); - } catch (err) { - this._size--; - throw err; - } - return item; - } - const defer = deferred(); - this._queue.push(defer); - return await defer; - } - - /** Returns false if the item is consumed by a deferred pop */ - push(item: T): boolean { - if (this._queue.length) { - this._queue.shift()!.resolve(item); - return false; - } else { - this._array.push(item); - return true; - } - } - - tryPopAvailable() { - return this._array.pop(); - } - - remove(item: T): boolean { - const index = this._array.indexOf(item); - if (index < 0) return false; - this._array.splice(index, 1); - this._size--; - return true; - } - - reduceSize() { - this._size--; - } -} diff --git a/src/logger.ts b/src/logger.ts deleted file mode 100644 index dad062a..0000000 --- a/src/logger.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { log } from "../deps.ts"; - -let logger = log.getLogger(); - -export { logger as log }; - -let isDebug = false; - -/** @ignore */ -export function debug(func: Function) { - if (isDebug) { - func(); - } -} - -export interface LoggerConfig { - /** Enable logging (default: true) */ - enable?: boolean; - /** The minimal level to print (default: "INFO") */ - level?: log.LevelName; - /** A deno_std/log.Logger instance to be used as logger. When used, `level` is ignored. */ - logger?: log.Logger; -} - -export async function configLogger(config: LoggerConfig) { - let { enable = true, level = "INFO" } = config; - if (config.logger) level = config.logger.levelName; - isDebug = level == "DEBUG"; - - if (!enable) { - logger = new log.Logger("fakeLogger", "NOTSET", {}); - logger.level = 0; - } else { - if (!config.logger) { - await log.setup({ - handlers: { - console: new log.handlers.ConsoleHandler(level), - }, - loggers: { - default: { - level: "DEBUG", - handlers: ["console"], - }, - }, - }); - logger = log.getLogger(); - } else { - logger = config.logger; - } - } -} diff --git a/src/packets/builders/query.ts b/src/packets/builders/query.ts deleted file mode 100644 index 8882f06..0000000 --- a/src/packets/builders/query.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { replaceParams } from "../../../deps.ts"; -import { BufferWriter, encode } from "../../buffer.ts"; - -/** @ignore */ -export function buildQuery(sql: string, params: any[] = []): Uint8Array { - const data = encode(replaceParams(sql, params)); - const writer = new BufferWriter(new Uint8Array(data.length + 1)); - writer.write(0x03); - writer.writeBuffer(data); - return writer.buffer; -} diff --git a/src/packets/packet.ts b/src/packets/packet.ts deleted file mode 100644 index d58c41c..0000000 --- a/src/packets/packet.ts +++ /dev/null @@ -1,106 +0,0 @@ -import { byteFormat } from "../../deps.ts"; -import { BufferReader, BufferWriter } from "../buffer.ts"; -import { WriteError } from "../constant/errors.ts"; -import { debug, log } from "../logger.ts"; -import { PacketType } from "../../src/constant/packet.ts"; - -/** @ignore */ -interface PacketHeader { - size: number; - no: number; -} - -/** @ignore */ -export class SendPacket { - header: PacketHeader; - - constructor(readonly body: Uint8Array, no: number) { - this.header = { size: body.length, no }; - } - - async send(conn: Deno.Conn) { - const body = this.body as Uint8Array; - const data = new BufferWriter(new Uint8Array(4 + body.length)); - data.writeUints(3, this.header.size); - data.write(this.header.no); - data.writeBuffer(body); - debug(() => { - log.debug(`send: ${data.length}B \n${byteFormat(data.buffer)}\n`); - }); - try { - let wrote = 0; - do { - wrote += await conn.write(data.buffer.subarray(wrote)); - } while (wrote < data.length); - } catch (error) { - throw new WriteError(error.message); - } - } -} - -/** @ignore */ -export class ReceivePacket { - header!: PacketHeader; - body!: BufferReader; - type!: PacketType; - - async parse(reader: Deno.Reader): Promise { - const header = new BufferReader(new Uint8Array(4)); - let readCount = 0; - let nread = await this.read(reader, header.buffer); - if (nread === null) return null; - readCount = nread; - const bodySize = header.readUints(3); - this.header = { - size: bodySize, - no: header.readUint8(), - }; - this.body = new BufferReader(new Uint8Array(bodySize)); - nread = await this.read(reader, this.body.buffer); - if (nread === null) return null; - readCount += nread; - - const { OK_Packet, ERR_Packet, EOF_Packet, Result } = PacketType; - switch (this.body.buffer[0]) { - case OK_Packet: - this.type = OK_Packet; - break; - case 0xff: - this.type = ERR_Packet; - break; - case 0xfe: - this.type = EOF_Packet; - break; - default: - this.type = Result; - break; - } - - debug(() => { - const data = new Uint8Array(readCount); - data.set(header.buffer); - data.set(this.body.buffer, 4); - log.debug( - `receive: ${readCount}B, size = ${this.header.size}, no = ${this.header.no} \n${ - byteFormat(data) - }\n`, - ); - }); - - return this; - } - - private async read( - reader: Deno.Reader, - buffer: Uint8Array, - ): Promise { - const size = buffer.length; - let haveRead = 0; - while (haveRead < size) { - const nread = await reader.read(buffer.subarray(haveRead)); - if (nread === null) return null; - haveRead += nread; - } - return haveRead; - } -} diff --git a/src/packets/parsers/result.ts b/src/packets/parsers/result.ts deleted file mode 100644 index 83adab6..0000000 --- a/src/packets/parsers/result.ts +++ /dev/null @@ -1,125 +0,0 @@ -import type { BufferReader } from "../../buffer.ts"; -import { - MYSQL_TYPE_DATE, - MYSQL_TYPE_DATETIME, - MYSQL_TYPE_DATETIME2, - MYSQL_TYPE_DECIMAL, - MYSQL_TYPE_DOUBLE, - MYSQL_TYPE_FLOAT, - MYSQL_TYPE_INT24, - MYSQL_TYPE_LONG, - MYSQL_TYPE_LONGLONG, - MYSQL_TYPE_NEWDATE, - MYSQL_TYPE_NEWDECIMAL, - MYSQL_TYPE_SHORT, - MYSQL_TYPE_STRING, - MYSQL_TYPE_TIME, - MYSQL_TYPE_TIME2, - MYSQL_TYPE_TIMESTAMP, - MYSQL_TYPE_TIMESTAMP2, - MYSQL_TYPE_TINY, - MYSQL_TYPE_VAR_STRING, - MYSQL_TYPE_VARCHAR, -} from "../../constant/mysql_types.ts"; - -/** @ignore */ -export interface FieldInfo { - catalog: string; - schema: string; - table: string; - originTable: string; - name: string; - originName: string; - encoding: number; - fieldLen: number; - fieldType: number; - fieldFlag: number; - decimals: number; - defaultVal: string; -} - -/** @ignore */ -export function parseField(reader: BufferReader): FieldInfo { - const catalog = reader.readLenCodeString()!; - const schema = reader.readLenCodeString()!; - const table = reader.readLenCodeString()!; - const originTable = reader.readLenCodeString()!; - const name = reader.readLenCodeString()!; - const originName = reader.readLenCodeString()!; - reader.skip(1); - const encoding = reader.readUint16()!; - const fieldLen = reader.readUint32()!; - const fieldType = reader.readUint8()!; - const fieldFlag = reader.readUint16()!; - const decimals = reader.readUint8()!; - reader.skip(1); - const defaultVal = reader.readLenCodeString()!; - return { - catalog, - schema, - table, - originName, - fieldFlag, - originTable, - fieldLen, - name, - fieldType, - encoding, - decimals, - defaultVal, - }; -} - -/** @ignore */ -export function parseRow(reader: BufferReader, fields: FieldInfo[]): any { - const row: any = {}; - for (const field of fields) { - const name = field.name; - const val = reader.readLenCodeString(); - row[name] = val === null ? null : convertType(field, val); - } - return row; -} - -/** @ignore */ -function convertType(field: FieldInfo, val: string): any { - const { fieldType, fieldLen } = field; - switch (fieldType) { - case MYSQL_TYPE_DECIMAL: - case MYSQL_TYPE_DOUBLE: - case MYSQL_TYPE_FLOAT: - case MYSQL_TYPE_DATETIME2: - return parseFloat(val); - case MYSQL_TYPE_NEWDECIMAL: - return val; // #42 MySQL's decimal type cannot be accurately represented by the Number. - case MYSQL_TYPE_TINY: - case MYSQL_TYPE_SHORT: - case MYSQL_TYPE_LONG: - case MYSQL_TYPE_INT24: - return parseInt(val); - case MYSQL_TYPE_LONGLONG: - if ( - Number(val) < Number.MIN_SAFE_INTEGER || - Number(val) > Number.MAX_SAFE_INTEGER - ) { - return BigInt(val); - } else { - return parseInt(val); - } - case MYSQL_TYPE_VARCHAR: - case MYSQL_TYPE_VAR_STRING: - case MYSQL_TYPE_STRING: - case MYSQL_TYPE_TIME: - case MYSQL_TYPE_TIME2: - return val; - case MYSQL_TYPE_DATE: - case MYSQL_TYPE_TIMESTAMP: - case MYSQL_TYPE_DATETIME: - case MYSQL_TYPE_NEWDATE: - case MYSQL_TYPE_TIMESTAMP2: - case MYSQL_TYPE_DATETIME2: - return new Date(val); - default: - return val; - } -} diff --git a/src/pool.ts b/src/pool.ts deleted file mode 100644 index f1de757..0000000 --- a/src/pool.ts +++ /dev/null @@ -1,128 +0,0 @@ -import { DeferredStack } from "./deferred.ts"; -import { Connection } from "./connection.ts"; -import { log } from "./logger.ts"; - -/** @ignore */ -export class PoolConnection extends Connection { - _pool?: ConnectionPool = undefined; - - private _idleTimer?: number = undefined; - private _idle = false; - - /** - * Should be called by the pool. - */ - enterIdle() { - this._idle = true; - if (this.config.idleTimeout) { - this._idleTimer = setTimeout(() => { - log.info("connection idle timeout"); - this._pool!.remove(this); - try { - this.close(); - } catch (error) { - log.warning(`error closing idle connection`, error); - } - }, this.config.idleTimeout); - try { - // Don't block the event loop from finishing - Deno.unrefTimer(this._idleTimer); - } catch (_error) { - // unrefTimer() is unstable API in older version of Deno - } - } - } - - /** - * Should be called by the pool. - */ - exitIdle() { - this._idle = false; - if (this._idleTimer !== undefined) { - clearTimeout(this._idleTimer); - } - } - - /** - * Remove the connection from the pool permanently, when the connection is not usable. - */ - removeFromPool() { - this._pool!.reduceSize(); - this._pool = undefined; - } - - returnToPool() { - this._pool?.push(this); - } -} - -/** @ignore */ -export class ConnectionPool { - _deferred: DeferredStack; - _connections: PoolConnection[] = []; - _closed: boolean = false; - - constructor(maxSize: number, creator: () => Promise) { - this._deferred = new DeferredStack(maxSize, this._connections, async () => { - const conn = await creator(); - conn._pool = this; - return conn; - }); - } - - get info() { - return { - size: this._deferred.size, - maxSize: this._deferred.maxSize, - available: this._deferred.available, - }; - } - - push(conn: PoolConnection) { - if (this._closed) { - conn.close(); - this.reduceSize(); - } - if (this._deferred.push(conn)) { - conn.enterIdle(); - } - } - - async pop(): Promise { - if (this._closed) { - throw new Error("Connection pool is closed"); - } - let conn = this._deferred.tryPopAvailable(); - if (conn) { - conn.exitIdle(); - } else { - conn = await this._deferred.pop(); - } - return conn; - } - - remove(conn: PoolConnection) { - return this._deferred.remove(conn); - } - - /** - * Close the pool and all connections in the pool. - * - * After closing, pop() will throw an error, - * push() will close the connection immediately. - */ - close() { - this._closed = true; - - let conn: PoolConnection | undefined; - while (conn = this._deferred.tryPopAvailable()) { - conn.exitIdle(); - conn.close(); - this.reduceSize(); - } - } - - reduceSize() { - this._deferred.reduceSize(); - } -} diff --git a/test.deps.ts b/test.deps.ts deleted file mode 100644 index c48f9b8..0000000 --- a/test.deps.ts +++ /dev/null @@ -1,6 +0,0 @@ -export { - assertEquals, - assertThrowsAsync, -} from "https://deno.land/std@0.104.0/testing/asserts.ts"; -export * as semver from "https://deno.land/x/semver@v1.4.0/mod.ts"; -export { parse } from "https://deno.land/std@0.104.0/flags/mod.ts"; diff --git a/test.ts b/test.ts deleted file mode 100644 index e030e8c..0000000 --- a/test.ts +++ /dev/null @@ -1,390 +0,0 @@ -import { assertEquals, assertThrowsAsync, semver } from "./test.deps.ts"; -import { - ConnnectionError, - ResponseTimeoutError, -} from "./src/constant/errors.ts"; -import { - createTestDB, - delay, - isMariaDB, - registerTests, - testWithClient, -} from "./test.util.ts"; -import { log as stdlog } from "./deps.ts"; -import { log } from "./src/logger.ts"; -import { configLogger } from "./mod.ts"; - -testWithClient(async function testCreateDb(client) { - await client.query(`CREATE DATABASE IF NOT EXISTS enok`); -}); - -testWithClient(async function testCreateTable(client) { - await client.query(`DROP TABLE IF EXISTS users`); - await client.query(` - CREATE TABLE users ( - id int(11) NOT NULL AUTO_INCREMENT, - name varchar(100) NOT NULL, - is_top tinyint(1) default 0, - created_at timestamp not null default current_timestamp, - PRIMARY KEY (id) - ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; - `); -}); - -testWithClient(async function testInsert(client) { - let result = await client.execute(`INSERT INTO users(name) values(?)`, [ - "manyuanrong", - ]); - assertEquals(result, { affectedRows: 1, lastInsertId: 1 }); - result = await client.execute(`INSERT INTO users ?? values ?`, [ - ["id", "name"], - [2, "MySQL"], - ]); - assertEquals(result, { affectedRows: 1, lastInsertId: 2 }); -}); - -testWithClient(async function testUpdate(client) { - let result = await client.execute( - `update users set ?? = ?, ?? = ? WHERE id = ?`, - ["name", "MYR🦕", "created_at", new Date(), 1], - ); - assertEquals(result, { affectedRows: 1, lastInsertId: 0 }); -}); - -testWithClient(async function testQuery(client) { - let result = await client.query( - "select ??,`is_top`,`name` from ?? where id = ?", - ["id", "users", 1], - ); - assertEquals(result, [{ id: 1, name: "MYR🦕", is_top: 0 }]); -}); - -testWithClient(async function testQueryErrorOccurred(client) { - assertEquals(client.pool, { - size: 0, - maxSize: client.config.poolSize, - available: 0, - }); - await assertThrowsAsync( - () => client.query("select unknownfield from `users`"), - Error, - ); - await client.query("select 1"); - assertEquals(client.pool, { - size: 1, - maxSize: client.config.poolSize, - available: 1, - }); -}); - -testWithClient(async function testQueryList(client) { - const sql = "select ??,?? from ??"; - let result = await client.query(sql, ["id", "name", "users"]); - assertEquals(result, [ - { id: 1, name: "MYR🦕" }, - { id: 2, name: "MySQL" }, - ]); -}); - -testWithClient(async function testQueryTime(client) { - const sql = `SELECT CAST("09:04:10" AS time) as time`; - let result = await client.query(sql); - assertEquals(result, [{ time: "09:04:10" }]); -}); - -testWithClient(async function testQueryBigint(client) { - await client.query(`DROP TABLE IF EXISTS test_bigint`); - await client.query(`CREATE TABLE test_bigint ( - id int(11) NOT NULL AUTO_INCREMENT, - bigint_column bigint NOT NULL, - PRIMARY KEY (id) - ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); - - const value = "9223372036854775807"; - await client.execute( - "INSERT INTO test_bigint(bigint_column) VALUES (?)", - [value], - ); - - const result = await client.query("SELECT bigint_column FROM test_bigint"); - assertEquals(result, [{ bigint_column: BigInt(value) }]); -}); - -testWithClient(async function testQueryDecimal(client) { - await client.query(`DROP TABLE IF EXISTS test_decimal`); - await client.query(`CREATE TABLE test_decimal ( - id int(11) NOT NULL AUTO_INCREMENT, - decimal_column decimal(65,30) NOT NULL, - PRIMARY KEY (id) - ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); - - const value = "0.012345678901234567890123456789"; - await client.execute( - "INSERT INTO test_decimal(decimal_column) VALUES (?)", - [value], - ); - - const result = await client.query("SELECT decimal_column FROM test_decimal"); - assertEquals(result, [{ decimal_column: value }]); -}); - -testWithClient(async function testQueryDatetime(client) { - await client.useConnection(async (connection) => { - if (isMariaDB(connection) || semver.lt(connection.serverVersion, "5.6.0")) { - return; - } - - await client.query(`DROP TABLE IF EXISTS test_datetime`); - await client.query(`CREATE TABLE test_datetime ( - id int(11) NOT NULL AUTO_INCREMENT, - datetime datetime(6) NOT NULL, - PRIMARY KEY (id) - ) ENGINE=MEMORY DEFAULT CHARSET=utf8mb4`); - const datetime = new Date(); - await client.execute( - ` - INSERT INTO test_datetime (datetime) - VALUES (?)`, - [datetime], - ); - - const [row] = await client.query("SELECT datetime FROM test_datetime"); - assertEquals(row.datetime.toISOString(), datetime.toISOString()); // See https://github.com/denoland/deno/issues/6643 - }); -}); - -testWithClient(async function testDelete(client) { - let result = await client.execute(`delete from users where ?? = ?`, [ - "id", - 1, - ]); - assertEquals(result, { affectedRows: 1, lastInsertId: 0 }); -}); - -testWithClient(async function testPool(client) { - assertEquals(client.pool, { - maxSize: client.config.poolSize, - available: 0, - size: 0, - }); - const expect = new Array(10).fill([{ "1": 1 }]); - const result = await Promise.all(expect.map(() => client.query(`select 1`))); - - assertEquals(client.pool, { - maxSize: client.config.poolSize, - available: 3, - size: 3, - }); - assertEquals(result, expect); -}); - -testWithClient(async function testQueryOnClosed(client) { - for (const i of [0, 0, 0]) { - await assertThrowsAsync(async () => { - await client.transaction(async (conn) => { - conn.close(); - await conn.query("SELECT 1"); - }); - }, ConnnectionError); - } - assertEquals(client.pool?.size, 0); - await client.query("select 1"); -}); - -testWithClient(async function testTransactionSuccess(client) { - const success = await client.transaction(async (connection) => { - await connection.execute("insert into users(name) values(?)", [ - "transaction1", - ]); - await connection.execute("delete from users where id = ?", [2]); - return true; - }); - assertEquals(true, success); - const result = await client.query("select name,id from users"); - assertEquals([{ name: "transaction1", id: 3 }], result); -}); - -testWithClient(async function testTransactionRollback(client) { - let success; - await assertThrowsAsync(async () => { - success = await client.transaction(async (connection) => { - // Insert an existing id - await connection.execute("insert into users(name,id) values(?,?)", [ - "transaction2", - 3, - ]); - return true; - }); - }); - assertEquals(undefined, success); - const result = await client.query("select name from users"); - assertEquals([{ name: "transaction1" }], result); -}); - -testWithClient(async function testIdleTimeout(client) { - assertEquals(client.pool, { - maxSize: 3, - available: 0, - size: 0, - }); - await Promise.all(new Array(10).fill(0).map(() => client.query("select 1"))); - assertEquals(client.pool, { - maxSize: 3, - available: 3, - size: 3, - }); - await delay(500); - assertEquals(client.pool, { - maxSize: 3, - available: 3, - size: 3, - }); - await client.query("select 1"); - await delay(500); - assertEquals(client.pool, { - maxSize: 3, - available: 1, - size: 1, - }); - await delay(500); - assertEquals(client.pool, { - maxSize: 3, - available: 0, - size: 0, - }); -}, { - idleTimeout: 750, -}); - -testWithClient(async function testReadTimeout(client) { - await client.execute("select sleep(0.3)"); - - await assertThrowsAsync(async () => { - await client.execute("select sleep(0.7)"); - }, ResponseTimeoutError); - - assertEquals(client.pool, { - maxSize: 3, - available: 0, - size: 0, - }); -}, { - timeout: 500, -}); - -testWithClient(async function testLargeQueryAndResponse(client) { - function buildLargeString(len: number) { - let str = ""; - for (let i = 0; i < len; i++) { - str += i % 10; - } - return str; - } - const largeString = buildLargeString(512 * 1024); - assertEquals( - await client.query(`select "${largeString}" as str`), - [{ str: largeString }], - ); -}); - -testWithClient(async function testExecuteIterator(client) { - await client.useConnection(async (conn) => { - await conn.execute(`DROP TABLE IF EXISTS numbers`); - await conn.execute(`CREATE TABLE numbers (num INT NOT NULL)`); - await conn.execute( - `INSERT INTO numbers (num) VALUES ${ - new Array(64).fill(0).map((v, idx) => `(${idx})`).join(",") - }`, - ); - const r = await conn.execute(`SELECT num FROM numbers`, [], true); - let count = 0; - for await (const row of r.iterator) { - assertEquals(row.num, count); - count++; - } - assertEquals(count, 64); - }); -}); - -// For MySQL 8, the default auth plugin is `caching_sha2_password`. Create user -// using `mysql_native_password` to test Authentication Method Mismatch. -testWithClient(async function testCreateUserWithMysqlNativePassword(client) { - const { version } = (await client.query(`SELECT VERSION() as version`))[0]; - if (version.startsWith("8.")) { - // MySQL 8 does not have `PASSWORD()` function - await client.execute( - `CREATE USER 'testuser'@'%' IDENTIFIED WITH mysql_native_password BY 'testpassword'`, - ); - } else { - await client.execute( - `CREATE USER 'testuser'@'%' IDENTIFIED WITH mysql_native_password`, - ); - await client.execute( - `SET PASSWORD FOR 'testuser'@'%' = PASSWORD('testpassword')`, - ); - } - await client.execute(`GRANT ALL ON test.* TO 'testuser'@'%'`); -}); - -testWithClient(async function testConnectWithMysqlNativePassword(client) { - assertEquals( - await client.query(`SELECT CURRENT_USER() AS user`), - [{ user: "testuser@%" }], - ); -}, { username: "testuser", password: "testpassword" }); - -testWithClient(async function testDropUserWithMysqlNativePassword(client) { - await client.execute(`DROP USER 'testuser'@'%'`); -}); - -testWithClient(async function testSelectEmptyString(client) { - assertEquals( - await client.query(`SELECT '' AS a`), - [{ a: "" }], - ); - assertEquals( - await client.query(`SELECT '' AS a, '' AS b, '' AS c`), - [{ a: "", b: "", c: "" }], - ); - assertEquals( - await client.query(`SELECT '' AS a, 'b' AS b, '' AS c`), - [{ a: "", b: "b", c: "" }], - ); -}); - -registerTests(); - -Deno.test("configLogger()", async () => { - let logCount = 0; - const fakeHandler = new class extends stdlog.handlers.BaseHandler { - constructor() { - super("INFO"); - } - log(msg: string) { - logCount++; - } - }(); - - await stdlog.setup({ - handlers: { - fake: fakeHandler, - }, - loggers: { - mysql: { - handlers: ["fake"], - }, - }, - }); - await configLogger({ logger: stdlog.getLogger("mysql") }); - log.info("Test log"); - assertEquals(logCount, 1); - - await configLogger({ enable: false }); - log.info("Test log"); - assertEquals(logCount, 1); -}); - -await createTestDB(); - -await new Promise((r) => setTimeout(r, 0)); -// Workaround to https://github.com/denoland/deno/issues/7844 diff --git a/test.util.ts b/test.util.ts deleted file mode 100644 index 985fb7f..0000000 --- a/test.util.ts +++ /dev/null @@ -1,98 +0,0 @@ -import { Client, ClientConfig, Connection } from "./mod.ts"; -import { assertEquals, parse } from "./test.deps.ts"; - -const { DB_PORT, DB_NAME, DB_PASSWORD, DB_USER, DB_HOST, DB_SOCKPATH } = Deno - .env.toObject(); -const port = DB_PORT ? parseInt(DB_PORT) : 3306; -const db = DB_NAME || "test"; -const password = DB_PASSWORD || "root"; -const username = DB_USER || "root"; -const hostname = DB_HOST || "127.0.0.1"; -const sockPath = DB_SOCKPATH || "/var/run/mysqld/mysqld.sock"; -const testMethods = - Deno.env.get("TEST_METHODS")?.split(",") as ("tcp" | "unix")[] || ["tcp"]; -const unixSocketOnly = testMethods.length === 1 && testMethods[0] === "unix"; - -const config: ClientConfig = { - timeout: 10000, - poolSize: 3, - debug: true, - hostname, - username, - port, - db, - charset: "utf8mb4", - password, -}; - -const tests: (Parameters)[] = []; - -export function testWithClient( - fn: (client: Client) => void | Promise, - overrideConfig?: ClientConfig, -): void { - tests.push([fn, overrideConfig]); -} - -export function registerTests(methods: ("tcp" | "unix")[] = testMethods) { - if (methods!.includes("tcp")) { - tests.forEach(([fn, overrideConfig]) => { - Deno.test({ - name: fn.name + " (TCP)", - async fn() { - await test({ ...config, ...overrideConfig }, fn); - }, - }); - }); - } - if (methods!.includes("unix")) { - tests.forEach(([fn, overrideConfig]) => { - Deno.test({ - name: fn.name + " (UNIX domain socket)", - async fn() { - await test( - { ...config, socketPath: sockPath, ...overrideConfig }, - fn, - ); - }, - }); - }); - } -} - -async function test( - config: ClientConfig, - fn: (client: Client) => void | Promise, -) { - const resources = Deno.resources(); - const client = await new Client().connect(config); - try { - await fn(client); - } finally { - await client.close(); - } - assertEquals( - Deno.resources(), - resources, - "The client is leaking resources", - ); -} - -export async function createTestDB() { - const client = await new Client().connect({ - ...config, - poolSize: 1, - db: undefined, - socketPath: unixSocketOnly ? sockPath : undefined, - }); - await client.execute(`CREATE DATABASE IF NOT EXISTS ${db}`); - await client.close(); -} - -export function isMariaDB(connection: Connection): boolean { - return connection.serverVersion.includes("MariaDB"); -} - -export function delay(ms: number) { - return new Promise((resolve) => setTimeout(resolve, ms)); -}