Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry collection p2 #222

Merged
merged 22 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,286 changes: 708 additions & 578 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@
"@koa/cors": "^5.0.0",
"@koa/router": "^12.0.0",
"@opentelemetry/api": "^1.4.1",
"@opentelemetry/exporter-logs-otlp-http": "0.41.2",
"@opentelemetry/exporter-trace-otlp-http": "^0.41.2",
"@opentelemetry/exporter-logs-otlp-proto": "0.46.0",
"@opentelemetry/exporter-trace-otlp-proto": "0.46.0",
Comment on lines +50 to +51
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the latest version and lock it. These are experimental packages and breaking changes happen often.

"@opentelemetry/sdk-trace-base": "^1.15.2",
"@types/koa": "^2.13.8",
"@types/lodash": "^4.14.197",
Expand Down
2 changes: 1 addition & 1 deletion schemas/system_db_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export const systemDBSchema = `
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER dbos_notifications_trigger
AFTER INSERT ON notifications
FOR EACH ROW EXECUTE FUNCTION notifications_function();
Expand Down
2 changes: 2 additions & 0 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export class DBOSContextImpl implements DBOSContext {
authenticatedRoles: string[] = []; // All roles the user has according to authentication
assumedRole: string = ""; // Role in use - that user has and provided authorization to current function
workflowUUID: string = ""; // Workflow UUID. Empty for HandlerContexts.
executorID: string = "local"; // Executor ID. Gathered from request headers and "local" otherwise
readonly logger: DBOSLogger; // Wrapper around the global logger for this context.

constructor(readonly operationName: string, readonly span: Span, logger: Logger, parentCtx?: DBOSContextImpl) {
Expand All @@ -50,6 +51,7 @@ export class DBOSContextImpl implements DBOSContext {
this.authenticatedRoles = parentCtx.authenticatedRoles;
this.assumedRole = parentCtx.assumedRole;
this.workflowUUID = parentCtx.workflowUUID;
this.executorID = parentCtx.executorID;
}
this.logger = new DBOSLogger(logger, this);
}
Expand Down
32 changes: 21 additions & 11 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ interface CommunicatorInfo {
config: CommunicatorConfig;
}

export const OperationType = {
HANDLER: "handler",
WORKFLOW: "workflow",
TRANSACTION: "transaction",
COMMUNICATOR: "communicator",
} as const;
Comment on lines +80 to +85
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useful as span metadata


const TempWorkflowType = {
transaction: "transaction",
external: "external",
Expand Down Expand Up @@ -276,7 +283,7 @@ export class DBOSExecutor {
} catch (err) {
(err as Error).message = `failed to initialize workflow executor: ${(err as Error).message}`
this.logger.error(err);
throw new DBOSInitializationError(`failed to initialize workflow executor: ${(err as Error).message}`);
throw new DBOSInitializationError(`${(err as Error).message}`);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove duplicate text in this error message.

}
this.initialized = true;

Expand Down Expand Up @@ -364,12 +371,8 @@ export class DBOSExecutor {
const wConfig = wInfo.config;

const wCtxt: WorkflowContextImpl = new WorkflowContextImpl(this, params.parentCtx, workflowUUID, wConfig, wf.name, presetUUID);
wCtxt.span.setAttributes({ args: JSON.stringify(args) }); // TODO enforce skipLogging & request for hashing

let executorID: string = "local";
if (wCtxt.request.headers && wCtxt.request.headers[DBOSExecutorIDHeader]) {
executorID = wCtxt.request.headers[DBOSExecutorIDHeader] as string;
}
wCtxt.span.setAttribute("executorID", wCtxt.executorID);
const internalStatus: WorkflowStatusInternal = {
workflowUUID: workflowUUID,
status: StatusString.PENDING,
Expand All @@ -380,7 +383,7 @@ export class DBOSExecutor {
assumedRole: wCtxt.assumedRole,
authenticatedRoles: wCtxt.authenticatedRoles,
request: wCtxt.request,
executorID: executorID,
executorID: wCtxt.executorID,
};
// Synchronously set the workflow's status to PENDING and record workflow inputs.
if (!wCtxt.isTempWorkflow) {
Expand Down Expand Up @@ -638,10 +641,17 @@ export class DBOSExecutor {
}

#getRecoveryContext(workflowUUID: string, status: WorkflowStatus): DBOSContextImpl {
const span = this.tracer.startSpan(status.workflowName);
span.setAttributes({
operationName: status.workflowName,
});
const span = this.tracer.startSpan(
status.workflowName,
{
operationUUID: workflowUUID,
operationType: OperationType.WORKFLOW,
status: status.status,
authenticatedUser: status.authenticatedUser,
assumedRole: status.assumedRole,
authenticatedRoles: status.authenticatedRoles,
},
);
const oc = new DBOSContextImpl(status.workflowName, span, this.logger);
oc.request = status.request;
oc.authenticatedUser = status.authenticatedUser;
Expand Down
1 change: 0 additions & 1 deletion src/dbos-runtime/applicationVersion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ export function setApplicationVersion(version: string): void {
}

let applicationVersion: string | undefined = undefined;

30 changes: 27 additions & 3 deletions src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { DBOSExecutor, DBOSNull, dbosNull } from "../dbos-executor";
import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor";
import { transaction_outputs } from "../../schemas/user_db_schema";
import { Transaction, TransactionContextImpl } from "../transaction";
import { Communicator } from "../communicator";
Expand Down Expand Up @@ -27,7 +27,17 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
readonly isTempWorkflow: boolean;

constructor(dbosExec: DBOSExecutor, parentCtx: DBOSContextImpl | undefined, workflowUUID: string, readonly workflowConfig: WorkflowConfig, workflowName: string) {
const span = dbosExec.tracer.startSpan(workflowName, { workflowUUID: workflowUUID }, parentCtx?.span);
const span = dbosExec.tracer.startSpan(
workflowName,
Copy link
Member

@kraftp kraftp Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we doing any sort of tracing in debug workflows? @qianl15 what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah right now the usage is unclear. Maybe we could need it for retro action, in the future. Happy to remove it if @qianl15 doesn't find it useful.

{
operationUUID: workflowUUID,
operationType: OperationType.WORKFLOW,
authenticatedUser: parentCtx?.authenticatedUser ?? "",
authenticatedRoles: parentCtx?.authenticatedRoles ?? [],
assumedRole: parentCtx?.assumedRole ?? "",
},
parentCtx?.span,
);
super(workflowName, span, dbosExec.logger, parentCtx);
this.workflowUUID = workflowUUID;
this.#dbosExec = dbosExec;
Expand Down Expand Up @@ -98,7 +108,19 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}
// const readOnly = true; // TODO: eventually, this transaction must be read-only.
const funcID = this.functionIDGetIncrement();
const span: Span = this.#dbosExec.tracer.startSpan(txn.name, {}, this.span);
const span: Span = this.#dbosExec.tracer.startSpan(
txn.name,
{
operationUUID: this.workflowUUID,
operationType: OperationType.TRANSACTION,
authenticatedUser: this.authenticatedUser,
authenticatedRoles: this.authenticatedRoles,
assumedRole: this.assumedRole,
readOnly: txnInfo.config.readOnly ?? false, // For now doing as in src/workflow.ts:272
isolationLevel: txnInfo.config.isolationLevel,
},
this.span
);
let check: RecordedResult<R>;

const wrappedTransaction = async (client: UserDatabaseClient): Promise<R> => {
Expand Down Expand Up @@ -126,6 +148,8 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}
const funcID = this.functionIDGetIncrement();

// FIXME: we do not create a span for the replay communicator. Do we want to?

// Original result must exist during replay.
const check: R | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<R>(this.workflowUUID, funcID);
if (check === dbosNull) {
Expand Down
2 changes: 1 addition & 1 deletion src/decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,14 @@ function getOrCreateMethodRegistration<This, Args extends unknown[], Return>(
const requiredRoles = methReg.getRequiredRoles();
if (requiredRoles.length > 0) {
opCtx = rawArgs[0] as DBOSContextImpl;
opCtx.span.setAttribute("requiredRoles", requiredRoles);
const curRoles = opCtx.authenticatedRoles;
let authorized = false;
const set = new Set(curRoles);
for (const role of requiredRoles) {
if (set.has(role)) {
authorized = true;
opCtx.assumedRole = role;
opCtx.span.setAttribute("assumedRole", role);
break;
}
}
Expand Down
20 changes: 17 additions & 3 deletions src/httpServer/handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { MethodRegistration, MethodParameter, registerAndWrapFunction, getOrCreateMethodArgsRegistration, MethodRegistrationBase, getRegisteredOperations } from "../decorators";
import { DBOSExecutor } from "../dbos-executor";
import { DBOSExecutor, DBOSExecutorIDHeader, OperationType } from "../dbos-executor";
import { DBOSContext, DBOSContextImpl } from "../context";
import Koa from "koa";
import { Workflow, TailParameters, WorkflowHandle, WorkflowParams, WorkflowContext, WFInvokeFuncs } from "../workflow";
Expand Down Expand Up @@ -43,22 +43,35 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex
readonly W3CTraceContextPropagator: W3CTraceContextPropagator;

constructor(dbosExec: DBOSExecutor, readonly koaContext: Koa.Context) {
// Retrieve or generate the request ID
const requestID = getOrGenerateRequestID(koaContext);

// If present, retrieve the trace context from the request
const httpTracer = new W3CTraceContextPropagator();
const extractedSpanContext = trace.getSpanContext(
httpTracer.extract(ROOT_CONTEXT, koaContext.request.headers, defaultTextMapGetter)
)
let span: Span;
const spanAttributes = {
operationName: koaContext.url,
operationType: OperationType.HANDLER,
requestID: requestID,
requestIP: koaContext.request.ip,
requestURL: koaContext.request.url,
requestMethod: koaContext.request.method,
};
if (extractedSpanContext === undefined) {
span = dbosExec.tracer.startSpan(koaContext.url, spanAttributes);
} else {
extractedSpanContext.isRemote = true;
span = dbosExec.tracer.startSpanWithContext(extractedSpanContext, koaContext.url, spanAttributes);
}

super(koaContext.url, span, dbosExec.logger);

if (koaContext.request.headers && koaContext.request.headers[DBOSExecutorIDHeader]) {
this.executorID = koaContext.request.headers[DBOSExecutorIDHeader] as string;
}
Comment on lines +71 to +73
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now get executor ID as soon as we create the handler context


this.W3CTraceContextPropagator = httpTracer;
this.request = {
headers: koaContext.request.headers,
Expand All @@ -72,8 +85,9 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex
querystring: koaContext.request.querystring,
url: koaContext.request.url,
ip: koaContext.request.ip,
requestID: getOrGenerateRequestID(koaContext),
requestID: requestID,
};

if (dbosExec.config.application) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.applicationConfig = dbosExec.config.application;
Expand Down
4 changes: 2 additions & 2 deletions src/telemetry/exporters.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { TelemetrySignal } from "./collector";
import { isLogSignal, isTraceSignal } from "./";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto";
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-proto";
import type { ReadableSpan } from "@opentelemetry/sdk-trace-base";
import type { ReadableLogRecord } from '@opentelemetry/sdk-logs';
import { ExportResult, ExportResultCode } from "@opentelemetry/core";
Expand Down
83 changes: 41 additions & 42 deletions src/telemetry/logs.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { transports, createLogger, format, Logger as IWinstonLogger } from "winston";
import TransportStream = require("winston-transport");
import { getApplicationVersion } from "../dbos-runtime/applicationVersion";
import { DBOSContext } from "../context";
import { DBOSContextImpl } from "../context";
import { context } from "@opentelemetry/api";
import { LogAttributes, LogRecord, SeverityNumber } from "@opentelemetry/api-logs";
import { Logger as OTelLogger, LogRecord as OTelLogRecord, LoggerProvider as OTelLoggerProvider } from "@opentelemetry/sdk-logs";
import { Logger as OTelLogger, LogAttributes, SeverityNumber } from "@opentelemetry/api-logs";
import { LogRecord, LoggerProvider} from "@opentelemetry/sdk-logs";
import { Span } from "@opentelemetry/sdk-trace-base";
import { TelemetryCollector } from "./collector";

/*****************/
Expand All @@ -17,12 +18,16 @@ export interface LoggerConfig {
addContextMetadata?: boolean;
}

// This structure is mostly used to share contextual metadata with the console logger
// The span field is used by the OTLP transport for injection in the LogRecord. It allows us to tightly link logs and traces
type ContextualMetadata = {
includeContextMetadata?: boolean;
workflowUUID: string;
authenticatedUser: string;
traceId: string;
spanId: string;
executorID?: string;
operationUUID?: string;
authenticatedUser?: string;
authenticatedRoles?: string[];
assumedRole?: string;
span?: Span;
};

interface StackTrace {
Expand Down Expand Up @@ -105,14 +110,10 @@ export class Logger {
readonly metadata: ContextualMetadata;
constructor(
private readonly globalLogger: GlobalLogger,
private readonly ctx: DBOSContext
private readonly ctx: DBOSContextImpl,
) {
this.metadata = {
includeContextMetadata: this.globalLogger.addContextMetadata,
workflowUUID: this.ctx.workflowUUID,
authenticatedUser: this.ctx.authenticatedUser,
traceId: this.ctx.span.spanContext().traceId,
spanId: this.ctx.span.spanContext().spanId,
span: ctx.span,
};
}

Expand Down Expand Up @@ -152,7 +153,7 @@ export class Logger {
this.globalLogger.error(inputError, { ...this.metadata, stack: new Error().stack });
} else {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
this.globalLogger.error(JSON.stringify(inputError), {...this.metadata, stack: new Error().stack });
this.globalLogger.error(JSON.stringify(inputError), { ...this.metadata, stack: new Error().stack });
}
}
}
Expand All @@ -175,10 +176,11 @@ const consoleFormat = format.combine(
const formattedStack = stack?.split("\n").slice(1).join("\n");

const contextualMetadata: ContextualMetadata = {
workflowUUID: info.workflowUUID as string,
executorID: info.executorID as string,
operationUUID: info.workflowUUID as string,
authenticatedUser: info.authenticatedUser as string,
traceId: info.traceId as string,
spanId: info.spanId as string,
authenticatedRoles: info.authenticatedRoles as string[],
assumedRole: info.assumedRole as string,
};
const messageString: string = typeof message === "string" ? message : JSON.stringify(message);
const fullMessageString = `${messageString}${info.includeContextMetadata ? ` ${JSON.stringify(contextualMetadata)}` : ""}`;
Expand All @@ -193,27 +195,29 @@ class OTLPLogQueueTransport extends TransportStream {
readonly name = "OTLPLogQueueTransport";
readonly otelLogger: OTelLogger;

constructor(private readonly telemetryCollector: TelemetryCollector) {
constructor(readonly telemetryCollector: TelemetryCollector) {
super();
// not sure if we need a more explicit name here
this.otelLogger = new OTelLoggerProvider().getLogger("default") as OTelLogger;
const loggerProvider = new LoggerProvider();
this.otelLogger = loggerProvider.getLogger("default");
const logRecordProcessor = {
forceFlush: async () => {
// no-op
},
onEmit(logRecord: LogRecord) { // Use optionakl coàntext?
telemetryCollector.push(logRecord);
},
shutdown: async () => {
// no-op
}
};
loggerProvider.addLogRecordProcessor(logRecordProcessor);
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
log(info: any, callback: () => void): void {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
const { level, message, stack } = info;

const contextualMetadata: ContextualMetadata = {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
workflowUUID: info.workflowUUID as string,
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
authenticatedUser: info.authenticatedUser as string,
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
traceId: info.traceId as string,
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
spanId: info.spanId as string,
};
const { level, message, stack, span } = info;

const levelToSeverityNumber: { [key: string]: SeverityNumber } = {
error: SeverityNumber.ERROR,
Expand All @@ -222,21 +226,16 @@ class OTLPLogQueueTransport extends TransportStream {
debug: SeverityNumber.DEBUG,
};

const log: LogRecord = {
this.otelLogger.emit({
severityNumber: levelToSeverityNumber[level as string],
severityText: level as string,
body: message as string,
timestamp: new Date().getTime(),
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
attributes: { ...contextualMetadata, stack } as LogAttributes,
// TODO as a nice-to-have, we could retrieve the operation current context, if we use a context manager, and inject the traceId/spanId in the LogRecord
// See https://opentelemetry.io/docs/instrumentation/js/context/#active-context
context: context.active(),
};

const logRecord: OTelLogRecord = new OTelLogRecord(this.otelLogger, log);

this.telemetryCollector.push(logRecord);
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access
attributes: { ...span.attributes, stack } as LogAttributes,
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-unsafe-member-access, @typescript-eslint/no-unsafe-call
context: span?.spanContext() || context.active(),
});

callback();
}
Expand Down
Loading
Loading