Skip to content

Commit

Permalink
Merge branch 'main' into met-140-context
Browse files Browse the repository at this point in the history
  • Loading branch information
Natoandro authored Dec 9, 2024
2 parents 5466424 + 2ea7a48 commit 48b837b
Show file tree
Hide file tree
Showing 20 changed files with 909 additions and 81 deletions.
19 changes: 15 additions & 4 deletions src/typegate/src/runtimes/substantial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
} from "./substantial/agent.ts";
import { closestWord } from "../utils.ts";
import { InternalAuth } from "../services/auth/protocols/internal.ts";
import { applyFilter, type Expr, type ExecutionStatus } from "./substantial/filter_utils.ts";

const logger = getLogger(import.meta);

Expand All @@ -28,7 +29,7 @@ interface QueryCompletedWorkflowResult {
started_at: string;
ended_at: string;
result: {
status: "COMPLETED" | "COMPLETED_WITH_ERROR" | "UNKNOWN";
status: ExecutionStatus;
value: unknown; // hinted by the user
};
}
Expand Down Expand Up @@ -218,9 +219,11 @@ export class SubstantialRuntime extends Runtime {
return JSON.parse(JSON.stringify(res));
};
case "results":
return this.#resultsResover(false);
return this.#resultsResolver(false);
case "results_raw":
return this.#resultsResover(true);
return this.#resultsResolver(true);
case "advanced_filters":
return this.#advancedFiltersResolver();
case "internal_link_parent_child":
return this.#linkerResolver();
default:
Expand Down Expand Up @@ -285,7 +288,7 @@ export class SubstantialRuntime extends Runtime {
};
}

#resultsResover(enableGenerics: boolean): Resolver {
#resultsResolver(enableGenerics: boolean): Resolver {
return async ({ name: workflowName }) => {
this.#checkWorkflowExistOrThrow(workflowName);

Expand Down Expand Up @@ -407,6 +410,14 @@ export class SubstantialRuntime extends Runtime {
};
}

#advancedFiltersResolver(): Resolver {
return async ({ name: workflowName, filter }) => {
this.#checkWorkflowExistOrThrow(workflowName);
// console.log("workflow", workflowName, "Filter", filter);
return await applyFilter(workflowName, this.agent, filter as Expr);
};
}

#linkerResolver(): Resolver {
return async ({ parent_run_id, child_run_id }) => {
await Meta.substantial.metadataWriteParentChildLink({
Expand Down
52 changes: 52 additions & 0 deletions src/typegate/src/runtimes/substantial/deno_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ export class Context {
private id = 0;
public kwargs = {};
gql: ReturnType<typeof createGQLClient>;
logger: SubLogger;

constructor(private run: Run, private internal: TaskContext) {
this.gql = createGQLClient(internal);
this.kwargs = getKwargsCopy(run);
this.logger = new SubLogger(this);
}

#nextId() {
Expand Down Expand Up @@ -414,6 +416,56 @@ class RetryStrategy {
}
}


class SubLogger {
constructor(private ctx: Context) {}

async #log(kind: "warn" | "error" | "info", ...args: unknown[]) {
await this.ctx.save(() => {
const prefix = `[${kind.toUpperCase()}: ${this.ctx.getRun().run_id}]`;
switch(kind) {
case "warn": {
console.warn(prefix, ...args);
break;
}
case "error": {
console.error(prefix,...args);
break;
}
default: {
console.info(prefix, ...args);
break;
}
}

const message = args.map((arg) => {
try {
const json = JSON.stringify(arg);
// Functions are omitted,
// For example, JSON.stringify(() => 1234) => undefined (no throw)
return json === undefined ? String(arg) : json;
} catch(_) {
return String(arg);
}
}).join(" ");

return `${prefix}: ${message}`;
});
}

async warn(...payload: unknown[]) {
await this.#log("warn", ...payload);
}

async info(...payload: unknown[]) {
await this.#log("info", ...payload);
}

async error(...payload: unknown[]) {
await this.#log("error", ...payload);
}
}

function createGQLClient(internal: TaskContext) {
const tgLocal = new URL(internal.meta.url);
if (testBaseUrl) {
Expand Down
Loading

0 comments on commit 48b837b

Please sign in to comment.