Skip to content

Commit

Permalink
feat: add response time to logs
Browse files Browse the repository at this point in the history
  • Loading branch information
manast committed May 20, 2024
1 parent f41c201 commit 9b461f7
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 47 deletions.
36 changes: 19 additions & 17 deletions lib/responders/bull-responders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function paginate(
return (<any>queue)
[method](start, end, opts)
.then(function (jobs: Bull.Job[]) {
respond(ws, messageId, jobs);
respond(ws, Date.now(), messageId, jobs);
});
}

Expand All @@ -29,6 +29,7 @@ async function respondJobCommand(
msg: any
) {
const data = msg.data;
const startTime = Date.now();
const job = await queue.getJob(data.jobId);

switch (data.cmd) {
Expand All @@ -54,31 +55,32 @@ async function respondJobCommand(
`Missing command ${data.cmd}. Too old version of taskforce-connector?`
);
}
respond(ws, msg.id);
respond(ws, startTime, msg.id);
}

async function respondQueueCommand(
ws: WebSocketClient,
queue: Bull.Queue,
msg: any
) {
const startTime = Date.now();
const data = msg.data;
switch (data.cmd) {
case "getJob":
const job = await queue.getJob(data.jobId);
respond(ws, msg.id, job);
respond(ws, startTime, msg.id, job);
break;
case "getJobCounts":
const jobCounts = await queue.getJobCounts();
respond(ws, msg.id, jobCounts);
respond(ws, startTime, msg.id, jobCounts);
break;
case "getMetrics":
const metrics = await (<any>queue).getMetrics(
data.type,
data.start,
data.end
);
respond(ws, msg.id, metrics);
respond(ws, startTime, msg.id, metrics);
break;
case "getWaiting":
case "getActive":
Expand All @@ -92,7 +94,7 @@ async function respondQueueCommand(

case "getJobLogs":
const logs = await queue.getJobLogs(data.jobId, data.start, data.end);
respond(ws, msg.id, logs);
respond(ws, startTime, msg.id, logs);

case "getWaitingCount":
case "getActiveCount":
Expand All @@ -101,50 +103,50 @@ async function respondQueueCommand(
case "getFailedCount":
case "getRepeatableCount":
const count = await (<any>queue)[data.cmd]();
respond(ws, msg.id, count);
respond(ws, startTime, msg.id, count);
break;
case "getWorkersCount":
const workers = await queue.getWorkers();
respond(ws, msg.id, workers.length);
respond(ws, startTime, msg.id, workers.length);
break;
case "removeRepeatableByKey":
await queue.removeRepeatableByKey(data.key);
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "add":
await queue.add(...(data.args as [string, object, object]));
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "empty":
await queue.empty();
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "pause":
await queue.pause();
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "resume":
await queue.resume();
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "isPaused":
const isPaused = await queue.isPaused();
respond(ws, msg.id, isPaused);
respond(ws, startTime, msg.id, isPaused);
break;
case "obliterate":
await queue.obliterate();
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "clean":
await queue.clean(data.grace, data.status, data.limit);
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "retryJobs":
await (<any>queue).retryJobs({
status: data.status,
count: data.count,
});
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
default:
console.error(
Expand Down
40 changes: 21 additions & 19 deletions lib/responders/bullmq-responders.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ function paginate(
start = start || 0;
end = end || -1;
return (<any>queue)[method](start, end, opts).then(function (jobs: Job[]) {
respond(ws, messageId, jobs);
respond(ws, Date.now(), messageId, jobs);
});
}

async function respondJobCommand(ws: WebSocketClient, queue: Queue, msg: any) {
const data = msg.data;
const startTime = Date.now();
const job = await queue.getJob(data.jobId);

switch (data.cmd) {
Expand All @@ -48,7 +49,7 @@ async function respondJobCommand(ws: WebSocketClient, queue: Queue, msg: any) {
`Missing command ${data.cmd}. Too old version of taskforce-connector?`
);
}
respond(ws, msg.id);
respond(ws, startTime, msg.id);
}

async function respondQueueCommand(
Expand All @@ -57,26 +58,27 @@ async function respondQueueCommand(
msg: any
) {
const data = msg.data;
const startTime = Date.now();
switch (data.cmd) {
case "getJob":
const job = await queue.getJob(data.jobId);
respond(ws, msg.id, job);
respond(ws, startTime, msg.id, job);
break;
case "getJobCounts":
const jobCounts = await queue.getJobCounts();
respond(ws, msg.id, jobCounts);
respond(ws, startTime, msg.id, jobCounts);
break;
case "getMetrics":
const metrics = await (<any>queue).getMetrics(
data.type,
data.start,
data.end
);
respond(ws, msg.id, metrics);
respond(ws, startTime, msg.id, metrics);
break;
case "getDependencies":
const dependencies = await queue.getDependencies(data.parentId, data.type, data.start, data.end);
respond(ws, msg.id, dependencies);
respond(ws, startTime, msg.id, dependencies);
break;

case "getWaitingChildren":
Expand All @@ -92,7 +94,7 @@ async function respondQueueCommand(

case "getJobLogs":
const logs = await queue.getJobLogs(data.jobId, data.start, data.end);
respond(ws, msg.id, logs);
respond(ws, startTime, msg.id, logs);

case "getWaitingChildrenCount":
case "getWaitingCount":
Expand All @@ -102,57 +104,57 @@ async function respondQueueCommand(
case "getFailedCount":
case "getRepeatableCount":
const count = await (<any>queue)[data.cmd]();
respond(ws, msg.id, count);
respond(ws, startTime, msg.id, count);
break;
case "getWorkersCount":
const workers = await queue.getWorkers();
respond(ws, msg.id, workers.length);
respond(ws, startTime, msg.id, workers.length);
break;
case "removeRepeatableByKey":
await queue.removeRepeatableByKey(data.key);
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "add":
const [name, jobData, opts] = data.args as [string, object, object];
await queue.add(name, jobData, opts);
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "empty":
await queue.drain();
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "pause":
await queue.pause();
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "resume":
await queue.resume();
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "isPaused":
const isPaused = await queue.isPaused();
respond(ws, msg.id, isPaused);
respond(ws, startTime, msg.id, isPaused);
break;
case "obliterate":
await queue.obliterate();
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "clean":
await queue.clean(data.grace, data.limit, data.status);
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
case "retryJobs":
await (<any>queue).retryJobs({
status: data.status,
count: data.count,
});
respond(ws, msg.id);
respond(ws, startTime, msg.id);
break;
default:
console.error(
`Missing command ${data.cmd}. Too old version of taskforce-connector?`
);
respond(ws, msg.id, null);
respond(ws, startTime, msg.id, null);
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/responders/respond.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { WebSocketClient } from "../ws-autoreconnect";

export const respond = (ws: WebSocketClient, id: string, data: any = {}) => {
export const respond = (ws: WebSocketClient, startTime: number, id: string, data: any = {}) => {
const response = JSON.stringify({
id,
data,
});
ws.send(response);
ws.send(response, startTime);
};
19 changes: 12 additions & 7 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ export const Socket = (
};

ws.onmessage = async function incoming(input: string) {
const startTime = Date.now();

console.log(
`${chalk.yellow("WebSocket:")} ${chalk.blueBright("received")} %s`,
input
Expand Down Expand Up @@ -102,7 +104,7 @@ export const Socket = (
connection: name,
team,
version,
})
}), startTime
);
} else {
const msg = JSON.parse(input);
Expand Down Expand Up @@ -138,7 +140,7 @@ export const Socket = (
JSON.stringify({
id: msg.id,
err: "Queue not found",
})
}), startTime
);
} else {
switch (res) {
Expand All @@ -159,7 +161,10 @@ export const Socket = (
};

async function respondConnectionCommand(connection: Connection, msg: any) {
const startTime = Date.now();

const data = msg.data;

switch (data.cmd) {
case "ping":
const pong = await ping(redisOpts, nodes);
Expand All @@ -178,7 +183,7 @@ export const Socket = (

logSendingQueues(queues)

respond(msg.id, {
respond(msg.id, startTime, {
queues,
connection: name,
team,
Expand All @@ -192,7 +197,7 @@ export const Socket = (

logSendingQueues(queues);

respond(msg.id, queues);
respond(msg.id, startTime, queues);
}
break;
case "getInfo":
Expand All @@ -206,7 +211,7 @@ export const Socket = (
(client) => getQueueType(data.name, data.prefix, client),
nodes
);
respond(msg.id, { queueType });
respond(msg.id, startTime, { queueType });
break;
}
}
Expand All @@ -226,12 +231,12 @@ export const Socket = (
}
}

function respond(id: string, data: any = {}) {
function respond(id: string, startTime: number, data: any = {}) {
const response = JSON.stringify({
id,
data,
});
ws.send(response);
ws.send(response, startTime);
}
};

Expand Down
5 changes: 3 additions & 2 deletions lib/ws-autoreconnect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export class WebSocketClient {
});
}

send(data: string, option?: {
send(data: string, startTime: number, option?: {
mask?: boolean;
binary?: boolean;
compress?: boolean;
Expand All @@ -76,7 +76,8 @@ export class WebSocketClient {
);
} else {
console.log(
`${chalk.yellow("WebSocket:")} ${chalk.blue("data sent successfully")}`
`${chalk.yellow("WebSocket:")} ${chalk.blue("data sent successfully in")} ${chalk.green(
Date.now() - startTime)}ms`
);
}
});
Expand Down

0 comments on commit 9b461f7

Please sign in to comment.