diff --git a/Cargo.lock b/Cargo.lock index 2928b6d015..60fb74198d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2164,6 +2164,7 @@ dependencies = [ "prost-build", "prost-types", "quilkin-macros", + "quilkin-proto", "rand", "regex", "schemars", @@ -2195,6 +2196,7 @@ dependencies = [ "tryhard", "url", "uuid", + "xds", "xxhash-rust", ] @@ -2207,6 +2209,15 @@ dependencies = [ "syn 2.0.60", ] +[[package]] +name = "quilkin-proto" +version = "0.1.0" +dependencies = [ + "prost", + "prost-types", + "tonic", +] + [[package]] name = "quote" version = "1.0.36" @@ -3665,6 +3676,37 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "xds" +version = "0.1.0" +dependencies = [ + "arc-swap", + "async-stream", + "cached", + "enum-map", + "eyre", + "fixedstr", + "futures", + "once_cell", + "parking_lot", + "prometheus", + "prost", + "prost-types", + "quilkin-proto", + "rand", + "schemars", + "serde", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-futures", + "tryhard", + "uuid", +] + [[package]] name = "xxhash-rust" version = "0.8.10" diff --git a/Cargo.toml b/Cargo.toml index a7200b3031..4833c15212 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,22 +66,24 @@ test = false [dependencies] # Local quilkin-macros = { version = "0.9.0-dev", path = "./crates/macros" } +xds = { path = "crates/xds" } +quilkin-proto = { path = "crates/quilkin-proto" } # Crates.io -arc-swap = { version = "1.6.0", features = ["serde"] } +arc-swap.workspace = true async-channel.workspace = true -async-stream = "0.3.5" +async-stream.workspace = true base64.workspace = true base64-serde = "0.7.0" bytes = { version = "1.5.0", features = ["serde"] } -cached = { version = "0.49", default-features = false } +cached.workspace = true time = { version = "0.3", default-features = false, features = ["std"] } clap = { version = "4.4.6", features = ["cargo", "derive", "env"] } dashmap = { version = "5.5.3", features = ["serde"] } either = "1.9.0" -enum-map = "2.6.3" -eyre = "0.6.8" -fixedstr = { version = "0.5", features = ["flex-str"] } +enum-map.workspace = true +eyre.workspace = true +fixedstr.workspace = true futures.workspace = true hyper = { version = "0.14.27", features = ["http2"] } hyper-rustls = { version = "0.24.1", features = ["http2", "webpki-roots"] } @@ -92,15 +94,15 @@ maxminddb = "0.24.0" notify = "6.1.1" num_cpus = "1.16.0" once_cell = "1.18.0" -parking_lot = "0.12.1" -prometheus = { version = "0.13.3", default-features = false } -prost = "0.12.1" -prost-types = "0.12.1" +parking_lot.workspace = true +prometheus.workspace = true +prost.workspace = true +prost-types.workspace = true rand.workspace = true regex = "1.9.6" -schemars = { version = "0.8.15", features = ["bytes", "url"] } +schemars.workspace = true seahash = "4.1" -serde = { version = "1.0.188", features = ["derive", "rc"] } +serde.workspace = true serde_json.workspace = true serde_regex = "1.1.0" serde_stacker = "0.1.10" @@ -108,16 +110,16 @@ serde_yaml = "0.9.25" snap = "1.1.0" socket2.workspace = true stable-eyre = "0.2.2" -thiserror = "1.0.49" +thiserror.workspace = true tokio.workspace = true -tokio-stream = { version = "0.1.14", features = ["net", "sync"] } -tonic = "0.10.2" +tokio-stream.workspace = true +tonic.workspace = true tracing.workspace = true -tracing-futures = { version = "0.2.5", features = ["futures-03"] } +tracing-futures.workspace = true tracing-subscriber = { workspace = true, features = ["json", "env-filter"] } -tryhard = "0.5.1" +tryhard.workspace = true url = { version = "2.4.1", features = ["serde"] } -uuid = { version = "1.4.1", default-features = false, features = ["v4"] } +uuid.workspace = true lasso = { version = "0.7.2", features = ["multi-threaded"] } kube.workspace = true kube-core.workspace = true @@ -168,8 +170,14 @@ debug = true members = [".", "crates/*"] [workspace.dependencies] +arc-swap = { version = "1.6.0", features = ["serde"] } async-channel = "2.1.0" +async-stream = "0.3.5" base64 = "0.21.0" +cached = { version = "0.49", default-features = false } +eyre = "0.6.8" +enum-map = "2.6.3" +futures = "0.3.28" kube = { version = "0.91", features = [ "runtime", "rustls-tls", @@ -179,12 +187,18 @@ kube-core = { version = "0.91", default-features = false, features = [ "schema", ] } k8s-openapi = { version = "0.22", features = ["v1_29", "schemars"] } -futures = "0.3.28" once_cell = "1.18.0" +prometheus = { version = "0.13.3", default-features = false } +prost = "0.12.1" +prost-types = "0.12.1" quilkin = { path = "." } rand = "0.8.5" +serde = { version = "1.0.188", features = ["derive", "rc"] } serde_json = "1.0.107" socket2 = { version = "0.5.4", features = ["all"] } +tempfile = "3.8.0" +thiserror = "1.0.49" +tokio-stream = { version = "0.1.14", features = ["net", "sync"] } tokio = { version = "1.32.0", features = [ "rt-multi-thread", "fs", @@ -193,6 +207,12 @@ tokio = { version = "1.32.0", features = [ "parking_lot", "tracing", ] } -tempfile = "3.8.0" +tonic = "0.10.2" tracing = "0.1.37" +tracing-futures = { version = "0.2.5", features = ["futures-03"] } tracing-subscriber = "0.3" +tryhard = "0.5.1" +uuid = { version = "1.4.1", default-features = false, features = ["v4"] } +fixedstr = { version = "0.5", features = ["flex-str"] } +parking_lot = "0.12.1" +schemars = { version = "0.8.15", features = ["bytes", "url"] } diff --git a/crates/proto-gen/gen.rs b/crates/proto-gen/gen.rs index f58221090a..4bf934f48e 100644 --- a/crates/proto-gen/gen.rs +++ b/crates/proto-gen/gen.rs @@ -211,7 +211,7 @@ fn execute(which: &str) { .arg("--generate-transport") .args(["--disable-comments", "."]) .arg(which) - .args(["-o", "src/generated"]); + .args(["-o", "crates/quilkin-proto/src/generated"]); for (dir, files) in files { cmd.arg("-d"); diff --git a/crates/quilkin-proto/Cargo.toml b/crates/quilkin-proto/Cargo.toml new file mode 100644 index 0000000000..e822ecd8b6 --- /dev/null +++ b/crates/quilkin-proto/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "quilkin-proto" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +prost.workspace = true +prost-types.workspace = true +tonic.workspace = true diff --git a/src/generated/envoy.rs b/crates/quilkin-proto/src/envoy.rs similarity index 100% rename from src/generated/envoy.rs rename to crates/quilkin-proto/src/envoy.rs diff --git a/src/generated/envoy/config.rs b/crates/quilkin-proto/src/envoy/config.rs similarity index 100% rename from src/generated/envoy/config.rs rename to crates/quilkin-proto/src/envoy/config.rs diff --git a/src/generated/envoy/config/accesslog.rs b/crates/quilkin-proto/src/envoy/config/accesslog.rs similarity index 100% rename from src/generated/envoy/config/accesslog.rs rename to crates/quilkin-proto/src/envoy/config/accesslog.rs diff --git a/src/generated/envoy/config/accesslog/v3.rs b/crates/quilkin-proto/src/envoy/config/accesslog/v3.rs similarity index 100% rename from src/generated/envoy/config/accesslog/v3.rs rename to crates/quilkin-proto/src/envoy/config/accesslog/v3.rs diff --git a/src/generated/envoy/config/core.rs b/crates/quilkin-proto/src/envoy/config/core.rs similarity index 100% rename from src/generated/envoy/config/core.rs rename to crates/quilkin-proto/src/envoy/config/core.rs diff --git a/src/generated/envoy/config/core/v3.rs b/crates/quilkin-proto/src/envoy/config/core/v3.rs similarity index 100% rename from src/generated/envoy/config/core/v3.rs rename to crates/quilkin-proto/src/envoy/config/core/v3.rs diff --git a/src/generated/envoy/config/endpoint.rs b/crates/quilkin-proto/src/envoy/config/endpoint.rs similarity index 100% rename from src/generated/envoy/config/endpoint.rs rename to crates/quilkin-proto/src/envoy/config/endpoint.rs diff --git a/src/generated/envoy/config/endpoint/v3.rs b/crates/quilkin-proto/src/envoy/config/endpoint/v3.rs similarity index 100% rename from src/generated/envoy/config/endpoint/v3.rs rename to crates/quilkin-proto/src/envoy/config/endpoint/v3.rs diff --git a/src/generated/envoy/config/listener.rs b/crates/quilkin-proto/src/envoy/config/listener.rs similarity index 100% rename from src/generated/envoy/config/listener.rs rename to crates/quilkin-proto/src/envoy/config/listener.rs diff --git a/src/generated/envoy/config/listener/v3.rs b/crates/quilkin-proto/src/envoy/config/listener/v3.rs similarity index 100% rename from src/generated/envoy/config/listener/v3.rs rename to crates/quilkin-proto/src/envoy/config/listener/v3.rs diff --git a/src/generated/envoy/config/route.rs b/crates/quilkin-proto/src/envoy/config/route.rs similarity index 100% rename from src/generated/envoy/config/route.rs rename to crates/quilkin-proto/src/envoy/config/route.rs diff --git a/src/generated/envoy/config/route/v3.rs b/crates/quilkin-proto/src/envoy/config/route/v3.rs similarity index 100% rename from src/generated/envoy/config/route/v3.rs rename to crates/quilkin-proto/src/envoy/config/route/v3.rs diff --git a/src/generated/envoy/kind.rs b/crates/quilkin-proto/src/envoy/kind.rs similarity index 100% rename from src/generated/envoy/kind.rs rename to crates/quilkin-proto/src/envoy/kind.rs diff --git a/src/generated/envoy/kind/matcher.rs b/crates/quilkin-proto/src/envoy/kind/matcher.rs similarity index 100% rename from src/generated/envoy/kind/matcher.rs rename to crates/quilkin-proto/src/envoy/kind/matcher.rs diff --git a/src/generated/envoy/kind/matcher/v3.rs b/crates/quilkin-proto/src/envoy/kind/matcher/v3.rs similarity index 100% rename from src/generated/envoy/kind/matcher/v3.rs rename to crates/quilkin-proto/src/envoy/kind/matcher/v3.rs diff --git a/src/generated/envoy/kind/metadata.rs b/crates/quilkin-proto/src/envoy/kind/metadata.rs similarity index 100% rename from src/generated/envoy/kind/metadata.rs rename to crates/quilkin-proto/src/envoy/kind/metadata.rs diff --git a/src/generated/envoy/kind/metadata/v3.rs b/crates/quilkin-proto/src/envoy/kind/metadata/v3.rs similarity index 100% rename from src/generated/envoy/kind/metadata/v3.rs rename to crates/quilkin-proto/src/envoy/kind/metadata/v3.rs diff --git a/src/generated/envoy/kind/tracing.rs b/crates/quilkin-proto/src/envoy/kind/tracing.rs similarity index 100% rename from src/generated/envoy/kind/tracing.rs rename to crates/quilkin-proto/src/envoy/kind/tracing.rs diff --git a/src/generated/envoy/kind/tracing/v3.rs b/crates/quilkin-proto/src/envoy/kind/tracing/v3.rs similarity index 100% rename from src/generated/envoy/kind/tracing/v3.rs rename to crates/quilkin-proto/src/envoy/kind/tracing/v3.rs diff --git a/src/generated/envoy/kind/v3.rs b/crates/quilkin-proto/src/envoy/kind/v3.rs similarity index 100% rename from src/generated/envoy/kind/v3.rs rename to crates/quilkin-proto/src/envoy/kind/v3.rs diff --git a/src/generated/envoy/service.rs b/crates/quilkin-proto/src/envoy/service.rs similarity index 100% rename from src/generated/envoy/service.rs rename to crates/quilkin-proto/src/envoy/service.rs diff --git a/src/generated/envoy/service/discovery.rs b/crates/quilkin-proto/src/envoy/service/discovery.rs similarity index 100% rename from src/generated/envoy/service/discovery.rs rename to crates/quilkin-proto/src/envoy/service/discovery.rs diff --git a/src/generated/envoy/service/discovery/v3.rs b/crates/quilkin-proto/src/envoy/service/discovery/v3.rs similarity index 100% rename from src/generated/envoy/service/discovery/v3.rs rename to crates/quilkin-proto/src/envoy/service/discovery/v3.rs diff --git a/src/generated/google.rs b/crates/quilkin-proto/src/google.rs similarity index 100% rename from src/generated/google.rs rename to crates/quilkin-proto/src/google.rs diff --git a/src/generated/google/rpc.rs b/crates/quilkin-proto/src/google/rpc.rs similarity index 100% rename from src/generated/google/rpc.rs rename to crates/quilkin-proto/src/google/rpc.rs diff --git a/crates/quilkin-proto/src/lib.rs b/crates/quilkin-proto/src/lib.rs new file mode 100644 index 0000000000..17cf090856 --- /dev/null +++ b/crates/quilkin-proto/src/lib.rs @@ -0,0 +1,13 @@ +#![allow( + clippy::doc_markdown, + clippy::use_self, + clippy::enum_variant_names, + clippy::large_enum_variant, + clippy::len_without_is_empty, + rustdoc::bare_urls +)] +pub mod envoy; +pub mod google; +pub mod quilkin; +pub mod validate; +pub mod xds; diff --git a/src/generated/quilkin.rs b/crates/quilkin-proto/src/quilkin.rs similarity index 100% rename from src/generated/quilkin.rs rename to crates/quilkin-proto/src/quilkin.rs diff --git a/src/generated/quilkin/config.rs b/crates/quilkin-proto/src/quilkin/config.rs similarity index 100% rename from src/generated/quilkin/config.rs rename to crates/quilkin-proto/src/quilkin/config.rs diff --git a/src/generated/quilkin/config/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/config/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/config/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/config/v1alpha1.rs diff --git a/src/generated/quilkin/filters.rs b/crates/quilkin-proto/src/quilkin/filters.rs similarity index 100% rename from src/generated/quilkin/filters.rs rename to crates/quilkin-proto/src/quilkin/filters.rs diff --git a/src/generated/quilkin/filters/capture.rs b/crates/quilkin-proto/src/quilkin/filters/capture.rs similarity index 100% rename from src/generated/quilkin/filters/capture.rs rename to crates/quilkin-proto/src/quilkin/filters/capture.rs diff --git a/src/generated/quilkin/filters/capture/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/capture/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/capture/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/capture/v1alpha1.rs diff --git a/src/generated/quilkin/filters/compress.rs b/crates/quilkin-proto/src/quilkin/filters/compress.rs similarity index 100% rename from src/generated/quilkin/filters/compress.rs rename to crates/quilkin-proto/src/quilkin/filters/compress.rs diff --git a/src/generated/quilkin/filters/compress/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/compress/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/compress/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/compress/v1alpha1.rs diff --git a/src/generated/quilkin/filters/concatenate.rs b/crates/quilkin-proto/src/quilkin/filters/concatenate.rs similarity index 100% rename from src/generated/quilkin/filters/concatenate.rs rename to crates/quilkin-proto/src/quilkin/filters/concatenate.rs diff --git a/src/generated/quilkin/filters/concatenate/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/concatenate/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/concatenate/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/concatenate/v1alpha1.rs diff --git a/src/generated/quilkin/filters/debug.rs b/crates/quilkin-proto/src/quilkin/filters/debug.rs similarity index 100% rename from src/generated/quilkin/filters/debug.rs rename to crates/quilkin-proto/src/quilkin/filters/debug.rs diff --git a/src/generated/quilkin/filters/debug/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/debug/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/debug/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/debug/v1alpha1.rs diff --git a/src/generated/quilkin/filters/drop.rs b/crates/quilkin-proto/src/quilkin/filters/drop.rs similarity index 100% rename from src/generated/quilkin/filters/drop.rs rename to crates/quilkin-proto/src/quilkin/filters/drop.rs diff --git a/src/generated/quilkin/filters/drop/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/drop/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/drop/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/drop/v1alpha1.rs diff --git a/src/generated/quilkin/filters/firewall.rs b/crates/quilkin-proto/src/quilkin/filters/firewall.rs similarity index 100% rename from src/generated/quilkin/filters/firewall.rs rename to crates/quilkin-proto/src/quilkin/filters/firewall.rs diff --git a/src/generated/quilkin/filters/firewall/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/firewall/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/firewall/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/firewall/v1alpha1.rs diff --git a/src/generated/quilkin/filters/load_balancer.rs b/crates/quilkin-proto/src/quilkin/filters/load_balancer.rs similarity index 100% rename from src/generated/quilkin/filters/load_balancer.rs rename to crates/quilkin-proto/src/quilkin/filters/load_balancer.rs diff --git a/src/generated/quilkin/filters/load_balancer/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/load_balancer/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/load_balancer/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/load_balancer/v1alpha1.rs diff --git a/src/generated/quilkin/filters/local_rate_limit.rs b/crates/quilkin-proto/src/quilkin/filters/local_rate_limit.rs similarity index 100% rename from src/generated/quilkin/filters/local_rate_limit.rs rename to crates/quilkin-proto/src/quilkin/filters/local_rate_limit.rs diff --git a/src/generated/quilkin/filters/local_rate_limit/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/local_rate_limit/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/local_rate_limit/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/local_rate_limit/v1alpha1.rs diff --git a/src/generated/quilkin/filters/matches.rs b/crates/quilkin-proto/src/quilkin/filters/matches.rs similarity index 100% rename from src/generated/quilkin/filters/matches.rs rename to crates/quilkin-proto/src/quilkin/filters/matches.rs diff --git a/src/generated/quilkin/filters/matches/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/matches/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/matches/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/matches/v1alpha1.rs diff --git a/src/generated/quilkin/filters/pass.rs b/crates/quilkin-proto/src/quilkin/filters/pass.rs similarity index 100% rename from src/generated/quilkin/filters/pass.rs rename to crates/quilkin-proto/src/quilkin/filters/pass.rs diff --git a/src/generated/quilkin/filters/pass/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/pass/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/pass/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/pass/v1alpha1.rs diff --git a/src/generated/quilkin/filters/timestamp.rs b/crates/quilkin-proto/src/quilkin/filters/timestamp.rs similarity index 100% rename from src/generated/quilkin/filters/timestamp.rs rename to crates/quilkin-proto/src/quilkin/filters/timestamp.rs diff --git a/src/generated/quilkin/filters/timestamp/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/timestamp/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/timestamp/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/timestamp/v1alpha1.rs diff --git a/src/generated/quilkin/filters/token_router.rs b/crates/quilkin-proto/src/quilkin/filters/token_router.rs similarity index 100% rename from src/generated/quilkin/filters/token_router.rs rename to crates/quilkin-proto/src/quilkin/filters/token_router.rs diff --git a/src/generated/quilkin/filters/token_router/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/filters/token_router/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/filters/token_router/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/filters/token_router/v1alpha1.rs diff --git a/src/generated/quilkin/relay.rs b/crates/quilkin-proto/src/quilkin/relay.rs similarity index 100% rename from src/generated/quilkin/relay.rs rename to crates/quilkin-proto/src/quilkin/relay.rs diff --git a/src/generated/quilkin/relay/v1alpha1.rs b/crates/quilkin-proto/src/quilkin/relay/v1alpha1.rs similarity index 100% rename from src/generated/quilkin/relay/v1alpha1.rs rename to crates/quilkin-proto/src/quilkin/relay/v1alpha1.rs diff --git a/src/generated/validate.rs b/crates/quilkin-proto/src/validate.rs similarity index 100% rename from src/generated/validate.rs rename to crates/quilkin-proto/src/validate.rs diff --git a/src/generated/xds.rs b/crates/quilkin-proto/src/xds.rs similarity index 100% rename from src/generated/xds.rs rename to crates/quilkin-proto/src/xds.rs diff --git a/src/generated/xds/annotations.rs b/crates/quilkin-proto/src/xds/annotations.rs similarity index 100% rename from src/generated/xds/annotations.rs rename to crates/quilkin-proto/src/xds/annotations.rs diff --git a/src/generated/xds/annotations/v3.rs b/crates/quilkin-proto/src/xds/annotations/v3.rs similarity index 100% rename from src/generated/xds/annotations/v3.rs rename to crates/quilkin-proto/src/xds/annotations/v3.rs diff --git a/src/generated/xds/core.rs b/crates/quilkin-proto/src/xds/core.rs similarity index 100% rename from src/generated/xds/core.rs rename to crates/quilkin-proto/src/xds/core.rs diff --git a/src/generated/xds/core/v3.rs b/crates/quilkin-proto/src/xds/core/v3.rs similarity index 100% rename from src/generated/xds/core/v3.rs rename to crates/quilkin-proto/src/xds/core/v3.rs diff --git a/src/generated/xds/kind.rs b/crates/quilkin-proto/src/xds/kind.rs similarity index 100% rename from src/generated/xds/kind.rs rename to crates/quilkin-proto/src/xds/kind.rs diff --git a/src/generated/xds/kind/matcher.rs b/crates/quilkin-proto/src/xds/kind/matcher.rs similarity index 100% rename from src/generated/xds/kind/matcher.rs rename to crates/quilkin-proto/src/xds/kind/matcher.rs diff --git a/src/generated/xds/kind/matcher/v3.rs b/crates/quilkin-proto/src/xds/kind/matcher/v3.rs similarity index 100% rename from src/generated/xds/kind/matcher/v3.rs rename to crates/quilkin-proto/src/xds/kind/matcher/v3.rs diff --git a/crates/xds/Cargo.toml b/crates/xds/Cargo.toml new file mode 100644 index 0000000000..bb9ad27e70 --- /dev/null +++ b/crates/xds/Cargo.toml @@ -0,0 +1,34 @@ +[package] +name = "xds" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +quilkin-proto = { path = "../quilkin-proto" } + +arc-swap.workspace = true +async-stream.workspace = true +cached.workspace = true +enum-map.workspace = true +eyre.workspace = true +futures.workspace = true +once_cell.workspace = true +prometheus.workspace = true +rand.workspace = true +prost.workspace = true +prost-types.workspace = true +serde.workspace = true +serde_json.workspace = true +schemars.workspace = true +fixedstr.workspace = true +parking_lot.workspace = true +thiserror.workspace = true +tokio-stream.workspace = true +tokio.workspace = true +tonic.workspace = true +tracing-futures.workspace = true +tracing.workspace = true +tryhard.workspace = true +uuid.workspace = true diff --git a/src/net/xds/client.rs b/crates/xds/src/client.rs similarity index 90% rename from src/net/xds/client.rs rename to crates/xds/src/client.rs index 1c932931ae..39b5ca261b 100644 --- a/src/net/xds/client.rs +++ b/crates/xds/src/client.rs @@ -14,7 +14,12 @@ * limitations under the License. */ -use std::{collections::HashSet, sync::atomic::Ordering, sync::Arc, time::Duration}; +use std::{ + collections::HashSet, + sync::atomic::{AtomicBool, Ordering}, + sync::Arc, + time::Duration, +}; use futures::StreamExt; use rand::Rng; @@ -27,7 +32,6 @@ use tryhard::{ }; use crate::{ - config::Config, generated::{ envoy::{ config::core::v3::Node, @@ -38,7 +42,7 @@ use crate::{ }, quilkin::relay::v1alpha1::aggregated_control_plane_discovery_service_client::AggregatedControlPlaneDiscoveryServiceClient, }, - net::xds::{Resource, ResourceType}, + resource::{Resource, ResourceType}, Result, }; @@ -51,6 +55,8 @@ pub type AdsStream = BidirectionalStream; pub type MdsClient = Client; pub type MdsStream = BidirectionalStream; +pub(crate) const IDLE_REQUEST_INTERVAL: Duration = Duration::from_secs(30); + #[tonic::async_trait] pub trait ServiceClient: Clone + Sized + Send + 'static { type Request: Clone + Send + Sync + Sized + 'static + std::fmt::Debug; @@ -215,18 +221,18 @@ impl Client { } impl MdsClient { - pub fn mds_client_stream( + pub fn mds_client_stream( &self, - config: Arc, - rt_config: crate::components::agent::Ready, + config: Arc, + is_healthy: Arc, ) -> MdsStream { - MdsStream::mds_client_stream(self, config, rt_config) + MdsStream::mds_client_stream(self, config, is_healthy) } - pub async fn delta_stream( + pub async fn delta_stream( self, - config: Arc, - rt_config: crate::components::agent::Ready, + config: Arc, + is_healthy: Arc, ) -> Result { let identifier = String::from(&*self.identifier); @@ -250,10 +256,10 @@ impl MdsClient { { let control_plane = super::server::ControlPlane::from_arc( config.clone(), - crate::components::admin::IDLE_REQUEST_INTERVAL, + IDLE_REQUEST_INTERVAL, ); let mut stream = control_plane.delta_aggregated_resources(stream).await?; - rt_config.relay_is_healthy.store(true, Ordering::SeqCst); + is_healthy.store(true, Ordering::SeqCst); while let Some(result) = stream.next().await { let response = result?; @@ -262,7 +268,7 @@ impl MdsClient { } } - rt_config.relay_is_healthy.store(false, Ordering::SeqCst); + is_healthy.store(false, Ordering::SeqCst); //tracing::warn!("lost connection to relay server, retrying"); let new_client = MdsClient::connect_with_backoff(&self.management_servers) @@ -326,7 +332,7 @@ impl DeltaClientStream { &self, identifier: &str, subs: &[(ResourceType, Vec)], - local: &crate::config::xds::LocalVersions, + local: &crate::config::LocalVersions, ) -> Result<()> { for (rt, names) in subs { let initial_resource_versions = local.get(*rt).clone(); @@ -374,7 +380,7 @@ impl DeltaServerStream { res_tx .send(DeltaDiscoveryResponse { - control_plane: Some(crate::net::xds::core::ControlPlane { identifier }), + control_plane: Some(crate::core::ControlPlane { identifier }), ..Default::default() }) .await?; @@ -407,20 +413,20 @@ impl Drop for DeltaSubscription { impl AdsClient { /// Starts a new stream to the xDS management server. - pub fn xds_client_stream( + pub fn xds_client_stream( &self, - config: Arc, - rt_config: crate::components::proxy::Ready, + config: Arc, + is_healthy: Arc, ) -> AdsStream { - AdsStream::xds_client_stream(self, config, rt_config) + AdsStream::xds_client_stream(self, config, is_healthy) } /// Attempts to start a new delta stream to the xDS management server, if the /// management server does not support delta xDS we return the client as an error - pub async fn delta_subscribe( + pub async fn delta_subscribe( self, - config: Arc, - rt_config: crate::components::proxy::Ready, + config: Arc, + is_healthy: Arc, resources: impl IntoIterator)>, ) -> Result { let resource_subscriptions: Vec<_> = resources.into_iter().collect(); @@ -445,7 +451,7 @@ impl AdsClient { // the future we'll send the resources we already have locally to hopefully // reduce the amount of response data if those resources are already up // to date with the current state of the management server - let local = Arc::new(crate::config::xds::LocalVersions::default()); + let local = Arc::new(crate::config::LocalVersions::default()); if let Err(err) = ds .refresh(&identifier, &resource_subscriptions, &local) .await @@ -461,7 +467,7 @@ impl AdsClient { loop { tracing::trace!("creating discovery response handler"); - let mut response_stream = crate::config::xds::handle_delta_discovery_responses( + let mut response_stream = crate::config::handle_delta_discovery_responses( identifier.clone(), stream, config.clone(), @@ -470,19 +476,12 @@ impl AdsClient { ); loop { - let next_response = tokio::time::timeout( - rt_config.idle_request_interval, - response_stream.next(), - ); + let next_response = + tokio::time::timeout(IDLE_REQUEST_INTERVAL, response_stream.next()); match next_response.await { Ok(Some(Ok(response))) => { - rt_config - .xds_is_healthy - .read() - .as_deref() - .unwrap() - .store(true, Ordering::SeqCst); + is_healthy.store(true, Ordering::SeqCst); tracing::trace!("received delta response"); ds.send_response(response).await?; @@ -506,12 +505,7 @@ impl AdsClient { } } - rt_config - .xds_is_healthy - .read() - .as_deref() - .unwrap() - .store(false, Ordering::SeqCst); + is_healthy.store(false, Ordering::SeqCst); tracing::info!("Lost connection to xDS, retrying"); let (new_client, _) = @@ -539,15 +533,15 @@ pub struct BidirectionalStream { } impl AdsStream { - pub fn xds_client_stream( + pub fn xds_client_stream( Client { client, identifier, management_servers, .. }: &AdsClient, - config: Arc, - rt_config: crate::components::proxy::Ready, + config: Arc, + is_healthy: Arc, ) -> Self { let mut client = client.clone(); let identifier = identifier.clone(); @@ -599,17 +593,11 @@ impl AdsStream { loop { let next_response = - tokio::time::timeout(rt_config.idle_request_interval, stream.next()); + tokio::time::timeout(IDLE_REQUEST_INTERVAL, stream.next()); match next_response.await { Ok(Some(Ok(ack))) => { - rt_config - .xds_is_healthy - .read() - .as_deref() - .unwrap() - .store(true, Ordering::SeqCst); - + is_healthy.store(true, Ordering::SeqCst); tracing::trace!("received ack"); requests.send(ack)?; continue; @@ -636,13 +624,7 @@ impl AdsStream { } } - rt_config - .xds_is_healthy - .read() - .as_deref() - .unwrap() - .store(false, Ordering::SeqCst); - + is_healthy.store(false, Ordering::SeqCst); tracing::info!("Lost connection to xDS, retrying"); client = AdsClient::connect_with_backoff(&management_servers) .await? @@ -675,15 +657,15 @@ impl AdsStream { } impl MdsStream { - pub fn mds_client_stream( + pub fn mds_client_stream( Client { client, identifier, management_servers, .. }: &MdsClient, - config: Arc, - rt_config: crate::components::agent::Ready, + config: Arc, + is_healthy: Arc, ) -> Self { let mut client = client.clone(); let identifier = identifier.clone(); @@ -696,7 +678,7 @@ impl MdsStream { loop { let initial_response = DiscoveryResponse { - control_plane: Some(crate::net::xds::core::ControlPlane { + control_plane: Some(crate::core::ControlPlane { identifier: (&*identifier).into(), }), ..<_>::default() @@ -717,18 +699,18 @@ impl MdsStream { let control_plane = super::server::ControlPlane::from_arc( config.clone(), - crate::components::admin::IDLE_REQUEST_INTERVAL, + IDLE_REQUEST_INTERVAL, ); let mut stream = control_plane.stream_resources(stream).await?; - rt_config.relay_is_healthy.store(true, Ordering::SeqCst); + is_healthy.store(true, Ordering::SeqCst); while let Some(result) = stream.next().await { let response = result?; - tracing::debug!(config=%serde_json::to_value(&config).unwrap(), "received discovery response"); + tracing::debug!("received discovery response"); requests.send(response)?; } - rt_config.relay_is_healthy.store(false, Ordering::SeqCst); + is_healthy.store(false, Ordering::SeqCst); tracing::warn!("lost connection to relay server, retrying"); client = MdsClient::connect_with_backoff(&management_servers) diff --git a/src/config/xds.rs b/crates/xds/src/config.rs similarity index 56% rename from src/config/xds.rs rename to crates/xds/src/config.rs index 7933f5c962..8f963e0772 100644 --- a/src/config/xds.rs +++ b/crates/xds/src/config.rs @@ -1,10 +1,25 @@ -use super::Config; -use crate::net::xds::{ +use crate::{ discovery::{DeltaDiscoveryRequest, DeltaDiscoveryResponse}, - metrics, Resource, ResourceType, + ResourceType, }; use enum_map::Enum as _; -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration}; + +pub(crate) const BACKOFF_INITIAL_DELAY: Duration = Duration::from_millis(500); +pub(crate) const BACKOFF_MAX_DELAY: Duration = Duration::from_secs(30); +pub(crate) const BACKOFF_MAX_JITTER: Duration = Duration::from_millis(2000); +pub(crate) const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); + +/// Returns the configured maximum allowed message size for gRPC messages. +/// When using State Of The World xDS, the message size can get large enough +/// that it can exceed the default limits. +pub fn max_grpc_message_size() -> usize { + std::env::var("QUILKIN_MAX_GRPC_MESSAGE_SIZE") + .as_deref() + .ok() + .and_then(|var| var.parse().ok()) + .unwrap_or(256 * 1024 * 1024) +} /// Keeps tracking of the local versions of each resource sent from the management /// server, allowing reconnections to the same/new management servers to send initial @@ -21,17 +36,54 @@ impl LocalVersions { } } +pub trait Configuration: Send + Sync + Sized + 'static { + fn identifier(&self) -> String; + fn apply(&self, response: crate::Resource) -> crate::Result<()>; + + fn apply_delta( + &self, + resource_type: ResourceType, + resources: impl Iterator>, + removed_resources: Vec, + local_versions: &mut HashMap, + ) -> crate::Result<()>; + + fn discovery_request( + &self, + _node_id: &str, + resource_type: ResourceType, + names: &[String], + ) -> Result, eyre::Error>; + + fn delta_discovery_request( + &self, + subscribed: &std::collections::BTreeSet, + client_versions: &crate::ClientVersions, + ) -> crate::Result; + + fn on_changed( + &self, + subscribed: crate::server::ControlPlane, + ) -> impl std::future::Future + Send + 'static; +} + +pub struct DeltaDiscoveryRes { + pub resources: Vec, + pub awaiting_ack: crate::AwaitingAck, + pub removed: Vec, +} + /// Processes responses from management servers, applying resources to the proxy #[tracing::instrument(skip_all, fields(identifier))] -pub fn handle_delta_discovery_responses( +pub fn handle_delta_discovery_responses( identifier: String, stream: impl futures::Stream> + 'static + Send, - config: Arc, + config: Arc, local: Arc, remote_addr: Option, ) -> std::pin::Pin> + Send>> { Box::pin(async_stream::try_stream! { - let _stream_metrics = metrics::StreamConnectionMetrics::new(identifier.clone()); + let _stream_metrics = crate::metrics::StreamConnectionMetrics::new(identifier.clone()); tracing::trace!("awaiting delta response"); for await response in stream { @@ -49,7 +101,7 @@ pub fn handle_delta_discovery_responses( let control_plane_identifier = response.control_plane.as_ref().map(|cp| cp.identifier.as_str()).unwrap_or_default(); - metrics::delta_discovery_responses(control_plane_identifier, &response.type_url).inc(); + crate::metrics::delta_discovery_responses(control_plane_identifier, &response.type_url).inc(); tracing::trace!( version = &*response.system_version_info, r#type = &*response.type_url, @@ -70,7 +122,7 @@ pub fn handle_delta_discovery_responses( .resources .into_iter() .map(|res| { - Resource::try_from(res.resource.ok_or_else(|| eyre::format_err!("resource field not set"))?).map(|mut rsrc| { + crate::Resource::try_from(res.resource.ok_or_else(|| eyre::format_err!("resource field not set"))?).map(|mut rsrc| { if let Some(ra) = remote_addr { rsrc.add_host_to_datacenter(ra); } @@ -80,14 +132,14 @@ pub fn handle_delta_discovery_responses( }; let error_detail = if let Err(error) = result { - metrics::nacks(control_plane_identifier, &response.type_url).inc(); + crate::metrics::nacks(control_plane_identifier, &response.type_url).inc(); Some(crate::generated::google::rpc::Status { code: 3, message: error.to_string(), ..Default::default() }) } else { - metrics::acks(control_plane_identifier, &response.type_url).inc(); + crate::metrics::acks(control_plane_identifier, &response.type_url).inc(); None }; diff --git a/src/net/xds.rs b/crates/xds/src/lib.rs similarity index 92% rename from src/net/xds.rs rename to crates/xds/src/lib.rs index 7b276d752f..ce70d177a6 100644 --- a/src/net/xds.rs +++ b/crates/xds/src/lib.rs @@ -14,14 +14,15 @@ * limitations under the License. */ -pub(crate) use crate::generated::quilkin::relay::v1alpha1 as relay; - -pub(crate) mod client; -pub(crate) mod metrics; -mod resource; +pub mod client; +pub mod config; +pub mod locality; +pub mod metrics; +pub mod net; +pub mod resource; pub mod server; -use crate::net::{cluster::EndpointSetVersion, endpoint::Locality}; +use crate::locality::Locality; pub use crate::generated::envoy::{ config::core::v3::{self as core, socket_address}, @@ -29,8 +30,11 @@ pub use crate::generated::envoy::{ service::discovery::v3 as discovery, }; pub use client::{AdsClient, Client}; +pub use quilkin_proto as generated; pub use resource::{Resource, ResourceType}; -use std::collections::HashMap; +use std::{collections::HashMap, fmt}; + +pub type Result = std::result::Result; /// Keeps track of what resource versions a particular client has pub enum ClientVersions { @@ -140,6 +144,40 @@ impl ClientVersions { } } +#[derive(Copy, Clone, PartialEq, Eq)] +pub struct EndpointSetVersion(u64); + +impl EndpointSetVersion { + pub fn from_number(version: u64) -> Self { + Self(version) + } + + pub fn number(&self) -> u64 { + self.0 + } +} + +impl fmt::Display for EndpointSetVersion { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::LowerHex::fmt(&self.0, f) + } +} + +impl fmt::Debug for EndpointSetVersion { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::LowerHex::fmt(&self.0, f) + } +} + +impl std::str::FromStr for EndpointSetVersion { + type Err = eyre::Error; + + #[inline] + fn from_str(s: &str) -> Result { + Ok(Self(u64::from_str_radix(s, 16)?)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/net/endpoint/locality.rs b/crates/xds/src/locality.rs similarity index 96% rename from src/net/endpoint/locality.rs rename to crates/xds/src/locality.rs index eadbcac38b..974ead0dad 100644 --- a/src/net/endpoint/locality.rs +++ b/crates/xds/src/locality.rs @@ -20,7 +20,7 @@ use std::num::NonZeroUsize; type FixedBuffer = fixedstr::Flexstr<64>; const SEP: char = ':'; -/// The location of an [`Endpoint`][super::Endpoint]. +/// The location of an `Endpoint`. #[derive(Clone, Default, Debug, Hash, Eq, PartialEq, PartialOrd, Ord)] pub struct Locality { /// Internal buffer with the full string @@ -61,7 +61,6 @@ impl Locality { } } - #[cfg(test)] pub fn with_region(region: impl AsRef) -> Self { let region = region.as_ref(); Self { @@ -136,14 +135,14 @@ impl std::str::FromStr for Locality { } } -impl From for Locality { +impl From for Locality { #[inline] - fn from(value: crate::net::cluster::proto::Locality) -> Self { + fn from(value: crate::resource::proto::Locality) -> Self { Self::new(value.region, value.zone, value.sub_zone) } } -impl From for crate::net::cluster::proto::Locality { +impl From for crate::resource::proto::Locality { #[inline] fn from(value: Locality) -> Self { Self { diff --git a/src/net/xds/metrics.rs b/crates/xds/src/metrics.rs similarity index 88% rename from src/net/xds/metrics.rs rename to crates/xds/src/metrics.rs index ee471c2fd0..24cc567050 100644 --- a/src/net/xds/metrics.rs +++ b/crates/xds/src/metrics.rs @@ -14,13 +14,33 @@ * limitations under the License. */ +use arc_swap::ArcSwap; use once_cell::sync::Lazy; -use prometheus::{IntCounterVec, IntGaugeVec}; +use prometheus::{IntCounterVec, IntGaugeVec, Registry}; pub(crate) const NODE_LABEL: &str = "node"; pub(crate) const CONTROL_PLANE_LABEL: &str = "control_plane"; pub(crate) const TYPE_LABEL: &str = "type"; +/// TODO: Remove and replace with a local registry. +static REGISTRY: Lazy> = Lazy::new(|| { + ArcSwap::new(std::sync::Arc::new( + Registry::new_custom(Some("quilkin".into()), None).unwrap(), + )) +}); + +/// Sets the [prometheus::Registry] containing all the metrics +/// registered in xDS. +pub fn set_registry(registry: std::sync::Arc) { + REGISTRY.store(registry); +} + +/// Returns the [prometheus::Registry] containing all the metrics +/// registered in xDS. +pub fn registry() -> arc_swap::Guard> { + REGISTRY.load() +} + pub(crate) fn active_control_planes(control_plane: &str) -> prometheus::IntGauge { static ACTIVE_CONTROL_PLANES: Lazy = Lazy::new(|| { prometheus::register_int_gauge_vec_with_registry! { diff --git a/crates/xds/src/net.rs b/crates/xds/src/net.rs new file mode 100644 index 0000000000..ce4ee61d11 --- /dev/null +++ b/crates/xds/src/net.rs @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use std::{io, net::SocketAddr}; + +/// TCP listener for a GRPC service, always binds to the local IPv6 address +pub struct TcpListener { + inner: std::net::TcpListener, +} + +impl TcpListener { + /// Binds a TCP listener, if `None` is passed, binds to an ephemeral port + #[inline] + pub fn bind(port: Option) -> io::Result { + std::net::TcpListener::bind((std::net::Ipv6Addr::UNSPECIFIED, port.unwrap_or_default())) + .map(|inner| Self { inner }) + } + + /// Retrieves the port the listener is bound to + #[inline] + pub fn port(&self) -> u16 { + self.inner.local_addr().expect("failed to bind").port() + } + + /// Retrieves the local address the listener is bound to + #[inline] + pub fn local_addr(&self) -> SocketAddr { + self.inner.local_addr().expect("failed to bind") + } + + #[inline] + pub fn into_stream(self) -> io::Result { + self.inner.set_nonblocking(true)?; + let tl = tokio::net::TcpListener::from_std(self.inner)?; + Ok(tokio_stream::wrappers::TcpListenerStream::new(tl)) + } +} + +impl From for std::net::TcpListener { + #[inline] + fn from(value: TcpListener) -> Self { + value.inner + } +} diff --git a/src/net/xds/resource.rs b/crates/xds/src/resource.rs similarity index 93% rename from src/net/xds/resource.rs rename to crates/xds/src/resource.rs index 4ff1eb8673..c981c6a57f 100644 --- a/src/net/xds/resource.rs +++ b/crates/xds/src/resource.rs @@ -39,12 +39,14 @@ type_urls! { } } +pub use crate::generated::quilkin::config::v1alpha1 as proto; + #[derive(Clone, Debug)] pub enum Resource { - Cluster(Box), - Datacenter(Box), + Cluster(Box), + Datacenter(Box), Listener(Box), - FilterChain(crate::net::cluster::proto::FilterChain), + FilterChain(proto::FilterChain), } impl Resource { @@ -54,7 +56,7 @@ impl Resource { Self::Cluster(cluster) => cluster .locality .clone() - .map(|locality| crate::net::endpoint::Locality::from(locality).to_string()) + .map(|locality| crate::locality::Locality::from(locality).to_string()) .unwrap_or_default(), Self::Listener(listener) => listener.name.to_string(), Self::FilterChain(_fc) => String::new(), @@ -144,7 +146,11 @@ impl ResourceType { ) -> Result { Ok(prost_types::Any { type_url: self.type_url().into(), - value: crate::codec::prost::encode(message)?, + value: { + let mut buf = Vec::with_capacity(message.encoded_len()); + message.encode(&mut buf)?; + buf + }, }) } } diff --git a/src/net/xds/server.rs b/crates/xds/src/server.rs similarity index 88% rename from src/net/xds/server.rs rename to crates/xds/src/server.rs index e4749def32..eb81114979 100644 --- a/src/net/xds/server.rs +++ b/crates/xds/src/server.rs @@ -22,30 +22,24 @@ use tokio_stream::StreamExt; use tracing_futures::Instrument; use crate::{ - config::Config, - net::{ - xds::{ - discovery::{ - aggregated_discovery_service_server::{ - AggregatedDiscoveryService, AggregatedDiscoveryServiceServer, - }, - DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse, - }, - metrics, - relay::aggregated_control_plane_discovery_service_server::{ - AggregatedControlPlaneDiscoveryService, - AggregatedControlPlaneDiscoveryServiceServer, - }, - ResourceType, + discovery::{ + aggregated_discovery_service_server::{ + AggregatedDiscoveryService, AggregatedDiscoveryServiceServer, }, - TcpListener, + DeltaDiscoveryRequest, DeltaDiscoveryResponse, DiscoveryRequest, DiscoveryResponse, + }, + generated::quilkin::relay::v1alpha1::aggregated_control_plane_discovery_service_server::{ + AggregatedControlPlaneDiscoveryService, AggregatedControlPlaneDiscoveryServiceServer, }, + metrics, + net::TcpListener, + ResourceType, }; #[tracing::instrument(skip_all)] -pub fn spawn( +pub fn spawn( listener: TcpListener, - config: std::sync::Arc, + config: std::sync::Arc, idle_request_interval: Duration, ) -> io::Result>> { let server = AggregatedDiscoveryServiceServer::new(ControlPlane::from_arc( @@ -60,9 +54,9 @@ pub fn spawn( .map_err(From::from)) } -pub(crate) fn control_plane_discovery_server( +pub fn control_plane_discovery_server( listener: TcpListener, - config: Arc, + config: Arc, idle_request_interval: Duration, ) -> io::Result>> { let server = AggregatedControlPlaneDiscoveryServiceServer::new(ControlPlane::from_arc( @@ -77,11 +71,20 @@ pub(crate) fn control_plane_discovery_server( .map_err(From::from)) } -#[derive(Clone)] -pub struct ControlPlane { - config: Arc, +pub struct ControlPlane { + pub config: Arc, idle_request_interval: Duration, - watchers: Arc>, + watchers: Arc>, +} + +impl Clone for ControlPlane { + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + idle_request_interval: self.idle_request_interval, + watchers: self.watchers.clone(), + } + } } struct Watchers { @@ -101,8 +104,8 @@ impl Default for Watchers { } } -impl ControlPlane { - pub fn from_arc(config: Arc, idle_request_interval: Duration) -> Self { +impl ControlPlane { + pub fn from_arc(config: Arc, idle_request_interval: Duration) -> Self { let this = Self { config, idle_request_interval, @@ -110,55 +113,14 @@ impl ControlPlane { }; tokio::spawn({ - let this = this.clone(); - async move { - let mut cluster_watcher = this.config.clusters.watch(); - tracing::trace!("waiting for changes"); - - match &this.config.datacenter { - crate::config::DatacenterConfig::Agent {..} => { - loop { - match cluster_watcher.changed().await { - Ok(()) => this.push_update(ResourceType::Cluster), - Err(error) => tracing::error!(%error, "error watching changes"), - } - } - } - crate::config::DatacenterConfig::NonAgent { datacenters } => { - let mut dc_watcher = datacenters.watch(); - loop { - tokio::select! { - result = cluster_watcher.changed() => { - match result { - Ok(()) => this.push_update(ResourceType::Cluster), - Err(error) => tracing::error!(%error, "error watching changes"), - } - } - result = dc_watcher.changed() => { - match result { - Ok(()) => this.push_update(ResourceType::Datacenter), - Err(error) => tracing::error!(%error, "error watching changes"), - } - } - } - } - } - } - } - .instrument(tracing::debug_span!("control_plane_watch_cluster")) - }); - - this.config.filters.watch({ - let this = this.clone(); - move |_| { - this.push_update(ResourceType::Listener); - } + let this2 = this.clone(); + this.config.on_changed(this2) }); this } - fn push_update(&self, resource_type: ResourceType) { + pub fn push_update(&self, resource_type: ResourceType) { let watchers = &self.watchers[resource_type]; watchers .version @@ -188,8 +150,8 @@ impl ControlPlane { .version .load(std::sync::atomic::Ordering::Relaxed) .to_string(), - control_plane: Some(crate::net::xds::core::ControlPlane { - identifier: (*self.config.id.load()).clone(), + control_plane: Some(crate::core::ControlPlane { + identifier: self.config.identifier(), }), type_url: resource_type.type_url().to_owned(), canary: false, @@ -311,7 +273,7 @@ impl ControlPlane { + std::marker::Unpin + 'static, { - use crate::net::xds::{AwaitingAck, ClientVersions}; + use crate::{AwaitingAck, ClientVersions}; use std::collections::BTreeSet; tracing::debug!("starting delta stream"); @@ -330,8 +292,8 @@ impl ControlPlane { let mut pending_acks = cached::TimedSizedCache::with_size_and_lifespan(50, 1); let this = Self::clone(self); - let control_plane_id = crate::net::xds::core::ControlPlane { - identifier: (*this.config.id.load()).clone(), + let control_plane_id = crate::core::ControlPlane { + identifier: this.config.identifier(), }; struct ResourceTypeTracker { @@ -551,7 +513,7 @@ impl ControlPlane { } #[tonic::async_trait] -impl AggregatedDiscoveryService for ControlPlane { +impl AggregatedDiscoveryService for ControlPlane { type StreamAggregatedResourcesStream = std::pin::Pin> + Send>>; type DeltaAggregatedResourcesStream = @@ -583,7 +545,7 @@ impl AggregatedDiscoveryService for ControlPlane { } #[tonic::async_trait] -impl AggregatedControlPlaneDiscoveryService for ControlPlane { +impl AggregatedControlPlaneDiscoveryService for ControlPlane { type StreamAggregatedResourcesStream = std::pin::Pin> + Send>>; type DeltaAggregatedResourcesStream = @@ -618,18 +580,18 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { Arc::from(&*identifier), move |(mut requests, _rx), _subscribed_resources| async move { tracing::info!(%identifier, "sending initial discovery request"); - crate::net::xds::client::MdsStream::discovery_request_without_cache( + crate::client::MdsStream::discovery_request_without_cache( &identifier, &mut requests, - crate::net::xds::ResourceType::Cluster, + crate::ResourceType::Cluster, &[], ) .map_err(|error| tonic::Status::internal(error.to_string()))?; - crate::net::xds::client::MdsStream::discovery_request_without_cache( + crate::client::MdsStream::discovery_request_without_cache( &identifier, &mut requests, - crate::net::xds::ResourceType::Datacenter, + crate::ResourceType::Datacenter, &[], ) .map_err(|error| tonic::Status::internal(error.to_string()))?; @@ -652,10 +614,10 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { requests.send(ack?)?; } else { tracing::trace!("exceeded idle interval, sending request"); - crate::net::xds::client::MdsStream::discovery_request_without_cache( + crate::client::MdsStream::discovery_request_without_cache( &identifier, &mut requests, - crate::net::xds::ResourceType::Cluster, + crate::ResourceType::Cluster, &[], ) .map_err(|error| tonic::Status::internal(error.to_string()))?; @@ -678,7 +640,7 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { &self, responses: tonic::Request>, ) -> Result, tonic::Status> { - use crate::net::xds::ResourceType; + use crate::ResourceType; let remote_addr = responses .remote_addr() @@ -708,7 +670,7 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { async move { tracing::info!(identifier, "sending initial delta discovery request"); - let local = Arc::new(crate::config::xds::LocalVersions::default()); + let local = Arc::new(crate::config::LocalVersions::default()); ds.refresh( &identifier, @@ -721,7 +683,7 @@ impl AggregatedControlPlaneDiscoveryService for ControlPlane { .await .map_err(|error| tonic::Status::internal(error.to_string()))?; - let mut response_stream = crate::config::xds::handle_delta_discovery_responses( + let mut response_stream = crate::config::handle_delta_discovery_responses( identifier.clone(), responses, config.clone(), @@ -771,7 +733,7 @@ mod tests { use tokio::time::timeout; use super::*; - use crate::net::xds::{ + use crate::{ core::Node, // listener::v3::{FilterChain, Listener}, // }, diff --git a/src/components/agent.rs b/src/components/agent.rs index 1796af89e6..e5110494bc 100644 --- a/src/components/agent.rs +++ b/src/components/agent.rs @@ -84,10 +84,10 @@ impl Agent { // Attempt to connect to a delta stream if the relay has one // available, otherwise fallback to the regular aggregated stream - Some(match client.delta_stream(config.clone(), ready.clone()).await { + Some(match client.delta_stream(config.clone(), ready.relay_is_healthy.clone()).await { Ok(ds) => XdsTask::Delta(ds), Err(client) => { - XdsTask::Aggregated(client.mds_client_stream(config, ready)) + XdsTask::Aggregated(client.mds_client_stream(config, ready.relay_is_healthy.clone())) } }) } diff --git a/src/components/manage.rs b/src/components/manage.rs index d0da84741f..b6362627f3 100644 --- a/src/components/manage.rs +++ b/src/components/manage.rs @@ -54,11 +54,14 @@ impl Manage { // Attempt to connect to a delta stream if the relay has one // available, otherwise fallback to the regular aggregated stream Some( - match client.delta_stream(config.clone(), ready.clone()).await { + match client + .delta_stream(config.clone(), ready.relay_is_healthy.clone()) + .await + { Ok(ds) => XdsTask::Delta(ds), - Err(client) => { - XdsTask::Aggregated(client.mds_client_stream(config.clone(), ready)) - } + Err(client) => XdsTask::Aggregated( + client.mds_client_stream(config.clone(), ready.relay_is_healthy.clone()), + ), }, ) } else { diff --git a/src/components/proxy.rs b/src/components/proxy.rs index 4c928fc18e..5fe9359b35 100644 --- a/src/components/proxy.rs +++ b/src/components/proxy.rs @@ -167,11 +167,13 @@ impl Proxy { let mut delta_sub = None; let mut state_sub = None; + let xds_is_healthy = + ready.xds_is_healthy.read().as_ref().unwrap().clone(); match client .delta_subscribe( config.clone(), - ready.clone(), + xds_is_healthy.clone(), [ (ResourceType::Cluster, Vec::new()), (ResourceType::Listener, Vec::new()), @@ -182,7 +184,8 @@ impl Proxy { { Ok(ds) => delta_sub = Some(ds), Err(client) => { - let mut stream = client.xds_client_stream(config, ready); + let mut stream = + client.xds_client_stream(config, xds_is_healthy); tokio::time::sleep(std::time::Duration::from_nanos(1)).await; stream diff --git a/src/config.rs b/src/config.rs index e93263677d..a6931cd1ea 100644 --- a/src/config.rs +++ b/src/config.rs @@ -49,25 +49,10 @@ mod error; pub mod providers; mod slot; pub mod watch; -pub(crate) mod xds; - -base64_serde_type!(pub Base64Standard, base64::engine::general_purpose::STANDARD); pub(crate) const BACKOFF_INITIAL_DELAY: Duration = Duration::from_millis(500); -pub(crate) const BACKOFF_MAX_DELAY: Duration = Duration::from_secs(30); -pub(crate) const BACKOFF_MAX_JITTER: Duration = Duration::from_millis(2000); -pub(crate) const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); - -/// Returns the configured maximum allowed message size for gRPC messages. -/// When using State Of The World xDS, the message size can get large enough -/// that it can exceed the default limits. -pub fn max_grpc_message_size() -> usize { - std::env::var("QUILKIN_MAX_GRPC_MESSAGE_SIZE") - .as_deref() - .ok() - .and_then(|var| var.parse().ok()) - .unwrap_or(256 * 1024 * 1024) -} + +base64_serde_type!(pub Base64Standard, base64::engine::general_purpose::STANDARD); #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] #[serde(untagged)] @@ -103,13 +88,89 @@ pub struct Config { pub datacenter: DatacenterConfig, } -pub struct DeltaDiscoveryRes { - pub resources: Vec, - pub awaiting_ack: crate::net::xds::AwaitingAck, - pub removed: Vec, +impl xds::config::Configuration for Config { + fn identifier(&self) -> String { + (*self.id.load()).clone() + } + + fn apply(&self, response: xds::Resource) -> crate::Result<()> { + self.apply(response) + } + + fn apply_delta( + &self, + resource_type: ResourceType, + resources: impl Iterator>, + removed_resources: Vec, + local_versions: &mut HashMap, + ) -> crate::Result<()> { + self.apply_delta(resource_type, resources, removed_resources, local_versions) + } + + fn discovery_request( + &self, + _node_id: &str, + resource_type: ResourceType, + names: &[String], + ) -> Result, eyre::Error> { + self.discovery_request(_node_id, resource_type, names) + } + + fn delta_discovery_request( + &self, + subscribed: &std::collections::BTreeSet, + client_versions: &xds::ClientVersions, + ) -> crate::Result { + self.delta_discovery_request(subscribed, client_versions) + } + + fn on_changed( + &self, + control_plane: xds::server::ControlPlane, + ) -> impl std::future::Future + Send + 'static { + let mut cluster_watcher = self.clusters.watch(); + self.filters.watch({ + let this = control_plane.clone(); + move |_| { + this.push_update(ResourceType::Listener); + } + }); + + tracing::trace!("waiting for changes"); + + async move { + match &control_plane.config.datacenter { + crate::config::DatacenterConfig::Agent { .. } => loop { + match cluster_watcher.changed().await { + Ok(()) => control_plane.push_update(ResourceType::Cluster), + Err(error) => tracing::error!(%error, "error watching changes"), + } + }, + crate::config::DatacenterConfig::NonAgent { datacenters } => { + let mut dc_watcher = datacenters.watch(); + loop { + tokio::select! { + result = cluster_watcher.changed() => { + match result { + Ok(()) => control_plane.push_update(ResourceType::Cluster), + Err(error) => tracing::error!(%error, "error watching changes"), + } + } + result = dc_watcher.changed() => { + match result { + Ok(()) => control_plane.push_update(ResourceType::Datacenter), + Err(error) => tracing::error!(%error, "error watching changes"), + } + } + } + } + } + } + } + } } -use crate::net::xds::ClientVersions; +use crate::net::xds::{config::DeltaDiscoveryRes, ClientVersions}; impl Config { /// Attempts to deserialize `input` as a YAML object representing `Self`. @@ -203,20 +264,20 @@ impl Config { if names.is_empty() { for cluster in self.clusters.read().iter() { resources.push(resource_type.encode_to_any( - &crate::net::cluster::proto::Cluster::from(( + &crate::net::cluster::locality_and_set_to_proto( cluster.key(), &cluster.value().endpoints, - )), + ), )?); } } else { for locality in names.iter().filter_map(|name| name.parse().ok()) { if let Some(cluster) = self.clusters.read().get(&Some(locality)) { resources.push(resource_type.encode_to_any( - &crate::net::cluster::proto::Cluster::from(( + &crate::net::cluster::locality_and_set_to_proto( cluster.key(), &cluster.value().endpoints, - )), + ), )?); } } @@ -325,7 +386,7 @@ impl Config { name: key.as_ref().map(|k| k.to_string()).unwrap_or_default(), version: current_version.to_string(), resource: Some(resource_type.encode_to_any( - &crate::net::cluster::proto::Cluster::from((key, &value.endpoints)), + &crate::net::cluster::locality_and_set_to_proto(key, &value.endpoints), )?), ..Default::default() }); diff --git a/src/generated.rs b/src/generated.rs deleted file mode 100644 index bd7a458986..0000000000 --- a/src/generated.rs +++ /dev/null @@ -1,6 +0,0 @@ -#![allow(clippy::doc_markdown, clippy::use_self)] -pub mod envoy; -pub mod google; -pub mod quilkin; -pub mod validate; -pub mod xds; diff --git a/src/lib.rs b/src/lib.rs index b0e996d475..f4c43c75b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -34,13 +34,7 @@ pub mod filters; #[doc(hidden)] pub mod test; -#[allow( - clippy::enum_variant_names, - clippy::large_enum_variant, - clippy::len_without_is_empty, - rustdoc::bare_urls -)] -pub mod generated; +pub use quilkin_proto as generated; pub type Result = std::result::Result; diff --git a/src/net.rs b/src/net.rs index 2277971c39..3ce4a83373 100644 --- a/src/net.rs +++ b/src/net.rs @@ -92,7 +92,9 @@ pub mod cluster; pub mod endpoint; pub(crate) mod maxmind_db; pub mod phoenix; -pub mod xds; + +pub use xds; +pub use xds::net::TcpListener; use std::{ io, @@ -328,46 +330,6 @@ impl DualStackEpollSocket { } } -/// TCP listener for a GRPC service, always binds to the local IPv6 address -pub struct TcpListener { - inner: std::net::TcpListener, -} - -impl TcpListener { - /// Binds a TCP listener, if `None` is passed, binds to an ephemeral port - #[inline] - pub fn bind(port: Option) -> io::Result { - std::net::TcpListener::bind((std::net::Ipv6Addr::UNSPECIFIED, port.unwrap_or_default())) - .map(|inner| Self { inner }) - } - - /// Retrieves the port the listener is bound to - #[inline] - pub fn port(&self) -> u16 { - self.inner.local_addr().expect("failed to bind").port() - } - - /// Retrieves the local address the listener is bound to - #[inline] - pub fn local_addr(&self) -> SocketAddr { - self.inner.local_addr().expect("failed to bind") - } - - #[inline] - pub fn into_stream(self) -> io::Result { - self.inner.set_nonblocking(true)?; - let tl = tokio::net::TcpListener::from_std(self.inner)?; - Ok(tokio_stream::wrappers::TcpListenerStream::new(tl)) - } -} - -impl From for std::net::TcpListener { - #[inline] - fn from(value: TcpListener) -> Self { - value.inner - } -} - #[cfg(test)] mod tests { use std::{ diff --git a/src/net/cluster.rs b/src/net/cluster.rs index edf5570eb2..6ec99d5e14 100644 --- a/src/net/cluster.rs +++ b/src/net/cluster.rs @@ -15,6 +15,7 @@ */ use std::{ + borrow::Borrow, collections::{hash_map::RandomState, BTreeSet}, fmt, sync::atomic::{AtomicU64, AtomicUsize, Ordering::Relaxed}, @@ -25,6 +26,7 @@ use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use crate::net::endpoint::{Endpoint, EndpointAddress, Locality}; +use xds::EndpointSetVersion; const SUBSYSTEM: &str = "cluster"; @@ -60,30 +62,6 @@ pub(crate) fn active_endpoints() -> &'static prometheus::IntGauge { &ACTIVE_ENDPOINTS } -#[derive(Copy, Clone, PartialEq, Eq)] -pub struct EndpointSetVersion(u64); - -impl fmt::Display for EndpointSetVersion { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::LowerHex::fmt(&self.0, f) - } -} - -impl fmt::Debug for EndpointSetVersion { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::LowerHex::fmt(&self.0, f) - } -} - -impl std::str::FromStr for EndpointSetVersion { - type Err = eyre::Error; - - #[inline] - fn from_str(s: &str) -> Result { - Ok(Self(u64::from_str_radix(s, 16)?)) - } -} - pub type TokenAddressMap = std::collections::BTreeMap>; #[derive(Copy, Clone)] @@ -132,7 +110,7 @@ impl EndpointSet { let mut this = Self { endpoints, token_map: TokenAddressMap::new(), - hash: hash.0, + hash: hash.number(), version: 1, }; @@ -165,7 +143,7 @@ impl EndpointSet { /// Unique version for this endpoint set #[inline] pub fn version(&self) -> EndpointSetVersion { - EndpointSetVersion(self.hash) + EndpointSetVersion::from_number(self.hash) } /// Bumps the version, calculating a hash for the entire endpoint set @@ -654,24 +632,6 @@ where } } -impl From<(Option, BTreeSet)> for proto::Cluster { - fn from((locality, endpoints): (Option, BTreeSet)) -> Self { - Self { - locality: locality.map(From::from), - endpoints: endpoints.iter().map(From::from).collect(), - } - } -} - -impl From<(&Option, &BTreeSet)> for proto::Cluster { - fn from((locality, endpoints): (&Option, &BTreeSet)) -> Self { - Self { - locality: locality.clone().map(From::from), - endpoints: endpoints.iter().map(From::from).collect(), - } - } -} - impl From<&'_ Endpoint> for proto::Endpoint { fn from(endpoint: &Endpoint) -> Self { Self { @@ -683,6 +643,16 @@ impl From<&'_ Endpoint> for proto::Endpoint { } } +pub(crate) fn locality_and_set_to_proto( + locality: impl Borrow>, + endpoints: impl Borrow>, +) -> proto::Cluster { + proto::Cluster { + locality: locality.borrow().clone().map(From::from), + endpoints: endpoints.borrow().iter().map(From::from).collect(), + } +} + #[cfg(test)] mod tests { use std::net::Ipv4Addr; diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs index 73a78dc1eb..00c356eca8 100644 --- a/src/net/endpoint.rs +++ b/src/net/endpoint.rs @@ -17,7 +17,6 @@ //! Types representing where the data is the sent. pub(crate) mod address; -mod locality; pub mod metadata; use crate::net::cluster::proto; @@ -26,10 +25,11 @@ use serde::{Deserialize, Serialize}; pub use self::{ address::{AddressKind, EndpointAddress}, - locality::Locality, metadata::DynamicMetadata, }; +pub use xds::locality::Locality; + pub type EndpointMetadata = metadata::MetadataView; pub use base64_set::Set;