Skip to content

Commit

Permalink
feat: reindexing
Browse files Browse the repository at this point in the history
Passing the `reindex` option in the constructor will re-initialize the
index from scratch.

This is motivated by [this issue in `@comapeo/core`][0].

[0]: digidem/comapeo-core#436
  • Loading branch information
EvanHahn committed Oct 30, 2024
1 parent f2bd6d1 commit faf54d6
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 25 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ string, that should return a
is used to store the index state of each hypercore. (Index state is stored as a
bitfield).

#### opts.reindex

_Optional_\
Type: `boolean`

If `true`, the cores, and any new ones that are added, will be reindexed from
scratch.

#### opts.maxBatch

_Optional_\
Expand Down
16 changes: 13 additions & 3 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class MultiCoreIndexer extends TypedEmitter {
#rateMeasurementStart = Date.now()
#rate = 0
#createStorage
#reindex
/** @type {IndexState | undefined} */
#prevEmittedState
#emitStateBound
Expand All @@ -46,13 +47,18 @@ class MultiCoreIndexer extends TypedEmitter {
* @param {object} opts
* @param {(entries: Entry<T>[]) => Promise<void>} opts.batch
* @param {StorageParam} opts.storage
* @param {boolean} [opts.reindex]
* @param {number} [opts.maxBatch=100]
*/
constructor(cores, { batch, maxBatch = DEFAULT_BATCH_SIZE, storage }) {
constructor(
cores,
{ batch, maxBatch = DEFAULT_BATCH_SIZE, storage, reindex = false }
) {
super()
this.#createStorage = MultiCoreIndexer.defaultStorage(storage)
this.#reindex = reindex
const coreIndexStreams = cores.map((core) => {
return new CoreIndexStream(core, this.#createStorage)
return new CoreIndexStream(core, this.#createStorage, reindex)
})
this.#indexStream = new MultiCoreIndexStream(coreIndexStreams, {
highWaterMark: maxBatch,
Expand Down Expand Up @@ -95,7 +101,11 @@ class MultiCoreIndexer extends TypedEmitter {
*/
addCore(core) {
this.#assertOpen('Cannot add core after closing')
const coreIndexStream = new CoreIndexStream(core, this.#createStorage)
const coreIndexStream = new CoreIndexStream(
core,
this.#createStorage,
this.#reindex
)
this.#indexStream.addStream(coreIndexStream)
}

Expand Down
10 changes: 8 additions & 2 deletions lib/core-index-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ class CoreIndexStream extends Readable {
/**
* @param {import('hypercore')<T, any>} core
* @param {(name: string) => import('random-access-storage')} createStorage
* @param {boolean} reindex
*/
constructor(core, createStorage) {
constructor(core, createStorage, reindex) {
super({
// Treat as object stream, count each object as size `1` so that the
// `remaining` property can use the stream buffer to calculate how many
Expand All @@ -59,10 +60,15 @@ class CoreIndexStream extends Readable {
this.#handleDownloadBound = this.#handleDownload.bind(this)
this.#createStorage = async () => {
await this.#core.ready()

const { discoveryKey } = this.#core
/* c8 ignore next: just to keep TS happy - after core.ready() this is set */
if (!discoveryKey) throw new Error('Missing discovery key')
return createStorage(getStorageName(discoveryKey))
const storageName = getStorageName(discoveryKey)

if (reindex) await unlinkStorage(createStorage(storageName))

return createStorage(storageName)
}
}

Expand Down
36 changes: 36 additions & 0 deletions test/multi-core-indexer.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,42 @@ test('Entries are re-indexed if index storage unlinked', async () => {
await indexer2.close()
})

test('Entries can be explicitly reindexed with a startup option', async (t) => {
const cores = await createMultiple(3)
const [core1, core2, core3] = cores
const expectedIn1And2 = await generateFixtures([core1, core2], 3)
const expectedIn3 = await generateFixtures([core3], 3)

const storage = ram.reusable()

const indexer1 = new MultiCoreIndexer(cores, {
batch: async () => {},
storage,
})
await indexer1.idle()
await indexer1.close()

/** @type {Entry[]} */ const entries = []
const indexer2 = new MultiCoreIndexer([core1, core2], {
batch: async (data) => {
entries.push(...data)
},
storage,
reindex: true,
})
t.after(() => indexer2.close())

await indexer2.idle()
assert.deepEqual(new Set(entries), new Set(expectedIn1And2))

indexer2.addCore(core3)
await indexer2.idle()
assert.deepEqual(
new Set(entries),
new Set([...expectedIn1And2, ...expectedIn3])
)
})

test('Entries are batched to batchMax when indexing is slower than Hypercore reads', async () => {
const cores = await createMultiple(5)
await generateFixtures(cores, 500)
Expand Down
50 changes: 38 additions & 12 deletions test/unit-tests/core-index-stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const Hypercore = require('hypercore')

test('stream.core', async () => {
const a = await create()
const stream = new CoreIndexStream(a, () => new ram())
const stream = new CoreIndexStream(a, () => new ram(), false)
assert.deepEqual(stream.core, a)
})

Expand All @@ -25,7 +25,7 @@ test('destroy before open', async () => {
return new ram()
}
const a = new Hypercore(() => new ram())
const stream = new CoreIndexStream(a, createStorage)
const stream = new CoreIndexStream(a, createStorage, false)
stream.destroy()
await once(stream, 'close')
assert.equal(storageCreated, false, 'storage never created')
Expand All @@ -38,7 +38,7 @@ test('unlink before open', async () => {
return new ram()
}
const core = new Hypercore(() => new ram())
const stream = new CoreIndexStream(core, createStorage)
const stream = new CoreIndexStream(core, createStorage, false)
await stream.unlink()
assert.equal(storageCreated, true, 'storage was created')
})
Expand All @@ -50,15 +50,41 @@ test('Indexes all items already in a core', async () => {
await a.append(blocks)
/** @type {any[]} */
const entries = []
const stream = new CoreIndexStream(a, () => new ram())
const stream = new CoreIndexStream(a, () => new ram(), false)
stream.on('data', (entry) => entries.push(entry))
await once(stream, 'drained')
assert.deepEqual(entries, expected)
})

test('Re-indexing all items in a core', async () => {
const core = await create()
const blocks = generateFixture(0, 10)
const expected = blocksToExpected(blocks, core.key)
await core.append(blocks)

const storage = ram.reusable()

const stream1 = new CoreIndexStream(core, storage, false)
stream1.on('data', (entry) => {
stream1.setIndexed(entry.index)
})
await once(stream1, 'drained')
await stream1.destroy()

/** @type {any[]} */
const entries = []
const stream2 = new CoreIndexStream(core, storage, true)
stream2.on('data', (entry) => {
entries.push(entry)
})
await once(stream2, 'drained')

assert.deepEqual(entries, expected)
})

test("Empty core emits 'drained' event", async () => {
const a = await create()
const stream = new CoreIndexStream(a, () => new ram())
const stream = new CoreIndexStream(a, () => new ram(), false)
stream.resume()
stream.on('indexing', assert.fail)
await once(stream, 'drained')
Expand All @@ -72,7 +98,7 @@ test('.remaining property is accurate', async () => {
await a.append(blocks)
/** @type {any[]} */
const entries = []
const stream = new CoreIndexStream(a, () => new ram())
const stream = new CoreIndexStream(a, () => new ram(), false)
assert.equal(stream.remaining, totalBlocks)
stream.on('data', (entry) => {
entries.push(entry)
Expand All @@ -89,7 +115,7 @@ test('Indexes items appended after initial index', async () => {
const blocks = generateFixture(0, 10)
/** @type {any[]} */
const entries = []
const stream = new CoreIndexStream(a, () => new ram())
const stream = new CoreIndexStream(a, () => new ram(), false)
stream.on('data', (entry) => entries.push(entry))
await once(stream, 'drained')
assert.deepEqual(entries, [], 'no entries before append')
Expand All @@ -110,7 +136,7 @@ test('Readable stream from sparse hypercore', async () => {
const range = b.download({ start: 5, end: 20 })
await range.downloaded()

const stream = new CoreIndexStream(b, () => new ram())
const stream = new CoreIndexStream(b, () => new ram(), false)
/** @type {Buffer[]} */
const entries = []
stream.on('data', (entry) => entries.push(entry.block))
Expand All @@ -134,7 +160,7 @@ test("'indexing' and 'drained' events are paired", async () => {

replicate(a, b)

const stream = new CoreIndexStream(b, () => new ram())
const stream = new CoreIndexStream(b, () => new ram(), false)
let indexingEvents = 0
let idleEvents = 0
stream.on('indexing', () => {
Expand Down Expand Up @@ -166,7 +192,7 @@ test('Appends from a replicated core are indexed', async () => {
const range1 = b.download({ start: 0, end: b.length })
await range1.downloaded()

const stream = new CoreIndexStream(b, () => new ram())
const stream = new CoreIndexStream(b, () => new ram(), false)
/** @type {Buffer[]} */
const entries = []
stream.on('data', (entry) => entries.push(entry.block))
Expand All @@ -187,7 +213,7 @@ test('Maintains index state', async () => {
/** @type {any[]} */
const entries = []
const storage = ram.reusable()
const stream1 = new CoreIndexStream(a, storage)
const stream1 = new CoreIndexStream(a, storage, false)
stream1.on('data', (entry) => {
entries.push(entry.block)
stream1.setIndexed(entry.index)
Expand All @@ -200,7 +226,7 @@ test('Maintains index state', async () => {
stream1.destroy()
await once(stream1, 'close')
await a.append(blocks.slice(500, 1000))
const stream2 = new CoreIndexStream(a, storage)
const stream2 = new CoreIndexStream(a, storage, false)
stream2.on('data', (entry) => {
entries.push(entry.block)
stream2.setIndexed(entry.index)
Expand Down
16 changes: 8 additions & 8 deletions test/unit-tests/multi-core-index-stream.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ test('Indexes all items already in a core', async () => {
const cores = await createMultiple(5)
const expected = await generateFixtures(cores, 1000)
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
(core) => new CoreIndexStream(core, () => new ram(), false)
)
const entries = []
const stream = new MultiCoreIndexStream(indexStreams)
Expand All @@ -44,7 +44,7 @@ test('Adding index streams after initialization', async () => {
const cores = await createMultiple(3)
const expected = await generateFixtures(cores, 100)
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
(core) => new CoreIndexStream(core, () => new ram(), false)
)
const entries = []
const stream = new MultiCoreIndexStream(indexStreams.slice(0, 2))
Expand Down Expand Up @@ -74,7 +74,7 @@ test('.remaining is as expected', async () => {
const cores = await createMultiple(coreCount)
const expected = await generateFixtures(cores, blockCount)
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
(core) => new CoreIndexStream(core, () => new ram(), false)
)
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
Expand Down Expand Up @@ -106,7 +106,7 @@ test('.remaining is as expected', async () => {
test('Indexes items appended after initial index', async () => {
const cores = await createMultiple(5)
const indexStreams = cores.map(
(core) => new CoreIndexStream(core, () => new ram())
(core) => new CoreIndexStream(core, () => new ram(), false)
)
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
Expand Down Expand Up @@ -138,7 +138,7 @@ test('index sparse hypercores', async () => {
for (const core of remoteCores) {
const range = core.download({ start: 5, end: 20 })
await range.downloaded()
indexStreams.push(new CoreIndexStream(core, () => new ram()))
indexStreams.push(new CoreIndexStream(core, () => new ram(), false))
}
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
Expand Down Expand Up @@ -172,7 +172,7 @@ test('Appends from a replicated core are indexed', async () => {
await remote.update({ wait: true })
const range = remote.download({ start: 0, end: remote.length })
await range.downloaded()
indexStreams.push(new CoreIndexStream(core, () => new ram()))
indexStreams.push(new CoreIndexStream(core, () => new ram(), false))
}
const entries = []
const stream = new MultiCoreIndexStream(indexStreams, { highWaterMark: 10 })
Expand Down Expand Up @@ -202,7 +202,7 @@ test('Maintains index state', async () => {
for (const core of cores) {
const storage = ram.reusable()
storages.push(storage)
const indexStream = new CoreIndexStream(core, storage)
const indexStream = new CoreIndexStream(core, storage, false)
indexStream.on('data', ({ index }) => {
indexStream.setIndexed(index)
})
Expand All @@ -211,7 +211,7 @@ test('Maintains index state', async () => {
}

const indexStreams = cores.map(
(core, i) => new CoreIndexStream(core, storages[i])
(core, i) => new CoreIndexStream(core, storages[i], false)
)
const stream = new MultiCoreIndexStream(indexStreams)
stream.on('data', (entry) => {
Expand Down

0 comments on commit faf54d6

Please sign in to comment.