From c562645db7c88f9c3c5787fd450b457574d1cce6 Mon Sep 17 00:00:00 2001 From: elrrrrrrr Date: Thu, 15 Dec 2022 21:34:24 +0800 Subject: [PATCH] feat: suspend task before app close (#365) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit > 机器计划内重启时,需要等待超时后再重新 retry 恢复 changesStream 同步,新增任务挂起机器,应用退出前,挂起当前机器正在执行的同步任务 * 新增 taskRepository#findTaskByAuthorIpAndType 方法,查找所有当前机器所有 worker 同步的任务 * 新增 module.d.ts 定义,目前仅消费 cnpmcoreCore module 内的 changesStream 方法 * app.ts 内调用 changesStreamService#suspendTaskWhenExit 在应用退出前触发 Co-authored-by: killa --- app.ts | 10 ++++- app/core/service/ChangesStreamService.ts | 22 ++++++++- app/core/typing.ts | 5 +++ app/repository/TaskRepository.ts | 8 ++++ module.d.ts | 7 +++ .../CnpmjsorgChangesStream.test.ts | 8 ++-- .../core/service/ChangesStreamService.test.ts | 45 ++++++++++++++++++- 7 files changed, 98 insertions(+), 7 deletions(-) create mode 100644 app/core/typing.ts create mode 100644 module.d.ts diff --git a/app.ts b/app.ts index b22474ac..2109246a 100644 --- a/app.ts +++ b/app.ts @@ -1,7 +1,6 @@ import path from 'path'; import { readFile } from 'fs/promises'; import { Application } from 'egg'; - declare module 'egg' { interface Application { binaryHTML: string; @@ -23,4 +22,13 @@ export default class CnpmcoreAppHook { const text = await readFile(filepath, 'utf-8'); this.app.binaryHTML = text.replace('{{registry}}', this.app.config.cnpmcore.registry); } + + // 应用退出时执行 + // 需要暂停当前执行的 changesStream task + async beforeClose() { + const ctx = this.app.createAnonymousContext(); + await ctx.beginModuleScope(async () => { + await ctx.module.cnpmcoreCore.changesStreamService.suspendTaskWhenExit(); + }); + } } diff --git a/app/core/service/ChangesStreamService.ts b/app/core/service/ChangesStreamService.ts index dd8097ca..b951ed61 100644 --- a/app/core/service/ChangesStreamService.ts +++ b/app/core/service/ChangesStreamService.ts @@ -6,7 +6,7 @@ import { EggObjectFactory, Inject, } from '@eggjs/tegg'; -import { TaskType } from '../../common/enum/Task'; +import { TaskState, TaskType } from '../../common/enum/Task'; import { AbstractService } from '../../common/AbstractService'; import { TaskRepository } from '../../repository/TaskRepository'; import { HOST_NAME, ChangesStreamTask, Task } from '../entity/Task'; @@ -55,6 +55,26 @@ export class ChangesStreamService extends AbstractService { return await this.taskService.findExecuteTask(TaskType.ChangesStream) as ChangesStreamTask; } + public async suspendTaskWhenExit() { + this.logger.info('[ChangesStreamService.suspendTaskWhenExit:start]'); + if (this.config.cnpmcore.enableChangesStream) { + // 防止继续获取新的任务 + this.config.cnpmcore.enableChangesStream = false; + const authorIp = os.hostname(); + // 暂停当前机器所有的 changesStream 任务 + const tasks = await this.taskRepository.findTaskByAuthorIpAndType(authorIp, TaskType.ChangesStream); + for (const task of tasks) { + if (task.state === TaskState.Processing) { + this.logger.info('[ChangesStreamService.suspendTaskWhenExit:suspend] taskId: %s', task.taskId); + // 1. 更新任务状态为 waiting + // 2. 重新推入任务队列供其他机器执行 + await this.taskService.retryTask(task); + } + } + } + this.logger.info('[ChangesStreamService.suspendTaskWhenExit:finish]'); + } + public async executeTask(task: ChangesStreamTask) { task.authorIp = os.hostname(); task.authorId = `pid_${process.pid}`; diff --git a/app/core/typing.ts b/app/core/typing.ts new file mode 100644 index 00000000..0302ee6b --- /dev/null +++ b/app/core/typing.ts @@ -0,0 +1,5 @@ +import { ChangesStreamService } from './service/ChangesStreamService'; + +export interface ContextCnpmcore { + changesStreamService: ChangesStreamService; +} diff --git a/app/repository/TaskRepository.ts b/app/repository/TaskRepository.ts index 105d2050..07119fb3 100644 --- a/app/repository/TaskRepository.ts +++ b/app/repository/TaskRepository.ts @@ -116,4 +116,12 @@ export class TaskRepository extends AbstractRepository { }).limit(1000); return models.map(model => ModelConvertor.convertModelToEntity(model, TaskEntity)); } + + async findTaskByAuthorIpAndType(authorIp: string, type: TaskType) { + const models = await this.Task.find({ + type, + authorIp, + }).limit(1000); + return models.map(model => ModelConvertor.convertModelToEntity(model, TaskEntity)); + } } diff --git a/module.d.ts b/module.d.ts new file mode 100644 index 00000000..d41dbda3 --- /dev/null +++ b/module.d.ts @@ -0,0 +1,7 @@ +import { ContextCnpmcore } from "./app/core/typing"; + +declare module "egg" { + export interface EggContextModule { + cnpmcoreCore: ContextCnpmcore; + } +} diff --git a/test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts b/test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts index 4c8c5f28..6002edbb 100644 --- a/test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts +++ b/test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts @@ -5,7 +5,7 @@ import { Registry } from 'app/core/entity/Registry'; import { RegistryManagerService } from 'app/core/service/RegistryManagerService'; import assert = require('assert'); import { Context } from 'egg'; -import { app, mock } from 'egg-mock/bootstrap'; +import { app } from 'egg-mock/bootstrap'; describe('test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts', () => { let ctx: Context; @@ -83,7 +83,7 @@ describe('test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts', () ], }, }); - const stream = await cnpmjsorgChangesStream.fetchChanges(registry, '1'); + const stream = cnpmjsorgChangesStream.fetchChanges(registry, '1'); const changes:ChangesStreamChange[] = []; for await (const change of stream) { changes.push(change); @@ -92,7 +92,7 @@ describe('test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts', () }); it('should reject max limit', async () => { - mock(ctx.httpclient, 'request', async (url: string) => { + app.mockHttpclient('https://r2.cnpmjs.org/_changes?since=1&limit=', 'GET', (url = '') => { const limit = (new URL(url)).searchParams.get('limit'); return { data: { @@ -105,7 +105,7 @@ describe('test/common/adapter/changesStream/CnpmjsorgChangesStream.test.ts', () }, }; }); - const stream = await cnpmjsorgChangesStream.fetchChanges(registry, '1'); + const stream = cnpmjsorgChangesStream.fetchChanges(registry, '1'); await assert.rejects(async () => { // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const _ of stream) { diff --git a/test/core/service/ChangesStreamService.test.ts b/test/core/service/ChangesStreamService.test.ts index 2cac6939..d2339e68 100644 --- a/test/core/service/ChangesStreamService.test.ts +++ b/test/core/service/ChangesStreamService.test.ts @@ -1,4 +1,4 @@ -import assert = require('assert/strict'); +import assert = require('assert'); import { Readable } from 'node:stream'; import { app, mock } from 'egg-mock/bootstrap'; import { Context } from 'egg'; @@ -10,6 +10,7 @@ import { RegistryType } from 'app/common/enum/Registry'; import { ScopeManagerService } from 'app/core/service/ScopeManagerService'; import { Registry } from 'app/core/entity/Registry'; import { TestUtil } from 'test/TestUtil'; +import { RedisQueueAdapter } from 'app/infra/QueueAdapter'; describe('test/core/service/ChangesStreamService.test.ts', () => { let ctx: Context; @@ -20,12 +21,14 @@ describe('test/core/service/ChangesStreamService.test.ts', () => { let task: ChangesStreamTask; let npmRegistry: Registry; let cnpmRegistry: Registry; + let queueAdapter: RedisQueueAdapter; beforeEach(async () => { ctx = await app.mockModuleContext(); changesStreamService = await ctx.getEggObject(ChangesStreamService); taskService = await ctx.getEggObject(TaskService); registryManagerService = await ctx.getEggObject(RegistryManagerService); scopeManagerService = await ctx.getEggObject(ScopeManagerService); + queueAdapter = await ctx.getEggObject(RedisQueueAdapter); assert(changesStreamService); task = Task.createChangesStream('GLOBAL_WORKER', '', '9527'); taskService.createTask(task, false); @@ -194,4 +197,44 @@ describe('test/core/service/ChangesStreamService.test.ts', () => { assert(task.data.since === '3'); }); }); + + describe('suspendTaskWhenExit()', () => { + it('should work', async () => { + app.mockLog(); + mock(app.config.cnpmcore, 'enableChangesStream', true); + app.mockHttpclient('https://replicate.npmjs.com/_changes?since=9527', 'GET', () => { + return { + data: { + res: Readable.from(''), + }, + }; + }); + + const task = await changesStreamService.findExecuteTask(); + assert(task); + await changesStreamService.executeTask(task); + assert(task.state === 'processing'); + + let len = await queueAdapter.length('changes_stream'); + assert(len === 0); + await changesStreamService.suspendTaskWhenExit(); + const newTask = await taskService.findTask(task.taskId); + assert(newTask); + assert(newTask.taskId === task.taskId); + assert(newTask.state === 'waiting'); + len = await queueAdapter.length('changes_stream'); + assert(len === 1); + + app.expectLog('[ChangesStreamService.suspendTaskWhenExit:suspend] taskId'); + + }); + + it('should ignore when changesStream disable', async () => { + app.mockLog(); + mock(app.config.cnpmcore, 'enableChangesStream', true); + await changesStreamService.suspendTaskWhenExit(); + app.expectLog('[ChangesStreamService.suspendTaskWhenExit:finish]'); + }); + + }); });