Skip to content

Commit

Permalink
Merge pull request #711 from bluesky-social/rework-seqs
Browse files Browse the repository at this point in the history
Rework sequencer storage & add handle update
  • Loading branch information
dholms authored Mar 28, 2023
2 parents 1008ee4 + 94677b6 commit 6bec87c
Show file tree
Hide file tree
Showing 24 changed files with 507 additions and 294 deletions.
21 changes: 13 additions & 8 deletions lexicons/com/atproto/sync/subscribeRepos.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@
"prev": {"type": "cid-link"},
"blocks": {
"type": "bytes",
"description": "CAR file containing relevant blocks"
"description": "CAR file containing relevant blocks",
"maxLength": 1000000
},
"ops": {
"type": "array",
"items": { "type": "ref", "ref": "#repoOp"}
"items": { "type": "ref", "ref": "#repoOp"},
"maxLength": 200
},
"blobs": {
"type": "array",
Expand All @@ -58,29 +60,32 @@
},
"handle": {
"type": "object",
"required": ["seq", "did", "handle"],
"required": ["seq", "did", "handle", "time"],
"properties": {
"seq": {"type": "integer"},
"did": {"type": "string", "format": "did"},
"handle": {"type": "string", "format": "handle"}
"handle": {"type": "string", "format": "handle"},
"time": {"type": "string", "format": "datetime"}
}
},
"migrate": {
"type": "object",
"required": ["seq", "did", "migrateTo"],
"required": ["seq", "did", "migrateTo", "time"],
"nullable": ["migrateTo"],
"properties": {
"seq": {"type": "integer"},
"did": {"type": "string", "format": "did"},
"migrateTo": {"type": "string"}
"migrateTo": {"type": "string"},
"time": {"type": "string", "format": "datetime"}
}
},
"tombstone": {
"type": "object",
"required": ["seq", "did"],
"required": ["seq", "did", "time"],
"properties": {
"seq": {"type": "integer"},
"did": {"type": "string", "format": "did"}
"did": {"type": "string", "format": "did"},
"time": {"type": "string", "format": "datetime"}
}
},
"info": {
Expand Down
20 changes: 17 additions & 3 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2398,13 +2398,15 @@ export const schemaDict = {
blocks: {
type: 'bytes',
description: 'CAR file containing relevant blocks',
maxLength: 1000000,
},
ops: {
type: 'array',
items: {
type: 'ref',
ref: 'lex:com.atproto.sync.subscribeRepos#repoOp',
},
maxLength: 200,
},
blobs: {
type: 'array',
Expand All @@ -2420,7 +2422,7 @@ export const schemaDict = {
},
handle: {
type: 'object',
required: ['seq', 'did', 'handle'],
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
type: 'integer',
Expand All @@ -2433,11 +2435,15 @@ export const schemaDict = {
type: 'string',
format: 'handle',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
migrate: {
type: 'object',
required: ['seq', 'did', 'migrateTo'],
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
seq: {
Expand All @@ -2450,11 +2456,15 @@ export const schemaDict = {
migrateTo: {
type: 'string',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
tombstone: {
type: 'object',
required: ['seq', 'did'],
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
Expand All @@ -2463,6 +2473,10 @@ export const schemaDict = {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
info: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export interface Handle {
seq: number
did: string
handle: string
time: string
[k: string]: unknown
}

Expand All @@ -57,6 +58,7 @@ export interface Migrate {
seq: number
did: string
migrateTo: string | null
time: string
[k: string]: unknown
}

Expand All @@ -75,6 +77,7 @@ export function validateMigrate(v: unknown): ValidationResult {
export interface Tombstone {
seq: number
did: string
time: string
[k: string]: unknown
}

Expand Down
3 changes: 3 additions & 0 deletions packages/pds/src/api/com/atproto/repo/applyWrites.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ export default function (server: Server, ctx: AppContext) {
'Unvalidated writes are not yet supported.',
)
}
if (tx.writes.length > 200) {
throw new InvalidRequestError('Too many writes. Max: 200')
}

const hasUpdate = tx.writes.some(isUpdate)
if (hasUpdate) {
Expand Down
29 changes: 14 additions & 15 deletions packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import Outbox from '../../../../sequencer/outbox'
import { Commit } from '../../../../lexicon/types/com/atproto/sync/subscribeRepos'
import { InvalidRequestError } from '@atproto/xrpc-server'

export default function (server: Server, ctx: AppContext) {
Expand Down Expand Up @@ -32,21 +31,21 @@ export default function (server: Server, ctx: AppContext) {
}

for await (const evt of outbox.events(cursor, backfillTime)) {
const { seq, time, repo, commit, prev, blocks, ops, blobs } = evt
const toYield: Commit = {
$type: '#commit',
seq,
rebase: false,
tooBig: false,
repo,
commit,
blocks,
ops,
blobs,
time,
prev: prev ?? null,
if (evt.type === 'commit') {
yield {
$type: '#commit',
seq: evt.seq,
time: evt.time,
...evt.evt,
}
} else if (evt.type === 'handle') {
yield {
$type: '#handle',
seq: evt.seq,
time: evt.time,
...evt.evt,
}
}
yield toYield
}
})
}
2 changes: 0 additions & 2 deletions packages/pds/src/db/database-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import * as backlink from './tables/backlink'
import * as repoCommitBlock from './tables/repo-commit-block'
import * as repoCommitHistory from './tables/repo-commit-history'
import * as ipldBlock from './tables/ipld-block'
import * as repoOp from './tables/repo-op'
import * as inviteCode from './tables/invite-code'
import * as notification from './tables/user-notification'
import * as blob from './tables/blob'
Expand All @@ -35,7 +34,6 @@ export type DatabaseSchemaType = appView.DatabaseSchemaType &
repoCommitBlock.PartialDB &
repoCommitHistory.PartialDB &
ipldBlock.PartialDB &
repoOp.PartialDB &
repoCommitBlock.PartialDB &
repoCommitHistory.PartialDB &
inviteCode.PartialDB &
Expand Down
110 changes: 110 additions & 0 deletions packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import { Kysely, sql } from 'kysely'
import { Dialect } from '..'

const repoSeqTable = 'repo_seq'
const repoOpTable = 'repo_op'
const repoSeqDidIndex = 'repo_seq_did_index'
const repoSeqCommitIndex = 'repo_seq_commit_index'
const repoSeqEventTypeIndex = 'repo_seq_event_type_index'
const repoSeqSequencedAtIndex = 'repo_seq_sequenced_at_index'

export async function up(db: Kysely<unknown>, dialect: Dialect): Promise<void> {
await db.schema.dropIndex(repoSeqCommitIndex).execute()
await db.schema.dropIndex(repoSeqDidIndex).execute()
await db.schema.dropTable(repoSeqTable).execute()
await db.schema.dropTable(repoOpTable).execute()

let builder = db.schema.createTable(repoSeqTable)
if (dialect === 'pg') {
builder = builder
.addColumn('seq', 'bigserial', (col) => col.primaryKey())
.addColumn('invalidatedBy', 'bigint')
} else {
builder = builder
.addColumn('seq', 'integer', (col) => col.autoIncrement().primaryKey())
.addColumn('invalidatedBy', 'integer')
}

const binaryDatatype = dialect === 'sqlite' ? sql`blob` : sql`bytea`
await builder
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('event', binaryDatatype, (col) => col.notNull())
.addColumn('sequencedAt', 'varchar', (col) => col.notNull())
.addForeignKeyConstraint(
'invalidated_by_fkey',
// @ts-ignore
['invalidatedBy'],
'repo_seq',
['seq'],
)
.execute()

// for filtering seqs based on did
await db.schema
.createIndex(repoSeqDidIndex)
.on(repoSeqTable)
.column('did')
.execute()

// for filtering seqs based on event type
await db.schema
.createIndex(repoSeqEventTypeIndex)
.on(repoSeqTable)
.column('eventType')
.execute()

// for entering into the seq stream at a particular time
await db.schema
.createIndex(repoSeqSequencedAtIndex)
.on(repoSeqTable)
.column('sequencedAt')
.execute()
}

export async function down(
db: Kysely<unknown>,
dialect: Dialect,
): Promise<void> {
await db.schema.dropIndex(repoSeqSequencedAtIndex).execute()
await db.schema.dropIndex(repoSeqEventTypeIndex).execute()
await db.schema.dropIndex(repoSeqDidIndex).execute()
await db.schema.dropTable(repoSeqTable).execute()

let builder = db.schema.createTable(repoSeqTable)
if (dialect === 'pg') {
builder = builder.addColumn('seq', 'serial', (col) => col.primaryKey())
} else {
builder = builder.addColumn('seq', 'integer', (col) =>
col.autoIncrement().primaryKey(),
)
}
await builder
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('commit', 'varchar', (col) => col.notNull())
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('sequencedAt', 'varchar', (col) => col.notNull())
.execute()

await db.schema
.createIndex(repoSeqDidIndex)
.on(repoSeqTable)
.column('did')
.execute()

await db.schema
.createIndex(repoSeqCommitIndex)
.on(repoSeqTable)
.column('commit')
.execute()

await db.schema
.createTable(repoOpTable)
.addColumn('did', 'text', (col) => col.notNull())
.addColumn('commit', 'text', (col) => col.notNull())
.addColumn('action', 'text', (col) => col.notNull())
.addColumn('path', 'text', (col) => col.notNull())
.addColumn('cid', 'text')
.addPrimaryKeyConstraint('repo_op_pkey', ['did', 'commit', 'path'])
.execute()
}
1 change: 1 addition & 0 deletions packages/pds/src/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ export * as _20230315T045113521Z from './20230315T045113521Z-votes-to-likes'
export * as _20230316T142245535Z from './20230316T142245535Z-remove-post-entities'
export * as _20230316T153255273Z from './20230316T153255273Z-backlinks'
export * as _20230316T225303411Z from './20230316T225303411Z-profile-display-name-empty'
export * as _20230323T202553064Z from './20230323T202553064Z-rework-seq'
13 changes: 0 additions & 13 deletions packages/pds/src/db/tables/repo-op.ts

This file was deleted.

5 changes: 3 additions & 2 deletions packages/pds/src/db/tables/repo-seq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { Generated, Selectable } from 'kysely'
export interface RepoSeq {
seq: Generated<number>
did: string
commit: string
eventType: 'repo_append'
eventType: 'append' | 'rebase' | 'handle' | 'migrate' | 'tombstone'
event: Uint8Array
invalidatedBy: number | null
sequencedAt: string
}

Expand Down
Loading

0 comments on commit 6bec87c

Please sign in to comment.