Skip to content

Commit

Permalink
feat: suspend task before app close (#365)
Browse files Browse the repository at this point in the history
> 机器计划内重启时,需要等待超时后再重新 retry 恢复 changesStream
同步,新增任务挂起机器,应用退出前,挂起当前机器正在执行的同步任务

* 新增 taskRepository#findTaskByAuthorIpAndType 方法,查找所有当前机器所有 worker 同步的任务
* 新增 module.d.ts 定义,目前仅消费 cnpmcoreCore module 内的 changesStream 方法
* app.ts 内调用 changesStreamService#suspendTaskWhenExit 在应用退出前触发

Co-authored-by: killa <killa123@126.com>
  • Loading branch information
elrrrrrrr and killagu authored Dec 15, 2022
1 parent eb04533 commit c562645
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 7 deletions.
10 changes: 9 additions & 1 deletion app.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import path from 'path';
import { readFile } from 'fs/promises';
import { Application } from 'egg';

declare module 'egg' {
interface Application {
binaryHTML: string;
Expand All @@ -23,4 +22,13 @@ export default class CnpmcoreAppHook {
const text = await readFile(filepath, 'utf-8');
this.app.binaryHTML = text.replace('{{registry}}', this.app.config.cnpmcore.registry);
}

// 应用退出时执行
// 需要暂停当前执行的 changesStream task
async beforeClose() {
const ctx = this.app.createAnonymousContext();
await ctx.beginModuleScope(async () => {
await ctx.module.cnpmcoreCore.changesStreamService.suspendTaskWhenExit();
});
}
}
22 changes: 21 additions & 1 deletion app/core/service/ChangesStreamService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
EggObjectFactory,
Inject,
} from '@eggjs/tegg';
import { TaskType } from '../../common/enum/Task';
import { TaskState, TaskType } from '../../common/enum/Task';
import { AbstractService } from '../../common/AbstractService';
import { TaskRepository } from '../../repository/TaskRepository';
import { HOST_NAME, ChangesStreamTask, Task } from '../entity/Task';
Expand Down Expand Up @@ -55,6 +55,26 @@ export class ChangesStreamService extends AbstractService {
return await this.taskService.findExecuteTask(TaskType.ChangesStream) as ChangesStreamTask;
}

public async suspendTaskWhenExit() {
this.logger.info('[ChangesStreamService.suspendTaskWhenExit:start]');
if (this.config.cnpmcore.enableChangesStream) {
// 防止继续获取新的任务
this.config.cnpmcore.enableChangesStream = false;
const authorIp = os.hostname();
// 暂停当前机器所有的 changesStream 任务
const tasks = await this.taskRepository.findTaskByAuthorIpAndType(authorIp, TaskType.ChangesStream);
for (const task of tasks) {
if (task.state === TaskState.Processing) {
this.logger.info('[ChangesStreamService.suspendTaskWhenExit:suspend] taskId: %s', task.taskId);
// 1. 更新任务状态为 waiting
// 2. 重新推入任务队列供其他机器执行
await this.taskService.retryTask(task);
}
}
}
this.logger.info('[ChangesStreamService.suspendTaskWhenExit:finish]');
}

public async executeTask(task: ChangesStreamTask) {
task.authorIp = os.hostname();
task.authorId = `pid_${process.pid}`;
Expand Down
5 changes: 5 additions & 0 deletions app/core/typing.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import { ChangesStreamService } from './service/ChangesStreamService';

export interface ContextCnpmcore {
changesStreamService: ChangesStreamService;
}
8 changes: 8 additions & 0 deletions app/repository/TaskRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,12 @@ export class TaskRepository extends AbstractRepository {
}).limit(1000);
return models.map(model => ModelConvertor.convertModelToEntity(model, TaskEntity));
}

async findTaskByAuthorIpAndType(authorIp: string, type: TaskType) {
const models = await this.Task.find({
type,
authorIp,
}).limit(1000);
return models.map(model => ModelConvertor.convertModelToEntity(model, TaskEntity));
}
}
7 changes: 7 additions & 0 deletions module.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { ContextCnpmcore } from "./app/core/typing";

declare module "egg" {
export interface EggContextModule {
cnpmcoreCore: ContextCnpmcore;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { Registry } from 'app/core/entity/Registry';
import { RegistryManagerService } from 'app/core/service/RegistryManagerService';
import assert = require('assert');
import { Context } from 'egg';
import { app, mock } from 'egg-mock/bootstrap';
import { app } from 'egg-mock/bootstrap';

describe('test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts', () => {
let ctx: Context;
Expand Down Expand Up @@ -83,7 +83,7 @@ describe('test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts', ()
],
},
});
const stream = await cnpmjsorgChangesStream.fetchChanges(registry, '1');
const stream = cnpmjsorgChangesStream.fetchChanges(registry, '1');
const changes:ChangesStreamChange[] = [];
for await (const change of stream) {
changes.push(change);
Expand All @@ -92,7 +92,7 @@ describe('test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts', ()
});

it('should reject max limit', async () => {
mock(ctx.httpclient, 'request', async (url: string) => {
app.mockHttpclient('https://r2.cnpmjs.org/_changes?since=1&limit=', 'GET', (url = '') => {
const limit = (new URL(url)).searchParams.get('limit');
return {
data: {
Expand All @@ -105,7 +105,7 @@ describe('test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts', ()
},
};
});
const stream = await cnpmjsorgChangesStream.fetchChanges(registry, '1');
const stream = cnpmjsorgChangesStream.fetchChanges(registry, '1');
await assert.rejects(async () => {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of stream) {
Expand Down
45 changes: 44 additions & 1 deletion test/core/service/ChangesStreamService.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import assert = require('assert/strict');
import assert = require('assert');
import { Readable } from 'node:stream';
import { app, mock } from 'egg-mock/bootstrap';
import { Context } from 'egg';
Expand All @@ -10,6 +10,7 @@ import { RegistryType } from 'app/common/enum/Registry';
import { ScopeManagerService } from 'app/core/service/ScopeManagerService';
import { Registry } from 'app/core/entity/Registry';
import { TestUtil } from 'test/TestUtil';
import { RedisQueueAdapter } from 'app/infra/QueueAdapter';

describe('test/core/service/ChangesStreamService.test.ts', () => {
let ctx: Context;
Expand All @@ -20,12 +21,14 @@ describe('test/core/service/ChangesStreamService.test.ts', () => {
let task: ChangesStreamTask;
let npmRegistry: Registry;
let cnpmRegistry: Registry;
let queueAdapter: RedisQueueAdapter;
beforeEach(async () => {
ctx = await app.mockModuleContext();
changesStreamService = await ctx.getEggObject(ChangesStreamService);
taskService = await ctx.getEggObject(TaskService);
registryManagerService = await ctx.getEggObject(RegistryManagerService);
scopeManagerService = await ctx.getEggObject(ScopeManagerService);
queueAdapter = await ctx.getEggObject(RedisQueueAdapter);
assert(changesStreamService);
task = Task.createChangesStream('GLOBAL_WORKER', '', '9527');
taskService.createTask(task, false);
Expand Down Expand Up @@ -194,4 +197,44 @@ describe('test/core/service/ChangesStreamService.test.ts', () => {
assert(task.data.since === '3');
});
});

describe('suspendTaskWhenExit()', () => {
it('should work', async () => {
app.mockLog();
mock(app.config.cnpmcore, 'enableChangesStream', true);
app.mockHttpclient('https://replicate.npmjs.com/_changes?since=9527', 'GET', () => {
return {
data: {
res: Readable.from(''),
},
};
});

const task = await changesStreamService.findExecuteTask();
assert(task);
await changesStreamService.executeTask(task);
assert(task.state === 'processing');

let len = await queueAdapter.length('changes_stream');
assert(len === 0);
await changesStreamService.suspendTaskWhenExit();
const newTask = await taskService.findTask(task.taskId);
assert(newTask);
assert(newTask.taskId === task.taskId);
assert(newTask.state === 'waiting');
len = await queueAdapter.length('changes_stream');
assert(len === 1);

app.expectLog('[ChangesStreamService.suspendTaskWhenExit:suspend] taskId');

});

it('should ignore when changesStream disable', async () => {
app.mockLog();
mock(app.config.cnpmcore, 'enableChangesStream', true);
await changesStreamService.suspendTaskWhenExit();
app.expectLog('[ChangesStreamService.suspendTaskWhenExit:finish]');
});

});
});

0 comments on commit c562645

Please sign in to comment.