Skip to content

Commit

Permalink
feat(database): sync channel init and flush
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed May 4, 2024
1 parent 4d0b4d1 commit 4e90bab
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 21 deletions.
131 changes: 131 additions & 0 deletions packages/database/src/channel.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { Context, Logger, pick, Session } from '@satorijs/satori'

Check failure on line 1 in packages/database/src/channel.ts

View workflow job for this annotation

GitHub Actions / lint

'pick' is defined but never used
import { Message, SyncFlag } from '.'

const logger = new Logger('sync')

export enum SyncStatus {
INIT,
SYNCED,
FAILED,
}

interface Interval {
front: bigint
back: bigint
queue?: boolean
}

export class SyncChannel {
public data: SyncChannel.Data
/** 消息同步区间,倒序存放 */
public _spans: Interval[] = []
public status = SyncStatus.INIT

private _buffer: Message[] = []
private _initTask?: Promise<void>
private _queueTask = Promise.resolve()

constructor(private ctx: Context, platform: string, guildId: string, channelId: string) {
this.data = { platform, guildId, channelId }
}

private async init() {
logger.debug('init channel %s %s %s', this.data.platform, this.data.guildId, this.data.channelId)
const data = await this.ctx.database
.select('satori.message')
.where({
'platform': this.data.platform,
'channel.id': this.data.channelId,
'syncFlag': { $gt: 0 },
})
.orderBy('uid', 'asc')
.project(['uid', 'syncFlag'])
.execute()
while (data.length) {
const { syncFlag, uid: front } = data.pop()!
if (syncFlag === SyncFlag.BOTH) {
this._spans.push({ front, back: front })
} else if (syncFlag === SyncFlag.FRONT) {
const { syncFlag, uid: back } = data.pop()!
if (syncFlag === SyncFlag.BACK) {
this._spans.push({ front, back })
} else {
throw new Error('malformed sync flag')
}
} else {
throw new Error('malformed sync flag')
}
}
this.status = SyncStatus.SYNCED
}

accept(session: Session) {
if (!this.data.assignee) {
this.data.assignee = session.selfId
} else if (this.data.assignee !== session.selfId) {
return true
}

if (session.event.channel?.name) {
this.data.channelName = session.event.channel.name
}
}

async queue(session: Session) {
if (this.accept(session)) return
this._buffer.push(Message.from(session.event.message!, session.platform))
try {
if (this.status === SyncStatus.INIT) {
await (this._initTask ||= this.init())
}
if (this.status === SyncStatus.SYNCED) {
return this._queueTask = this._queueTask.then(() => this.flush())
}
} catch (error) {
logger.warn(error)
this.status = SyncStatus.FAILED
}
}

private async flush() {
while (this._buffer.length) {
const data = this._buffer.splice(0)
if (this._spans[0]?.queue) {
const { front, back } = this._spans[0]
const last = data.pop()!
await this.ctx.database.upsert('satori.message', [
{ uid: front, syncFlag: front === back ? SyncFlag.BACK : SyncFlag.NONE },
...data,
{ ...last, syncFlag: SyncFlag.FRONT },
])
this._spans[0].front = last.uid
} else {
const last = data.pop()!
const first = data.shift()
if (first) {
await this.ctx.database.upsert('satori.message', [
{ ...first, syncFlag: SyncFlag.BACK },
...data,
{ ...last, syncFlag: SyncFlag.FRONT },
])
} else {
await this.ctx.database.upsert('satori.message', [
{ ...last, syncFlag: SyncFlag.BOTH },
])
}
}
}
}
}

export namespace SyncChannel {
export interface Data {
platform: string
guildId: string
channelId: string
assignee?: string
guildName?: string
channelName?: string
avatar?: string
}
}
20 changes: 20 additions & 0 deletions packages/database/src/guild.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { Bot, Universal } from '@satorijs/satori'

export class SyncGuild {
public members?: Universal.List<Universal.GuildMember>

constructor(public bot: Bot, public data: Universal.Guild) {}

async getMembers() {
if (this.members) return this.members
return this.members = await this.bot.getGuildMemberList(this.data.id)
}

toJSON(): SyncGuild.Data {
return {}
}
}

export namespace SyncGuild {
export interface Data {}
}
116 changes: 95 additions & 21 deletions packages/database/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import {} from 'minato'
import { Context, Schema, Service } from '@satorijs/satori'
import { Channel, Guild, Message, User } from '@satorijs/protocol'
import { Bot, Context, Dict, Schema, Service } from '@satorijs/satori'
import * as Universal from '@satorijs/protocol'
import { SyncChannel } from './channel'
import { SyncGuild } from './guild'

declare module 'minato' {
interface Tables {
'satori.message': SDBMessage
'satori.user': User & { platform: string }
'satori.guild': Guild & { platform: string }
'satori.channel': Channel & { platform: string }
'satori.message': Message
'satori.user': Universal.User & { platform: string }
'satori.guild': Universal.Guild & { platform: string }
'satori.channel': Universal.Channel & { platform: string }
}
}

Expand All @@ -21,21 +23,49 @@ declare module '@satorijs/core' {

declare module '@satorijs/protocol' {
interface Message {
uid?: number
uid?: bigint
}
}

interface SDBMessage extends Message {
export enum SyncFlag {
NONE = 0,
FRONT = 1,
BACK = 2,
BOTH = 3,
}

export interface Message extends Universal.Message {
uid: bigint
platform: string
syncFlag: number
syncFlag: SyncFlag
sendFlag: number
deleted: boolean
edited: boolean
}

class SatoriDatabase extends Service<SatoriDatabase.Config> {
export namespace Message {
export const from = (message: Universal.Message, platform: string) => ({
platform,
id: message.id,
content: message.content,
timestamp: message.timestamp,
channel: { id: message.channel?.id },
user: { id: message.user?.id },
guild: { id: message.guild?.id },
quote: { id: message.quote?.id },
createdAt: message.createdAt,
updatedAt: message.updatedAt,
} as Message)
}

class SatoriDatabase extends Service<SatoriDatabase.Config, Context> {
inject = ['model', 'database']

_guilds: Dict<SyncGuild> = {}
_channels: Dict<SyncChannel> = {}

stopped = false

constructor(ctx: Context, public config: SatoriDatabase.Config) {
super(ctx, 'satori.database', true)

Expand All @@ -47,7 +77,7 @@ class SatoriDatabase extends Service<SatoriDatabase.Config> {
// })

ctx.model.extend('satori.message', {
'uid': 'unsigned(8)', // int64
'uid': 'bigint', // int64
'id': 'char(255)',
'platform': 'char(255)',
'user.id': 'char(255)',
Expand All @@ -59,8 +89,8 @@ class SatoriDatabase extends Service<SatoriDatabase.Config> {
'updatedAt': 'unsigned(8)',
'syncFlag': 'unsigned(1)',
'sendFlag': 'unsigned(1)',
'deleted': 'boolean',
'edited': 'boolean',
// 'deleted': 'boolean',
// 'edited': 'boolean',
}, {
primary: 'uid',
})
Expand Down Expand Up @@ -90,22 +120,66 @@ class SatoriDatabase extends Service<SatoriDatabase.Config> {
}, {
primary: ['id', 'platform'],
})
}

ctx.on('login-updated', () => {
// TODO
async start() {
this.ctx.on('message', (session) => {
const { platform, guildId, channelId } = session
if (session.bot.hidden) return
const key = platform + '/' + guildId + '/' + channelId
this._channels[key] ||= new SyncChannel(this.ctx, session.platform, session.guildId, session.channelId)
this._channels[key].queue(session)
})

ctx.on('message', () => {
// TODO
this.ctx.on('message-deleted', async (session) => {
await this.ctx.database.set('satori.message', {
messageId: session.messageId,
platform: session.platform,
}, {
deleted: true,
updatedAt: +new Date(),
})
})

ctx.on('message-deleted', () => {
// TODO
this.ctx.on('message-updated', async (session) => {
await this.ctx.database.set('satori.message', {
messageId: session.messageId,
platform: session.platform,
}, {
content: session.content,
updatedAt: +new Date(),
})
})

ctx.on('message-updated', () => {
// TODO
this.ctx.on('bot-status-updated', async (bot) => {
this.onBotOnline(bot)
})

this.ctx.bots.forEach(async (bot) => {
this.onBotOnline(bot)
})
}

async stop() {
this.stopped = true
}

private async onBotOnline(bot: Bot) {
if (bot.status !== Universal.Status.ONLINE || bot.hidden || !bot.getMessageList || !bot.getGuildList) return
const tasks: Promise<any>[] = []
for await (const guild of bot.getGuildIter()) {
const key = bot.platform + '/' + guild.id
this._guilds[key] ||= new SyncGuild(bot, guild)
tasks.push((async () => {
for await (const channel of bot.getChannelIter(guild.id)) {
const key = bot.platform + '/' + guild.id + '/' + channel.id
this._channels[key] ||= new SyncChannel(this.ctx, bot.platform, guild.id, channel.id)
this._channels[key].data.assignee = bot.selfId
this._channels[key].data.guildName = guild.name
this._channels[key].data.channelName = channel.name
}
})())
}
}
}

Expand Down

0 comments on commit 4e90bab

Please sign in to comment.