Skip to content

Commit

Permalink
CHANGE RxCollection.findByIds() now returns a RxQuery (#4191)
Browse files Browse the repository at this point in the history
* CHANGE `RxCollection.findByIds()` now returns a `RxQuery`

* ADD orga

* FIX do not return deleted documents
  • Loading branch information
pubkey authored Dec 21, 2022
1 parent 69dc866 commit 6adb2ed
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 163 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
- CHANGE Make RxDocuments immutable
- ADD `RxDocument.getLatest()`
- REMOVE deprecated `babel-plugin-transform-async-to-promises` plugin.
- CHANGE `RxCollection.findByIds()` now returns a `RxQuery`.
- REMOVED `RxCollection.findByIds$`, use `RxCollection.findByIds().$` instead.

<!-- ADD new changes here! -->

Expand Down
7 changes: 2 additions & 5 deletions orga/before-next-major.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ The `selector`part of queries is currently not fully typed.
Hint: We can find out the possible doc field names via https://stackoverflow.com/questions/58434389/typescript-deep-keyof-of-a-nested-object/58436959#58436959


## Use normal RxQuery for `findByIds$` and `findByIds`
## Use normal RxQuery for `findByIds$` and `findByIds` [DONE]

Atm `findByIds$` and `findByIds` are implemented with their own query and observe logic.
This is not necessary and confusing for the user.
Expand All @@ -61,9 +61,6 @@ Instead we should use a different `RxQuery.op` and use normal `RxQuery` objects
The user would call it like normal queries but with a different method input:

```ts
const result = await myRxCollection.findById('foo').exec();
const result$ = await myRxCollection.findById('foo').$;

const results = await myRxCollection.findByIds(['foo', 'bar']).exec();
const results$ = await myRxCollection.findByIds(['foo', 'bar']).$;
```
Expand All @@ -81,7 +78,7 @@ Also rename the method names and variables inside of the plugins.

If multiple atomic updates are run on the same document at the same time, we should merge them together and do a single database write.

## Add enum-compression to the key-compressio plugin
## Add enum-compression to the key-compression plugin
- Also rename the key-compression plugin to be just called 'compression'

## Fix migration+replication
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/backup/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ export class RxBackupState {
.filter((elem, pos, arr) => arr.indexOf(elem) === pos); // unique
await this.database.requestIdlePromise();

const docs: Map<string, RxDocument> = await collection.findByIds(docIds);
const docs: Map<string, RxDocument> = await collection.findByIds(docIds).exec();
if (docs.size === 0) {
hasMore = false;
continue;
Expand Down
147 changes: 12 additions & 135 deletions src/rx-collection.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import {
filter,
startWith,
mergeMap,
shareReplay
mergeMap
} from 'rxjs/operators';

import {
Expand All @@ -14,7 +12,6 @@ import {
getFromMapOrThrow,
PROMISE_RESOLVE_FALSE,
PROMISE_RESOLVE_VOID,
RXJS_SHARE_REPLAY_DEFAULTS,
getDefaultRxDocumentMeta,
getDefaultRevision
} from './util';
Expand Down Expand Up @@ -392,7 +389,7 @@ export class RxCollectionBase<
};
}

const rxDocumentMap = await this.findByIds(ids);
const rxDocumentMap = await this.findByIds(ids).exec();
const docsData: RxDocumentData<RxDocumentType>[] = [];
const docsMap: Map<string, RxDocumentData<RxDocumentType>> = new Map();
Array.from(rxDocumentMap.values()).forEach(rxDocument => {
Expand Down Expand Up @@ -595,138 +592,18 @@ export class RxCollectionBase<
* find a list documents by their primary key
* has way better performance then running multiple findOne() or a find() with a complex $or-selected
*/
async findByIds(
findByIds(
ids: string[]
): Promise<Map<string, RxDocument<RxDocumentType, OrmMethods>>> {
const ret = new Map<string, RxDocument<RxDocumentType, OrmMethods>>();
const mustBeQueried: string[] = [];

// first try to fill from docCache
ids.forEach(id => {
const docData = this._docCache.getLatestDocumentDataIfExists(id);
if (docData) {
const doc = this._docCache.getCachedRxDocument(docData);
ret.set(id, doc);
} else {
mustBeQueried.push(id);
}
});

// find everything which was not in docCache
if (mustBeQueried.length > 0) {
const docs = await this.storageInstance.findDocumentsById(mustBeQueried, false);
Object.values(docs).forEach(docData => {
const doc = this._docCache.getCachedRxDocument(docData);
ret.set(doc.primary, doc);
});
}
return ret;
}

/**
* like this.findByIds but returns an observable
* that always emits the current state
*/
findByIds$(
ids: string[]
): Observable<Map<string, RxDocument<RxDocumentType, OrmMethods>>> {
let currentValue: Map<string, RxDocument<RxDocumentType, OrmMethods>> | null = null;
let lastChangeEvent: number = -1;

/**
* Ensure we do not process events in parallel
*/
let queue: Promise<any> = PROMISE_RESOLVE_VOID;

const initialPromise = this.findByIds(ids).then(docsMap => {
lastChangeEvent = this._changeEventBuffer.counter;
currentValue = docsMap;
});
let firstEmitDone = false;

return this.$.pipe(
startWith(null),
/**
* Optimization shortcut.
* Do not proceed if the emitted RxChangeEvent
* is not relevant for the query.
*/
filter(changeEvent => {
if (
// first emit has no event
changeEvent &&
(
// local documents are not relevant for the query
changeEvent.isLocal ||
// document of the change is not in the ids list.
!ids.includes(changeEvent.documentId)
)
) {
return false;
} else {
return true;
): RxQuery<RxDocumentType, Map<string, RxDocument<RxDocumentType, OrmMethods>>> {
const mangoQuery: MangoQuery<RxDocumentType> = {
selector: {
[this.schema.primaryPath]: {
$in: ids.slice(0)
}
}),
mergeMap(() => initialPromise),
/**
* Because shareReplay with refCount: true
* will often subscribe/unsusbscribe
* we always ensure that we handled all missed events
* since the last subscription.
*/
mergeMap(() => {
queue = queue.then(async () => {
/**
* We first have to clone the Map
* to ensure we do not create side effects by mutating
* a Map that has already been returned before.
*/
currentValue = new Map(ensureNotFalsy(currentValue));
const missedChangeEvents = this._changeEventBuffer.getFrom(lastChangeEvent + 1);
lastChangeEvent = this._changeEventBuffer.counter;
if (missedChangeEvents === null) {
/**
* changeEventBuffer is of bounds -> we must re-execute over the database
* because we cannot calculate the new results just from the events.
*/
const newResult = await this.findByIds(ids);
lastChangeEvent = this._changeEventBuffer.counter;
return newResult;
} else {
let resultHasChanged = false;
missedChangeEvents
.forEach(rxChangeEvent => {
const docId = rxChangeEvent.documentId;
if (!ids.includes(docId)) {
// document is not relevant for the result set
return;
}
const op = rxChangeEvent.operation;
if (op === 'INSERT' || op === 'UPDATE') {
resultHasChanged = true;
const rxDocument = this._docCache.getCachedRxDocument(rxChangeEvent.documentData);
ensureNotFalsy(currentValue).set(docId, rxDocument);
} else {
if (ensureNotFalsy(currentValue).has(docId)) {
resultHasChanged = true;
ensureNotFalsy(currentValue).delete(docId);
}
}
});

// nothing happened that affects the result -> do not emit
if (!resultHasChanged && firstEmitDone) {
return false as any;
}
}
firstEmitDone = true;
return currentValue;
});
return queue;
}),
filter(x => !!x),
shareReplay(RXJS_SHARE_REPLAY_DEFAULTS)
);
}
};
const query = createRxQuery('findByIds', mangoQuery, this as any);
return query as any;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/rx-document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export const basePrototype = {
}

if (schemaObj.type === 'array') {
return refCollection.findByIds(value).then(res => {
return refCollection.findByIds(value).exec().then(res => {
const valuesIterator = res.values();
return Array.from(valuesIterator) as any;
});
Expand Down
67 changes: 54 additions & 13 deletions src/rx-query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ export class RxQueryBase<
docsData: RxDocumentData<RxDocType>[];
// A key->document map, used in the event reduce optimization.
docsDataMap: Map<string, RxDocType>;
docsMap: Map<string, RxDocument<RxDocType>>;
docs: RxDocument<RxDocType>[];
count: number;
/**
Expand Down Expand Up @@ -154,6 +155,8 @@ export class RxQueryBase<
} else if (this.op === 'findOne') {
// findOne()-queries emit RxDocument or null
return useResult.docs.length === 0 ? null : useResult.docs[0];
} else if (this.op === 'findByIds') {
return useResult.docsMap;
} else {
// find()-queries emit RxDocument[]
// Flat copy the array so it won't matter if the user modifies it.
Expand Down Expand Up @@ -204,34 +207,40 @@ export class RxQueryBase<
* set the new result-data as result-docs of the query
* @param newResultData json-docs that were received from pouchdb
*/
_setResultData(newResultData: RxDocumentData<RxDocType>[] | number): void {
_setResultData(newResultData: RxDocumentData<RxDocType>[] | number | Map<string, RxDocumentData<RxDocType>>): void {

if (typeof newResultData === 'number') {
this._result = {
docsData: [],
docsMap: new Map(),
docsDataMap: new Map(),
count: newResultData,
docs: [],
time: now()
};
return;
} else if (newResultData instanceof Map) {
newResultData = Array.from((newResultData as Map<string, RxDocumentData<RxDocType>>).values());
}

const docsDataMap = new Map();
const docsMap = new Map();
const docs = newResultData.map(docData => this.collection._docCache.getCachedRxDocument(docData));

/**
* Instead of using the newResultData in the result cache,
* we directly use the objects that are stored in the RxDocument
* to ensure we do not store the same data twice and fill up the memory.
*/
const docsDataMap = new Map();
const docsData = docs.map(doc => {
docsDataMap.set(doc.primary, doc._data);
docsMap.set(doc.primary, doc);
return doc._data;
});

this._result = {
docsData,
docsMap,
docsDataMap,
count: docsData.length,
docs,
Expand All @@ -243,26 +252,52 @@ export class RxQueryBase<
* executes the query on the database
* @return results-array with document-data
*/
_execOverDatabase(): Promise<RxDocumentData<RxDocType>[] | number> {
async _execOverDatabase(): Promise<RxDocumentData<RxDocType>[] | number> {
this._execOverDatabaseCount = this._execOverDatabaseCount + 1;
this._lastExecStart = now();


if (this.op === 'count') {
const preparedQuery = this.getPreparedQuery();
return this.collection.storageInstance.count(preparedQuery).then(result => {
if (result.mode === 'slow' && !this.collection.database.allowSlowCount) {
throw newRxError('QU14', {
collection: this.collection,
queryObj: this.mangoQuery
});
const result = await this.collection.storageInstance.count(preparedQuery);
if (result.mode === 'slow' && !this.collection.database.allowSlowCount) {
throw newRxError('QU14', {
collection: this.collection,
queryObj: this.mangoQuery
});
} else {
return result.count;
}
}

if (this.op === 'findByIds') {
const ids: string[] = ensureNotFalsy(this.mangoQuery.selector)[this.collection.schema.primaryPath].$in;
const ret = new Map<string, RxDocument<RxDocType>>();
const mustBeQueried: string[] = [];
// first try to fill from docCache
ids.forEach(id => {
const docData = this.collection._docCache.getLatestDocumentDataIfExists(id);
if (docData) {
if (!docData._deleted) {
const doc = this.collection._docCache.getCachedRxDocument(docData);
ret.set(id, doc);
}
} else {
return result.count;
mustBeQueried.push(id);
}

});
// everything which was not in docCache must be fetched from the storage
if (mustBeQueried.length > 0) {
const docs = await this.collection.storageInstance.findDocumentsById(mustBeQueried, false);
Object.values(docs).forEach(docData => {
const doc = this.collection._docCache.getCachedRxDocument(docData);
ret.set(doc.primary, doc);
});
}
return ret as any;
}


const docsPromise = queryCollection<RxDocType>(this as any);
return docsPromise.then(docs => {
this._lastExecEnd = now();
Expand Down Expand Up @@ -649,8 +684,14 @@ export async function queryCollection<RxDocType>(
*/
if (rxQuery.isFindOneByIdQuery) {
const docId = rxQuery.isFindOneByIdQuery;
const docsMap = await collection.storageInstance.findDocumentsById([docId], false);
const docData = docsMap[docId];

// first try to fill from docCache
let docData = rxQuery.collection._docCache.getLatestDocumentDataIfExists(docId);
if (!docData) {
// otherwise get from storage
const docsMap = await collection.storageInstance.findDocumentsById([docId], false);
docData = docsMap[docId];
}
if (docData) {
docs.push(docData);
}
Expand Down
2 changes: 1 addition & 1 deletion src/types/rx-query.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export type MangoQuery<RxDocType = any> = MangoQueryNoLimit<RxDocType> & {
limit?: number;
};

export type RxQueryOP = 'find' | 'findOne' | 'count';
export type RxQueryOP = 'find' | 'findOne' | 'count' | 'findByIds';

export declare class RxQuery<RxDocumentType = any, RxQueryResult = RxDocumentType | RxDocumentType[]> extends RxQueryBase<RxDocumentType, RxQueryResult> {
equals(queryObj: any): RxQuery<RxDocumentType, RxQueryResult>;
Expand Down
Loading

0 comments on commit 6adb2ed

Please sign in to comment.