From af5b35e6ec080ef5c5b8260e0bd3daf4905ce702 Mon Sep 17 00:00:00 2001 From: Jeff Huang Date: Tue, 30 Sep 2025 11:31:49 -0500 Subject: [PATCH] rt telemetry --- genkit-tools/telemetry-server/src/index.ts | 119 +++++++++++++++++++++ js/core/src/action.ts | 7 ++ js/core/src/reflection.ts | 3 - 3 files changed, 126 insertions(+), 3 deletions(-) diff --git a/genkit-tools/telemetry-server/src/index.ts b/genkit-tools/telemetry-server/src/index.ts index d934fb9968..83b5b92b3e 100644 --- a/genkit-tools/telemetry-server/src/index.ts +++ b/genkit-tools/telemetry-server/src/index.ts @@ -46,12 +46,117 @@ export async function startTelemetryServer(params: { await params.traceStore.init(); const api = express(); + // In-memory registry of streaming listeners keyed by traceId + const listeners = new Map>(); + + function getListeners(traceId: string): Set { + let set = listeners.get(traceId); + if (!set) { + set = new Set(); + listeners.set(traceId, set); + } + return set; + } + + function findRootSpan(snapshot: any): any | undefined { + if (!snapshot || !snapshot.spans) return undefined; + const spans = Object.values(snapshot.spans) as any[]; + return spans.find((s) => !s.parentSpanId); + } + + function broadcastTrace(traceId: string, snapshot: any) { + const subs = listeners.get(traceId); + if (!subs || subs.size === 0) return; + const line = JSON.stringify(snapshot) + '\n'; + for (const res of subs) { + try { + res.write(line); + } catch (_) { + // Best-effort write; ignore failures on individual clients + } + } + const root = findRootSpan(snapshot); + if (root && typeof root.endTime === 'number') { + // Root span ended: close all listeners for this trace + for (const res of subs) { + try { + res.end(); + } catch (_) { + // ignore + } + } + listeners.delete(traceId); + } + } + + function broadcastDelta(traceId: string, spansDelta: Record) { + const subs = listeners.get(traceId); + if (!subs || subs.size === 0) return; + const payload = { type: 'upsert', traceId, spans: spansDelta }; + const line = JSON.stringify(payload) + '\n'; + for (const res of subs) { + try { + res.write(line); + } catch (_) {} + } + } + + function broadcastDone(traceId: string) { + const subs = listeners.get(traceId); + if (!subs || subs.size === 0) return; + const line = JSON.stringify({ type: 'done', traceId }) + '\n'; + for (const res of subs) { + try { + res.write(line); + res.end(); + } catch (_) {} + } + listeners.delete(traceId); + } + api.use(express.json({ limit: params.maxRequestBodySize ?? '30mb' })); api.get('/api/__health', async (_, response) => { response.status(200).send('OK'); }); + // Streaming endpoint for live trace updates (NDJSON over chunked HTTP) + api.get('/api/traces/:traceId/stream', async (request, response, next) => { + try { + const { traceId } = request.params; + response.writeHead(200, { + 'Content-Type': 'text/plain', + 'Cache-Control': 'no-cache', + 'Connection': 'keep-alive', + 'Transfer-Encoding': 'chunked', + 'Access-Control-Allow-Origin': '*', + }); + + // Send initial snapshot if present + const snapshot = await params.traceStore.load(traceId); + if (snapshot) { + response.write(JSON.stringify(snapshot) + '\n'); + const root = findRootSpan(snapshot); + if (root && typeof root.endTime === 'number') { + response.end(); + return; + } + } + + // Register listener + getListeners(traceId).add(response); + + // Cleanup on client disconnect + (request as any).on('close', () => { + const set = listeners.get(traceId); + set?.delete(response); + if (set && set.size === 0) listeners.delete(traceId); + }); + } catch (e) { + next(e); + } + }); + api.get('/api/traces/:traceId', async (request, response, next) => { try { const { traceId } = request.params; @@ -65,6 +170,20 @@ export async function startTelemetryServer(params: { try { const traceData = TraceDataSchema.parse(request.body); await params.traceStore.save(traceData.traceId, traceData); + // After saving, broadcast delta (upsert spans) to any listeners + try { + if (traceData.spans && Object.keys(traceData.spans).length > 0) { + broadcastDelta(traceData.traceId, traceData.spans); + } + // If root is now ended, emit done and close listeners + const snapshot = await params.traceStore.load(traceData.traceId); + const root = findRootSpan(snapshot); + if (root && typeof root.endTime === 'number') { + broadcastDone(traceData.traceId); + } + } catch (_) { + // Best-effort broadcast; do not fail the write path + } response.status(200).send('OK'); } catch (e) { next(e); diff --git a/js/core/src/action.ts b/js/core/src/action.ts index a767dcedf0..e3bc80c5a7 100644 --- a/js/core/src/action.ts +++ b/js/core/src/action.ts @@ -344,6 +344,13 @@ export function action< metadata.name = actionName; metadata.input = input; + // Emit an initial chunk with just the traceId + try { + if (options?.onChunk) { + options.onChunk({ telemetry: {traceId }}); + } + } catch {} + try { const actFn = () => fn(input, { diff --git a/js/core/src/reflection.ts b/js/core/src/reflection.ts index 659bd40b30..1b68c78dab 100644 --- a/js/core/src/reflection.ts +++ b/js/core/src/reflection.ts @@ -180,9 +180,6 @@ export class ReflectionServer { response.write( JSON.stringify({ result: result.result, - telemetry: { - traceId: result.telemetry.traceId, - }, } as RunActionResponse) ); response.end();