Skip to content

Commit

Permalink
Add Snapshot Listeners to firestore-exp (#3317)
Browse files Browse the repository at this point in the history
  • Loading branch information
schmidt-sebastian committed Jun 30, 2020
1 parent fe2bd0f commit f54abc2
Show file tree
Hide file tree
Showing 7 changed files with 220 additions and 48 deletions.
2 changes: 2 additions & 0 deletions .changeset/seven-crabs-join.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
---
---
18 changes: 10 additions & 8 deletions packages/firestore/exp/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,8 @@ export function updateDoc(
): Promise<void>;
export function deleteDoc(reference: DocumentReference<unknown>): Promise<void>;

// TODO(firestoreexp): Update API Proposal to use FirestoreError in these
// callbacks
export function onSnapshot<T>(
reference: DocumentReference<T>,
observer: {
Expand All @@ -375,28 +377,28 @@ export function onSnapshot<T>(
options: SnapshotListenOptions,
observer: {
next?: (snapshot: DocumentSnapshot<T>) => void;
error?: (error: Error) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): () => void;
export function onSnapshot<T>(
reference: DocumentReference<T>,
onNext: (snapshot: DocumentSnapshot<T>) => void,
onError?: (error: Error) => void,
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): () => void;
export function onSnapshot<T>(
reference: DocumentReference<T>,
options: SnapshotListenOptions,
onNext: (snapshot: DocumentSnapshot<T>) => void,
onError?: (error: Error) => void,
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): () => void;
export function onSnapshot<T>(
query: Query<T>,
observer: {
next?: (snapshot: QuerySnapshot<T>) => void;
error?: (error: Error) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): () => void;
Expand All @@ -405,28 +407,28 @@ export function onSnapshot<T>(
options: SnapshotListenOptions,
observer: {
next?: (snapshot: QuerySnapshot<T>) => void;
error?: (error: Error) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): () => void;
export function onSnapshot<T>(
query: Query<T>,
onNext: (snapshot: QuerySnapshot<T>) => void,
onError?: (error: Error) => void,
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): () => void;
export function onSnapshot<T>(
query: Query<T>,
options: SnapshotListenOptions,
onNext: (snapshot: QuerySnapshot<T>) => void,
onError?: (error: Error) => void,
onError?: (error: FirestoreError) => void,
onCompletion?: () => void
): () => void;
export function onSnapshotsInSync(
firestore: FirebaseFirestore,
observer: {
next?: (value: void) => void;
error?: (error: Error) => void;
error?: (error: FirestoreError) => void;
complete?: () => void;
}
): () => void;
Expand Down
11 changes: 10 additions & 1 deletion packages/firestore/exp/index.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,16 @@ export {

export { runTransaction, Transaction } from '../lite/src/api/transaction';

export { getDoc, getDocFromCache, getDocFromServer } from './src/api/reference';
export {
getDoc,
getDocFromCache,
getDocFromServer,
onSnapshot,
setDoc,
updateDoc,
deleteDoc,
addDoc
} from './src/api/reference';

export {
FieldValue,
Expand Down
189 changes: 170 additions & 19 deletions packages/firestore/exp/src/api/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ import { debugAssert } from '../../../src/util/assert';
import { cast } from '../../../lite/src/api/util';
import { DocumentSnapshot, QuerySnapshot } from './snapshot';
import {
addDocSnapshotListener,
addQuerySnapshotListener,
applyFirestoreDataConverter,
getDocsViaSnapshotListener,
getDocViaSnapshotListener,
SnapshotMetadata
SnapshotMetadata,
validateHasExplicitOrderByForLimitToLast
} from '../../../src/api/database';
import { ViewSnapshot } from '../../../src/core/view_snapshot';
import {
Expand All @@ -44,6 +47,14 @@ import {
import { Document } from '../../../src/model/document';
import { DeleteMutation, Precondition } from '../../../src/model/mutation';
import { FieldPath } from '../../../src/api/field_path';
import {
CompleteFn,
ErrorFn,
isPartialObserver,
NextFn,
PartialObserver,
Unsubscribe
} from '../../../src/api/observer';

export function getDoc<T>(
reference: firestore.DocumentReference<T>
Expand Down Expand Up @@ -101,17 +112,14 @@ export function getQuery<T>(
): Promise<QuerySnapshot<T>> {
const internalQuery = cast<Query<T>>(query, Query);
const firestore = cast<Firestore>(query.firestore, Firestore);

validateHasExplicitOrderByForLimitToLast(internalQuery._query);
return firestore._getFirestoreClient().then(async firestoreClient => {
const snapshot = await getDocsViaSnapshotListener(
firestoreClient,
internalQuery._query
);
return new QuerySnapshot(
firestore,
internalQuery,
snapshot,
new SnapshotMetadata(snapshot.hasPendingWrites, snapshot.fromCache)
);
return new QuerySnapshot(firestore, internalQuery, snapshot);
});
}

Expand All @@ -124,12 +132,7 @@ export function getQueryFromCache<T>(
const snapshot = await firestoreClient.getDocumentsFromLocalCache(
internalQuery._query
);
return new QuerySnapshot(
firestore,
internalQuery,
snapshot,
new SnapshotMetadata(snapshot.hasPendingWrites, /* fromCache= */ true)
);
return new QuerySnapshot(firestore, internalQuery, snapshot);
});
}

Expand All @@ -144,12 +147,7 @@ export function getQueryFromServer<T>(
internalQuery._query,
{ source: 'server' }
);
return new QuerySnapshot(
firestore,
internalQuery,
snapshot,
new SnapshotMetadata(snapshot.hasPendingWrites, snapshot.fromCache)
);
return new QuerySnapshot(firestore, internalQuery, snapshot);
});
}

Expand Down Expand Up @@ -280,6 +278,159 @@ export function addDoc<T>(
.then(() => docRef);
}

// TODO(firestorexp): Make sure these overloads are tested via the Firestore
// integration tests
export function onSnapshot<T>(
reference: firestore.DocumentReference<T>,
observer: {
next?: (snapshot: firestore.DocumentSnapshot<T>) => void;
error?: (error: firestore.FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
export function onSnapshot<T>(
reference: firestore.DocumentReference<T>,
options: firestore.SnapshotListenOptions,
observer: {
next?: (snapshot: firestore.DocumentSnapshot<T>) => void;
error?: (error: firestore.FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
export function onSnapshot<T>(
reference: firestore.DocumentReference<T>,
onNext: (snapshot: firestore.DocumentSnapshot<T>) => void,
onError?: (error: firestore.FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;
export function onSnapshot<T>(
reference: firestore.DocumentReference<T>,
options: firestore.SnapshotListenOptions,
onNext: (snapshot: firestore.DocumentSnapshot<T>) => void,
onError?: (error: firestore.FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;
export function onSnapshot<T>(
query: firestore.Query<T>,
observer: {
next?: (snapshot: firestore.QuerySnapshot<T>) => void;
error?: (error: firestore.FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
export function onSnapshot<T>(
query: firestore.Query<T>,
options: firestore.SnapshotListenOptions,
observer: {
next?: (snapshot: firestore.QuerySnapshot<T>) => void;
error?: (error: firestore.FirestoreError) => void;
complete?: () => void;
}
): Unsubscribe;
export function onSnapshot<T>(
query: firestore.Query<T>,
onNext: (snapshot: firestore.QuerySnapshot<T>) => void,
onError?: (error: firestore.FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;
export function onSnapshot<T>(
query: firestore.Query<T>,
options: firestore.SnapshotListenOptions,
onNext: (snapshot: firestore.QuerySnapshot<T>) => void,
onError?: (error: firestore.FirestoreError) => void,
onCompletion?: () => void
): Unsubscribe;
export function onSnapshot<T>(
ref: firestore.Query<T> | firestore.DocumentReference<T>,
...args: unknown[]
): Unsubscribe {
let options: firestore.SnapshotListenOptions = {
includeMetadataChanges: false
};
let currArg = 0;
if (typeof args[currArg] === 'object' && !isPartialObserver(args[currArg])) {
options = args[currArg] as firestore.SnapshotListenOptions;
currArg++;
}

const internalOptions = {
includeMetadataChanges: options.includeMetadataChanges
};

if (isPartialObserver(args[currArg])) {
const userObserver = args[currArg] as PartialObserver<
firestore.QuerySnapshot<T>
>;
args[currArg] = userObserver.next?.bind(userObserver);
args[currArg + 1] = userObserver.error?.bind(userObserver);
args[currArg + 2] = userObserver.complete?.bind(userObserver);
}

let asyncObserver: Promise<Unsubscribe>;

if (ref instanceof DocumentReference) {
const firestore = cast(ref.firestore, Firestore);

const observer: PartialObserver<ViewSnapshot> = {
next: snapshot => {
if (args[currArg]) {
(args[currArg] as NextFn<firestore.DocumentSnapshot<T>>)(
convertToDocSnapshot(firestore, ref, snapshot)
);
}
},
error: args[currArg + 1] as ErrorFn,
complete: args[currArg + 2] as CompleteFn
};

asyncObserver = firestore
._getFirestoreClient()
.then(firestoreClient =>
addDocSnapshotListener(
firestoreClient,
ref._key,
internalOptions,
observer
)
);
} else {
const query = cast<Query<T>>(ref, Query);
const firestore = cast(query, Firestore);

const observer: PartialObserver<ViewSnapshot> = {
next: snapshot => {
if (args[currArg]) {
(args[currArg] as NextFn<firestore.QuerySnapshot<T>>)(
new QuerySnapshot(firestore, query, snapshot)
);
}
},
error: args[currArg + 1] as ErrorFn,
complete: args[currArg + 2] as CompleteFn
};

validateHasExplicitOrderByForLimitToLast(query._query);

asyncObserver = firestore
._getFirestoreClient()
.then(firestoreClient =>
addQuerySnapshotListener(
firestoreClient,
query._query,
internalOptions,
observer
)
);
}

// TODO(firestorexp): Add test that verifies that we don't raise a snapshot if
// unsubscribe is called before `asyncObserver` resolves.
return () => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
asyncObserver.then(unsubscribe => unsubscribe());
};
}

/**
* Converts a ViewSnapshot that contains the single document specified by `ref`
* to a DocumentSnapshot.
Expand Down
14 changes: 10 additions & 4 deletions packages/firestore/exp/src/api/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,21 @@ export class QueryDocumentSnapshot<T = firestore.DocumentData>

export class QuerySnapshot<T = firestore.DocumentData>
implements firestore.QuerySnapshot<T> {
readonly metadata: SnapshotMetadata;

private _cachedChanges?: Array<firestore.DocumentChange<T>>;
private _cachedChangesIncludeMetadataChanges?: boolean;

constructor(
readonly _firestore: Firestore,
readonly query: Query<T>,
readonly _snapshot: ViewSnapshot,
readonly metadata: SnapshotMetadata
) {}
readonly _snapshot: ViewSnapshot
) {
this.metadata = new SnapshotMetadata(
_snapshot.hasPendingWrites,
_snapshot.fromCache
);
}

get docs(): Array<firestore.QueryDocumentSnapshot<T>> {
const result: Array<firestore.QueryDocumentSnapshot<T>> = [];
Expand All @@ -154,7 +160,7 @@ export class QuerySnapshot<T = firestore.DocumentData>
thisArg,
this._convertToDocumentSnapshot(
doc,
this.metadata.fromCache,
this._snapshot.fromCache,
this._snapshot.mutatedKeys.has(doc.key)
)
);
Expand Down
4 changes: 3 additions & 1 deletion packages/firestore/lite/src/api/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ import { hardAssert } from '../../../src/util/assert';
import { DeleteMutation, Precondition } from '../../../src/model/mutation';
import {
applyFirestoreDataConverter,
BaseQuery
BaseQuery,
validateHasExplicitOrderByForLimitToLast
} from '../../../src/api/database';
import { FieldPath } from './field_path';
import { cast } from './util';
Expand Down Expand Up @@ -417,6 +418,7 @@ export function getQuery<T>(
query: firestore.Query<T>
): Promise<firestore.QuerySnapshot<T>> {
const internalQuery = cast<Query<T>>(query, Query);
validateHasExplicitOrderByForLimitToLast(internalQuery._query);
return internalQuery.firestore._getDatastore().then(async datastore => {
const result = await invokeRunQueryRpc(datastore, internalQuery._query);
const docs = result.map(
Expand Down
Loading

0 comments on commit f54abc2

Please sign in to comment.