diff --git a/app/core/service/EventCorkerAdvice.ts b/app/core/service/EventCorkerAdvice.ts index 60813ab8..e2912567 100644 --- a/app/core/service/EventCorkerAdvice.ts +++ b/app/core/service/EventCorkerAdvice.ts @@ -1,19 +1,16 @@ -import { EventBus, Inject } from '@eggjs/tegg'; +import { ContextEventBus, Inject } from '@eggjs/tegg'; import { Advice, IAdvice } from '@eggjs/tegg/aop'; @Advice() export class EventCorkerAdvice implements IAdvice { @Inject() - private eventBus: EventBus; - // 依赖 https://github.com/eggjs/tegg/pull/60 合并后支持 + private eventBus: ContextEventBus; async beforeCall() { - this.eventBus; - // this.eventBus.cork(); + this.eventBus.cork(); } async afterFinally() { - this.eventBus; - // this.eventBus.uncork(); + this.eventBus.uncork(); } } diff --git a/app/core/service/PackageSyncerService.ts b/app/core/service/PackageSyncerService.ts index 06366a6d..3054238a 100644 --- a/app/core/service/PackageSyncerService.ts +++ b/app/core/service/PackageSyncerService.ts @@ -244,12 +244,16 @@ export class PackageSyncerService extends AbstractService { return registry; } + @Pointcut(EventCorkerAdvice) + public async executeTaskWithCorker(task: Task): Promise { + await this.executeTask(task); + } + // 由于 cnpmcore 将 version 和 tag 作为两个独立的 changes 事件分发 // 普通版本发布时,短时间内会有两条相同 task 进行同步 // 尽量保证读取和写入都需保证任务幂等,需要确保 changes 在同步任务完成后再触发 // 通过 DB 唯一索引来保证任务幂等,插入失败不影响 pkg.manifests 更新 // 通过 eventBus.cork/uncork 来暂缓事件触发 - @Pointcut(EventCorkerAdvice) public async executeTask(task: Task) { const fullname = task.targetName; const [ scope, name ] = getScopeAndName(fullname); @@ -563,21 +567,6 @@ export class PackageSyncerService extends AbstractService { pkg = await this.packageRepository.findPackage(scope, name); } - // pkg.manifests 和 version.manifests 是异步的 - // 需要确保外围能感知到 pkg.manifests 上的变更 - // FIXME 验证完成后可删除 - // if (pkg) { - // // check again, make sure prefix version not exists - // const existsPkgVersion = await this.packageRepository.findPackageVersion(pkg.packageId, version); - // if (existsPkgVersion) { - // await rm(localFile, { force: true }); - // logs.push(`[${isoNow()}] 🐛 [${syncIndex}] Synced version ${version} already exists, skip publish it`); - // await this.taskService.appendTaskLog(task, logs.join('\n')); - // logs = []; - // continue; - // } - // } - const publishCmd = { scope, name, diff --git a/app/port/schedule/SyncPackageWorker.ts b/app/port/schedule/SyncPackageWorker.ts index 3bd93a8c..c18ead10 100644 --- a/app/port/schedule/SyncPackageWorker.ts +++ b/app/port/schedule/SyncPackageWorker.ts @@ -34,7 +34,7 @@ export class SyncPackageWorker { this.logger.info('[SyncPackageWorker:subscribe:executeTask:start][%s] taskId: %s, targetName: %s, attempts: %s, params: %j, updatedAt: %s, delay %sms', executingCount, task.taskId, task.targetName, task.attempts, task.data, task.updatedAt, startTime - task.updatedAt.getTime()); - await this.packageSyncerService.executeTask(task); + await this.packageSyncerService.executeTaskWithCorker(task); const use = Date.now() - startTime; this.logger.info('[SyncPackageWorker:subscribe:executeTask:success][%s] taskId: %s, targetName: %s, use %sms', executingCount, task.taskId, task.targetName, use); diff --git a/test/core/service/PackageSyncerService/executeTask.test.ts b/test/core/service/PackageSyncerService/executeTask.test.ts index 15a971f0..c5d68705 100644 --- a/test/core/service/PackageSyncerService/executeTask.test.ts +++ b/test/core/service/PackageSyncerService/executeTask.test.ts @@ -5,7 +5,7 @@ import { PackageSyncerService } from 'app/core/service/PackageSyncerService'; import { PackageManagerService } from 'app/core/service/PackageManagerService'; import { Package as PackageModel } from 'app/repository/model/Package'; import { Task as TaskModel } from 'app/repository/model/Task'; -import { Task as TaskEntity } from 'app/core/entity/Task'; +import { Task, Task as TaskEntity } from 'app/core/entity/Task'; import { HistoryTask as HistoryTaskModel } from 'app/repository/model/HistoryTask'; import { TestUtil } from 'test/TestUtil'; import { NPMRegistry } from 'app/common/adapter/NPMRegistry'; @@ -18,6 +18,7 @@ import { RegistryType } from 'app/common/enum/Registry'; import { TaskService } from 'app/core/service/TaskService'; import { ScopeManagerService } from 'app/core/service/ScopeManagerService'; import { UserService } from 'app/core/service/UserService'; +import { ChangeRepository } from 'app/repository/ChangeRepository'; describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => { let ctx: Context; @@ -29,6 +30,7 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => { let taskService: TaskService; let scopeManagerService: ScopeManagerService; let userService: UserService; + let changeRepository: ChangeRepository; beforeEach(async () => { ctx = await app.mockModuleContext(); @@ -40,6 +42,7 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => { registryManagerService = await ctx.getEggObject(RegistryManagerService); scopeManagerService = await ctx.getEggObject(ScopeManagerService); userService = await ctx.getEggObject(UserService); + changeRepository = await ctx.getEggObject(ChangeRepository); }); afterEach(async () => { @@ -392,7 +395,6 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => { persist: false, }); name = 'cnpmcore-test-sync-dependencies'; - // don't add cnpmcore-test-sync-deprecated task if cnpmcore-test-sync-deprecated already exists const task = await packageSyncerService.createTask(name); assert(task); assert.equal(task.targetName, name); @@ -601,7 +603,7 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => { // https://www.npmjs.com/package/@cnpmcore/test-sync-package-has-two-versions const name = '@cnpmcore/test-sync-package-has-two-versions'; await packageSyncerService.createTask(name); - let task = await packageSyncerService.findExecuteTask(); + const task = await packageSyncerService.findExecuteTask(); assert(task); assert.equal(task.targetName, name); @@ -650,9 +652,9 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => { await packageSyncerService.executeTask(task); - let stream = await packageSyncerService.findTaskLog(task); + const stream = await packageSyncerService.findTaskLog(task); assert(stream); - let log = await TestUtil.readStreamToLog(stream); + const log = await TestUtil.readStreamToLog(stream); // console.log(log); assert(log.includes('Synced version 2.0.0 already exists, skip publish, try to set in local manifest')); assert(log.includes('] 🚧 Syncing versions 1 => 2')); @@ -662,6 +664,46 @@ describe('test/core/service/PackageSyncerService/executeTask.test.ts', () => { }); + it('event cork should work', async () => { + app.mockHttpclient('https://registry.npmjs.org/%40cnpmcore%2Ftest-sync-package-cork', 'GET', { + data: '{"_id":"@cnpmcore/test-sync-package-cork","_rev":"4-541287ae0a14039fea89ac08fa5ec53d","name":"@cnpmcore/test-sync-package-cork","dist-tags":{"latest":"2.0.0","next":"2.0.0"},"versions":{"1.0.0":{"name":"@cnpmcore/test-sync-package-cork","version":"1.0.0","description":"cnpmcore local test package","main":"index.js","scripts":{"test":"echo \\"hello\\""},"author":"","license":"MIT","gitHead":"60cfb1cf401f87a60a1b0dfd7ee739f98ffd7847","_id":"@cnpmcore/test-sync-package-cork@1.0.0","_nodeVersion":"16.13.1","_npmVersion":"8.1.2","dist":{"integrity":"sha512-WR0T96H8t7ss1FK8GWPPblx+usbjU4bNGRjMHS9t/oVA5DgJDxitydPSFPeIUtXciyekI7R47do9Lc3GgC4P5A==","shasum":"2ddc6ee93b92be6d64139fb1a631d2610f43e946","tarball":"https://registry.npmjs.org/@cnpmcore/test-sync-package-cork/-/test-sync-package-cork-1.0.0.tgz","fileCount":2,"unpackedSize":238,"signatures":[{"keyid":"SHA256:jl3bwswu80PjjokCgh0o2w5c2U4LhQAE57gj9cz1kzA","sig":"MEYCIQDj5Ui2GU8nVmHFk0hCt/i3gPW9eQdOCZgKzpAlkvERwQIhAPZ0NCefLoEfOpnbdKAUr7Ng9Sy6FMnTsDxDaM2dQHNw"}]},"_npmUser":{"name":"fengmk2","email":"fengmk2@gmail.com"},"directories":{},"maintainers":[{"name":"fengmk2","email":"fengmk2@gmail.com"}],"_npmOperationalInternal":{"host":"s3://npm-registry-packages","tmp":"tmp/test-sync-package-cork_1.0.0_1639442699824_0.6948988437963031"},"_hasShrinkwrap":false},"2.0.0":{"name":"@cnpmcore/test-sync-package-cork","version":"2.0.0","description":"cnpmcore local test package","main":"index.js","scripts":{"test":"echo \\"hello\\""},"author":"","license":"MIT","gitHead":"60cfb1cf401f87a60a1b0dfd7ee739f98ffd7847","_id":"@cnpmcore/test-sync-package-cork@2.0.0","_nodeVersion":"16.13.1","_npmVersion":"8.1.2","dist":{"integrity":"sha512-qgHLQzXq+VN7q0JWibeBYrqb3Iajl4lpVuxlQstclRz4ejujfDFswBGSXmCv9FyIIdmSAe5bZo0oHQLsod3pAA==","shasum":"891eb8e08ceadbd86e75b6d66f31f7e5a28a8d68","tarball":"https://registry.npmjs.org/@cnpmcore/test-sync-package-cork/-/test-sync-package-cork-2.0.0.tgz","fileCount":2,"unpackedSize":238,"signatures":[{"keyid":"SHA256:jl3bwswu80PjjokCgh0o2w5c2U4LhQAE57gj9cz1kzA","sig":"MEQCIAWVz7mIHF23Gq4a+Swsj2ZSdn87991HcE1+fQm8shNCAiByOIuhaZAbo9hct24qYf7FWqx6Lyluo+Rpnrn91//Ibg=="}]},"_npmUser":{"name":"fengmk2","email":"fengmk2@gmail.com"},"directories":{},"maintainers":[{"name":"fengmk2","email":"fengmk2@gmail.com"}],"_npmOperationalInternal":{"host":"s3://npm-registry-packages","tmp":"tmp/test-sync-package-cork_2.0.0_1639442732240_0.33204392278137207"},"_hasShrinkwrap":false}},"time":{"created":"2021-12-14T00:44:59.775Z","1.0.0":"2021-12-14T00:44:59.940Z","modified":"2022-05-23T02:33:52.613Z","2.0.0":"2021-12-14T00:45:32.457Z"},"maintainers":[{"email":"killa07071201@gmail.com","name":"killagu"},{"email":"fengmk2@gmail.com","name":"fengmk2"}],"description":"cnpmcore local test package","license":"MIT","readme":"ERROR: No README data found!","readmeFilename":""}', + persist: false, + repeats: 2, + }); + app.mockHttpclient('https://registry.npmjs.org/@cnpmcore/test-sync-package-cork/-/test-sync-package-cork-1.0.0.tgz', 'GET', { + data: await TestUtil.readFixturesFile('registry.npmjs.org/foobar/-/foobar-1.0.0.tgz'), + persist: false, + }); + + app.mockHttpclient('https://registry.npmjs.org/@cnpmcore/test-sync-package-cork/-/test-sync-package-cork-2.0.0.tgz', 'GET', { + data: await TestUtil.readFixturesFile('registry.npmjs.org/foobar/-/foobar-1.0.0.tgz'), + persist: false, + }); + + // https://www.npmjs.com/package/@cnpmcore/test-sync-package-has-two-versions + const name = '@cnpmcore/test-sync-package-cork'; + await packageSyncerService.createTask(name); + const task = await packageSyncerService.findExecuteTask(); + assert(task); + assert.equal(task.targetName, name); + + + await packageSyncerService.executeTaskWithCorker(task); + const stream = await packageSyncerService.findTaskLog(task); + assert(stream); + + const finishedTask = await taskService.findTask(task.taskId) as Task; + + const changes = await changeRepository.query(0, 100); + const [ firstChange ] = changes; + const firstChangeDate = new Date(firstChange.createdAt); + const taskFinishedDate = new Date(finishedTask!.updatedAt); + + // 任务结束后一起触发 + assert(firstChangeDate.getTime() - taskFinishedDate.getTime() > 0); + + }); + it.skip('should sync missing versions in database', async () => { // https://www.npmjs.com/package/@cnpmcore/test-sync-package-has-two-versions const name = '@cnpmcore/test-sync-package-has-two-versions';