Skip to content

Commit

Permalink
feat(database): basic support for sync history
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed May 4, 2024
1 parent 4e90bab commit 0a199ab
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 16 deletions.
120 changes: 107 additions & 13 deletions packages/database/src/channel.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Context, Logger, pick, Session } from '@satorijs/satori'
import { Context, Logger, Session, Universal } from '@satorijs/satori'
import { Flatten, Query } from 'minato'
import { Message, SyncFlag } from '.'

const logger = new Logger('sync')
Expand All @@ -9,46 +10,54 @@ export enum SyncStatus {
FAILED,
}

interface Interval {
front: bigint
back: bigint
interface Span {
front: SpanID
back: SpanID
queue?: boolean
}

type SpanID = [bigint, string]

export class SyncChannel {
public data: SyncChannel.Data
/** 消息同步区间,倒序存放 */
public _spans: Interval[] = []
public _spans: Span[] = []
public status = SyncStatus.INIT

private _buffer: Message[] = []
private _initTask?: Promise<void>
private _queueTask = Promise.resolve()

private _baseQuery: Query.Expr<Flatten<Message>>

constructor(private ctx: Context, platform: string, guildId: string, channelId: string) {
this.data = { platform, guildId, channelId }
this._baseQuery = {
platform,
'channel.id': channelId,
}
}

private async init() {
logger.debug('init channel %s %s %s', this.data.platform, this.data.guildId, this.data.channelId)
const data = await this.ctx.database
.select('satori.message')
.where({
'platform': this.data.platform,
'channel.id': this.data.channelId,
'syncFlag': { $gt: 0 },
...this._baseQuery,
syncFlag: { $gt: 0 },
})
.orderBy('uid', 'asc')
.project(['uid', 'syncFlag'])
.project(['id', 'uid', 'syncFlag'])
.execute()
while (data.length) {
const { syncFlag, uid: front } = data.pop()!
const { syncFlag, id: frontId, uid: frontUid } = data.pop()!
const front: SpanID = [frontUid, frontId]
if (syncFlag === SyncFlag.BOTH) {
this._spans.push({ front, back: front })
} else if (syncFlag === SyncFlag.FRONT) {
const { syncFlag, uid: back } = data.pop()!
const { syncFlag, id, uid } = data.pop()!
if (syncFlag === SyncFlag.BACK) {
this._spans.push({ front, back })
this._spans.push({ front, back: [uid, id] })
} else {
throw new Error('malformed sync flag')
}
Expand Down Expand Up @@ -98,7 +107,7 @@ export class SyncChannel {
...data,
{ ...last, syncFlag: SyncFlag.FRONT },
])
this._spans[0].front = last.uid
this._spans[0].front = [last.uid, last.id]
} else {
const last = data.pop()!
const first = data.shift()
Expand All @@ -116,6 +125,91 @@ export class SyncChannel {
}
}
}

async getMessageList(id: string, count: number, direction: Universal.Direction) {
if (this._buffer.some(message => message.id === id)) {
// TODO
} else {
const [message] = await this.ctx.database
.select('satori.message')
.where({ ...this._baseQuery, id })
.execute()
if (message) {
const span = this._spans.find(span => span.front[0] <= message.uid && message.uid <= span.back[0])
if (!span) throw new Error('malformed sync span')
const beforeTask = direction === 'after' ? Promise.resolve([]) : this.syncHistory(span, message, count, 'before')
const afterTask = direction === 'before' ? Promise.resolve([]) : this.syncHistory(span, message, count, 'after')
const [before, after] = await Promise.all([beforeTask, afterTask])
after.shift()
before.shift()
before.reverse()
if (direction === 'after') return after
if (direction === 'before') return before
return [...before, message, ...after]
}
}
}

private async syncHistory(span: Span, message: Message | { uid: bigint }, count: number, direction: 'before' | 'after') {
const buffer: Message[] = []
const { channelId, platform, assignee } = this.data
const bot = this.ctx.bots[`${platform}:${assignee}`]
const dir = ({
before: {
front: 'front',
back: 'back',
desc: 'desc',
$lte: '$lte',
$gte: '$gte',
},
after: {
front: 'back',
back: 'front',
desc: 'asc',
$lte: '$gte',
$gte: '$lte',
},
} as const)[direction]
outer: while (true) {
if ('id' in message && span[dir.front][0] === message.uid) {
buffer.push(message)
} else {
const before = await this.ctx.database
.select('satori.message')
.where({
...this._baseQuery,
uid: {
[dir.$lte]: message.uid,
[dir.$gte]: span[dir.front][0],
},
})
.orderBy('uid', dir.desc)
.limit(count - buffer.length)
.execute()
buffer.push(...before)
}
if (buffer.length >= count) return buffer
let next = span[dir.front][1]
while (true) {
const result = await bot.getMessageList(channelId, next, direction)
next = result.next!
for (let index = result.data.length - 1; index >= 0; index--) {
const prevSpan = this._spans.find(span => span[dir.back][1] === result.data[index].id)
if (prevSpan) {
span = prevSpan
message = { uid: prevSpan[dir.back][0] }
continue outer
}
buffer.push(Message.from(result.data[index], platform))
if (buffer.length >= count) return buffer
}
}
}
}

toJSON(): SyncChannel.Data {
return this.data
}
}

export namespace SyncChannel {
Expand Down
14 changes: 11 additions & 3 deletions packages/protocol/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const Methods: Dict<Method> = {
'message.update': Method('editMessage', ['channel_id', 'message_id', 'content']),
'message.delete': Method('deleteMessage', ['channel_id', 'message_id']),
'message.get': Method('getMessage', ['channel_id', 'message_id']),
'message.list': Method('getMessageList', ['channel_id', 'next']),
'message.list': Method('getMessageList', ['channel_id', 'next', 'direction']),

'reaction.create': Method('createReaction', ['channel_id', 'message_id', 'emoji']),
'reaction.delete': Method('deleteReaction', ['channel_id', 'message_id', 'emoji', 'user_id']),
Expand Down Expand Up @@ -72,13 +72,21 @@ export interface List<T> {
next?: string
}

export interface TwoWayList<T> {
data: T[]
prev?: string
next?: string
}

export type Direction = 'before' | 'after' | 'around'

export interface Methods {
// message
createMessage(channelId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise<Message[]>
sendMessage(channelId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise<string[]>
sendPrivateMessage(userId: string, content: Element.Fragment, guildId?: string, options?: SendOptions): Promise<string[]>
getMessage(channelId: string, messageId: string): Promise<Message>
getMessageList(channelId: string, next?: string): Promise<List<Message>>
getMessageList(channelId: string, next?: string, direction?: Direction): Promise<TwoWayList<Message>>
getMessageIter(channelId: string): AsyncIterable<Message>
editMessage(channelId: string, messageId: string, content: Element.Fragment): Promise<void>
deleteMessage(channelId: string, messageId: string): Promise<void>
Expand Down Expand Up @@ -211,7 +219,7 @@ export const enum Status {
}

export interface Message {
id?: string
id: string
/** @deprecated */
messageId?: string
channel?: Channel
Expand Down

0 comments on commit 0a199ab

Please sign in to comment.