Skip to content

Commit

Permalink
fix: recursive Delete: backport changes from Java
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Chen committed May 25, 2021
1 parent 245c3a9 commit bd96583
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 34 deletions.
26 changes: 24 additions & 2 deletions dev/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,11 @@ const serviceConfig = interfaces['google.firestore.v1.Firestore'];

import api = google.firestore.v1;
import {CollectionGroup} from './collection-group';
import {RecursiveDelete} from './recursive-delete';
import {
RECURSIVE_DELETE_MAX_PENDING_OPS,
RECURSIVE_DELETE_MIN_PENDING_OPS,
RecursiveDelete
} from './recursive-delete';

export {
CollectionReference,
Expand Down Expand Up @@ -1251,8 +1255,26 @@ export class Firestore implements firestore.Firestore {
| firestore.DocumentReference<unknown>,
bulkWriter?: BulkWriter
): Promise<void> {
return this._recursiveDelete(ref, RECURSIVE_DELETE_MAX_PENDING_OPS, RECURSIVE_DELETE_MIN_PENDING_OPS, bulkWriter);
}

/**
* This overload is not private in order to test the query resumption with
* startAfter() once the RecursiveDelete instance has MAX_PENDING_OPS pending operations.
*
* @private
*/
// Visible for testing
_recursiveDelete(
ref:
| firestore.CollectionReference<unknown>
| firestore.DocumentReference<unknown>,
maxPendingOps: number,
minPendingOps: number,
bulkWriter?: BulkWriter
): Promise<void> {
const writer = bulkWriter ?? this.getBulkWriter();
const deleter = new RecursiveDelete(this, writer, ref);
const deleter = new RecursiveDelete(this, writer, ref, maxPendingOps, minPendingOps);
return deleter.run();
}

Expand Down
43 changes: 30 additions & 13 deletions dev/src/recursive-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export const REFERENCE_NAME_MIN_ID = '__id-9223372036854775808__';
* from streaming documents faster than Firestore can delete.
*/
// Visible for testing.
export const MAX_PENDING_OPS = 5000;
export const RECURSIVE_DELETE_MAX_PENDING_OPS = 5000;

/**
* The number of pending BulkWriter operations at which RecursiveDelete
Expand All @@ -57,7 +57,7 @@ export const MAX_PENDING_OPS = 5000;
* throughput. This helps prevent BulkWriter from idling while Firestore
* fetches the next query.
*/
const MIN_PENDING_OPS = 1000;
export const RECURSIVE_DELETE_MIN_PENDING_OPS = 1000;

/**
* Class used to store state required for running a recursive delete operation.
Expand All @@ -84,6 +84,21 @@ export class RecursiveDelete {
*/
private documentsPending = true;

/**
* Whether run() has been called.
* @private
*/
private started = false;

/** Query limit to use when fetching all descendants. */
private maxPendingOps: number;

/**
* The number of pending BulkWriter operations at which RecursiveDelete starts the next limit
* query to fetch descendants.
*/
private minPendingOps: number;

/**
* A deferred promise that resolves when the recursive delete operation
* is completed.
Expand Down Expand Up @@ -125,8 +140,13 @@ export class RecursiveDelete {
private readonly writer: BulkWriter,
private readonly ref:
| firestore.CollectionReference<unknown>
| firestore.DocumentReference<unknown>
) {}
| firestore.DocumentReference<unknown>,
private readonly maxLimit: number,
private readonly minLimit: number,
) {
this.maxPendingOps = maxLimit;
this.minPendingOps = minLimit;
}

/**
* Recursively deletes the reference provided in the class constructor.
Expand All @@ -135,8 +155,8 @@ export class RecursiveDelete {
*/
run(): Promise<void> {
assert(
this.documentsPending,
'The recursive delete operation has already been completed.'
!this.started,
'RecursiveDelete.run() should only be called once.'
);

// Capture the error stack to preserve stack tracing across async calls.
Expand All @@ -152,12 +172,10 @@ export class RecursiveDelete {
* @private
*/
private setupStream(): void {
const limit = MAX_PENDING_OPS;
const stream = this.getAllDescendants(
this.ref instanceof CollectionReference
? (this.ref as CollectionReference<unknown>)
: (this.ref as DocumentReference<unknown>),
limit
);
this.streamInProgress = true;
let streamedDocsCount = 0;
Expand All @@ -177,7 +195,7 @@ export class RecursiveDelete {
this.streamInProgress = false;
// If there are fewer than the number of documents specified in the
// limit() field, we know that the query is complete.
if (streamedDocsCount < limit) {
if (streamedDocsCount < this.minPendingOps) {
this.onQueryEnd();
} else if (this.pendingOpsCount === 0) {
this.setupStream();
Expand All @@ -193,8 +211,7 @@ export class RecursiveDelete {
* @return {Stream<QueryDocumentSnapshot>} Stream of descendant documents.
*/
private getAllDescendants(
ref: CollectionReference<unknown> | DocumentReference<unknown>,
limit: number
ref: CollectionReference<unknown> | DocumentReference<unknown>
): NodeJS.ReadableStream {
// The parent is the closest ancestor document to the location we're
// deleting. If we are deleting a document, the parent is the path of that
Expand All @@ -220,7 +237,7 @@ export class RecursiveDelete {
);

// Query for names only to fetch empty snapshots.
query = query.select(FieldPath.documentId()).limit(limit);
query = query.select(FieldPath.documentId()).limit(this.maxPendingOps);

if (ref instanceof CollectionReference) {
// To find all descendants of a collection reference, we need to use a
Expand Down Expand Up @@ -300,7 +317,7 @@ export class RecursiveDelete {
if (
this.documentsPending &&
!this.streamInProgress &&
this.pendingOpsCount < MIN_PENDING_OPS
this.pendingOpsCount < this.minPendingOps
) {
this.setupStream();
}
Expand Down
74 changes: 55 additions & 19 deletions dev/test/recursive-delete.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ import {
import {MAX_REQUEST_RETRIES} from '../src';

import api = google.firestore.v1;
import {MAX_PENDING_OPS, REFERENCE_NAME_MIN_ID} from '../src/recursive-delete';
import {RECURSIVE_DELETE_MAX_PENDING_OPS, REFERENCE_NAME_MIN_ID} from '../src/recursive-delete';
import {Deferred} from "../src/util";

const PROJECT_ID = 'test-project';
const DATABASE_ROOT = `projects/${PROJECT_ID}/databases/(default)`;
Expand Down Expand Up @@ -140,7 +141,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand All @@ -165,7 +166,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root/doc/nestedCol')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand All @@ -184,7 +185,7 @@ describe('recursiveDelete() method:', () => {
'root/doc',
select('__name__'),
allDescendants(/* kindless= */ true),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
},
Expand Down Expand Up @@ -222,7 +223,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(RECURSIVE_DELETE_MAX_PENDING_OPS)
);
return stream();
}
Expand All @@ -235,10 +236,32 @@ describe('recursiveDelete() method:', () => {
});

it('creates a second query with the correct startAfter', async () => {
const firstStream = Array.from(Array(MAX_PENDING_OPS).keys()).map(
// This test checks that the second query is created with the correct
// startAfter() once the RecursiveDelete instance is below the
// MIN_PENDING_OPS threshold to send the next batch. Use lower limits
// than the actual RecursiveDelete class in order to make this test run fast.
const maxPendingOps = 100;
const minPendingOps = 11;
const maxBatchSize = 10;
const cutoff = maxPendingOps - minPendingOps;
let numDeletesBuffered = 0;

// This deferred promise is used to delay the BatchWriteResponses from
// returning in order to create the situation where the number of pending
// operations is less than `minPendingOps`.
const bufferDeferred = new Deferred<void>();

// This deferred completes when the second query is run.
const secondQueryDeferred = new Deferred<void>();

const firstStream = Array.from(Array(maxPendingOps).keys()).map(
(_, i) => result('doc' + i)
);

const batchWriteResponse = mergeResponses(
Array.from(Array(maxBatchSize).keys()).map(() => successResponse(1))
);

// Use an array to store that the queryEquals() method succeeded, since
// thrown errors do not result in the recursiveDelete() method failing.
const called: number[] = [];
Expand All @@ -257,7 +280,7 @@ describe('recursiveDelete() method:', () => {
'LESS_THAN',
endAt('root')
),
limit(MAX_PENDING_OPS)
limit(maxPendingOps)
);
called.push(1);
return stream(...firstStream);
Expand All @@ -279,34 +302,47 @@ describe('recursiveDelete() method:', () => {
referenceValue:
`projects/${PROJECT_ID}/databases/(default)/` +
'documents/collectionId/doc' +
(MAX_PENDING_OPS - 1),
(maxPendingOps - 1),
}),
limit(MAX_PENDING_OPS)
limit(maxPendingOps)
);
called.push(2);
secondQueryDeferred.resolve();
return stream();
} else {
called.push(3);
return stream();
}
},
batchWrite: () => {
const responses = mergeResponses(
Array.from(Array(500).keys()).map(() => successResponse(1))
);
return response({
writeResults: responses.writeResults,
status: responses.status,
const returnedResponse = response({
writeResults: batchWriteResponse.writeResults,
status: batchWriteResponse.status,
});
if (numDeletesBuffered < cutoff) {
numDeletesBuffered += batchWriteResponse.writeResults!.length;

// By waiting for `bufferDeferred` to resolve, we can guarantee that
// the writes complete after all documents are streamed. Without
// this deferred, the test can race and complete the writes before
// the stream is finished, which is a different scenario this test
// is not for.
return bufferDeferred.promise.then(() => returnedResponse);
} else {
// Once there are `cutoff` pending deletes, resolving the deferred
// allows enough responses to be returned such that the number of
// pending deletes should be less than `minPendingOps`. This allows
// us to test that the second query is made.
bufferDeferred.resolve();
return secondQueryDeferred.promise.then(() => returnedResponse);
}
},
};
const firestore = await createInstance(overrides);

// Use a custom batch size with BulkWriter to simplify the dummy
// batchWrite() response logic.
const bulkWriter = firestore.bulkWriter();
bulkWriter._maxBatchSize = 500;
await firestore.recursiveDelete(firestore.collection('root'), bulkWriter);
bulkWriter._maxBatchSize = maxBatchSize;
await firestore._recursiveDelete(firestore.collection('root'), maxPendingOps, minPendingOps, bulkWriter);
expect(called).to.deep.equal([1, 2]);
});
});
Expand Down

0 comments on commit bd96583

Please sign in to comment.