fix: reduce memory footprint of deleteFiles by utilizing getFilesStre… #2147

Merged
34 changes: 26 additions & 8 deletions src/bucket.ts
@@ -2028,6 +2028,7 @@ class Bucket extends ServiceObject {
}

const MAX_PARALLEL_LIMIT = 10;
+    const MAX_QUEUE_SIZE = 1000;
const errors = [] as Error[];

const deleteFile = (file: File) => {
@@ -2039,15 +2040,32 @@
});
});
};

-    this.getFiles(query)
-      .then(([files]) => {
+    (async () => {
+      try {
+        let promises = [];
         const limit = pLimit(MAX_PARALLEL_LIMIT);
-        const promises = files!.map(file => {
-          return limit(() => deleteFile(file));
-        });
-        return Promise.all(promises);
-      })
-      .then(() => callback!(errors.length > 0 ? errors : null), callback!);
+        const filesStream = this.getFilesStream(query);

+        for await (const curFile of filesStream) {
+          if (promises.length >= MAX_QUEUE_SIZE) {
@timscottbell commented on Feb 15, 2023:

There is both MAX_QUEUE_SIZE and MAX_PARALLEL_LIMIT. Do we need both, given that they are trying to accomplish the same thing: limiting the number of files being deleted at once?

Also, 10 seems pretty low for deleting files, but I could be missing something; 1000 seems like a more reasonable number.

Contributor Author replied:

Full disclosure: I'm not really sure why the original authors decided to utilize pLimit for this one particular function. That said, MAX_PARALLEL_LIMIT represents how many concurrent promises will be allowed to execute, and I'm not sure why it was set to 10. We are having some discussion around whether we want to make this user-settable. In unrelated testing we have found that setting concurrent promises really high / uncapped led to other problems, e.g. network saturation. It is probably less of a concern here, as delete doesn't return much in the way of data.

I implemented MAX_QUEUE_SIZE as a means of further reducing memory utilization. The issue you were seeing arose because (1) we fetched every file into memory during getFiles, and (2) we then created a giant array of promises, one for each file delete. We are also discussing making this user-settable.
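
To make the two knobs concrete, here is a minimal sketch of how they interact. This is an illustration only, not the library's code: deleteAll and the AsyncIterable argument are stand-ins for deleteFiles and the file stream.

  import pLimit from 'p-limit';

  const MAX_PARALLEL_LIMIT = 10; // how many deletes may run at once
  const MAX_QUEUE_SIZE = 1000; // how many promise handles we hold before flushing

  async function deleteAll(files: AsyncIterable<{delete(): Promise<unknown>}>) {
    const limit = pLimit(MAX_PARALLEL_LIMIT);
    let promises: Promise<unknown>[] = [];

    for await (const file of files) {
      if (promises.length >= MAX_QUEUE_SIZE) {
        // Flush the queue: this bounds memory and pauses the iterable.
        await Promise.all(promises);
        promises = [];
      }
      // pLimit ensures at most MAX_PARALLEL_LIMIT callbacks run concurrently,
      // even though up to MAX_QUEUE_SIZE promise handles sit in the queue.
      promises.push(limit(() => file.delete()));
    }

    await Promise.all(promises); // wait for the final partial batch
  }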

@timscottbell commented on Feb 15, 2023:

I understand; when the original authors leave, it can be hard to figure out why they wrote something, and potentially dangerous to remove it without sufficient testing. It seems that after more testing a better number between 10 and 1000 could be chosen, or, as you said, it could be made user-configurable.

+            await Promise.all(promises);
@timscottbell commented on Feb 15, 2023:

Once we hit the queue limit, all execution stops until all the pending promises have finished. This technically works, but it doesn't maximize the benefit of streams, and it results in lower performance.

When I was trying to find a workaround for this issue, I wrote this example transform stream for limiting execution of a stream: https://gist.github.com/timscottbell/364e2683354683c05cdfd93c708e9072. This is not production-ready or fully tested code; it's more to give an idea of the direction I thought this solution would go in.
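
For reference, the rough shape of such a limiting transform might look like the sketch below. This is a reconstruction of the idea, not the gist's actual code; ParallelTransform and its constructor arguments are invented for illustration.

  import {Transform, TransformCallback} from 'stream';

  // Runs one async task per chunk, allowing up to `concurrency` tasks in
  // flight; deferring the callback is what backpressures the source.
  class ParallelTransform<T> extends Transform {
    private inFlight = 0;
    private pendingCallback?: TransformCallback;

    constructor(
      private task: (chunk: T) => Promise<void>,
      private concurrency: number
    ) {
      super({objectMode: true});
    }

    _transform(chunk: T, _enc: BufferEncoding, callback: TransformCallback) {
      this.inFlight++;
      this.task(chunk)
        .then(() => this.taskDone())
        .catch(err => this.destroy(err));
      if (this.inFlight < this.concurrency) {
        callback(); // capacity left: accept the next chunk immediately
      } else {
        this.pendingCallback = callback; // full: hold until a task settles
      }
    }

    _flush(callback: TransformCallback) {
      // Don't end the stream until every in-flight task has finished.
      const poll = () => (this.inFlight === 0 ? callback() : setImmediate(poll));
      poll();
    }

    private taskDone() {
      this.inFlight--;
      const resume = this.pendingCallback;
      this.pendingCallback = undefined;
      resume?.();
    }
  }

A delete pipeline could then, roughly, pipe the file stream into new ParallelTransform(file => file.delete(), 10).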

Our current workaround is this:

  async deleteBucket(name: string) {
    const bucket = StorageManagerGoogle._bucket(name, config.CloudProjectId);

    const bucketExists = await StorageManagerGoogle._exists(bucket);

    if (!bucketExists) {
      logger.warn(`Bucket [${name}] does not exist, cannot delete it`);

      return;
    }

    while (true) {
      const [files] = await bucket.getFiles({
        autoPaginate: false,
        maxResults: 1000,
      });

      if (files.length === 0) {
        break;
      }

      await Promise.all(
        files.map(async (file: File) => {
          try {
            await file.delete({ignoreNotFound: true});
          } catch (error) {
            const {response} = error;

            if (
              !response ||
              response.statusCode !== 409 ||
              response.statusMessage !== 'Conflict'
            ) {
              throw error;
            }
          }
        })
      );
    }

    await bucket.delete();
  }

which, surprisingly, looks very similar to the approach here. (We opted for the less performant solution over the stream solution in the gist because some team members have less familiarity with streams, and it was a workaround we only wanted to commit temporarily.)

I couldn't find any standard libraries that do this (I didn't look super hard). I'm surprised; there really should be one that handles this common, generic use case.

Maybe you could use p-map, since the stream is an async iterable!
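
For example, roughly (a sketch assuming p-map v6+, which accepts async iterables, and the exported @google-cloud/storage types; deleteAllFiles is a made-up name):

  import pMap from 'p-map';
  import type {Bucket, File, GetFilesOptions} from '@google-cloud/storage';

  async function deleteAllFiles(bucket: Bucket, query: GetFilesOptions) {
    // p-map pulls files from the stream lazily while capping in-flight
    // deletes, so neither a file list nor a promise queue grows unbounded.
    const filesStream = bucket.getFilesStream(query) as AsyncIterable<File>;
    await pMap(filesStream, file => file.delete(), {concurrency: 10});
  }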

Contributor Author replied:

I've added some further deleteFiles performance improvements to our backlog, in addition to some other stream-related work we have planned for the next major version.

You are correct: we essentially apply backpressure to the stream once the queue is full. I did this intentionally so as not to hold thousands or tens of thousands of objects in the queue at the same time. Hopefully this is enough of a reduction in memory for the time being. I don't think it will degrade performance, since even the previous version only allowed 10 concurrent promises to execute, even if the queue contained 100,000 items.

If this doesn't help to resolve the issue you were seeing, please feel free to update the open ticket and we will of course be happy to take a look.
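
To illustrate the backpressure point with a standalone sketch (plain async iteration, not the library code): a for await...of loop only pulls the next value after the loop body finishes, so a slow consumer pauses the producer instead of forcing it to buffer everything. Node streams do read slightly ahead, up to their highWaterMark, but the pull principle is the same.

  // Production is lazy: each value is produced only when the loop asks.
  async function* produce() {
    for (let i = 0; i < 3; i++) {
      console.log('produced', i);
      yield i;
    }
  }

  async function main() {
    for await (const n of produce()) {
      await new Promise(resolve => setTimeout(resolve, 100)); // slow consumer
      console.log('consumed', n);
    }
    // Logs interleave: produced 0, consumed 0, produced 1, consumed 1, ...
  }

  void main();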

+            promises = [];
+          }
+          promises.push(
+            limit(() => deleteFile(curFile)).catch(e => {
+              filesStream.destroy();
+              throw e;
+            })
+          );
+        }
+
+        await Promise.all(promises);
+        callback!(errors.length > 0 ? errors : null);
+      } catch (e) {
+        callback!(e as Error);
+        return;
+      }
+    })();
}

deleteLabels(labels?: string | string[]): Promise<DeleteLabelsResponse>;
136 changes: 123 additions & 13 deletions test/bucket.ts
@@ -60,6 +60,7 @@ class FakeFile {
options: FileOptions;
metadata: {};
createWriteStream: Function;
+  delete: Function;
isSameFile = () => false;
constructor(bucket: Bucket, name: string, options?: FileOptions) {
// eslint-disable-next-line prefer-rest-params
@@ -79,6 +80,10 @@
};
return ws;
};

+    this.delete = () => {
+      return Promise.resolve();
+    };
}
}

@@ -1226,10 +1231,35 @@
});

describe('deleteFiles', () => {
+    let readCount: number;
+
+    beforeEach(() => {
+      readCount = 0;
+    });

it('should accept only a callback', done => {
-      bucket.getFiles = (query: {}) => {
+      const files = [bucket.file('1'), bucket.file('2')].map(file => {
+        file.delete = () => {
+          return Promise.resolve();
+        };
+        return file;
+      });
+
+      const readable = new stream.Readable({
+        objectMode: true,
+        read() {
+          if (readCount < 1) {
+            this.push(files[readCount]);
+            readCount++;
+          } else {
+            this.push(null);
+          }
+        },
+      });
+
+      bucket.getFilesStream = (query: {}) => {
         assert.deepStrictEqual(query, {});
-        return Promise.all([[]]);
+        return readable;
};

bucket.deleteFiles(done);
@@ -1238,9 +1268,28 @@
it('should get files from the bucket', done => {
const query = {a: 'b', c: 'd'};

-      bucket.getFiles = (query_: {}) => {
+      const files = [bucket.file('1'), bucket.file('2')].map(file => {
+        file.delete = () => {
+          return Promise.resolve();
+        };
+        return file;
+      });
+
+      const readable = new stream.Readable({
+        objectMode: true,
+        read() {
+          if (readCount < 1) {
+            this.push(files[readCount]);
+            readCount++;
+          } else {
+            this.push(null);
+          }
+        },
+      });
+
+      bucket.getFilesStream = (query_: {}) => {
         assert.deepStrictEqual(query_, query);
-        return Promise.resolve([[]]);
+        return readable;
};

bucket.deleteFiles(query, done);
@@ -1253,7 +1302,26 @@
return () => {};
};

-      bucket.getFiles = () => Promise.resolve([[]]);
+      const files = [bucket.file('1'), bucket.file('2')].map(file => {
+        file.delete = () => {
+          return Promise.resolve();
+        };
+        return file;
+      });
+
+      const readable = new stream.Readable({
+        objectMode: true,
+        read() {
+          if (readCount < 1) {
+            this.push(files[readCount]);
+            readCount++;
+          } else {
+            this.push(null);
+          }
+        },
+      });
+
+      bucket.getFilesStream = () => readable;
bucket.deleteFiles({}, assert.ifError);
});

@@ -1270,9 +1338,21 @@
return file;
});

-      bucket.getFiles = (query_: {}) => {
+      const readable = new stream.Readable({
+        objectMode: true,
+        read() {
+          if (readCount < files.length) {
+            this.push(files[readCount]);
+            readCount++;
+          } else {
+            this.push(null);
+          }
+        },
+      });
+
+      bucket.getFilesStream = (query_: {}) => {
         assert.strictEqual(query_, query);
-        return Promise.resolve([files]);
+        return readable;
};

bucket.deleteFiles(query, (err: Error) => {
@@ -1284,9 +1364,15 @@

it('should execute callback with error from getting files', done => {
const error = new Error('Error.');
+      const readable = new stream.Readable({
+        objectMode: true,
+        read() {
+          this.destroy(error);
+        },
+      });

-      bucket.getFiles = () => {
-        return Promise.reject(error);
+      bucket.getFilesStream = () => {
+        return readable;
};

bucket.deleteFiles({}, (err: Error) => {
@@ -1303,8 +1389,20 @@
return file;
});

-      bucket.getFiles = () => {
-        return Promise.resolve([files]);
+      const readable = new stream.Readable({
+        objectMode: true,
+        read() {
+          if (readCount < files.length) {
+            this.push(files[readCount]);
+            readCount++;
+          } else {
+            this.push(null);
+          }
+        },
+      });
+
+      bucket.getFilesStream = () => {
+        return readable;
};

bucket.deleteFiles({}, (err: Error) => {
@@ -1321,8 +1419,20 @@
return file;
});

-      bucket.getFiles = () => {
-        return Promise.resolve([files]);
+      const readable = new stream.Readable({
+        objectMode: true,
+        read() {
+          if (readCount < files.length) {
+            this.push(files[readCount]);
+            readCount++;
+          } else {
+            this.push(null);
+          }
+        },
+      });
+
+      bucket.getFilesStream = () => {
+        return readable;
};

bucket.deleteFiles({force: true}, (errs: Array<{}>) => {