Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Transfer Manager Metrics #2305

Merged
merged 16 commits into from
Sep 26, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import {
ApiError,
Duplexify,
DuplexifyConstructor,
GCCL_GCS_CMD_KEY,
} from './nodejs-common/util';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const duplexify: DuplexifyConstructor = require('duplexify');
Expand Down Expand Up @@ -221,6 +222,7 @@ type PublicResumableUploadOptions =
export interface CreateResumableUploadOptions
extends Pick<resumableUpload.UploadConfig, PublicResumableUploadOptions> {
preconditionOpts?: PreconditionOptions;
[GCCL_GCS_CMD_KEY]?: resumableUpload.UploadConfig[typeof GCCL_GCS_CMD_KEY];
}

export type CreateResumableUploadResponse = [string];
Expand Down Expand Up @@ -371,6 +373,7 @@ export interface CreateReadStreamOptions {
start?: number;
end?: number;
decompress?: boolean;
[GCCL_GCS_CMD_KEY]?: string;
}

export interface SaveOptions extends CreateWriteStreamOptions {
Expand Down Expand Up @@ -1580,12 +1583,16 @@ class File extends ServiceObject<File, FileMetadata> {
headers.Range = `bytes=${tailRequest ? end : `${start}-${end}`}`;
}

const reqOpts = {
const reqOpts: DecorateRequestOptions = {
uri: '',
headers,
qs: query,
};

if (options[GCCL_GCS_CMD_KEY]) {
reqOpts[GCCL_GCS_CMD_KEY] = options[GCCL_GCS_CMD_KEY];
}

this.requestStream(reqOpts)
.on('error', err => {
throughStream.destroy(err);
Expand Down
7 changes: 7 additions & 0 deletions src/nodejs-common/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {Interceptor} from './service-object';
import {
BodyResponseCallback,
DecorateRequestOptions,
GCCL_GCS_CMD_KEY,
MakeAuthenticatedRequest,
PackageJson,
util,
Expand Down Expand Up @@ -253,6 +254,12 @@ export class Service {
} gccl-invocation-id/${uuid.v4()}`,
};

if (reqOpts[GCCL_GCS_CMD_KEY]) {
reqOpts.headers[
'x-goog-api-client'
] += ` gccl-gcs-cmd/${reqOpts[GCCL_GCS_CMD_KEY]}`;
}

if (reqOpts.shouldReturnStream) {
return this.makeAuthenticatedRequest(reqOpts) as {} as r.Request;
} else {
Expand Down
27 changes: 23 additions & 4 deletions src/nodejs-common/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ import {getRuntimeTrackingString} from '../util';

const packageJson = require('../../../package.json');

/**
* A unique symbol for providing a `gccl-gcs-cmd` value
* for the `X-Goog-API-Client` header.
*
* E.g the `V` in `X-Goog-API-Client: gccl-gcs-cmd/V`
**/
export const GCCL_GCS_CMD_KEY = Symbol('GCCL_GCS_CMD');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to go look this up, didn't know Symbol was a thing.


// eslint-disable-next-line @typescript-eslint/no-var-requires
const duplexify: DuplexifyConstructor = require('duplexify');

Expand Down Expand Up @@ -233,6 +241,7 @@ export interface DecorateRequestOptions extends r.CoreOptions {
interceptors_?: Interceptor[];
shouldReturnStream?: boolean;
projectId?: string;
[GCCL_GCS_CMD_KEY]?: string;
}

export interface ParsedHttpResponseBody {
Expand Down Expand Up @@ -530,7 +539,9 @@ export class Util {
body: writeStream,
},
],
} as {} as r.OptionsWithUri;
} as {} as r.OptionsWithUri & {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is clever!

[GCCL_GCS_CMD_KEY]?: string;
};

options.makeAuthenticatedRequest(reqOpts, {
onAuthenticated(err, authenticatedReqOpts) {
Expand All @@ -539,7 +550,9 @@ export class Util {
return;
}

requestDefaults.headers = util._getDefaultHeaders();
requestDefaults.headers = util._getDefaultHeaders(
reqOpts[GCCL_GCS_CMD_KEY]
);
const request = teenyRequest.defaults(requestDefaults);
request(authenticatedReqOpts!, (err, resp, body) => {
util.handleResp(err, resp, body, (err, data) => {
Expand Down Expand Up @@ -1014,13 +1027,19 @@ export class Util {
: [optionsOrCallback as T, cb as C];
}

_getDefaultHeaders() {
return {
_getDefaultHeaders(gcclGcsCmd?: string) {
const headers = {
'User-Agent': util.getUserAgentFromPackageJson(packageJson),
'x-goog-api-client': `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${uuid.v4()}`,
};

if (gcclGcsCmd) {
headers['x-goog-api-client'] += ` gccl-gcs-cmd/${gcclGcsCmd}`;
}

return headers;
}
}

Expand Down
42 changes: 33 additions & 9 deletions src/resumable-upload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import retry = require('async-retry');
import {RetryOptions, PreconditionOptions} from './storage';
import * as uuid from 'uuid';
import {getRuntimeTrackingString} from './util';
import {GCCL_GCS_CMD_KEY} from './nodejs-common/util';

const NOT_FOUND_STATUS_CODE = 404;
const RESUMABLE_INCOMPLETE_STATUS_CODE = 308;
Expand Down Expand Up @@ -193,6 +194,8 @@ export interface UploadConfig extends Pick<WritableOptions, 'highWaterMark'> {
* Configuration options for retrying retryable errors.
*/
retryOptions: RetryOptions;

[GCCL_GCS_CMD_KEY]?: string;
}

export interface ConfigMetadata {
Expand Down Expand Up @@ -274,6 +277,7 @@ export class Upload extends Writable {
private localWriteCache: Buffer[] = [];
private localWriteCacheByteLength = 0;
private upstreamEnded = false;
#gcclGcsCmd?: string;

constructor(cfg: UploadConfig) {
super(cfg);
Expand Down Expand Up @@ -347,6 +351,8 @@ export class Upload extends Writable {
: NaN;
this.contentLength = isNaN(contentLength) ? '*' : contentLength;

this.#gcclGcsCmd = cfg[GCCL_GCS_CMD_KEY];

this.once('writing', () => {
if (this.uri) {
this.continueUploading();
Expand Down Expand Up @@ -585,6 +591,14 @@ export class Upload extends Writable {
delete metadata.contentType;
}

let googAPIClient = `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.uri}`;

if (this.#gcclGcsCmd) {
googAPIClient += ` gccl-gcs-cmd/${this.#gcclGcsCmd}`;
}

// Check if headers already exist before creating new ones
const reqOpts: GaxiosOptions = {
method: 'POST',
Expand All @@ -598,9 +612,7 @@ export class Upload extends Writable {
),
data: metadata,
headers: {
'x-goog-api-client': `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.uri}`,
'x-goog-api-client': googAPIClient,
...headers,
},
};
Expand Down Expand Up @@ -766,10 +778,16 @@ export class Upload extends Writable {
},
});

let googAPIClient = `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.chunk}`;

if (this.#gcclGcsCmd) {
googAPIClient += ` gccl-gcs-cmd/${this.#gcclGcsCmd}`;
}

const headers: GaxiosOptions['headers'] = {
'x-goog-api-client': `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.chunk}`,
'x-goog-api-client': googAPIClient,
};

// If using multiple chunk upload, set appropriate header
Expand Down Expand Up @@ -904,15 +922,21 @@ export class Upload extends Writable {
}

private async getAndSetOffset() {
let googAPIClient = `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.offset}`;

if (this.#gcclGcsCmd) {
googAPIClient += ` gccl-gcs-cmd/${this.#gcclGcsCmd}`;
}

const opts: GaxiosOptions = {
method: 'PUT',
url: this.uri!,
headers: {
'Content-Length': 0,
'Content-Range': 'bytes */*',
'x-goog-api-client': `${getRuntimeTrackingString()} gccl/${
packageJson.version
} gccl-invocation-id/${this.currentInvocationId.offset}`,
'x-goog-api-client': googAPIClient,
},
};
try {
Expand Down
55 changes: 51 additions & 4 deletions src/transfer-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import * as retry from 'async-retry';
import {ApiError} from './nodejs-common';
import {GaxiosResponse, Headers} from 'gaxios';
import {createHash} from 'crypto';
import {GCCL_GCS_CMD_KEY} from './nodejs-common/util';

/**
* Default number of concurrently executing promises to use when calling uploadManyFiles.
Expand Down Expand Up @@ -64,6 +65,21 @@ const UPLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024;
const DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT = 2;

const EMPTY_REGEX = '(?:)';

/**
* The `gccl-gcs-cmd` value for the `X-Goog-API-Client` header.
* Example: `gccl-gcs-cmd/tm.upload_many`
*
* @see {@link GCCL_GCS_CMD}.
* @see {@link GCCL_GCS_CMD_KEY}.
*/
const GCCL_GCS_CMD_FEATURE = {
UPLOAD_MANY: 'tm.upload_many',
DOWNLOAD_MANY: 'tm.download_many',
UPLOAD_SHARDED: 'tm.upload_sharded',
DOWNLOAD_SHARDED: 'tm.download_sharded',
};

export interface UploadManyFilesOptions {
concurrencyLimit?: number;
skipIfExists?: boolean;
Expand Down Expand Up @@ -168,7 +184,9 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper {
this.bucket = bucket;
this.fileName = fileName;
// eslint-disable-next-line prettier/prettier
this.baseUrl = `https://${bucket.name}.${new URL(this.bucket.storage.apiEndpoint).hostname}/${fileName}`;
this.baseUrl = `https://${bucket.name}.${
new URL(this.bucket.storage.apiEndpoint).hostname
}/${fileName}`;
this.xmlBuilder = new XMLBuilder({arrayNodeName: 'Part'});
this.xmlParser = new XMLParser();
this.partsMap = partsMap || new Map<number, string>();
Expand All @@ -189,7 +207,22 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper {
const url = `${this.baseUrl}?uploads`;
return retry(async bail => {
try {
const headers = await this.authClient.getRequestHeaders();

for (const [key, value] of Object.entries(headers)) {
if (key.toLocaleLowerCase().trim() === 'x-goog-api-client') {
// Prepend command feature to value, if not already there
if (!value.includes(GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED)) {
headers[
key
] = `${value} gccl-gcs-cmd/${GCCL_GCS_CMD_FEATURE.UPLOAD_SHARDED}`;
}
break;
}
}

const res = await this.authClient.request({
headers,
method: 'POST',
url,
});
Expand Down Expand Up @@ -314,6 +347,10 @@ export class TransferManager {
this.bucket = bucket;
}

#setMethodHeader() {
// .
}

/**
* @typedef {object} UploadManyFilesOptions
* @property {number} [concurrencyLimit] The number of concurrently executing promises
Expand Down Expand Up @@ -394,6 +431,7 @@ export class TransferManager {

const passThroughOptionsCopy: UploadOptions = {
...options.passthroughOptions,
[GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.UPLOAD_MANY,
};

passThroughOptionsCopy.destination = filePath;
Expand All @@ -403,6 +441,7 @@ export class TransferManager {
passThroughOptionsCopy.destination
);
}

promises.push(
limit(() =>
this.bucket.upload(filePath, passThroughOptionsCopy as UploadOptions)
Expand Down Expand Up @@ -487,6 +526,7 @@ export class TransferManager {
for (const file of files) {
const passThroughOptionsCopy = {
...options.passthroughOptions,
[GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.DOWNLOAD_MANY,
};

if (options.prefix) {
Expand All @@ -499,6 +539,7 @@ export class TransferManager {
if (options.stripPrefix) {
passThroughOptionsCopy.destination = file.name.replace(regex, '');
}

promises.push(limit(() => file.download(passThroughOptionsCopy)));
}

Expand Down Expand Up @@ -569,9 +610,15 @@ export class TransferManager {
chunkEnd = chunkEnd > size ? size : chunkEnd;
promises.push(
limit(() =>
file.download({start: chunkStart, end: chunkEnd}).then(resp => {
return fileToWrite.write(resp[0], 0, resp[0].length, chunkStart);
})
file
.download({
start: chunkStart,
end: chunkEnd,
[GCCL_GCS_CMD_KEY]: GCCL_GCS_CMD_FEATURE.DOWNLOAD_SHARDED,
})
.then(resp => {
return fileToWrite.write(resp[0], 0, resp[0].length, chunkStart);
})
)
);

Expand Down