Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: event.taskSlug,
run_id: event.runId,
start_time: formatClickhouseDate64NanosecondsEpochString(event.startTime.toString()),
duration: (event.duration ?? 0).toString(),
duration: formatClickhouseUnsignedIntegerString(event.duration ?? 0),
trace_id: event.traceId,
span_id: event.spanId,
parent_span_id: event.parentId ?? "",
Expand Down Expand Up @@ -432,7 +432,9 @@ export class ClickhouseEventRepository implements IEventRepository {
const startTime = options.startTime ?? getNowInNanoseconds();
const duration =
options.duration ??
(options.endTime ? calculateDurationFromStart(startTime, options.endTime) : 100);
(options.endTime
? calculateDurationFromStart(startTime, options.endTime, 100 * 1_000_000)
: 100);

const traceId = propagatedContext?.traceparent?.traceId ?? generateTraceId();
const parentId = options.parentId ?? propagatedContext?.traceparent?.spanId;
Expand Down Expand Up @@ -460,7 +462,7 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: options.taskSlug,
run_id: options.attributes.runId,
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
duration: duration.toString(),
duration: formatClickhouseUnsignedIntegerString(duration),
trace_id: traceId,
span_id: spanId,
parent_span_id: parentId ?? "",
Expand Down Expand Up @@ -561,7 +563,7 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: options.taskSlug,
run_id: options.attributes.runId,
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
duration: String(options.incomplete ? 0 : duration),
duration: formatClickhouseUnsignedIntegerString(options.incomplete ? 0 : duration),
trace_id: traceId,
span_id: spanId,
parent_span_id: parentId ?? "",
Expand Down Expand Up @@ -595,7 +597,7 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: options.taskSlug,
run_id: options.attributes.runId,
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
duration: String(options.incomplete ? 0 : duration),
duration: formatClickhouseUnsignedIntegerString(options.incomplete ? 0 : duration),
trace_id: traceId,
span_id: spanId,
parent_span_id: parentId ?? "",
Expand Down Expand Up @@ -644,7 +646,9 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: run.taskIdentifier,
run_id: run.friendlyId,
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
duration: formatClickhouseUnsignedIntegerString(
calculateDurationFromStart(startTime, endTime ?? new Date())
),
trace_id: run.traceId,
span_id: run.spanId,
parent_span_id: run.parentSpanId ?? "",
Expand Down Expand Up @@ -692,7 +696,9 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: run.taskIdentifier,
run_id: blockedRun.friendlyId,
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
duration: formatClickhouseUnsignedIntegerString(
calculateDurationFromStart(startTime, endTime ?? new Date())
),
trace_id: blockedRun.traceId,
span_id: spanId,
parent_span_id: parentSpanId,
Expand Down Expand Up @@ -732,7 +738,9 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: run.taskIdentifier,
run_id: run.friendlyId,
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
duration: formatClickhouseUnsignedIntegerString(
calculateDurationFromStart(startTime, endTime ?? new Date())
),
trace_id: run.traceId,
span_id: run.spanId,
parent_span_id: run.parentSpanId ?? "",
Expand Down Expand Up @@ -778,7 +786,9 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: run.taskIdentifier,
run_id: run.friendlyId,
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
duration: calculateDurationFromStart(startTime, endTime ?? new Date()).toString(),
duration: formatClickhouseUnsignedIntegerString(
calculateDurationFromStart(startTime, endTime ?? new Date())
),
trace_id: run.traceId,
span_id: run.spanId,
parent_span_id: run.parentSpanId ?? "",
Expand Down Expand Up @@ -868,7 +878,9 @@ export class ClickhouseEventRepository implements IEventRepository {
task_identifier: run.taskIdentifier,
run_id: run.friendlyId,
start_time: formatClickhouseDate64NanosecondsEpochString(startTime.toString()),
duration: calculateDurationFromStart(startTime, cancelledAt).toString(),
duration: formatClickhouseUnsignedIntegerString(
calculateDurationFromStart(startTime, cancelledAt)
),
trace_id: run.traceId,
span_id: run.spanId,
parent_span_id: run.parentSpanId ?? "",
Expand Down Expand Up @@ -1841,3 +1853,15 @@ function convertClickhouseDate64NanosecondsEpochStringToBigInt(date: string): bi
const parts = date.split(".");
return BigInt(parts.join(""));
}

function formatClickhouseUnsignedIntegerString(value: number | bigint): string {
if (value < 0) {
return "0";
}

if (typeof value === "bigint") {
return value.toString();
}

return Math.floor(value).toString();
}
Comment on lines +1857 to +1867
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Harden formatter against NaN/Infinity and negative BigInt

Currently, NaN/Infinity will serialize as "NaN"/"Infinity", causing ClickHouse insert failures. Also guard negative BigInts.

Apply:

-function formatClickhouseUnsignedIntegerString(value: number | bigint): string {
-  if (value < 0) {
-    return "0";
-  }
-
-  if (typeof value === "bigint") {
-    return value.toString();
-  }
-
-  return Math.floor(value).toString();
-}
+function formatClickhouseUnsignedIntegerString(value: number | bigint): string {
+  if (typeof value === "bigint") {
+    return value < 0n ? "0" : value.toString();
+  }
+  if (!Number.isFinite(value) || value <= 0) {
+    return "0";
+  }
+  const floored = Math.floor(value);
+  return floored <= 0 ? "0" : String(floored);
+}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
function formatClickhouseUnsignedIntegerString(value: number | bigint): string {
if (value < 0) {
return "0";
}
if (typeof value === "bigint") {
return value.toString();
}
return Math.floor(value).toString();
}
function formatClickhouseUnsignedIntegerString(value: number | bigint): string {
if (typeof value === "bigint") {
return value < 0n ? "0" : value.toString();
}
if (!Number.isFinite(value) || value <= 0) {
return "0";
}
const floored = Math.floor(value);
return floored <= 0 ? "0" : String(floored);
}
🤖 Prompt for AI Agents
In apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts around
lines 1857 to 1867, the formatter currently allows NaN/Infinity and negative
BigInts to be serialized which breaks ClickHouse inserts; update the function to
first detect and return "0" for non-finite numbers (Number.isFinite(value) or
Number.isNaN checks) and for numbers that are finite use
Math.floor(value).toString(); for bigints, explicitly guard negative values
(value < 0n) and return "0" for negatives, otherwise return value.toString();
ensure the checks are ordered so typeof value === "bigint" is used for
BigInt-specific comparison and numbers are validated with isFinite/NaN before
flooring.

14 changes: 12 additions & 2 deletions apps/webapp/app/v3/eventRepository/common.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,20 @@ export function getDateFromNanoseconds(nanoseconds: bigint): Date {
return new Date(Number(nanoseconds) / 1_000_000);
}

export function calculateDurationFromStart(startTime: bigint, endTime: Date = new Date()) {
export function calculateDurationFromStart(
startTime: bigint,
endTime: Date = new Date(),
minimumDuration?: number
) {
const $endtime = typeof endTime === "string" ? new Date(endTime) : endTime;

return Number(BigInt($endtime.getTime() * 1_000_000) - startTime);
const duration = Number(BigInt($endtime.getTime() * 1_000_000) - startTime);

if (minimumDuration && duration < minimumDuration) {
return minimumDuration;
}

return duration;
}

export function calculateDurationFromStartJsDate(startTime: Date, endTime: Date = new Date()) {
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/eventRepository/index.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { env } from "~/env.server";
import { eventRepository } from "./eventRepository.server";
import { clickhouseEventRepository } from "./clickhouseEventRepositoryInstance.server";
import { IEventRepository, TraceEventOptions } from "./eventRepository.types";
import { $replica } from "~/db.server";
import { $replica, prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { FEATURE_FLAG, flags } from "../featureFlags.server";
import { getTaskEventStore } from "../taskEventStore.server";
Expand Down Expand Up @@ -145,7 +145,7 @@ async function recordRunEvent(
}

async function findRunForEventCreation(runId: string) {
return $replica.taskRun.findFirst({
return prisma.taskRun.findFirst({
where: {
id: runId,
},
Expand Down
8 changes: 7 additions & 1 deletion apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,13 @@ export function registerRunEngineEventBusHandlers() {

engine.eventBus.on("runRetryScheduled", async ({ time, run, environment, retryAt }) => {
try {
let retryMessage = `Retry #${run.attemptNumber} delay`;
if (retryAt && time && time >= retryAt) {
return;
}

let retryMessage = `Retry ${
typeof run.attemptNumber === "number" ? `#${run.attemptNumber - 1}` : ""
} delay`;

if (run.nextMachineAfterOOM) {
retryMessage += ` after OOM`;
Expand Down