diff --git a/lexicons/com/atproto/sync/subscribeRepos.json b/lexicons/com/atproto/sync/subscribeRepos.json index 916d01dd7b9..4568b7a9240 100644 --- a/lexicons/com/atproto/sync/subscribeRepos.json +++ b/lexicons/com/atproto/sync/subscribeRepos.json @@ -43,11 +43,13 @@ "prev": {"type": "cid-link"}, "blocks": { "type": "bytes", - "description": "CAR file containing relevant blocks" + "description": "CAR file containing relevant blocks", + "maxLength": 1000000 }, "ops": { "type": "array", - "items": { "type": "ref", "ref": "#repoOp"} + "items": { "type": "ref", "ref": "#repoOp"}, + "maxLength": 200 }, "blobs": { "type": "array", @@ -58,29 +60,32 @@ }, "handle": { "type": "object", - "required": ["seq", "did", "handle"], + "required": ["seq", "did", "handle", "time"], "properties": { "seq": {"type": "integer"}, "did": {"type": "string", "format": "did"}, - "handle": {"type": "string", "format": "handle"} + "handle": {"type": "string", "format": "handle"}, + "time": {"type": "string", "format": "datetime"} } }, "migrate": { "type": "object", - "required": ["seq", "did", "migrateTo"], + "required": ["seq", "did", "migrateTo", "time"], "nullable": ["migrateTo"], "properties": { "seq": {"type": "integer"}, "did": {"type": "string", "format": "did"}, - "migrateTo": {"type": "string"} + "migrateTo": {"type": "string"}, + "time": {"type": "string", "format": "datetime"} } }, "tombstone": { "type": "object", - "required": ["seq", "did"], + "required": ["seq", "did", "time"], "properties": { "seq": {"type": "integer"}, - "did": {"type": "string", "format": "did"} + "did": {"type": "string", "format": "did"}, + "time": {"type": "string", "format": "datetime"} } }, "info": { diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index c78daed7a9f..d60878cc1de 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -2398,6 +2398,7 @@ export const schemaDict = { blocks: { type: 'bytes', description: 'CAR file containing relevant blocks', + maxLength: 1000000, }, ops: { type: 'array', @@ -2405,6 +2406,7 @@ export const schemaDict = { type: 'ref', ref: 'lex:com.atproto.sync.subscribeRepos#repoOp', }, + maxLength: 200, }, blobs: { type: 'array', @@ -2420,7 +2422,7 @@ export const schemaDict = { }, handle: { type: 'object', - required: ['seq', 'did', 'handle'], + required: ['seq', 'did', 'handle', 'time'], properties: { seq: { type: 'integer', @@ -2433,11 +2435,15 @@ export const schemaDict = { type: 'string', format: 'handle', }, + time: { + type: 'string', + format: 'datetime', + }, }, }, migrate: { type: 'object', - required: ['seq', 'did', 'migrateTo'], + required: ['seq', 'did', 'migrateTo', 'time'], nullable: ['migrateTo'], properties: { seq: { @@ -2450,11 +2456,15 @@ export const schemaDict = { migrateTo: { type: 'string', }, + time: { + type: 'string', + format: 'datetime', + }, }, }, tombstone: { type: 'object', - required: ['seq', 'did'], + required: ['seq', 'did', 'time'], properties: { seq: { type: 'integer', @@ -2463,6 +2473,10 @@ export const schemaDict = { type: 'string', format: 'did', }, + time: { + type: 'string', + format: 'datetime', + }, }, }, info: { diff --git a/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts b/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts index 8bd579335c1..7e41515badb 100644 --- a/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/api/src/client/types/com/atproto/sync/subscribeRepos.ts @@ -38,6 +38,7 @@ export interface Handle { seq: number did: string handle: string + time: string [k: string]: unknown } @@ -57,6 +58,7 @@ export interface Migrate { seq: number did: string migrateTo: string | null + time: string [k: string]: unknown } @@ -75,6 +77,7 @@ export function validateMigrate(v: unknown): ValidationResult { export interface Tombstone { seq: number did: string + time: string [k: string]: unknown } diff --git a/packages/pds/src/api/com/atproto/repo/applyWrites.ts b/packages/pds/src/api/com/atproto/repo/applyWrites.ts index c501b195399..a9b5ff46924 100644 --- a/packages/pds/src/api/com/atproto/repo/applyWrites.ts +++ b/packages/pds/src/api/com/atproto/repo/applyWrites.ts @@ -33,6 +33,9 @@ export default function (server: Server, ctx: AppContext) { 'Unvalidated writes are not yet supported.', ) } + if (tx.writes.length > 200) { + throw new InvalidRequestError('Too many writes. Max: 200') + } const hasUpdate = tx.writes.some(isUpdate) if (hasUpdate) { diff --git a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts index a43ff14959d..9c0ec962d26 100644 --- a/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/api/com/atproto/sync/subscribeRepos.ts @@ -1,7 +1,6 @@ import { Server } from '../../../../lexicon' import AppContext from '../../../../context' import Outbox from '../../../../sequencer/outbox' -import { Commit } from '../../../../lexicon/types/com/atproto/sync/subscribeRepos' import { InvalidRequestError } from '@atproto/xrpc-server' export default function (server: Server, ctx: AppContext) { @@ -32,21 +31,21 @@ export default function (server: Server, ctx: AppContext) { } for await (const evt of outbox.events(cursor, backfillTime)) { - const { seq, time, repo, commit, prev, blocks, ops, blobs } = evt - const toYield: Commit = { - $type: '#commit', - seq, - rebase: false, - tooBig: false, - repo, - commit, - blocks, - ops, - blobs, - time, - prev: prev ?? null, + if (evt.type === 'commit') { + yield { + $type: '#commit', + seq: evt.seq, + time: evt.time, + ...evt.evt, + } + } else if (evt.type === 'handle') { + yield { + $type: '#handle', + seq: evt.seq, + time: evt.time, + ...evt.evt, + } } - yield toYield } }) } diff --git a/packages/pds/src/db/database-schema.ts b/packages/pds/src/db/database-schema.ts index 63547ed6ead..1eeb0012470 100644 --- a/packages/pds/src/db/database-schema.ts +++ b/packages/pds/src/db/database-schema.ts @@ -9,7 +9,6 @@ import * as backlink from './tables/backlink' import * as repoCommitBlock from './tables/repo-commit-block' import * as repoCommitHistory from './tables/repo-commit-history' import * as ipldBlock from './tables/ipld-block' -import * as repoOp from './tables/repo-op' import * as inviteCode from './tables/invite-code' import * as notification from './tables/user-notification' import * as blob from './tables/blob' @@ -35,7 +34,6 @@ export type DatabaseSchemaType = appView.DatabaseSchemaType & repoCommitBlock.PartialDB & repoCommitHistory.PartialDB & ipldBlock.PartialDB & - repoOp.PartialDB & repoCommitBlock.PartialDB & repoCommitHistory.PartialDB & inviteCode.PartialDB & diff --git a/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts b/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts new file mode 100644 index 00000000000..144fa3a2075 --- /dev/null +++ b/packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts @@ -0,0 +1,110 @@ +import { Kysely, sql } from 'kysely' +import { Dialect } from '..' + +const repoSeqTable = 'repo_seq' +const repoOpTable = 'repo_op' +const repoSeqDidIndex = 'repo_seq_did_index' +const repoSeqCommitIndex = 'repo_seq_commit_index' +const repoSeqEventTypeIndex = 'repo_seq_event_type_index' +const repoSeqSequencedAtIndex = 'repo_seq_sequenced_at_index' + +export async function up(db: Kysely, dialect: Dialect): Promise { + await db.schema.dropIndex(repoSeqCommitIndex).execute() + await db.schema.dropIndex(repoSeqDidIndex).execute() + await db.schema.dropTable(repoSeqTable).execute() + await db.schema.dropTable(repoOpTable).execute() + + let builder = db.schema.createTable(repoSeqTable) + if (dialect === 'pg') { + builder = builder + .addColumn('seq', 'bigserial', (col) => col.primaryKey()) + .addColumn('invalidatedBy', 'bigint') + } else { + builder = builder + .addColumn('seq', 'integer', (col) => col.autoIncrement().primaryKey()) + .addColumn('invalidatedBy', 'integer') + } + + const binaryDatatype = dialect === 'sqlite' ? sql`blob` : sql`bytea` + await builder + .addColumn('did', 'varchar', (col) => col.notNull()) + .addColumn('eventType', 'varchar', (col) => col.notNull()) + .addColumn('event', binaryDatatype, (col) => col.notNull()) + .addColumn('sequencedAt', 'varchar', (col) => col.notNull()) + .addForeignKeyConstraint( + 'invalidated_by_fkey', + // @ts-ignore + ['invalidatedBy'], + 'repo_seq', + ['seq'], + ) + .execute() + + // for filtering seqs based on did + await db.schema + .createIndex(repoSeqDidIndex) + .on(repoSeqTable) + .column('did') + .execute() + + // for filtering seqs based on event type + await db.schema + .createIndex(repoSeqEventTypeIndex) + .on(repoSeqTable) + .column('eventType') + .execute() + + // for entering into the seq stream at a particular time + await db.schema + .createIndex(repoSeqSequencedAtIndex) + .on(repoSeqTable) + .column('sequencedAt') + .execute() +} + +export async function down( + db: Kysely, + dialect: Dialect, +): Promise { + await db.schema.dropIndex(repoSeqSequencedAtIndex).execute() + await db.schema.dropIndex(repoSeqEventTypeIndex).execute() + await db.schema.dropIndex(repoSeqDidIndex).execute() + await db.schema.dropTable(repoSeqTable).execute() + + let builder = db.schema.createTable(repoSeqTable) + if (dialect === 'pg') { + builder = builder.addColumn('seq', 'serial', (col) => col.primaryKey()) + } else { + builder = builder.addColumn('seq', 'integer', (col) => + col.autoIncrement().primaryKey(), + ) + } + await builder + .addColumn('did', 'varchar', (col) => col.notNull()) + .addColumn('commit', 'varchar', (col) => col.notNull()) + .addColumn('eventType', 'varchar', (col) => col.notNull()) + .addColumn('sequencedAt', 'varchar', (col) => col.notNull()) + .execute() + + await db.schema + .createIndex(repoSeqDidIndex) + .on(repoSeqTable) + .column('did') + .execute() + + await db.schema + .createIndex(repoSeqCommitIndex) + .on(repoSeqTable) + .column('commit') + .execute() + + await db.schema + .createTable(repoOpTable) + .addColumn('did', 'text', (col) => col.notNull()) + .addColumn('commit', 'text', (col) => col.notNull()) + .addColumn('action', 'text', (col) => col.notNull()) + .addColumn('path', 'text', (col) => col.notNull()) + .addColumn('cid', 'text') + .addPrimaryKeyConstraint('repo_op_pkey', ['did', 'commit', 'path']) + .execute() +} diff --git a/packages/pds/src/db/migrations/index.ts b/packages/pds/src/db/migrations/index.ts index c75e29fd0a0..c725883bb1c 100644 --- a/packages/pds/src/db/migrations/index.ts +++ b/packages/pds/src/db/migrations/index.ts @@ -33,3 +33,4 @@ export * as _20230315T045113521Z from './20230315T045113521Z-votes-to-likes' export * as _20230316T142245535Z from './20230316T142245535Z-remove-post-entities' export * as _20230316T153255273Z from './20230316T153255273Z-backlinks' export * as _20230316T225303411Z from './20230316T225303411Z-profile-display-name-empty' +export * as _20230323T202553064Z from './20230323T202553064Z-rework-seq' diff --git a/packages/pds/src/db/tables/repo-op.ts b/packages/pds/src/db/tables/repo-op.ts deleted file mode 100644 index ca7b80d83dd..00000000000 --- a/packages/pds/src/db/tables/repo-op.ts +++ /dev/null @@ -1,13 +0,0 @@ -import { WriteOpAction } from '@atproto/repo' - -export interface RepoOp { - did: string - commit: string - action: WriteOpAction - path: string - cid: string | null -} - -export const tableName = 'repo_op' - -export type PartialDB = { [tableName]: RepoOp } diff --git a/packages/pds/src/db/tables/repo-seq.ts b/packages/pds/src/db/tables/repo-seq.ts index 6a83cd2f189..61542068f4e 100644 --- a/packages/pds/src/db/tables/repo-seq.ts +++ b/packages/pds/src/db/tables/repo-seq.ts @@ -3,8 +3,9 @@ import { Generated, Selectable } from 'kysely' export interface RepoSeq { seq: Generated did: string - commit: string - eventType: 'repo_append' + eventType: 'append' | 'rebase' | 'handle' | 'migrate' | 'tombstone' + event: Uint8Array + invalidatedBy: number | null sequencedAt: string } diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index c78daed7a9f..d60878cc1de 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -2398,6 +2398,7 @@ export const schemaDict = { blocks: { type: 'bytes', description: 'CAR file containing relevant blocks', + maxLength: 1000000, }, ops: { type: 'array', @@ -2405,6 +2406,7 @@ export const schemaDict = { type: 'ref', ref: 'lex:com.atproto.sync.subscribeRepos#repoOp', }, + maxLength: 200, }, blobs: { type: 'array', @@ -2420,7 +2422,7 @@ export const schemaDict = { }, handle: { type: 'object', - required: ['seq', 'did', 'handle'], + required: ['seq', 'did', 'handle', 'time'], properties: { seq: { type: 'integer', @@ -2433,11 +2435,15 @@ export const schemaDict = { type: 'string', format: 'handle', }, + time: { + type: 'string', + format: 'datetime', + }, }, }, migrate: { type: 'object', - required: ['seq', 'did', 'migrateTo'], + required: ['seq', 'did', 'migrateTo', 'time'], nullable: ['migrateTo'], properties: { seq: { @@ -2450,11 +2456,15 @@ export const schemaDict = { migrateTo: { type: 'string', }, + time: { + type: 'string', + format: 'datetime', + }, }, }, tombstone: { type: 'object', - required: ['seq', 'did'], + required: ['seq', 'did', 'time'], properties: { seq: { type: 'integer', @@ -2463,6 +2473,10 @@ export const schemaDict = { type: 'string', format: 'did', }, + time: { + type: 'string', + format: 'datetime', + }, }, }, info: { diff --git a/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts b/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts index 468ef613cdf..760b5af766d 100644 --- a/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts +++ b/packages/pds/src/lexicon/types/com/atproto/sync/subscribeRepos.ts @@ -59,6 +59,7 @@ export interface Handle { seq: number did: string handle: string + time: string [k: string]: unknown } @@ -78,6 +79,7 @@ export interface Migrate { seq: number did: string migrateTo: string | null + time: string [k: string]: unknown } @@ -96,6 +98,7 @@ export function validateMigrate(v: unknown): ValidationResult { export interface Tombstone { seq: number did: string + time: string [k: string]: unknown } diff --git a/packages/pds/src/sequencer/events.ts b/packages/pds/src/sequencer/events.ts new file mode 100644 index 00000000000..0f0357aa97b --- /dev/null +++ b/packages/pds/src/sequencer/events.ts @@ -0,0 +1,144 @@ +import Database from '../db' +import { z } from 'zod' +import { cborEncode, schema } from '@atproto/common' +import { + BlockMap, + blocksToCar, + CidSet, + CommitData, + WriteOpAction, +} from '@atproto/repo' +import { PreparedWrite } from '../repo' +import { CID } from 'multiformats/cid' + +export const sequenceCommit = async ( + dbTxn: Database, + did: string, + commitData: CommitData, + writes: PreparedWrite[], +) => { + let tooBig: boolean + const ops: CommitEvtOp[] = [] + const blobs = new CidSet() + let carSlice: Uint8Array + + // max 200 ops or 1MB of data + if (writes.length > 200 || commitData.blocks.byteSize > 1000000) { + tooBig = true + const justRoot = new BlockMap() + justRoot.add(commitData.blocks.get(commitData.commit)) + carSlice = await blocksToCar(commitData.commit, justRoot) + } else { + tooBig = false + for (const w of writes) { + const path = w.uri.collection + '/' + w.uri.rkey + let cid: CID | null + if (w.action === WriteOpAction.Delete) { + cid = null + } else { + cid = w.cid + w.blobs.forEach((blob) => { + blobs.add(blob.cid) + }) + } + ops.push({ action: w.action, path, cid }) + } + carSlice = await blocksToCar(commitData.commit, commitData.blocks) + } + + const evt: CommitEvt = { + rebase: false, + tooBig, + repo: did, + commit: commitData.commit, + prev: commitData.prev, + ops, + blocks: carSlice, + blobs: blobs.toList(), + } + await dbTxn.db + .insertInto('repo_seq') + .values({ + did, + eventType: 'append', + event: cborEncode(evt), + sequencedAt: new Date().toISOString(), + }) + .execute() + await dbTxn.notify('repo_seq') +} + +export const sequenceHandleUpdate = async ( + dbTxn: Database, + did: string, + handle: string, +) => { + const evt: HandleEvt = { + did, + handle, + } + const res = await dbTxn.db + .insertInto('repo_seq') + .values({ + did, + eventType: 'handle', + event: cborEncode(evt), + sequencedAt: new Date().toISOString(), + }) + .returningAll() + .executeTakeFirst() + if (!res) { + throw new Error(`could not sequence handle change: ${evt}`) + } + await dbTxn.db + .updateTable('repo_seq') + .where('eventType', '=', 'handle') + .where('did', '=', did) + .where('seq', '!=', res.seq) + .set({ invalidatedBy: res.seq }) + .execute() + await dbTxn.notify('repo_seq') +} + +export const commitEvtOp = z.object({ + action: z.union([ + z.literal('create'), + z.literal('update'), + z.literal('delete'), + ]), + path: z.string(), + cid: schema.cid.nullable(), +}) +export type CommitEvtOp = z.infer + +export const commitEvt = z.object({ + rebase: z.boolean(), + tooBig: z.boolean(), + repo: z.string(), + commit: schema.cid, + prev: schema.cid.nullable(), + blocks: schema.bytes, + ops: z.array(commitEvtOp), + blobs: z.array(schema.cid), +}) +export type CommitEvt = z.infer + +export const handleEvt = z.object({ + did: z.string(), + handle: z.string(), +}) +export type HandleEvt = z.infer + +type TypedCommitEvt = { + type: 'commit' + seq: number + time: string + evt: CommitEvt +} +type TypedHandleEvt = { + type: 'handle' + seq: number + time: string + evt: HandleEvt +} +export type SeqEvt = TypedCommitEvt | TypedHandleEvt diff --git a/packages/pds/src/sequencer/index.ts b/packages/pds/src/sequencer/index.ts index de045539242..f883fb47839 100644 --- a/packages/pds/src/sequencer/index.ts +++ b/packages/pds/src/sequencer/index.ts @@ -1,10 +1,12 @@ -import { BlockMap, writeCar } from '@atproto/repo' import EventEmitter from 'events' import TypedEmitter from 'typed-emitter' -import { CID } from 'multiformats/cid' import Database from '../db' import { seqLogger as log } from '../logger' import { RepoSeqEntry } from '../db/tables/repo-seq' +import { cborDecode, check } from '@atproto/common' +import { commitEvt, handleEvt, SeqEvt } from './events' + +export * from './events' export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { polling = false @@ -56,24 +58,14 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { earliestSeq?: number earliestTime?: string limit?: number - }): Promise { + }): Promise { const { earliestSeq, earliestTime, limit } = opts let seqQb = this.db.db .selectFrom('repo_seq') - .innerJoin('repo_commit_history', (join) => - join - .onRef('repo_commit_history.creator', '=', 'repo_seq.did') - .onRef('repo_commit_history.commit', '=', 'repo_seq.commit'), - ) - .select([ - 'repo_seq.seq as seq', - 'repo_seq.did as did', - 'repo_seq.commit as commit', - 'repo_seq.sequencedAt as sequencedAt', - 'repo_commit_history.prev as prev', - ]) + .selectAll() .orderBy('seq', 'asc') + .where('invalidatedBy', 'is', null) if (earliestSeq !== undefined) { seqQb = seqQb.where('seq', '>', earliestSeq) } @@ -84,114 +76,32 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { seqQb = seqQb.limit(limit) } - const events = await seqQb.execute() - if (events.length < 1) { + const rows = await seqQb.execute() + if (rows.length < 1) { return [] } - // we don't chunk because this is only ever used with a limit of 50 - const seqs = events.map((evt) => evt.seq) - - const getBlocks = this.db.db - .selectFrom('repo_seq') - .where('repo_seq.seq', 'in', seqs) - .innerJoin('repo_commit_block', (join) => - join - .onRef('repo_commit_block.creator', '=', 'repo_seq.did') - .onRef('repo_commit_block.commit', '=', 'repo_seq.commit'), - ) - .innerJoin('ipld_block', (join) => - join - .onRef('ipld_block.cid', '=', 'repo_commit_block.block') - .onRef('ipld_block.creator', '=', 'repo_commit_block.creator'), - ) - .select([ - 'repo_seq.seq as seq', - 'ipld_block.cid as cid', - 'ipld_block.content as content', - ]) - .execute() - - const getBlobs = this.db.db - .selectFrom('repo_seq') - .where('repo_seq.seq', 'in', seqs) - .innerJoin('repo_blob', (join) => - join - .onRef('repo_blob.did', '=', 'repo_seq.did') - .onRef('repo_blob.commit', '=', 'repo_seq.commit'), - ) - .select(['repo_seq.seq as seq', 'repo_blob.cid as cid']) - .execute() - - const getOps = this.db.db - .selectFrom('repo_seq') - .where('repo_seq.seq', 'in', seqs) - .innerJoin('repo_op', (join) => - join - .onRef('repo_op.did', '=', 'repo_seq.did') - .onRef('repo_op.commit', '=', 'repo_seq.commit'), - ) - .select([ - 'repo_seq.seq as seq', - 'repo_op.action as action', - 'repo_op.path as path', - 'repo_op.cid as cid', - ]) - .execute() - - const [blocks, blobs, ops] = await Promise.all([ - getBlocks, - getBlobs, - getOps, - ]) - - const blocksBySeq = blocks.reduce((acc, cur) => { - acc[cur.seq] ??= new BlockMap() - acc[cur.seq].set(CID.parse(cur.cid), cur.content) - return acc - }, {} as Record) - - const blobsBySeq = blobs.reduce((acc, cur) => { - acc[cur.seq] ??= [] - acc[cur.seq].push(cur.cid) - return acc - }, {} as Record) - - const opsBySeq = ops.reduce((acc, cur) => { - acc[cur.seq] ??= [] - const { action, path } = cur - const cid = cur.cid ? CID.parse(cur.cid) : null - acc[cur.seq].push({ action, path, cid }) - return acc - }, {} as Record) - - return Promise.all( - events.map(async (evt) => { - const commit = CID.parse(evt.commit) - const prev = evt.prev ? CID.parse(evt.prev) : undefined - const carSlice = await writeCar(commit, async (car) => { - const blocks = blocksBySeq[evt.seq] - if (blocks) { - for (const block of blocks.entries()) { - await car.put(block) - } - } + const seqEvts: SeqEvt[] = [] + for (const row of rows) { + const evt = cborDecode(row.event) + if (check.is(evt, commitEvt)) { + seqEvts.push({ + type: 'commit', + seq: row.seq, + time: row.sequencedAt, + evt, + }) + } else if (check.is(evt, handleEvt)) { + seqEvts.push({ + type: 'handle', + seq: row.seq, + time: row.sequencedAt, + evt, }) - const blobStrs = blobsBySeq[evt.seq] || [] - const blobs = blobStrs.map((c) => CID.parse(c)) - const ops = opsBySeq[evt.seq] || [] - return { - seq: evt.seq, - time: evt.sequencedAt, - repo: evt.did, - commit, - prev, - blocks: carSlice, - ops, - blobs, - } - }), - ) + } + } + + return seqEvts } async pollDb() { @@ -215,25 +125,8 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { } } -export type RepoAppendEvent = { - seq: number - time: string - repo: string - commit: CID - prev?: CID - blocks: Uint8Array - ops: RepoAppendOp[] - blobs: CID[] -} - -export type RepoAppendOp = { - action: string - path: string - cid: CID | null -} - type SequencerEvents = { - events: (evts: RepoAppendEvent[]) => void + events: (evts: SeqEvt[]) => void } export type SequencerEmitter = TypedEmitter diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index a8f8f042597..c3f3621949a 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -1,5 +1,5 @@ import { AsyncBuffer, AsyncBufferFullError } from '@atproto/common' -import Sequencer, { RepoAppendEvent } from '.' +import Sequencer, { SeqEvt } from '.' export type OutboxOpts = { maxBufferSize: number @@ -9,13 +9,13 @@ export class Outbox { private caughtUp = false lastSeen = -1 - cutoverBuffer: RepoAppendEvent[] - outBuffer: AsyncBuffer + cutoverBuffer: SeqEvt[] + outBuffer: AsyncBuffer constructor(public sequencer: Sequencer, opts: Partial = {}) { const { maxBufferSize = 500 } = opts this.cutoverBuffer = [] - this.outBuffer = new AsyncBuffer(maxBufferSize) + this.outBuffer = new AsyncBuffer(maxBufferSize) } // event stream occurs in 3 phases @@ -30,7 +30,7 @@ export class Outbox { async *events( backfillCursor?: number, backFillTime?: string, - ): AsyncGenerator { + ): AsyncGenerator { // catch up as much as we can if (backfillCursor !== undefined) { for await (const evt of this.getBackfill(backfillCursor, backFillTime)) { @@ -98,10 +98,10 @@ export class Outbox { for (const evt of evts) { yield evt } - // we requested 50, if we get less than that then we know we're caught up & can do cutover - if (evts.length < 50) { - break - } + // if we're within 50 of the sequencer, we call it good & switch to cutover + const seqCursor = this.sequencer.lastSeen ?? -1 + if (seqCursor - this.lastSeen < 50) break + if (evts.length < 1) break } } } diff --git a/packages/pds/src/services/account/index.ts b/packages/pds/src/services/account/index.ts index 99c60d04e44..edba837729c 100644 --- a/packages/pds/src/services/account/index.ts +++ b/packages/pds/src/services/account/index.ts @@ -8,6 +8,7 @@ import { RepoRoot } from '../../db/tables/repo-root' import { notSoftDeletedClause } from '../../db/util' import { getUserSearchQueryPg, getUserSearchQuerySqlite } from '../util/search' import { paginate, TimeCidKeyset } from '../../db/pagination' +import { sequenceHandleUpdate } from '../../sequencer' export class AccountService { constructor(public db: Database) {} @@ -141,6 +142,7 @@ export class AccountService { if (res.numUpdatedRows < 1) { throw new UserAlreadyExistsError() } + await sequenceHandleUpdate(this.db, did, handle) } async updateUserPassword(did: string, password: string) { diff --git a/packages/pds/src/services/repo/index.ts b/packages/pds/src/services/repo/index.ts index a13f62c3176..39f8a525f79 100644 --- a/packages/pds/src/services/repo/index.ts +++ b/packages/pds/src/services/repo/index.ts @@ -14,6 +14,7 @@ import { import { RepoBlobs } from './blobs' import { createWriteToOp, writeToOp } from '../../repo' import { RecordService } from '../record' +import { sequenceCommit } from '../../sequencer' export class RepoService { blobs: RepoBlobs @@ -44,10 +45,16 @@ export class RepoService { this.db.assertTransaction() const storage = new SqlRepoStorage(this.db, did, now) const writeOps = writes.map(createWriteToOp) - const repo = await Repo.create(storage, did, this.repoSigningKey, writeOps) + const commit = await Repo.formatInitCommit( + storage, + did, + this.repoSigningKey, + writeOps, + ) + await storage.applyCommit(commit) await Promise.all([ this.indexWrites(writes, now), - this.afterWriteProcessing(did, repo.cid, writes), + this.afterWriteProcessing(did, commit, writes), ]) } @@ -66,7 +73,7 @@ export class RepoService { // & send to indexing this.indexWrites(writes, now), // do any other processing needed after write - this.afterWriteProcessing(did, commitData.commit, writes), + this.afterWriteProcessing(did, commitData, writes), ]) } @@ -136,47 +143,15 @@ export class RepoService { async afterWriteProcessing( did: string, - commit: CID, + commitData: CommitData, writes: PreparedWrite[], ) { await Promise.all([ - this.blobs.processWriteBlobs(did, commit, writes), - this.indexRepoOps(did, commit, writes), - this.sequenceWrite(did, commit), + this.blobs.processWriteBlobs(did, commitData.commit, writes), + sequenceCommit(this.db, did, commitData, writes), ]) } - async indexRepoOps(did: string, commit: CID, writes: PreparedWrite[]) { - if (!writes.length) { - return - } - const ops = writes.map((w) => { - const path = w.uri.collection + '/' + w.uri.rkey - const cid = w.action === WriteOpAction.Delete ? null : w.cid.toString() - return { - did, - commit: commit.toString(), - action: w.action, - path, - cid, - } - }) - await this.db.db.insertInto('repo_op').values(ops).execute() - } - - async sequenceWrite(did: string, commit: CID) { - await this.db.db - .insertInto('repo_seq') - .values({ - did, - commit: commit.toString(), - eventType: 'repo_append', - sequencedAt: new Date().toISOString(), - }) - .execute() - await this.db.notify('repo_seq') - } - async deleteRepo(did: string) { this.db.assertTransaction() // delete all blocks from this did & no other did diff --git a/packages/pds/src/sql-repo-storage.ts b/packages/pds/src/sql-repo-storage.ts index a502c92142c..0f7a24a35b5 100644 --- a/packages/pds/src/sql-repo-storage.ts +++ b/packages/pds/src/sql-repo-storage.ts @@ -145,7 +145,7 @@ export class SqlRepoStorage extends RepoStorage { const commitBlocks: RepoCommitBlock[] = [] const commitHistory: RepoCommitHistory[] = [] for (const commit of commits) { - const commitCids = commit.relatedCids || [] + const commitCids: CID[] = [] for (const block of commit.blocks.entries()) { commitCids.push(block.cid) allBlocks.set(block.cid, block.bytes) diff --git a/packages/pds/tests/seeds/client.ts b/packages/pds/tests/seeds/client.ts index f4830b6854b..e11ee20ec82 100644 --- a/packages/pds/tests/seeds/client.ts +++ b/packages/pds/tests/seeds/client.ts @@ -103,6 +103,13 @@ export class SeedClient { return this.accounts[account.did] } + async updateHandle(by: string, handle: string) { + await this.agent.api.com.atproto.identity.updateHandle( + { handle }, + { encoding: 'application/json', headers: this.getHeaders(by) }, + ) + } + async createProfile( by: string, displayName: string, diff --git a/packages/pds/tests/sequencer.test.ts b/packages/pds/tests/sequencer.test.ts index 851604b2450..ec807c45403 100644 --- a/packages/pds/tests/sequencer.test.ts +++ b/packages/pds/tests/sequencer.test.ts @@ -1,7 +1,7 @@ import AtpAgent from '@atproto/api' import { randomStr } from '@atproto/crypto' -import { readFromGenerator, wait } from '@atproto/common' -import Sequencer, { RepoAppendEvent } from '../src/sequencer' +import { cborEncode, readFromGenerator, wait } from '@atproto/common' +import Sequencer, { SeqEvt } from '../src/sequencer' import Outbox, { StreamConsumerTooSlowError } from '../src/sequencer/outbox' import { Database } from '../src' import { SeedClient } from './seeds/client' @@ -61,13 +61,17 @@ describe('sequencer', () => { .execute() } - const evtToDbRow = (e: RepoAppendEvent) => ({ - seq: e.seq, - did: e.repo, - commit: e.commit.toString(), - eventType: 'repo_append', - sequencedAt: e.time, - }) + const evtToDbRow = (e: SeqEvt) => { + const did = e.type === 'commit' ? e.evt.repo : e.evt.did + return { + seq: e.seq, + did, + eventType: 'append', + event: Buffer.from(cborEncode(e.evt)), + invalidatedBy: null, + sequencedAt: e.time, + } + } const caughtUp = (outbox: Outbox): (() => boolean) => { return () => { diff --git a/packages/pds/tests/sync/subscribe-repos.test.ts b/packages/pds/tests/sync/subscribe-repos.test.ts index c6a3cb20a7e..6d64a37c249 100644 --- a/packages/pds/tests/sync/subscribe-repos.test.ts +++ b/packages/pds/tests/sync/subscribe-repos.test.ts @@ -1,6 +1,6 @@ import AtpAgent from '@atproto/api' import { - cidForCbor, + cborDecode, HOUR, MINUTE, readFromGenerator, @@ -11,12 +11,15 @@ import * as repo from '@atproto/repo' import { getWriteLog, MemoryBlockstore, WriteOpAction } from '@atproto/repo' import { byFrame, ErrorFrame, Frame, MessageFrame } from '@atproto/xrpc-server' import { WebSocket } from 'ws' -import { Commit as CommitEvt } from '../../src/lexicon/types/com/atproto/sync/subscribeRepos' +import { + Commit as CommitEvt, + Handle as HandleEvt, +} from '../../src/lexicon/types/com/atproto/sync/subscribeRepos' import { AppContext, Database } from '../../src' import { SeedClient } from '../seeds/client' import basicSeed from '../seeds/basic' import { CloseFn, runTestServer } from '../_util' -import { sql } from 'kysely' +import { CID } from 'multiformats/cid' describe('repo subscribe repos', () => { let serverHost: string @@ -66,11 +69,30 @@ describe('repo subscribe repos', () => { return repo.Repo.load(storage, synced.root) } - const verifyEvents = async (evts: Frame[]) => { + const getHandleEvts = (frames: Frame[]): HandleEvt[] => { + const evts: HandleEvt[] = [] + for (const frame of frames) { + if (frame instanceof MessageFrame && frame.header.t === '#handle') { + evts.push(frame.body) + } + } + return evts + } + + const verifyHandleEvent = (evt: HandleEvt, did: string, handle: string) => { + expect(evt.did).toBe(did) + expect(evt.handle).toBe(handle) + expect(typeof evt.time).toBe('string') + expect(typeof evt.seq).toBe('number') + } + + const verifyCommitEvents = async (evts: Frame[]) => { const byUser = evts.reduce((acc, cur) => { - const evt = cur.body as CommitEvt - acc[evt.repo] ??= [] - acc[evt.repo].push(evt) + if (cur instanceof MessageFrame && cur.header.t === '#commit') { + const evt = cur.body as CommitEvt + acc[evt.repo] ??= [] + acc[evt.repo].push(evt) + } return acc }, {} as Record) @@ -151,7 +173,47 @@ describe('repo subscribe repos', () => { const evts = await readTillCaughtUp(gen) ws.terminate() - await verifyEvents(evts) + await verifyCommitEvents(evts) + }) + + it('syncs handle changes', async () => { + await sc.updateHandle(alice, 'alice2.test') + await sc.updateHandle(bob, 'bob2.test') + + const ws = new WebSocket( + `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`, + ) + + const gen = byFrame(ws) + const evts = await readTillCaughtUp(gen) + ws.terminate() + + await verifyCommitEvents(evts) + const handleEvts = getHandleEvts(evts.slice(-2)) + verifyHandleEvent(handleEvts[0], alice, 'alice2.test') + verifyHandleEvent(handleEvts[1], bob, 'bob2.test') + }) + + it('does not return invalidated events', async () => { + await sc.updateHandle(alice, 'alice3.test') + await sc.updateHandle(alice, 'alice4.test') + await sc.updateHandle(alice, 'alice5.test') + await sc.updateHandle(bob, 'bob3.test') + await sc.updateHandle(bob, 'bob4.test') + await sc.updateHandle(bob, 'bob5.test') + + const ws = new WebSocket( + `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`, + ) + + const gen = byFrame(ws) + const evts = await readTillCaughtUp(gen) + ws.terminate() + + const handleEvts = getHandleEvts(evts) + expect(handleEvts.length).toBe(2) + verifyHandleEvent(handleEvts[0], alice, 'alice5.test') + verifyHandleEvent(handleEvts[1], bob, 'bob5.test') }) it('syncs new events', async () => { @@ -169,7 +231,7 @@ describe('repo subscribe repos', () => { const [evts] = await Promise.all([readAfterDelay(), postPromise]) - await verifyEvents(evts) + await verifyCommitEvents(evts) }) it('handles no backfill', async () => { @@ -214,38 +276,26 @@ describe('repo subscribe repos', () => { for (let i = 0; i < evts.length; i++) { const evt = evts[i].body as CommitEvt const seq = seqSlice[i] + const seqEvt = cborDecode(seq.event) as { commit: CID } expect(evt.time).toEqual(seq.sequencedAt) - expect(evt.commit.toString()).toEqual(seq.commit) + expect(evt.commit.equals(seqEvt.commit)).toBeTruthy() expect(evt.repo).toEqual(seq.did) } }) it('sends info frame on out of date cursor', async () => { - // we stick three new seqs in with a date past the backfill cutoff - // then we increment the sequence number of everything else to test out of date cursor - const cid = await cidForCbor({ test: 123 }) + // we rewrite the sequenceAt time for existing seqs to be past the backfill cutoff + // then we create some new posts const overAnHourAgo = new Date(Date.now() - HOUR - MINUTE).toISOString() - const dummySeq = { - did: 'did:example:test', - commit: cid.toString(), - eventType: 'repo_append' as const, - sequencedAt: overAnHourAgo, - } - const newRows = await db.db - .insertInto('repo_seq') - .values([dummySeq, dummySeq, dummySeq]) - .returning('seq') - .execute() - const newSeqs = newRows.map((r) => r.seq) - const movedToFuture = await db.db + await db.db .updateTable('repo_seq') - .set({ seq: sql`seq+1000` }) - .where('seq', 'not in', newSeqs) - .returning('seq') + .set({ sequencedAt: overAnHourAgo }) .execute() + await makePosts() + const ws = new WebSocket( - `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${newSeqs[0]}`, + `ws://${serverHost}/xrpc/com.atproto.sync.subscribeRepos?cursor=${-1}`, ) const [info, ...evts] = await readTillCaughtUp(byFrame(ws)) ws.terminate() @@ -256,7 +306,7 @@ describe('repo subscribe repos', () => { expect(info.header.t).toBe('#info') const body = info.body as Record expect(body.name).toEqual('OutdatedCursor') - expect(evts.length).toBe(movedToFuture.length) + expect(evts.length).toBe(40) }) it('errors on future cursor', async () => { diff --git a/packages/repo/src/block-map.ts b/packages/repo/src/block-map.ts index ab8379c132b..beca16f83d8 100644 --- a/packages/repo/src/block-map.ts +++ b/packages/repo/src/block-map.ts @@ -64,6 +64,14 @@ export class BlockMap { return this.map.size } + get byteSize(): number { + let size = 0 + this.forEach((bytes) => { + size += bytes.length + }) + return size + } + equals(other: BlockMap): boolean { if (this.size !== other.size) { return false diff --git a/packages/repo/src/repo.ts b/packages/repo/src/repo.ts index eb610a35655..7f652380eb7 100644 --- a/packages/repo/src/repo.ts +++ b/packages/repo/src/repo.ts @@ -16,7 +16,6 @@ import log from './logger' import BlockMap from './block-map' import { ReadableRepo } from './readable-repo' import * as util from './util' -import CidSet from './cid-set' type Params = { storage: RepoStorage @@ -37,16 +36,17 @@ export class Repo extends ReadableRepo { storage: RepoStorage, did: string, keypair: crypto.Keypair, - initialRecords: Record, + initialWrites: RecordCreateOp[] = [], ): Promise { const newBlocks = new BlockMap() - const relatedCids = new CidSet() let data = await MST.create(storage) - for (const [dataKey, cid] of Object.entries(initialRecords)) { - relatedCids.add(cid) + for (const record of initialWrites) { + const cid = await newBlocks.add(record.record) + const dataKey = util.formatDataKey(record.collection, record.rkey) data = await data.add(dataKey, cid) } + const unstoredData = await data.getUnstoredBlocks() newBlocks.addMap(unstoredData.blocks) @@ -65,37 +65,30 @@ export class Repo extends ReadableRepo { commit: commitCid, prev: null, blocks: newBlocks, - relatedCids: relatedCids.toList(), } } + static async createFromCommit( + storage: RepoStorage, + commit: CommitData, + ): Promise { + await storage.applyCommit(commit) + return Repo.load(storage, commit.commit) + } + static async create( storage: RepoStorage, did: string, keypair: crypto.Keypair, initialWrites: RecordCreateOp[] = [], ): Promise { - const newBlocks = new BlockMap() - const initialRecords: Record = {} - for (const record of initialWrites) { - const cid = await newBlocks.add(record.record) - const dataKey = util.formatDataKey(record.collection, record.rkey) - initialRecords[dataKey] = cid - } const commit = await Repo.formatInitCommit( storage, did, keypair, - initialRecords, + initialWrites, ) - newBlocks.addMap(commit.blocks) - await storage.applyCommit({ - commit: commit.commit, - prev: commit.prev, - blocks: newBlocks, - }) - log.info({ did }, `created repo`) - return Repo.load(storage, commit.commit) + return Repo.createFromCommit(storage, commit) } static async load(storage: RepoStorage, cid?: CID) { diff --git a/packages/repo/src/types.ts b/packages/repo/src/types.ts index b17caee1f2e..3c2f81c4ff0 100644 --- a/packages/repo/src/types.ts +++ b/packages/repo/src/types.ts @@ -99,7 +99,6 @@ export type CommitBlockData = { export type CommitData = CommitBlockData & { prev: CID | null - relatedCids?: CID[] } export type RepoUpdate = CommitData & {