Skip to content

Commit

Permalink
Moved indexedAt time to record instead of ipld-block (#501)
Browse files Browse the repository at this point in the history
* moved indexedAt time to record instead of ipld-block

* use underlying kysely obj in tests

* pr feedback
  • Loading branch information
dholms authored Jan 31, 2023
1 parent 5821a67 commit b455b91
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 8 deletions.
14 changes: 14 additions & 0 deletions packages/pds/src/db/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,20 @@ export class Database {
await this.db.destroy()
}

async migrateToOrThrow(migration: string) {
if (this.schema !== undefined) {
await this.db.schema.createSchema(this.schema).ifNotExists().execute()
}
const { error, results } = await this.migrator.migrateTo(migration)
if (error) {
throw error
}
if (!results) {
throw new Error('An unknown failure occurred while migrating')
}
return results
}

async migrateToLatestOrThrow() {
if (this.schema !== undefined) {
await this.db.schema.createSchema(this.schema).ifNotExists().execute()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { Kysely } from 'kysely'

export async function up(db: Kysely<any>): Promise<void> {
const now = new Date().toISOString()
await db.schema
.alterTable('record')
.addColumn('indexedAt', 'varchar', (col) => col.notNull().defaultTo(now))
.execute()

const ref = db.dynamic.ref

const indexedAtForRecordQb = db
.selectFrom('ipld_block')
.whereRef('ipld_block.cid', '=', ref('record.cid'))
.select('indexedAt')

await db
.updateTable('record')
.set({
indexedAt: indexedAtForRecordQb,
})
.whereExists(indexedAtForRecordQb)
.execute()

await db.schema.alterTable('ipld_block').dropColumn('indexedAt').execute()
}

export async function down(db: Kysely<unknown>): Promise<void> {
const now = new Date().toISOString()
await db.schema
.alterTable('ipld_block')
.addColumn('indexedAt', 'varchar', (col) => col.notNull().defaultTo(now))
.execute()
await db.schema.alterTable('record').dropColumn('indexedAt').execute()
}
3 changes: 2 additions & 1 deletion packages/pds/src/db/migrations/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ export * as _20221212T195416407Z from './20221212T195416407Z-post-media'
export * as _20221215T220356370Z from './20221215T220356370Z-password-reset-otp'
export * as _20221226T213635517Z from './20221226T213635517Z-mute-init'
export * as _20221230T215012029Z from './20221230T215012029Z-moderation-init'
export * as _20230118T223059239Z from './20230118T223059239Z-repo-sync-data'
export * as _20230127T215753149Z from './20230127T215753149Z-indexed-at-on-record'
export * as _20230127T224743452Z from './20230127T224743452Z-repo-sync-data'
1 change: 0 additions & 1 deletion packages/pds/src/db/tables/ipld-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ export interface IpldBlock {
cid: string
size: number
content: Uint8Array
indexedAt: string
}

export const tableName = 'ipld_block'
Expand Down
1 change: 1 addition & 0 deletions packages/pds/src/db/tables/record.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export interface Record {
did: string
collection: string
rkey: string
indexedAt: string
takedownId: number | null
}

Expand Down
2 changes: 1 addition & 1 deletion packages/pds/src/services/feed/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ export class FeedService {
'post.uri as uri',
'post.cid as cid',
'post.creator as creator',
'post.indexedAt as indexedAt',
'ipld_block.content as recordBytes',
'ipld_block.indexedAt as indexedAt',
db
.selectFrom('vote')
.whereRef('subject', '=', ref('post.uri'))
Expand Down
1 change: 1 addition & 0 deletions packages/pds/src/services/record/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export class RecordService {
did: uri.host,
collection: uri.collection,
rkey: uri.rkey,
indexedAt: timestamp || new Date().toISOString(),
}
if (!record.did.startsWith('did:')) {
throw new Error('Expected indexed URI to contain DID')
Expand Down
2 changes: 0 additions & 2 deletions packages/pds/src/sql-repo-storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ export class SqlRepoStorage extends RepoStorage {
cid: cid.toString(),
size: block.length,
content: block,
indexedAt: this.timestamp || new Date().toISOString(),
})
.onConflict((oc) => oc.doNothing())
.execute()
Expand All @@ -142,7 +141,6 @@ export class SqlRepoStorage extends RepoStorage {
cid: cid.toString(),
size: bytes.length,
content: bytes,
indexedAt: this.timestamp || new Date().toISOString(),
})
creators.push({
cid: cid.toString(),
Expand Down
1 change: 0 additions & 1 deletion packages/pds/tests/duplicate-records.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ describe('duplicate record', () => {
cid: cid.toString(),
size: bytes.length,
content: bytes,
indexedAt: new Date().toISOString(),
})
.onConflict((oc) => oc.doNothing())
.execute()
Expand Down
77 changes: 77 additions & 0 deletions packages/pds/tests/migrations/indexed-at-on-record.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { Database } from '../../src'
import { randomStr } from '@atproto/crypto'
import { dataToCborBlock, TID } from '@atproto/common'
import { AtUri } from '@atproto/uri'
import { Kysely } from 'kysely'

describe('indexedAt on record migration', () => {
let db: Database
let rawDb: Kysely<any>

beforeAll(async () => {
if (process.env.DB_POSTGRES_URL) {
db = Database.postgres({
url: process.env.DB_POSTGRES_URL,
schema: 'migration_indexed_at_on_record',
})
} else {
db = Database.memory()
}

await db.migrateToOrThrow('_20221230T215012029Z')
rawDb = db.db
})

afterAll(async () => {
await db.close()
})

const randomDate = () => {
const start = new Date(2022, 0, 1)
const end = new Date()
return new Date(
start.getTime() + Math.random() * (end.getTime() - start.getTime()),
).toISOString()
}

const times: { [cid: string]: string } = {}

it('fills the db with some records & blocks', async () => {
const blocks: any[] = []
const records: any[] = []
for (let i = 0; i < 100; i++) {
const date = randomDate()
const record = { test: randomStr(8, 'base32') }
const block = await dataToCborBlock(record)
blocks.push({
cid: block.cid.toString(),
content: block.bytes,
size: block.bytes.length,
indexedAt: date,
})
const uri = AtUri.make('did:example:alice', 'fake.posts', TID.nextStr())
records.push({
uri: uri.toString(),
cid: block.cid.toString(),
did: uri.hostname,
collection: uri.collection,
rkey: uri.rkey,
})
times[block.cid.toString()] = date
}

await rawDb.insertInto('ipld_block').values(blocks).execute()
await rawDb.insertInto('record').values(records).execute()
})

it('migrates up', async () => {
await db.migrateToOrThrow('_20230127T215753149Z')
})

it('associated the date to the correct record', async () => {
const res = await rawDb.selectFrom('record').selectAll().execute()
res.forEach((row) => {
expect(row.indexedAt).toEqual(times[row.cid])
})
})
})
4 changes: 2 additions & 2 deletions packages/pds/tests/migrations/repo-sync-data.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ describe('repo sync data migration', () => {
})

it('migrates down', async () => {
const migration = await db.migrator.migrateTo('_20221230T215012029Z')
const migration = await db.migrator.migrateTo('_20230127T215753149Z')
expect(migration.error).toBeUndefined()
// normal syntax for async exceptions as not catching the error for some reason
let err1
Expand All @@ -72,7 +72,7 @@ describe('repo sync data migration', () => {
})

it('migrates up', async () => {
const migration = await db.migrator.migrateTo('_20230118T223059239Z')
const migration = await db.migrator.migrateTo('_20230127T224743452Z')
expect(migration.error).toBeUndefined()
const migratedHistory = await getHistory()
const migratedBlocks = await getBlocks()
Expand Down

0 comments on commit b455b91

Please sign in to comment.