Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add project.close() #375

Merged
merged 22 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
8adec47
initial implementation of project.close()
Nov 13, 2023
7529dc6
add `close` to Datastore, use `coreManager.close()`
Nov 13, 2023
356d4ee
add sqlite as a private field, `.close()` it on `MapeoProject.close()`
Nov 13, 2023
1df497d
close dataStores in parallel
Nov 15, 2023
e3c395c
await #coreManager.close and dataStore promises
Nov 16, 2023
0e7ce31
update multi-core-indexer to alpha8
Nov 16, 2023
1062410
update lock
Nov 16, 2023
3e33597
Merge branch 'main' of github.com:digidem/mapeo-core-next into feat/p…
Nov 16, 2023
ef7f93e
fix package-lock, add tests to close
Nov 21, 2023
bd88106
fix `.getMany` test for `project.close()`
Nov 21, 2023
904ee40
add tests for creating project after `project.close()`
Nov 21, 2023
f218f09
Merge branch 'main' of github.com:digidem/mapeo-core-next into feat/p…
Nov 21, 2023
6774279
Merge branch 'main' of github.com:digidem/mapeo-core-next into feat/p…
Dec 4, 2023
57ba9fc
* remove 'multiCoreIndexer.removeAllListener()' (the class is already
Dec 4, 2023
5fee846
remove cached project in manager after closing project
achou11 Dec 5, 2023
1ad7057
remove added listeners in mapeo project after close
achou11 Dec 5, 2023
a1b742e
[OPTIC-RELEASE-AUTOMATION] release/v9.0.0-alpha.3 (#404)
optic-release-automation[bot] Dec 5, 2023
c286486
Revert "[OPTIC-RELEASE-AUTOMATION] release/v9.0.0-alpha.3 (#404)"
achou11 Dec 5, 2023
d54311b
Merge branch 'main' into feat/projectClose
achou11 Dec 5, 2023
fd83222
update flaky sync e2e test now that project.close() is implemented
achou11 Dec 5, 2023
169af66
const instead of let in close() method
achou11 Dec 5, 2023
02ba36e
fix: close cores after indexing is closed
gmaclennan Dec 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/datastore/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ export class DataStore extends TypedEmitter {
return decode(block, { coreDiscoveryKey, index })
}

/** @param {Buffer} buf} */
/** @param {Buffer} buf */
async writeRaw(buf) {
const { length } = await this.#writerCore.append(buf)
const index = length - 1
Expand All @@ -219,6 +219,9 @@ export class DataStore extends TypedEmitter {
return block
}

async close() {
await this.#coreIndexer.close()
}
#handleIndexerIdle = () => {
for (const eventName of this.eventNames()) {
if (!(eventName in this.#pendingEmits)) continue
Expand Down
8 changes: 8 additions & 0 deletions src/mapeo-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,10 @@ export class MapeoManager extends TypedEmitter {
projectSecretKey: projectKeypair.secretKey,
})

project.once('close', () => {
this.#activeProjects.delete(projectPublicId)
})

// 5. Write project name and any other relevant metadata to project instance
await project.$setProjectSettings(settings)

Expand Down Expand Up @@ -369,6 +373,10 @@ export class MapeoManager extends TypedEmitter {

const project = this.#createProjectInstance(projectKeys)

project.once('close', () => {
this.#activeProjects.delete(projectPublicId)
})

// 3. Keep track of project instance as we know it's a properly existing project
this.#activeProjects.set(projectPublicId, project)

Expand Down
67 changes: 57 additions & 10 deletions src/mapeo-project.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { decodeBlockPrefix } from '@mapeo/schema'
import { drizzle } from 'drizzle-orm/better-sqlite3'
import { migrate } from 'drizzle-orm/better-sqlite3/migrator'
import { discoveryKey } from 'hypercore-crypto'
import { TypedEmitter } from 'tiny-typed-emitter'

import { CoreManager, NAMESPACES } from './core-manager/index.js'
import { DataStore } from './datastore/index.js'
Expand Down Expand Up @@ -51,7 +52,10 @@ export const kBlobStore = Symbol('blobStore')
export const kProjectReplicate = Symbol('replicate project')
const EMPTY_PROJECT_SETTINGS = Object.freeze({})

export class MapeoProject {
/**
* @extends {TypedEmitter<{ close: () => void }>}
*/
export class MapeoProject extends TypedEmitter {
#projectId
#deviceId
#coreManager
Expand All @@ -60,6 +64,9 @@ export class MapeoProject {
#blobStore
#coreOwnership
#capabilities
/** @ts-ignore */
#ownershipWriteDone
#sqlite
#memberApi
#iconApi
#syncApi
Expand Down Expand Up @@ -97,13 +104,15 @@ export class MapeoProject {
localPeers,
logger,
}) {
super()

this.#l = Logger.create('project', logger)
this.#deviceId = getDeviceId(keyManager)
this.#projectId = projectKeyToId(projectKey)

///////// 1. Setup database
const sqlite = new Database(dbPath)
const db = drizzle(sqlite)
this.#sqlite = new Database(dbPath)
const db = drizzle(this.#sqlite)
migrate(db, { migrationsFolder: projectMigrationsFolder })

///////// 2. Setup random-access-storage functions
Expand All @@ -124,7 +133,7 @@ export class MapeoProject {
projectKey,
keyManager,
storage: coreManagerStorage,
sqlite,
sqlite: this.#sqlite,
logger: this.#l,
})

Expand All @@ -138,7 +147,7 @@ export class MapeoProject {
deviceInfoTable,
iconTable,
],
sqlite,
sqlite: this.#sqlite,
getWinner,
mapDoc: (doc, version) => {
switch (doc.schemaName) {
Expand Down Expand Up @@ -294,17 +303,32 @@ export class MapeoProject {
this.#coreManager.creatorCore.replicate(peer.protomux)
}

/**
* @type {import('./local-peers.js').LocalPeersEvents['peer-add']}
*/
const onPeerAdd = (peer) => {
this.#coreManager.creatorCore.replicate(peer.protomux)
}

/**
* @type {import('./local-peers.js').LocalPeersEvents['discovery-key']}
*/
const onDiscoverykey = (discoveryKey, stream) => {
this.#syncApi[kHandleDiscoveryKey](discoveryKey, stream)
}

// When a new peer is found, try to replicate (if it is not a member of the
// project it will fail the capability check and be ignored)
localPeers.on('peer-add', (peer) => {
this.#coreManager.creatorCore.replicate(peer.protomux)
})
localPeers.on('peer-add', onPeerAdd)

// This happens whenever a peer replicates a core to the stream. SyncApi
// handles replicating this core if we also have it, or requesting the key
// for the core.
localPeers.on('discovery-key', (discoveryKey, stream) => {
this.#syncApi[kHandleDiscoveryKey](discoveryKey, stream)
localPeers.on('discovery-key', onDiscoverykey)

this.once('close', () => {
localPeers.off('peer-add', onPeerAdd)
localPeers.off('discovery-key', onDiscoverykey)
})

this.#l.log('Created project instance %h', projectKey)
Expand Down Expand Up @@ -339,6 +363,29 @@ export class MapeoProject {
return this.#deviceId
}

/**
* Resolves when hypercores have all loaded
*/
async ready() {
await Promise.all([this.#coreManager.ready(), this.#ownershipWriteDone])
}

/**
*/
async close() {
this.#l.log('closing project %h', this.#projectId)
const dataStorePromises = []
for (const dataStore of Object.values(this.#dataStores)) {
dataStorePromises.push(dataStore.close())
}
await Promise.all(dataStorePromises)
await this.#coreManager.close()

this.#sqlite.close()

this.emit('close')
}

/**
* @param {import('multi-core-indexer').Entry[]} entries
* @param {{projectIndexWriter: IndexWriter, sharedIndexWriter: IndexWriter}} indexWriters
Expand Down
66 changes: 66 additions & 0 deletions test-e2e/project-crud.js
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,71 @@ test('CRUD operations', async (t) => {
'expected values returns from getMany()'
)
})

t.test('create, close and then create, update', async (st) => {
const projectId = await manager.createProject()
const project = await manager.getProject(projectId)
const values = new Array(5).fill(null).map(() => {
return getUpdateFixture(value)
})
for (const value of values) {
// @ts-ignore
await project[schemaName].create(value)
}
// @ts-ignore
const written = await project[schemaName].create(value)
await project.close()

await st.exception(async () => {
const updateValue = getUpdateFixture(value)
// @ts-ignore
await project[schemaName].update(written.versionId, updateValue)
}, 'should fail updating since the project is already closed')

await st.exception(async () => {
for (const value of values) {
// @ts-ignore
await project[schemaName].create(value)
}
}, 'should fail creating since the project is already closed')

// @ts-ignore
await st.exception.all(async () => {
await project[schemaName].getMany()
}, 'should fail getting since the project is already closed')
})

t.test('create, read, close, re-open, read', async (st) => {
const projectId = await manager.createProject()

let project = await manager.getProject(projectId)

const values = new Array(5).fill(null).map(() => {
return getUpdateFixture(value)
})

for (const value of values) {
// @ts-ignore
await project[schemaName].create(value)
}

const many1 = await project[schemaName].getMany()
const manyValues1 = many1.map((doc) => valueOf(doc))

// close it
await project.close()

// re-open project
project = await manager.getProject(projectId)

const many2 = await project[schemaName].getMany()
const manyValues2 = many2.map((doc) => valueOf(doc))

st.alike(
stripUndef(manyValues1),
stripUndef(manyValues2),
'expected values returned before closing and after re-opening'
)
})
}
})
4 changes: 1 addition & 3 deletions test-e2e/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,5 @@ test('no sync capabilities === no namespaces sync apart from auth', async (t) =>

await disconnect1()

// Temp fix until we have .close() method - waits for indexing idle to ensure
// we don't close storage in teardown while index is still being written.
await Promise.all(projects.map((p) => p.$getProjectSettings()))
await Promise.all(projects.map((p) => p.close()))
})
Loading