Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UBERF-7690: Use query joiner for server/trigger requests #6339

Merged
merged 1 commit into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/core/src/server/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,3 +199,4 @@ export function createNullStorageFactory (): StorageAdapter {

export { AggregatorStorageAdapter, buildStorage } from './aggregator'
export { DomainIndexHelperImpl } from './domainHelper'
export { QueryJoiner } from './utils'
18 changes: 16 additions & 2 deletions server/core/src/server/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import type {
TriggerControl
} from '../types'
import { SessionContextImpl, createBroadcastEvent } from '../utils'
import { QueryJoiner } from './utils'

interface DomainInfo {
exists: boolean
Expand All @@ -101,6 +102,8 @@ export class TServerStorage implements ServerStorage {

emptyAdapter = new DummyDbAdapter()

joiner: QueryJoiner

constructor (
private readonly _domains: Record<string, string>,
private readonly defaultAdapter: string,
Expand All @@ -122,6 +125,9 @@ export class TServerStorage implements ServerStorage {
this.hierarchy = hierarchy
this.fulltext = indexFactory(this)
this.branding = options.branding
this.joiner = new QueryJoiner((ctx, _class, query, options) => {
return this.liveQuery.findAll(_class, query, { ...options, ctx } as any)
})

this.setModel(model)
}
Expand Down Expand Up @@ -949,12 +955,20 @@ export class TServerStorage implements ServerStorage {
async processTxes (ctx: SessionOperationContext, txes: Tx[]): Promise<TxResult> {
// store tx
const _findAll: ServerStorage['findAll'] = async <T extends Doc>(
ctx: MeasureContext,
_ctx: MeasureContext,
clazz: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> => {
return await this.findAll(ctx, clazz, query, { ...options, prefix: 'server' })
return await _ctx.with(
'findAll',
{ _class: clazz },
async (ctx) => await this.joiner.findAll(ctx, clazz, query, { ...options, prefix: 'server' } as any),
{
query,
options
}
)
}
const txToStore: Tx[] = []
const modelTx: Tx[] = []
Expand Down
107 changes: 107 additions & 0 deletions server/core/src/server/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//
// Copyright © 2024 Hardcore Engineering Inc.
//
// Licensed under the Eclipse Public License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. You may
// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//
// See the License for the specific language governing permissions and
// limitations under the License.
//

import {
type Class,
type Doc,
type DocumentQuery,
type FindOptions,
type FindResult,
type MeasureContext,
type Ref
} from '@hcengineering/core'

import { deepEqual } from 'fast-equals'
import type { ServerStorage } from '../types'

interface Query {
_class: Ref<Class<Doc>>
query: DocumentQuery<Doc>
result: FindResult<Doc> | Promise<FindResult<Doc>> | undefined
options?: FindOptions<Doc>
callbacks: number
max: number
}
/**
* @public
*/
export class QueryJoiner {
private readonly queries: Map<Ref<Class<Doc>>, Query[]> = new Map<Ref<Class<Doc>>, Query[]>()

constructor (readonly _findAll: ServerStorage['findAll']) {}

async findAll<T extends Doc>(
ctx: MeasureContext,
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Promise<FindResult<T>> {
// Will find a query or add + 1 to callbacks
const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options)
if (q.result === undefined) {
q.result = this._findAll(ctx, _class, query, options)
}
if (q.result instanceof Promise) {
q.result = await q.result
q.callbacks--
}
this.removeFromQueue(q)

return q.result as FindResult<T>
}

private findQuery<T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Query | undefined {
const queries = this.queries.get(_class)
if (queries === undefined) return
for (const q of queries) {
if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) {
continue
}
q.callbacks++
q.max++
return q
}
}

private createQuery<T extends Doc>(_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Query {
const queries = this.queries.get(_class) ?? []
const q: Query = {
_class,
query,
result: undefined,
options: options as FindOptions<Doc>,
callbacks: 1,
max: 1
}

queries.push(q)
this.queries.set(_class, queries)
return q
}

private removeFromQueue (q: Query): void {
if (q.callbacks === 0) {
const queries = this.queries.get(q._class) ?? []
this.queries.set(
q._class,
queries.filter((it) => it !== q)
)
}
}
}
83 changes: 17 additions & 66 deletions server/middleware/src/queryJoin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,32 @@
// limitations under the License.
//

import { Class, Doc, DocumentQuery, FindOptions, FindResult, MeasureContext, Ref, Tx } from '@hcengineering/core'
import { Middleware, SessionContext, TxMiddlewareResult, type ServerStorage } from '@hcengineering/server-core'
import {
type Class,
type Doc,
DocumentQuery,
FindOptions,
FindResult,
type MeasureContext,
Ref,
type Tx
} from '@hcengineering/core'
import { Middleware, type ServerStorage, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core'
import { BaseMiddleware } from './base'

import { deepEqual } from 'fast-equals'
import { QueryJoiner } from '@hcengineering/server-core'

interface Query {
_class: Ref<Class<Doc>>
query: DocumentQuery<Doc>
result: FindResult<Doc> | Promise<FindResult<Doc>> | undefined
options?: FindOptions<Doc>
callbacks: number
max: number
}
/**
* @public
*/
export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
private readonly queries: Map<Ref<Class<Doc>>, Query[]> = new Map<Ref<Class<Doc>>, Query[]>()
private readonly joiner: QueryJoiner

private constructor (storage: ServerStorage, next?: Middleware) {
super(storage, next)
this.joiner = new QueryJoiner((ctx, _class, query, options) => {
return storage.findAll(ctx, _class, query, options)
})
}

static async create (ctx: MeasureContext, storage: ServerStorage, next?: Middleware): Promise<QueryJoinMiddleware> {
Expand All @@ -52,59 +56,6 @@ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware {
options?: FindOptions<T>
): Promise<FindResult<T>> {
// Will find a query or add + 1 to callbacks
const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options)
if (q.result === undefined) {
q.result = this.provideFindAll(ctx, _class, query, options)
}
if (q.result instanceof Promise) {
q.result = await q.result
q.callbacks--
}
this.removeFromQueue(q)

return q.result as FindResult<T>
}

private findQuery<T extends Doc>(
_class: Ref<Class<T>>,
query: DocumentQuery<T>,
options?: FindOptions<T>
): Query | undefined {
const queries = this.queries.get(_class)
if (queries === undefined) return
for (const q of queries) {
if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) {
continue
}
q.callbacks++
q.max++
return q
}
}

private createQuery<T extends Doc>(_class: Ref<Class<T>>, query: DocumentQuery<T>, options?: FindOptions<T>): Query {
const queries = this.queries.get(_class) ?? []
const q: Query = {
_class,
query,
result: undefined,
options: options as FindOptions<Doc>,
callbacks: 1,
max: 1
}

queries.push(q)
this.queries.set(_class, queries)
return q
}

private removeFromQueue (q: Query): void {
if (q.callbacks === 0) {
const queries = this.queries.get(q._class) ?? []
this.queries.set(
q._class,
queries.filter((it) => it !== q)
)
}
return await this.joiner.findAll(ctx.ctx, _class, query, options)
}
}