Skip to content

Commit

Permalink
feat: allow disposal of persistence adapters, collections and sync ma…
Browse files Browse the repository at this point in the history
…nagers
  • Loading branch information
maxnowack committed Sep 27, 2024
1 parent 81ff08f commit 577d263
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/collections/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ collection.batch(() => {
})
```

### `dispose()`

Disposes the collection and all its resources. This will unregister the persistence adapter and clean up all internal data structures.

## Events

The Collection class is equipped with a set of events that provide insights into the state and changes within the collection. These events, emitted by the class, can be crucial for implementing reactive behaviors and persistence management. Here is an overview of the events:
Expand Down
1 change: 1 addition & 0 deletions docs/data-persistence/other/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ While SignalDB comes with a few built-in Persistence Adapters, there may be scen
You can create a custom persistene adapter by calling the `createPersistenceAdapter` function. The function takes the adapter definition as the only argument. The definition is an object with the following keys:

* `register` (`required`, `(onChange: (data?: LoadResponse<T>) => void) => Promise<void>`): This function should register the adapter. It will be called when initializing the collection and gets an `onChange` callback as the first parameter. This callback should be called, when the data in the adapter was updated externally, so that the collection could update it's internal memory. You can also pass a `LoadResponse<T>` object to the callback (same as the return value of the `load` function), to make the implementation of your adapter more straightforward.
* `unregister` (`optional`, `() => Promise<void>`): This function should unregister the adapter. Here you can clean up things. It will be called when the `dispose` method on the collection is called.
* `load` (`required`, `() => Promise<{ items: T[] } | { changes: { added: T[], modified: T[], removed: T[] } }>`): This function loads the data from the adapter and should return all it's items or a changeset, for optimizing performance. If the load function returns an object with an `items` property, the collection will do a full load and replace all it's items with the ones from the adapter. If the `items` property is omitted in the return value of the load function, the collection will do a partial load and apply the `changes` to it's internal memory.
* `save` (`required`, `(items: T[], changes: Changeset<T>) => Promise<void>`): This function will be called from the collection, when data was updated. This function should save this data to the adapter.

Expand Down
4 changes: 4 additions & 0 deletions docs/sync/reference/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,7 @@ Initiates the push process for a collection, syncing only if there are changes.
#### Parameters

- `name` (`string`): The name of the collection.

### `dispose()`

Disposes all internal collections and other data structures.
43 changes: 41 additions & 2 deletions packages/signaldb/__tests__/Collection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,8 @@ describe('Collection', () => {
})
})

describe('performance', () => {
// eslint-disable-next-line vitest/valid-describe-callback
describe('performance', { retry: 5 }, () => {
const measureTime = (fn: () => void) => {
const start = performance.now()
fn()
Expand Down Expand Up @@ -519,7 +520,7 @@ describe('Collection', () => {
// index query should use less than 10% of the time of a non-index query
expect(percentage).toBeLessThan(10)
})
}, { retry: 5 })
})

describe('Collection Debug Mode', () => {
it('should enable debug mode globally', () => {
Expand Down Expand Up @@ -610,5 +611,43 @@ describe('Collection', () => {
expect(col.find().fetch()).toEqual([{ id: '1', name: 'John Doe' }])
})
})

it('should dipose the collection', async () => {
const col = new Collection<{ id: string, name: string }>()
col.insert({ id: '1', name: 'John' })
await col.dispose()

expect(() => col.find()).toThrowError('Collection is disposed')
expect(() => col.findOne({})).toThrowError('Collection is disposed')
expect(() => col.insert({ name: 'Jane' })).toThrowError('Collection is disposed')
expect(() => col.insertMany([{ name: 'Jerry' }])).toThrowError('Collection is disposed')
expect(() => col.updateOne({}, {})).toThrowError('Collection is disposed')
expect(() => col.updateMany({}, {})).toThrowError('Collection is disposed')
expect(() => col.removeOne({})).toThrowError('Collection is disposed')
expect(() => col.removeMany({})).toThrowError('Collection is disposed')

// @ts-expect-error - private property
expect(col.memoryArray()).toEqual([])

// @ts-expect-error - private property
expect([...col.idIndex.keys()]).toEqual([])

// @ts-expect-error - private property
expect(col.indexProviders).toEqual([])
})

it('should call unregister on the persistence adapter during dispose', async () => {
const unregister = vi.fn()
const col = new Collection({
persistence: {
register: () => Promise.resolve(),
unregister,
load: () => Promise.resolve({ items: [] }),
save: () => Promise.resolve(),
},
})
await col.dispose()
expect(unregister).toHaveBeenCalledOnce()
})
})
})
29 changes: 29 additions & 0 deletions packages/signaldb/__tests__/sync/SyncManager.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -724,3 +724,32 @@ it('should not fail while removing non existing items', async () => {
// Verify that the collection includes the remote change
expect(collection.find().fetch()).toEqual([])
})

it('should clear all internal data structures on dispose', async () => {
const syncManager = new SyncManager<any, any>({
persistenceAdapter: () => memoryPersistenceAdapter([]),
pull: vi.fn(),
push: vi.fn(),
})
const collection = new Collection<TestItem, string, any>()
await syncManager.isReady()
syncManager.addCollection(collection, { name: 'test' })

// @ts-expect-error - private property
expect(syncManager.collections.size).toBe(1)

await syncManager.dispose()

// @ts-expect-error - private property
expect(syncManager.collections.size).toBe(0)
// @ts-expect-error - private property
expect(syncManager.syncQueues.size).toBe(0)
// @ts-expect-error - private property
expect(() => syncManager.changes.insert({})).toThrowError('Collection is disposed')
// @ts-expect-error - private property
expect(() => syncManager.remoteChanges.insert({})).toThrowError('Collection is disposed')
// @ts-expect-error - private property
expect(() => syncManager.snapshots.insert({})).toThrowError('Collection is disposed')
// @ts-expect-error - private property
expect(() => syncManager.syncOperations.insert({})).toThrowError('Collection is disposed')
})
22 changes: 22 additions & 0 deletions packages/signaldb/src/Collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
private idIndex = new Map<string, Set<number>>()
private debugMode
private batchOperationInProgress = false
private isDisposed = false

constructor(options?: CollectionOptions<T, I, U>) {
super()
Expand Down Expand Up @@ -410,7 +411,21 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
)
}

/**
* Disposes the collection, runs the dispose method of the persistence adapter
* and clears all internal data structures.
*/
public async dispose() {
if (this.persistenceAdapter?.unregister) await this.persistenceAdapter.unregister()
this.persistenceAdapter = null
this.memory().map(() => this.memory().pop())
this.idIndex.clear()
this.indexProviders = []
this.isDisposed = true
}

public find<O extends FindOptions<T>>(selector?: Selector<T>, options?: O) {
if (this.isDisposed) throw new Error('Collection is disposed')
if (selector !== undefined && (!selector || typeof selector !== 'object')) throw new Error('Invalid selector')
const cursor = new Cursor<T, U>(() => this.getItems(selector), {
reactive: this.options.reactivity,
Expand All @@ -437,6 +452,7 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
}

public findOne<O extends Omit<FindOptions<T>, 'limit'>>(selector: Selector<T>, options?: O) {
if (this.isDisposed) throw new Error('Collection is disposed')
const cursor = this.find(selector, {
limit: 1,
...options,
Expand All @@ -457,6 +473,7 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
}

public insert(item: Omit<T, 'id'> & Partial<Pick<T, 'id'>>) {
if (this.isDisposed) throw new Error('Collection is disposed')
if (!item) throw new Error('Invalid item')
const newItem = { id: randomId(), ...item } as T
if (this.idIndex.has(serializeValue(newItem.id))) throw new Error('Item with same id already exists')
Expand All @@ -471,6 +488,7 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
}

public insertMany(items: Array<Omit<T, 'id'> & Partial<Pick<T, 'id'>>>) {
if (this.isDisposed) throw new Error('Collection is disposed')
if (!items) throw new Error('Invalid items')
if (items.length === 0) {
return []
Expand All @@ -486,6 +504,7 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
}

public updateOne(selector: Selector<T>, modifier: Modifier<T>) {
if (this.isDisposed) throw new Error('Collection is disposed')
if (!selector) throw new Error('Invalid selector')
if (!modifier) throw new Error('Invalid modifier')

Expand All @@ -504,6 +523,7 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
}

public updateMany(selector: Selector<T>, modifier: Modifier<T>) {
if (this.isDisposed) throw new Error('Collection is disposed')
if (!selector) throw new Error('Invalid selector')
if (!modifier) throw new Error('Invalid modifier')

Expand All @@ -526,6 +546,7 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
}

public removeOne(selector: Selector<T>) {
if (this.isDisposed) throw new Error('Collection is disposed')
if (!selector) throw new Error('Invalid selector')
const { item, index } = this.getItemAndIndex(selector)
if (item != null) {
Expand All @@ -540,6 +561,7 @@ export default class Collection<T extends BaseItem<I> = BaseItem, I = any, U = T
}

public removeMany(selector: Selector<T>) {
if (this.isDisposed) throw new Error('Collection is disposed')
if (!selector) throw new Error('Invalid selector')
const items = this.getItems(selector)

Expand Down
19 changes: 19 additions & 0 deletions packages/signaldb/src/SyncManager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ export default class SyncManager<
private remoteChanges: Collection<Change, string>
private syncQueues: Map<string, PromiseQueue> = new Map()
private persistenceReady: Promise<void>
private isDisposed = false

/**
* @param options Collection options
Expand Down Expand Up @@ -195,6 +196,21 @@ export default class SyncManager<
return this.syncQueues.get(name) as PromiseQueue
}

/**
* Clears all internal data structures
*/
public async dispose() {
this.collections.clear()
this.syncQueues.clear()
await Promise.all([
this.changes.dispose(),
this.remoteChanges.dispose(),
this.snapshots.dispose(),
this.syncOperations.dispose(),
])
this.isDisposed = true
}

/**
* Gets a collection with it's options by name
* @param name Name of the collection
Expand All @@ -217,6 +233,7 @@ export default class SyncManager<
collection: Collection<ItemType, IdType, any>,
options: SyncOptions<CollectionOptions>,
) {
if (this.isDisposed) throw new Error('SyncManager is disposed')
this.collections.set(options.name, [collection, options])
collection.on('added', (item) => {
// skip the change if it was a remote change
Expand Down Expand Up @@ -281,6 +298,7 @@ export default class SyncManager<
* Starts the sync process for all collections
*/
public async syncAll() {
if (this.isDisposed) throw new Error('SyncManager is disposed')
const errors: {id: string, error: Error}[] = []
await Promise.all([...this.collections.keys()].map(id =>
this.sync(id).catch((error: Error) => {
Expand Down Expand Up @@ -317,6 +335,7 @@ export default class SyncManager<
* @param options.onlyWithChanges If true, the sync process will only be started if there are changes.
*/
public async sync(name: string, options: { force?: boolean, onlyWithChanges?: boolean } = {}) {
if (this.isDisposed) throw new Error('SyncManager is disposed')
await this.isReady()
const entry = this.getCollection(name)
const collectionOptions = entry[1]
Expand Down
1 change: 1 addition & 0 deletions packages/signaldb/src/types/PersistenceAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export default interface PersistenceAdapter<T extends { id: I } & Record<string,
load(): Promise<LoadResponse<T>>,
save(items: T[], changes: Changeset<T>): Promise<void>,
register(onChange: (data?: LoadResponse<T>) => void | Promise<void>): Promise<void>,
unregister?(): Promise<void>,
}

0 comments on commit 577d263

Please sign in to comment.