diff --git a/js/ai/src/telemetry.ts b/js/ai/src/telemetry.ts index c832e0606..4a19893a3 100644 --- a/js/ai/src/telemetry.ts +++ b/js/ai/src/telemetry.ts @@ -21,7 +21,11 @@ import { MetricCounter, MetricHistogram, } from '@genkit-ai/core/metrics'; -import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing'; +import { + spanMetadataAls, + toDisplayPath, + traceMetadataAls, +} from '@genkit-ai/core/tracing'; import { ValueType } from '@opentelemetry/api'; import { createHash } from 'crypto'; import { GenerateOptions } from './generate.js'; @@ -171,8 +175,9 @@ export function recordGenerateActionInputLogs( input: GenerateRequest ) { const flowName = traceMetadataAls?.getStore()?.flowName; - const path = spanMetadataAls?.getStore()?.path; - const sharedMetadata = { model, path, flowName }; + const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; + const path = toDisplayPath(qualifiedPath); + const sharedMetadata = { model, path, qualifiedPath, flowName }; logger.logStructured(`Config[${path}, ${model}]`, { ...sharedMetadata, temperature: options.config?.temperature, @@ -207,8 +212,9 @@ export function recordGenerateActionOutputLogs( output: GenerateResponseData ) { const flowName = traceMetadataAls?.getStore()?.flowName; - const path = spanMetadataAls?.getStore()?.path; - const sharedMetadata = { model, path, flowName }; + const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; + const path = toDisplayPath(qualifiedPath); + const sharedMetadata = { model, path, qualifiedPath, flowName }; const candidates = output.candidates.length; output.candidates.forEach((cand, candIdx) => { const parts = cand.message.content.length; diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index 6dcd3dae7..db091b4af 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -195,6 +195,12 @@ export function setCustomMetadataAttributes(values: Record) { } } +/** Converts a fully annotated path to a friendly display version for logs */ +export function toDisplayPath(path: string): string { + const pathPartRegex = /\{([^\,}]+),[^\}]+\}/g; + return Array.from(path.matchAll(pathPartRegex), (m) => m[1]).join(' > '); +} + function getCurrentSpan(): SpanMetadata { const step = spanMetadataAls.getStore(); if (!step) { diff --git a/js/flow/src/telemetry.ts b/js/flow/src/telemetry.ts index 5d65314a6..2a37df90a 100644 --- a/js/flow/src/telemetry.ts +++ b/js/flow/src/telemetry.ts @@ -24,6 +24,7 @@ import { import { PathMetadata, spanMetadataAls, + toDisplayPath, traceMetadataAls, } from '@genkit-ai/core/tracing'; import { ValueType } from '@opentelemetry/api'; @@ -57,9 +58,11 @@ const flowLatencies = new MetricHistogram(_N('latency'), { }); export function recordError(err: any) { - const path = spanMetadataAls?.getStore()?.path; + const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; + const path = toDisplayPath(qualifiedPath); logger.logStructuredError(`Error[${path}, ${err.name}]`, { - path: path, + path, + qualifiedPath, name: err.name, message: err.message, stack: err.stack, @@ -90,7 +93,7 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) { meta.path.includes(flowName) ); - logger.logStructured(`Paths[/${flowName}]`, { + logger.logStructured(`Paths[${flowName}]`, { flowName: flowName, paths: relevantPaths.map((p) => p.path), }); @@ -124,17 +127,18 @@ export function writeFlowFailure( flowCounter.add(1, dimensions); flowLatencies.record(latencyMs, dimensions); - const allPaths = + const allQualifiedPaths = traceMetadataAls.getStore()?.paths || new Set(); - if (allPaths) { - const failPath = spanMetadataAls?.getStore()?.path; - const relevantPaths = Array.from(allPaths).filter( - (meta) => meta.path.includes(flowName) && meta.path !== failPath + if (allQualifiedPaths) { + const qualifiedFailPath = spanMetadataAls?.getStore()?.path || ''; + const failPath = toDisplayPath(qualifiedFailPath); + const relevantPaths = Array.from(allQualifiedPaths).filter( + (meta) => meta.path.includes(flowName) && meta.path !== qualifiedFailPath ); - logger.logStructured(`Paths[/${flowName}]`, { + logger.logStructured(`Paths[${flowName}]`, { flowName: flowName, - paths: relevantPaths.map((p) => p.path), + paths: relevantPaths.map((p) => toDisplayPath(p.path)), }); const pathDimensions = { @@ -144,6 +148,7 @@ export function writeFlowFailure( }; // All paths that have succeeded need to be tracked as succeeded. relevantPaths.forEach((p) => { + const path = toDisplayPath(p.path); pathCounter.add(1, { ...pathDimensions, status: 'success', @@ -161,20 +166,22 @@ export function writeFlowFailure( ...pathDimensions, status: 'failure', error: err.name, - path: failPath, + path: qualifiedFailPath, }); pathLatencies.record(latencyMs, { ...pathDimensions, status: 'failure', error: err.name, - path: failPath, + path: qualifiedFailPath, }); } } export function logRequest(flowName: string, req: express.Request) { - logger.logStructured(`Request[/${flowName}]`, { + const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; + const path = toDisplayPath(qualifiedPath); + logger.logStructured(`Request[${flowName}]`, { flowName: flowName, headers: { ...req.headers, @@ -184,16 +191,20 @@ export function logRequest(flowName: string, req: express.Request) { body: req.body, query: req.query, originalUrl: req.originalUrl, - path: `/${flowName}`, + path, + qualifiedPath, source: 'ts', sourceVersion: GENKIT_VERSION, }); } export function logResponse(flowName: string, respCode: number, respBody: any) { - logger.logStructured(`Response[/${flowName}]`, { + const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; + const path = toDisplayPath(qualifiedPath); + logger.logStructured(`Response[${flowName}]`, { flowName: flowName, - path: `/${flowName}`, + path, + qualifiedPath, code: respCode, body: respBody, source: 'ts', diff --git a/js/plugins/google-cloud/src/gcpLogger.ts b/js/plugins/google-cloud/src/gcpLogger.ts index 4c79354a4..3aa2fa062 100644 --- a/js/plugins/google-cloud/src/gcpLogger.ts +++ b/js/plugins/google-cloud/src/gcpLogger.ts @@ -16,8 +16,14 @@ import { LoggerConfig } from '@genkit-ai/core'; import { LoggingWinston } from '@google-cloud/logging-winston'; +import { Writable } from 'stream'; import { PluginOptions } from './index.js'; +/** + * Additional streams for writing log data to. Useful for unit testing. + */ +let additionalStream: Writable; + /** * Provides a {LoggerConfig} for exporting Genkit debug logs to GCP Cloud * logs. @@ -44,17 +50,25 @@ export class GcpLogger implements LoggerConfig { }), }; - const transport = this.shouldExport(env) - ? new LoggingWinston({ - projectId: this.options.projectId, - labels: { module: 'genkit' }, - prefix: 'genkit', - logName: 'genkit_log', - }) - : new winston.transports.Console(); + let transports: any[] = []; + transports.push( + this.shouldExport(env) + ? new LoggingWinston({ + projectId: this.options.projectId, + labels: { module: 'genkit' }, + prefix: 'genkit', + logName: 'genkit_log', + }) + : new winston.transports.Console() + ); + if (additionalStream) { + transports.push( + new winston.transports.Stream({ stream: additionalStream }) + ); + } return winston.createLogger({ - transports: [transport], + transports: transports, ...format, }); } @@ -63,3 +77,7 @@ export class GcpLogger implements LoggerConfig { return this.options.telemetryConfig?.forceDevExport || env !== 'dev'; } } + +export function __addTransportStreamForTesting(stream: Writable) { + additionalStream = stream; +} diff --git a/js/plugins/google-cloud/tests/logs_test.ts b/js/plugins/google-cloud/tests/logs_test.ts new file mode 100644 index 000000000..24e79a548 --- /dev/null +++ b/js/plugins/google-cloud/tests/logs_test.ts @@ -0,0 +1,229 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { generate } from '@genkit-ai/ai'; +import { defineModel } from '@genkit-ai/ai/model'; +import { + FlowState, + FlowStateQuery, + FlowStateQueryResponse, + FlowStateStore, + configureGenkit, +} from '@genkit-ai/core'; +import { registerFlowStateStore } from '@genkit-ai/core/registry'; +import { defineFlow, run, runFlow } from '@genkit-ai/flow'; +import { + __addTransportStreamForTesting, + googleCloud, +} from '@genkit-ai/google-cloud'; +import assert from 'node:assert'; +import { before, beforeEach, describe, it } from 'node:test'; +import { Writable } from 'stream'; +import { z } from 'zod'; + +describe('GoogleCloudLogs', () => { + let logLines = ''; + const logStream = new Writable(); + logStream._write = (chunk, encoding, next) => { + logLines = logLines += chunk.toString(); + next(); + }; + + before(async () => { + process.env.GENKIT_ENV = 'dev'; + __addTransportStreamForTesting(logStream); + const config = configureGenkit({ + // Force GCP Plugin to use in-memory metrics exporter + plugins: [ + googleCloud({ + telemetryConfig: { + forceDevExport: false, + metricExportIntervalMillis: 100, + metricExportTimeoutMillis: 100, + }, + }), + ], + enableTracingAndMetrics: true, + telemetry: { + instrumentation: 'googleCloud', + logger: 'googleCloud', + }, + }); + registerFlowStateStore('dev', async () => new NoOpFlowStateStore()); + // Wait for the telemetry plugin to be initialized + await config.getTelemetryConfig(); + await waitForLogsInit(); + }); + beforeEach(async () => { + logLines = ''; + }); + + it('writes path logs', async () => { + const testFlow = createFlow('testFlow'); + + await runFlow(testFlow); + + const logMessages = await getLogs(); + assert.equal(logMessages.includes('[info] Paths[testFlow]'), true); + }); + + it('writes error logs', async () => { + const testFlow = createFlow('testFlow', async () => { + const nothing = null; + nothing.something; + }); + + assert.rejects(async () => { + await runFlow(testFlow); + }); + + const logMessages = await getLogs(); + assert.equal( + logMessages.includes( + '[error] Error[testFlow, TypeError] Cannot read properties of null ' + + "(reading 'something')" + ), + true + ); + }); + + it('writes generate logs', async () => { + const testModel = createModel('testModel', async () => { + return { + candidates: [ + { + index: 0, + finishReason: 'stop', + message: { + role: 'user', + content: [ + { + text: 'response', + }, + ], + }, + }, + ], + usage: { + inputTokens: 10, + outputTokens: 14, + inputCharacters: 8, + outputCharacters: 16, + inputImages: 1, + outputImages: 3, + }, + }; + }); + const testFlow = createFlow('testFlow', async () => { + return await run('sub1', async () => { + return await run('sub2', async () => { + return await generate({ + model: testModel, + prompt: 'test prompt', + config: { + temperature: 1.0, + topK: 3, + topP: 5, + maxOutputTokens: 7, + }, + }); + }); + }); + }); + + await runFlow(testFlow); + + const logMessages = await getLogs(); + assert.equal( + logMessages.includes('[info] Config[testFlow > sub1 > sub2, testModel]'), + true + ); + assert.equal( + logMessages.includes('[info] Input[testFlow > sub1 > sub2, testModel]'), + true + ); + assert.equal( + logMessages.includes('[info] Output[testFlow > sub1 > sub2, testModel]'), + true + ); + }); + + /** Helper to create a flow with no inputs or outputs */ + function createFlow(name: string, fn: () => Promise = async () => {}) { + return defineFlow( + { + name, + inputSchema: z.void(), + outputSchema: z.void(), + }, + fn + ); + } + + /** + * Helper to create a model that returns the value produced by the given + * response function. + */ + function createModel( + name: string, + respFn: () => Promise + ) { + return defineModel({ name }, (req) => respFn()); + } + + async function waitForLogsInit() { + await import('winston'); + const testFlow = createFlow('testFlow'); + await runFlow(testFlow); + await getLogs(1); + } + + async function getLogs( + logCount: number = 1, + maxAttempts: number = 100 + ): promise { + var attempts = 0; + while (attempts++ < maxAttempts) { + await new Promise((resolve) => setTimeout(resolve, 100)); + const found = logLines + .trim() + .split('\n') + .map((l) => l.trim()); + if (found.length >= logCount) { + return found; + } + } + assert.fail(`Waiting for logs, but none have been written.`); + } +}); + +class NoOpFlowStateStore implements FlowStateStore { + state: Record = {}; + + load(id: string): Promise { + return Promise.resolve(undefined); + } + + save(id: string, state: FlowState): Promise { + return Promise.resolve(); + } + + async list( + query?: FlowStateQuery | undefined + ): Promise { + return {}; + } +}