diff --git a/README.md b/README.md index 903a1903..eb0c156f 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ - [x] @satorijs/core - [x] @satorijs/element - [x] @satorijs/satori - - [x] @satorijs/plugin-database + - [x] @satorijs/protocol - [x] @satorijs/plugin-server - Ecosystem - [x] DingTalk (钉钉) diff --git a/packages/database/package.json b/packages/database/package.json deleted file mode 100644 index 6ef9e58d..00000000 --- a/packages/database/package.json +++ /dev/null @@ -1,61 +0,0 @@ -{ - "name": "@satorijs/plugin-database", - "description": "Database for Satori protocol", - "version": "0.1.1", - "type": "module", - "main": "lib/index.cjs", - "module": "lib/index.mjs", - "types": "lib/index.d.ts", - "exports": { - ".": { - "require": "./lib/index.cjs", - "import": "./lib/index.mjs", - "types": "./lib/index.d.ts" - }, - "./package.json": "./package.json" - }, - "files": [ - "lib", - "src" - ], - "author": "Shigma ", - "license": "MIT", - "repository": { - "type": "git", - "url": "git+https://github.com/satorijs/satori.git", - "directory": "packages/database" - }, - "bugs": { - "url": "https://github.com/satorijs/satori/issues" - }, - "homepage": "https://github.com/satorijs/satori/tree/master/packages/database", - "keywords": [ - "satori", - "protocol", - "server", - "basic", - "api", - "database" - ], - "cordis": { - "service": { - "required": [ - "database" - ], - "implements": [ - "satori.database" - ] - } - }, - "devDependencies": { - "@satorijs/plugin-server": "^2.5.0", - "minato": "^3.3.0" - }, - "peerDependencies": { - "@satorijs/core": "^4.0.0", - "minato": "^3.3.0" - }, - "dependencies": { - "cosmokit": "^1.6.2" - } -} diff --git a/packages/database/src/channel.ts b/packages/database/src/channel.ts deleted file mode 100644 index 579599d3..00000000 --- a/packages/database/src/channel.ts +++ /dev/null @@ -1,230 +0,0 @@ -import { Bot, Context, Logger, Session, Universal } from '@satorijs/core' -import { Message } from './types' -import { Span } from './span' - -const logger = new Logger('sync') - -export enum SyncStatus { - INIT, - READY, - FAILED, -} - -type MessageLike = Message | { sid: bigint } - -type LocateResult = [Span, MessageLike] - -interface CollectResult { - temp?: Universal.TwoWayList - span?: Span -} - -export class SyncChannel { - public _spans: Span[] = [] - public _query: { platform: string; 'channel.id': string } - - public hasLatest = false - public hasEarliest = false - - private _initTask?: Promise - - constructor(public ctx: Context, public bot: Bot, public guildId: string, public data: Universal.Channel) { - this._query = { platform: bot.platform, 'channel.id': data.id } - bot.ctx.emit('satori/database/update') - } - - private async init() { - logger.debug('init channel %s %s %s', this.bot.platform, this.guildId, this.data.id) - const data = await this.ctx.database - .select('satori.message') - .where({ - ...this._query, - flag: { $bitsAnySet: Message.Flag.FRONT | Message.Flag.BACK }, - }) - .orderBy('sid', 'asc') - .project(['id', 'sid', 'flag']) - .execute() - while (data.length) { - const { flag, id: frontId, sid: frontUid } = data.pop()! - const front: Span.Endpoint = [frontUid, frontId] - if (!(flag & Message.Flag.FRONT)) { - throw new Error('malformed sync flag') - } else if (flag & Message.Flag.BACK) { - this._spans.push(new Span(this, Span.Type.REMOTE, front, front)) - } else { - const { flag, id, sid } = data.pop()! - if (flag & Message.Flag.BACK) { - this._spans.push(new Span(this, Span.Type.REMOTE, front, [sid, id])) - } else { - throw new Error('malformed sync flag') - } - } - } - } - - private binarySearch(sid: bigint) { - let left = 0 - let right = this._spans.length - while (left < right) { - const mid = Math.floor((left + right) / 2) - if (this._spans[mid].back[0] <= sid) { - right = mid - } else { - left = mid + 1 - } - } - return left - } - - insert(data: Message[], options: Pick = {}, forced: Span.PrevNext = {}) { - if (!data.length && !options.prev && !options.next) { - throw new Error('unexpected empty span') - } - const back: Span.Endpoint = [data.at(0)!.sid, data.at(0)!.id] - const front: Span.Endpoint = [data.at(-1)!.sid, data.at(-1)!.id] - const span = new Span(this, Span.Type.LOCAL, front, back, data) - const index = this.binarySearch(back[0]) - this._spans.splice(index, 0, span) - span.prevTemp = options.prevTemp - span.link('before', options.prev) - span.merge('before') - span.nextTemp = options.nextTemp - span.link('after', options.next) - span.merge('after') - span.flush(forced) - return span - } - - async queue(session: Session) { - const prev = this.hasLatest ? this._spans[0] : undefined - const message = Message.from(session.event.message!, session.platform, 'after', prev?.front[0]) - this.hasLatest = true - this.insert([message], { prev }, { prev: true, next: true }) - } - - getMessageList(id: string, dir?: Universal.Direction, limit?: number) { - return this.bot.getMessageList(this.data.id, id, dir, limit, 'asc') - } - - // TODO handle default limit - async list(id: string, dir: Universal.Direction, limit: number) { - await (this._initTask ||= this.init()) - const result = await this.locate(id, dir, limit) - if (!result) return [] - const [span, message] = result - if (dir === 'around') limit = Math.floor(limit / 2) + 1 - const beforeTask = dir === 'after' ? Promise.resolve([]) : this.extend(span, message, limit, 'before') - const afterTask = dir === 'before' ? Promise.resolve([]) : this.extend(span, message, limit, 'after') - const [before, after] = await Promise.all([beforeTask, afterTask]) - if (dir === 'after') return after - if (dir === 'before') return before - return [...before.slice(0, -1), message, ...after.slice(1)] - } - - collect(result: Universal.TwoWayList, dir: Span.Direction, data: Message[], index?: number): CollectResult { - const w = Span.words[dir] - index ??= dir === 'after' ? -1 : result.data.length - for (let i = index + w.inc; i >= 0 && i < result.data.length; i += w.inc) { - const span = this._spans.find(span => span[w.back][1] === result.data[i].id) - if (span) { - const data = w.slice(result.data, i + w.inc) - if (data.length) { - span[w.temp] = { [w.next]: result[w.next], data } - } - return { span } - } - data[w.push](Message.from(result.data[i], this.bot.platform, dir, data.at(w.last)?.sid)) - } - return { temp: { data: [], [w.next]: result[w.next] } } - } - - private async locate(id: string, dir: Universal.Direction, limit?: number): Promise { - // condition 1: message in memory - for (const span of this._spans) { - const message = span.data?.find(message => message.id === id) - if (message) return [span, message] - } - - // condition 2: message in database - const data = await this.ctx.database - .select('satori.message') - .where({ ...this._query, id }) - .execute() - if (data[0]) { - const { sid } = data[0] - const span = this._spans[this.binarySearch(sid)] - if (!span || span.back[0] > sid || span.front[0] < sid) throw new Error('malformed sync span') - return [span, data[0]] - } - - // condition 3: message not cached, request from adapter - let span: Span - let message: MessageLike - let index: number | undefined - const result = await this.getMessageList(id, dir, limit) - if (dir === 'around') { - index = result.data.findIndex(item => item.id === id) - if (index === -1) throw new Error('malformed message list') - message = Message.from(result.data[index], this.bot.platform) - data.push(message as Message) - } - - const { span: prev, temp: prevTemp } = this.collect(result, 'before', data, index) - const { span: next, temp: nextTemp } = this.collect(result, 'after', data, index) - - if (data.length || prev && next) { - span = this.insert(data, { prev, next }) - } else if (prev || next) { - span = prev || next! - } else { - if (dir === 'before') this.hasEarliest = true - return - } - - span.prevTemp = prevTemp - span.nextTemp = nextTemp - if (dir === 'before') { - message = { sid: span.front[0] } - } else if (dir === 'after') { - message = { sid: span.back[0] } - } - return [span, message!] - } - - private async extend(span: Span, message: MessageLike, limit: number, dir: Span.Direction) { - const buffer: Message[] = [] - const w = Span.words[dir] - - while (true) { - const data = await span.collect(message, dir, limit - buffer.length) - buffer[w.push](...data) - if (buffer.length >= limit) { - delete span[w.temp] - break - } - - let result = span[w.temp] - if (result) { - let i = dir === 'before' ? result.data.length - 1 : 0 - for (; i >= 0 && i < result.data.length; i += w.inc) { - if (!data.some(item => item.id === result!.data[i].id)) break - } - result.data = w.slice(result.data, i) - if (!result.data.length) result = undefined - delete span[w.temp] - } - - const next = span[w.next] ?? await (span[w.task] ??= span.extend(dir, limit - buffer.length, result)) - if (!next) break - - span = next - message = { sid: span[w.back][0] } - } - - if (dir === 'before') { - return buffer.slice(-limit) - } else { - return buffer.slice(0, limit) - } - } -} diff --git a/packages/database/src/guild.ts b/packages/database/src/guild.ts deleted file mode 100644 index 7293f749..00000000 --- a/packages/database/src/guild.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { Bot, Universal } from '@satorijs/core' - -export class SyncGuild { - public members?: Universal.List - - constructor(public bot: Bot, public data: Universal.Guild) { - bot.ctx.emit('satori/database/update') - } - - async getMembers() { - if (this.members) return this.members - return this.members = await this.bot.getGuildMemberList(this.data.id) - } -} - -export namespace SyncGuild { - export interface Data {} -} diff --git a/packages/database/src/index.ts b/packages/database/src/index.ts deleted file mode 100644 index d8c81d3d..00000000 --- a/packages/database/src/index.ts +++ /dev/null @@ -1,171 +0,0 @@ -import {} from 'minato' -import { Bot, Context, Dict, Schema, Service, Universal } from '@satorijs/core' -import { SyncChannel } from './channel' -import { SyncGuild } from './guild' - -export * from './types' - -declare module '@satorijs/core' { - interface Satori { - database: SatoriDatabase - } -} - -declare module 'cordis' { - interface Events { - 'satori/database/update'(): void - } -} - -class SatoriDatabase extends Service { - static inject = ['model', 'database'] - - _guilds: Dict = {} - _channels: Dict = {} - - stopped = false - - private _botTasks = new WeakMap>() - - constructor(ctx: Context, public config: SatoriDatabase.Config) { - super(ctx, 'satori.database', true) - - // TODO bot mixin - // ctx.mixin('satori.database', { - // 'createMessage': 'bot.createMessage', - // 'getMessage': 'bot.getMessage', - // 'getMessageList': 'bot.getMessageList', - // }) - - ctx.model.extend('satori.message', { - 'uid': 'unsigned(8)', - 'sid': 'bigint', // int64 - 'id': 'char(255)', - 'platform': 'char(255)', - 'user.id': 'char(255)', - 'channel.id': 'char(255)', - 'guild.id': 'char(255)', - 'quote.id': 'char(255)', - 'content': 'text', - 'createdAt': 'unsigned(8)', - 'updatedAt': 'unsigned(8)', - 'flag': 'unsigned(1)', - 'deleted': 'boolean', - 'edited': 'boolean', - }, { - primary: 'uid', - autoInc: true, - }) - - ctx.model.extend('satori.user', { - 'id': 'char(255)', - 'platform': 'char(255)', - 'name': 'char(255)', - 'nick': 'char(255)', - 'avatar': 'char(255)', - }, { - primary: ['id', 'platform'], - }) - - ctx.model.extend('satori.guild', { - 'id': 'char(255)', - 'platform': 'char(255)', - 'name': 'char(255)', - }, { - primary: ['id', 'platform'], - }) - - ctx.model.extend('satori.channel', { - 'id': 'char(255)', - 'platform': 'char(255)', - 'name': 'char(255)', - }, { - primary: ['id', 'platform'], - }) - } - - async start() { - this.ctx.on('message', (session) => { - const { platform, guildId, channelId } = session - if (session.bot.hidden) return - const key = platform + '/' + guildId + '/' + channelId - this._channels[key] ||= new SyncChannel(this.ctx, session.bot, session.guildId, session.event.channel!) - if (this._channels[key].bot === session.bot) { - this._channels[key].queue(session) - } - }) - - this.ctx.on('message-deleted', async (session) => { - // TODO update local message - await this.ctx.database.set('satori.message', { - id: session.messageId, - platform: session.platform, - }, { - deleted: true, - updatedAt: +new Date(), - }) - }) - - this.ctx.on('message-updated', async (session) => { - // TODO update local message - await this.ctx.database.set('satori.message', { - id: session.messageId, - platform: session.platform, - }, { - content: session.content, - updatedAt: +new Date(), - }) - }) - - this.ctx.on('bot-status-updated', async (bot) => { - this.updateBot(bot) - }) - - this.ctx.bots.forEach(async (bot) => { - this.updateBot(bot) - }) - } - - async stop() { - this.stopped = true - } - - private async updateBot(bot: Bot) { - if (bot.hidden) return - if (!await bot.supports('message.list') || !await bot.supports('guild.list')) return - if (bot.status !== Universal.Status.ONLINE) { - this._botTasks.delete(bot) - for (const channel of Object.values(this._channels)) { - if (channel.bot !== bot) continue - channel.hasLatest = false - } - return - } - this._botTasks.has(bot) || this._botTasks.set(bot, (async () => { - for await (const guild of bot.getGuildIter()) { - const key = bot.platform + '/' + guild.id - this._guilds[key] ||= new SyncGuild(bot, guild) - } - })()) - // const tasks: Promise[] = [] - // for await (const guild of bot.getGuildIter()) { - // const key = bot.platform + '/' + guild.id - // this._guilds[key] ||= new SyncGuild(bot, guild) - // tasks.push((async () => { - // for await (const channel of bot.getChannelIter(guild.id)) { - // const key = bot.platform + '/' + guild.id + '/' + channel.id - // this._channels[key] ||= new SyncChannel(this.ctx, bot, guild.id, channel) - // } - // })()) - // } - // await Promise.all(tasks) - } -} - -namespace SatoriDatabase { - export interface Config {} - - export const Config: Schema = Schema.object({}) -} - -export default SatoriDatabase diff --git a/packages/database/src/span.ts b/packages/database/src/span.ts deleted file mode 100644 index 7d89dd68..00000000 --- a/packages/database/src/span.ts +++ /dev/null @@ -1,169 +0,0 @@ -import { clone, remove, Universal } from '@satorijs/core' -import { $, Update } from 'minato' -import { Message } from './types' -import { SyncChannel } from './channel' - -export class Span { - prev?: Span - prevTask?: Promise - prevTemp?: Universal.TwoWayList - next?: Span - nextTask?: Promise - nextTemp?: Universal.TwoWayList - syncTask?: Promise - - constructor( - public channel: SyncChannel, - public type: Span.Type, - public front: Span.Endpoint, - public back: Span.Endpoint, - public data?: Message[], - ) {} - - link(dir: Span.Direction, span?: Span) { - const w = Span.words[dir] - this[w.next] = span - if (span) span[w.prev] = this - } - - merge(dir: Span.Direction) { - const w = Span.words[dir] - const next = this[w.next] - if (next?.type !== this.type) return false - remove(this.channel._spans, next) - this.data?.[w.push](...next.data!) - this[w.front] = next[w.front] - this[w.temp] = next[w.temp] - this[w.task] = next[w.task] - this.link(dir, next[w.next]) - return true - } - - async flush(forced: Span.PrevNext = {}) { - if (this.type !== Span.Type.LOCAL) throw new Error('expect local span') - if (!forced.prev && !this.prev && !(this === this.channel._spans.at(0) && this.channel.hasEarliest)) return - if (!forced.next && !this.next && !(this === this.channel._spans.at(-1) && this.channel.hasLatest)) return - await Promise.all([this.prev?.syncTask, this.next?.syncTask]) - if (!this.channel._spans.includes(this)) return - return this.syncTask ||= this.sync() - } - - private async sync() { - this.type = Span.Type.SYNC - await this.channel.ctx.database.upsert('satori.message', (row) => { - const data: Update[] = clone(this.data!) - if (this.next?.type === Span.Type.REMOTE) { - data.push({ - ...this.channel._query, - sid: this.next.back[0], - flag: $.and(row.flag, $.not(Message.Flag.BACK)), - }) - } else { - data.at(-1)!.flag |= Message.Flag.FRONT - } - if (this.prev?.type === Span.Type.REMOTE) { - data.unshift({ - ...this.channel._query, - sid: this.prev.front[0], - flag: $.and(row.flag, $.not(Message.Flag.FRONT)), - }) - } else { - data.at(0)!.flag |= Message.Flag.BACK - } - return data - }, ['sid', 'channel.id', 'platform']) - this.type = Span.Type.REMOTE - delete this.data - this.merge('after') - this.merge('before') - } - - async collect(message: Message | { sid: bigint }, dir: Span.Direction, limit: number) { - const w = Span.words[dir] - if (this.data) { - const index = this.data.findIndex(item => item.sid === message.sid) - return w.slice(this.data, index) - } else if ('id' in message && this[w.front][0] === message.sid) { - return [message] - } else { - const data = await this.channel.ctx.database - .select('satori.message') - .where({ - ...this.channel._query, - sid: { - [w.$gte]: message.sid, - [w.$lte]: this[w.front][0], - }, - }) - .orderBy('sid', w.order) - .limit(limit) - .execute() - if (dir === 'before') data.reverse() - return data - } - } - - async extend(dir: Span.Direction, limit: number, result?: Universal.TwoWayList) { - const w = Span.words[dir] - result ??= await this.channel.getMessageList(this[w.front][1], dir, limit) - const data: Message[] = [] - const { span, temp } = this.channel.collect(result, dir, data) - if (!span && dir === 'before' && !result[w.next]) this.channel.hasEarliest = true - if (data.length || span) { - return this.channel.insert(data, { - [w.prev]: this, - [w.next]: span, - [w.temp]: temp, - }) - } - } -} - -export namespace Span { - export type Direction = 'before' | 'after' - export type Endpoint = [bigint, string] - - export enum Type { - LOCAL, - SYNC, - REMOTE, - } - - export interface PrevNext { - prev?: T - next?: T - } - - export const words = { - before: { - prev: 'next', - next: 'prev', - push: 'unshift', - front: 'back', - back: 'front', - task: 'prevTask', - temp: 'prevTemp', - order: 'desc', - $lte: '$gte', - $gte: '$lte', - inc: -1, - last: 0, - slice: (arr: T[], index: number) => arr.slice(0, index + 1), - }, - after: { - prev: 'prev', - next: 'next', - push: 'push', - front: 'front', - back: 'back', - task: 'nextTask', - temp: 'nextTemp', - order: 'asc', - $lte: '$lte', - $gte: '$gte', - inc: 1, - last: -1, - slice: (arr: T[], index: number) => arr.slice(index), - }, - } as const -} diff --git a/packages/database/src/types.ts b/packages/database/src/types.ts deleted file mode 100644 index 5c019865..00000000 --- a/packages/database/src/types.ts +++ /dev/null @@ -1,58 +0,0 @@ -import { Universal } from '@satorijs/core' -import { Span } from './span' - -declare module 'minato' { - interface Tables { - 'satori.message': Message - 'satori.user': Universal.User & { platform: string } - 'satori.guild': Universal.Guild & { platform: string } - 'satori.channel': Universal.Channel & { platform: string } - } -} - -declare module '@satorijs/protocol' { - interface Message { - sid?: bigint - } -} - -export interface Message extends Universal.Message { - id: string - uid: number - sid: bigint - platform: string - flag: number - deleted: boolean - edited: boolean -} - -export namespace Message { - export enum Flag { - FRONT = 1, - BACK = 2, - FINAL = 4, - } - - function sequence(ts: bigint, dir?: Span.Direction, ref?: bigint) { - if (!dir || !ref) return (ts << 12n) + 0x800n - if (ts === ref >> 12n) { - return ref + (dir === 'before' ? -1n : 1n) - } else { - return (ts << 12n) + (dir === 'before' ? 0xfffn : 0n) - } - } - - export const from = (message: Universal.Message, platform: string, dir?: Span.Direction, ref?: bigint) => ({ - platform, - id: message.id, - sid: sequence(BigInt(message.createdAt!), dir, ref), - content: message.content, - channel: { id: message.channel?.id }, - user: { id: message.user?.id }, - guild: { id: message.guild?.id }, - quote: { id: message.quote?.id }, - createdAt: message.createdAt, - updatedAt: message.updatedAt, - flag: 0, - } as Message) -} diff --git a/packages/database/tsconfig.json b/packages/database/tsconfig.json deleted file mode 100644 index 6f11f324..00000000 --- a/packages/database/tsconfig.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "extends": "../../tsconfig.base", - "compilerOptions": { - "rootDir": "src", - "outDir": "lib", - "strict": true, - "noImplicitAny": false, - }, - "include": [ - "src", - ], -}