Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates logs to write friendly display paths instead of the annotated version #504

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions js/ai/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import {
MetricCounter,
MetricHistogram,
} from '@genkit-ai/core/metrics';
import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing';
import {
spanMetadataAls,
toDisplayPath,
traceMetadataAls,
} from '@genkit-ai/core/tracing';
import { ValueType } from '@opentelemetry/api';
import { createHash } from 'crypto';
import { GenerateOptions } from './generate.js';
Expand Down Expand Up @@ -171,8 +175,9 @@ export function recordGenerateActionInputLogs(
input: GenerateRequest
) {
const flowName = traceMetadataAls?.getStore()?.flowName;
const path = spanMetadataAls?.getStore()?.path;
const sharedMetadata = { model, path, flowName };
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
const sharedMetadata = { model, path, qualifiedPath, flowName };
logger.logStructured(`Config[${path}, ${model}]`, {
...sharedMetadata,
temperature: options.config?.temperature,
Expand Down Expand Up @@ -207,8 +212,9 @@ export function recordGenerateActionOutputLogs(
output: GenerateResponseData
) {
const flowName = traceMetadataAls?.getStore()?.flowName;
const path = spanMetadataAls?.getStore()?.path;
const sharedMetadata = { model, path, flowName };
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
const sharedMetadata = { model, path, qualifiedPath, flowName };
const candidates = output.candidates.length;
output.candidates.forEach((cand, candIdx) => {
const parts = cand.message.content.length;
Expand Down
6 changes: 6 additions & 0 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,12 @@ export function setCustomMetadataAttributes(values: Record<string, string>) {
}
}

/** Converts a fully annotated path to a friendly display version for logs */
export function toDisplayPath(path: string): string {
const pathPartRegex = /\{([^\,}]+),[^\}]+\}/g;
return Array.from(path.matchAll(pathPartRegex), (m) => m[1]).join(' > ');
}

function getCurrentSpan(): SpanMetadata {
const step = spanMetadataAls.getStore();
if (!step) {
Expand Down
43 changes: 27 additions & 16 deletions js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
import {
PathMetadata,
spanMetadataAls,
toDisplayPath,
traceMetadataAls,
} from '@genkit-ai/core/tracing';
import { ValueType } from '@opentelemetry/api';
Expand Down Expand Up @@ -57,9 +58,11 @@ const flowLatencies = new MetricHistogram(_N('latency'), {
});

export function recordError(err: any) {
const path = spanMetadataAls?.getStore()?.path;
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
logger.logStructuredError(`Error[${path}, ${err.name}]`, {
path: path,
path,
qualifiedPath,
name: err.name,
message: err.message,
stack: err.stack,
Expand Down Expand Up @@ -90,7 +93,7 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) {
meta.path.includes(flowName)
);

logger.logStructured(`Paths[/${flowName}]`, {
logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: relevantPaths.map((p) => p.path),
});
Expand Down Expand Up @@ -124,17 +127,18 @@ export function writeFlowFailure(
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const allPaths =
const allQualifiedPaths =
traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (allPaths) {
const failPath = spanMetadataAls?.getStore()?.path;
const relevantPaths = Array.from(allPaths).filter(
(meta) => meta.path.includes(flowName) && meta.path !== failPath
if (allQualifiedPaths) {
const qualifiedFailPath = spanMetadataAls?.getStore()?.path || '';
const failPath = toDisplayPath(qualifiedFailPath);
const relevantPaths = Array.from(allQualifiedPaths).filter(
(meta) => meta.path.includes(flowName) && meta.path !== qualifiedFailPath
);

logger.logStructured(`Paths[/${flowName}]`, {
logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: relevantPaths.map((p) => p.path),
paths: relevantPaths.map((p) => toDisplayPath(p.path)),
});

const pathDimensions = {
Expand All @@ -144,6 +148,7 @@ export function writeFlowFailure(
};
// All paths that have succeeded need to be tracked as succeeded.
relevantPaths.forEach((p) => {
const path = toDisplayPath(p.path);
pathCounter.add(1, {
...pathDimensions,
status: 'success',
Expand All @@ -161,20 +166,22 @@ export function writeFlowFailure(
...pathDimensions,
status: 'failure',
error: err.name,
path: failPath,
path: qualifiedFailPath,
});

pathLatencies.record(latencyMs, {
...pathDimensions,
status: 'failure',
error: err.name,
path: failPath,
path: qualifiedFailPath,
});
}
}

export function logRequest(flowName: string, req: express.Request) {
logger.logStructured(`Request[/${flowName}]`, {
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
logger.logStructured(`Request[${flowName}]`, {
flowName: flowName,
headers: {
...req.headers,
Expand All @@ -184,16 +191,20 @@ export function logRequest(flowName: string, req: express.Request) {
body: req.body,
query: req.query,
originalUrl: req.originalUrl,
path: `/${flowName}`,
path,
qualifiedPath,
source: 'ts',
sourceVersion: GENKIT_VERSION,
});
}

export function logResponse(flowName: string, respCode: number, respBody: any) {
logger.logStructured(`Response[/${flowName}]`, {
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
logger.logStructured(`Response[${flowName}]`, {
flowName: flowName,
path: `/${flowName}`,
path,
qualifiedPath,
code: respCode,
body: respBody,
source: 'ts',
Expand Down
36 changes: 27 additions & 9 deletions js/plugins/google-cloud/src/gcpLogger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

import { LoggerConfig } from '@genkit-ai/core';
import { LoggingWinston } from '@google-cloud/logging-winston';
import { Writable } from 'stream';
import { PluginOptions } from './index.js';

/**
* Additional streams for writing log data to. Useful for unit testing.
*/
let additionalStream: Writable;

/**
* Provides a {LoggerConfig} for exporting Genkit debug logs to GCP Cloud
* logs.
Expand All @@ -44,17 +50,25 @@ export class GcpLogger implements LoggerConfig {
}),
};

const transport = this.shouldExport(env)
? new LoggingWinston({
projectId: this.options.projectId,
labels: { module: 'genkit' },
prefix: 'genkit',
logName: 'genkit_log',
})
: new winston.transports.Console();
let transports: any[] = [];
transports.push(
this.shouldExport(env)
? new LoggingWinston({
projectId: this.options.projectId,
labels: { module: 'genkit' },
prefix: 'genkit',
logName: 'genkit_log',
})
: new winston.transports.Console()
);
if (additionalStream) {
transports.push(
new winston.transports.Stream({ stream: additionalStream })
);
}

return winston.createLogger({
transports: [transport],
transports: transports,
...format,
});
}
Expand All @@ -63,3 +77,7 @@ export class GcpLogger implements LoggerConfig {
return this.options.telemetryConfig?.forceDevExport || env !== 'dev';
}
}

export function __addTransportStreamForTesting(stream: Writable) {
additionalStream = stream;
}
Loading
Loading