Skip to content

Commit

Permalink
fix: ci
Browse files Browse the repository at this point in the history
  • Loading branch information
elrrrrrrr committed Nov 28, 2022
1 parent c1e1de2 commit 9bfd8ff
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 29 deletions.
11 changes: 4 additions & 7 deletions app/core/service/EventCorkerAdvice.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import { EventBus, Inject } from '@eggjs/tegg';
import { ContextEventBus, Inject } from '@eggjs/tegg';
import { Advice, IAdvice } from '@eggjs/tegg/aop';

@Advice()
export class EventCorkerAdvice implements IAdvice {
@Inject()
private eventBus: EventBus;
// 依赖 https://github.com/eggjs/tegg/pull/60 合并后支持
private eventBus: ContextEventBus;

async beforeCall() {
this.eventBus;
// this.eventBus.cork();
this.eventBus.cork();
}

async afterFinally() {
this.eventBus;
// this.eventBus.uncork();
this.eventBus.uncork();
}
}
21 changes: 5 additions & 16 deletions app/core/service/PackageSyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,16 @@ export class PackageSyncerService extends AbstractService {
return registry;
}

@Pointcut(EventCorkerAdvice)
public async executeTaskWithCorker(task: Task): Promise<void> {
await this.executeTask(task);
}

// 由于 cnpmcore 将 version 和 tag 作为两个独立的 changes 事件分发
// 普通版本发布时,短时间内会有两条相同 task 进行同步
// 尽量保证读取和写入都需保证任务幂等,需要确保 changes 在同步任务完成后再触发
// 通过 DB 唯一索引来保证任务幂等,插入失败不影响 pkg.manifests 更新
// 通过 eventBus.cork/uncork 来暂缓事件触发
@Pointcut(EventCorkerAdvice)
public async executeTask(task: Task) {
const fullname = task.targetName;
const [ scope, name ] = getScopeAndName(fullname);
Expand Down Expand Up @@ -563,21 +567,6 @@ export class PackageSyncerService extends AbstractService {
pkg = await this.packageRepository.findPackage(scope, name);
}

// pkg.manifests 和 version.manifests 是异步的
// 需要确保外围能感知到 pkg.manifests 上的变更
// FIXME 验证完成后可删除
// if (pkg) {
// // check again, make sure prefix version not exists
// const existsPkgVersion = await this.packageRepository.findPackageVersion(pkg.packageId, version);
// if (existsPkgVersion) {
// await rm(localFile, { force: true });
// logs.push(`[${isoNow()}] 🐛 [${syncIndex}] Synced version ${version} already exists, skip publish it`);
// await this.taskService.appendTaskLog(task, logs.join('\n'));
// logs = [];
// continue;
// }
// }

const publishCmd = {
scope,
name,
Expand Down
2 changes: 1 addition & 1 deletion app/port/schedule/SyncPackageWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export class SyncPackageWorker {
this.logger.info('[SyncPackageWorker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms',
executingCount, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt,
startTime - task.updatedAt.getTime());
await this.packageSyncerService.executeTask(task);
await this.packageSyncerService.executeTaskWithCorker(task);
const use = Date.now() - startTime;
this.logger.info('[SyncPackageWorker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms',
executingCount, task.taskId, task.targetName, use);
Expand Down
52 changes: 47 additions & 5 deletions test/core/service/PackageSyncerService/executeTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { PackageSyncerService } from 'app/core/service/PackageSyncerService';
import { PackageManagerService } from 'app/core/service/PackageManagerService';
import { Package as PackageModel } from 'app/repository/model/Package';
import { Task as TaskModel } from 'app/repository/model/Task';
import { Task as TaskEntity } from 'app/core/entity/Task';
import { Task, Task as TaskEntity } from 'app/core/entity/Task';
import { HistoryTask as HistoryTaskModel } from 'app/repository/model/HistoryTask';
import { TestUtil } from 'test/TestUtil';
import { NPMRegistry } from 'app/common/adapter/NPMRegistry';
Expand All @@ -18,6 +18,7 @@ import { RegistryType } from 'app/common/enum/Registry';
import { TaskService } from 'app/core/service/TaskService';
import { ScopeManagerService } from 'app/core/service/ScopeManagerService';
import { UserService } from 'app/core/service/UserService';
import { ChangeRepository } from 'app/repository/ChangeRepository';

describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {
let ctx: Context;
Expand All @@ -29,6 +30,7 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {
let taskService: TaskService;
let scopeManagerService: ScopeManagerService;
let userService: UserService;
let changeRepository: ChangeRepository;

beforeEach(async () => {
ctx = await app.mockModuleContext();
Expand All @@ -40,6 +42,7 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {
registryManagerService = await ctx.getEggObject(RegistryManagerService);
scopeManagerService = await ctx.getEggObject(ScopeManagerService);
userService = await ctx.getEggObject(UserService);
changeRepository = await ctx.getEggObject(ChangeRepository);
});

afterEach(async () => {
Expand Down Expand Up @@ -392,7 +395,6 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {
persist: false,
});
name = 'cnpmcore-test-sync-dependencies';
// don't add cnpmcore-test-sync-deprecated task if cnpmcore-test-sync-deprecated already exists
const task = await packageSyncerService.createTask(name);
assert(task);
assert.equal(task.targetName, name);
Expand Down Expand Up @@ -601,7 +603,7 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {
// https://www.npmjs.com/package/@cnpmcore/test-sync-package-has-two-versions
const name = '@cnpmcore/test-sync-package-has-two-versions';
await packageSyncerService.createTask(name);
let task = await packageSyncerService.findExecuteTask();
const task = await packageSyncerService.findExecuteTask();
assert(task);
assert.equal(task.targetName, name);

Expand Down Expand Up @@ -650,9 +652,9 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {

await packageSyncerService.executeTask(task);

let stream = await packageSyncerService.findTaskLog(task);
const stream = await packageSyncerService.findTaskLog(task);
assert(stream);
let log = await TestUtil.readStreamToLog(stream);
const log = await TestUtil.readStreamToLog(stream);
// console.log(log);
assert(log.includes('Synced version 2.0.0 already exists, skip publish, try to set in local manifest'));
assert(log.includes('] 🚧 Syncing versions 1 => 2'));
Expand All @@ -662,6 +664,46 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => {

});

it('event cork should work', async () => {
app.mockHttpclient('https://registry.npmjs.org/%40cnpmcore%2Ftest-sync-package-cork', 'GET', {
data: '{"_id":"@cnpmcore/test-sync-package-cork","_rev":"4-541287ae0a14039fea89ac08fa5ec53d","name":"@cnpmcore/test-sync-package-cork","dist-tags":{"latest":"2.0.0","next":"2.0.0"},"versions":{"1.0.0":{"name":"@cnpmcore/test-sync-package-cork","version":"1.0.0","description":"cnpmcore local test package","main":"index.js","scripts":{"test":"echo \\"hello\\""},"author":"","license":"MIT","gitHead":"60cfb1cf401f87a60a1b0dfd7ee739f98ffd7847","_id":"@cnpmcore/test-sync-package-cork@1.0.0","_nodeVersion":"16.13.1","_npmVersion":"8.1.2","dist":{"integrity":"sha512-WR0T96H8t7ss1FK8GWPPblx+usbjU4bNGRjMHS9t/oVA5DgJDxitydPSFPeIUtXciyekI7R47do9Lc3GgC4P5A==","shasum":"2ddc6ee93b92be6d64139fb1a631d2610f43e946","tarball":"https://registry.npmjs.org/@cnpmcore/test-sync-package-cork/-/test-sync-package-cork-1.0.0.tgz","fileCount":2,"unpackedSize":238,"signatures":[{"keyid":"SHA256:jl3bwswu80PjjokCgh0o2w5c2U4LhQAE57gj9cz1kzA","sig":"MEYCIQDj5Ui2GU8nVmHFk0hCt/i3gPW9eQdOCZgKzpAlkvERwQIhAPZ0NCefLoEfOpnbdKAUr7Ng9Sy6FMnTsDxDaM2dQHNw"}]},"_npmUser":{"name":"fengmk2","email":"fengmk2@gmail.com"},"directories":{},"maintainers":[{"name":"fengmk2","email":"fengmk2@gmail.com"}],"_npmOperationalInternal":{"host":"s3://npm-registry-packages","tmp":"tmp/test-sync-package-cork_1.0.0_1639442699824_0.6948988437963031"},"_hasShrinkwrap":false},"2.0.0":{"name":"@cnpmcore/test-sync-package-cork","version":"2.0.0","description":"cnpmcore local test package","main":"index.js","scripts":{"test":"echo \\"hello\\""},"author":"","license":"MIT","gitHead":"60cfb1cf401f87a60a1b0dfd7ee739f98ffd7847","_id":"@cnpmcore/test-sync-package-cork@2.0.0","_nodeVersion":"16.13.1","_npmVersion":"8.1.2","dist":{"integrity":"sha512-qgHLQzXq+VN7q0JWibeBYrqb3Iajl4lpVuxlQstclRz4ejujfDFswBGSXmCv9FyIIdmSAe5bZo0oHQLsod3pAA==","shasum":"891eb8e08ceadbd86e75b6d66f31f7e5a28a8d68","tarball":"https://registry.npmjs.org/@cnpmcore/test-sync-package-cork/-/test-sync-package-cork-2.0.0.tgz","fileCount":2,"unpackedSize":238,"signatures":[{"keyid":"SHA256:jl3bwswu80PjjokCgh0o2w5c2U4LhQAE57gj9cz1kzA","sig":"MEQCIAWVz7mIHF23Gq4a+Swsj2ZSdn87991HcE1+fQm8shNCAiByOIuhaZAbo9hct24qYf7FWqx6Lyluo+Rpnrn91//Ibg=="}]},"_npmUser":{"name":"fengmk2","email":"fengmk2@gmail.com"},"directories":{},"maintainers":[{"name":"fengmk2","email":"fengmk2@gmail.com"}],"_npmOperationalInternal":{"host":"s3://npm-registry-packages","tmp":"tmp/test-sync-package-cork_2.0.0_1639442732240_0.33204392278137207"},"_hasShrinkwrap":false}},"time":{"created":"2021-12-14T00:44:59.775Z","1.0.0":"2021-12-14T00:44:59.940Z","modified":"2022-05-23T02:33:52.613Z","2.0.0":"2021-12-14T00:45:32.457Z"},"maintainers":[{"email":"killa07071201@gmail.com","name":"killagu"},{"email":"fengmk2@gmail.com","name":"fengmk2"}],"description":"cnpmcore local test package","license":"MIT","readme":"ERROR: No README data found!","readmeFilename":""}',
persist: false,
repeats: 2,
});
app.mockHttpclient('https://registry.npmjs.org/@cnpmcore/test-sync-package-cork/-/test-sync-package-cork-1.0.0.tgz', 'GET', {
data: await TestUtil.readFixturesFile('registry.npmjs.org/foobar/-/foobar-1.0.0.tgz'),
persist: false,
});

app.mockHttpclient('https://registry.npmjs.org/@cnpmcore/test-sync-package-cork/-/test-sync-package-cork-2.0.0.tgz', 'GET', {
data: await TestUtil.readFixturesFile('registry.npmjs.org/foobar/-/foobar-1.0.0.tgz'),
persist: false,
});

// https://www.npmjs.com/package/@cnpmcore/test-sync-package-has-two-versions
const name = '@cnpmcore/test-sync-package-cork';
await packageSyncerService.createTask(name);
const task = await packageSyncerService.findExecuteTask();
assert(task);
assert.equal(task.targetName, name);


await packageSyncerService.executeTaskWithCorker(task);
const stream = await packageSyncerService.findTaskLog(task);
assert(stream);

const finishedTask = await taskService.findTask(task.taskId) as Task;

const changes = await changeRepository.query(0, 100);
const [ firstChange ] = changes;
const firstChangeDate = new Date(firstChange.createdAt);
const taskFinishedDate = new Date(finishedTask!.updatedAt);

// 任务结束后一起触发
assert(firstChangeDate.getTime() - taskFinishedDate.getTime() > 0);

});

it.skip('should sync missing versions in database', async () => {
// https://www.npmjs.com/package/@cnpmcore/test-sync-package-has-two-versions
const name = '@cnpmcore/test-sync-package-has-two-versions';
Expand Down

0 comments on commit 9bfd8ff

Please sign in to comment.