diff --git a/packages/database/src/channel.ts b/packages/database/src/channel.ts index 885f8f1e..da58a00e 100644 --- a/packages/database/src/channel.ts +++ b/packages/database/src/channel.ts @@ -1,6 +1,7 @@ -import { Bot, Context, Logger, remove, Session, Universal } from '@satorijs/satori' +import { Bot, Context, Logger, Session, Universal } from '@satorijs/satori' import { Flatten, Query } from 'minato' import { Message, SyncFlag } from '.' +import { Span } from './span' const logger = new Logger('sync') @@ -10,47 +11,16 @@ export enum SyncStatus { FAILED, } -enum SpanType { - LOCAL, - SYNC, - REMOTE, -} - -interface Span { - type: SpanType - front: Endpoint - back: Endpoint - prev?: Span - prevTask?: Promise - next?: Span - nextTask?: Promise - data?: Message[] - dataTask?: Promise -} - -namespace Span { - export function from(data: Message[], reverse: boolean) { - if (reverse) data.reverse() - const span = { data, type: SpanType.LOCAL } as Span - span.back = [data[0].uid, data[0].id] - span.front = [data[data.length - 1].uid, data[data.length - 1].id] - return span - } -} - -type Endpoint = [bigint, string] - export class SyncChannel { - private _spans: Span[] = [] + public spans: Span[] = [] + private _status = SyncStatus.INIT private _initTask?: Promise - private _queueTask = Promise.resolve() - private _hasLatest = false private _baseQuery: Query.Expr> - constructor(private ctx: Context, public bot: Bot, public guildId: string, public channelId: string) { + constructor(public ctx: Context, public bot: Bot, public guildId: string, public channelId: string) { this._baseQuery = { platform: bot.platform, 'channel.id': channelId } this._initTask ||= this._init().then(() => { this._status = SyncStatus.READY @@ -73,13 +43,13 @@ export class SyncChannel { .execute() while (data.length) { const { syncFlag, id: frontId, uid: frontUid } = data.pop()! - const front: Endpoint = [frontUid, frontId] + const front: Span.Endpoint = [frontUid, frontId] if (syncFlag === SyncFlag.BOTH) { - this._spans.push({ front, back: front, type: SpanType.REMOTE }) + this.spans.push(new Span(this, Span.Type.REMOTE, front, front)) } else if (syncFlag === SyncFlag.FRONT) { const { syncFlag, id, uid } = data.pop()! if (syncFlag === SyncFlag.BACK) { - this._spans.push({ front, back: [uid, id], type: SpanType.REMOTE }) + this.spans.push(new Span(this, Span.Type.REMOTE, front, [uid, id])) } else { throw new Error('malformed sync flag') } @@ -89,83 +59,49 @@ export class SyncChannel { } } + private createSpan(data: Message[], index?: number) { + const back: Span.Endpoint = [data[0].uid, data[0].id] + const front: Span.Endpoint = [data[data.length - 1].uid, data[data.length - 1].id] + const span = new Span(this, Span.Type.LOCAL, front, back, data) + if (typeof index !== 'number') { + index = 0 + let right = this.spans.length + while (index < right) { + const mid = Math.floor((index + right) / 2) + if (this.spans[mid].front[0] < front[0]) { + right = mid + } else { + index = mid + } + } + } + this.spans.splice(index, 0, span) + return span + } + async queue(session: Session) { const message = Message.from(session.event.message!, session.platform) - if (this._hasLatest && this._spans[0]?.type === SpanType.LOCAL) { - this._spans[0].data!.push(message) - } else { - this._spans.unshift({ - type: SpanType.LOCAL, - front: [message.uid, message.id], - back: [message.uid, message.id], - data: [message], - prev: this._hasLatest ? this._spans[0] : undefined, - }) + const span = this.createSpan([message], 0) + if (this._hasLatest) { + span.prev = this.spans[1] + this.spans[1].next = span } this._hasLatest = true await this._initTask if (this._status === SyncStatus.FAILED) return try { - await (this._queueTask = this._queueTask.then(async () => { - await this._flushSpan(this._spans[0], true) - })) + await this.spans[0].flush() } catch (error) { logger.warn(error) this._status = SyncStatus.FAILED } } - private async flushSpan(span: Span) { - if (!span.data) return - return span.dataTask ||= this._flushSpan(span) - } - - private async _flushSpan(span: Span, isLastest = false) { - if (!span.data) return - do { - const data: Partial[] = isLastest ? span.data.splice(0) : span.data.slice() - if (span.next) { - data.unshift({ - uid: span.next.front[0], - syncFlag: span.next.front[0] === span.next.back[0] ? SyncFlag.FRONT : SyncFlag.NONE, - }) - } else { - span.data[span.data.length - 1].syncFlag = SyncFlag.FRONT - } - if (span.prev) { - data.unshift({ - uid: span.prev.back[0], - syncFlag: span.prev.front[0] === span.prev.back[0] ? SyncFlag.BACK : SyncFlag.NONE, - }) - } else { - span.data[0].syncFlag = span.data[0].syncFlag ? SyncFlag.BOTH : SyncFlag.BACK - } - await this.ctx.database.upsert('satori.message', data) - // eslint-disable-next-line no-unmodified-loop-condition - } while (isLastest && span.data.length) - if (span.prev && span.next) { - remove(this._spans, span) - remove(this._spans, span.next) - span.prev.front = span.next.front - } else if (span.prev) { - remove(this._spans, span) - span.prev.front = span.front - } else if (span.next) { - remove(this._spans, span) - span.next.back = span.back - } else { - span.type = SpanType.REMOTE - delete span.data - delete span.prev - delete span.next - } - } - async getMessageList(id: string, direction: Universal.Direction, limit: number) { let span: Span | undefined, message: Message | undefined // condition 1: message in local - for (span of this._spans) { + for (span of this.spans) { message = span.data?.find(message => message.id === id) if (message) break } @@ -178,7 +114,7 @@ export class SyncChannel { .execute() if (data[0]) { message = data[0] - span = this._spans.find(span => span.front[0] <= data[0].uid && data[0].uid <= span.back[0]) + span = this.spans.find(span => span.front[0] <= data[0].uid && data[0].uid <= span.back[0]) if (!span) throw new Error('malformed sync span') } } @@ -192,19 +128,19 @@ export class SyncChannel { const data = [message] let prev: Span | undefined, next: Span | undefined for (let i = index - 1; i >= 0; i--) { - prev = this._spans.find(span => span.front[1] === result.data[i].id) + prev = this.spans.find(span => span.front[1] === result.data[i].id) if (prev) break data.unshift(Message.from(result.data[i], this.bot.platform)) } for (let i = index + 1; i < result.data.length; i++) { - next = this._spans.find(span => span.back[1] === result.data[i].id) + next = this.spans.find(span => span.back[1] === result.data[i].id) if (next) break data.push(Message.from(result.data[i], this.bot.platform)) } - span = Span.from(data, false) + span = this.createSpan(data) span.prev = prev span.next = next - this._spans.push(span) + this.spans.push(span) } if (direction === 'around') { @@ -277,13 +213,13 @@ export class SyncChannel { const result = await this.bot.getMessageList(this.channelId, token, direction) if (direction === 'before') result.data.reverse() for (const item of result.data) { - const prev = this._spans.find(span => span[dir.front][1] === item.id) + const prev = this.spans.find(span => span[dir.front][1] === item.id) if (prev) { - const _span = Span.from(data, direction === 'before') + const _span = this.createSpan(data) // FIXME reverse _span[dir.next] = next _span[dir.prev] = prev - this._spans.push(_span) - this.flushSpan(_span) + this.spans.push(_span) + _span.flush() next = prev message = { uid: prev[dir.front][0] } continue outer @@ -296,10 +232,10 @@ export class SyncChannel { } token = result.next! } - const _span = Span.from(data, direction === 'before') + const _span = this.createSpan(data) // FIXME reverse _span[dir.next] = next - this._spans.push(_span) - this.flushSpan(_span) + this.spans.push(_span) + _span.flush() return local.slice(0, limit) } } diff --git a/packages/database/src/index.ts b/packages/database/src/index.ts index 641a6f08..e9ad7dec 100644 --- a/packages/database/src/index.ts +++ b/packages/database/src/index.ts @@ -55,6 +55,7 @@ export namespace Message { quote: { id: message.quote?.id }, createdAt: message.createdAt, updatedAt: message.updatedAt, + syncFlag: SyncFlag.NONE, } as Message) } diff --git a/packages/database/src/span.ts b/packages/database/src/span.ts new file mode 100644 index 00000000..9ba2b1f9 --- /dev/null +++ b/packages/database/src/span.ts @@ -0,0 +1,90 @@ +import { clone, remove } from '@satorijs/satori' +import { Message, SyncFlag } from '.' +import { SyncChannel } from './channel' +import { $, Update } from 'minato' + +export class Span { + prev?: Span + prevTask?: Promise + next?: Span + nextTask?: Promise + syncTask?: Promise + + constructor( + public channel: SyncChannel, + public type: Span.Type, + public front: Span.Endpoint, + public back: Span.Endpoint, + public data?: Message[], + ) {} + + mergeNext() { + if (!this.next) return + if (this.next.type !== this.type) throw new Error('malformed span type') + remove(this.channel.spans, this.next) + this.data?.push(...this.next.data!) + this.front = this.next.front + this.next = this.next.next + if (this.next) this.next.prev = this + } + + mergePrev() { + if (!this.prev) return + if (this.prev.type !== this.type) throw new Error('malformed span type') + remove(this.channel.spans, this.prev) + this.data?.unshift(...this.prev.data!) + this.back = this.prev.back + this.prev = this.prev.prev + if (this.prev) this.prev.next = this + } + + flush() { + if (this.type !== Span.Type.LOCAL) throw new Error('expect local span') + while (this.next?.type === Span.Type.LOCAL) { + this.mergeNext() + } + while (this.prev?.type === Span.Type.LOCAL) { + this.mergePrev() + } + this.type = Span.Type.SYNC + return this.syncTask ||= this.sync() + } + + async sync() { + await Promise.all([this.prev?.syncTask, this.next?.syncTask]) + await this.channel.ctx.database.upsert('satori.message', (row) => { + const data: Update[] = clone(this.data!) + if (this.next) { + data.push({ + uid: this.next.back[0], + syncFlag: $.bitAnd(row.syncFlag, $.bitNot(SyncFlag.BACK)), + }) + } else { + data[data.length - 1].syncFlag |= SyncFlag.FRONT + } + if (this.prev) { + data.unshift({ + uid: this.prev.front[0], + syncFlag: $.bitAnd(row.syncFlag, $.bitNot(SyncFlag.FRONT)), + }) + } else { + data[0].syncFlag |= SyncFlag.BACK + } + return data + }) + this.type = Span.Type.REMOTE + delete this.data + this.mergeNext() + this.mergePrev() + } +} + +export namespace Span { + export type Endpoint = [bigint, string] + + export enum Type { + LOCAL, + SYNC, + REMOTE, + } +}