diff --git a/dev/src/index.ts b/dev/src/index.ts index d6ab41765..6088be6f6 100644 --- a/dev/src/index.ts +++ b/dev/src/index.ts @@ -76,7 +76,11 @@ const serviceConfig = interfaces['google.firestore.v1.Firestore']; import api = google.firestore.v1; import {CollectionGroup} from './collection-group'; -import {RecursiveDelete} from './recursive-delete'; +import { + RECURSIVE_DELETE_MAX_PENDING_OPS, + RECURSIVE_DELETE_MIN_PENDING_OPS, + RecursiveDelete, +} from './recursive-delete'; export { CollectionReference, @@ -1250,9 +1254,38 @@ export class Firestore implements firestore.Firestore { | firestore.CollectionReference<unknown> | firestore.DocumentReference<unknown>, bulkWriter?: BulkWriter ): Promise<void> { + return this._recursiveDelete( + ref, + RECURSIVE_DELETE_MAX_PENDING_OPS, + RECURSIVE_DELETE_MIN_PENDING_OPS, + bulkWriter + ); + } + + /** + * This overload is not private in order to test the query resumption with + * startAfter() once the RecursiveDelete instance has MAX_PENDING_OPS pending. + * + * @private + */ + // Visible for testing + _recursiveDelete( + ref: + | firestore.CollectionReference<unknown> + | firestore.DocumentReference<unknown>, + maxPendingOps: number, + minPendingOps: number, + bulkWriter?: BulkWriter ): Promise<void> { const writer = bulkWriter ?? this.getBulkWriter(); - const deleter = new RecursiveDelete(this, writer, ref); + const deleter = new RecursiveDelete( + this, + writer, + ref, + maxPendingOps, + minPendingOps + ); return deleter.run(); } diff --git a/dev/src/recursive-delete.ts b/dev/src/recursive-delete.ts index dd14ed949..97071b7ac 100644 --- a/dev/src/recursive-delete.ts +++ b/dev/src/recursive-delete.ts @@ -48,7 +48,7 @@ export const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__'; * from streaming documents faster than Firestore can delete. */ // Visible for testing. 
-export const MAX_PENDING_OPS = 5000; +export const RECURSIVE_DELETE_MAX_PENDING_OPS = 5000; /** * The number of pending BulkWriter operations at which RecursiveDelete @@ -57,7 +57,7 @@ export const MAX_PENDING_OPS = 5000; * throughput. This helps prevent BulkWriter from idling while Firestore * fetches the next query. */ -const MIN_PENDING_OPS = 1000; +export const RECURSIVE_DELETE_MIN_PENDING_OPS = 1000; /** * Class used to store state required for running a recursive delete operation. @@ -84,6 +84,25 @@ export class RecursiveDelete { */ private documentsPending = true; + /** + * Whether run() has been called. + * @private + */ + private started = false; + + /** + * Query limit to use when fetching all descendants. + * @private + */ + private readonly maxPendingOps: number; + + /** + * The number of pending BulkWriter operations at which RecursiveDelete + * starts the next limit query to fetch descendants. + * @private + */ + private readonly minPendingOps: number; + /** * A deferred promise that resolves when the recursive delete operation * is completed. @@ -119,14 +138,22 @@ export class RecursiveDelete { * @param firestore The Firestore instance to use. * @param writer The BulkWriter instance to use for delete operations. * @param ref The document or collection reference to recursively delete. + * @param maxLimit The query limit to use when fetching descendants + * @param minLimit The number of pending BulkWriter operations at which + * RecursiveDelete starts the next limit query to fetch descendants. */ constructor( private readonly firestore: Firestore, private readonly writer: BulkWriter, private readonly ref: | firestore.CollectionReference<unknown> - | firestore.DocumentReference<unknown> - ) {} + | firestore.DocumentReference<unknown>, + private readonly maxLimit: number, + private readonly minLimit: number + ) { + this.maxPendingOps = maxLimit; + this.minPendingOps = minLimit; + } /** * Recursively deletes the reference provided in the class constructor. 
@@ -134,10 +161,7 @@ export class RecursiveDelete { * if an error occurs. */ run(): Promise<void> { - assert( - this.documentsPending, - 'The recursive delete operation has already been completed.' - ); + assert(!this.started, 'RecursiveDelete.run() should only be called once.'); // Capture the error stack to preserve stack tracing across async calls. this.errorStack = Error().stack!; @@ -152,12 +176,10 @@ export class RecursiveDelete { * @private */ private setupStream(): void { - const limit = MAX_PENDING_OPS; const stream = this.getAllDescendants( this.ref instanceof CollectionReference ? (this.ref as CollectionReference<unknown>) - : (this.ref as DocumentReference<unknown>), - limit + : (this.ref as DocumentReference<unknown>) ); this.streamInProgress = true; let streamedDocsCount = 0; @@ -177,7 +199,7 @@ export class RecursiveDelete { this.streamInProgress = false; // If there are fewer than the number of documents specified in the // limit() field, we know that the query is complete. - if (streamedDocsCount < limit) { + if (streamedDocsCount < this.minPendingOps) { this.onQueryEnd(); } else if (this.pendingOpsCount === 0) { this.setupStream(); @@ -188,13 +210,11 @@ /** * Retrieves all descendant documents nested under the provided reference. * @param ref The reference to fetch all descendants for. - * @param limit The number of descendants to fetch in the query. * @private * @return {Stream} Stream of descendant documents. */ private getAllDescendants( - ref: CollectionReference<unknown> | DocumentReference<unknown>, - limit: number + ref: CollectionReference<unknown> | DocumentReference<unknown> ): NodeJS.ReadableStream { // The parent is the closest ancestor document to the location we're // deleting. If we are deleting a document, the parent is the path of that @@ -220,7 +240,7 @@ export class RecursiveDelete { ); // Query for names only to fetch empty snapshots. 
- query = query.select(FieldPath.documentId()).limit(limit); + query = query.select(FieldPath.documentId()).limit(this.maxPendingOps); if (ref instanceof CollectionReference) { // To find all descendants of a collection reference, we need to use a @@ -300,7 +320,7 @@ export class RecursiveDelete { if ( this.documentsPending && !this.streamInProgress && - this.pendingOpsCount < MIN_PENDING_OPS + this.pendingOpsCount < this.minPendingOps ) { this.setupStream(); } diff --git a/dev/test/recursive-delete.ts b/dev/test/recursive-delete.ts index 62e2d85a4..0d0840e36 100644 --- a/dev/test/recursive-delete.ts +++ b/dev/test/recursive-delete.ts @@ -50,7 +50,11 @@ import { import {MAX_REQUEST_RETRIES} from '../src'; import api = google.firestore.v1; -import {MAX_PENDING_OPS, REFERENCE_NAME_MIN_ID} from '../src/recursive-delete'; +import { + RECURSIVE_DELETE_MAX_PENDING_OPS, + REFERENCE_NAME_MIN_ID, +} from '../src/recursive-delete'; +import {Deferred} from '../src/util'; const PROJECT_ID = 'test-project'; const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`; @@ -140,7 +144,7 @@ describe('recursiveDelete() method:', () => { 'LESS_THAN', endAt('root') ), - limit(MAX_PENDING_OPS) + limit(RECURSIVE_DELETE_MAX_PENDING_OPS) ); return stream(); }, @@ -165,7 +169,7 @@ describe('recursiveDelete() method:', () => { 'LESS_THAN', endAt('root/doc/nestedCol') ), - limit(MAX_PENDING_OPS) + limit(RECURSIVE_DELETE_MAX_PENDING_OPS) ); return stream(); }, @@ -184,7 +188,7 @@ describe('recursiveDelete() method:', () => { 'root/doc', select('__name__'), allDescendants(/* kindless= */ true), - limit(MAX_PENDING_OPS) + limit(RECURSIVE_DELETE_MAX_PENDING_OPS) ); return stream(); }, @@ -222,7 +226,7 @@ describe('recursiveDelete() method:', () => { 'LESS_THAN', endAt('root') ), - limit(MAX_PENDING_OPS) + limit(RECURSIVE_DELETE_MAX_PENDING_OPS) ); return stream(); } @@ -235,8 +239,32 @@ describe('recursiveDelete() method:', () => { }); it('creates a second query with the correct 
startAfter', async () => { - const firstStream = Array.from(Array(MAX_PENDING_OPS).keys()).map( - (_, i) => result('doc' + i) + // This test checks that the second query is created with the correct + // startAfter() once the RecursiveDelete instance is below the + // MIN_PENDING_OPS threshold to send the next batch. Use lower limits + // than the actual RecursiveDelete class in order to make this test run fast. + const maxPendingOps = 100; + const minPendingOps = 11; + const maxBatchSize = 10; + const cutoff = maxPendingOps - minPendingOps; + let numDeletesBuffered = 0; + + // This deferred promise is used to delay the BatchWriteResponses from + // returning in order to create the situation where the number of pending + // operations is less than `minPendingOps`. + const bufferDeferred = new Deferred<void>(); + + // This deferred completes when the second query is run. + const secondQueryDeferred = new Deferred<void>(); + + const nLengthArray = (n: number): number[] => Array.from(Array(n).keys()); + + const firstStream = nLengthArray(maxPendingOps).map((_, i) => + result('doc' + i) + ); + + const batchWriteResponse = mergeResponses( + nLengthArray(maxBatchSize).map(() => successResponse(1)) ); // Use an array to store that the queryEquals() method succeeded, since @@ -257,7 +285,7 @@ 'LESS_THAN', endAt('root') ), - limit(MAX_PENDING_OPS) + limit(maxPendingOps) ); called.push(1); return stream(...firstStream); @@ -279,11 +307,12 @@ referenceValue: `projects/${PROJECT_ID}/databases/(default)/` + 'documents/collectionId/doc' + - (MAX_PENDING_OPS - 1), + (maxPendingOps - 1), }), - limit(MAX_PENDING_OPS) + limit(maxPendingOps) ); called.push(2); + secondQueryDeferred.resolve(); return stream(); } else { called.push(3); @@ -291,22 +320,39 @@ } }, batchWrite: () => { - const responses = mergeResponses( - Array.from(Array(500).keys()).map(() =>
successResponse(1)) - ); - return response({ - writeResults: responses.writeResults, - status: responses.status, + const returnedResponse = response({ + writeResults: batchWriteResponse.writeResults, + status: batchWriteResponse.status, }); + if (numDeletesBuffered < cutoff) { + numDeletesBuffered += batchWriteResponse.writeResults!.length; + + // By waiting for `bufferFuture` to complete, we can guarantee that + // the writes complete after all documents are streamed. Without + // this future, the test can race and complete the writes before + // the stream is finished, which is a different scenario this test + // is not for. + return bufferDeferred.promise.then(() => returnedResponse); + } else { + // Once there are `cutoff` pending deletes, completing the future + // allows enough responses to be returned such that the number of + // pending deletes should be less than `minPendingOps`. This allows + // us to test that the second query is made. + bufferDeferred.resolve(); + return secondQueryDeferred.promise.then(() => returnedResponse); + } }, }; const firestore = await createInstance(overrides); - // Use a custom batch size with BulkWriter to simplify the dummy - // batchWrite() response logic. const bulkWriter = firestore.bulkWriter(); - bulkWriter._maxBatchSize = 500; - await firestore.recursiveDelete(firestore.collection('root'), bulkWriter); + bulkWriter._maxBatchSize = maxBatchSize; + await firestore._recursiveDelete( + firestore.collection('root'), + maxPendingOps, + minPendingOps, + bulkWriter + ); expect(called).to.deep.equal([1, 2]); }); });