Skip to content

Commit

Permalink
Merge branch 'main' into fix/where
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohe-Am authored Jan 12, 2025
2 parents 34b6f69 + 445bb5d commit ea0b4aa
Show file tree
Hide file tree
Showing 27 changed files with 367 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ Synchronization variable names start with `SYNC_`.
| SYNC\__S3_SECRET_KEY (\_Required_) | Access key secret for the S3 store credentials; |
| SYNC\__S3_PATH_STYLE (\_Optional_) | `true` or `false`, force path style if `true`. |
| SYNC\__S3_BUCKET (\_Required_) | The bucket to be used for the system (dedicated). |
| SYNC\__FORCE_REMOVE (\_Optional_) | `true` or `false`, Undeploy cached typegraphs at boot |

## Synchronized mode features

Expand Down
42 changes: 42 additions & 0 deletions src/common/src/typegraph/runtimes/deno.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
// SPDX-License-Identifier: MPL-2.0

use anyhow::{anyhow, Context, Result};
use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -11,6 +12,47 @@ pub struct FunctionMatData {
pub script: String,
}

#[derive(Serialize, Deserialize, Clone, Debug, Hash, PartialEq, Eq)]
#[serde(tag = "type", content = "value")]
#[serde(rename_all = "snake_case")]
pub enum ContextCheckX {
NotNull,
Value(String),
Pattern(String),
}

#[derive(PartialEq, Eq, Hash, Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "name", content = "param")]
pub enum PredefinedFunctionMatData {
Identity,
True,
False,
Allow,
Deny,
Pass,
InternalPolicy,
ContextCheck { key: String, value: ContextCheckX },
}

#[derive(Serialize)]
struct PredefinedFunctionMatDataRaw {
name: String,
param: Option<Value>,
}

impl PredefinedFunctionMatData {
pub fn from_raw(name: String, param: Option<String>) -> Result<Self> {
let param = param
.map(|p| serde_json::from_str(&p))
.transpose()
.context("invalid predefined function materializer parameter")?;
let value = serde_json::to_value(&PredefinedFunctionMatDataRaw { name, param })?;
serde_json::from_value(value)
.map_err(|e| anyhow!("invalid predefined function materializer: {e:?}"))
}
}

#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ModuleMatData {
Expand Down
1 change: 1 addition & 0 deletions src/typegate/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export function transformSyncConfig(raw: SyncConfig): SyncConfigX {
redis,
s3,
s3Bucket: raw.s3_bucket,
forceRemove: raw.force_remove
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/typegate/src/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ export const syncConfigSchema = z.object({
s3_access_key: refineEnvVar("SYNC_S3_ACCESS_KEY"),
s3_secret_key: refineEnvVar("SYNC_S3_SECRET_KEY"),
s3_path_style: zBooleanString.default(false),
force_remove: zBooleanString.default(false),
});
export type SyncConfig = z.infer<typeof syncConfigSchema>;
export type SyncConfigX = {
redis: RedisConnectOptions;
s3: S3ClientConfig;
s3Bucket: string;
forceRemove?: boolean
};

export type TypegateConfig = {
Expand Down
1 change: 0 additions & 1 deletion src/typegate/src/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ try {
base: defaultTypegateConfigBase,
});
const typegate = await Typegate.init(config);

await SystemTypegraph.loadAll(typegate, !globalConfig.packaged);

const server = Deno.serve(
Expand Down
45 changes: 36 additions & 9 deletions src/typegate/src/runtimes/deno/deno.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,42 @@ import { WorkerManager } from "./worker_manager.ts";

const logger = getLogger(import.meta);

const predefinedFuncs: Record<string, Resolver<Record<string, unknown>>> = {
identity: ({ _, ...args }) => args,
true: () => true,
false: () => false,
allow: () => "ALLOW" as PolicyResolverOutput,
deny: () => "DENY" as PolicyResolverOutput,
pass: () => "PASS" as PolicyResolverOutput,
internal_policy: ({ _: { context } }) =>
const predefinedFuncs: Record<
string,
(param: any) => Resolver<Record<string, unknown>>
> = {
identity: () => ({ _, ...args }) => args,
true: () => () => true,
false: () => () => false,
allow: () => () => "ALLOW" as PolicyResolverOutput,
deny: () => () => "DENY" as PolicyResolverOutput,
pass: () => () => "PASS" as PolicyResolverOutput,
internal_policy: () => ({ _: { context } }) =>
context.provider === "internal" ? "ALLOW" : "PASS" as PolicyResolverOutput,
context_check: ({ key, value }) => {
let check: (value: any) => boolean;
switch (value.type) {
case "not_null":
check = (v) => v != null;
break;
case "value":
check = (v) => v === value.value;
break;
case "pattern":
check = (v) => v != null && new RegExp(value.value).test(v);
break;
default:
throw new Error("unreachable");
}
const path = key.split(".");
return ({ _: { context } }) => {
let value: any = context;
for (const segment of path) {
value = value?.[segment];
}
return check(value) ? "PASS" : "DENY" as PolicyResolverOutput;
};
},
};

export class DenoRuntime extends Runtime {
Expand Down Expand Up @@ -229,7 +256,7 @@ export class DenoRuntime extends Runtime {
if (!func) {
throw new Error(`predefined function ${mat.data.name} not found`);
}
return func;
return func(mat.data.param);
}

if (mat.name === "static") {
Expand Down
28 changes: 22 additions & 6 deletions src/typegate/src/sync/replicated_map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,27 +105,43 @@ export class RedisReplicatedMap<T> implements AsyncDisposable {
this.redisObs.close();
}

async getAllHistory() {
const { key, redis } = this;
const all = await redis.hgetall(key);
const history = [];
for (let i = 0; i < all.length; i += 2) {
history.push({
name: all[i],
payload: all[i+1]
});
}

return history;
}

async historySync(): Promise<XIdInput> {
const { key, redis, deserializer } = this;
const { redis, deserializer } = this;

// get last received message before loading history
const [lastMessage] = await redis.xrevrange(this.ekey, "+", "-", 1);
const lastId = lastMessage ? lastMessage.xid : 0;
logger.debug("last message loaded: {}", lastId);

const all = await redis.hgetall(key);
const all = await this.getAllHistory();
logger.debug("history load start: {} elements", all.length);
for (let i = 0; i < all.length; i += 2) {
const name = all[i];
const payload = all[i + 1];

for (const { name, payload } of all) {
logger.info(`reloaded addition: ${name}`);
ensure(
!this.memory.has(name),
() => `typegraph ${name} should not exists in memory at first sync`,
);
this.memory.set(name, await deserializer(payload, true));

const engine = await deserializer(payload, true);
this.memory.set(name, engine);
}
logger.debug("history load end");

return lastId;
}

Expand Down
35 changes: 33 additions & 2 deletions src/typegate/src/typegate/mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ import type { ArtifactStore } from "./artifacts/mod.ts";
// TODO move from tests (MET-497)
import { MemoryRegister } from "./memory_register.ts";
import { NoLimiter } from "./no_limiter.ts";
import { TypegraphStore } from "../sync/typegraph.ts";
import { typegraphIdSchema, TypegraphStore } from "../sync/typegraph.ts";
import { createLocalArtifactStore } from "./artifacts/local.ts";
import { createSharedArtifactStore } from "./artifacts/shared.ts";
import { AsyncDisposableStack } from "dispose";
Expand Down Expand Up @@ -141,15 +141,30 @@ export class Typegate implements AsyncDisposable {
stack.move(),
);

const typegraphStore = TypegraphStore.init(syncConfig, cryptoKeys);
const register = await ReplicatedRegister.init(
typegate,
syncConfig.redis,
TypegraphStore.init(syncConfig, cryptoKeys),
typegraphStore
);
typegate.disposables.use(register);

(typegate as { register: Register }).register = register;


if (config.sync?.forceRemove) {
logger.warn("Force removal at boot enabled");
const history = await register.replicatedMap.getAllHistory();
for (const { name, payload } of history) {
try {
await typegate.forceRemove(name, payload, typegraphStore);
} catch (e) {
logger.error(`Failed to force remove typegraph "${name}": ${e}`);
Sentry.captureException(e);
}
}
}

const lastSync = await register.historySync().catch((err) => {
logger.error(err);
throw new Error(
Expand Down Expand Up @@ -397,6 +412,22 @@ export class Typegate implements AsyncDisposable {
await this.artifactStore.runArtifactGC();
}

async forceRemove(name: string, payload: string, typegraphStore: TypegraphStore) {
logger.warn(`Dropping "${name}": started`);
const typegraphId = typegraphIdSchema.parse(JSON.parse(payload));
const [tg] = await typegraphStore.download(
typegraphId,
);
const artifacts = new Set(
Object.values(tg.meta.artifacts).map((m) => m.hash),
);

await this.register.remove(name);
await this.artifactStore.updateRefCounts(new Set(), artifacts);
await this.artifactStore.runArtifactGC();
logger.warn(`Dropping "${name}": done`);
}

async initQueryEngine(
tgDS: TypeGraphDS,
secretManager: SecretManager,
Expand Down
2 changes: 1 addition & 1 deletion src/typegate/src/typegate/register.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ export class ReplicatedRegister extends Register {
return new ReplicatedRegister(replicatedMap);
}

constructor(private replicatedMap: RedisReplicatedMap<QueryEngine>) {
constructor(public replicatedMap: RedisReplicatedMap<QueryEngine>) {
super();
}

Expand Down
14 changes: 10 additions & 4 deletions src/typegate/src/typegraphs/typegate.json
Original file line number Diff line number Diff line change
Expand Up @@ -807,15 +807,21 @@
"data": {}
},
{
"name": "function",
"name": "predefined_function",
"runtime": 0,
"effect": {
"effect": "read",
"idempotent": true
},
"data": {
"script": "var _my_lambda = (_args, { context }) => context.username === 'admin' ? 'ALLOW' : 'DENY' ",
"secrets": []
"name": "context_check",
"param": {
"key": "username",
"value": {
"type": "value",
"value": "admin"
}
}
}
},
{
Expand Down Expand Up @@ -942,7 +948,7 @@
],
"policies": [
{
"name": "admin_only",
"name": "__ctx_username_admin",
"materializer": 1
}
],
Expand Down
9 changes: 2 additions & 7 deletions src/typegate/src/typegraphs/typegate.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
# Copyright Metatype OÜ, licensed under the Mozilla Public License Version 2.0.
# SPDX-License-Identifier: MPL-2.0

from typegraph import Graph, fx, t, typegraph
from typegraph import Graph, fx, t, typegraph, Policy
from typegraph.gen.exports.runtimes import TypegateOperation
from typegraph.gen.types import Err
from typegraph.graph.params import Auth, Cors, Rate
from typegraph.runtimes.base import Materializer
from typegraph.runtimes.deno import DenoRuntime
from typegraph.wit import runtimes, store

### Prisma query (Json protocol):
Expand Down Expand Up @@ -95,11 +94,7 @@
),
)
def typegate(g: Graph):
deno = DenoRuntime()
admin_only = deno.policy(
"admin_only",
code="(_args, { context }) => context.username === 'admin' ? 'ALLOW' : 'DENY' ",
)
admin_only = Policy.context("username", "admin")

g.auth(Auth.basic(["admin"]))

Expand Down
5 changes: 1 addition & 4 deletions src/typegraph/core/src/conversion/runtimes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,10 +128,7 @@ impl MaterializerConverter for DenoMaterializer {
("import_function".to_string(), data)
}
Predefined(predef) => {
let data = serde_json::from_value(json!({
"name": predef.name,
}))
.unwrap();
let data = serde_json::from_value(serde_json::to_value(predef).unwrap()).unwrap();
("predefined_function".to_string(), data)
}
};
Expand Down
4 changes: 0 additions & 4 deletions src/typegraph/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,6 @@ pub fn object_not_found(kind: &str, id: u32) -> TgError {
// .into()
// }

pub fn unknown_predefined_function(name: &str, runtime: &str) -> TgError {
format!("unknown predefined function {name} for runtime {runtime}").into()
}

pub fn duplicate_policy_name(name: &str) -> TgError {
format!("duplicate policy name '{name}'").into()
}
Loading

0 comments on commit ea0b4aa

Please sign in to comment.