Skip to content

Commit

Permalink
fix: timeout handler not work
Browse files Browse the repository at this point in the history
  • Loading branch information
elrrrrrrr committed Mar 29, 2023
1 parent 8c6ce1b commit ab59abe
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 21 deletions.
48 changes: 30 additions & 18 deletions app/core/service/TaskService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,30 +106,42 @@ export class TaskService extends AbstractService {
// try processing timeout tasks in 10 mins
const tasks = await this.taskRepository.findTimeoutTasks(TaskState.Processing, 60000 * 10);
for (const task of tasks) {
// ignore ChangesStream task, it won't timeout
if (task.attempts >= 3 && task.type !== TaskType.ChangesStream) {
await this.finishTask(task, TaskState.Timeout);
this.logger.warn(
'[TaskService.retryExecuteTimeoutTasks:timeout] taskType: %s, targetName: %s, taskId: %s, attempts %s set to fail',
try {
// ignore ChangesStream task, it won't timeout
if (task.attempts >= 3 && task.type !== TaskType.ChangesStream) {
await this.finishTask(task, TaskState.Timeout);
this.logger.warn(
'[TaskService.retryExecuteTimeoutTasks:timeout] taskType: %s, targetName: %s, taskId: %s, attempts %s set to fail',
task.type, task.targetName, task.taskId, task.attempts);
continue;
}
if (task.attempts >= 1) {
// reset logPath
task.resetLogPath();
}
await this.retryTask(task);
this.logger.info(
'[TaskService.retryExecuteTimeoutTasks:retry] taskType: %s, targetName: %s, taskId: %s, attempts %s will retry again',
task.type, task.targetName, task.taskId, task.attempts);
} catch (e) {
this.logger.error(
'[TaskService.retryExecuteTimeoutTasks:error] processing task, taskType: %s, targetName: %s, taskId: %s, attempts %s will retry again',
task.type, task.targetName, task.taskId, task.attempts);
continue;
}
if (task.attempts >= 1) {
// reset logPath
task.resetLogPath();
}
await this.retryTask(task);
this.logger.info(
'[TaskService.retryExecuteTimeoutTasks:retry] taskType: %s, targetName: %s, taskId: %s, attempts %s will retry again',
task.type, task.targetName, task.taskId, task.attempts);
}
// try waiting timeout tasks in 30 mins
const waitingTasks = await this.taskRepository.findTimeoutTasks(TaskState.Waiting, 60000 * 30);
for (const task of waitingTasks) {
await this.retryTask(task);
this.logger.warn(
'[TaskService.retryExecuteTimeoutTasks:retryWaiting] taskType: %s, targetName: %s, taskId: %s waiting too long',
task.type, task.targetName, task.taskId);
try {
await this.retryTask(task);
this.logger.warn(
'[TaskService.retryExecuteTimeoutTasks:retryWaiting] taskType: %s, targetName: %s, taskId: %s waiting too long',
task.type, task.targetName, task.taskId);
} catch (e) {
this.logger.error(
'[TaskService.retryExecuteTimeoutTasks:error] waiting task, taskType: %s, targetName: %s, taskId: %s, attempts %s will retry again',
task.type, task.targetName, task.taskId, task.attempts);
}
}
return {
processing: tasks.length,
Expand Down
6 changes: 4 additions & 2 deletions app/repository/util/ModelConvertor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,14 @@ export class ModelConvertor {
const modelPropertyName = attributeMeta.propertyName;
const entityPropertyName = ModelConvertorUtil.getEntityPropertyName(ModelClazz, modelPropertyName);
if (entityPropertyName === CREATED_AT) continue;
// Restricted updates to the primary key
if (entityPropertyName === ID && model[ID]) continue;
const attributeValue = _.get(entity, entityPropertyName);
model[modelPropertyName] = attributeValue;
}

// 不允许设置 UPDATED_AT
// 通过 leoric 进行更新
// Restricted updates to the UPDATED_AT
// Leoric will set by default
model[UPDATED_AT] = undefined;
await model.save(options);
entity[UPDATED_AT] = model[UPDATED_AT];
Expand Down
53 changes: 52 additions & 1 deletion test/schedule/TaskTimeoutHandler.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import { app } from 'egg-mock/bootstrap';
import { TaskState } from 'app/common/enum/Task';
import { PackageSyncerService } from 'app/core/service/PackageSyncerService';
import { HistoryTask } from 'app/repository/model/HistoryTask';
import { ModelConvertor } from 'app/repository/util/ModelConvertor';
import { Task as TaskModel } from 'app/repository/model/Task';
import { app, mock } from 'egg-mock/bootstrap';
import { TaskService } from 'app/core/service/TaskService';

const TaskTimeoutHandlerPath = require.resolve('../../app/port/schedule/TaskTimeoutHandler');

Expand All @@ -10,4 +16,49 @@ describe('test/schedule/TaskTimeoutHandler.test.ts', () => {
// again should work
await app.runSchedule(TaskTimeoutHandlerPath);
});

it('should skip task when retry error', async () => {

const packageSyncerService = await app.getEggObject(PackageSyncerService);

const apple = await packageSyncerService.createTask('apple');
const banana = await packageSyncerService.createTask('banana');

// mock timeout 10mins
await TaskModel.update({ id: apple.id }, {
updatedAt: new Date(apple.updatedAt.getTime() - 60000 * 30 - 1),
state: TaskState.Processing,
});
await TaskModel.update({ id: banana.id }, {
updatedAt: new Date(banana.updatedAt.getTime() - 60000 * 40),
});

app.mockLog();
mock(TaskService.prototype, 'retryTask', async () => {
throw new Error('aba aba');
});
await app.runSchedule(TaskTimeoutHandlerPath);
app.expectLog('[TaskService.retryExecuteTimeoutTasks:error] processing task');
app.expectLog('[TaskService.retryExecuteTimeoutTasks:error] waiting task');
});

it('should modify history task', async () => {

const packageSyncerService = await app.getEggObject(PackageSyncerService);

await packageSyncerService.createTask('boo');
const task = await packageSyncerService.createTask('foo');

// mock task has been finished success
await ModelConvertor.convertEntityToModel({ ...task, state: TaskState.Success, id: 9527 }, HistoryTask);

// mock timeout 10mins
await TaskModel.update({ id: task.id }, {
updatedAt: new Date(task.updatedAt.getTime() - 60000 * 30 - 1),
});

app.mockLog();
await app.runSchedule(TaskTimeoutHandlerPath);
app.expectLog('[TaskTimeoutHandler:subscribe] retry execute timeout tasks');
});
});

0 comments on commit ab59abe

Please sign in to comment.