From 8e17b13c317d2a660285daf964ff85093ab040cb Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 10:49:03 -0500 Subject: [PATCH 01/14] wip --- .../api/com/atproto/sync/subscribeRepos.ts | 28 ++- packages/pds/src/db/tables/repo-seq.ts | 4 +- packages/pds/src/sequencer/index.ts | 192 ++++++------------ packages/pds/src/sequencer/outbox.ts | 23 +-- packages/pds/src/services/repo/index.ts | 31 ++- 5 files changed, 109 insertions(+), 169 deletions(-) diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index a43ff14959d..f53a8059989 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -1,7 +1,6 @@ import { Server } from '../../../../lexicon' import AppContext from '../../../../context' import Outbox from '../../../../sequencer/outbox' -import { Commit } from '../../../../lexicon/types/com/atproto/sync/subscribeRepos' import { InvalidRequestError } from '@atproto/xrpc-server' export default function (server: Server, ctx: AppContext) { @@ -32,21 +31,20 @@ export default function (server: Server, ctx: AppContext) { } for await (const evt of outbox.events(cursor, backfillTime)) { - const { seq, time, repo, commit, prev, blocks, ops, blobs } = evt - const toYield: Commit = { - $type: '#commit', - seq, - rebase: false, - tooBig: false, - repo, - commit, - blocks, - ops, - blobs, - time, - prev: prev ?? null, + if (evt.type === 'commit') { + const { commit, prev, ...rest } = evt.evt + yield { + ...rest, + $type: '#commit', + commit: commit.toString(), + prev: prev?.toString() ?? null, + } + } else if (evt.type === 'handle') { + yield { + ...evt, + $type: '#handle', + } } - yield toYield } }) } diff --git a/packages/pds/src/db/tables/repo-seq.ts b/packages/pds/src/db/tables/repo-seq.ts index 6a83cd2f189..290c852bff3 100644 --- a/packages/pds/src/db/tables/repo-seq.ts +++ b/packages/pds/src/db/tables/repo-seq.ts @@ -3,8 +3,8 @@ import { Generated, Selectable } from 'kysely' export interface RepoSeq { seq: Generated did: string - commit: string - eventType: 'repo_append' + eventType: 'append' | 'rebase' | 'handle' | 'migrate' | 'tombstone' + event: Uint8Array sequencedAt: string } diff --git a/packages/pds/src/sequencer/index.ts b/packages/pds/src/sequencer/index.ts index dddc0a6c3fc..4dcc5dde894 100644 --- a/packages/pds/src/sequencer/index.ts +++ b/packages/pds/src/sequencer/index.ts @@ -1,10 +1,10 @@ -import { BlockMap, writeCar } from '@atproto/repo' import EventEmitter from 'events' import TypedEmitter from 'typed-emitter' -import { CID } from 'multiformats/cid' import Database from '../db' import { seqLogger as log } from '../logger' import { RepoSeqEntry } from '../db/tables/repo-seq' +import { z } from 'zod' +import { cborDecode, check, schema } from '@atproto/common' export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { polling = false @@ -56,23 +56,12 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { earliestSeq?: number earliestTime?: string limit?: number - }): Promise { + }): Promise { const { earliestSeq, earliestTime, limit } = opts let seqQb = this.db.db .selectFrom('repo_seq') - .innerJoin('repo_commit_history', (join) => - join - .onRef('repo_commit_history.creator', '=', 'repo_seq.did') - .onRef('repo_commit_history.commit', '=', 'repo_seq.commit'), - ) - .select([ - 'repo_seq.seq as seq', - 'repo_seq.did as did', - 'repo_seq.commit as commit', - 'repo_seq.sequencedAt 
as sequencedAt', - 'repo_commit_history.prev as prev', - ]) + .selectAll() .orderBy('seq', 'asc') if (earliestSeq !== undefined) { seqQb = seqQb.where('seq', '>', earliestSeq) @@ -84,118 +73,35 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { seqQb = seqQb.limit(limit) } - const events = await seqQb.execute() - if (events.length < 1) { + const rows = await seqQb.execute() + if (rows.length < 1) { return [] } - // we don't chunk because this is only ever used with a limit of 50 - const seqs = events.map((evt) => evt.seq) - - const getBlocks = this.db.db - .selectFrom('repo_seq') - .where('repo_seq.seq', 'in', seqs) - .innerJoin('repo_commit_block', (join) => - join - .onRef('repo_commit_block.creator', '=', 'repo_seq.did') - .onRef('repo_commit_block.commit', '=', 'repo_seq.commit'), - ) - .innerJoin('ipld_block', (join) => - join - .onRef('ipld_block.cid', '=', 'repo_commit_block.block') - .onRef('ipld_block.creator', '=', 'repo_commit_block.creator'), - ) - .select([ - 'repo_seq.seq as seq', - 'ipld_block.cid as cid', - 'ipld_block.content as content', - ]) - .execute() - - const getBlobs = this.db.db - .selectFrom('repo_seq') - .where('repo_seq.seq', 'in', seqs) - .innerJoin('repo_blob', (join) => - join - .onRef('repo_blob.did', '=', 'repo_seq.did') - .onRef('repo_blob.commit', '=', 'repo_seq.commit'), - ) - .select(['repo_seq.seq as seq', 'repo_blob.cid as cid']) - .execute() - - const getOps = this.db.db - .selectFrom('repo_seq') - .where('repo_seq.seq', 'in', seqs) - .innerJoin('repo_op', (join) => - join - .onRef('repo_op.did', '=', 'repo_seq.did') - .onRef('repo_op.commit', '=', 'repo_seq.commit'), - ) - .select([ - 'repo_seq.seq as seq', - 'repo_op.action as action', - 'repo_op.path as path', - 'repo_op.cid as cid', - ]) - .execute() - - const [blocks, blobs, ops] = await Promise.all([ - getBlocks, - getBlobs, - getOps, - ]) - - const blocksBySeq = blocks.reduce((acc, cur) => { - acc[cur.seq] ??= new BlockMap() - acc[cur.seq].set(CID.parse(cur.cid), cur.content) - return acc - }, {} as Record) - - const blobsBySeq = blobs.reduce((acc, cur) => { - acc[cur.seq] ??= [] - acc[cur.seq].push(cur.cid) - return acc - }, {} as Record) - - const opsBySeq = ops.reduce((acc, cur) => { - acc[cur.seq] ??= [] - const { action, path, cid } = cur - acc[cur.seq].push({ action, path, cid }) - return acc - }, {} as Record) - - return Promise.all( - events.map(async (evt) => { - const commit = CID.parse(evt.commit) - const carSlice = await writeCar(commit, async (car) => { - const blocks = blocksBySeq[evt.seq] - if (blocks) { - for (const block of blocks.entries()) { - await car.put(block) - } - } + const seqEvts: SeqEvt[] = [] + for (const row of rows) { + const evt = cborDecode(row.event) + if (check.is(evt, commitEvt)) { + seqEvts.push({ + type: 'commit', + evt, + }) + } else if (check.is(evt, handleEvt)) { + seqEvts.push({ + type: 'handle', + evt, }) - const blobs = blobsBySeq[evt.seq] || [] - const ops = opsBySeq[evt.seq] || [] - return { - seq: evt.seq, - time: evt.sequencedAt, - repo: evt.did, - commit: evt.commit, - prev: evt.prev || undefined, - blocks: carSlice, - ops, - blobs, - } - }), - ) + } + } + + return seqEvts } async pollDb() { try { const evts = await this.requestSeqRange({ earliestSeq: this.lastSeen }) if (evts.length > 0) { - this.lastSeen = evts[evts.length - 1].seq + this.lastSeen = evts[evts.length - 1].evt.seq this.emit('events', evts) } } catch (err) { @@ -212,25 +118,43 @@ export class Sequencer extends (EventEmitter as new () => 
SequencerEmitter) { } } -export type RepoAppendEvent = { - seq: number - time: string - repo: string - commit: string - prev?: string - blocks: Uint8Array - ops: RepoAppendOp[] - blobs: string[] -} - -export type RepoAppendOp = { - action: string - path: string - cid: string | null -} +const commitOp = z.object({ + action: z.union([ + z.literal('create'), + z.literal('update'), + z.literal('delete'), + ]), + path: z.string(), + cid: schema.cid.nullable(), +}) + +const commitEvt = z.object({ + seq: z.number(), + rebase: z.boolean(), + tooBig: z.boolean(), + repo: z.string(), + commit: schema.cid, + prev: schema.cid.nullable(), + blocks: schema.bytes, + ops: z.array(commitOp), + blobs: z.array(z.string()), + time: z.string(), +}) +type CommitEvt = z.infer + +const handleEvt = z.object({ + seq: z.number(), + did: z.string(), + handle: z.string(), +}) +type HandleEvt = z.infer + +type TypedCommitEvt = { type: 'commit'; evt: CommitEvt } +type TypedHandleEvt = { type: 'handle'; evt: HandleEvt } +export type SeqEvt = TypedCommitEvt | TypedHandleEvt type SequencerEvents = { - events: (evts: RepoAppendEvent[]) => void + events: (evts: SeqEvt[]) => void } export type SequencerEmitter = TypedEmitter diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index b48163b70cf..764fff688cc 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -1,5 +1,5 @@ import { AsyncBuffer, AsyncBufferFullError } from '@atproto/common' -import Sequencer, { RepoAppendEvent } from '.' +import Sequencer, { SeqEvt } from '.' export type OutboxOpts = { maxBufferSize: number @@ -9,13 +9,13 @@ export class Outbox { caughtUp = false lastSeen = -1 - cutoverBuffer: RepoAppendEvent[] - outBuffer: AsyncBuffer + cutoverBuffer: SeqEvt[] + outBuffer: AsyncBuffer constructor(public sequencer: Sequencer, opts: Partial = {}) { const { maxBufferSize = 500 } = opts this.cutoverBuffer = [] - this.outBuffer = new AsyncBuffer(maxBufferSize) + this.outBuffer = new AsyncBuffer(maxBufferSize) } // event stream occurs in 3 phases @@ -30,11 +30,11 @@ export class Outbox { async *events( backfillCursor?: number, backFillTime?: string, - ): AsyncGenerator { + ): AsyncGenerator { // catch up as much as we can if (backfillCursor !== undefined) { for await (const evt of this.getBackfill(backfillCursor, backFillTime)) { - this.lastSeen = evt.seq + this.lastSeen = evt.evt.seq yield evt } } else { @@ -72,8 +72,8 @@ export class Outbox { while (true) { try { for await (const evt of this.outBuffer.events()) { - if (evt.seq > this.lastSeen) { - this.lastSeen = evt.seq + if (evt.evt.seq > this.lastSeen) { + this.lastSeen = evt.evt.seq yield evt } } @@ -98,10 +98,9 @@ export class Outbox { for (const evt of evts) { yield evt } - // we requested 50, if we get less than that then we know we're caught up & can do cutover - if (evts.length < 50) { - break - } + // if we're within 50 of the sequencer, we call it good & switch to cutover + const seqCursor = this.sequencer.lastSeen ?? 
-1 + if (seqCursor - this.lastSeen < 50) break } } } diff --git a/packages/pds/src/services/repo/index.ts b/packages/pds/src/services/repo/index.ts index b7ffb9a0701..a569ff97839 100644 --- a/packages/pds/src/services/repo/index.ts +++ b/packages/pds/src/services/repo/index.ts @@ -56,7 +56,7 @@ export class RepoService { // & send to indexing this.indexWrites(writes, now), // do any other processing needed after write - this.afterWriteProcessing(did, commitData.commit, writes), + this.afterWriteProcessing(did, commitData, writes), ]) } @@ -113,13 +113,13 @@ export class RepoService { async afterWriteProcessing( did: string, - commit: CID, + commitData: CommitData, writes: PreparedWrite[], ) { await Promise.all([ - this.blobs.processWriteBlobs(did, commit, writes), - this.indexRepoOps(did, commit, writes), - this.sequenceWrite(did, commit), + this.blobs.processWriteBlobs(did, commitData.commit, writes), + this.indexRepoOps(did, commitData.commit, writes), + this.sequenceWrite(did, commitData, writes), ]) } @@ -138,7 +138,26 @@ export class RepoService { await this.db.db.insertInto('repo_op').values(ops).execute() } - async sequenceWrite(did: string, commit: CID) { + async sequenceWrite( + did: string, + commitData: CommitData, + writes: PreparedWrite[], + ) { + const ops = writes.map((w) => { + const path = w.uri.collection + '/' + w.uri.rkey + const cid = w.action === WriteOpAction.Delete ? null : w.cid + return { action: w.action, path, cid } + }) + const evt = { + rebase: false, + tooBig: false, + repo: did, + commit: commitData.commit, + prev: commitData.prev, + ops, + blobs: [], // @TODO + time: new Date().toISOString(), + } await this.db.db .insertInto('repo_seq') .values({ From 55d0da8342b6ff3ef6c9d79dedcc9acab3c79083 Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 17:16:07 -0500 Subject: [PATCH 02/14] rework seqs --- .../api/com/atproto/sync/subscribeRepos.ts | 4 + packages/pds/src/db/database-schema.ts | 2 - .../20230323T202553064Z-rework-seq.ts | 100 ++++++++++++++++++ packages/pds/src/db/migrations/index.ts | 1 + packages/pds/src/db/tables/repo-op.ts | 13 --- packages/pds/src/db/tables/repo-seq.ts | 1 + packages/pds/src/index.ts | 1 + packages/pds/src/sequencer/index.ts | 93 ++++++++++++++-- packages/pds/src/sequencer/outbox.ts | 6 +- packages/pds/src/services/index.ts | 10 +- packages/pds/src/services/repo/index.ts | 65 +++--------- packages/pds/src/sql-repo-storage.ts | 2 +- packages/repo/src/repo.ts | 38 +++---- packages/repo/src/types.ts | 1 - 14 files changed, 231 insertions(+), 106 deletions(-) create mode 100644 packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts delete mode 100644 packages/pds/src/db/tables/repo-op.ts diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index f53a8059989..d90d8eccc3f 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -38,11 +38,15 @@ export default function (server: Server, ctx: AppContext) { $type: '#commit', commit: commit.toString(), prev: prev?.toString() ?? 
null, + seq: evt.seq, + time: evt.time, } } else if (evt.type === 'handle') { yield { ...evt, $type: '#handle', + seq: evt.seq, + time: evt.time, } } } diff --git a/packages/pds/src/db/database-schema.ts b/packages/pds/src/db/database-schema.ts index 52e32602716..cc764e8766c 100644 --- a/packages/pds/src/db/database-schema.ts +++ b/packages/pds/src/db/database-schema.ts @@ -8,7 +8,6 @@ import * as record from './tables/record' import * as repoCommitBlock from './tables/repo-commit-block' import * as repoCommitHistory from './tables/repo-commit-history' import * as ipldBlock from './tables/ipld-block' -import * as repoOp from './tables/repo-op' import * as inviteCode from './tables/invite-code' import * as notification from './tables/user-notification' import * as blob from './tables/blob' @@ -33,7 +32,6 @@ export type DatabaseSchemaType = appView.DatabaseSchemaType & repoCommitBlock.PartialDB & repoCommitHistory.PartialDB & ipldBlock.PartialDB & - repoOp.PartialDB & repoCommitBlock.PartialDB & repoCommitHistory.PartialDB & inviteCode.PartialDB & diff --git a/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts b/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts new file mode 100644 index 00000000000..a169fa46e2f --- /dev/null +++ b/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts @@ -0,0 +1,100 @@ +import { Kysely, sql } from 'kysely' +import { Dialect } from '..' + +const repoSeqTable = 'repo_seq' +const repoOpTable = 'repo_op' +const repoSeqDidIndex = 'repo_seq_did_index' +const repoSeqCommitIndex = 'repo_seq_commit_index' +const repoSeqEventTypeIndex = 'repo_seq_event_type_index' +const repoSeqSequencedAtIndex = 'repo_seq_sequenced_at_index' + +export async function up(db: Kysely, dialect: Dialect): Promise { + await db.schema.dropIndex(repoSeqCommitIndex).execute() + await db.schema.dropIndex(repoSeqDidIndex).execute() + await db.schema.dropTable(repoSeqTable).execute() + await db.schema.dropTable(repoOpTable).execute() + + let builder = db.schema.createTable(repoSeqTable) + if (dialect === 'pg') { + builder = builder + .addColumn('seq', 'bigserial', (col) => col.primaryKey()) + .addColumn('invalidatedBy', 'bigint') + } else { + builder = builder + .addColumn('seq', 'integer', (col) => col.autoIncrement().primaryKey()) + .addColumn('invalidatedBy', 'integer') + } + + const binaryDatatype = dialect === 'sqlite' ? 
sql`blob` : sql`bytea` + await builder + .addColumn('did', 'varchar', (col) => col.notNull()) + .addColumn('eventType', 'varchar', (col) => col.notNull()) + .addColumn('event', binaryDatatype, (col) => col.notNull()) + .addColumn('sequencedAt', 'varchar', (col) => col.notNull()) + .execute() + + await db.schema + .createIndex(repoSeqDidIndex) + .on(repoSeqTable) + .column('did') + .execute() + + await db.schema + .createIndex(repoSeqEventTypeIndex) + .on(repoSeqTable) + .column('eventType') + .execute() + + await db.schema + .createIndex(repoSeqSequencedAtIndex) + .on(repoSeqTable) + .column('sequencedAt') + .execute() +} + +export async function down( + db: Kysely, + dialect: Dialect, +): Promise { + await db.schema.dropTable(repoSeqTable).execute() + await db.schema.dropIndex(repoSeqDidIndex).execute() + await db.schema.dropIndex(repoSeqEventTypeIndex).execute() + await db.schema.dropIndex(repoSeqSequencedAtIndex).execute() + + let builder = db.schema.createTable(repoSeqTable) + if (dialect === 'pg') { + builder = builder.addColumn('seq', 'serial', (col) => col.primaryKey()) + } else { + builder = builder.addColumn('seq', 'integer', (col) => + col.autoIncrement().primaryKey(), + ) + } + await builder + .addColumn('did', 'varchar', (col) => col.notNull()) + .addColumn('commit', 'varchar', (col) => col.notNull()) + .addColumn('eventType', 'varchar', (col) => col.notNull()) + .addColumn('sequencedAt', 'varchar', (col) => col.notNull()) + .execute() + + await db.schema + .createIndex(repoSeqDidIndex) + .on(repoSeqTable) + .column('did') + .execute() + + await db.schema + .createIndex(repoSeqCommitIndex) + .on(repoSeqTable) + .column('commit') + .execute() + + await db.schema + .createTable(repoOpTable) + .addColumn('did', 'text', (col) => col.notNull()) + .addColumn('commit', 'text', (col) => col.notNull()) + .addColumn('action', 'text', (col) => col.notNull()) + .addColumn('path', 'text', (col) => col.notNull()) + .addColumn('cid', 'text') + .addPrimaryKeyConstraint('repo_op_pkey', ['did', 'commit', 'path']) + .execute() +} diff --git a/packages/pds/src/db/migrations/index.ts b/packages/pds/src/db/migrations/index.ts index 19e50432deb..5c670580cc3 100644 --- a/packages/pds/src/db/migrations/index.ts +++ b/packages/pds/src/db/migrations/index.ts @@ -28,3 +28,4 @@ export * as _20230309T012947663Z from './20230309T012947663Z-app-migration' export * as _20230310T205728933Z from './20230310T205728933Z-subscription-init' export * as _20230313T232322844Z from './20230313T232322844Z-blob-creator' export * as _20230314T023842127Z from './20230314T023842127Z-refresh-grace-period' +export * as _20230323T202553064Z from './20230323T202553064Z-rework-seq' diff --git a/packages/pds/src/db/tables/repo-op.ts b/packages/pds/src/db/tables/repo-op.ts deleted file mode 100644 index ca7b80d83dd..00000000000 --- a/packages/pds/src/db/tables/repo-op.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { WriteOpAction } from '@atproto/repo' - -export interface RepoOp { - did: string - commit: string - action: WriteOpAction - path: string - cid: string | null -} - -export const tableName = 'repo_op' - -export type PartialDB = { [tableName]: RepoOp } diff --git a/packages/pds/src/db/tables/repo-seq.ts b/packages/pds/src/db/tables/repo-seq.ts index 290c852bff3..61542068f4e 100644 --- a/packages/pds/src/db/tables/repo-seq.ts +++ b/packages/pds/src/db/tables/repo-seq.ts @@ -5,6 +5,7 @@ export interface RepoSeq { did: string eventType: 'append' | 'rebase' | 'handle' | 'migrate' | 'tombstone' event: Uint8Array + invalidatedBy: 
number | null sequencedAt: string } diff --git a/packages/pds/src/index.ts b/packages/pds/src/index.ts index 736e5a68657..383a91caeca 100644 --- a/packages/pds/src/index.ts +++ b/packages/pds/src/index.ts @@ -122,6 +122,7 @@ export class PDS { messageQueue, messageDispatcher, blobstore, + sequencer, imgUriBuilder, imgInvalidator, }) diff --git a/packages/pds/src/sequencer/index.ts b/packages/pds/src/sequencer/index.ts index 4dcc5dde894..f9e0fe8a5cb 100644 --- a/packages/pds/src/sequencer/index.ts +++ b/packages/pds/src/sequencer/index.ts @@ -4,7 +4,10 @@ import Database from '../db' import { seqLogger as log } from '../logger' import { RepoSeqEntry } from '../db/tables/repo-seq' import { z } from 'zod' -import { cborDecode, check, schema } from '@atproto/common' +import { cborDecode, cborEncode, check, schema } from '@atproto/common' +import { blocksToCar, CidSet, CommitData, WriteOpAction } from '@atproto/repo' +import { PreparedWrite } from '../repo' +import { CID } from 'multiformats/cid' export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { polling = false @@ -84,11 +87,15 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { if (check.is(evt, commitEvt)) { seqEvts.push({ type: 'commit', + seq: row.seq, + time: row.sequencedAt, evt, }) } else if (check.is(evt, handleEvt)) { seqEvts.push({ type: 'handle', + seq: row.seq, + time: row.sequencedAt, evt, }) } @@ -101,7 +108,7 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { try { const evts = await this.requestSeqRange({ earliestSeq: this.lastSeen }) if (evts.length > 0) { - this.lastSeen = evts[evts.length - 1].evt.seq + this.lastSeen = evts[evts.length - 1].seq this.emit('events', evts) } } catch (err) { @@ -116,9 +123,67 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { } } } + + async sequenceCommit( + dbTxn: Database, + did: string, + commitData: CommitData, + writes: PreparedWrite[], + ) { + // @TODO handle too big + const ops: CommitEvtOp[] = [] + const blobs = new CidSet() + for (const w of writes) { + const path = w.uri.collection + '/' + w.uri.rkey + let cid: CID | null + if (w.action === WriteOpAction.Delete) { + cid = null + } else { + cid = w.cid + w.blobs.forEach((blob) => { + blobs.add(blob.cid) + }) + } + ops.push({ action: w.action, path, cid }) + } + const carSlice = await blocksToCar(commitData.commit, commitData.blocks) + const evt: CommitEvt = { + rebase: false, + tooBig: false, + repo: did, + commit: commitData.commit, + prev: commitData.prev, + ops, + blocks: carSlice, + blobs: blobs.toList(), + } + await dbTxn.db + .insertInto('repo_seq') + .values({ + did, + eventType: 'append' as const, + event: cborEncode(evt), + sequencedAt: new Date().toISOString(), + }) + .execute() + await this.db.notify('repo_seq') + } + + async sequenceHandleUpdate(did: string, handle: string) { + const evt: HandleEvt = { + did, + handle, + } + await this.db.db.insertInto('repo_seq').values({ + did, + eventType: 'handle', + event: cborEncode(evt), + sequencedAt: new Date().toISOString(), + }) + } } -const commitOp = z.object({ +const commitEvtOp = z.object({ action: z.union([ z.literal('create'), z.literal('update'), @@ -127,30 +192,38 @@ const commitOp = z.object({ path: z.string(), cid: schema.cid.nullable(), }) +type CommitEvtOp = z.infer const commitEvt = z.object({ - seq: z.number(), rebase: z.boolean(), tooBig: z.boolean(), repo: z.string(), commit: schema.cid, prev: schema.cid.nullable(), blocks: schema.bytes, - ops: 
z.array(commitOp), - blobs: z.array(z.string()), - time: z.string(), + ops: z.array(commitEvtOp), + blobs: z.array(schema.cid), }) type CommitEvt = z.infer const handleEvt = z.object({ - seq: z.number(), did: z.string(), handle: z.string(), }) type HandleEvt = z.infer -type TypedCommitEvt = { type: 'commit'; evt: CommitEvt } -type TypedHandleEvt = { type: 'handle'; evt: HandleEvt } +type TypedCommitEvt = { + type: 'commit' + seq: number + time: string + evt: CommitEvt +} +type TypedHandleEvt = { + type: 'handle' + seq: number + time: string + evt: HandleEvt +} export type SeqEvt = TypedCommitEvt | TypedHandleEvt type SequencerEvents = { diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index 764fff688cc..e4b3370e2aa 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -34,7 +34,7 @@ export class Outbox { // catch up as much as we can if (backfillCursor !== undefined) { for await (const evt of this.getBackfill(backfillCursor, backFillTime)) { - this.lastSeen = evt.evt.seq + this.lastSeen = evt.seq yield evt } } else { @@ -72,8 +72,8 @@ export class Outbox { while (true) { try { for await (const evt of this.outBuffer.events()) { - if (evt.evt.seq > this.lastSeen) { - this.lastSeen = evt.evt.seq + if (evt.seq > this.lastSeen) { + this.lastSeen = evt.seq yield evt } } diff --git a/packages/pds/src/services/index.ts b/packages/pds/src/services/index.ts index 162a160851a..8f47bddbc8d 100644 --- a/packages/pds/src/services/index.ts +++ b/packages/pds/src/services/index.ts @@ -13,12 +13,14 @@ import { ModerationService } from './moderation' import { ActorService } from '../app-view/services/actor' import { FeedService } from '../app-view/services/feed' import { IndexingService } from '../app-view/services/indexing' +import Sequencer from '../sequencer' export function createServices(resources: { repoSigningKey: crypto.Keypair messageQueue: MessageQueue messageDispatcher: MessageDispatcher blobstore: BlobStore + sequencer: Sequencer imgUriBuilder: ImageUriBuilder imgInvalidator: ImageInvalidator }): Services { @@ -27,6 +29,7 @@ export function createServices(resources: { messageQueue, messageDispatcher, blobstore, + sequencer, imgUriBuilder, imgInvalidator, } = resources @@ -34,7 +37,12 @@ export function createServices(resources: { account: AccountService.creator(), auth: AuthService.creator(), record: RecordService.creator(messageDispatcher), - repo: RepoService.creator(repoSigningKey, messageDispatcher, blobstore), + repo: RepoService.creator( + repoSigningKey, + messageDispatcher, + blobstore, + sequencer, + ), moderation: ModerationService.creator( messageDispatcher, blobstore, diff --git a/packages/pds/src/services/repo/index.ts b/packages/pds/src/services/repo/index.ts index a569ff97839..2748289af14 100644 --- a/packages/pds/src/services/repo/index.ts +++ b/packages/pds/src/services/repo/index.ts @@ -9,6 +9,7 @@ import { PreparedCreate, PreparedWrite } from '../../repo/types' import { RepoBlobs } from './blobs' import { createWriteToOp, writeToOp } from '../../repo' import { RecordService } from '../record' +import Sequencer from '../../sequencer' export class RepoService { blobs: RepoBlobs @@ -18,6 +19,7 @@ export class RepoService { public repoSigningKey: crypto.Keypair, public messageDispatcher: MessageQueue, public blobstore: BlobStore, + public sequencer: Sequencer, ) { this.blobs = new RepoBlobs(db, blobstore) } @@ -26,9 +28,10 @@ export class RepoService { keypair: crypto.Keypair, messageDispatcher: 
MessageQueue, blobstore: BlobStore, + sequencer: Sequencer, ) { return (db: Database) => - new RepoService(db, keypair, messageDispatcher, blobstore) + new RepoService(db, keypair, messageDispatcher, blobstore, sequencer) } services = { @@ -39,10 +42,16 @@ export class RepoService { this.db.assertTransaction() const storage = new SqlRepoStorage(this.db, did, now) const writeOps = writes.map(createWriteToOp) - const repo = await Repo.create(storage, did, this.repoSigningKey, writeOps) + const commit = await Repo.formatInitCommit( + storage, + did, + this.repoSigningKey, + writeOps, + ) + await storage.applyCommit(commit) await Promise.all([ this.indexWrites(writes, now), - this.afterWriteProcessing(did, repo.cid, writes), + this.afterWriteProcessing(did, commit, writes), ]) } @@ -118,58 +127,10 @@ export class RepoService { ) { await Promise.all([ this.blobs.processWriteBlobs(did, commitData.commit, writes), - this.indexRepoOps(did, commitData.commit, writes), - this.sequenceWrite(did, commitData, writes), + this.sequencer.sequenceCommit(this.db, did, commitData, writes), ]) } - async indexRepoOps(did: string, commit: CID, writes: PreparedWrite[]) { - const ops = writes.map((w) => { - const path = w.uri.collection + '/' + w.uri.rkey - const cid = w.action === WriteOpAction.Delete ? null : w.cid.toString() - return { - did, - commit: commit.toString(), - action: w.action, - path, - cid, - } - }) - await this.db.db.insertInto('repo_op').values(ops).execute() - } - - async sequenceWrite( - did: string, - commitData: CommitData, - writes: PreparedWrite[], - ) { - const ops = writes.map((w) => { - const path = w.uri.collection + '/' + w.uri.rkey - const cid = w.action === WriteOpAction.Delete ? null : w.cid - return { action: w.action, path, cid } - }) - const evt = { - rebase: false, - tooBig: false, - repo: did, - commit: commitData.commit, - prev: commitData.prev, - ops, - blobs: [], // @TODO - time: new Date().toISOString(), - } - await this.db.db - .insertInto('repo_seq') - .values({ - did, - commit: commit.toString(), - eventType: 'repo_append', - sequencedAt: new Date().toISOString(), - }) - .execute() - await this.db.notify('repo_seq') - } - async deleteRepo(did: string) { this.db.assertTransaction() // delete all blocks from this did & no other did diff --git a/packages/pds/src/sql-repo-storage.ts b/packages/pds/src/sql-repo-storage.ts index a502c92142c..0f7a24a35b5 100644 --- a/packages/pds/src/sql-repo-storage.ts +++ b/packages/pds/src/sql-repo-storage.ts @@ -145,7 +145,7 @@ export class SqlRepoStorage extends RepoStorage { const commitBlocks: RepoCommitBlock[] = [] const commitHistory: RepoCommitHistory[] = [] for (const commit of commits) { - const commitCids = commit.relatedCids || [] + const commitCids: CID[] = [] for (const block of commit.blocks.entries()) { commitCids.push(block.cid) allBlocks.set(block.cid, block.bytes) diff --git a/packages/repo/src/repo.ts b/packages/repo/src/repo.ts index dc8d3a959eb..5bcbf5be1b4 100644 --- a/packages/repo/src/repo.ts +++ b/packages/repo/src/repo.ts @@ -16,8 +16,6 @@ import log from './logger' import BlockMap from './block-map' import { ReadableRepo } from './readable-repo' import * as util from './util' -import CidSet from './cid-set' -import { Block } from 'multiformats/block' type Params = { storage: RepoStorage @@ -37,16 +35,17 @@ export class Repo extends ReadableRepo { storage: RepoStorage, did: string, keypair: crypto.Keypair, - initialRecords: Record, + initialWrites: RecordCreateOp[] = [], ): Promise { const newBlocks = new 
BlockMap() - const relatedCids = new CidSet() let data = await MST.create(storage) - for (const [dataKey, cid] of Object.entries(initialRecords)) { - relatedCids.add(cid) + for (const record of initialWrites) { + const cid = await newBlocks.add(record.record) + const dataKey = util.formatDataKey(record.collection, record.rkey) data = await data.add(dataKey, cid) } + const unstoredData = await data.getUnstoredBlocks() newBlocks.addMap(unstoredData.blocks) @@ -65,37 +64,30 @@ export class Repo extends ReadableRepo { commit: commitCid, prev: null, blocks: newBlocks, - relatedCids: relatedCids.toList(), } } + static async createFromCommit( + storage: RepoStorage, + commit: CommitData, + ): Promise { + await storage.applyCommit(commit) + return Repo.load(storage, commit.commit) + } + static async create( storage: RepoStorage, did: string, keypair: crypto.Keypair, initialWrites: RecordCreateOp[] = [], ): Promise { - const newBlocks = new BlockMap() - const initialRecords: Record = {} - for (const record of initialWrites) { - const cid = await newBlocks.add(record.record) - const dataKey = util.formatDataKey(record.collection, record.rkey) - initialRecords[dataKey] = cid - } const commit = await Repo.formatInitCommit( storage, did, keypair, - initialRecords, + initialWrites, ) - newBlocks.addMap(commit.blocks) - await storage.applyCommit({ - commit: commit.commit, - prev: commit.prev, - blocks: newBlocks, - }) - log.info({ did }, `created repo`) - return Repo.load(storage, commit.commit) + return Repo.createFromCommit(storage, commit) } static async load(storage: RepoStorage, cid?: CID) { diff --git a/packages/repo/src/types.ts b/packages/repo/src/types.ts index f74c0ec31c5..65cc381a3b1 100644 --- a/packages/repo/src/types.ts +++ b/packages/repo/src/types.ts @@ -98,7 +98,6 @@ export type CommitBlockData = { export type CommitData = CommitBlockData & { prev: CID | null - relatedCids?: CID[] } export type RepoUpdate = CommitData & { From bd47e6a36ef1ed3333392bedad8261287ec4c80d Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 17:26:48 -0500 Subject: [PATCH 03/14] fixed up tests --- .../api/com/atproto/sync/subscribeRepos.ts | 7 ++++++ packages/pds/src/sequencer/outbox.ts | 1 + packages/pds/tests/sequencer.test.ts | 22 +++++++++++-------- .../pds/tests/sync/subscribe-repos.test.ts | 16 ++++++-------- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index d90d8eccc3f..dfb852d6846 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -32,12 +32,19 @@ export default function (server: Server, ctx: AppContext) { for await (const evt of outbox.events(cursor, backfillTime)) { if (evt.type === 'commit') { + // @TODO clean this up after lex-refactor lands const { commit, prev, ...rest } = evt.evt + const ops = evt.evt.ops.map((op) => ({ + action: op.action, + path: op.path, + cid: op.cid?.toString() ?? null, + })) yield { ...rest, $type: '#commit', commit: commit.toString(), prev: prev?.toString() ?? 
null, + ops, seq: evt.seq, time: evt.time, } diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index e4b3370e2aa..e121c7e271d 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -101,6 +101,7 @@ export class Outbox { // if we're within 50 of the sequencer, we call it good & switch to cutover const seqCursor = this.sequencer.lastSeen ?? -1 if (seqCursor - this.lastSeen < 50) break + if (evts.length < 1) break } } } diff --git a/packages/pds/tests/sequencer.test.ts b/packages/pds/tests/sequencer.test.ts index bee94274832..7dca6790405 100644 --- a/packages/pds/tests/sequencer.test.ts +++ b/packages/pds/tests/sequencer.test.ts @@ -1,7 +1,7 @@ import AtpAgent from '@atproto/api' import { randomStr } from '@atproto/crypto' -import { readFromGenerator, wait } from '@atproto/common' -import Sequencer, { RepoAppendEvent } from '../src/sequencer' +import { cborEncode, readFromGenerator, wait } from '@atproto/common' +import Sequencer, { SeqEvt } from '../src/sequencer' import Outbox, { StreamConsumerTooSlowError } from '../src/sequencer/outbox' import { Database } from '../src' import { SeedClient } from './seeds/client' @@ -61,13 +61,17 @@ describe('sequencer', () => { .execute() } - const evtToDbRow = (e: RepoAppendEvent) => ({ - seq: e.seq, - did: e.repo, - commit: e.commit, - eventType: 'repo_append', - sequencedAt: e.time, - }) + const evtToDbRow = (e: SeqEvt) => { + const did = e.type === 'commit' ? e.evt.repo : e.evt.did + return { + seq: e.seq, + did, + eventType: 'append', + event: Buffer.from(cborEncode(e.evt)), + invalidatedBy: null, + sequencedAt: e.time, + } + } it('sends to outbox', async () => { const count = 20 diff --git a/packages/pds/tests/sync/subscribe-repos.test.ts b/packages/pds/tests/sync/subscribe-repos.test.ts index 9c59e9fbe36..7d242ad6fde 100644 --- a/packages/pds/tests/sync/subscribe-repos.test.ts +++ b/packages/pds/tests/sync/subscribe-repos.test.ts @@ -1,6 +1,6 @@ import AtpAgent from '@atproto/api' import { - cidForCbor, + cborDecode, HOUR, MINUTE, readFromGenerator, @@ -11,15 +11,13 @@ import * as repo from '@atproto/repo' import { getWriteLog, MemoryBlockstore, WriteOpAction } from '@atproto/repo' import { byFrame, ErrorFrame, Frame, MessageFrame } from '@atproto/xrpc-server' import { WebSocket } from 'ws' -import { - Commit as CommitEvt, - isInfo, -} from '../../src/lexicon/types/com/atproto/sync/subscribeRepos' +import { Commit as CommitEvt } from '../../src/lexicon/types/com/atproto/sync/subscribeRepos' import { AppContext, Database } from '../../src' import { SeedClient } from '../seeds/client' import basicSeed from '../seeds/basic' import { CloseFn, runTestServer } from '../_util' import { sql } from 'kysely' +import { CID } from 'multiformats/cid' describe('repo subscribe repos', () => { let serverHost: string @@ -192,8 +190,9 @@ describe('repo subscribe repos', () => { for (let i = 0; i < evts.length; i++) { const evt = evts[i].body as CommitEvt const seq = seqSlice[i] + const seqEvt = cborDecode(seq.event) as { commit: CID } expect(evt.time).toEqual(seq.sequencedAt) - expect(evt.commit).toEqual(seq.commit) + expect(evt.commit).toEqual(seqEvt.commit.toString()) expect(evt.repo).toEqual(seq.did) } }) @@ -201,12 +200,11 @@ describe('repo subscribe repos', () => { it('sends info frame on out of date cursor', async () => { // we stick three new seqs in with a date past the backfill cutoff // then we increment the sequence number of everything else to test out of date cursor - const 
cid = await cidForCbor({ test: 123 }) const overAnHourAgo = new Date(Date.now() - HOUR - MINUTE).toISOString() const dummySeq = { did: 'did:example:test', - commit: cid.toString(), - eventType: 'repo_append' as const, + eventType: 'append' as const, + event: new Uint8Array([1, 2, 3, 4]), sequencedAt: overAnHourAgo, } const newRows = await db.db From 6890991d754327389247edfdbf7b3656d911b6de Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 17:49:07 -0500 Subject: [PATCH 04/14] bug fixing --- packages/pds/src/api/com/atproto/sync/subscribeRepos.ts | 2 ++ .../pds/src/db/migrations/20230323T202553064Z-rework-seq.ts | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index dfb852d6846..60ac6e15490 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -39,12 +39,14 @@ export default function (server: Server, ctx: AppContext) { path: op.path, cid: op.cid?.toString() ?? null, })) + const blobs = evt.evt.blobs.map((blob) => blob.toString()) yield { ...rest, $type: '#commit', commit: commit.toString(), prev: prev?.toString() ?? null, ops, + blobs, seq: evt.seq, time: evt.time, } diff --git a/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts b/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts index a169fa46e2f..30e82780d52 100644 --- a/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts +++ b/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts @@ -56,10 +56,10 @@ export async function down( db: Kysely, dialect: Dialect, ): Promise { - await db.schema.dropTable(repoSeqTable).execute() - await db.schema.dropIndex(repoSeqDidIndex).execute() - await db.schema.dropIndex(repoSeqEventTypeIndex).execute() await db.schema.dropIndex(repoSeqSequencedAtIndex).execute() + await db.schema.dropIndex(repoSeqEventTypeIndex).execute() + await db.schema.dropIndex(repoSeqDidIndex).execute() + await db.schema.dropTable(repoSeqTable).execute() let builder = db.schema.createTable(repoSeqTable) if (dialect === 'pg') { From 8de76f586e5c8962b29ea717e88e5852dc1d0a43 Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 18:17:48 -0500 Subject: [PATCH 05/14] sequence handles & notify in dbTxn --- packages/pds/src/sequencer/index.ts | 22 +++++++++++-------- packages/pds/src/services/account/index.ts | 8 ++++--- packages/pds/src/services/index.ts | 3 ++- packages/pds/src/services/moderation/index.ts | 6 ++++- packages/pds/src/services/moderation/views.ts | 9 ++++++-- packages/pds/tests/sequencer.test.ts | 4 +++- 6 files changed, 35 insertions(+), 17 deletions(-) diff --git a/packages/pds/src/sequencer/index.ts b/packages/pds/src/sequencer/index.ts index f9e0fe8a5cb..7b4d7362aa1 100644 --- a/packages/pds/src/sequencer/index.ts +++ b/packages/pds/src/sequencer/index.ts @@ -161,25 +161,29 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { .insertInto('repo_seq') .values({ did, - eventType: 'append' as const, + eventType: 'append', event: cborEncode(evt), sequencedAt: new Date().toISOString(), }) .execute() - await this.db.notify('repo_seq') + await dbTxn.notify('repo_seq') } - async sequenceHandleUpdate(did: string, handle: string) { + async sequenceHandleUpdate(dbTxn: Database, did: string, handle: string) { const evt: HandleEvt = { did, handle, } - await this.db.db.insertInto('repo_seq').values({ - did, - eventType: 
'handle', - event: cborEncode(evt), - sequencedAt: new Date().toISOString(), - }) + await dbTxn.db + .insertInto('repo_seq') + .values({ + did, + eventType: 'handle', + event: cborEncode(evt), + sequencedAt: new Date().toISOString(), + }) + .execute() + await dbTxn.notify('repo_seq') } } diff --git a/packages/pds/src/services/account/index.ts b/packages/pds/src/services/account/index.ts index a503a83b1ad..883dec3cc0c 100644 --- a/packages/pds/src/services/account/index.ts +++ b/packages/pds/src/services/account/index.ts @@ -10,12 +10,13 @@ import { Record as DeclarationRecord } from '../../lexicon/types/app/bsky/system import { notSoftDeletedClause } from '../../db/util' import { getUserSearchQueryPg, getUserSearchQuerySqlite } from '../util/search' import { paginate, TimeCidKeyset } from '../../db/pagination' +import Sequencer from '../../sequencer' export class AccountService { - constructor(public db: Database) {} + constructor(public db: Database, public sequencer: Sequencer) {} - static creator() { - return (db: Database) => new AccountService(db) + static creator(sequencer: Sequencer) { + return (db: Database) => new AccountService(db, sequencer) } async getUser( @@ -150,6 +151,7 @@ export class AccountService { if (res.numUpdatedRows < 1) { throw new UserAlreadyExistsError() } + await this.sequencer.sequenceHandleUpdate(this.db, did, handle) } async updateUserPassword(did: string, password: string) { diff --git a/packages/pds/src/services/index.ts b/packages/pds/src/services/index.ts index 8f47bddbc8d..209466eabd4 100644 --- a/packages/pds/src/services/index.ts +++ b/packages/pds/src/services/index.ts @@ -34,7 +34,7 @@ export function createServices(resources: { imgInvalidator, } = resources return { - account: AccountService.creator(), + account: AccountService.creator(sequencer), auth: AuthService.creator(), record: RecordService.creator(messageDispatcher), repo: RepoService.creator( @@ -46,6 +46,7 @@ export function createServices(resources: { moderation: ModerationService.creator( messageDispatcher, blobstore, + sequencer, imgUriBuilder, imgInvalidator, ), diff --git a/packages/pds/src/services/moderation/index.ts b/packages/pds/src/services/moderation/index.ts index e70239f841c..76288ba4ee2 100644 --- a/packages/pds/src/services/moderation/index.ts +++ b/packages/pds/src/services/moderation/index.ts @@ -11,12 +11,14 @@ import { ModerationViews } from './views' import SqlRepoStorage from '../../sql-repo-storage' import { ImageInvalidator } from '../../image/invalidator' import { ImageUriBuilder } from '../../image/uri' +import Sequencer from '../../sequencer' export class ModerationService { constructor( public db: Database, public messageDispatcher: MessageQueue, public blobstore: BlobStore, + public sequencer: Sequencer, public imgUriBuilder: ImageUriBuilder, public imgInvalidator: ImageInvalidator, ) {} @@ -24,6 +26,7 @@ export class ModerationService { static creator( messageDispatcher: MessageQueue, blobstore: BlobStore, + sequencer: Sequencer, imgUriBuilder: ImageUriBuilder, imgInvalidator: ImageInvalidator, ) { @@ -32,12 +35,13 @@ export class ModerationService { db, messageDispatcher, blobstore, + sequencer, imgUriBuilder, imgInvalidator, ) } - views = new ModerationViews(this.db, this.messageDispatcher) + views = new ModerationViews(this.db, this.messageDispatcher, this.sequencer) services = { record: RecordService.creator(this.messageDispatcher), diff --git a/packages/pds/src/services/moderation/views.ts b/packages/pds/src/services/moderation/views.ts index 
6a106b86840..3c56ada9840 100644 --- a/packages/pds/src/services/moderation/views.ts +++ b/packages/pds/src/services/moderation/views.ts @@ -26,12 +26,17 @@ import { OutputSchema as ReportOutput } from '../../lexicon/types/com/atproto/re import { ModerationAction, ModerationReport } from '../../db/tables/moderation' import { AccountService } from '../account' import { RecordService } from '../record' +import Sequencer from '../../sequencer' export class ModerationViews { - constructor(private db: Database, private messageDispatcher: MessageQueue) {} + constructor( + private db: Database, + private messageDispatcher: MessageQueue, + private sequencer: Sequencer, + ) {} services = { - account: AccountService.creator(), + account: AccountService.creator(this.sequencer), record: RecordService.creator(this.messageDispatcher), } diff --git a/packages/pds/tests/sequencer.test.ts b/packages/pds/tests/sequencer.test.ts index 7dca6790405..80ec75778d0 100644 --- a/packages/pds/tests/sequencer.test.ts +++ b/packages/pds/tests/sequencer.test.ts @@ -92,9 +92,11 @@ describe('sequencer', () => { totalEvts += count const outbox = new Outbox(sequencer) const [evts] = await Promise.all([ - readFromGenerator(outbox.events(-1)), + readFromGenerator(outbox.events(-1), undefined, 10000), createPosts(count), ]) + const res = await db.db.selectFrom('repo_seq').select('seq').execute() + expect(evts.length).toBe(totalEvts) const fromDb = await loadFromDb(-1) From 5090601d65399e3fc170598189f26c0a46542905 Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 18:18:09 -0500 Subject: [PATCH 06/14] tidy --- packages/pds/tests/sequencer.test.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/pds/tests/sequencer.test.ts b/packages/pds/tests/sequencer.test.ts index 80ec75778d0..9ce4273aaf6 100644 --- a/packages/pds/tests/sequencer.test.ts +++ b/packages/pds/tests/sequencer.test.ts @@ -95,8 +95,6 @@ describe('sequencer', () => { readFromGenerator(outbox.events(-1), undefined, 10000), createPosts(count), ]) - const res = await db.db.selectFrom('repo_seq').select('seq').execute() - expect(evts.length).toBe(totalEvts) const fromDb = await loadFromDb(-1) From 4ec8e338394766d9d40409934ec9ef19425ba083 Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 18:19:07 -0500 Subject: [PATCH 07/14] update lex to include times --- lexicons/com/atproto/sync/subscribeRepos.json | 9 ++++--- packages/api/src/client/lexicons.ts | 27 +++++++++++++------ .../types/com/atproto/sync/subscribeRepos.ts | 3 +++ packages/pds/src/lexicon/lexicons.ts | 9 +++++++ .../types/com/atproto/sync/subscribeRepos.ts | 3 +++ 5 files changed, 40 insertions(+), 11 deletions(-) diff --git a/lexicons/com/atproto/sync/subscribeRepos.json b/lexicons/com/atproto/sync/subscribeRepos.json index 2862adc4999..94e20081990 100644 --- a/lexicons/com/atproto/sync/subscribeRepos.json +++ b/lexicons/com/atproto/sync/subscribeRepos.json @@ -62,7 +62,8 @@ "properties": { "seq": {"type": "integer"}, "did": {"type": "string"}, - "handle": {"type": "string"} + "handle": {"type": "string"}, + "time": {"type": "datetime"} } }, "migrate": { @@ -72,7 +73,8 @@ "properties": { "seq": {"type": "integer"}, "did": {"type": "string"}, - "migrateTo": {"type": "string"} + "migrateTo": {"type": "string"}, + "time": {"type": "datetime"} } }, "tombstone": { @@ -80,7 +82,8 @@ "required": ["seq", "did"], "properties": { "seq": {"type": "integer"}, - "did": {"type": "string"} + "did": {"type": "string"}, + "time": {"type": "datetime"} } }, "info": { diff --git 
a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index 5a91963da03..f4ed6694d7c 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -2354,14 +2354,16 @@ export const schemaDict = { }, }, message: { - type: 'union', - refs: [ - 'lex:com.atproto.sync.subscribeRepos#commit', - 'lex:com.atproto.sync.subscribeRepos#handle', - 'lex:com.atproto.sync.subscribeRepos#migrate', - 'lex:com.atproto.sync.subscribeRepos#tombstone', - 'lex:com.atproto.sync.subscribeRepos#info', - ], + schema: { + type: 'union', + refs: [ + 'lex:com.atproto.sync.subscribeRepos#commit', + 'lex:com.atproto.sync.subscribeRepos#handle', + 'lex:com.atproto.sync.subscribeRepos#migrate', + 'lex:com.atproto.sync.subscribeRepos#tombstone', + 'lex:com.atproto.sync.subscribeRepos#info', + ], + }, }, errors: [ { @@ -2438,6 +2440,9 @@ export const schemaDict = { handle: { type: 'string', }, + time: { + type: 'datetime', + }, }, }, migrate: { @@ -2454,6 +2459,9 @@ export const schemaDict = { migrateTo: { type: 'string', }, + time: { + type: 'datetime', + }, }, }, tombstone: { @@ -2466,6 +2474,9 @@ export const schemaDict = { did: { type: 'string', }, + time: { + type: 'datetime', + }, }, }, info: { diff --git a/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts b/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts index 838d416a9bb..91dafca2f7c 100644 --- a/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts @@ -37,6 +37,7 @@ export interface Handle { seq: number did: string handle: string + time?: string [k: string]: unknown } @@ -56,6 +57,7 @@ export interface Migrate { seq: number did: string migrateTo: string | null + time?: string [k: string]: unknown } @@ -74,6 +76,7 @@ export function validateMigrate(v: unknown): ValidationResult { export interface Tombstone { seq: number did: string + time?: string [k: string]: unknown } diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index 058e4fc7504..f4ed6694d7c 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -2440,6 +2440,9 @@ export const schemaDict = { handle: { type: 'string', }, + time: { + type: 'datetime', + }, }, }, migrate: { @@ -2456,6 +2459,9 @@ export const schemaDict = { migrateTo: { type: 'string', }, + time: { + type: 'datetime', + }, }, }, tombstone: { @@ -2468,6 +2474,9 @@ export const schemaDict = { did: { type: 'string', }, + time: { + type: 'datetime', + }, }, }, info: { diff --git a/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts index b1f47fa5c70..bf7eda4f6c7 100644 --- a/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts @@ -58,6 +58,7 @@ export interface Handle { seq: number did: string handle: string + time?: string [k: string]: unknown } @@ -77,6 +78,7 @@ export interface Migrate { seq: number did: string migrateTo: string | null + time?: string [k: string]: unknown } @@ -95,6 +97,7 @@ export function validateMigrate(v: unknown): ValidationResult { export interface Tombstone { seq: number did: string + time?: string [k: string]: unknown } From 912de10bdf9c5a6f34aa5f6d2be1bd4c824dc05f Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 18:53:14 -0500 Subject: [PATCH 08/14] test syncing handle changes --- 
lexicons/com/atproto/sync/subscribeRepos.json | 6 +-- packages/api/src/client/lexicons.ts | 6 +-- .../types/com/atproto/sync/subscribeRepos.ts | 6 +-- .../api/com/atproto/sync/subscribeRepos.ts | 19 ++++--- packages/pds/src/lexicon/lexicons.ts | 6 +-- .../types/com/atproto/sync/subscribeRepos.ts | 6 +-- packages/pds/src/sequencer/index.ts | 14 +++++- packages/pds/tests/seeds/client.ts | 7 +++ .../pds/tests/sync/subscribe-repos.test.ts | 49 ++++++++++++++++--- 9 files changed, 90 insertions(+), 29 deletions(-) diff --git a/lexicons/com/atproto/sync/subscribeRepos.json b/lexicons/com/atproto/sync/subscribeRepos.json index 94e20081990..fb04f74c5c4 100644 --- a/lexicons/com/atproto/sync/subscribeRepos.json +++ b/lexicons/com/atproto/sync/subscribeRepos.json @@ -58,7 +58,7 @@ }, "handle": { "type": "object", - "required": ["seq", "did", "handle"], + "required": ["seq", "did", "handle", "time"], "properties": { "seq": {"type": "integer"}, "did": {"type": "string"}, @@ -68,7 +68,7 @@ }, "migrate": { "type": "object", - "required": ["seq", "did", "migrateTo"], + "required": ["seq", "did", "migrateTo", "time"], "nullable": ["migrateTo"], "properties": { "seq": {"type": "integer"}, @@ -79,7 +79,7 @@ }, "tombstone": { "type": "object", - "required": ["seq", "did"], + "required": ["seq", "did", "time"], "properties": { "seq": {"type": "integer"}, "did": {"type": "string"}, diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index f4ed6694d7c..ff1d1f1a15f 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -2429,7 +2429,7 @@ export const schemaDict = { }, handle: { type: 'object', - required: ['seq', 'did', 'handle'], + required: ['seq', 'did', 'handle', 'time'], properties: { seq: { type: 'integer', @@ -2447,7 +2447,7 @@ export const schemaDict = { }, migrate: { type: 'object', - required: ['seq', 'did', 'migrateTo'], + required: ['seq', 'did', 'migrateTo', 'time'], nullable: ['migrateTo'], properties: { seq: { @@ -2466,7 +2466,7 @@ export const schemaDict = { }, tombstone: { type: 'object', - required: ['seq', 'did'], + required: ['seq', 'did', 'time'], properties: { seq: { type: 'integer', diff --git a/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts b/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts index 91dafca2f7c..f5c566b2fc1 100644 --- a/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts @@ -37,7 +37,7 @@ export interface Handle { seq: number did: string handle: string - time?: string + time: string [k: string]: unknown } @@ -57,7 +57,7 @@ export interface Migrate { seq: number did: string migrateTo: string | null - time?: string + time: string [k: string]: unknown } @@ -76,7 +76,7 @@ export function validateMigrate(v: unknown): ValidationResult { export interface Tombstone { seq: number did: string - time?: string + time: string [k: string]: unknown } diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index 60ac6e15490..0e681b17b0f 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -33,7 +33,9 @@ export default function (server: Server, ctx: AppContext) { for await (const evt of outbox.events(cursor, backfillTime)) { if (evt.type === 'commit') { // @TODO clean this up after lex-refactor lands - const { commit, prev, ...rest } = evt.evt + const { 
rebase, tooBig, repo, blocks } = evt.evt + const commit = evt.evt.commit.toString() + const prev = evt.evt.prev?.toString() ?? null const ops = evt.evt.ops.map((op) => ({ action: op.action, path: op.path, @@ -41,20 +43,25 @@ export default function (server: Server, ctx: AppContext) { })) const blobs = evt.evt.blobs.map((blob) => blob.toString()) yield { - ...rest, $type: '#commit', - commit: commit.toString(), - prev: prev?.toString() ?? null, + seq: evt.seq, + rebase, + tooBig, + repo, + commit, + prev, + blocks, ops, blobs, - seq: evt.seq, time: evt.time, } } else if (evt.type === 'handle') { + const { handle, did } = evt.evt yield { - ...evt, $type: '#handle', seq: evt.seq, + handle, + did, time: evt.time, } } diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index f4ed6694d7c..ff1d1f1a15f 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -2429,7 +2429,7 @@ export const schemaDict = { }, handle: { type: 'object', - required: ['seq', 'did', 'handle'], + required: ['seq', 'did', 'handle', 'time'], properties: { seq: { type: 'integer', @@ -2447,7 +2447,7 @@ export const schemaDict = { }, migrate: { type: 'object', - required: ['seq', 'did', 'migrateTo'], + required: ['seq', 'did', 'migrateTo', 'time'], nullable: ['migrateTo'], properties: { seq: { @@ -2466,7 +2466,7 @@ export const schemaDict = { }, tombstone: { type: 'object', - required: ['seq', 'did'], + required: ['seq', 'did', 'time'], properties: { seq: { type: 'integer', diff --git a/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts index bf7eda4f6c7..d9a8b9ad750 100644 --- a/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts @@ -58,7 +58,7 @@ export interface Handle { seq: number did: string handle: string - time?: string + time: string [k: string]: unknown } @@ -78,7 +78,7 @@ export interface Migrate { seq: number did: string migrateTo: string | null - time?: string + time: string [k: string]: unknown } @@ -97,7 +97,7 @@ export function validateMigrate(v: unknown): ValidationResult { export interface Tombstone { seq: number did: string - time?: string + time: string [k: string]: unknown } diff --git a/packages/pds/src/sequencer/index.ts b/packages/pds/src/sequencer/index.ts index 7b4d7362aa1..2ac685e67b1 100644 --- a/packages/pds/src/sequencer/index.ts +++ b/packages/pds/src/sequencer/index.ts @@ -66,6 +66,7 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { .selectFrom('repo_seq') .selectAll() .orderBy('seq', 'asc') + .where('invalidatedBy', 'is', null) if (earliestSeq !== undefined) { seqQb = seqQb.where('seq', '>', earliestSeq) } @@ -174,7 +175,7 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { did, handle, } - await dbTxn.db + const res = await dbTxn.db .insertInto('repo_seq') .values({ did, @@ -182,6 +183,17 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { event: cborEncode(evt), sequencedAt: new Date().toISOString(), }) + .returningAll() + .executeTakeFirst() + if (!res) { + throw new Error(`could not sequence handle change: ${evt}`) + } + await dbTxn.db + .updateTable('repo_seq') + .where('eventType', '=', 'handle') + .where('did', '=', did) + .where('seq', '!=', res.seq) + .set({ invalidatedBy: res.seq }) .execute() await dbTxn.notify('repo_seq') } diff --git 
a/packages/pds/tests/seeds/client.ts b/packages/pds/tests/seeds/client.ts index 68aac8b2aa8..fcfdfb742cf 100644 --- a/packages/pds/tests/seeds/client.ts +++ b/packages/pds/tests/seeds/client.ts @@ -135,6 +135,13 @@ export class SeedClient { return this.accounts[account.did] } + async updateHandle(by: string, handle: string) { + await this.agent.api.com.atproto.handle.update( + { handle }, + { encoding: 'application/json', headers: this.getHeaders(by) }, + ) + } + async createProfile( by: string, displayName: string, diff --git a/packages/pds/tests/sync/subscribe-repos.test.ts b/packages/pds/tests/sync/subscribe-repos.test.ts index 7d242ad6fde..2bdd6b81f49 100644 --- a/packages/pds/tests/sync/subscribe-repos.test.ts +++ b/packages/pds/tests/sync/subscribe-repos.test.ts @@ -11,7 +11,10 @@ import * as repo from '@atproto/repo' import { getWriteLog, MemoryBlockstore, WriteOpAction } from '@atproto/repo' import { byFrame, ErrorFrame, Frame, MessageFrame } from '@atproto/xrpc-server' import { WebSocket } from 'ws' -import { Commit as CommitEvt } from '../../src/lexicon/types/com/atproto/sync/subscribeRepos' +import { + Commit as CommitEvt, + Handle as HandleEvt, +} from '../../src/lexicon/types/com/atproto/sync/subscribeRepos' import { AppContext, Database } from '../../src' import { SeedClient } from '../seeds/client' import basicSeed from '../seeds/basic' @@ -67,11 +70,25 @@ describe('repo subscribe repos', () => { return repo.Repo.load(storage, synced.root) } - const verifyEvents = async (evts: Frame[]) => { + const verifyHandleEvent = (frame: Frame, did: string, handle: string) => { + if (!(frame instanceof MessageFrame)) { + throw new Error('expected meesage frame') + } + expect(frame.header.t).toEqual('#handle') + const evt = frame.body as HandleEvt + expect(evt.did).toBe(did) + expect(evt.handle).toBe(handle) + expect(typeof evt.time).toBe('string') + expect(typeof evt.seq).toBe('number') + } + + const verifyCommitEvents = async (evts: Frame[]) => { const byUser = evts.reduce((acc, cur) => { - const evt = cur.body as CommitEvt - acc[evt.repo] ??= [] - acc[evt.repo].push(evt) + if (cur instanceof MessageFrame && cur.header.t === '#commit') { + const evt = cur.body as CommitEvt + acc[evt.repo] ??= [] + acc[evt.repo].push(evt) + } return acc }, {} as Record) @@ -131,7 +148,25 @@ describe('repo subscribe repos', () => { const evts = await readFromGenerator(gen) ws.terminate() - await verifyEvents(evts) + await verifyCommitEvents(evts) + }) + + it('syncs handle changes', async () => { + await sc.updateHandle(alice, 'alice2.test') + await sc.updateHandle(bob, 'bob2.test') + + const ws = new WebSocket( + `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`, + ) + + const gen = byFrame(ws) + const evts = await readFromGenerator(gen) + ws.terminate() + + await verifyCommitEvents(evts) + const lastTwo = evts.slice(-2) + verifyHandleEvent(lastTwo[0], alice, 'alice2.test') + verifyHandleEvent(lastTwo[1], bob, 'bob2.test') }) it('syncs new events', async () => { @@ -147,7 +182,7 @@ describe('repo subscribe repos', () => { const [evts] = await Promise.all([readAfterDelay(), makePosts()]) - await verifyEvents(evts) + await verifyCommitEvents(evts) }) it('handles no backfill', async () => { From a376d26fac2e81dcfde6500c4a6ecf7c82473d97 Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 19:01:22 -0500 Subject: [PATCH 09/14] one more fix --- .../pds/tests/sync/subscribe-repos.test.ts | 45 +++++++++++++++---- 1 file changed, 37 insertions(+), 8 deletions(-) diff --git 
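A standalone sketch of what the new updateHandle seed helper drives (illustrative, not part of the patch): com.atproto.handle.update is the call that produces the '#handle' events the test asserts on. The AtpAgent import and the bearer-token header are assumptions made for the sketch; the tests themselves go through SeedClient.

import { AtpAgent } from '@atproto/api'

const changeHandle = async (service: string, jwt: string, handle: string) => {
  const agent = new AtpAgent({ service })
  // same call shape as SeedClient.updateHandle above
  await agent.api.com.atproto.handle.update(
    { handle },
    {
      encoding: 'application/json',
      headers: { authorization: `Bearer ${jwt}` },
    },
  )
}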
a/packages/pds/tests/sync/subscribe-repos.test.ts b/packages/pds/tests/sync/subscribe-repos.test.ts index 2bdd6b81f49..5e27086da74 100644 --- a/packages/pds/tests/sync/subscribe-repos.test.ts +++ b/packages/pds/tests/sync/subscribe-repos.test.ts @@ -21,6 +21,7 @@ import basicSeed from '../seeds/basic' import { CloseFn, runTestServer } from '../_util' import { sql } from 'kysely' import { CID } from 'multiformats/cid' +import handle from '../../src/api/com/atproto/handle' describe('repo subscribe repos', () => { let serverHost: string @@ -70,12 +71,17 @@ describe('repo subscribe repos', () => { return repo.Repo.load(storage, synced.root) } - const verifyHandleEvent = (frame: Frame, did: string, handle: string) => { - if (!(frame instanceof MessageFrame)) { - throw new Error('expected meesage frame') + const getHandleEvts = (frames: Frame[]): HandleEvt[] => { + const evts: HandleEvt[] = [] + for (const frame of frames) { + if (frame instanceof MessageFrame && frame.header.t === '#handle') { + evts.push(frame.body) + } } - expect(frame.header.t).toEqual('#handle') - const evt = frame.body as HandleEvt + return evts + } + + const verifyHandleEvent = (evt: HandleEvt, did: string, handle: string) => { expect(evt.did).toBe(did) expect(evt.handle).toBe(handle) expect(typeof evt.time).toBe('string') @@ -164,9 +170,31 @@ describe('repo subscribe repos', () => { ws.terminate() await verifyCommitEvents(evts) - const lastTwo = evts.slice(-2) - verifyHandleEvent(lastTwo[0], alice, 'alice2.test') - verifyHandleEvent(lastTwo[1], bob, 'bob2.test') + const handleEvts = getHandleEvts(evts.slice(-2)) + verifyHandleEvent(handleEvts[0], alice, 'alice2.test') + verifyHandleEvent(handleEvts[1], bob, 'bob2.test') + }) + + it('does not return invalidated events', async () => { + await sc.updateHandle(alice, 'alice3.test') + await sc.updateHandle(alice, 'alice4.test') + await sc.updateHandle(alice, 'alice5.test') + await sc.updateHandle(bob, 'bob3.test') + await sc.updateHandle(bob, 'bob4.test') + await sc.updateHandle(bob, 'bob5.test') + + const ws = new WebSocket( + `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`, + ) + + const gen = byFrame(ws) + const evts = await readFromGenerator(gen) + ws.terminate() + + const handleEvts = getHandleEvts(evts) + expect(handleEvts.length).toBe(2) + verifyHandleEvent(handleEvts[0], alice, 'alice5.test') + verifyHandleEvent(handleEvts[1], bob, 'bob5.test') }) it('syncs new events', async () => { @@ -252,6 +280,7 @@ describe('repo subscribe repos', () => { .updateTable('repo_seq') .set({ seq: sql`seq+1000` }) .where('seq', 'not in', newSeqs) + .where('invalidatedBy', 'is', null) .returning('seq') .execute() From 1021f6b8a63fdadbeaceb49d51fa1772c2b9822a Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 19:11:21 -0500 Subject: [PATCH 10/14] handle too big evts --- .../src/api/com/atproto/repo/batchWrite.ts | 4 +- packages/pds/src/sequencer/index.ts | 48 +++++++++++++------ packages/repo/src/block-map.ts | 8 ++++ 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/packages/pds/src/api/com/atproto/repo/batchWrite.ts b/packages/pds/src/api/com/atproto/repo/batchWrite.ts index f477643247b..9e427da43c9 100644 --- a/packages/pds/src/api/com/atproto/repo/batchWrite.ts +++ b/packages/pds/src/api/com/atproto/repo/batchWrite.ts @@ -20,7 +20,9 @@ export default function (server: Server, ctx: AppContext) { 'Unvalidated writes are not yet supported.', ) } - + if (tx.writes.length > 200) { + throw new InvalidRequestError('Two many writes. 
Max: 200') + } const hasUpdate = tx.writes.some( (write) => write.action === WriteOpAction.Update, ) diff --git a/packages/pds/src/sequencer/index.ts b/packages/pds/src/sequencer/index.ts index 2ac685e67b1..9a29d04255e 100644 --- a/packages/pds/src/sequencer/index.ts +++ b/packages/pds/src/sequencer/index.ts @@ -5,7 +5,13 @@ import { seqLogger as log } from '../logger' import { RepoSeqEntry } from '../db/tables/repo-seq' import { z } from 'zod' import { cborDecode, cborEncode, check, schema } from '@atproto/common' -import { blocksToCar, CidSet, CommitData, WriteOpAction } from '@atproto/repo' +import { + BlockMap, + blocksToCar, + CidSet, + CommitData, + WriteOpAction, +} from '@atproto/repo' import { PreparedWrite } from '../repo' import { CID } from 'multiformats/cid' @@ -131,26 +137,38 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { commitData: CommitData, writes: PreparedWrite[], ) { - // @TODO handle too big + let tooBig: boolean const ops: CommitEvtOp[] = [] const blobs = new CidSet() - for (const w of writes) { - const path = w.uri.collection + '/' + w.uri.rkey - let cid: CID | null - if (w.action === WriteOpAction.Delete) { - cid = null - } else { - cid = w.cid - w.blobs.forEach((blob) => { - blobs.add(blob.cid) - }) + let carSlice: Uint8Array + + // max 200 ops or 1MB of data + if (writes.length > 200 || commitData.blocks.byteSize > 1024000) { + tooBig = true + const justRoot = new BlockMap() + justRoot.add(commitData.blocks.get(commitData.commit)) + carSlice = await blocksToCar(commitData.commit, justRoot) + } else { + tooBig = false + for (const w of writes) { + const path = w.uri.collection + '/' + w.uri.rkey + let cid: CID | null + if (w.action === WriteOpAction.Delete) { + cid = null + } else { + cid = w.cid + w.blobs.forEach((blob) => { + blobs.add(blob.cid) + }) + } + ops.push({ action: w.action, path, cid }) } - ops.push({ action: w.action, path, cid }) + carSlice = await blocksToCar(commitData.commit, commitData.blocks) } - const carSlice = await blocksToCar(commitData.commit, commitData.blocks) + const evt: CommitEvt = { rebase: false, - tooBig: false, + tooBig, repo: did, commit: commitData.commit, prev: commitData.prev, diff --git a/packages/repo/src/block-map.ts b/packages/repo/src/block-map.ts index 3fc273e002a..d95e72cf594 100644 --- a/packages/repo/src/block-map.ts +++ b/packages/repo/src/block-map.ts @@ -63,6 +63,14 @@ export class BlockMap { return this.map.size } + get byteSize(): number { + let size = 0 + this.forEach((bytes) => { + size += bytes.length + }) + return size + } + equals(other: BlockMap): boolean { if (this.size !== other.size) { return false From 2dca67d67f66aa20ecbbd9b02c1cd31920740cd0 Mon Sep 17 00:00:00 2001 From: dholms Date: Thu, 23 Mar 2023 19:20:20 -0500 Subject: [PATCH 11/14] dont thread sequencer through everything --- packages/pds/src/index.ts | 1 - packages/pds/src/sequencer/events.ts | 144 ++++++++++++++++++ packages/pds/src/sequencer/index.ts | 143 +---------------- packages/pds/src/services/account/index.ts | 10 +- packages/pds/src/services/index.ts | 13 +- packages/pds/src/services/moderation/index.ts | 6 +- packages/pds/src/services/moderation/views.ts | 9 +- packages/pds/src/services/repo/index.ts | 8 +- 8 files changed, 161 insertions(+), 173 deletions(-) create mode 100644 packages/pds/src/sequencer/events.ts diff --git a/packages/pds/src/index.ts b/packages/pds/src/index.ts index 383a91caeca..736e5a68657 100644 --- a/packages/pds/src/index.ts +++ b/packages/pds/src/index.ts @@ -122,7 +122,6 @@ 
export class PDS { messageQueue, messageDispatcher, blobstore, - sequencer, imgUriBuilder, imgInvalidator, }) diff --git a/packages/pds/src/sequencer/events.ts b/packages/pds/src/sequencer/events.ts new file mode 100644 index 00000000000..ed4fb8dce46 --- /dev/null +++ b/packages/pds/src/sequencer/events.ts @@ -0,0 +1,144 @@ +import Database from '../db' +import { z } from 'zod' +import { cborEncode, schema } from '@atproto/common' +import { + BlockMap, + blocksToCar, + CidSet, + CommitData, + WriteOpAction, +} from '@atproto/repo' +import { PreparedWrite } from '../repo' +import { CID } from 'multiformats/cid' + +export const sequenceCommit = async ( + dbTxn: Database, + did: string, + commitData: CommitData, + writes: PreparedWrite[], +) => { + let tooBig: boolean + const ops: CommitEvtOp[] = [] + const blobs = new CidSet() + let carSlice: Uint8Array + + // max 200 ops or 1MB of data + if (writes.length > 200 || commitData.blocks.byteSize > 1024000) { + tooBig = true + const justRoot = new BlockMap() + justRoot.add(commitData.blocks.get(commitData.commit)) + carSlice = await blocksToCar(commitData.commit, justRoot) + } else { + tooBig = false + for (const w of writes) { + const path = w.uri.collection + '/' + w.uri.rkey + let cid: CID | null + if (w.action === WriteOpAction.Delete) { + cid = null + } else { + cid = w.cid + w.blobs.forEach((blob) => { + blobs.add(blob.cid) + }) + } + ops.push({ action: w.action, path, cid }) + } + carSlice = await blocksToCar(commitData.commit, commitData.blocks) + } + + const evt: CommitEvt = { + rebase: false, + tooBig, + repo: did, + commit: commitData.commit, + prev: commitData.prev, + ops, + blocks: carSlice, + blobs: blobs.toList(), + } + await dbTxn.db + .insertInto('repo_seq') + .values({ + did, + eventType: 'append', + event: cborEncode(evt), + sequencedAt: new Date().toISOString(), + }) + .execute() + await dbTxn.notify('repo_seq') +} + +export const sequenceHandleUpdate = async ( + dbTxn: Database, + did: string, + handle: string, +) => { + const evt: HandleEvt = { + did, + handle, + } + const res = await dbTxn.db + .insertInto('repo_seq') + .values({ + did, + eventType: 'handle', + event: cborEncode(evt), + sequencedAt: new Date().toISOString(), + }) + .returningAll() + .executeTakeFirst() + if (!res) { + throw new Error(`could not sequence handle change: ${evt}`) + } + await dbTxn.db + .updateTable('repo_seq') + .where('eventType', '=', 'handle') + .where('did', '=', did) + .where('seq', '!=', res.seq) + .set({ invalidatedBy: res.seq }) + .execute() + await dbTxn.notify('repo_seq') +} + +export const commitEvtOp = z.object({ + action: z.union([ + z.literal('create'), + z.literal('update'), + z.literal('delete'), + ]), + path: z.string(), + cid: schema.cid.nullable(), +}) +export type CommitEvtOp = z.infer + +export const commitEvt = z.object({ + rebase: z.boolean(), + tooBig: z.boolean(), + repo: z.string(), + commit: schema.cid, + prev: schema.cid.nullable(), + blocks: schema.bytes, + ops: z.array(commitEvtOp), + blobs: z.array(schema.cid), +}) +export type CommitEvt = z.infer + +export const handleEvt = z.object({ + did: z.string(), + handle: z.string(), +}) +export type HandleEvt = z.infer + +type TypedCommitEvt = { + type: 'commit' + seq: number + time: string + evt: CommitEvt +} +type TypedHandleEvt = { + type: 'handle' + seq: number + time: string + evt: HandleEvt +} +export type SeqEvt = TypedCommitEvt | TypedHandleEvt diff --git a/packages/pds/src/sequencer/index.ts b/packages/pds/src/sequencer/index.ts index 9a29d04255e..f883fb47839 
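A brief usage sketch for the new free functions in sequencer/events.ts (illustrative; the real call sites are in the repo and account services further down): both take the caller's Database handle as their first argument, so they run inside whatever transaction the service already holds, which is what lets the Sequencer instance drop out of the service constructors. The transaction wrapper shown assumes the package's usual Database#transaction helper.

// inside a service method, where `db` is the service's Database
await db.transaction(async (dbTxn) => {
  // persist the repo rows for the commit first, then sequence it
  await sequenceCommit(dbTxn, did, commitData, writes)
  // or, for a handle change:
  // await sequenceHandleUpdate(dbTxn, did, handle)
})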
100644 --- a/packages/pds/src/sequencer/index.ts +++ b/packages/pds/src/sequencer/index.ts @@ -3,17 +3,10 @@ import TypedEmitter from 'typed-emitter' import Database from '../db' import { seqLogger as log } from '../logger' import { RepoSeqEntry } from '../db/tables/repo-seq' -import { z } from 'zod' -import { cborDecode, cborEncode, check, schema } from '@atproto/common' -import { - BlockMap, - blocksToCar, - CidSet, - CommitData, - WriteOpAction, -} from '@atproto/repo' -import { PreparedWrite } from '../repo' -import { CID } from 'multiformats/cid' +import { cborDecode, check } from '@atproto/common' +import { commitEvt, handleEvt, SeqEvt } from './events' + +export * from './events' export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { polling = false @@ -130,135 +123,7 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { } } } - - async sequenceCommit( - dbTxn: Database, - did: string, - commitData: CommitData, - writes: PreparedWrite[], - ) { - let tooBig: boolean - const ops: CommitEvtOp[] = [] - const blobs = new CidSet() - let carSlice: Uint8Array - - // max 200 ops or 1MB of data - if (writes.length > 200 || commitData.blocks.byteSize > 1024000) { - tooBig = true - const justRoot = new BlockMap() - justRoot.add(commitData.blocks.get(commitData.commit)) - carSlice = await blocksToCar(commitData.commit, justRoot) - } else { - tooBig = false - for (const w of writes) { - const path = w.uri.collection + '/' + w.uri.rkey - let cid: CID | null - if (w.action === WriteOpAction.Delete) { - cid = null - } else { - cid = w.cid - w.blobs.forEach((blob) => { - blobs.add(blob.cid) - }) - } - ops.push({ action: w.action, path, cid }) - } - carSlice = await blocksToCar(commitData.commit, commitData.blocks) - } - - const evt: CommitEvt = { - rebase: false, - tooBig, - repo: did, - commit: commitData.commit, - prev: commitData.prev, - ops, - blocks: carSlice, - blobs: blobs.toList(), - } - await dbTxn.db - .insertInto('repo_seq') - .values({ - did, - eventType: 'append', - event: cborEncode(evt), - sequencedAt: new Date().toISOString(), - }) - .execute() - await dbTxn.notify('repo_seq') - } - - async sequenceHandleUpdate(dbTxn: Database, did: string, handle: string) { - const evt: HandleEvt = { - did, - handle, - } - const res = await dbTxn.db - .insertInto('repo_seq') - .values({ - did, - eventType: 'handle', - event: cborEncode(evt), - sequencedAt: new Date().toISOString(), - }) - .returningAll() - .executeTakeFirst() - if (!res) { - throw new Error(`could not sequence handle change: ${evt}`) - } - await dbTxn.db - .updateTable('repo_seq') - .where('eventType', '=', 'handle') - .where('did', '=', did) - .where('seq', '!=', res.seq) - .set({ invalidatedBy: res.seq }) - .execute() - await dbTxn.notify('repo_seq') - } -} - -const commitEvtOp = z.object({ - action: z.union([ - z.literal('create'), - z.literal('update'), - z.literal('delete'), - ]), - path: z.string(), - cid: schema.cid.nullable(), -}) -type CommitEvtOp = z.infer - -const commitEvt = z.object({ - rebase: z.boolean(), - tooBig: z.boolean(), - repo: z.string(), - commit: schema.cid, - prev: schema.cid.nullable(), - blocks: schema.bytes, - ops: z.array(commitEvtOp), - blobs: z.array(schema.cid), -}) -type CommitEvt = z.infer - -const handleEvt = z.object({ - did: z.string(), - handle: z.string(), -}) -type HandleEvt = z.infer - -type TypedCommitEvt = { - type: 'commit' - seq: number - time: string - evt: CommitEvt -} -type TypedHandleEvt = { - type: 'handle' - seq: number - time: 
string - evt: HandleEvt } -export type SeqEvt = TypedCommitEvt | TypedHandleEvt type SequencerEvents = { events: (evts: SeqEvt[]) => void diff --git a/packages/pds/src/services/account/index.ts b/packages/pds/src/services/account/index.ts index 883dec3cc0c..644911e5d16 100644 --- a/packages/pds/src/services/account/index.ts +++ b/packages/pds/src/services/account/index.ts @@ -10,13 +10,13 @@ import { Record as DeclarationRecord } from '../../lexicon/types/app/bsky/system import { notSoftDeletedClause } from '../../db/util' import { getUserSearchQueryPg, getUserSearchQuerySqlite } from '../util/search' import { paginate, TimeCidKeyset } from '../../db/pagination' -import Sequencer from '../../sequencer' +import { sequenceHandleUpdate } from '../../sequencer' export class AccountService { - constructor(public db: Database, public sequencer: Sequencer) {} + constructor(public db: Database) {} - static creator(sequencer: Sequencer) { - return (db: Database) => new AccountService(db, sequencer) + static creator() { + return (db: Database) => new AccountService(db) } async getUser( @@ -151,7 +151,7 @@ export class AccountService { if (res.numUpdatedRows < 1) { throw new UserAlreadyExistsError() } - await this.sequencer.sequenceHandleUpdate(this.db, did, handle) + await sequenceHandleUpdate(this.db, did, handle) } async updateUserPassword(did: string, password: string) { diff --git a/packages/pds/src/services/index.ts b/packages/pds/src/services/index.ts index 209466eabd4..162a160851a 100644 --- a/packages/pds/src/services/index.ts +++ b/packages/pds/src/services/index.ts @@ -13,14 +13,12 @@ import { ModerationService } from './moderation' import { ActorService } from '../app-view/services/actor' import { FeedService } from '../app-view/services/feed' import { IndexingService } from '../app-view/services/indexing' -import Sequencer from '../sequencer' export function createServices(resources: { repoSigningKey: crypto.Keypair messageQueue: MessageQueue messageDispatcher: MessageDispatcher blobstore: BlobStore - sequencer: Sequencer imgUriBuilder: ImageUriBuilder imgInvalidator: ImageInvalidator }): Services { @@ -29,24 +27,17 @@ export function createServices(resources: { messageQueue, messageDispatcher, blobstore, - sequencer, imgUriBuilder, imgInvalidator, } = resources return { - account: AccountService.creator(sequencer), + account: AccountService.creator(), auth: AuthService.creator(), record: RecordService.creator(messageDispatcher), - repo: RepoService.creator( - repoSigningKey, - messageDispatcher, - blobstore, - sequencer, - ), + repo: RepoService.creator(repoSigningKey, messageDispatcher, blobstore), moderation: ModerationService.creator( messageDispatcher, blobstore, - sequencer, imgUriBuilder, imgInvalidator, ), diff --git a/packages/pds/src/services/moderation/index.ts b/packages/pds/src/services/moderation/index.ts index 76288ba4ee2..e70239f841c 100644 --- a/packages/pds/src/services/moderation/index.ts +++ b/packages/pds/src/services/moderation/index.ts @@ -11,14 +11,12 @@ import { ModerationViews } from './views' import SqlRepoStorage from '../../sql-repo-storage' import { ImageInvalidator } from '../../image/invalidator' import { ImageUriBuilder } from '../../image/uri' -import Sequencer from '../../sequencer' export class ModerationService { constructor( public db: Database, public messageDispatcher: MessageQueue, public blobstore: BlobStore, - public sequencer: Sequencer, public imgUriBuilder: ImageUriBuilder, public imgInvalidator: ImageInvalidator, ) {} @@ -26,7 +24,6 @@ 
export class ModerationService { static creator( messageDispatcher: MessageQueue, blobstore: BlobStore, - sequencer: Sequencer, imgUriBuilder: ImageUriBuilder, imgInvalidator: ImageInvalidator, ) { @@ -35,13 +32,12 @@ export class ModerationService { db, messageDispatcher, blobstore, - sequencer, imgUriBuilder, imgInvalidator, ) } - views = new ModerationViews(this.db, this.messageDispatcher, this.sequencer) + views = new ModerationViews(this.db, this.messageDispatcher) services = { record: RecordService.creator(this.messageDispatcher), diff --git a/packages/pds/src/services/moderation/views.ts b/packages/pds/src/services/moderation/views.ts index 3c56ada9840..6a106b86840 100644 --- a/packages/pds/src/services/moderation/views.ts +++ b/packages/pds/src/services/moderation/views.ts @@ -26,17 +26,12 @@ import { OutputSchema as ReportOutput } from '../../lexicon/types/com/atproto/re import { ModerationAction, ModerationReport } from '../../db/tables/moderation' import { AccountService } from '../account' import { RecordService } from '../record' -import Sequencer from '../../sequencer' export class ModerationViews { - constructor( - private db: Database, - private messageDispatcher: MessageQueue, - private sequencer: Sequencer, - ) {} + constructor(private db: Database, private messageDispatcher: MessageQueue) {} services = { - account: AccountService.creator(this.sequencer), + account: AccountService.creator(), record: RecordService.creator(this.messageDispatcher), } diff --git a/packages/pds/src/services/repo/index.ts b/packages/pds/src/services/repo/index.ts index 2748289af14..0e0eb5b770d 100644 --- a/packages/pds/src/services/repo/index.ts +++ b/packages/pds/src/services/repo/index.ts @@ -9,7 +9,7 @@ import { PreparedCreate, PreparedWrite } from '../../repo/types' import { RepoBlobs } from './blobs' import { createWriteToOp, writeToOp } from '../../repo' import { RecordService } from '../record' -import Sequencer from '../../sequencer' +import { sequenceCommit } from '../../sequencer' export class RepoService { blobs: RepoBlobs @@ -19,7 +19,6 @@ export class RepoService { public repoSigningKey: crypto.Keypair, public messageDispatcher: MessageQueue, public blobstore: BlobStore, - public sequencer: Sequencer, ) { this.blobs = new RepoBlobs(db, blobstore) } @@ -28,10 +27,9 @@ export class RepoService { keypair: crypto.Keypair, messageDispatcher: MessageQueue, blobstore: BlobStore, - sequencer: Sequencer, ) { return (db: Database) => - new RepoService(db, keypair, messageDispatcher, blobstore, sequencer) + new RepoService(db, keypair, messageDispatcher, blobstore) } services = { @@ -127,7 +125,7 @@ export class RepoService { ) { await Promise.all([ this.blobs.processWriteBlobs(did, commitData.commit, writes), - this.sequencer.sequenceCommit(this.db, did, commitData, writes), + sequenceCommit(this.db, did, commitData, writes), ]) } From 4636e69ed954c210c5784852c40579e73c7bd075 Mon Sep 17 00:00:00 2001 From: Daniel Holmgren Date: Mon, 27 Mar 2023 10:30:42 -0500 Subject: [PATCH 12/14] Update packages/pds/src/api/com/atproto/repo/applyWrites.ts Co-authored-by: devin ivy --- packages/pds/src/api/com/atproto/repo/applyWrites.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/pds/src/api/com/atproto/repo/applyWrites.ts b/packages/pds/src/api/com/atproto/repo/applyWrites.ts index 2e50712f36b..c3c7dc69266 100644 --- a/packages/pds/src/api/com/atproto/repo/applyWrites.ts +++ b/packages/pds/src/api/com/atproto/repo/applyWrites.ts @@ -30,7 +30,7 @@ export default function 
(server: Server, ctx: AppContext) { ) } if (tx.writes.length > 200) { - throw new InvalidRequestError('Two many writes. Max: 200') + throw new InvalidRequestError('Too many writes. Max: 200') } const hasUpdate = tx.writes.some(isUpdate) From 5287841d7296d5ab024f569eebc9ebe41dba481f Mon Sep 17 00:00:00 2001 From: dholms Date: Mon, 27 Mar 2023 09:52:18 -0600 Subject: [PATCH 13/14] pr feedback --- lexicons/com/atproto/sync/subscribeRepos.json | 6 ++++-- packages/api/src/client/lexicons.ts | 2 ++ .../db/migrations/20230323T202553064Z-rework-seq.ts | 10 ++++++++++ packages/pds/src/lexicon/lexicons.ts | 2 ++ packages/pds/src/sequencer/events.ts | 2 +- 5 files changed, 19 insertions(+), 3 deletions(-) diff --git a/lexicons/com/atproto/sync/subscribeRepos.json b/lexicons/com/atproto/sync/subscribeRepos.json index d6e9d3dd956..4568b7a9240 100644 --- a/lexicons/com/atproto/sync/subscribeRepos.json +++ b/lexicons/com/atproto/sync/subscribeRepos.json @@ -43,11 +43,13 @@ "prev": {"type": "cid-link"}, "blocks": { "type": "bytes", - "description": "CAR file containing relevant blocks" + "description": "CAR file containing relevant blocks", + "maxLength": 1000000 }, "ops": { "type": "array", - "items": { "type": "ref", "ref": "#repoOp"} + "items": { "type": "ref", "ref": "#repoOp"}, + "maxLength": 200 }, "blobs": { "type": "array", diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index 1911c8fb0f7..84a2e58e1a5 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -2398,6 +2398,7 @@ export const schemaDict = { blocks: { type: 'bytes', description: 'CAR file containing relevant blocks', + maxLength: 1000000, }, ops: { type: 'array', @@ -2405,6 +2406,7 @@ export const schemaDict = { type: 'ref', ref: 'lex:com.atproto.sync.subscribeRepos#repoOp', }, + maxLength: 200, }, blobs: { type: 'array', diff --git a/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts b/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts index 30e82780d52..144fa3a2075 100644 --- a/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts +++ b/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts @@ -31,20 +31,30 @@ export async function up(db: Kysely, dialect: Dialect): Promise { .addColumn('eventType', 'varchar', (col) => col.notNull()) .addColumn('event', binaryDatatype, (col) => col.notNull()) .addColumn('sequencedAt', 'varchar', (col) => col.notNull()) + .addForeignKeyConstraint( + 'invalidated_by_fkey', + // @ts-ignore + ['invalidatedBy'], + 'repo_seq', + ['seq'], + ) .execute() + // for filtering seqs based on did await db.schema .createIndex(repoSeqDidIndex) .on(repoSeqTable) .column('did') .execute() + // for filtering seqs based on event type await db.schema .createIndex(repoSeqEventTypeIndex) .on(repoSeqTable) .column('eventType') .execute() + // for entering into the seq stream at a particular time await db.schema .createIndex(repoSeqSequencedAtIndex) .on(repoSeqTable) diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index 1911c8fb0f7..84a2e58e1a5 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -2398,6 +2398,7 @@ export const schemaDict = { blocks: { type: 'bytes', description: 'CAR file containing relevant blocks', + maxLength: 1000000, }, ops: { type: 'array', @@ -2405,6 +2406,7 @@ export const schemaDict = { type: 'ref', ref: 'lex:com.atproto.sync.subscribeRepos#repoOp', }, + maxLength: 200, }, blobs: { type: 
'array', diff --git a/packages/pds/src/sequencer/events.ts b/packages/pds/src/sequencer/events.ts index ed4fb8dce46..0f0357aa97b 100644 --- a/packages/pds/src/sequencer/events.ts +++ b/packages/pds/src/sequencer/events.ts @@ -23,7 +23,7 @@ export const sequenceCommit = async ( let carSlice: Uint8Array // max 200 ops or 1MB of data - if (writes.length > 200 || commitData.blocks.byteSize > 1024000) { + if (writes.length > 200 || commitData.blocks.byteSize > 1000000) { tooBig = true const justRoot = new BlockMap() justRoot.add(commitData.blocks.get(commitData.commit)) From 6d7a618222e04c29c33b21ba6a1c9817bc1b30a7 Mon Sep 17 00:00:00 2001 From: dholms Date: Mon, 27 Mar 2023 15:22:46 -0600 Subject: [PATCH 14/14] fix error in test from fkey constraint --- .../pds/tests/sync/subscribe-repos.test.ts | 29 +++++-------------- 1 file changed, 8 insertions(+), 21 deletions(-) diff --git a/packages/pds/tests/sync/subscribe-repos.test.ts b/packages/pds/tests/sync/subscribe-repos.test.ts index df03393be59..ff328a4f5f3 100644 --- a/packages/pds/tests/sync/subscribe-repos.test.ts +++ b/packages/pds/tests/sync/subscribe-repos.test.ts @@ -262,31 +262,18 @@ describe('repo subscribe repos', () => { }) it('sends info frame on out of date cursor', async () => { - // we stick three new seqs in with a date past the backfill cutoff - // then we increment the sequence number of everything else to test out of date cursor + // we rewrite the sequenceAt time for existing seqs to be past the backfill cutoff + // then we create some new posts const overAnHourAgo = new Date(Date.now() - HOUR - MINUTE).toISOString() - const dummySeq = { - did: 'did:example:test', - eventType: 'append' as const, - event: new Uint8Array([1, 2, 3, 4]), - sequencedAt: overAnHourAgo, - } - const newRows = await db.db - .insertInto('repo_seq') - .values([dummySeq, dummySeq, dummySeq]) - .returning('seq') - .execute() - const newSeqs = newRows.map((r) => r.seq) - const movedToFuture = await db.db + await db.db .updateTable('repo_seq') - .set({ seq: sql`seq+1000` }) - .where('seq', 'not in', newSeqs) - .where('invalidatedBy', 'is', null) - .returning('seq') + .set({ sequencedAt: overAnHourAgo }) .execute() + await makePosts() + const ws = new WebSocket( - `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${newSeqs[0]}`, + `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`, ) const [info, ...evts] = await readFromGenerator(byFrame(ws)) ws.terminate() @@ -297,7 +284,7 @@ describe('repo subscribe repos', () => { expect(info.header.t).toBe('#info') const body = info.body as Record expect(body.name).toEqual('OutdatedCursor') - expect(evts.length).toBe(movedToFuture.length) + expect(evts.length).toBe(40) }) it('errors on future cursor', async () => {
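Closing sketch, illustrative and not part of the patch: the reworked test above expects an #info frame named OutdatedCursor when the requested cursor falls behind the backfill window. A consumer could detect that condition roughly as follows; the frame classes match the test's imports from @atproto/xrpc-server, while the resync policy in the comments is an assumption rather than something the patch prescribes.

import { Frame, MessageFrame } from '@atproto/xrpc-server'

const isOutdatedCursorInfo = (frame: Frame | undefined): boolean => {
  return (
    frame instanceof MessageFrame &&
    frame.header.t === '#info' &&
    (frame.body as { name?: string }).name === 'OutdatedCursor'
  )
}

// usage: if the first frame signals an outdated cursor, events older than the
// backfill window were skipped and the caller may choose to do a full resync:
//   const [first, ...rest] = await readFromGenerator(byFrame(ws))
//   if (isOutdatedCursorInfo(first)) { /* trigger a resync */ }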