diff --git a/packages/embedder/addEmbeddingsMetadataForRet.ts b/packages/embedder/addEmbeddingsMetadataForRet.ts index a4e6dae8c4d..1faa89c179f 100644 --- a/packages/embedder/addEmbeddingsMetadataForRet.ts +++ b/packages/embedder/addEmbeddingsMetadataForRet.ts @@ -1,6 +1,9 @@ +import {ExpressionOrFactory, SqlBool, sql} from 'kysely' import getRethink from 'parabol-server/database/rethinkDriver' import {RDatum} from 'parabol-server/database/stricterR' import getKysely from 'parabol-server/postgres/getKysely' +import {DB} from 'parabol-server/postgres/pg' +import {Logger} from 'parabol-server/utils/Logger' import RedisInstance from 'parabol-server/utils/RedisInstance' import type {Lock} from 'redlock' import Redlock from 'redlock' @@ -32,27 +35,15 @@ const insertDiscussionsIntoMetadata = async (discussions: DiscussionMeta[]) => { refId: id, objectType: 'retrospectiveDiscussionTopic' as const, teamId, - // this is technically when the discussion was created. Discussions are mutable. - // The best solution would be a date range of min(commentUpdatedAt) to max(commentUpdatedAt) + // Not technically updatedAt since discussions can be updated after they get created refUpdatedAt: createdAt })) if (!metadataRows[0]) return - const PG_MAX_PARAMS = 65535 - const metadataColParams = Object.keys(metadataRows[0]).length - const metadataBatchSize = Math.trunc(PG_MAX_PARAMS / metadataColParams) - const insertBatches = Array.from( - {length: Math.ceil(metadataRows.length / metadataBatchSize)}, - (_, i) => metadataRows.slice(i * metadataBatchSize, i * metadataBatchSize + metadataBatchSize) - ) - await Promise.all( - insertBatches.map((batch) => { - return pg - .insertInto('EmbeddingsMetadata') - .values(batch) - .onConflict((oc) => oc.doNothing()) - .execute() - }) - ) + await pg + .insertInto('EmbeddingsMetadata') + .values(metadataRows) + .onConflict((oc) => oc.doNothing()) + .execute() return } @@ -66,9 +57,10 @@ export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = 
async ( // use a short lock in case the server crashes (or restarts in dev mode) lock = await redlock.acquire([`embedder_metadata_retrospectiveDiscussionTopic`], 500) } catch { - // lock not acquired, another worker must be doing the job. abort + Logger.log('Lock not acquired for embedder_metadata_retrospectiveDiscussionTopic') return } + // load up the metadata table with all discussion topics that are a part of meetings ended within the given date range const pg = getKysely() lock = await lock.extend(5000) @@ -82,30 +74,49 @@ export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async ( lock.release() return } - - const BATCH_SIZE = 1000 + const PG_MAX_PARAMS = 65535 + const METADATA_COLS_PER_ROW = 4 + const BATCH_SIZE = Math.floor(PG_MAX_PARAMS / METADATA_COLS_PER_ROW) // cannot exceed 65535 / 4 const pgStartAt = startAt || new Date(0) - const pgEndAt = endAt || new Date('4000-01-01') + const pgEndAt = (endAt || new Date('4000-01-01')).getTime() / 1000 let curEndAt = pgEndAt + let curEndId = '' for (let i = 0; i < 1e6; i++) { + // preserve microsecond resolution to keep timestamps equal + // so we can use the ID as a tiebreaker when count(createdAt) > BATCH_SIZE + const pgTime = sql`to_timestamp(${curEndAt})` + const lessThanTimeOrId: ExpressionOrFactory = curEndId + ? 
({eb}) => + eb('createdAt', '<', pgTime).or(eb('createdAt', '=', pgTime).and('id', '>', curEndId)) + : ({eb}) => eb('createdAt', '<=', pgTime) const discussions = await pg .selectFrom('Discussion') - .select(['id', 'teamId', 'createdAt', 'meetingId']) - // SQL between is a closed interval, so there will be an overlap between batches - // as long as BATCH_SIZE > COUNT(createdAt) this isn't a problem - .where(({eb}) => eb.between('createdAt', pgStartAt, curEndAt)) + .select([ + 'id', + 'teamId', + 'createdAt', + 'meetingId', + sql`extract(epoch from "createdAt")`.as('createdAtEpoch') + ]) + .where('createdAt', '>', pgStartAt) + .where(lessThanTimeOrId) .where('discussionTopicType', '=', 'reflectionGroup') .orderBy('createdAt', 'desc') + .orderBy('id') .limit(BATCH_SIZE) .execute() - if (!discussions[0]) break - const [firstDiscussion] = discussions - curEndAt = firstDiscussion.createdAt - + const earliestDiscussionInBatch = discussions.at(-1) + if (!earliestDiscussionInBatch) break + const {createdAtEpoch, id} = earliestDiscussionInBatch + curEndId = curEndAt === createdAtEpoch ? 
id : '' + curEndAt = createdAtEpoch const validDiscussions = await validateDiscussions(discussions) - if (validDiscussions.length === 0) break await insertDiscussionsIntoMetadata(validDiscussions) + const jsTime = new Date(createdAtEpoch * 1000) + Logger.log( + `Inserted ${validDiscussions.length}/${discussions.length} discussions in metadata ending at ${jsTime}` + ) lock = await lock.extend(5000) } lock.release() diff --git a/packages/embedder/embedder.ts b/packages/embedder/embedder.ts index a21794e6abd..699b932978e 100644 --- a/packages/embedder/embedder.ts +++ b/packages/embedder/embedder.ts @@ -59,7 +59,6 @@ const run = async () => { const publisher = new RedisInstance(`embedder_pub_${SERVER_ID}`) const isPrimaryEmbedder = await establishPrimaryEmbedder(publisher) const modelManager = getModelManager() - if (isPrimaryEmbedder) { await modelManager.maybeCreateTables() await modelManager.removeOldTriggers() @@ -87,22 +86,22 @@ const run = async () => { console.log(`\n⚡⚡⚡️️ Server ID: ${SERVER_ID}. 
Embedder is ready ⚡⚡⚡️️️`) - setTimeout(() => { - console.log('pub') - publisher.xadd( - 'embedderStream', - 'MAXLEN', - '~', - 1000, - '*', - 'msg', - JSON.stringify({ - objectType: 'retrospectiveDiscussionTopic', - startAt: new Date(), - endAt: new Date() - }) - ) - }, 3000) + // setTimeout(() => { + // console.log('pub') + // publisher.xadd( + // 'embedderStream', + // 'MAXLEN', + // '~', + // 1000, + // '*', + // 'msg', + // JSON.stringify({ + // objectType: 'retrospectiveDiscussionTopic', + // startAt: new Date(), + // endAt: new Date() + // }) + // ) + // }, 3000) // async iterables run indefinitely and we have 2 of them, so merge them const streams = mergeAsyncIterators([incomingStream, jobQueueStream]) diff --git a/packages/embedder/establishPrimaryEmbedder.ts b/packages/embedder/establishPrimaryEmbedder.ts index f7e176458cb..0b8444d3ce9 100644 --- a/packages/embedder/establishPrimaryEmbedder.ts +++ b/packages/embedder/establishPrimaryEmbedder.ts @@ -4,16 +4,20 @@ import Redlock from 'redlock' export const establishPrimaryEmbedder = async (redis: RedisInstance) => { const redlock = new Redlock([redis], {retryCount: 0}) - const MAX_TIME_BETWEEN_WORKER_STARTUPS = ms('5m') + const MAX_TIME_BETWEEN_WORKER_STARTUPS = ms('5s') try { const primaryWorkerLock = await redlock.acquire( [`embedder_isPrimary_${process.env.npm_package_version}`], MAX_TIME_BETWEEN_WORKER_STARTUPS ) - process.on('SIGTERM', async () => { + const kill = () => { + console.log('killing') primaryWorkerLock?.release() process.exit() - }) + } + process.on('SIGTERM', kill) + process.on('SIGINT', kill) + return true } catch { return false diff --git a/packages/embedder/importHistoricalRetrospectiveDiscussionTopic.ts b/packages/embedder/importHistoricalRetrospectiveDiscussionTopic.ts index 41f83b03cf3..673f9343c70 100644 --- a/packages/embedder/importHistoricalRetrospectiveDiscussionTopic.ts +++ b/packages/embedder/importHistoricalRetrospectiveDiscussionTopic.ts @@ -1,4 +1,5 @@ import getKysely from 
'parabol-server/postgres/getKysely' +import {Logger} from 'parabol-server/utils/Logger' import RedisInstance from 'parabol-server/utils/RedisInstance' import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRet' @@ -11,19 +12,25 @@ export const importHistoricalRetrospectiveDiscussionTopic = async (redis: RedisI eb( 'EmbeddingsMetadata.refId', '=', - selectFrom('Discussion').select('Discussion.id').orderBy(['createdAt', 'id']).limit(1) + selectFrom('Discussion') + .select('Discussion.id') + .where('discussionTopicType', '=', 'reflectionGroup') + .orderBy(['createdAt', 'id']) + .limit(1) ) ) .limit(1) .executeTakeFirst() if (isEarliestMetadataImported) return - const earliestDiscussion = await pg - .selectFrom('Discussion') - .select('createdAt') - .orderBy('createdAt') + const earliestImportedDiscussion = await pg + .selectFrom('EmbeddingsMetadata') + .select(['id', 'refUpdatedAt', 'refId']) + .where('objectType', '=', 'retrospectiveDiscussionTopic') + .orderBy('refUpdatedAt') .limit(1) .executeTakeFirst() - const endAt = earliestDiscussion?.createdAt ?? undefined + const endAt = earliestImportedDiscussion?.refUpdatedAt ?? undefined + Logger.log(`Importing discussion history up to ${endAt || 'now'}`) return addEmbeddingsMetadataForRetrospectiveDiscussionTopic(redis, {endAt}) }