Skip to content

Commit

Permalink
Updates logs to write friendly display paths instead of the annotated…
Browse files Browse the repository at this point in the history
… version (#504)
  • Loading branch information
bryanatkinson authored Jun 27, 2024
1 parent f362268 commit 39e88b0
Show file tree
Hide file tree
Showing 5 changed files with 300 additions and 30 deletions.
16 changes: 11 additions & 5 deletions js/ai/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import {
MetricCounter,
MetricHistogram,
} from '@genkit-ai/core/metrics';
import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing';
import {
spanMetadataAls,
toDisplayPath,
traceMetadataAls,
} from '@genkit-ai/core/tracing';
import { ValueType } from '@opentelemetry/api';
import { createHash } from 'crypto';
import { GenerateOptions } from './generate.js';
Expand Down Expand Up @@ -171,8 +175,9 @@ export function recordGenerateActionInputLogs(
input: GenerateRequest
) {
const flowName = traceMetadataAls?.getStore()?.flowName;
const path = spanMetadataAls?.getStore()?.path;
const sharedMetadata = { model, path, flowName };
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
const sharedMetadata = { model, path, qualifiedPath, flowName };
logger.logStructured(`Config[${path}, ${model}]`, {
...sharedMetadata,
temperature: options.config?.temperature,
Expand Down Expand Up @@ -207,8 +212,9 @@ export function recordGenerateActionOutputLogs(
output: GenerateResponseData
) {
const flowName = traceMetadataAls?.getStore()?.flowName;
const path = spanMetadataAls?.getStore()?.path;
const sharedMetadata = { model, path, flowName };
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
const sharedMetadata = { model, path, qualifiedPath, flowName };
const candidates = output.candidates.length;
output.candidates.forEach((cand, candIdx) => {
const parts = cand.message.content.length;
Expand Down
6 changes: 6 additions & 0 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ export function setCustomMetadataAttributes(values: Record<string, string>) {
}
}

/** Converts a fully annotated path to a friendly display version for logs */
export function toDisplayPath(path: string): string {
const pathPartRegex = /\{([^\,}]+),[^\}]+\}/g;
return Array.from(path.matchAll(pathPartRegex), (m) => m[1]).join(' > ');
}

function getCurrentSpan(): SpanMetadata {
const step = spanMetadataAls.getStore();
if (!step) {
Expand Down
43 changes: 27 additions & 16 deletions js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
import {
PathMetadata,
spanMetadataAls,
toDisplayPath,
traceMetadataAls,
} from '@genkit-ai/core/tracing';
import { ValueType } from '@opentelemetry/api';
Expand Down Expand Up @@ -57,9 +58,11 @@ const flowLatencies = new MetricHistogram(_N('latency'), {
});

export function recordError(err: any) {
const path = spanMetadataAls?.getStore()?.path;
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
logger.logStructuredError(`Error[${path}, ${err.name}]`, {
path: path,
path,
qualifiedPath,
name: err.name,
message: err.message,
stack: err.stack,
Expand Down Expand Up @@ -90,7 +93,7 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) {
meta.path.includes(flowName)
);

logger.logStructured(`Paths[/${flowName}]`, {
logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: relevantPaths.map((p) => p.path),
});
Expand Down Expand Up @@ -124,17 +127,18 @@ export function writeFlowFailure(
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const allPaths =
const allQualifiedPaths =
traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (allPaths) {
const failPath = spanMetadataAls?.getStore()?.path;
const relevantPaths = Array.from(allPaths).filter(
(meta) => meta.path.includes(flowName) && meta.path !== failPath
if (allQualifiedPaths) {
const qualifiedFailPath = spanMetadataAls?.getStore()?.path || '';
const failPath = toDisplayPath(qualifiedFailPath);
const relevantPaths = Array.from(allQualifiedPaths).filter(
(meta) => meta.path.includes(flowName) && meta.path !== qualifiedFailPath
);

logger.logStructured(`Paths[/${flowName}]`, {
logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: relevantPaths.map((p) => p.path),
paths: relevantPaths.map((p) => toDisplayPath(p.path)),
});

const pathDimensions = {
Expand All @@ -144,6 +148,7 @@ export function writeFlowFailure(
};
// All paths that have succeeded need to be tracked as succeeded.
relevantPaths.forEach((p) => {
const path = toDisplayPath(p.path);
pathCounter.add(1, {
...pathDimensions,
status: 'success',
Expand All @@ -161,20 +166,22 @@ export function writeFlowFailure(
...pathDimensions,
status: 'failure',
error: err.name,
path: failPath,
path: qualifiedFailPath,
});

pathLatencies.record(latencyMs, {
...pathDimensions,
status: 'failure',
error: err.name,
path: failPath,
path: qualifiedFailPath,
});
}
}

export function logRequest(flowName: string, req: express.Request) {
logger.logStructured(`Request[/${flowName}]`, {
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
logger.logStructured(`Request[${flowName}]`, {
flowName: flowName,
headers: {
...req.headers,
Expand All @@ -184,16 +191,20 @@ export function logRequest(flowName: string, req: express.Request) {
body: req.body,
query: req.query,
originalUrl: req.originalUrl,
path: `/${flowName}`,
path,
qualifiedPath,
source: 'ts',
sourceVersion: GENKIT_VERSION,
});
}

export function logResponse(flowName: string, respCode: number, respBody: any) {
logger.logStructured(`Response[/${flowName}]`, {
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
logger.logStructured(`Response[${flowName}]`, {
flowName: flowName,
path: `/${flowName}`,
path,
qualifiedPath,
code: respCode,
body: respBody,
source: 'ts',
Expand Down
36 changes: 27 additions & 9 deletions js/plugins/google-cloud/src/gcpLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

import { LoggerConfig } from '@genkit-ai/core';
import { LoggingWinston } from '@google-cloud/logging-winston';
import { Writable } from 'stream';
import { PluginOptions } from './index.js';

/**
* Additional streams for writing log data to. Useful for unit testing.
*/
let additionalStream: Writable;

/**
* Provides a {LoggerConfig} for exporting Genkit debug logs to GCP Cloud
* logs.
Expand All @@ -44,17 +50,25 @@ export class GcpLogger implements LoggerConfig {
}),
};

const transport = this.shouldExport(env)
? new LoggingWinston({
projectId: this.options.projectId,
labels: { module: 'genkit' },
prefix: 'genkit',
logName: 'genkit_log',
})
: new winston.transports.Console();
let transports: any[] = [];
transports.push(
this.shouldExport(env)
? new LoggingWinston({
projectId: this.options.projectId,
labels: { module: 'genkit' },
prefix: 'genkit',
logName: 'genkit_log',
})
: new winston.transports.Console()
);
if (additionalStream) {
transports.push(
new winston.transports.Stream({ stream: additionalStream })
);
}

return winston.createLogger({
transports: [transport],
transports: transports,
...format,
});
}
Expand All @@ -63,3 +77,7 @@ export class GcpLogger implements LoggerConfig {
return this.options.telemetryConfig?.forceDevExport || env !== 'dev';
}
}

export function __addTransportStreamForTesting(stream: Writable) {
additionalStream = stream;
}
Loading

0 comments on commit 39e88b0

Please sign in to comment.