Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 119 additions & 0 deletions genkit-tools/telemetry-server/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,117 @@ export async function startTelemetryServer(params: {
await params.traceStore.init();
const api = express();

// In-memory registry of streaming listeners keyed by traceId
const listeners = new Map<string, Set<http.ServerResponse>>();

function getListeners(traceId: string): Set<http.ServerResponse> {
let set = listeners.get(traceId);
if (!set) {
set = new Set<http.ServerResponse>();
listeners.set(traceId, set);
}
return set;
}

function findRootSpan(snapshot: any): any | undefined {
if (!snapshot || !snapshot.spans) return undefined;
const spans = Object.values(snapshot.spans) as any[];
return spans.find((s) => !s.parentSpanId);
}

function broadcastTrace(traceId: string, snapshot: any) {
const subs = listeners.get(traceId);
if (!subs || subs.size === 0) return;
const line = JSON.stringify(snapshot) + '\n';
for (const res of subs) {
try {
res.write(line);
} catch (_) {
// Best-effort write; ignore failures on individual clients
}
}
const root = findRootSpan(snapshot);
if (root && typeof root.endTime === 'number') {
// Root span ended: close all listeners for this trace
for (const res of subs) {
try {
res.end();
} catch (_) {
// ignore
}
}
listeners.delete(traceId);
}
}

function broadcastDelta(traceId: string, spansDelta: Record<string, any>) {
const subs = listeners.get(traceId);
if (!subs || subs.size === 0) return;
const payload = { type: 'upsert', traceId, spans: spansDelta };
const line = JSON.stringify(payload) + '\n';
for (const res of subs) {
try {
res.write(line);
} catch (_) {}
}
}

function broadcastDone(traceId: string) {
const subs = listeners.get(traceId);
if (!subs || subs.size === 0) return;
const line = JSON.stringify({ type: 'done', traceId }) + '\n';
for (const res of subs) {
try {
res.write(line);
res.end();
} catch (_) {}
}
listeners.delete(traceId);
}

api.use(express.json({ limit: params.maxRequestBodySize ?? '30mb' }));

api.get('/api/__health', async (_, response) => {
response.status(200).send('OK');
});

// Streaming endpoint for live trace updates (NDJSON over chunked HTTP)
api.get('/api/traces/:traceId/stream', async (request, response, next) => {
try {
const { traceId } = request.params;
response.writeHead(200, {
'Content-Type': 'text/plain',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Transfer-Encoding': 'chunked',
'Access-Control-Allow-Origin': '*',
});

// Send initial snapshot if present
const snapshot = await params.traceStore.load(traceId);
if (snapshot) {
response.write(JSON.stringify(snapshot) + '\n');
const root = findRootSpan(snapshot);
if (root && typeof root.endTime === 'number') {
response.end();
return;
}
}

// Register listener
getListeners(traceId).add(response);

// Cleanup on client disconnect
(request as any).on('close', () => {
const set = listeners.get(traceId);
set?.delete(response);
if (set && set.size === 0) listeners.delete(traceId);
});
} catch (e) {
next(e);
}
});

api.get('/api/traces/:traceId', async (request, response, next) => {
try {
const { traceId } = request.params;
Expand All @@ -65,6 +170,20 @@ export async function startTelemetryServer(params: {
try {
const traceData = TraceDataSchema.parse(request.body);
await params.traceStore.save(traceData.traceId, traceData);
// After saving, broadcast delta (upsert spans) to any listeners
try {
if (traceData.spans && Object.keys(traceData.spans).length > 0) {
broadcastDelta(traceData.traceId, traceData.spans);
}
// If root is now ended, emit done and close listeners
const snapshot = await params.traceStore.load(traceData.traceId);
const root = findRootSpan(snapshot);
if (root && typeof root.endTime === 'number') {
broadcastDone(traceData.traceId);
}
} catch (_) {
// Best-effort broadcast; do not fail the write path
}
response.status(200).send('OK');
} catch (e) {
next(e);
Expand Down
7 changes: 7 additions & 0 deletions js/core/src/action.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,13 @@ export function action<
metadata.name = actionName;
metadata.input = input;

// Emit an initial chunk with just the traceId
try {
if (options?.onChunk) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't think this approach will work... for one, what if its non-streaming call.
I think action.run needs another param (in ActionRunOptions), a callback of some sort, that will be called with the trace id when action starts. Then in the reflection api you can send the trace id in the headers.

options.onChunk({ telemetry: {traceId }});
}
} catch {}

try {
const actFn = () =>
fn(input, {
Expand Down
3 changes: 0 additions & 3 deletions js/core/src/reflection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,6 @@ export class ReflectionServer {
response.write(
JSON.stringify({
result: result.result,
telemetry: {
traceId: result.telemetry.traceId,
},
} as RunActionResponse)
);
response.end();
Expand Down
Loading