Skip to content

Commit

Permalink
feat: create sync task with auth header (#442)
Browse files Browse the repository at this point in the history
The upstream repository carries authentication header information via
task parameters when alwaysAuth is enabled

---

上游仓库开启 alwaysAuth 时通过任务参数携带认证头信息
  • Loading branch information
hezhengxu2018 authored Apr 21, 2023
1 parent 59706ab commit d95c58b
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 19 deletions.
17 changes: 13 additions & 4 deletions app/common/FileUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ import { randomBytes } from 'crypto';
import { EggContextHttpClient, HttpClientResponse } from 'egg';
import dayjs from './dayjs';

interface DownloadToTempfileOptionalConfig {
retries?: number,
ignoreDownloadStatuses?: number[],
remoteAuthToken?: string
}

export async function createTempfile(dataDir: string, filename: string) {
// will auto clean on CleanTempDir Schedule
const tmpdir = path.join(dataDir, 'downloads', dayjs().format('YYYY/MM/DD'));
Expand All @@ -19,11 +25,12 @@ export async function createTempfile(dataDir: string, filename: string) {
}

export async function downloadToTempfile(httpclient: EggContextHttpClient,
dataDir: string, url: string, ignoreDownloadStatuses?: number[], retries = 3) {
dataDir: string, url: string, optionalConfig?: DownloadToTempfileOptionalConfig) {
let retries = optionalConfig?.retries || 3;
let lastError: any;
while (retries > 0) {
try {
return await _downloadToTempfile(httpclient, dataDir, url, ignoreDownloadStatuses);
return await _downloadToTempfile(httpclient, dataDir, url, optionalConfig);
} catch (err: any) {
if (err.name === 'DownloadNotFoundError') throw err;
lastError = err;
Expand All @@ -43,19 +50,21 @@ export interface Tempfile {
timing: HttpClientResponse['res']['timing'];
}
async function _downloadToTempfile(httpclient: EggContextHttpClient,
dataDir: string, url: string, ignoreDownloadStatuses?: number[]): Promise<Tempfile> {
dataDir: string, url: string, optionalConfig?: DownloadToTempfileOptionalConfig): Promise<Tempfile> {
const tmpfile = await createTempfile(dataDir, url);
const writeStream = createWriteStream(tmpfile);
try {
// max 10 mins to download
// FIXME: should show download progress
const authorization = optionalConfig?.remoteAuthToken ? `Bearer ${optionalConfig?.remoteAuthToken}` : '';
const { status, headers, res } = await httpclient.request(url, {
timeout: 60000 * 10,
headers: { authorization },
writeStream,
timing: true,
followRedirect: true,
}) as HttpClientResponse;
if (status === 404 || (ignoreDownloadStatuses && ignoreDownloadStatuses.includes(status))) {
if (status === 404 || (optionalConfig?.ignoreDownloadStatuses && optionalConfig.ignoreDownloadStatuses.includes(status))) {
const err = new Error(`Not found, status(${status})`);
err.name = 'DownloadNotFoundError';
throw err;
Expand Down
25 changes: 17 additions & 8 deletions app/common/adapter/NPMRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ export class NPMRegistry {
this.registryHost = registryHost;
}

public async getFullManifests(fullname: string, retries = 3): Promise<RegistryResponse> {
public async getFullManifests(fullname: string, optionalConfig?: {retries?:number, remoteAuthToken?:string}): Promise<RegistryResponse> {
let retries = optionalConfig?.retries || 3;
// set query t=timestamp, make sure CDN cache disable
// cache=0 is sync worker request flag
const url = `${this.registry}/${encodeURIComponent(fullname)}?t=${Date.now()}&cache=0`;
Expand All @@ -49,7 +50,8 @@ export class NPMRegistry {
try {
// large package: https://r.cnpmjs.org/%40procore%2Fcore-icons
// https://r.cnpmjs.org/intraactive-sdk-ui 44s
return await this.request('GET', url, undefined, { timeout: 120000 });
const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
return await this.request('GET', url, undefined, { timeout: 120000, headers: { authorization } });
} catch (err: any) {
if (err.name === 'ResponseTimeoutError') throw err;
lastError = err;
Expand All @@ -65,25 +67,28 @@ export class NPMRegistry {
}

// app.put('/:name/sync', sync.sync);
public async createSyncTask(fullname: string): Promise<RegistryResponse> {
public async createSyncTask(fullname: string, optionalConfig?: { remoteAuthToken?:string}): Promise<RegistryResponse> {
const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
const url = `${this.registry}/${encodeURIComponent(fullname)}/sync?sync_upstream=true&nodeps=true`;
// {
// ok: true,
// logId: logId
// };
return await this.request('PUT', url);
return await this.request('PUT', url, undefined, { authorization });
}

// app.get('/:name/sync/log/:id', sync.getSyncLog);
public async getSyncTask(fullname: string, id: string, offset: number): Promise<RegistryResponse> {
public async getSyncTask(fullname: string, id: string, offset: number, optionalConfig?:{ remoteAuthToken?:string }): Promise<RegistryResponse> {
const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
const url = `${this.registry}/${encodeURIComponent(fullname)}/sync/log/${id}?offset=${offset}`;
// { ok: true, syncDone: syncDone, log: log }
return await this.request('GET', url);
return await this.request('GET', url, undefined, { authorization });
}

public async getDownloadRanges(registry: string, fullname: string, start: string, end: string): Promise<RegistryResponse> {
public async getDownloadRanges(registry: string, fullname: string, start: string, end: string, optionalConfig?:{ remoteAuthToken?:string }): Promise<RegistryResponse> {
const authorization = this.genAuthorizationHeader(optionalConfig?.remoteAuthToken);
const url = `${registry}/downloads/range/${start}:${end}/${encodeURIComponent(fullname)}`;
return await this.request('GET', url);
return await this.request('GET', url, undefined, { authorization });
}

private async request(method: HttpMethod, url: string, params?: object, options?: object): Promise<RegistryResponse> {
Expand All @@ -103,4 +108,8 @@ export class NPMRegistry {
...res,
};
}

private genAuthorizationHeader(remoteAuthToken?:string) {
return remoteAuthToken ? `Bearer ${remoteAuthToken}` : '';
}
}
3 changes: 3 additions & 0 deletions app/core/entity/Task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export interface TaskData<T = TaskBaseData> extends EntityData {
export type SyncPackageTaskOptions = {
authorId?: string;
authorIp?: string;
remoteAuthToken?: string;
tips?: string;
skipDependencies?: boolean;
syncDownloadData?: boolean;
Expand All @@ -50,6 +51,7 @@ export interface TriggerHookTaskData extends TaskBaseData {
}

export interface CreateSyncPackageTaskData extends TaskBaseData {
remoteAuthToken?: string;
tips?: string;
skipDependencies?: boolean;
syncDownloadData?: boolean;
Expand Down Expand Up @@ -129,6 +131,7 @@ export class Task<T extends TaskBaseData = TaskBaseData> extends Entity {
data: {
// task execute worker
taskWorker: '',
remoteAuthToken: options?.remoteAuthToken,
tips: options?.tips,
registryId: options?.registryId ?? '',
skipDependencies: options?.skipDependencies,
Expand Down
2 changes: 1 addition & 1 deletion app/core/service/BinarySyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export class BinarySyncerService extends AbstractService {
try {
const { tmpfile, headers, timing } =
await downloadToTempfile(
this.httpclient, this.config.dataDir, item.sourceUrl!, item.ignoreDownloadStatuses);
this.httpclient, this.config.dataDir, item.sourceUrl!, { ignoreDownloadStatuses: item.ignoreDownloadStatuses });
logs.push(`[${isoNow()}][${dir}] 🟢 [${parentIndex}${index}] HTTP content-length: ${headers['content-length']}, timing: ${JSON.stringify(timing)}, ${item.sourceUrl} => ${tmpfile}`);
localFile = tmpfile;
const binary = await this.saveBinaryItem(item, tmpfile);
Expand Down
14 changes: 8 additions & 6 deletions app/core/service/PackageSyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ export class PackageSyncerService extends AbstractService {
logs.push(`[${isoNow()}][DownloadData] 🚧🚧🚧🚧🚧 Syncing "${fullname}" download data "${start}:${end}" on ${registry} 🚧🚧🚧🚧🚧`);
const failEnd = '❌❌❌❌❌ 🚮 give up 🚮 ❌❌❌❌❌';
try {
const { data, status, res } = await this.npmRegistry.getDownloadRanges(registry, fullname, start, end);
const { remoteAuthToken } = task.data as SyncPackageTaskOptions;
const { data, status, res } = await this.npmRegistry.getDownloadRanges(registry, fullname, start, end, { remoteAuthToken });
downloads = data.downloads || [];
logs.push(`[${isoNow()}][DownloadData] 🚧 HTTP [${status}] timing: ${JSON.stringify(res.timing)}, downloads: ${downloads.length}`);
} catch (err: any) {
Expand Down Expand Up @@ -161,12 +162,13 @@ export class PackageSyncerService extends AbstractService {
private async syncUpstream(task: Task) {
const registry = this.npmRegistry.registry;
const fullname = task.targetName;
const { remoteAuthToken } = task.data as SyncPackageTaskOptions;
let logs: string[] = [];
let logId = '';
logs.push(`[${isoNow()}][UP] 🚧🚧🚧🚧🚧 Waiting sync "${fullname}" task on ${registry} 🚧🚧🚧🚧🚧`);
const failEnd = `❌❌❌❌❌ Sync ${registry}/${fullname} 🚮 give up 🚮 ❌❌❌❌❌`;
try {
const { data, status, res } = await this.npmRegistry.createSyncTask(fullname);
const { data, status, res } = await this.npmRegistry.createSyncTask(fullname, { remoteAuthToken });
logs.push(`[${isoNow()}][UP] 🚧 HTTP [${status}] timing: ${JSON.stringify(res.timing)}, data: ${JSON.stringify(data)}`);
logId = data.logId;
} catch (err: any) {
Expand All @@ -192,7 +194,7 @@ export class PackageSyncerService extends AbstractService {
const delay = process.env.NODE_ENV === 'test' ? 100 : 1000 + Math.random() * 5000;
await setTimeout(delay);
try {
const { data, status, url } = await this.npmRegistry.getSyncTask(fullname, logId, offset);
const { data, status, url } = await this.npmRegistry.getSyncTask(fullname, logId, offset, { remoteAuthToken });
useTime = Date.now() - startTime;
if (!logUrl) {
logUrl = url;
Expand Down Expand Up @@ -347,7 +349,7 @@ export class PackageSyncerService extends AbstractService {
public async executeTask(task: Task) {
const fullname = task.targetName;
const [ scope, name ] = getScopeAndName(fullname);
const { tips, skipDependencies: originSkipDependencies, syncDownloadData, forceSyncHistory } = task.data as SyncPackageTaskOptions;
const { tips, skipDependencies: originSkipDependencies, syncDownloadData, forceSyncHistory, remoteAuthToken } = task.data as SyncPackageTaskOptions;
let pkg = await this.packageRepository.findPackage(scope, name);
const registry = await this.initSpecRegistry(task, pkg, scope);
const registryHost = this.npmRegistry.registry;
Expand Down Expand Up @@ -410,7 +412,7 @@ export class PackageSyncerService extends AbstractService {

let registryFetchResult: RegistryResponse;
try {
registryFetchResult = await this.npmRegistry.getFullManifests(fullname);
registryFetchResult = await this.npmRegistry.getFullManifests(fullname, { remoteAuthToken });
} catch (err: any) {
const status = err.status || 'unknown';
task.error = `request manifests error: ${err}, status: ${status}`;
Expand Down Expand Up @@ -618,7 +620,7 @@ export class PackageSyncerService extends AbstractService {
let localFile: string;
try {
const { tmpfile, headers, timing } =
await downloadToTempfile(this.httpclient, this.config.dataDir, tarball);
await downloadToTempfile(this.httpclient, this.config.dataDir, tarball, { remoteAuthToken });
localFile = tmpfile;
logs.push(`[${isoNow()}] 🚧 [${syncIndex}] HTTP content-length: ${headers['content-length']}, timing: ${JSON.stringify(timing)} => ${localFile}`);
} catch (err: any) {
Expand Down
2 changes: 2 additions & 0 deletions app/port/controller/PackageSyncController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ export class PackageSyncController extends AbstractController {

const params = {
fullname,
remoteAuthToken: data.remoteAuthToken,
tips,
skipDependencies: !!data.skipDependencies,
syncDownloadData: !!data.syncDownloadData,
Expand Down Expand Up @@ -95,6 +96,7 @@ export class PackageSyncController extends AbstractController {
const task = await this.packageSyncerService.createTask(params.fullname, {
authorIp: ctx.ip,
authorId: authorized?.user.userId,
remoteAuthToken: params.remoteAuthToken,
tips: params.tips,
skipDependencies: params.skipDependencies,
syncDownloadData: params.syncDownloadData,
Expand Down
6 changes: 6 additions & 0 deletions app/port/typebox.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,12 @@ export const TagWithVersionRule = Type.Object({

export const SyncPackageTaskRule = Type.Object({
fullname: Name,
remoteAuthToken: Type.Optional(
Type.String({
transform: [ 'trim' ],
maxLength: 200,
}),
),
tips: Type.String({
transform: [ 'trim' ],
maxLength: 1024,
Expand Down
40 changes: 40 additions & 0 deletions test/core/service/PackageSyncerService/executeTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,46 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {
assert(log.includes('] 📦 Add dependency "@resvg/resvg-js-win32-x64-msvc" sync task: '));
});

it('should bring auth token when set remoteAuthToken', async () => {
const testToken = 'test-auth-token';
const fullManifests = await TestUtil.readFixturesFile('registry.npmjs.org/foobar.json');
const tgzBuffer1_0_0 = await TestUtil.readFixturesFile('registry.npmjs.org/foobar/-/foobar-1.0.0.tgz');
const tgzBuffer1_1_0 = await TestUtil.readFixturesFile('registry.npmjs.org/foobar/-/foobar-1.1.0.tgz');

let fullManifestsHeader;
let tgzBuffer1_0_0Header;
let tgzBuffer1_1_0Header;
app.mockHttpclient('https://registry.npmjs.org/foobar', 'GET', (_, opts) => {
fullManifestsHeader = opts.headers;
return {
data: fullManifests,
persist: false,
repeats: 2,
};
});
app.mockHttpclient('https://registry.npmjs.org/foobar/-/foobar-1.0.0.tgz', 'GET', (_, opts) => {
tgzBuffer1_0_0Header = opts.headers;
return {
data: tgzBuffer1_0_0,
persist: false,
};
});
app.mockHttpclient('https://registry.npmjs.org/foobar/-/foobar-1.1.0.tgz', 'GET', (_, opts) => {
tgzBuffer1_1_0Header = opts.headers;
return {
data: tgzBuffer1_1_0,
persist: false,
};
});
await packageSyncerService.createTask('foobar', { skipDependencies: true, remoteAuthToken: testToken });
const task = await packageSyncerService.findExecuteTask();
assert(task);
await packageSyncerService.executeTask(task);
assert.equal(fullManifestsHeader?.authorization, `Bearer ${testToken}`);
assert.equal(tgzBuffer1_0_0Header?.authorization, `Bearer ${testToken}`);
assert.equal(tgzBuffer1_1_0Header?.authorization, `Bearer ${testToken}`);
});

it('should ignore publish error on sync task', async () => {
app.mockHttpclient('https://registry.npmjs.org/cnpmcore-test-sync-deprecated', 'GET', {
data: await TestUtil.readFixturesFile('registry.npmjs.org/cnpmcore-test-sync-deprecated.json'),
Expand Down

0 comments on commit d95c58b

Please sign in to comment.