From 86df73cf263d6fb9638d45ad9189e6f6bfe9e65a Mon Sep 17 00:00:00 2001 From: Andrey Sobolev Date: Thu, 15 Aug 2024 16:18:36 +0700 Subject: [PATCH] UBERF-7690: Use query joiner for server/trigger requests Signed-off-by: Andrey Sobolev --- server/core/src/server/index.ts | 1 + server/core/src/server/storage.ts | 18 ++++- server/core/src/server/utils.ts | 107 +++++++++++++++++++++++++++++ server/middleware/src/queryJoin.ts | 83 +++++----------------- 4 files changed, 141 insertions(+), 68 deletions(-) create mode 100644 server/core/src/server/utils.ts diff --git a/server/core/src/server/index.ts b/server/core/src/server/index.ts index 8065f4b20fc..a3c99bb99c9 100644 --- a/server/core/src/server/index.ts +++ b/server/core/src/server/index.ts @@ -199,3 +199,4 @@ export function createNullStorageFactory (): StorageAdapter { export { AggregatorStorageAdapter, buildStorage } from './aggregator' export { DomainIndexHelperImpl } from './domainHelper' +export { QueryJoiner } from './utils' diff --git a/server/core/src/server/storage.ts b/server/core/src/server/storage.ts index c0b21756462..88dbfee1b09 100644 --- a/server/core/src/server/storage.ts +++ b/server/core/src/server/storage.ts @@ -78,6 +78,7 @@ import type { TriggerControl } from '../types' import { SessionContextImpl, createBroadcastEvent } from '../utils' +import { QueryJoiner } from './utils' interface DomainInfo { exists: boolean @@ -101,6 +102,8 @@ export class TServerStorage implements ServerStorage { emptyAdapter = new DummyDbAdapter() + joiner: QueryJoiner + constructor ( private readonly _domains: Record, private readonly defaultAdapter: string, @@ -122,6 +125,9 @@ export class TServerStorage implements ServerStorage { this.hierarchy = hierarchy this.fulltext = indexFactory(this) this.branding = options.branding + this.joiner = new QueryJoiner((ctx, _class, query, options) => { + return this.liveQuery.findAll(_class, query, { ...options, ctx } as any) + }) this.setModel(model) } @@ -949,12 +955,20 @@ export class TServerStorage implements ServerStorage { async processTxes (ctx: SessionOperationContext, txes: Tx[]): Promise { // store tx const _findAll: ServerStorage['findAll'] = async ( - ctx: MeasureContext, + _ctx: MeasureContext, clazz: Ref>, query: DocumentQuery, options?: FindOptions ): Promise> => { - return await this.findAll(ctx, clazz, query, { ...options, prefix: 'server' }) + return await _ctx.with( + 'findAll', + { _class: clazz }, + async (ctx) => await this.joiner.findAll(ctx, clazz, query, { ...options, prefix: 'server' } as any), + { + query, + options + } + ) } const txToStore: Tx[] = [] const modelTx: Tx[] = [] diff --git a/server/core/src/server/utils.ts b/server/core/src/server/utils.ts new file mode 100644 index 00000000000..920ddfe01b5 --- /dev/null +++ b/server/core/src/server/utils.ts @@ -0,0 +1,107 @@ +// +// Copyright © 2024 Hardcore Engineering Inc. +// +// Licensed under the Eclipse Public License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. You may +// obtain a copy of the License at https://www.eclipse.org/legal/epl-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// +// See the License for the specific language governing permissions and +// limitations under the License. +// + +import { + type Class, + type Doc, + type DocumentQuery, + type FindOptions, + type FindResult, + type MeasureContext, + type Ref +} from '@hcengineering/core' + +import { deepEqual } from 'fast-equals' +import type { ServerStorage } from '../types' + +interface Query { + _class: Ref> + query: DocumentQuery + result: FindResult | Promise> | undefined + options?: FindOptions + callbacks: number + max: number +} +/** + * @public + */ +export class QueryJoiner { + private readonly queries: Map>, Query[]> = new Map>, Query[]>() + + constructor (readonly _findAll: ServerStorage['findAll']) {} + + async findAll( + ctx: MeasureContext, + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ): Promise> { + // Will find a query or add + 1 to callbacks + const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options) + if (q.result === undefined) { + q.result = this._findAll(ctx, _class, query, options) + } + if (q.result instanceof Promise) { + q.result = await q.result + q.callbacks-- + } + this.removeFromQueue(q) + + return q.result as FindResult + } + + private findQuery( + _class: Ref>, + query: DocumentQuery, + options?: FindOptions + ): Query | undefined { + const queries = this.queries.get(_class) + if (queries === undefined) return + for (const q of queries) { + if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) { + continue + } + q.callbacks++ + q.max++ + return q + } + } + + private createQuery(_class: Ref>, query: DocumentQuery, options?: FindOptions): Query { + const queries = this.queries.get(_class) ?? [] + const q: Query = { + _class, + query, + result: undefined, + options: options as FindOptions, + callbacks: 1, + max: 1 + } + + queries.push(q) + this.queries.set(_class, queries) + return q + } + + private removeFromQueue (q: Query): void { + if (q.callbacks === 0) { + const queries = this.queries.get(q._class) ?? [] + this.queries.set( + q._class, + queries.filter((it) => it !== q) + ) + } + } +} diff --git a/server/middleware/src/queryJoin.ts b/server/middleware/src/queryJoin.ts index 17cbc5397ec..d28777579b9 100644 --- a/server/middleware/src/queryJoin.ts +++ b/server/middleware/src/queryJoin.ts @@ -13,28 +13,32 @@ // limitations under the License. // -import { Class, Doc, DocumentQuery, FindOptions, FindResult, MeasureContext, Ref, Tx } from '@hcengineering/core' -import { Middleware, SessionContext, TxMiddlewareResult, type ServerStorage } from '@hcengineering/server-core' +import { + type Class, + type Doc, + DocumentQuery, + FindOptions, + FindResult, + type MeasureContext, + Ref, + type Tx +} from '@hcengineering/core' +import { Middleware, type ServerStorage, SessionContext, TxMiddlewareResult } from '@hcengineering/server-core' import { BaseMiddleware } from './base' -import { deepEqual } from 'fast-equals' +import { QueryJoiner } from '@hcengineering/server-core' -interface Query { - _class: Ref> - query: DocumentQuery - result: FindResult | Promise> | undefined - options?: FindOptions - callbacks: number - max: number -} /** * @public */ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware { - private readonly queries: Map>, Query[]> = new Map>, Query[]>() + private readonly joiner: QueryJoiner private constructor (storage: ServerStorage, next?: Middleware) { super(storage, next) + this.joiner = new QueryJoiner((ctx, _class, query, options) => { + return storage.findAll(ctx, _class, query, options) + }) } static async create (ctx: MeasureContext, storage: ServerStorage, next?: Middleware): Promise { @@ -52,59 +56,6 @@ export class QueryJoinMiddleware extends BaseMiddleware implements Middleware { options?: FindOptions ): Promise> { // Will find a query or add + 1 to callbacks - const q = this.findQuery(_class, query, options) ?? this.createQuery(_class, query, options) - if (q.result === undefined) { - q.result = this.provideFindAll(ctx, _class, query, options) - } - if (q.result instanceof Promise) { - q.result = await q.result - q.callbacks-- - } - this.removeFromQueue(q) - - return q.result as FindResult - } - - private findQuery( - _class: Ref>, - query: DocumentQuery, - options?: FindOptions - ): Query | undefined { - const queries = this.queries.get(_class) - if (queries === undefined) return - for (const q of queries) { - if (!deepEqual(query, q.query) || !deepEqual(options, q.options)) { - continue - } - q.callbacks++ - q.max++ - return q - } - } - - private createQuery(_class: Ref>, query: DocumentQuery, options?: FindOptions): Query { - const queries = this.queries.get(_class) ?? [] - const q: Query = { - _class, - query, - result: undefined, - options: options as FindOptions, - callbacks: 1, - max: 1 - } - - queries.push(q) - this.queries.set(_class, queries) - return q - } - - private removeFromQueue (q: Query): void { - if (q.callbacks === 0) { - const queries = this.queries.get(q._class) ?? [] - this.queries.set( - q._class, - queries.filter((it) => it !== q) - ) - } + return await this.joiner.findAll(ctx.ctx, _class, query, options) } }