Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update index json #379

Merged
merged 6 commits into from
Jan 28, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/common/constants.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export const BUG_VERSIONS = 'bug-versions';
export const LATEST_TAG = 'latest';
export const GLOBAL_WORKER = 'GLOBAL_WORKER';
37 changes: 24 additions & 13 deletions app/core/service/CacheService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,38 @@ import {
} from '@eggjs/tegg';
import { CacheAdapter } from '../../common/adapter/CacheAdapter';
import { AbstractService } from '../../common/AbstractService';
import { ChangesStreamTaskData } from '../entity/Task';

type PackageCacheAttribe = 'etag' | 'manifests';

type TotalData = {
export type UpstreamRegistryInfo = {
registry_name: string;
source_registry: string;
changes_stream_url: string;
} & ChangesStreamTaskData;

export type DownloadInfo = {
today: number;
yesterday: number;
samedayLastweek: number;
thisweek: number;
thismonth: number;
thisyear: number;
lastweek: number;
lastmonth: number;
lastyear: number;
};

export type TotalData = {
packageCount: number;
packageVersionCount: number;
lastPackage: string;
lastPackageVersion: string;
download: {
today: number;
yesterday: number;
samedayLastweek: number;
thisweek: number;
thismonth: number;
thisyear: number;
lastweek: number;
lastmonth: number;
lastyear: number;
};
changesStream: object,
download: DownloadInfo;
changesStream: ChangesStreamTaskData;
lastChangeId: number | bigint;
cacheTime: string;
upstreamRegistries: UpstreamRegistryInfo[];
};
const TOTAL_DATA_KEY = '__TOTAL_DATA__';

Expand Down Expand Up @@ -72,6 +82,7 @@ export class CacheService extends AbstractService {
lastyear: 0,
},
changesStream: {},
upstreamRegistries: [],
lastChangeId: 0,
cacheTime: '',
};
Expand Down
3 changes: 2 additions & 1 deletion app/core/service/ChangesStreamService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { E500 } from 'egg-errors';
import { Registry } from '../entity/Registry';
import { AbstractChangeStream } from '../../common/adapter/changesStream/AbstractChangesStream';
import { getScopeAndName } from '../../common/PackageUtil';
import { GLOBAL_WORKER } from '../../common/constants';
import { ScopeManagerService } from './ScopeManagerService';
import { PackageRepository } from '../../repository/PackageRepository';

Expand All @@ -44,7 +45,7 @@ export class ChangesStreamService extends AbstractService {
// GLOBAL_WORKER: 默认的同步源
// `{registryName}_WORKER`: 自定义 scope 的同步源
public async findExecuteTask(): Promise<ChangesStreamTask | null> {
const targetName = 'GLOBAL_WORKER';
const targetName = GLOBAL_WORKER;
const globalRegistryTask = await this.taskRepository.findTaskByTargetName(targetName, TaskType.ChangesStream);
// 如果没有配置默认同步源,先进行初始化
if (!globalRegistryTask) {
Expand Down
42 changes: 40 additions & 2 deletions app/port/controller/HomeController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,44 @@ import {
Inject,
} from '@eggjs/tegg';
import { AbstractController } from './AbstractController';
import { CacheService } from '../../core/service/CacheService';
import { CacheService, DownloadInfo, UpstreamRegistryInfo } from '../../core/service/CacheService';

const startTime = new Date();

// registry 站点信息数据 SiteTotalData
// SiteEnvInfo: 环境、运行时相关信息,实时查询
// UpstreamInfo: 上游信息,实时查询
// TotalInfo: 总数据信息,定时任务每分钟生成
// LegacyInfo: 旧版兼容信息
type SiteTotalData = LegacyInfo & SiteEnvInfo & TotalInfo;

type LegacyInfo = {
source_registry: string,
changes_stream_registry: string,
sync_changes_steam: any,
fengmk2 marked this conversation as resolved.
Show resolved Hide resolved
};

type SiteEnvInfo = {
sync_model: string;
sync_binary: string;
instance_start_time: Date;
node_version: string;
app_version: string;
engine: string;
cache_time: string;
};

type TotalInfo = {
last_package: string;
last_package_version: string;
doc_count: number | bigint;
doc_version_count: number | bigint;
update_seq: number | bigint;
download: DownloadInfo;
upstream_registries?: UpstreamRegistryInfo[];
};


@HTTPController()
export class HomeController extends AbstractController {
@Inject()
Expand All @@ -23,9 +57,12 @@ export class HomeController extends AbstractController {
path: '/',
method: HTTPMethodEnum.GET,
})
// 2023-1-20
// 原有 LegacyInfo 字段继续保留,由于 ChangesStream 信息通过 registry 表配置,可能会过期
// 新增 upstream_registries 字段,展示上游源站 registry 信息列表
async showTotal() {
const totalData = await this.cacheService.getTotalData();
const data = {
const data: SiteTotalData = {
last_package: totalData.lastPackage,
last_package_version: totalData.lastPackageVersion,
doc_count: totalData.packageCount,
Expand All @@ -42,6 +79,7 @@ export class HomeController extends AbstractController {
source_registry: this.config.cnpmcore.sourceRegistry,
changes_stream_registry: this.config.cnpmcore.changesStreamRegistry,
cache_time: totalData.cacheTime,
upstream_registries: totalData.upstreamRegistries,
};
return data;
}
Expand Down
48 changes: 40 additions & 8 deletions app/port/schedule/UpdateTotalData.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { EggLogger } from 'egg';
import { IntervalParams, Schedule, ScheduleType } from '@eggjs/tegg/schedule';
import { Inject } from '@eggjs/tegg';
import { ChangesStreamTaskData } from '../../core/entity/Task';
import { RegistryManagerService } from '../../core/service/RegistryManagerService';
import { PackageVersionDownloadRepository } from '../../repository/PackageVersionDownloadRepository';
import { PackageRepository } from '../../repository/PackageRepository';
import { TaskRepository } from '../../repository/TaskRepository';
import { ChangeRepository } from '../../repository/ChangeRepository';
import { CacheService } from '../../core/service/CacheService';
import { CacheService, DownloadInfo, TotalData } from '../../core/service/CacheService';
import { TaskType } from '../../common/enum/Task';
import { GLOBAL_WORKER } from '../../common/constants';
import dayjs from '../../common/dayjs';


@Schedule<IntervalParams>({
type: ScheduleType.WORKER,
scheduleData: {
Expand Down Expand Up @@ -38,11 +40,12 @@ export class UpdateTotalData {
@Inject()
private readonly cacheService: CacheService;

async subscribe() {
const changesStreamTask = await this.taskRepository.findTaskByTargetName('GLOBAL_WORKER', TaskType.ChangesStream);
const packageTotal = await this.packageRepository.queryTotal();
@Inject()
private readonly registryManagerService: RegistryManagerService;

const download = {
// 计算下载量相关信息,不区分不同 changesStream
private async calculateDownloadInfo() {
const download: DownloadInfo = {
today: 0,
yesterday: 0,
samedayLastweek: 0,
Expand Down Expand Up @@ -92,15 +95,44 @@ export class UpdateTotalData {
}
}
}
return download;
}

async subscribe() {
const packageTotal = await this.packageRepository.queryTotal();
const download = await this.calculateDownloadInfo();

const lastChange = await this.changeRepository.getLastChange();
const totalData = {
const totalData: TotalData = {
...packageTotal,
download,
changesStream: changesStreamTask && changesStreamTask.data || {},
lastChangeId: lastChange && lastChange.id || 0,
cacheTime: new Date().toISOString(),
changesStream: {} as unknown as ChangesStreamTaskData,
upstreamRegistries: [],
};

const tasks = await this.taskRepository.findTasksByCondition({ type: TaskType.ChangesStream });
for (const task of tasks) {
// 全局 changesStream
const data = task.data as ChangesStreamTaskData;
// 补充录入 upstreamRegistries
const registry = await this.registryManagerService.findByRegistryId(data.registryId as string);
if (registry) {
totalData.upstreamRegistries.push({
...data,
source_registry: registry?.host,
changes_stream_url: registry?.changeStream,
registry_name: registry?.name,
});
}

// 兼容 LegacyInfo 字段
if (task.targetName === GLOBAL_WORKER) {
totalData.changesStream = data;
}
}

await this.cacheService.saveTotalData(totalData);
this.logger.info('[UpdateTotalData.subscribe] total data: %j', totalData);
}
Expand Down
15 changes: 11 additions & 4 deletions app/repository/PackageRepository.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { AccessLevel, SingletonProto, Inject } from '@eggjs/tegg';
import type { Package as PackageModel } from './model/Package';
import { Package as PackageModel } from './model/Package';
import { Package as PackageEntity } from '../core/entity/Package';
import { ModelConvertor } from './util/ModelConvertor';
import { PackageVersion as PackageVersionEntity } from '../core/entity/PackageVersion';
import type { PackageVersion as PackageVersionModel } from './model/PackageVersion';
import { PackageVersion as PackageVersionModel } from './model/PackageVersion';
import { PackageVersionManifest as PackageVersionManifestEntity } from '../core/entity/PackageVersionManifest';
import type { PackageVersionManifest as PackageVersionManifestModel } from './model/PackageVersionManifest';
import type { Dist as DistModel } from './model/Dist';
Expand All @@ -14,6 +14,7 @@ import type { Maintainer as MaintainerModel } from './model/Maintainer';
import type { User as UserModel } from './model/User';
import { User as UserEntity } from '../core/entity/User';
import { AbstractRepository } from './AbstractRepository';
import { RawQueryUtil } from './util/RawQueryUtil';

@SingletonProto({
accessLevel: AccessLevel.PUBLIC,
Expand All @@ -40,6 +41,10 @@ export class PackageRepository extends AbstractRepository {
@Inject()
private readonly User: typeof UserModel;

@Inject()
private readonly rawQueryUtil : RawQueryUtil;


async findPackage(scope: string, name: string): Promise<PackageEntity | null> {
const model = await this.Package.findOne({ scope, name });
if (!model) return null;
Expand Down Expand Up @@ -252,7 +257,8 @@ export class PackageRepository extends AbstractRepository {
if (lastPkg) {
lastPackage = lastPkg.scope ? `${lastPkg.scope}/${lastPkg.name}` : lastPkg.name;
// FIXME: id will be out of range number
packageCount = Number(lastPkg.id);
// 可能存在 id 增长不连续的情况,通过 count 查询
packageCount = await this.rawQueryUtil.getCount(PackageModel);
}

if (lastVersion) {
Expand All @@ -261,7 +267,7 @@ export class PackageRepository extends AbstractRepository {
const fullname = pkg.scope ? `${pkg.scope}/${pkg.name}` : pkg.name;
lastPackageVersion = `${fullname}@${lastVersion.version}`;
}
packageVersionCount = Number(lastVersion.id);
packageVersionCount = await this.rawQueryUtil.getCount(PackageVersionModel);
}
return {
packageCount,
Expand Down Expand Up @@ -327,4 +333,5 @@ export class PackageRepository extends AbstractRepository {
}
return entities;
}

}
5 changes: 5 additions & 0 deletions app/repository/TaskRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ export class TaskRepository extends AbstractRepository {
return tasks.map(task => ModelConvertor.convertModelToEntity(task, TaskEntity));
}

async findTasksByCondition(where: { targetName?: string; state?: TaskState; type: TaskType }): Promise<Array<TaskEntity>> {
const tasks = await this.Task.find(where);
return tasks.map(task => ModelConvertor.convertModelToEntity(task, TaskEntity));
}

async findTaskByTargetName(targetName: string, type: TaskType, state?: TaskState) {
const where: any = { targetName, type };
if (state) {
Expand Down
43 changes: 43 additions & 0 deletions app/repository/util/RawQueryUtil.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {
AccessLevel,
EggObjectLifecycle,
Inject,
SingletonProto,
} from '@eggjs/tegg';
import { LeoricRegister } from '@eggjs/tegg-orm-plugin/lib/LeoricRegister';
import { EggAppConfig } from 'egg';
import Realm, { Bone } from 'leoric';

@SingletonProto({
name: 'rawQueryUtil',
accessLevel: AccessLevel.PUBLIC,
})
export class RawQueryUtil implements EggObjectLifecycle {
@Inject()
private leoricRegister: LeoricRegister;

@Inject()
private config: EggAppConfig;

private client: Realm;

async init() {
this.client = await this.leoricRegister.getOrCreateRealm(undefined);
elrrrrrrr marked this conversation as resolved.
Show resolved Hide resolved
}

public async getCount(model: typeof Bone): Promise<number> {
const { database } = this.config.orm;
const sql = `
SELECT
TABLE_ROWS
FROM
information_schema.tables
WHERE
table_schema = '${database}'
AND table_name = '${model.table}'
`;
const queryRes = await this.client.query(sql);
return queryRes.rows?.[0]?.TABLE_ROWS as number || 0;
}

}
Loading