Skip to content

Commit

Permalink
Move sequenceCommit into actor store transaction (#3580)
Browse files Browse the repository at this point in the history
* serialize sequencing by moving sequenceCommit into actor store transaction

* do not throw on updateRepoRoot failure

* build branch

* changset

* dont build branch
  • Loading branch information
dholms authored Feb 25, 2025
1 parent 5cce766 commit d4e14b7
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 55 deletions.
5 changes: 5 additions & 0 deletions .changeset/good-dryers-poke.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@atproto/pds": patch
---

Fix bug where racing writes to the same repository can get sequenced out-of-order.
32 changes: 21 additions & 11 deletions packages/pds/src/api/com/atproto/repo/applyWrites.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
isDelete,
isUpdate,
} from '../../../../lexicon/types/com/atproto/repo/applyWrites'
import { dbLogger } from '../../../../logger'
import {
BadCommitSwapError,
InvalidRecordError,
Expand Down Expand Up @@ -119,19 +120,28 @@ export default function (server: Server, ctx: AppContext) {
const swapCommitCid = swapCommit ? CID.parse(swapCommit) : undefined

const commit = await ctx.actorStore.transact(did, async (actorTxn) => {
try {
return await actorTxn.repo.processWrites(writes, swapCommitCid)
} catch (err) {
if (err instanceof BadCommitSwapError) {
throw new InvalidRequestError(err.message, 'InvalidSwap')
} else {
throw err
}
}
const commit = await actorTxn.repo
.processWrites(writes, swapCommitCid)
.catch((err) => {
if (err instanceof BadCommitSwapError) {
throw new InvalidRequestError(err.message, 'InvalidSwap')
} else {
throw err
}
})

await ctx.sequencer.sequenceCommit(did, commit)
return commit
})

await ctx.sequencer.sequenceCommit(did, commit)
await ctx.accountManager.updateRepoRoot(did, commit.cid, commit.rev)
await ctx.accountManager
.updateRepoRoot(did, commit.cid, commit.rev)
.catch((err) => {
dbLogger.error(
{ err, did, cid: commit.cid, rev: commit.rev },
'failed to update account root',
)
})

return {
encoding: 'application/json',
Expand Down
33 changes: 19 additions & 14 deletions packages/pds/src/api/com/atproto/repo/createRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { InvalidRecordKeyError } from '@atproto/syntax'
import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import { AppContext } from '../../../../context'
import { Server } from '../../../../lexicon'
import { dbLogger } from '../../../../logger'
import {
BadCommitSwapError,
InvalidRecordError,
Expand Down Expand Up @@ -82,22 +83,26 @@ export default function (server: Server, ctx: AppContext) {
}),
)
const writes = [...backlinkDeletions, write]
try {
const commit = await actorTxn.repo.processWrites(
writes,
swapCommitCid,
)
return commit
} catch (err) {
if (err instanceof BadCommitSwapError) {
throw new InvalidRequestError(err.message, 'InvalidSwap')
}
throw err
}
const commit = await actorTxn.repo
.processWrites(writes, swapCommitCid)
.catch((err) => {
if (err instanceof BadCommitSwapError) {
throw new InvalidRequestError(err.message, 'InvalidSwap')
}
throw err
})
await ctx.sequencer.sequenceCommit(did, commit)
return commit
})

await ctx.sequencer.sequenceCommit(did, commit)
await ctx.accountManager.updateRepoRoot(did, commit.cid, commit.rev)
await ctx.accountManager
.updateRepoRoot(did, commit.cid, commit.rev)
.catch((err) => {
dbLogger.error(
{ err, did, cid: commit.cid, rev: commit.rev },
'failed to update account root',
)
})

return {
encoding: 'application/json',
Expand Down
40 changes: 26 additions & 14 deletions packages/pds/src/api/com/atproto/repo/deleteRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { CID } from 'multiformats/cid'
import { AuthRequiredError, InvalidRequestError } from '@atproto/xrpc-server'
import { AppContext } from '../../../../context'
import { Server } from '../../../../lexicon'
import { dbLogger } from '../../../../logger'
import {
BadCommitSwapError,
BadRecordSwapError,
Expand Down Expand Up @@ -56,24 +57,35 @@ export default function (server: Server, ctx: AppContext) {
if (!record) {
return null // No-op if record already doesn't exist
}
try {
return await actorTxn.repo.processWrites([write], swapCommitCid)
} catch (err) {
if (
err instanceof BadCommitSwapError ||
err instanceof BadRecordSwapError
) {
throw new InvalidRequestError(err.message, 'InvalidSwap')
} else {
throw err
}
}

const commit = await actorTxn.repo
.processWrites([write], swapCommitCid)
.catch((err) => {
if (
err instanceof BadCommitSwapError ||
err instanceof BadRecordSwapError
) {
throw new InvalidRequestError(err.message, 'InvalidSwap')
} else {
throw err
}
})

await ctx.sequencer.sequenceCommit(did, commit)
return commit
})

if (commit !== null) {
await ctx.sequencer.sequenceCommit(did, commit)
await ctx.accountManager.updateRepoRoot(did, commit.cid, commit.rev)
await ctx.accountManager
.updateRepoRoot(did, commit.cid, commit.rev)
.catch((err) => {
dbLogger.error(
{ err, did, cid: commit.cid, rev: commit.rev },
'failed to update account root',
)
})
}

return {
encoding: 'application/json',
body: {
Expand Down
40 changes: 24 additions & 16 deletions packages/pds/src/api/com/atproto/repo/putRecord.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import { AppContext } from '../../../../context'
import { Server } from '../../../../lexicon'
import { ids } from '../../../../lexicon/lexicons'
import { Record as ProfileRecord } from '../../../../lexicon/types/app/bsky/actor/profile'
import { dbLogger } from '../../../../logger'
import {
BadCommitSwapError,
BadRecordSwapError,
CommitDataWithOps,
InvalidRecordError,
PreparedCreate,
PreparedUpdate,
Expand Down Expand Up @@ -104,26 +104,34 @@ export default function (server: Server, ctx: AppContext) {
}
}

let commit: CommitDataWithOps
try {
commit = await actorTxn.repo.processWrites([write], swapCommitCid)
} catch (err) {
if (
err instanceof BadCommitSwapError ||
err instanceof BadRecordSwapError
) {
throw new InvalidRequestError(err.message, 'InvalidSwap')
} else {
throw err
}
}
const commit = await actorTxn.repo
.processWrites([write], swapCommitCid)
.catch((err) => {
if (
err instanceof BadCommitSwapError ||
err instanceof BadRecordSwapError
) {
throw new InvalidRequestError(err.message, 'InvalidSwap')
} else {
throw err
}
})

await ctx.sequencer.sequenceCommit(did, commit)

return { commit, write }
},
)

if (commit !== null) {
await ctx.sequencer.sequenceCommit(did, commit)
await ctx.accountManager.updateRepoRoot(did, commit.cid, commit.rev)
await ctx.accountManager
.updateRepoRoot(did, commit.cid, commit.rev)
.catch((err) => {
dbLogger.error(
{ err, did, cid: commit.cid, rev: commit.rev },
'failed to update account root',
)
})
}

return {
Expand Down

0 comments on commit d4e14b7

Please sign in to comment.