From f7b1e5ad899373eeef7574df380bc0a8b7830210 Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Wed, 5 Feb 2025 19:39:40 +0000 Subject: [PATCH] Restatectl unification (#2634) * Make datafusion optional in restate_admin * Unify tikv-jemalloc-sys features between restate-server and restatectl * Disable resolver functionality in jsonschema This can download schemas over internet, read files - not ideal * Update workspace hack --- Cargo.lock | 5 +--- Cargo.toml | 5 ++-- crates/admin/Cargo.toml | 5 ++-- crates/admin/src/lib.rs | 1 + crates/admin/src/service.rs | 37 +++++++++++++++---------- crates/admin/src/state.rs | 6 ---- crates/admin/src/storage_query/mod.rs | 13 +++++++-- crates/admin/src/storage_query/query.rs | 2 +- crates/bifrost/Cargo.toml | 4 +-- crates/node/Cargo.toml | 2 +- crates/node/src/roles/admin.rs | 4 +-- crates/rocksdb/Cargo.toml | 4 +++ server/Cargo.toml | 2 +- tools/xtask/src/main.rs | 1 - workspace-hack/Cargo.toml | 12 ++------ 15 files changed, 54 insertions(+), 49 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17a818f1bb..ae6b20d206 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4013,7 +4013,6 @@ dependencies = [ "percent-encoding", "referencing", "regex-syntax 0.8.5", - "reqwest", "serde", "serde_json", "uuid-simd", @@ -6266,7 +6265,6 @@ checksum = "43e734407157c3c2034e0258f5e4473ddb361b1e85f95a66690d67264d7cd1da" dependencies = [ "base64 0.22.1", "bytes", - "futures-channel", "futures-core", "futures-util", "h2 0.4.7", @@ -7080,6 +7078,7 @@ dependencies = [ "strum", "thiserror 2.0.11", "threadpool", + "tikv-jemalloc-sys", "tokio", "tracing", "workspace-hack", @@ -10222,8 +10221,6 @@ dependencies = [ "syn 1.0.109", "syn 2.0.98", "sync_wrapper", - "tikv-jemalloc-sys", - "tikv-jemallocator", "time", "time-macros", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 31d5ee8632..f0213f10c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -140,7 +140,7 @@ hyper-rustls = { version = "0.27.2", default-features = false, features = [ hyper-util = { version = "0.1" } indexmap = "2.7" itertools = "0.14.0" -jsonschema = "0.28.3" +jsonschema = { version = "0.28.3", default-features = false } metrics = { version = "0.24" } metrics-tracing-context = { version = "0.18.0" } metrics-exporter-prometheus = { version = "0.16", default-features = false, features = [ @@ -192,7 +192,8 @@ test-log = { version = "0.2.11", default-features = false, features = [ "trace", ] } toml = { version = "0.8.12" } -tikv-jemallocator = "0.6" +tikv-jemallocator = { version = "0.6", features = ["unprefixed_malloc_on_supported_platforms", "profiling"] } +tikv-jemalloc-sys = { version = "0.6", features = ["profiling"] } thiserror = "2.0" tokio = { version = "1.41.1", default-features = false, features = [ "rt-multi-thread", diff --git a/crates/admin/Cargo.toml b/crates/admin/Cargo.toml index d29814778a..5192f74c81 100644 --- a/crates/admin/Cargo.toml +++ b/crates/admin/Cargo.toml @@ -14,6 +14,7 @@ options_schema = ["restate-service-client/options_schema"] memory-loglet = ["restate-bifrost/memory-loglet"] replicated-loglet = ["restate-bifrost/replicated-loglet"] serve-web-ui = ["restate-web-ui", "mime_guess"] +storage-query = ["dep:restate-storage-query-datafusion", "dep:datafusion"] metadata-api = [] [dependencies] @@ -28,7 +29,7 @@ restate-futures-util = { workspace = true } restate-metadata-server = { workspace = true } restate-service-client = { workspace = true } restate-service-protocol = { workspace = true, features = ["discovery"] } -restate-storage-query-datafusion = { workspace = true } +restate-storage-query-datafusion = { workspace = true, optional = true } restate-types = { workspace = true, features = ["schemars"] } restate-utoipa = { workspace = true } restate-wal-protocol = { workspace = true } @@ -40,7 +41,7 @@ axum = { workspace = true, features = ["json"] } bytes = { workspace = true } bytestring = { workspace = true } codederror = { workspace = true } -datafusion = { workspace = true } +datafusion = { workspace = true, optional = true } derive_builder = { workspace = true } derive_more = { workspace = true } enumset = { workspace = true } diff --git a/crates/admin/src/lib.rs b/crates/admin/src/lib.rs index 5e1588e52a..a25610b16b 100644 --- a/crates/admin/src/lib.rs +++ b/crates/admin/src/lib.rs @@ -16,6 +16,7 @@ mod rest_api; mod schema_registry; pub mod service; mod state; +#[cfg(feature = "storage-query")] mod storage_query; #[cfg(feature = "serve-web-ui")] mod web_ui; diff --git a/crates/admin/src/service.rs b/crates/admin/src/service.rs index 3073153b1d..bd7e2f34c4 100644 --- a/crates/admin/src/service.rs +++ b/crates/admin/src/service.rs @@ -8,8 +8,6 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::sync::Arc; - use axum::error_handling::HandleErrorLayer; use http::StatusCode; use restate_admin_rest_model::version::AdminApiVersion; @@ -21,12 +19,11 @@ use tower::ServiceBuilder; use restate_core::network::net_util; use restate_core::MetadataWriter; use restate_service_protocol::discovery::ServiceDiscovery; -use restate_storage_query_datafusion::context::QueryContext; use restate_types::net::BindAddress; use restate_types::schema::subscriptions::SubscriptionValidator; use crate::schema_registry::SchemaRegistry; -use crate::{rest_api, state, storage_query}; +use crate::{rest_api, state}; #[derive(Debug, thiserror::Error)] #[error("could not create the service client: {0}")] @@ -35,7 +32,8 @@ pub struct BuildError(#[from] restate_service_client::BuildError); pub struct AdminService { bifrost: Bifrost, schema_registry: SchemaRegistry, - query_context: Option, + #[cfg(feature = "storage-query")] + query_context: Option, #[cfg(feature = "metadata-api")] metadata_writer: MetadataWriter, } @@ -50,7 +48,6 @@ where subscription_validator: V, service_discovery: ServiceDiscovery, experimental_feature_kafka_ingress_next: bool, - query_context: Option, ) -> Self { Self { bifrost, @@ -62,7 +59,19 @@ where subscription_validator, experimental_feature_kafka_ingress_next, ), - query_context, + #[cfg(feature = "storage-query")] + query_context: None, + } + } + + #[cfg(feature = "storage-query")] + pub fn with_query_context( + self, + query_context: restate_storage_query_datafusion::context::QueryContext, + ) -> Self { + Self { + query_context: Some(query_context), + ..self } } @@ -74,14 +83,14 @@ where let rest_state = state::AdminServiceState::new(self.schema_registry, self.bifrost); - let router = self - .query_context - .map(|query_context| { - let query_state = Arc::new(state::QueryServiceState { query_context }); + let router = axum::Router::new(); - axum::Router::new().merge(storage_query::create_router(query_state)) - }) - .unwrap_or_default(); + #[cfg(feature = "storage-query")] + let router = if let Some(query_context) = self.query_context { + router.merge(crate::storage_query::router(query_context)) + } else { + router + }; #[cfg(feature = "metadata-api")] let router = router.merge(crate::metadata_api::router( diff --git a/crates/admin/src/state.rs b/crates/admin/src/state.rs index 8460335c94..af4d1eef8b 100644 --- a/crates/admin/src/state.rs +++ b/crates/admin/src/state.rs @@ -10,7 +10,6 @@ use crate::schema_registry::SchemaRegistry; use restate_bifrost::Bifrost; -use restate_storage_query_datafusion::context::QueryContext; #[derive(Clone, derive_builder::Builder)] pub struct AdminServiceState { @@ -18,11 +17,6 @@ pub struct AdminServiceState { pub bifrost: Bifrost, } -#[derive(Clone)] -pub struct QueryServiceState { - pub query_context: QueryContext, -} - impl AdminServiceState { pub fn new(schema_registry: SchemaRegistry, bifrost: Bifrost) -> Self { Self { diff --git a/crates/admin/src/storage_query/mod.rs b/crates/admin/src/storage_query/mod.rs index 33b0802b0d..4a68ea0085 100644 --- a/crates/admin/src/storage_query/mod.rs +++ b/crates/admin/src/storage_query/mod.rs @@ -15,11 +15,18 @@ mod query; use axum::{routing::post, Router}; use std::sync::Arc; -use crate::state::QueryServiceState; +use restate_storage_query_datafusion::context::QueryContext; + +#[derive(Clone)] +pub struct QueryServiceState { + pub query_context: QueryContext, +} + +pub fn router(query_context: QueryContext) -> Router { + let query_state = Arc::new(QueryServiceState { query_context }); -pub fn create_router(state: Arc) -> Router<()> { // Setup the router axum::Router::new() .route("/query", post(query::query)) - .with_state(state) + .with_state(query_state) } diff --git a/crates/admin/src/storage_query/query.rs b/crates/admin/src/storage_query/query.rs index da19a4bba7..0bdb62ea37 100644 --- a/crates/admin/src/storage_query/query.rs +++ b/crates/admin/src/storage_query/query.rs @@ -36,7 +36,7 @@ use serde_with::serde_as; use super::convert::{ConvertRecordBatchStream, V1_CONVERTER}; use super::error::StorageQueryError; -use crate::state::QueryServiceState; +use super::QueryServiceState; #[serde_as] #[derive(Debug, Deserialize, JsonSchema)] diff --git a/crates/bifrost/Cargo.toml b/crates/bifrost/Cargo.toml index a1a38766d5..237eab71a8 100644 --- a/crates/bifrost/Cargo.toml +++ b/crates/bifrost/Cargo.toml @@ -56,7 +56,7 @@ xxhash-rust = { workspace = true, features = ["xxh3"] } restate-core = { workspace = true, features = ["test-util"] } restate-log-server = { workspace = true } restate-metadata-server = { workspace = true } -restate-storage-api = { workspace = true } +restate-storage-api = { workspace = true } restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } restate-wal-protocol = { workspace = true, features = ["serde"] } @@ -76,7 +76,7 @@ tracing-subscriber = { workspace = true } tracing-test = { workspace = true } [target.'cfg(not(target_env = "msvc"))'.dev-dependencies] -tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms", "profiling"] } +tikv-jemallocator = { workspace = true } [[bench]] diff --git a/crates/node/Cargo.toml b/crates/node/Cargo.toml index 87870fb125..da0de30352 100644 --- a/crates/node/Cargo.toml +++ b/crates/node/Cargo.toml @@ -20,7 +20,7 @@ options_schema = [ [dependencies] workspace-hack = { version = "0.1", path = "../../workspace-hack" } -restate-admin = { workspace = true } +restate-admin = { workspace = true, features = ["storage-query"]} restate-bifrost = { workspace = true } restate-core = { workspace = true } restate-ingress-http = { workspace = true } diff --git a/crates/node/src/roles/admin.rs b/crates/node/src/roles/admin.rs index 9c2483627a..f18576de2e 100644 --- a/crates/node/src/roles/admin.rs +++ b/crates/node/src/roles/admin.rs @@ -106,8 +106,8 @@ impl AdminRole { config.ingress.clone(), service_discovery, config.ingress.experimental_feature_kafka_ingress_next(), - Some(query_context), - ); + ) + .with_query_context(query_context); let controller = if config.admin.is_cluster_controller_enabled() { Some(cluster_controller::Service::new( diff --git a/crates/rocksdb/Cargo.toml b/crates/rocksdb/Cargo.toml index 07f9c59506..6aae0f23eb 100644 --- a/crates/rocksdb/Cargo.toml +++ b/crates/rocksdb/Cargo.toml @@ -38,6 +38,10 @@ threadpool = { version = "1.8" } tokio = { workspace = true } tracing = { workspace = true } +[target.'cfg(not(target_env = "msvc"))'.dependencies] +# rocksdb = { workspace = true } brings this in implicitly, but we want to ensure it uses the workspace features +tikv-jemalloc-sys = { workspace = true } + [dev-dependencies] restate-test-util = { workspace = true } restate-types = { workspace = true, features = ["test-util"] } diff --git a/server/Cargo.toml b/server/Cargo.toml index 36c1a0d8e3..da4290af6f 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -101,7 +101,7 @@ serde_json = { workspace = true } url = { workspace = true } [target.'cfg(not(target_env = "msvc"))'.dependencies] -tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms", "profiling"] } +tikv-jemallocator = { workspace = true } [build-dependencies] vergen = { version = "8.0.0", default-features = false, features = [ diff --git a/tools/xtask/src/main.rs b/tools/xtask/src/main.rs index eff2434c6b..90a66976cc 100644 --- a/tools/xtask/src/main.rs +++ b/tools/xtask/src/main.rs @@ -115,7 +115,6 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> { .unwrap(), ), false, - None, ); TaskCenter::spawn( diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index ef545164dd..6abaa743f7 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -81,7 +81,7 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] } regex-syntax = { version = "0.8" } -reqwest = { version = "0.12", default-features = false, features = ["blocking", "http2", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] } +reqwest = { version = "0.12", default-features = false, features = ["http2", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] } ring = { version = "0.17", features = ["std"] } rustls-647d43efb71741da = { package = "rustls", version = "0.21" } schemars = { version = "0.8", features = ["bytes", "enumset", "preserve_order"] } @@ -185,7 +185,7 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] } regex = { version = "1" } regex-automata = { version = "0.4", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] } regex-syntax = { version = "0.8" } -reqwest = { version = "0.12", default-features = false, features = ["blocking", "http2", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] } +reqwest = { version = "0.12", default-features = false, features = ["http2", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] } ring = { version = "0.17", features = ["std"] } rustls-647d43efb71741da = { package = "rustls", version = "0.21" } schemars = { version = "0.8", features = ["bytes", "enumset", "preserve_order"] } @@ -235,8 +235,6 @@ prost = { version = "0.13", default-features = false, features = ["no-recursion- rustix = { version = "0.38", features = ["fs", "stdio", "termios"] } rustls-2b5c6dc72f624058 = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] } signal-hook-mio = { version = "0.2", default-features = false, features = ["support-v0_8", "support-v1_0"] } -tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["timeout"] } [target.x86_64-unknown-linux-gnu.build-dependencies] @@ -252,8 +250,6 @@ prost = { version = "0.13", default-features = false, features = ["no-recursion- rustix = { version = "0.38", features = ["fs", "stdio", "termios"] } rustls-2b5c6dc72f624058 = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] } signal-hook-mio = { version = "0.2", default-features = false, features = ["support-v0_8", "support-v1_0"] } -tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["timeout"] } [target.aarch64-apple-darwin.dependencies] @@ -269,8 +265,6 @@ prost = { version = "0.13", default-features = false, features = ["no-recursion- rustix = { version = "0.38", features = ["fs", "stdio", "termios"] } rustls-2b5c6dc72f624058 = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] } signal-hook-mio = { version = "0.2", default-features = false, features = ["support-v0_8", "support-v1_0"] } -tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["timeout"] } [target.aarch64-apple-darwin.build-dependencies] @@ -286,8 +280,6 @@ prost = { version = "0.13", default-features = false, features = ["no-recursion- rustix = { version = "0.38", features = ["fs", "stdio", "termios"] } rustls-2b5c6dc72f624058 = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] } signal-hook-mio = { version = "0.2", default-features = false, features = ["support-v0_8", "support-v1_0"] } -tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } -tikv-jemallocator = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] } tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["timeout"] } ### END HAKARI SECTION