fix: import history

Signed-off-by: Matt Krick <matt.krick@gmail.com>
mattkrick committed Mar 19, 2024
1 parent 5c96c41 commit 49b349b
Showing 4 changed files with 78 additions and 57 deletions.
73 changes: 42 additions & 31 deletions packages/embedder/addEmbeddingsMetadataForRet.ts
@@ -1,6 +1,9 @@
+import {ExpressionOrFactory, SqlBool, sql} from 'kysely'
 import getRethink from 'parabol-server/database/rethinkDriver'
 import {RDatum} from 'parabol-server/database/stricterR'
 import getKysely from 'parabol-server/postgres/getKysely'
+import {DB} from 'parabol-server/postgres/pg'
+import {Logger} from 'parabol-server/utils/Logger'
 import RedisInstance from 'parabol-server/utils/RedisInstance'
 import type {Lock} from 'redlock'
 import Redlock from 'redlock'
@@ -32,27 +35,15 @@ const insertDiscussionsIntoMetadata = async (discussions: DiscussionMeta[]) => {
     refId: id,
     objectType: 'retrospectiveDiscussionTopic' as const,
     teamId,
     // this is technically when the discussion was created. Discussions are mutable.
     // The best solution would be a date range of min(commentUpdatedAt) to max(commentUpdatedAt)
-    // Not technically updatedAt since discussions can be updated after they get created
     refUpdatedAt: createdAt
   }))
   if (!metadataRows[0]) return
-  const PG_MAX_PARAMS = 65535
-  const metadataColParams = Object.keys(metadataRows[0]).length
-  const metadataBatchSize = Math.trunc(PG_MAX_PARAMS / metadataColParams)
-  const insertBatches = Array.from(
-    {length: Math.ceil(metadataRows.length / metadataBatchSize)},
-    (_, i) => metadataRows.slice(i * metadataBatchSize, i * metadataBatchSize + metadataBatchSize)
-  )
-  await Promise.all(
-    insertBatches.map((batch) => {
-      return pg
-        .insertInto('EmbeddingsMetadata')
-        .values(batch)
-        .onConflict((oc) => oc.doNothing())
-        .execute()
-    })
-  )
+  await pg
+    .insertInto('EmbeddingsMetadata')
+    .values(metadataRows)
+    .onConflict((oc) => oc.doNothing())
+    .execute()
   return
 }
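The batching inside this helper became redundant: Postgres caps a statement at 65535 bind parameters, each metadata row binds 4 columns, and the caller (below) now sizes its batches at 65535 / 4 rows, so a single .values() call can never overflow. For reference, a minimal sketch of the generic chunked-insert pattern the deleted code implemented, assuming a Kysely handle (helper and table names are illustrative, not from this commit):

  import {Kysely} from 'kysely'

  const PG_MAX_PARAMS = 65535 // Postgres limit on bind parameters per statement

  // keep rowsPerChunk * columnsPerRow under the parameter cap
  const insertInChunks = async <T extends Record<string, unknown>>(
    db: Kysely<any>,
    table: string,
    rows: T[]
  ) => {
    if (rows.length === 0) return
    const paramsPerRow = Object.keys(rows[0]!).length
    const chunkSize = Math.floor(PG_MAX_PARAMS / paramsPerRow)
    for (let i = 0; i < rows.length; i += chunkSize) {
      await db
        .insertInto(table)
        .values(rows.slice(i, i + chunkSize))
        .execute()
    }
  }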

@@ -66,9 +57,10 @@ export const addEmbeddingsMetadataForRetrospectiveDiscussionTopic = async (
     // use a short lock in case the server crashes (or restarts in dev mode)
     lock = await redlock.acquire([`embedder_metadata_retrospectiveDiscussionTopic`], 500)
   } catch {
-    // lock not acquired, another worker must be doing the job. abort
+    Logger.log('Lock not acquired for embedder_metadata_retrospectiveDiscussionTopic')
     return
   }
+
   // load up the metadata table with all discussion topics that are part of meetings that ended within the given date range
   const pg = getKysely()
   lock = await lock.extend(5000)
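The locking here follows a short-TTL-plus-extend pattern: the initial 500ms TTL means a worker that crashes right after acquiring blocks its peers only briefly, while extend(5000) before each batch keeps the lock alive exactly as long as real work is happening. The shape in isolation, assuming batched work (resource name and loop are illustrative; the redlock calls are the same ones used here):

  let lock = await redlock.acquire(['embedder_some_job'], 500)
  for (const batch of batches) {
    lock = await lock.extend(5000) // renew before each unit of work
    await processBatch(batch)
  }
  await lock.release()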
@@ -82,30 +74,49 @@
     lock.release()
     return
   }

-  const BATCH_SIZE = 1000
+  const PG_MAX_PARAMS = 65535
+  const METADATA_COLS_PER_ROW = 4
+  const BATCH_SIZE = Math.floor(PG_MAX_PARAMS / METADATA_COLS_PER_ROW) // cannot exceed 65535 / 4
   const pgStartAt = startAt || new Date(0)
-  const pgEndAt = endAt || new Date('4000-01-01')
+  const pgEndAt = (endAt || new Date('4000-01-01')).getTime() / 1000

   let curEndAt = pgEndAt
+  let curEndId = ''
   for (let i = 0; i < 1e6; i++) {
+    // preserve microsecond resolution to keep timestamps equal
+    // so we can use the ID as a tiebreaker when count(createdAt) > BATCH_SIZE
+    const pgTime = sql<Date>`to_timestamp(${curEndAt})`
+    const lessThanTimeOrId: ExpressionOrFactory<DB, 'Discussion', SqlBool> = curEndId
+      ? ({eb}) =>
+          eb('createdAt', '<', pgTime).or(eb('createdAt', '=', pgTime).and('id', '>', curEndId))
+      : ({eb}) => eb('createdAt', '<=', pgTime)
     const discussions = await pg
       .selectFrom('Discussion')
-      .select(['id', 'teamId', 'createdAt', 'meetingId'])
-      // SQL between is a closed interval, so there will be an overlap between batches
-      // as long as BATCH_SIZE > COUNT(createdAt) this isn't a problem
-      .where(({eb}) => eb.between('createdAt', pgStartAt, curEndAt))
+      .select([
+        'id',
+        'teamId',
+        'createdAt',
+        'meetingId',
+        sql<number>`extract(epoch from "createdAt")`.as('createdAtEpoch')
+      ])
+      .where('createdAt', '>', pgStartAt)
+      .where(lessThanTimeOrId)
       .where('discussionTopicType', '=', 'reflectionGroup')
      .orderBy('createdAt', 'desc')
+      .orderBy('id')
       .limit(BATCH_SIZE)
       .execute()
-    if (!discussions[0]) break
-    const [firstDiscussion] = discussions
-    curEndAt = firstDiscussion.createdAt
-
+    const earliestDiscussionInBatch = discussions.at(-1)
+    if (!earliestDiscussionInBatch) break
+    const {createdAtEpoch, id} = earliestDiscussionInBatch
+    curEndId = curEndAt === createdAtEpoch ? id : ''
+    curEndAt = createdAtEpoch
     const validDiscussions = await validateDiscussions(discussions)
     if (validDiscussions.length === 0) break
     await insertDiscussionsIntoMetadata(validDiscussions)
+    const jsTime = new Date(createdAtEpoch * 1000)
+    Logger.log(
+      `Inserted ${validDiscussions.length}/${discussions.length} discussions in metadata ending at ${jsTime}`
+    )
     lock = await lock.extend(5000)
   }
   lock.release()
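Two fixes land in this loop. First, the cursor now advances from the oldest row in the batch (discussions.at(-1)) rather than the newest (discussions[0]), so descending pagination actually makes progress instead of re-reading the same window. Second, the cursor is carried as epoch seconds instead of a JS Date: Date has millisecond precision while Postgres timestamptz stores microseconds, so a Date round-trip changes the value and the createdAt = pgTime tiebreaker can never match. extract(epoch from ...) returns seconds as a double with the microseconds intact, and to_timestamp() rebuilds the exact stored value. A standalone sketch of the round-trip, reusing only calls that appear above (the query itself is illustrative):

  const row = await pg
    .selectFrom('Discussion')
    .select(['id', sql<number>`extract(epoch from "createdAt")`.as('createdAtEpoch')])
    .executeTakeFirst()
  if (row) {
    // finds the same row again; a cursor built with new Date(createdAtEpoch * 1000) would not
    const sameRow = await pg
      .selectFrom('Discussion')
      .select('id')
      .where('createdAt', '=', sql<Date>`to_timestamp(${row.createdAtEpoch})`)
      .executeTakeFirst()
  }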
33 changes: 16 additions & 17 deletions packages/embedder/embedder.ts
@@ -59,7 +59,6 @@ const run = async () => {
   const publisher = new RedisInstance(`embedder_pub_${SERVER_ID}`)
   const isPrimaryEmbedder = await establishPrimaryEmbedder(publisher)
   const modelManager = getModelManager()
-
   if (isPrimaryEmbedder) {
     await modelManager.maybeCreateTables()
     await modelManager.removeOldTriggers()
@@ -87,22 +86,22 @@ const run = async () => {

   console.log(`\n⚡⚡⚡️️ Server ID: ${SERVER_ID}. Embedder is ready ⚡⚡⚡️️️`)

-  setTimeout(() => {
-    console.log('pub')
-    publisher.xadd(
-      'embedderStream',
-      'MAXLEN',
-      '~',
-      1000,
-      '*',
-      'msg',
-      JSON.stringify({
-        objectType: 'retrospectiveDiscussionTopic',
-        startAt: new Date(),
-        endAt: new Date()
-      })
-    )
-  }, 3000)
+  // setTimeout(() => {
+  //   console.log('pub')
+  //   publisher.xadd(
+  //     'embedderStream',
+  //     'MAXLEN',
+  //     '~',
+  //     1000,
+  //     '*',
+  //     'msg',
+  //     JSON.stringify({
+  //       objectType: 'retrospectiveDiscussionTopic',
+  //       startAt: new Date(),
+  //       endAt: new Date()
+  //     })
+  //   )
+  // }, 3000)

   // async iterables run indefinitely and we have 2 of them, so merge them
   const streams = mergeAsyncIterators([incomingStream, jobQueueStream])
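The merge matters because both sources are endless: a plain for await over one iterator would never yield to the other. A minimal illustrative equivalent for two never-ending sources, racing one pending next() per iterator (the repo's mergeAsyncIterators helper may differ in its details):

  async function* mergeTwo<T>(a: AsyncIterator<T>, b: AsyncIterator<T>): AsyncGenerator<T> {
    const wrap = (it: AsyncIterator<T>, i: number) => it.next().then((r) => ({r, i}))
    const pending = [wrap(a, 0), wrap(b, 1)]
    while (true) {
      const {r, i} = await Promise.race(pending)
      pending[i] = wrap(i === 0 ? a : b, i) // re-arm only the source that fired
      if (!r.done) yield r.value
    }
  }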
10 changes: 7 additions & 3 deletions packages/embedder/establishPrimaryEmbedder.ts
@@ -4,16 +4,20 @@ import Redlock from 'redlock'

 export const establishPrimaryEmbedder = async (redis: RedisInstance) => {
   const redlock = new Redlock([redis], {retryCount: 0})
-  const MAX_TIME_BETWEEN_WORKER_STARTUPS = ms('5m')
+  const MAX_TIME_BETWEEN_WORKER_STARTUPS = ms('5s')
   try {
     const primaryWorkerLock = await redlock.acquire(
       [`embedder_isPrimary_${process.env.npm_package_version}`],
       MAX_TIME_BETWEEN_WORKER_STARTUPS
     )
-    process.on('SIGTERM', async () => {
+    const kill = () => {
+      console.log('killing')
       primaryWorkerLock?.release()
       process.exit()
-    })
+    }
+    process.on('SIGTERM', kill)
+    process.on('SIGINT', kill)
+
     return true
   } catch {
     return false
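With retryCount: 0, the first worker to acquire the version-scoped lock becomes primary and every other worker fails fast and returns false. Dropping the TTL from 5m to 5s means a primary that dies without releasing (the reason for the new SIGTERM/SIGINT handler) blocks re-election for seconds instead of minutes. Usage as in embedder.ts above (the setup body is illustrative):

  const publisher = new RedisInstance(`embedder_pub_${SERVER_ID}`)
  const isPrimaryEmbedder = await establishPrimaryEmbedder(publisher)
  if (isPrimaryEmbedder) {
    // one-time, per-version setup: create model tables, remove old triggers, etc.
  }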
19 changes: 13 additions & 6 deletions packages/embedder/importHistoricalRetrospectiveDiscussionTopic.ts
@@ -1,4 +1,5 @@
 import getKysely from 'parabol-server/postgres/getKysely'
+import {Logger} from 'parabol-server/utils/Logger'
 import RedisInstance from 'parabol-server/utils/RedisInstance'
 import {addEmbeddingsMetadataForRetrospectiveDiscussionTopic} from './addEmbeddingsMetadataForRet'

@@ -11,19 +12,25 @@ export const importHistoricalRetrospectiveDiscussionTopic = async (redis: RedisI
       eb(
         'EmbeddingsMetadata.refId',
         '=',
-        selectFrom('Discussion').select('Discussion.id').orderBy(['createdAt', 'id']).limit(1)
+        selectFrom('Discussion')
+          .select('Discussion.id')
+          .where('discussionTopicType', '=', 'reflectionGroup')
+          .orderBy(['createdAt', 'id'])
+          .limit(1)
       )
     )
     .limit(1)
     .executeTakeFirst()

   if (isEarliestMetadataImported) return
-  const earliestDiscussion = await pg
-    .selectFrom('Discussion')
-    .select('createdAt')
-    .orderBy('createdAt')
+  const earliestImportedDiscussion = await pg
+    .selectFrom('EmbeddingsMetadata')
+    .select(['id', 'refUpdatedAt', 'refId'])
+    .where('objectType', '=', 'retrospectiveDiscussionTopic')
+    .orderBy('refUpdatedAt')
     .limit(1)
     .executeTakeFirst()
-  const endAt = earliestDiscussion?.createdAt ?? undefined
+  const endAt = earliestImportedDiscussion?.refUpdatedAt ?? undefined
+  Logger.log(`Importing discussion history up to ${endAt || 'now'}`)
   return addEmbeddingsMetadataForRetrospectiveDiscussionTopic(redis, {endAt})
 }
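This swap is the heart of "fix: import history". The old code used the earliest Discussion.createdAt as endAt, pinning the import window's upper bound at the oldest possible record, so the backfill could never reach anything beyond that first discussion. The new code anchors endAt to the earliest row already present in EmbeddingsMetadata, so every run resumes the backfill where the previous one stopped. A hypothetical timeline:

  // Discussions in PG:    2020-01-01 ......................... 2024-03-19
  // Already imported:                     2022-06-01 ......... 2024-03-19
  // earliestImportedDiscussion.refUpdatedAt === 2022-06-01
  // => this run passes {endAt: 2022-06-01} and walks backwards from there,
  //    importing only the still-missing, pre-2022-06-01 history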
