From 2eae81ad50fbd64f60f05635de55345c03936fd9 Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Fri, 11 Nov 2022 17:01:10 +0800 Subject: [PATCH] fix: create task when waiting --- app/core/service/TaskService.ts | 21 ++++++++----- .../PackageSyncerService/createTask.test.ts | 30 +++++++++++++++++++ 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/app/core/service/TaskService.ts b/app/core/service/TaskService.ts index bcec88e32..d066c1983 100644 --- a/app/core/service/TaskService.ts +++ b/app/core/service/TaskService.ts @@ -28,16 +28,21 @@ export class TaskService extends AbstractService { public async createTask(task: Task, addTaskQueueOnExists: boolean) { const existsTask = await this.taskRepository.findTaskByTargetName(task.targetName, task.type); if (existsTask) { - if (addTaskQueueOnExists && existsTask.state === TaskState.Waiting) { - const queueLength = await this.getTaskQueueLength(task.type); - if (queueLength < this.config.cnpmcore.taskQueueHighWaterSize) { - // make sure waiting task in queue - await this.queueAdapter.push(task.type, existsTask.taskId); - this.logger.info('[TaskService.createTask:exists-to-queue] taskType: %s, targetName: %s, taskId: %s, queue size: %s', - task.type, task.targetName, task.taskId, queueLength); + // 如果任务还未被触发,就不继续重复创建 + // 如果任务正在执行,可能任务状态已更新,这种情况需要继续创建 + if (existsTask.state === TaskState.Waiting) { + // 提高任务的优先级 + if (addTaskQueueOnExists) { + const queueLength = await this.getTaskQueueLength(task.type); + if (queueLength < this.config.cnpmcore.taskQueueHighWaterSize) { + // make sure waiting task in queue + await this.queueAdapter.push(task.type, existsTask.taskId); + this.logger.info('[TaskService.createTask:exists-to-queue] taskType: %s, targetName: %s, taskId: %s, queue size: %s', + task.type, task.targetName, task.taskId, queueLength); + } } + return existsTask; } - return existsTask; } await this.taskRepository.saveTask(task); await this.queueAdapter.push(task.type, task.taskId); diff --git a/test/core/service/PackageSyncerService/createTask.test.ts b/test/core/service/PackageSyncerService/createTask.test.ts index dd8e65a31..9d587c781 100644 --- a/test/core/service/PackageSyncerService/createTask.test.ts +++ b/test/core/service/PackageSyncerService/createTask.test.ts @@ -1,18 +1,27 @@ import assert = require('assert'); import { app, mock } from 'egg-mock/bootstrap'; import { Context } from 'egg'; +import { setTimeout } from 'timers/promises'; import { PackageSyncerService } from '../../../../app/core/service/PackageSyncerService'; import { TestUtil } from '../../../TestUtil'; +import { Task } from 'app/core/entity/Task'; +import { TaskState } from '../../../../app/common/enum/Task'; +import { TaskRepository } from '../../../../app/repository/TaskRepository'; +import { TaskService } from '../../../../app/core/service/TaskService'; describe('test/core/service/PackageSyncerService/createTask.test.ts', () => { let ctx: Context; const pkgName = '@cnpmcore/foo'; const username = 'mock_username'; let packageSyncerService: PackageSyncerService; + let taskRepository: TaskRepository; + let taskService: TaskService; beforeEach(async () => { ctx = await app.mockModuleContext(); packageSyncerService = await ctx.getEggObject(PackageSyncerService); + taskRepository = await ctx.getEggObject(TaskRepository); + taskService = await ctx.getEggObject(TaskService); await TestUtil.createPackage({ name: pkgName, @@ -56,4 +65,25 @@ describe('test/core/service/PackageSyncerService/createTask.test.ts', () => { }); assert(task); }); + + it('should create task when processing', async () => { + mock(PackageSyncerService.prototype, 'executeTask', async (task: Task) => { + task.state = TaskState.Processing; + await taskRepository.saveTask(task); + await setTimeout(2); + await taskService.finishTask(task, TaskState.Success); + }); + const task = await packageSyncerService.createTask(pkgName); + const res = await Promise.all([ packageSyncerService.executeTask(task), (async () => { + await setTimeout(1); + return await packageSyncerService.createTask(pkgName); + })() ]); + assert(res[1].taskId !== task.taskId); + }); + + it('should not duplicate task when waiting', async () => { + const task = await packageSyncerService.createTask(pkgName); + const newTask = await packageSyncerService.createTask(pkgName); + assert(newTask.taskId === task.taskId); + }); });