Skip to content

Commit

Permalink
feat: add context logger
Browse files Browse the repository at this point in the history
  • Loading branch information
mo4islona committed Aug 1, 2024
1 parent 03c34c3 commit 5bf7ea5
Show file tree
Hide file tree
Showing 11 changed files with 4,488 additions and 3,279 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@
},
"devDependencies": {
"turbo": "^1.9.9"
}
},
"packageManager": "pnpm@9.6.0+sha512.38dc6fba8dba35b39340b9700112c2fe1e12f10b17134715a4aa98ccf7bb035e76fd981cf0bb384dfa98f8d6af5481c2bef2f4266a24bfa20c34eb7147ce0b5e"
}
8 changes: 7 additions & 1 deletion packages/rewards-calculator/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,26 @@
"decimal.js": "^10.4.3",
"express": "^4.19.2",
"libp2p": "^1.0.0",
"ms": "^2.1.3",
"pino": "^9.3.2",
"pino-multi-stream": "^6.0.0",
"protobufjs": "^7.2.5",
"viem": "^1.19.10"
},
"scripts": {
"reward-simulation": "ts-node src/generate-logs.ts && ts-node src/index.ts",
"start": "TS_NODE_TRANSPILE_ONLY=true NODE_OPTIONS=\"--loader ts-node/esm\" ts-node src/index.ts",
"endpoints": "TS_NODE_TRANSPILE_ONLY=true NODE_OPTIONS=\"--loader ts-node/esm\" ts-node src/endpoints.ts",
"stats": "ts-node src/epochStats.ts"
"stats": "ts-node src/epochStats.ts",
"tsc": "tsc --noEmit"
},
"devDependencies": {
"@types/chai": "^4.3.4",
"@types/express": "^4.17.21",
"@types/mocha": "^10.0.6",
"@types/ms": "^0.7.34",
"@types/node": "^20.10.1",
"@types/pino-multi-stream": "^5.1.6",
"chai": "^4.3.10",
"mocha": "^10.2.0",
"ts-node": "^10.9.1",
Expand Down
47 changes: 37 additions & 10 deletions packages/rewards-calculator/src/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import {
isAddressEqual,
parseAbiItem,
} from "viem";
import { logger } from "./logger";
import { Context, logger } from './logger';
import { bigSum, fromBase58 } from "./utils";
import { Rewards } from "./reward";
import { Workers } from "./workers";
Expand Down Expand Up @@ -89,7 +89,7 @@ export async function getBlockNumber() {
return Number(await l1Client.getBlockNumber());
}

export async function lastRewardedBlock() {
export async function getLastRewardedBlock() {
return Number(await contracts.rewardsDistribution.read.lastBlockRewarded());
}

Expand Down Expand Up @@ -175,6 +175,7 @@ async function sendCommitRequest(
`Reward commit, blocks ${fromBlock} - ${toBlock}\n${totalWorkers} workers rewarded.
Worker reward: ${totalWorkerReward} SQD;\nStaker reward: ${totalStarkerReward} SQD`,
);

return sendFordefiTransaction(request);
}

Expand Down Expand Up @@ -204,6 +205,7 @@ async function logIfSuccessfulDistribution(
.some((event) => event.eventName === "Distributed")
) {
workers.noteSuccessfulCommit(txHash);

await workers.printLogs({
walletAddress: address,
index,
Expand All @@ -212,6 +214,7 @@ async function logIfSuccessfulDistribution(
}

export async function commitRewards(
ctx: Context,
fromBlock: number,
toBlock: number,
workers: Workers,
Expand All @@ -222,23 +225,25 @@ export async function commitRewards(
const { workerIds, rewardAmounts, stakedAmounts } = rewardsToTxArgs(rewards);

if (!(await canCommit(address))) {
console.log("Cannot commit", address);
ctx.logger.warn('Cannot commit rewards due blockchain read request');
return;
}

const tx = await sendCommitRequest(
BigInt(fromBlock),
BigInt(toBlock),
workerIds,
rewardAmounts,
stakedAmounts,
);

if (!tx) {
return;
}

ctx.logger.info(`committed rewards ${tx}, logging successful distribution...`);

await logIfSuccessfulDistribution(tx, workers, address, index);

logger.log("Commit rewards", tx);
return tx;
}

Expand All @@ -263,6 +268,7 @@ async function sendApproveRequest(
}

async function tryToRecommit(
ctx: Context,
fromBlock: number,
toBlock: number,
rewards: Rewards,
Expand All @@ -281,6 +287,9 @@ async function tryToRecommit(
) {
return;
}

ctx.logger.debug(`trying to recommit...`);

const { workerIds, rewardAmounts, stakedAmounts } = rewardsToTxArgs(rewards);
const tx = await sendCommitRequest(
BigInt(fromBlock),
Expand All @@ -289,11 +298,14 @@ async function tryToRecommit(
rewardAmounts,
stakedAmounts,
);
logger.log("Recommit rewards", tx);

ctx.logger.debug(`recommit rewards successfully, tx ${tx}`)

return tx;
}

export async function approveRewards(
ctx: Context,
fromBlock: number,
toBlock: number,
workers: Workers,
Expand All @@ -314,26 +326,41 @@ export async function approveRewards(
]))
) {
const tx = await tryToRecommit(
ctx,
fromBlock,
toBlock,
rewards,
address,
commitment,
);
if (!tx) logger.log("Cannot approve rewards", address);
if (tx) await logIfSuccessfulDistribution(tx, workers, address, index);
if (!tx) {
ctx.logger.info("cannot re-commit rewards");
}
else {
ctx.logger.info(`re-commited rewards ${tx}, logging successful distribution...`);

await logIfSuccessfulDistribution(tx, workers, address, index);
}

return;
}

const tx = await sendApproveRequest(
BigInt(fromBlock),
BigInt(toBlock),
workerIds,
rewardAmounts,
stakedAmounts,
);
if (!tx) return;
if (!tx) {
ctx.logger.info("cannot approve rewards, tx is missing");
return;
}

ctx.logger.info(`approved rewards ${tx}, logging successful distribution...`);

await logIfSuccessfulDistribution(tx, workers, address, index);
logger.log("Approve rewards", tx);

return tx;
}

Expand Down
52 changes: 30 additions & 22 deletions packages/rewards-calculator/src/clickhouseClient.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { ClickHouse } from "clickhouse";
import dayjs from "dayjs";
import utc from "dayjs/plugin/utc";
import { ClickHouse } from 'clickhouse';
import dayjs from 'dayjs';
import utc from 'dayjs/plugin/utc';

import { Workers } from "./workers";
import { logger } from "./logger";
import { config } from "./config";
import { sum } from "./utils";
import { Workers } from './workers';
import { Context } from './logger';
import { config } from './config';
import { sum } from './utils';

dayjs.extend(utc);
const clickhouse = new ClickHouse({
Expand All @@ -25,6 +25,7 @@ export class ClickhouseClient {
private readonly workers: Workers;

constructor(
public ctx: Context,
public from: Date,
public to: Date,
) {
Expand Down Expand Up @@ -55,13 +56,14 @@ export class ClickhouseClient {
"(collector_timestamp - worker_timestamp) / 60000 as timeDiff",
];
await this.logTotalQueries();
const query = `select ${columns.join(",")} from ${
config.clickhouse.logsTableName
} where ${
config.clickhouse.logsTableName
}.worker_timestamp >= '${formatDate(this.from)}' and ${
config.clickhouse.logsTableName
}.worker_timestamp <= '${formatDate(this.to)}' and timeDiff < 20 order by query_hash`;

const query = `
select ${columns.join(",")}
from ${config.clickhouse.logsTableName}
where worker_timestamp >= '${formatDate(this.from)}'
and worker_timestamp <= '${formatDate(this.to)}'
and timeDiff < 20 order by query_hash
`;
for await (const row of clickhouse.query(query).stream()) {
const worker = this.workers.add(row.worker_id);
await worker.processQuery(row, shouldSkipSignatureValidation);
Expand Down Expand Up @@ -89,15 +91,15 @@ export class ClickhouseClient {
}

private async logTotalQueries() {
const count = `select COUNT(*) as total from ${
config.clickhouse.logsTableName
} where ${
config.clickhouse.logsTableName
}.worker_timestamp >= '${formatDate(this.from)}' and ${
config.clickhouse.logsTableName
}.worker_timestamp <= '${formatDate(this.to)}'`;
const count = `
select COUNT(*) as total
from ${config.clickhouse.logsTableName}
where worker_timestamp >= '${formatDate(this.from)}' and worker_timestamp <= '${formatDate(this.to)}'
`;

const [{ total }] = (await clickhouse.query(count).toPromise()) as any;
logger.log("Processing queries:", total);

this.ctx.logger.debug(`processing queries: ${total}`);
}
}

Expand All @@ -115,18 +117,24 @@ function totalOfflineSeconds(diffs: number[]) {
}

export async function livenessFactor(clickhouseClient: ClickhouseClient) {
clickhouseClient.ctx.logger.debug('calculating liveness factor...')

const pings = await clickhouseClient.getPings();
const totalPeriodSeconds = dayjs(clickhouseClient.to).diff(
dayjs(clickhouseClient.from),
"second",
);

const res: Record<string, NetworkStatsEntry> = {};
for (const workersKey in pings) {
res[workersKey] = networkStats(
pings[workersKey],
totalPeriodSeconds,
);
}

clickhouseClient.ctx.logger.debug(`liveness factor calculated for ${Object.keys(pings).length}`)

return res;
}

Expand Down
7 changes: 7 additions & 0 deletions packages/rewards-calculator/src/endpoints.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import express from "express";
import { epochStats } from "./reward";
import { config, l1Client } from "./config";
import { getBlockNumber } from "./chain";
import { Context } from './logger';

const app = express();
const port = process.env.PORT ?? 3000;
Expand Down Expand Up @@ -37,16 +38,22 @@ async function rewards(
res.status(400).send("fromBlock is not an integer");
return;
}

if (!isInteger(toBlock)) {
res.status(400).send("toBlock is not an integer");
return;
}

if (Number(fromBlock) >= Number(toBlock)) {
res.status(400).send("fromBlock should be less than toBlock");
return;
}

const ctx = new Context()

try {
const _epochStats = await epochStats(
ctx,
Number(fromBlock),
Number(toBlock),
true,
Expand Down
57 changes: 57 additions & 0 deletions packages/rewards-calculator/src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,61 @@
import pino from 'pino';
import dayjs from 'dayjs';
import pinoms, { Level, prettyStream, Streams } from 'pino-multi-stream';

const shouldLog = () => process.env.VERBOSE === "true";
const prettyLog = process.env.DISABLE_PRETTY_PRINT === undefined && process.stdout.isTTY;
const logLevel = (process.env.LOG_LEVEL || 'debug') as Level;

export type CtxValue = Record<string, string | number>

const streams: Streams = [
{

level: logLevel,
stream: prettyLog
? prettyStream({
prettyPrint: {
messageKey: 'message',
singleLine: true,
},
})
: pino.destination(process.stdout),
},
];

const pino_logger = pinoms({
base: null,
level: logLevel,
streams,
timestamp: prettyLog
? () => `,"time":"${dayjs(new Date()).format('HH:mm:ss.SSS')}"`
: () => `,"timestamp":"${new Date().toISOString()}"`,
messageKey: 'message',
formatters: {
level(label) {
return { level: label };
},
},
serializers: {
error: e => pino.stdSerializers.err(e),
err: e => pino.stdSerializers.err(e),
},
})


export class Context {
value: CtxValue
logger: pinoms.Logger

constructor(value: CtxValue = {}) {
this.value = value;
this.logger = pino_logger.child(value);
}

child(value: CtxValue) {
return new Context({ ...this.value, ...value });
}
}

function logWithWorkerAddress(
workerAddress: string,
Expand Down
Loading

0 comments on commit 5bf7ea5

Please sign in to comment.