diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts index 0985cacfde8..092451bd012 100644 --- a/dev/tool/src/index.ts +++ b/dev/tool/src/index.ts @@ -48,13 +48,14 @@ import serverToken, { decodeToken, generateToken } from '@hcengineering/server-t import toolPlugin, { FileModelLogger } from '@hcengineering/server-tool' import { program, type Command } from 'commander' -import { MongoClient, type Db } from 'mongodb' +import { type Db, type MongoClient } from 'mongodb' import { clearTelegramHistory } from './telegram' import { diffWorkspace, updateField } from './workspace' import { getWorkspaceId, MeasureMetricsContext, + metricsToString, RateLimiter, type AccountRole, type Data, @@ -62,6 +63,7 @@ import { type Version } from '@hcengineering/core' import { consoleModelLogger, type MigrateOperation } from '@hcengineering/model' +import { getMongoClient } from '@hcengineering/mongo' import { openAIConfigDefaults } from '@hcengineering/openai' import { type StorageAdapter } from '@hcengineering/server-core' import path from 'path' @@ -129,9 +131,10 @@ export function devTool ( async function withDatabase (uri: string, f: (db: Db, client: MongoClient) => Promise): Promise { console.log(`connecting to database '${uri}'...`) - const client = await MongoClient.connect(uri) - await f(client.db(ACCOUNT_DB), client) - await client.close() + const client = getMongoClient(uri) + const _client = await client.getClient() + await f(_client.db(ACCOUNT_DB), _client) + client.close() } program.version('0.0.1') @@ -285,14 +288,29 @@ export function devTool ( program .command('upgrade-workspace ') .description('upgrade workspace') - .action(async (workspace, cmd) => { + .option('-f|--force [force]', 'Force update', true) + .action(async (workspace, cmd: { force: boolean }) => { const { mongodbUri, version, txes, migrateOperations } = prepareTools() await withDatabase(mongodbUri, async (db) => { const info = await getWorkspaceById(db, productId, workspace) if (info === null) { throw new Error(`workspace ${workspace} not found`) } - await upgradeWorkspace(version, txes, migrateOperations, productId, db, info.workspaceUrl ?? info.workspace) + + const measureCtx = new MeasureMetricsContext('upgrade', {}) + + await upgradeWorkspace( + measureCtx, + version, + txes, + migrateOperations, + productId, + db, + info.workspaceUrl ?? info.workspace, + consoleModelLogger, + cmd.force + ) + console.log(metricsToString(measureCtx.metrics, 'upgrade', 60), {}) }) }) @@ -312,6 +330,7 @@ export function devTool ( const { mongodbUri, version, txes, migrateOperations } = prepareTools() await withDatabase(mongodbUri, async (db) => { const workspaces = await listWorkspacesRaw(db, productId) + workspaces.sort((a, b) => b.lastVisit - a.lastVisit) // We need to update workspaces with missing workspaceUrl for (const ws of workspaces) { @@ -343,15 +362,16 @@ export function devTool ( console.log( '---UPGRADING----', ws.workspace, - !cmd.console ? (logger as FileModelLogger).file : '', + ws.workspaceUrl, 'pending: ', toProcess, 'ETA:', - avgTime * toProcess + Math.floor(avgTime * toProcess * 100) / 100 ) toProcess-- try { await upgradeWorkspace( + toolCtx, version, txes, migrateOperations, @@ -383,6 +403,10 @@ export function devTool ( }) ) ) + console.log('Upgrade done') + // console.log((process as any)._getActiveHandles()) + // console.log((process as any)._getActiveRequests()) + process.exit() } else { console.log('UPGRADE write logs at:', cmd.logs) for (const ws of workspaces) { diff --git a/models/chunter/src/migration.ts b/models/chunter/src/migration.ts index 63802c1752a..77a7614d092 100644 --- a/models/chunter/src/migration.ts +++ b/models/chunter/src/migration.ts @@ -13,6 +13,7 @@ // limitations under the License. // +import { chunterId } from '@hcengineering/chunter' import core, { type Class, type Doc, type Domain, type Ref, TxOperations } from '@hcengineering/core' import { type MigrateOperation, @@ -20,7 +21,6 @@ import { type MigrationUpgradeClient, tryMigrate } from '@hcengineering/model' -import { chunterId } from '@hcengineering/chunter' import activity, { DOMAIN_ACTIVITY } from '@hcengineering/model-activity' import notification from '@hcengineering/notification' @@ -35,15 +35,16 @@ export async function createDocNotifyContexts ( attachedToClass: Ref> ): Promise { const users = await client.findAll(core.class.Account, {}) + const docNotifyContexts = await client.findAll(notification.class.DocNotifyContext, { + user: { $in: users.map((it) => it._id) }, + attachedTo, + attachedToClass + }) for (const user of users) { if (user._id === core.account.System) { continue } - const docNotifyContext = await client.findOne(notification.class.DocNotifyContext, { - user: user._id, - attachedTo, - attachedToClass - }) + const docNotifyContext = docNotifyContexts.find((it) => it.user === user._id) if (docNotifyContext === undefined) { await tx.createDoc(notification.class.DocNotifyContext, core.space.Space, { diff --git a/plugins/client-resources/src/index.ts b/plugins/client-resources/src/index.ts index 490fa2ca9b4..4a3eb007835 100644 --- a/plugins/client-resources/src/index.ts +++ b/plugins/client-resources/src/index.ts @@ -85,6 +85,11 @@ export default async () => { } } function createModelPersistence (workspace: string): TxPersistenceStore | undefined { + const overrideStore = getMetadata(clientPlugin.metadata.OverridePersistenceStore) + if (overrideStore !== undefined) { + return overrideStore + } + let dbRequest: IDBOpenDBRequest | undefined let dbPromise: Promise = Promise.resolve(undefined) diff --git a/plugins/client/src/index.ts b/plugins/client/src/index.ts index aa079af9bd7..a2b1621c9cf 100644 --- a/plugins/client/src/index.ts +++ b/plugins/client/src/index.ts @@ -13,7 +13,7 @@ // limitations under the License. // -import type { AccountClient, ClientConnectEvent } from '@hcengineering/core' +import type { AccountClient, ClientConnectEvent, TxPersistenceStore } from '@hcengineering/core' import type { Plugin, Resource } from '@hcengineering/platform' import { Metadata, plugin } from '@hcengineering/platform' @@ -76,7 +76,8 @@ export default plugin(clientId, { FilterModel: '' as Metadata, ExtraPlugins: '' as Metadata, UseBinaryProtocol: '' as Metadata, - UseProtocolCompression: '' as Metadata + UseProtocolCompression: '' as Metadata, + OverridePersistenceStore: '' as Metadata }, function: { GetClient: '' as Resource diff --git a/server/account/src/index.ts b/server/account/src/index.ts index 8cff1cdd332..fb3c0de5833 100644 --- a/server/account/src/index.ts +++ b/server/account/src/index.ts @@ -840,12 +840,12 @@ export async function createWorkspace ( getWorkspaceId(initWS, productId), getWorkspaceId(workspaceInfo.workspace, productId) ) - client = await upgradeModel(getTransactor(), wsId, txes, migrationOperation, ctxModellogger) + client = await upgradeModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger) } else { client = await initModel(ctx, getTransactor(), wsId, txes, migrationOperation, ctxModellogger) } } catch (err: any) { - return { workspaceInfo, err, client: {} as any } + return { workspaceInfo, err, client: null as any } } // Workspace is created, we need to clear disabled flag. await db @@ -859,6 +859,7 @@ export async function createWorkspace ( * @public */ export async function upgradeWorkspace ( + ctx: MeasureContext, version: Data, txes: Tx[], migrationOperation: [string, MigrateOperation][], @@ -895,7 +896,7 @@ export async function upgradeWorkspace ( } ) await ( - await upgradeModel(getTransactor(), getWorkspaceId(ws.workspace, productId), txes, migrationOperation, logger) + await upgradeModel(ctx, getTransactor(), getWorkspaceId(ws.workspace, productId), txes, migrationOperation, logger) ).close() return versionStr } diff --git a/server/server/src/starter.ts b/server/server/src/starter.ts index dbb6565a185..1dd3d5a90bc 100644 --- a/server/server/src/starter.ts +++ b/server/server/src/starter.ts @@ -8,7 +8,7 @@ export function storageConfigFromEnv (): StorageConfiguration { process.env.STORAGE_CONFIG ?? '{ "default": "", "storages": []}' ) if (storageConfig.storages.length === 0 || storageConfig.default === '') { - console.info('STORAGE_CONFIG is required for complex configuration, fallback to minio config') + // 'STORAGE_CONFIG is required for complex configuration, fallback to minio config' let minioEndpoint = process.env.MINIO_ENDPOINT if (minioEndpoint === undefined) { diff --git a/server/tool/src/connect.ts b/server/tool/src/connect.ts index 70c5fbfbc3c..35e55cd4f33 100644 --- a/server/tool/src/connect.ts +++ b/server/tool/src/connect.ts @@ -15,9 +15,10 @@ // import client, { clientId } from '@hcengineering/client' -import { Client, systemAccountEmail, WorkspaceId } from '@hcengineering/core' +import { Client, LoadModelResponse, systemAccountEmail, Tx, WorkspaceId } from '@hcengineering/core' import { addLocation, getMetadata, getResource, setMetadata } from '@hcengineering/platform' import { generateToken } from '@hcengineering/server-token' +import crypto from 'node:crypto' import plugin from './plugin' /** @@ -27,7 +28,8 @@ export async function connect ( transactorUrl: string, workspace: WorkspaceId, email?: string, - extra?: Record + extra?: Record, + model?: Tx[] ): Promise { const token = generateToken(email ?? systemAccountEmail, workspace, extra) @@ -48,7 +50,25 @@ export async function connect ( }) addLocation(clientId, () => import('@hcengineering/client-resources')) - return await ( - await getResource(client.function.GetClient) - )(token, transactorUrl) + if (model !== undefined) { + let prev = '' + const hashes = model.map((it) => { + const h = crypto.createHash('sha1') + h.update(prev) + h.update(JSON.stringify(it)) + prev = h.digest('hex') + return prev + }) + setMetadata(client.metadata.OverridePersistenceStore, { + load: async () => ({ + hash: hashes[hashes.length - 1], + transactions: model, + full: true + }), + store: async (model: LoadModelResponse) => {} + }) + } + + const clientFactory = await getResource(client.function.GetClient) + return await clientFactory(token, transactorUrl) } diff --git a/server/tool/src/index.ts b/server/tool/src/index.ts index a4d86fd26ea..c7e59b88e75 100644 --- a/server/tool/src/index.ts +++ b/server/tool/src/index.ts @@ -31,10 +31,10 @@ import core, { WorkspaceId } from '@hcengineering/core' import { consoleModelLogger, MigrateOperation, ModelLogger } from '@hcengineering/model' -import { getWorkspaceDB } from '@hcengineering/mongo' +import { createMongoTxAdapter, getMongoClient, getWorkspaceDB } from '@hcengineering/mongo' import { buildStorageFromConfig, storageConfigFromEnv } from '@hcengineering/server' import { StorageAdapter, StorageConfiguration } from '@hcengineering/server-core' -import { Db, Document, MongoClient } from 'mongodb' +import { Db, Document } from 'mongodb' import { connect } from './connect' import toolPlugin from './plugin' import { MigrateClientImpl } from './upgrade' @@ -118,22 +118,29 @@ export async function initModel ( throw Error('Model txes must target only core.space.Model') } - const client = new MongoClient(mongodbUri) + const _client = getMongoClient(mongodbUri) + const client = await _client.getClient() let connection: CoreClient & BackupClient try { - await client.connect() const db = getWorkspaceDB(client, workspaceId) logger.log('creating model...', workspaceId) - const model = txes - const result = await db.collection(DOMAIN_TX).insertMany(model as Document[]) + const result = await db.collection(DOMAIN_TX).insertMany(txes as Document[]) logger.log('model transactions inserted.', { count: result.insertedCount }) logger.log('creating data...', { transactorUrl }) - connection = (await connect(transactorUrl, workspaceId, undefined, { - model: 'upgrade', - admin: 'true' - })) as unknown as CoreClient & BackupClient + const { model } = await fetchModelFromMongo(ctx, mongodbUri, workspaceId) + + connection = (await connect( + transactorUrl, + workspaceId, + undefined, + { + model: 'upgrade', + admin: 'true' + }, + model + )) as unknown as CoreClient & BackupClient try { for (const op of migrateOperations) { @@ -142,7 +149,7 @@ export async function initModel ( } // Create update indexes - await createUpdateIndexes(connection, db, logger) + await createUpdateIndexes(ctx, connection, db, logger) logger.log('create minio bucket', { workspaceId }) if (!(await minio.exists(ctx, workspaceId))) { @@ -150,9 +157,10 @@ export async function initModel ( } } catch (e: any) { logger.error('error', { error: e }) + throw e } } finally { - await client.close() + _client.close() } return connection } @@ -161,6 +169,7 @@ export async function initModel ( * @public */ export async function upgradeModel ( + ctx: MeasureContext, transactorUrl: string, workspaceId: WorkspaceId, rawTxes: Tx[], @@ -173,70 +182,117 @@ export async function upgradeModel ( throw Error('Model txes must target only core.space.Model') } - const client = new MongoClient(mongodbUri) + // const client = new MongoClient(mongodbUri) + const _client = getMongoClient(mongodbUri) + const client = await _client.getClient() try { - await client.connect() const db = getWorkspaceDB(client, workspaceId) logger.log('removing model...', { workspaceId: workspaceId.name }) // we're preserving accounts (created by core.account.System). - const result = await db.collection(DOMAIN_TX).deleteMany({ - objectSpace: core.space.Model, - modifiedBy: core.account.System, - objectClass: { $nin: [contact.class.PersonAccount, 'contact:class:EmployeeAccount'] } - }) + const result = await ctx.with( + 'mongo-delete', + {}, + async () => + await db.collection(DOMAIN_TX).deleteMany({ + objectSpace: core.space.Model, + modifiedBy: core.account.System, + objectClass: { $nin: [contact.class.PersonAccount, 'contact:class:EmployeeAccount'] } + }) + ) logger.log('transactions deleted.', { workspaceId: workspaceId.name, count: result.deletedCount }) logger.log('creating model...', { workspaceId: workspaceId.name }) - const model = txes - const insert = await db.collection(DOMAIN_TX).insertMany(model as Document[]) - logger.log('model transactions inserted.', { workspaceId: workspaceId.name, count: insert.insertedCount }) + const insert = await ctx.with( + 'mongo-insert', + {}, + async () => await db.collection(DOMAIN_TX).insertMany(txes as Document[]) + ) - const hierarchy = new Hierarchy() - const modelDb = new ModelDb(hierarchy) - for (const tx of txes) { - try { - hierarchy.tx(tx) - } catch (err: any) {} - } - for (const tx of txes) { - try { - await modelDb.tx(tx) - } catch (err: any) {} - } + logger.log('model transactions inserted.', { workspaceId: workspaceId.name, count: insert.insertedCount }) - const migrateClient = new MigrateClientImpl(db, hierarchy, modelDb, logger) - for (const op of migrateOperations) { - const t = Date.now() - await op[1].migrate(migrateClient, logger) - logger.log('migrate:', { workspaceId: workspaceId.name, operation: op[0], time: Date.now() - t }) - } + const { hierarchy, modelDb, model } = await fetchModelFromMongo(ctx, mongodbUri, workspaceId) + await ctx.with('migrate', {}, async () => { + const migrateClient = new MigrateClientImpl(db, hierarchy, modelDb, logger) + for (const op of migrateOperations) { + const t = Date.now() + await op[1].migrate(migrateClient, logger) + logger.log('migrate:', { workspaceId: workspaceId.name, operation: op[0], time: Date.now() - t }) + } + }) logger.log('Apply upgrade operations', { workspaceId: workspaceId.name }) - const connection = await connect(transactorUrl, workspaceId, undefined, { - mode: 'backup', - model: 'upgrade', - admin: 'true' - }) + const connection = await ctx.with( + 'connect-platform', + {}, + async (ctx) => + await connect( + transactorUrl, + workspaceId, + undefined, + { + mode: 'backup', + model: 'upgrade', + admin: 'true' + }, + model + ) + ) // Create update indexes - await createUpdateIndexes(connection, db, logger) - - for (const op of migrateOperations) { - const t = Date.now() - await op[1].upgrade(connection, logger) - logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name }) - } + await ctx.with('create-indexes', {}, async (ctx) => { + await createUpdateIndexes(ctx, connection, db, logger) + }) + await ctx.with('upgrade', {}, async () => { + for (const op of migrateOperations) { + const t = Date.now() + await op[1].upgrade(connection, logger) + logger.log('upgrade:', { operation: op[0], time: Date.now() - t, workspaceId: workspaceId.name }) + } + }) return connection } finally { - await client.close() + _client.close() } } -async function createUpdateIndexes (connection: CoreClient, db: Db, logger: ModelLogger): Promise { - const classes = await connection.findAll(core.class.Class, {}) +async function fetchModelFromMongo ( + ctx: MeasureContext, + mongodbUri: string, + workspaceId: WorkspaceId +): Promise<{ hierarchy: Hierarchy, modelDb: ModelDb, model: Tx[] }> { + const hierarchy = new Hierarchy() + const modelDb = new ModelDb(hierarchy) + + const txAdapter = await createMongoTxAdapter(ctx, hierarchy, mongodbUri, workspaceId, modelDb) + + const model = await ctx.with('get-model', {}, async () => await txAdapter.getModel()) + + await ctx.with('build local model', {}, async () => { + for (const tx of model) { + try { + hierarchy.tx(tx) + } catch (err: any) {} + } + for (const tx of model) { + try { + await modelDb.tx(tx) + } catch (err: any) {} + } + }) + await txAdapter.close() + return { hierarchy, modelDb, model } +} + +async function createUpdateIndexes ( + ctx: MeasureContext, + connection: CoreClient, + db: Db, + logger: ModelLogger +): Promise { + const classes = await ctx.with('find-classes', {}, async () => await connection.findAll(core.class.Class, {})) const hierarchy = connection.getHierarchy() const domains = new Map>>() @@ -273,10 +329,15 @@ async function createUpdateIndexes (connection: CoreClient, db: Db, logger: Mode } } + const collections = await ctx.with( + 'list-collections', + {}, + async () => await db.listCollections({}, { nameOnly: true }).toArray() + ) for (const [d, v] of domains.entries()) { - const collInfo = await db.listCollections({ name: d }).next() - if (collInfo === null) { - await db.createCollection(d) + const collInfo = collections.find((it) => it.name === d) + if (collInfo == null) { + await ctx.with('create-collection', { d }, async () => await db.createCollection(d)) } const collection = db.collection(d) const bb: (string | FieldIndex)[] = [] @@ -287,11 +348,11 @@ async function createUpdateIndexes (connection: CoreClient, db: Db, logger: Mode const exists = await collection.indexExists(name) if (!exists) { await collection.createIndex(vv) + bb.push(vv) } } catch (err: any) { logger.error('error: failed to create index', { d, vv, err }) } - bb.push(vv) } if (bb.length > 0) { logger.log('created indexes', { d, bb })