Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework sequencer storage & add handle update #711

Merged
merged 17 commits into from
Mar 28, 2023
15 changes: 9 additions & 6 deletions lexicons/com/atproto/sync/subscribeRepos.json
Original file line number Diff line number Diff line change
Expand Up @@ -58,29 +58,32 @@
},
"handle": {
"type": "object",
"required": ["seq", "did", "handle"],
"required": ["seq", "did", "handle", "time"],
"properties": {
"seq": {"type": "integer"},
"did": {"type": "string", "format": "did"},
"handle": {"type": "string", "format": "handle"}
"handle": {"type": "string", "format": "handle"},
"time": {"type": "string", "format": "datetime"}
}
},
"migrate": {
"type": "object",
"required": ["seq", "did", "migrateTo"],
"required": ["seq", "did", "migrateTo", "time"],
"nullable": ["migrateTo"],
"properties": {
"seq": {"type": "integer"},
"did": {"type": "string", "format": "did"},
"migrateTo": {"type": "string"}
"migrateTo": {"type": "string"},
"time": {"type": "string", "format": "datetime"}
}
},
"tombstone": {
"type": "object",
"required": ["seq", "did"],
"required": ["seq", "did", "time"],
"properties": {
"seq": {"type": "integer"},
"did": {"type": "string", "format": "did"}
"did": {"type": "string", "format": "did"},
"time": {"type": "string", "format": "datetime"}
}
},
"info": {
Expand Down
18 changes: 15 additions & 3 deletions packages/api/src/client/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2420,7 +2420,7 @@ export const schemaDict = {
},
handle: {
type: 'object',
required: ['seq', 'did', 'handle'],
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
type: 'integer',
Expand All @@ -2433,11 +2433,15 @@ export const schemaDict = {
type: 'string',
format: 'handle',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
migrate: {
type: 'object',
required: ['seq', 'did', 'migrateTo'],
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
seq: {
Expand All @@ -2450,11 +2454,15 @@ export const schemaDict = {
migrateTo: {
type: 'string',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
tombstone: {
type: 'object',
required: ['seq', 'did'],
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
Expand All @@ -2463,6 +2471,10 @@ export const schemaDict = {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
info: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export interface Handle {
seq: number
did: string
handle: string
time: string
[k: string]: unknown
}

Expand All @@ -57,6 +58,7 @@ export interface Migrate {
seq: number
did: string
migrateTo: string | null
time: string
[k: string]: unknown
}

Expand All @@ -75,6 +77,7 @@ export function validateMigrate(v: unknown): ValidationResult {
export interface Tombstone {
seq: number
did: string
time: string
[k: string]: unknown
}

Expand Down
3 changes: 3 additions & 0 deletions packages/pds/src/api/com/atproto/repo/applyWrites.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ export default function (server: Server, ctx: AppContext) {
'Unvalidated writes are not yet supported.',
)
}
if (tx.writes.length > 200) {
throw new InvalidRequestError('Two many writes. Max: 200')
dholms marked this conversation as resolved.
Show resolved Hide resolved
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this check lives here because you don't want it to be a hard limit in the lexicon (i.e. for all PDS implementations), is that right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea exactly. There are cases where you may want to do more than 200 writes & i feel that we shouldn't put this at the lexicon level. There's a good chance we'll remove this check at some point when we have a better handle on the repercussions of large mutations


const hasUpdate = tx.writes.some(isUpdate)
if (hasUpdate) {
Expand Down
29 changes: 14 additions & 15 deletions packages/pds/src/api/com/atproto/sync/subscribeRepos.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { Server } from '../../../../lexicon'
import AppContext from '../../../../context'
import Outbox from '../../../../sequencer/outbox'
import { Commit } from '../../../../lexicon/types/com/atproto/sync/subscribeRepos'
import { InvalidRequestError } from '@atproto/xrpc-server'

export default function (server: Server, ctx: AppContext) {
Expand Down Expand Up @@ -32,21 +31,21 @@ export default function (server: Server, ctx: AppContext) {
}

for await (const evt of outbox.events(cursor, backfillTime)) {
const { seq, time, repo, commit, prev, blocks, ops, blobs } = evt
const toYield: Commit = {
$type: '#commit',
seq,
rebase: false,
tooBig: false,
repo,
commit,
blocks,
ops,
blobs,
time,
prev: prev ?? null,
if (evt.type === 'commit') {
yield {
$type: '#commit',
seq: evt.seq,
time: evt.time,
...evt.evt,
}
} else if (evt.type === 'handle') {
yield {
$type: '#handle',
seq: evt.seq,
time: evt.time,
...evt.evt,
}
}
yield toYield
}
})
}
2 changes: 0 additions & 2 deletions packages/pds/src/db/database-schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import * as backlink from './tables/backlink'
import * as repoCommitBlock from './tables/repo-commit-block'
import * as repoCommitHistory from './tables/repo-commit-history'
import * as ipldBlock from './tables/ipld-block'
import * as repoOp from './tables/repo-op'
import * as inviteCode from './tables/invite-code'
import * as notification from './tables/user-notification'
import * as blob from './tables/blob'
Expand All @@ -35,7 +34,6 @@ export type DatabaseSchemaType = appView.DatabaseSchemaType &
repoCommitBlock.PartialDB &
repoCommitHistory.PartialDB &
ipldBlock.PartialDB &
repoOp.PartialDB &
repoCommitBlock.PartialDB &
repoCommitHistory.PartialDB &
inviteCode.PartialDB &
Expand Down
100 changes: 100 additions & 0 deletions packages/pds/src/db/migrations/20230323T202553064Z-rework-seq.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import { Kysely, sql } from 'kysely'
import { Dialect } from '..'

const repoSeqTable = 'repo_seq'
const repoOpTable = 'repo_op'
const repoSeqDidIndex = 'repo_seq_did_index'
const repoSeqCommitIndex = 'repo_seq_commit_index'
const repoSeqEventTypeIndex = 'repo_seq_event_type_index'
const repoSeqSequencedAtIndex = 'repo_seq_sequenced_at_index'

export async function up(db: Kysely<unknown>, dialect: Dialect): Promise<void> {
await db.schema.dropIndex(repoSeqCommitIndex).execute()
await db.schema.dropIndex(repoSeqDidIndex).execute()
await db.schema.dropTable(repoSeqTable).execute()
await db.schema.dropTable(repoOpTable).execute()

let builder = db.schema.createTable(repoSeqTable)
if (dialect === 'pg') {
builder = builder
.addColumn('seq', 'bigserial', (col) => col.primaryKey())
.addColumn('invalidatedBy', 'bigint')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dig the invalidatedBy idea! I wonder if we should also make it a foreign key.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yeah I'd be game 👍

} else {
builder = builder
.addColumn('seq', 'integer', (col) => col.autoIncrement().primaryKey())
.addColumn('invalidatedBy', 'integer')
}

const binaryDatatype = dialect === 'sqlite' ? sql`blob` : sql`bytea`
await builder
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('event', binaryDatatype, (col) => col.notNull())
.addColumn('sequencedAt', 'varchar', (col) => col.notNull())
.execute()

await db.schema
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having trouble recalling which types of queries these indices support. I kinda like the comments we've added in some of the other migrations documenting this, have found it useful!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup i'll add some comments 👍

.createIndex(repoSeqDidIndex)
.on(repoSeqTable)
.column('did')
.execute()

await db.schema
.createIndex(repoSeqEventTypeIndex)
.on(repoSeqTable)
.column('eventType')
.execute()

await db.schema
.createIndex(repoSeqSequencedAtIndex)
.on(repoSeqTable)
.column('sequencedAt')
.execute()
}

export async function down(
db: Kysely<unknown>,
dialect: Dialect,
): Promise<void> {
await db.schema.dropIndex(repoSeqSequencedAtIndex).execute()
await db.schema.dropIndex(repoSeqEventTypeIndex).execute()
await db.schema.dropIndex(repoSeqDidIndex).execute()
await db.schema.dropTable(repoSeqTable).execute()

let builder = db.schema.createTable(repoSeqTable)
if (dialect === 'pg') {
builder = builder.addColumn('seq', 'serial', (col) => col.primaryKey())
} else {
builder = builder.addColumn('seq', 'integer', (col) =>
col.autoIncrement().primaryKey(),
)
}
await builder
.addColumn('did', 'varchar', (col) => col.notNull())
.addColumn('commit', 'varchar', (col) => col.notNull())
.addColumn('eventType', 'varchar', (col) => col.notNull())
.addColumn('sequencedAt', 'varchar', (col) => col.notNull())
.execute()

await db.schema
.createIndex(repoSeqDidIndex)
.on(repoSeqTable)
.column('did')
.execute()

await db.schema
.createIndex(repoSeqCommitIndex)
.on(repoSeqTable)
.column('commit')
.execute()

await db.schema
.createTable(repoOpTable)
.addColumn('did', 'text', (col) => col.notNull())
.addColumn('commit', 'text', (col) => col.notNull())
.addColumn('action', 'text', (col) => col.notNull())
.addColumn('path', 'text', (col) => col.notNull())
.addColumn('cid', 'text')
.addPrimaryKeyConstraint('repo_op_pkey', ['did', 'commit', 'path'])
.execute()
}
1 change: 1 addition & 0 deletions packages/pds/src/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,4 @@ export * as _20230315T045113521Z from './20230315T045113521Z-votes-to-likes'
export * as _20230316T142245535Z from './20230316T142245535Z-remove-post-entities'
export * as _20230316T153255273Z from './20230316T153255273Z-backlinks'
export * as _20230316T225303411Z from './20230316T225303411Z-profile-display-name-empty'
export * as _20230323T202553064Z from './20230323T202553064Z-rework-seq'
13 changes: 0 additions & 13 deletions packages/pds/src/db/tables/repo-op.ts

This file was deleted.

5 changes: 3 additions & 2 deletions packages/pds/src/db/tables/repo-seq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { Generated, Selectable } from 'kysely'
export interface RepoSeq {
seq: Generated<number>
did: string
commit: string
eventType: 'repo_append'
eventType: 'append' | 'rebase' | 'handle' | 'migrate' | 'tombstone'
event: Uint8Array
invalidatedBy: number | null
sequencedAt: string
}

Expand Down
18 changes: 15 additions & 3 deletions packages/pds/src/lexicon/lexicons.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2420,7 +2420,7 @@ export const schemaDict = {
},
handle: {
type: 'object',
required: ['seq', 'did', 'handle'],
required: ['seq', 'did', 'handle', 'time'],
properties: {
seq: {
type: 'integer',
Expand All @@ -2433,11 +2433,15 @@ export const schemaDict = {
type: 'string',
format: 'handle',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
migrate: {
type: 'object',
required: ['seq', 'did', 'migrateTo'],
required: ['seq', 'did', 'migrateTo', 'time'],
nullable: ['migrateTo'],
properties: {
seq: {
Expand All @@ -2450,11 +2454,15 @@ export const schemaDict = {
migrateTo: {
type: 'string',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
tombstone: {
type: 'object',
required: ['seq', 'did'],
required: ['seq', 'did', 'time'],
properties: {
seq: {
type: 'integer',
Expand All @@ -2463,6 +2471,10 @@ export const schemaDict = {
type: 'string',
format: 'did',
},
time: {
type: 'string',
format: 'datetime',
},
},
},
info: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export interface Handle {
seq: number
did: string
handle: string
time: string
[k: string]: unknown
}

Expand All @@ -78,6 +79,7 @@ export interface Migrate {
seq: number
did: string
migrateTo: string | null
time: string
[k: string]: unknown
}

Expand All @@ -96,6 +98,7 @@ export function validateMigrate(v: unknown): ValidationResult {
export interface Tombstone {
seq: number
did: string
time: string
[k: string]: unknown
}

Expand Down
Loading