From dca45527946ebcc06fcc004454c0217beb7261da Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Wed, 29 Mar 2023 22:56:22 +0800 Subject: [PATCH 1/3] fix: timeout handler not work --- app/core/service/TaskService.ts | 48 +++++++++++++-------- app/repository/util/ModelConvertor.ts | 6 ++- test/schedule/TaskTimeoutHandler.test.ts | 53 +++++++++++++++++++++++- 3 files changed, 86 insertions(+), 21 deletions(-) diff --git a/app/core/service/TaskService.ts b/app/core/service/TaskService.ts index 94f7152f..4a431967 100644 --- a/app/core/service/TaskService.ts +++ b/app/core/service/TaskService.ts @@ -106,30 +106,42 @@ export class TaskService extends AbstractService { // try processing timeout tasks in 10 mins const tasks = await this.taskRepository.findTimeoutTasks(TaskState.Processing, 60000 * 10); for (const task of tasks) { - // ignore ChangesStream task, it won't timeout - if (task.attempts >= 3 && task.type !== TaskType.ChangesStream) { - await this.finishTask(task, TaskState.Timeout); - this.logger.warn( - '[TaskService.retryExecuteTimeoutTasks:timeout] taskType: %s, targetName: %s, taskId: %s, attempts %s set to fail', + try { + // ignore ChangesStream task, it won't timeout + if (task.attempts >= 3 && task.type !== TaskType.ChangesStream) { + await this.finishTask(task, TaskState.Timeout); + this.logger.warn( + '[TaskService.retryExecuteTimeoutTasks:timeout] taskType: %s, targetName: %s, taskId: %s, attempts %s set to fail', + task.type, task.targetName, task.taskId, task.attempts); + continue; + } + if (task.attempts >= 1) { + // reset logPath + task.resetLogPath(); + } + await this.retryTask(task); + this.logger.info( + '[TaskService.retryExecuteTimeoutTasks:retry] taskType: %s, targetName: %s, taskId: %s, attempts %s will retry again', + task.type, task.targetName, task.taskId, task.attempts); + } catch (e) { + this.logger.error( + '[TaskService.retryExecuteTimeoutTasks:error] processing task, taskType: %s, targetName: %s, taskId: %s, attempts %s will retry again', task.type, task.targetName, task.taskId, task.attempts); - continue; - } - if (task.attempts >= 1) { - // reset logPath - task.resetLogPath(); } - await this.retryTask(task); - this.logger.info( - '[TaskService.retryExecuteTimeoutTasks:retry] taskType: %s, targetName: %s, taskId: %s, attempts %s will retry again', - task.type, task.targetName, task.taskId, task.attempts); } // try waiting timeout tasks in 30 mins const waitingTasks = await this.taskRepository.findTimeoutTasks(TaskState.Waiting, 60000 * 30); for (const task of waitingTasks) { - await this.retryTask(task); - this.logger.warn( - '[TaskService.retryExecuteTimeoutTasks:retryWaiting] taskType: %s, targetName: %s, taskId: %s waiting too long', - task.type, task.targetName, task.taskId); + try { + await this.retryTask(task); + this.logger.warn( + '[TaskService.retryExecuteTimeoutTasks:retryWaiting] taskType: %s, targetName: %s, taskId: %s waiting too long', + task.type, task.targetName, task.taskId); + } catch (e) { + this.logger.error( + '[TaskService.retryExecuteTimeoutTasks:error] waiting task, taskType: %s, targetName: %s, taskId: %s, attempts %s will retry again', + task.type, task.targetName, task.taskId, task.attempts); + } } return { processing: tasks.length, diff --git a/app/repository/util/ModelConvertor.ts b/app/repository/util/ModelConvertor.ts index 57835861..fd14c85c 100644 --- a/app/repository/util/ModelConvertor.ts +++ b/app/repository/util/ModelConvertor.ts @@ -61,12 +61,14 @@ export class ModelConvertor { const modelPropertyName = attributeMeta.propertyName; const entityPropertyName = ModelConvertorUtil.getEntityPropertyName(ModelClazz, modelPropertyName); if (entityPropertyName === CREATED_AT) continue; + // Restricted updates to the primary key + if (entityPropertyName === ID && model[ID]) continue; const attributeValue = _.get(entity, entityPropertyName); model[modelPropertyName] = attributeValue; } - // 不允许设置 UPDATED_AT - // 通过 leoric 进行更新 + // Restricted updates to the UPDATED_AT + // Leoric will set by default model[UPDATED_AT] = undefined; await model.save(options); entity[UPDATED_AT] = model[UPDATED_AT]; diff --git a/test/schedule/TaskTimeoutHandler.test.ts b/test/schedule/TaskTimeoutHandler.test.ts index 9d15dd39..97c8c4b6 100644 --- a/test/schedule/TaskTimeoutHandler.test.ts +++ b/test/schedule/TaskTimeoutHandler.test.ts @@ -1,4 +1,10 @@ -import { app } from 'egg-mock/bootstrap'; +import { app, mock } from 'egg-mock/bootstrap'; +import { TaskState } from 'app/common/enum/Task'; +import { PackageSyncerService } from 'app/core/service/PackageSyncerService'; +import { HistoryTask } from '../../app/repository/model/HistoryTask'; +import { ModelConvertor } from '../../app/repository/util/ModelConvertor'; +import { Task as TaskModel } from '../../app/repository/model/Task'; +import { TaskService } from '../../app/core/service/TaskService'; const TaskTimeoutHandlerPath = require.resolve('../../app/port/schedule/TaskTimeoutHandler'); @@ -10,4 +16,49 @@ describe('test/schedule/TaskTimeoutHandler.test.ts', () => { // again should work await app.runSchedule(TaskTimeoutHandlerPath); }); + + it('should skip task when retry error', async () => { + + const packageSyncerService = await app.getEggObject(PackageSyncerService); + + const apple = await packageSyncerService.createTask('apple'); + const banana = await packageSyncerService.createTask('banana'); + + // mock timeout 10mins + await TaskModel.update({ id: apple.id }, { + updatedAt: new Date(apple.updatedAt.getTime() - 60000 * 30 - 1), + state: TaskState.Processing, + }); + await TaskModel.update({ id: banana.id }, { + updatedAt: new Date(banana.updatedAt.getTime() - 60000 * 40), + }); + + app.mockLog(); + mock(TaskService.prototype, 'retryTask', async () => { + throw new Error('aba aba'); + }); + await app.runSchedule(TaskTimeoutHandlerPath); + app.expectLog('[TaskService.retryExecuteTimeoutTasks:error] processing task'); + app.expectLog('[TaskService.retryExecuteTimeoutTasks:error] waiting task'); + }); + + it('should modify history task', async () => { + + const packageSyncerService = await app.getEggObject(PackageSyncerService); + + await packageSyncerService.createTask('boo'); + const task = await packageSyncerService.createTask('foo'); + + // mock task has been finished success + await ModelConvertor.convertEntityToModel({ ...task, state: TaskState.Success, id: 9527 }, HistoryTask); + + // mock timeout 10mins + await TaskModel.update({ id: task.id }, { + updatedAt: new Date(task.updatedAt.getTime() - 60000 * 30 - 1), + }); + + app.mockLog(); + await app.runSchedule(TaskTimeoutHandlerPath); + app.expectLog('[TaskTimeoutHandler:subscribe] retry execute timeout tasks'); + }); }); From 44cabd5c5becbe91a6bf445c1db023fe0dc1e797 Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Thu, 30 Mar 2023 09:48:05 +0800 Subject: [PATCH 2/3] chore: test ci --- test/schedule/TaskTimeoutHandler.test.ts | 53 +----------------------- 1 file changed, 1 insertion(+), 52 deletions(-) diff --git a/test/schedule/TaskTimeoutHandler.test.ts b/test/schedule/TaskTimeoutHandler.test.ts index 97c8c4b6..9d15dd39 100644 --- a/test/schedule/TaskTimeoutHandler.test.ts +++ b/test/schedule/TaskTimeoutHandler.test.ts @@ -1,10 +1,4 @@ -import { app, mock } from 'egg-mock/bootstrap'; -import { TaskState } from 'app/common/enum/Task'; -import { PackageSyncerService } from 'app/core/service/PackageSyncerService'; -import { HistoryTask } from '../../app/repository/model/HistoryTask'; -import { ModelConvertor } from '../../app/repository/util/ModelConvertor'; -import { Task as TaskModel } from '../../app/repository/model/Task'; -import { TaskService } from '../../app/core/service/TaskService'; +import { app } from 'egg-mock/bootstrap'; const TaskTimeoutHandlerPath = require.resolve('../../app/port/schedule/TaskTimeoutHandler'); @@ -16,49 +10,4 @@ describe('test/schedule/TaskTimeoutHandler.test.ts', () => { // again should work await app.runSchedule(TaskTimeoutHandlerPath); }); - - it('should skip task when retry error', async () => { - - const packageSyncerService = await app.getEggObject(PackageSyncerService); - - const apple = await packageSyncerService.createTask('apple'); - const banana = await packageSyncerService.createTask('banana'); - - // mock timeout 10mins - await TaskModel.update({ id: apple.id }, { - updatedAt: new Date(apple.updatedAt.getTime() - 60000 * 30 - 1), - state: TaskState.Processing, - }); - await TaskModel.update({ id: banana.id }, { - updatedAt: new Date(banana.updatedAt.getTime() - 60000 * 40), - }); - - app.mockLog(); - mock(TaskService.prototype, 'retryTask', async () => { - throw new Error('aba aba'); - }); - await app.runSchedule(TaskTimeoutHandlerPath); - app.expectLog('[TaskService.retryExecuteTimeoutTasks:error] processing task'); - app.expectLog('[TaskService.retryExecuteTimeoutTasks:error] waiting task'); - }); - - it('should modify history task', async () => { - - const packageSyncerService = await app.getEggObject(PackageSyncerService); - - await packageSyncerService.createTask('boo'); - const task = await packageSyncerService.createTask('foo'); - - // mock task has been finished success - await ModelConvertor.convertEntityToModel({ ...task, state: TaskState.Success, id: 9527 }, HistoryTask); - - // mock timeout 10mins - await TaskModel.update({ id: task.id }, { - updatedAt: new Date(task.updatedAt.getTime() - 60000 * 30 - 1), - }); - - app.mockLog(); - await app.runSchedule(TaskTimeoutHandlerPath); - app.expectLog('[TaskTimeoutHandler:subscribe] retry execute timeout tasks'); - }); }); From f71b3203e22aa704455e75535badf467f237ae81 Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Thu, 30 Mar 2023 10:07:20 +0800 Subject: [PATCH 3/3] chore: restore cci --- test/schedule/TaskTimeoutHandler.test.ts | 53 +++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/test/schedule/TaskTimeoutHandler.test.ts b/test/schedule/TaskTimeoutHandler.test.ts index 9d15dd39..97c8c4b6 100644 --- a/test/schedule/TaskTimeoutHandler.test.ts +++ b/test/schedule/TaskTimeoutHandler.test.ts @@ -1,4 +1,10 @@ -import { app } from 'egg-mock/bootstrap'; +import { app, mock } from 'egg-mock/bootstrap'; +import { TaskState } from 'app/common/enum/Task'; +import { PackageSyncerService } from 'app/core/service/PackageSyncerService'; +import { HistoryTask } from '../../app/repository/model/HistoryTask'; +import { ModelConvertor } from '../../app/repository/util/ModelConvertor'; +import { Task as TaskModel } from '../../app/repository/model/Task'; +import { TaskService } from '../../app/core/service/TaskService'; const TaskTimeoutHandlerPath = require.resolve('../../app/port/schedule/TaskTimeoutHandler'); @@ -10,4 +16,49 @@ describe('test/schedule/TaskTimeoutHandler.test.ts', () => { // again should work await app.runSchedule(TaskTimeoutHandlerPath); }); + + it('should skip task when retry error', async () => { + + const packageSyncerService = await app.getEggObject(PackageSyncerService); + + const apple = await packageSyncerService.createTask('apple'); + const banana = await packageSyncerService.createTask('banana'); + + // mock timeout 10mins + await TaskModel.update({ id: apple.id }, { + updatedAt: new Date(apple.updatedAt.getTime() - 60000 * 30 - 1), + state: TaskState.Processing, + }); + await TaskModel.update({ id: banana.id }, { + updatedAt: new Date(banana.updatedAt.getTime() - 60000 * 40), + }); + + app.mockLog(); + mock(TaskService.prototype, 'retryTask', async () => { + throw new Error('aba aba'); + }); + await app.runSchedule(TaskTimeoutHandlerPath); + app.expectLog('[TaskService.retryExecuteTimeoutTasks:error] processing task'); + app.expectLog('[TaskService.retryExecuteTimeoutTasks:error] waiting task'); + }); + + it('should modify history task', async () => { + + const packageSyncerService = await app.getEggObject(PackageSyncerService); + + await packageSyncerService.createTask('boo'); + const task = await packageSyncerService.createTask('foo'); + + // mock task has been finished success + await ModelConvertor.convertEntityToModel({ ...task, state: TaskState.Success, id: 9527 }, HistoryTask); + + // mock timeout 10mins + await TaskModel.update({ id: task.id }, { + updatedAt: new Date(task.updatedAt.getTime() - 60000 * 30 - 1), + }); + + app.mockLog(); + await app.runSchedule(TaskTimeoutHandlerPath); + app.expectLog('[TaskTimeoutHandler:subscribe] retry execute timeout tasks'); + }); });