Skip to content

Commit

Permalink
Event cork (#361)
Browse files Browse the repository at this point in the history
> syncPackage 同步时,由于任务并发,可能会导致同步过程中 versions 表记录已经创建,pkg.manifests 还没有同步
> 针对这种场景做补偿逻辑,防止 tag 打在一个 pkg.manifests 没有的版本里

* 修改 pkg.manifests 补偿逻辑,兼容有 versions 没 pkg.manifests 的情况
* 添加 eventCork 的 advice,在 syncPackage 任务结束后,再统一触发 changes,依赖
[ref](eggjs/tegg#60)

在同步和被同步的场景,确保 changes 发出时,pkg.manifests 已经更新
统一 ctx 内不同 changes 时序可能影响,不影响重新读取 manifests 一致性
  • Loading branch information
elrrrrrrr authored Nov 28, 2022
1 parent c1eb097 commit d55c680
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 32 deletions.
16 changes: 16 additions & 0 deletions app/core/service/EventCorkerAdvice.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { ContextEventBus, Inject } from '@eggjs/tegg';
import { Advice, IAdvice } from '@eggjs/tegg/aop';

@Advice()
export class EventCorkAdvice implements IAdvice {
@Inject()
private eventBus: ContextEventBus;

async beforeCall() {
this.eventBus.cork();
}

async afterFinally() {
this.eventBus.uncork();
}
}
38 changes: 12 additions & 26 deletions app/core/service/PackageSyncerService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
ContextProto,
Inject,
} from '@eggjs/tegg';
import { Pointcut } from '@eggjs/tegg/aop';
import {
EggContextHttpClient,
} from 'egg';
Expand All @@ -18,7 +19,6 @@ import { TaskRepository } from '../../repository/TaskRepository';
import { PackageRepository } from '../../repository/PackageRepository';
import { PackageVersionDownloadRepository } from '../../repository/PackageVersionDownloadRepository';
import { UserRepository } from '../../repository/UserRepository';
import { DistRepository } from '../../repository/DistRepository';
import { Task, SyncPackageTaskOptions, CreateSyncPackageTask } from '../entity/Task';
import { Package } from '../entity/Package';
import { UserService } from './UserService';
Expand All @@ -30,6 +30,7 @@ import { RegistryManagerService } from './RegistryManagerService';
import { Registry } from '../entity/Registry';
import { BadRequestError } from 'egg-errors';
import { ScopeManagerService } from './ScopeManagerService';
import { EventCorkAdvice } from './EventCorkerAdvice';

function isoNow() {
return new Date().toISOString();
Expand Down Expand Up @@ -63,8 +64,6 @@ export class PackageSyncerService extends AbstractService {
@Inject()
private readonly httpclient: EggContextHttpClient;
@Inject()
private readonly distRepository: DistRepository;
@Inject()
private readonly registryManagerService: RegistryManagerService;
@Inject()
private readonly scopeManagerService: ScopeManagerService;
Expand Down Expand Up @@ -245,6 +244,12 @@ export class PackageSyncerService extends AbstractService {
return registry;
}

// 由于 cnpmcore 将 version 和 tag 作为两个独立的 changes 事件分发
// 普通版本发布时,短时间内会有两条相同 task 进行同步
// 尽量保证读取和写入都需保证任务幂等,需要确保 changes 在同步任务完成后再触发
// 通过 DB 唯一索引来保证任务幂等,插入失败不影响 pkg.manifests 更新
// 通过 eventBus.cork/uncork 来暂缓事件触发
@Pointcut(EventCorkAdvice)
public async executeTask(task: Task) {
const fullname = task.targetName;
const [ scope, name ] = getScopeAndName(fullname);
Expand Down Expand Up @@ -470,17 +475,6 @@ export class PackageSyncerService extends AbstractService {
updateVersions.push(version);
logs.push(`[${isoNow()}] 🐛 Remote version ${version} not exists on local abbreviated manifests, need to refresh`);
}
} else {
// try to read from db detect if last sync interrupt before refreshPackageManifestsToDists() be called
existsItem = await this.distRepository.findPackageVersionManifest(pkg.packageId, version);
// only allow existsItem on db to force refresh, to avoid big versions fresh
// see https://r.cnpmjs.org/-/package/@npm-torg/public-scoped-free-org-test-package-2/syncs/61fcc7e8c1646e26a845b674/log
if (existsItem) {
// version not exists on manifests, need to refresh
// bugfix: https://github.com/cnpm/cnpmcore/issues/115
updateVersions.push(version);
logs.push(`[${isoNow()}] 🐛 Remote version ${version} not exists on local manifests, need to refresh`);
}
}

if (existsItem && forceSyncHistory === true) {
Expand Down Expand Up @@ -568,17 +562,6 @@ export class PackageSyncerService extends AbstractService {
if (!pkg) {
pkg = await this.packageRepository.findPackage(scope, name);
}
if (pkg) {
// check again, make sure prefix version not exists
const existsPkgVersion = await this.packageRepository.findPackageVersion(pkg.packageId, version);
if (existsPkgVersion) {
await rm(localFile, { force: true });
logs.push(`[${isoNow()}] 🐛 [${syncIndex}] Synced version ${version} already exists, skip publish it`);
await this.taskService.appendTaskLog(task, logs.join('\n'));
logs = [];
continue;
}
}

const publishCmd = {
scope,
Expand All @@ -596,12 +579,15 @@ export class PackageSyncerService extends AbstractService {
skipRefreshPackageManifests: true,
};
try {
// 当 version 记录已经存在时,还需要校验一下 pkg.manifests 是否存在
const pkgVersion = await this.packageManagerService.publish(publishCmd, users[0]);
updateVersions.push(pkgVersion.version);
logs.push(`[${isoNow()}] 🟢 [${syncIndex}] Synced version ${version} success, packageVersionId: ${pkgVersion.packageVersionId}, db id: ${pkgVersion.id}`);
} catch (err: any) {
if (err.name === 'ForbiddenError') {
logs.push(`[${isoNow()}] 🐛 [${syncIndex}] Synced version ${version} already exists, skip publish error`);
logs.push(`[${isoNow()}] 🐛 [${syncIndex}] Synced version ${version} already exists, skip publish, try to set in local manifest`);
// 如果 pkg.manifests 不存在,需要补充一下
updateVersions.push(version);
} else {
err.taskId = task.taskId;
this.logger.error(err);
Expand Down
2 changes: 1 addition & 1 deletion test/core/service/PackageSyncerService/createTask.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ describe('test/core/service/PackageSyncerService/createTask.test.ts', () => {
});

it('should create task when processing', async () => {
mock(PackageSyncerService.prototype, 'executeTask', async (task: Task) => {
mock(packageSyncerService, 'executeTask', async (task: Task) => {
task.state = TaskState.Processing;
await taskRepository.saveTask(task);
await setTimeout(2);
Expand Down
Loading

0 comments on commit d55c680

Please sign in to comment.