Skip to content

Commit

Permalink
fix(core): fix session.observe() racing issue
Browse files Browse the repository at this point in the history
  • Loading branch information
shigma committed Jan 28, 2022
1 parent 5473727 commit ae44c54
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 50 deletions.
8 changes: 2 additions & 6 deletions docs/guide/database/observer.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,15 @@ app.middleware((session, next) => {
```js
// 中间增加了一个第二参数,表示默认情况下的权限等级
// 如果找到该用户,则返回该用户本身
// 否则创建一个新的用户数据,权限为 authority
// 如果 authority 大于 0,则将新的用户数据添加到表中
session.getUser(id, authority, fields)
session.getUser(id, fields)

// 在当前会话上绑定一个可观测用户实例
// 也就是所谓的 session.user
session.observeUser(fields)

// 中间增加了一个第二参数,表示默认情况下的 assignee
// 如果找到该频道,则不修改任何数据,返回该频道本身
// 如果未找到该频道,则创建一个新的频道,代理者为 selfId
// 如果 selfId 大于 0,则将新的频道数据添加到表中
session.getChannel(id, selfId, fields)
session.getChannel(id, fields)

// 在当前会话上绑定一个可观测频道实例
// 也就是所谓的 session.channel
Expand Down
80 changes: 36 additions & 44 deletions packages/core/src/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,87 +199,79 @@ export class Session<U extends User.Field = never, G extends Channel.Field = nev
return typeof source === 'function' ? Reflect.apply(source, null, [this]) : source
}

async getChannel<K extends Channel.Field = never>(id = this.channelId, assignee = '', fields: K[] = []) {
async getChannel<K extends Channel.Field = never>(id = this.channelId, fields: K[] = []) {
const channel = await this.app.database.getChannel(this.platform, id, fields)
if (channel) return channel
const assignee = await this.resolveValue(this.app.options.autoAssign) ? this.selfId : ''
return this.app.database.createChannel(this.platform, id, { assignee })
}

/** 在当前会话上绑定一个可观测频道实例 */
async observeChannel<T extends Channel.Field = never>(fields: Iterable<T> = []): Promise<Channel.Observed<T | G>> {
const fieldSet = new Set<Channel.Field>(fields)
const { platform, channelId, channel } = this
const { platform, channelId } = this

// 对于已经绑定可观测频道的,判断字段是否需要自动补充
if (channel) {
for (const key in channel) {
// 如果存在满足可用的缓存数据,使用缓存代替数据获取
let cache = this.app._channelCache.get(this.id, this.cid)
if (cache) {
for (const key in cache) {
fieldSet.delete(key as any)
}
if (fieldSet.size) {
const data = await this.getChannel(channelId, '', [...fieldSet])
this.app._channelCache.set(this.id, this.cid, channel.$merge(data))
}
return channel as any
if (!fieldSet.size) return this.channel = cache
}

// 如果存在满足可用的缓存数据,使用缓存代替数据获取
const cache = this.app._channelCache.get(this.id, this.cid)
const fieldArray = [...fieldSet]
const hasActiveCache = cache && contain(Object.keys(cache), fieldArray)
if (hasActiveCache) return this.channel = cache as any

// 绑定一个新的可观测频道实例
const assignee = this.resolveValue(this.app.options.autoAssign) ? this.selfId : ''
const data = await this.getChannel(channelId, assignee, fieldArray)
const newChannel = observe(data, diff => this.app.database.setChannel(platform, channelId, diff), `channel ${this.cid}`)
this.app._channelCache.set(this.id, this.cid, newChannel)
return this.channel = newChannel
const data = await this.getChannel(channelId, [...fieldSet])
cache = this.app._channelCache.get(this.id, this.cid)
if (cache) {
cache.$merge(data)
} else {
cache = observe(data, diff => this.app.database.setChannel(platform, channelId, diff), `channel ${this.cid}`)
this.app._channelCache.set(this.id, this.cid, cache)
}
return this.channel = cache
}

async getUser<K extends User.Field = never>(id = this.userId, authority = 0, fields: K[] = []) {
async getUser<K extends User.Field = never>(id = this.userId, fields: K[] = []) {
const user = await this.app.database.getUser(this.platform, id, fields)
if (user) return user
const authority = await this.resolveValue(this.app.options.autoAuthorize)
return this.app.database.createUser(this.platform, id, { authority })
}

/** 在当前会话上绑定一个可观测用户实例 */
async observeUser<T extends User.Field = never>(fields: Iterable<T> = []): Promise<User.Observed<T | U>> {
const fieldSet = new Set<User.Field>(fields)
const { userId, user } = this
const { userId, platform } = this

// 对于已经绑定可观测用户的,判断字段是否需要自动补充
if (user && !this.author?.anonymous) {
for (const key in user) {
// 如果存在满足可用的缓存数据,使用缓存代替数据获取
let cache = this.app._userCache.get(this.id, this.uid)
if (cache) {
for (const key in cache) {
fieldSet.delete(key as any)
}
if (fieldSet.size) {
const data = await this.getUser(userId, 0, [...fieldSet])
this.app._userCache.set(this.id, this.uid, user.$merge(data))
}
if (!fieldSet.size) return this.user = cache
}

if (user) return user as any

// 确保匿名消息不会写回数据库
// 匿名消息不会写回数据库
if (this.author?.anonymous) {
const fallback = this.app.model.create('user')
fallback[this.platform] = this.userId
fallback[platform] = userId
fallback.authority = await this.resolveValue(this.app.options.autoAuthorize)
const user = observe(fallback, () => Promise.resolve())
return this.user = user
}

// 如果存在满足可用的缓存数据,使用缓存代替数据获取
const cache = this.app._userCache.get(this.id, this.uid)
const fieldArray = [...fieldSet]
const hasActiveCache = cache && contain(Object.keys(cache), fieldArray)
if (hasActiveCache) return this.user = cache as any

// 绑定一个新的可观测用户实例
const data = await this.getUser(userId, await this.resolveValue(this.app.options.autoAuthorize), fieldArray)
const newUser = observe(data, diff => this.app.database.setUser(this.platform, userId, diff), `user ${this.uid}`)
this.app._userCache.set(this.id, this.uid, newUser)
return this.user = newUser
const data = await this.getUser(userId, [...fieldSet])
cache = this.app._userCache.get(this.id, this.cid)
if (cache) {
cache.$merge(data)
} else {
cache = observe(data, diff => this.app.database.setUser(this.platform, userId, diff), `user ${this.uid}`)
this.app._userCache.set(this.id, this.uid, cache)
}
return this.user = cache
}

collect<T extends 'user' | 'channel'>(key: T, argv: Argv, fields = new Set<keyof Tables[T]>()) {
Expand Down

0 comments on commit ae44c54

Please sign in to comment.