Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry collection p2 #222

Merged
merged 22 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,290 changes: 710 additions & 580 deletions package-lock.json

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@
"@koa/bodyparser": "^5.0.0",
"@koa/cors": "^5.0.0",
"@koa/router": "^12.0.0",
"@opentelemetry/api": "^1.4.1",
"@opentelemetry/exporter-logs-otlp-http": "0.41.2",
"@opentelemetry/exporter-trace-otlp-http": "^0.41.2",
"@opentelemetry/sdk-trace-base": "^1.15.2",
"@opentelemetry/api": "1.7.0",
"@opentelemetry/exporter-logs-otlp-proto": "0.46.0",
"@opentelemetry/exporter-trace-otlp-proto": "0.46.0",
Comment on lines +50 to +51
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the latest version and lock it. These are experimental packages and breaking changes happen often.

"@opentelemetry/sdk-trace-base": "1.19.0",
"@types/koa": "^2.13.8",
"@types/lodash": "^4.14.197",
"@types/pg": "^8.10.2",
Expand Down
2 changes: 1 addition & 1 deletion schemas/system_db_schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ export const systemDBSchema = `
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER dbos_notifications_trigger
AFTER INSERT ON notifications
FOR EACH ROW EXECUTE FUNCTION notifications_function();
Expand Down
2 changes: 2 additions & 0 deletions src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export class DBOSContextImpl implements DBOSContext {
authenticatedRoles: string[] = []; // All roles the user has according to authentication
assumedRole: string = ""; // Role in use - that user has and provided authorization to current function
workflowUUID: string = ""; // Workflow UUID. Empty for HandlerContexts.
executorID: string = "local"; // Executor ID. Gathered from request headers and "local" otherwise
readonly logger: DBOSLogger; // Wrapper around the global logger for this context.

constructor(readonly operationName: string, readonly span: Span, logger: Logger, parentCtx?: DBOSContextImpl) {
Expand All @@ -50,6 +51,7 @@ export class DBOSContextImpl implements DBOSContext {
this.authenticatedRoles = parentCtx.authenticatedRoles;
this.assumedRole = parentCtx.assumedRole;
this.workflowUUID = parentCtx.workflowUUID;
this.executorID = parentCtx.executorID;
}
this.logger = new DBOSLogger(logger, this);
}
Expand Down
37 changes: 27 additions & 10 deletions src/dbos-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ interface CommunicatorInfo {
config: CommunicatorConfig;
}

export const OperationType = {
HANDLER: "handler",
WORKFLOW: "workflow",
TRANSACTION: "transaction",
COMMUNICATOR: "communicator",
} as const;
Comment on lines +80 to +85
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Useful as span metadata


const TempWorkflowType = {
transaction: "transaction",
external: "external",
Expand Down Expand Up @@ -276,7 +283,7 @@ export class DBOSExecutor {
} catch (err) {
(err as Error).message = `failed to initialize workflow executor: ${(err as Error).message}`
this.logger.error(err);
throw new DBOSInitializationError(`failed to initialize workflow executor: ${(err as Error).message}`);
throw new DBOSInitializationError(`${(err as Error).message}`);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove duplicate text in this error message.

}
this.initialized = true;

Expand Down Expand Up @@ -364,12 +371,13 @@ export class DBOSExecutor {
const wConfig = wInfo.config;

const wCtxt: WorkflowContextImpl = new WorkflowContextImpl(this, params.parentCtx, workflowUUID, wConfig, wf.name, presetUUID);
wCtxt.span.setAttributes({ args: JSON.stringify(args) }); // TODO enforce skipLogging & request for hashing

let executorID: string = "local";
if (wCtxt.request.headers && wCtxt.request.headers[DBOSExecutorIDHeader]) {
executorID = wCtxt.request.headers[DBOSExecutorIDHeader] as string;
// This is to accomodate the testing runtime, that does not go through HTTP handlers to start workflows
// Ideally we would expose the executorID to the environment instead of propagating it through headers.
if (wCtxt.executorID === 'local' && wCtxt.request.headers && wCtxt.request.headers[DBOSExecutorIDHeader]) {
wCtxt.executorID = wCtxt.request.headers[DBOSExecutorIDHeader] as string;
}

const internalStatus: WorkflowStatusInternal = {
workflowUUID: workflowUUID,
status: StatusString.PENDING,
Expand All @@ -380,7 +388,7 @@ export class DBOSExecutor {
assumedRole: wCtxt.assumedRole,
authenticatedRoles: wCtxt.authenticatedRoles,
request: wCtxt.request,
executorID: executorID,
executorID: wCtxt.executorID,
};
// Synchronously set the workflow's status to PENDING and record workflow inputs.
if (!wCtxt.isTempWorkflow) {
Expand Down Expand Up @@ -638,10 +646,19 @@ export class DBOSExecutor {
}

#getRecoveryContext(workflowUUID: string, status: WorkflowStatus): DBOSContextImpl {
const span = this.tracer.startSpan(status.workflowName);
span.setAttributes({
operationName: status.workflowName,
});
const executorID = status.request.headers && status.request.headers[DBOSExecutorIDHeader]? status.request.headers[DBOSExecutorIDHeader] : "local";
const span = this.tracer.startSpan(
status.workflowName,
{
operationUUID: workflowUUID,
operationType: OperationType.WORKFLOW,
status: status.status,
authenticatedUser: status.authenticatedUser,
assumedRole: status.assumedRole,
authenticatedRoles: status.authenticatedRoles,
executorID: executorID,
},
);
const oc = new DBOSContextImpl(status.workflowName, span, this.logger);
oc.request = status.request;
oc.authenticatedUser = status.authenticatedUser;
Expand Down
1 change: 0 additions & 1 deletion src/dbos-runtime/applicationVersion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,3 @@ export function setApplicationVersion(version: string): void {
}

let applicationVersion: string | undefined = undefined;

30 changes: 27 additions & 3 deletions src/debugger/debug_workflow.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { DBOSExecutor, DBOSNull, dbosNull } from "../dbos-executor";
import { DBOSExecutor, DBOSNull, OperationType, dbosNull } from "../dbos-executor";
import { transaction_outputs } from "../../schemas/user_db_schema";
import { Transaction, TransactionContextImpl } from "../transaction";
import { Communicator } from "../communicator";
Expand Down Expand Up @@ -27,7 +27,17 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
readonly isTempWorkflow: boolean;

constructor(dbosExec: DBOSExecutor, parentCtx: DBOSContextImpl | undefined, workflowUUID: string, readonly workflowConfig: WorkflowConfig, workflowName: string) {
const span = dbosExec.tracer.startSpan(workflowName, { workflowUUID: workflowUUID }, parentCtx?.span);
const span = dbosExec.tracer.startSpan(
workflowName,
Copy link
Member

@kraftp kraftp Dec 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we doing any sort of tracing in debug workflows? @qianl15 what do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah right now the usage is unclear. Maybe we could need it for retro action, in the future. Happy to remove it if @qianl15 doesn't find it useful.

{
operationUUID: workflowUUID,
operationType: OperationType.WORKFLOW,
authenticatedUser: parentCtx?.authenticatedUser ?? "",
authenticatedRoles: parentCtx?.authenticatedRoles ?? [],
assumedRole: parentCtx?.assumedRole ?? "",
},
parentCtx?.span,
);
super(workflowName, span, dbosExec.logger, parentCtx);
this.workflowUUID = workflowUUID;
this.#dbosExec = dbosExec;
Expand Down Expand Up @@ -98,7 +108,19 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}
// const readOnly = true; // TODO: eventually, this transaction must be read-only.
const funcID = this.functionIDGetIncrement();
const span: Span = this.#dbosExec.tracer.startSpan(txn.name, {}, this.span);
const span: Span = this.#dbosExec.tracer.startSpan(
txn.name,
{
operationUUID: this.workflowUUID,
operationType: OperationType.TRANSACTION,
authenticatedUser: this.authenticatedUser,
authenticatedRoles: this.authenticatedRoles,
assumedRole: this.assumedRole,
readOnly: txnInfo.config.readOnly ?? false, // For now doing as in src/workflow.ts:272
isolationLevel: txnInfo.config.isolationLevel,
},
this.span
);
let check: RecordedResult<R>;

const wrappedTransaction = async (client: UserDatabaseClient): Promise<R> => {
Expand Down Expand Up @@ -126,6 +148,8 @@ export class WorkflowContextDebug extends DBOSContextImpl implements WorkflowCon
}
const funcID = this.functionIDGetIncrement();

// FIXME: we do not create a span for the replay communicator. Do we want to?

// Original result must exist during replay.
const check: R | DBOSNull = await this.#dbosExec.systemDatabase.checkOperationOutput<R>(this.workflowUUID, funcID);
if (check === dbosNull) {
Expand Down
2 changes: 1 addition & 1 deletion src/decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -280,14 +280,14 @@ function getOrCreateMethodRegistration<This, Args extends unknown[], Return>(
const requiredRoles = methReg.getRequiredRoles();
if (requiredRoles.length > 0) {
opCtx = rawArgs[0] as DBOSContextImpl;
opCtx.span.setAttribute("requiredRoles", requiredRoles);
const curRoles = opCtx.authenticatedRoles;
let authorized = false;
const set = new Set(curRoles);
for (const role of requiredRoles) {
if (set.has(role)) {
authorized = true;
opCtx.assumedRole = role;
opCtx.span.setAttribute("assumedRole", role);
break;
}
}
Expand Down
20 changes: 17 additions & 3 deletions src/httpServer/handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { MethodRegistration, MethodParameter, registerAndWrapFunction, getOrCreateMethodArgsRegistration, MethodRegistrationBase, getRegisteredOperations } from "../decorators";
import { DBOSExecutor } from "../dbos-executor";
import { DBOSExecutor, DBOSExecutorIDHeader, OperationType } from "../dbos-executor";
import { DBOSContext, DBOSContextImpl } from "../context";
import Koa from "koa";
import { Workflow, TailParameters, WorkflowHandle, WorkflowParams, WorkflowContext, WFInvokeFuncs } from "../workflow";
Expand Down Expand Up @@ -43,22 +43,35 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex
readonly W3CTraceContextPropagator: W3CTraceContextPropagator;

constructor(dbosExec: DBOSExecutor, readonly koaContext: Koa.Context) {
// Retrieve or generate the request ID
const requestID = getOrGenerateRequestID(koaContext);

// If present, retrieve the trace context from the request
const httpTracer = new W3CTraceContextPropagator();
const extractedSpanContext = trace.getSpanContext(
httpTracer.extract(ROOT_CONTEXT, koaContext.request.headers, defaultTextMapGetter)
)
let span: Span;
const spanAttributes = {
operationName: koaContext.url,
operationType: OperationType.HANDLER,
requestID: requestID,
requestIP: koaContext.request.ip,
requestURL: koaContext.request.url,
requestMethod: koaContext.request.method,
};
if (extractedSpanContext === undefined) {
span = dbosExec.tracer.startSpan(koaContext.url, spanAttributes);
} else {
extractedSpanContext.isRemote = true;
span = dbosExec.tracer.startSpanWithContext(extractedSpanContext, koaContext.url, spanAttributes);
}

super(koaContext.url, span, dbosExec.logger);

if (koaContext.request.headers && koaContext.request.headers[DBOSExecutorIDHeader]) {
this.executorID = koaContext.request.headers[DBOSExecutorIDHeader] as string;
}
Comment on lines +71 to +73
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now get executor ID as soon as we create the handler context


this.W3CTraceContextPropagator = httpTracer;
this.request = {
headers: koaContext.request.headers,
Expand All @@ -72,8 +85,9 @@ export class HandlerContextImpl extends DBOSContextImpl implements HandlerContex
querystring: koaContext.request.querystring,
url: koaContext.request.url,
ip: koaContext.request.ip,
requestID: getOrGenerateRequestID(koaContext),
requestID: requestID,
};

if (dbosExec.config.application) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
this.applicationConfig = dbosExec.config.application;
Expand Down
4 changes: 2 additions & 2 deletions src/telemetry/exporters.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { TelemetrySignal } from "./collector";
import { isLogSignal, isTraceSignal } from "./";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-http";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto";
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-proto";
import type { ReadableSpan } from "@opentelemetry/sdk-trace-base";
import type { ReadableLogRecord } from '@opentelemetry/sdk-logs';
import { ExportResult, ExportResultCode } from "@opentelemetry/core";
Expand Down
Loading
Loading