Skip to content

Commit

Permalink
Restatectl unification (#2634)
Browse files Browse the repository at this point in the history
* Make datafusion optional in restate_admin

* Unify tikv-jemalloc-sys features between restate-server and restatectl

* Disable resolver functionality in jsonschema

This can download schemas over internet, read files - not ideal

* Update workspace hack
  • Loading branch information
jackkleeman authored Feb 5, 2025
1 parent 106b4de commit f7b1e5a
Show file tree
Hide file tree
Showing 15 changed files with 54 additions and 49 deletions.
5 changes: 1 addition & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ hyper-rustls = { version = "0.27.2", default-features = false, features = [
hyper-util = { version = "0.1" }
indexmap = "2.7"
itertools = "0.14.0"
jsonschema = "0.28.3"
jsonschema = { version = "0.28.3", default-features = false }
metrics = { version = "0.24" }
metrics-tracing-context = { version = "0.18.0" }
metrics-exporter-prometheus = { version = "0.16", default-features = false, features = [
Expand Down Expand Up @@ -192,7 +192,8 @@ test-log = { version = "0.2.11", default-features = false, features = [
"trace",
] }
toml = { version = "0.8.12" }
tikv-jemallocator = "0.6"
tikv-jemallocator = { version = "0.6", features = ["unprefixed_malloc_on_supported_platforms", "profiling"] }
tikv-jemalloc-sys = { version = "0.6", features = ["profiling"] }
thiserror = "2.0"
tokio = { version = "1.41.1", default-features = false, features = [
"rt-multi-thread",
Expand Down
5 changes: 3 additions & 2 deletions crates/admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ options_schema = ["restate-service-client/options_schema"]
memory-loglet = ["restate-bifrost/memory-loglet"]
replicated-loglet = ["restate-bifrost/replicated-loglet"]
serve-web-ui = ["restate-web-ui", "mime_guess"]
storage-query = ["dep:restate-storage-query-datafusion", "dep:datafusion"]
metadata-api = []

[dependencies]
Expand All @@ -28,7 +29,7 @@ restate-futures-util = { workspace = true }
restate-metadata-server = { workspace = true }
restate-service-client = { workspace = true }
restate-service-protocol = { workspace = true, features = ["discovery"] }
restate-storage-query-datafusion = { workspace = true }
restate-storage-query-datafusion = { workspace = true, optional = true }
restate-types = { workspace = true, features = ["schemars"] }
restate-utoipa = { workspace = true }
restate-wal-protocol = { workspace = true }
Expand All @@ -40,7 +41,7 @@ axum = { workspace = true, features = ["json"] }
bytes = { workspace = true }
bytestring = { workspace = true }
codederror = { workspace = true }
datafusion = { workspace = true }
datafusion = { workspace = true, optional = true }
derive_builder = { workspace = true }
derive_more = { workspace = true }
enumset = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions crates/admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod rest_api;
mod schema_registry;
pub mod service;
mod state;
#[cfg(feature = "storage-query")]
mod storage_query;
#[cfg(feature = "serve-web-ui")]
mod web_ui;
Expand Down
37 changes: 23 additions & 14 deletions crates/admin/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::sync::Arc;

use axum::error_handling::HandleErrorLayer;
use http::StatusCode;
use restate_admin_rest_model::version::AdminApiVersion;
Expand All @@ -21,12 +19,11 @@ use tower::ServiceBuilder;
use restate_core::network::net_util;
use restate_core::MetadataWriter;
use restate_service_protocol::discovery::ServiceDiscovery;
use restate_storage_query_datafusion::context::QueryContext;
use restate_types::net::BindAddress;
use restate_types::schema::subscriptions::SubscriptionValidator;

use crate::schema_registry::SchemaRegistry;
use crate::{rest_api, state, storage_query};
use crate::{rest_api, state};

#[derive(Debug, thiserror::Error)]
#[error("could not create the service client: {0}")]
Expand All @@ -35,7 +32,8 @@ pub struct BuildError(#[from] restate_service_client::BuildError);
pub struct AdminService<V> {
bifrost: Bifrost,
schema_registry: SchemaRegistry<V>,
query_context: Option<QueryContext>,
#[cfg(feature = "storage-query")]
query_context: Option<restate_storage_query_datafusion::context::QueryContext>,
#[cfg(feature = "metadata-api")]
metadata_writer: MetadataWriter,
}
Expand All @@ -50,7 +48,6 @@ where
subscription_validator: V,
service_discovery: ServiceDiscovery,
experimental_feature_kafka_ingress_next: bool,
query_context: Option<QueryContext>,
) -> Self {
Self {
bifrost,
Expand All @@ -62,7 +59,19 @@ where
subscription_validator,
experimental_feature_kafka_ingress_next,
),
query_context,
#[cfg(feature = "storage-query")]
query_context: None,
}
}

#[cfg(feature = "storage-query")]
pub fn with_query_context(
self,
query_context: restate_storage_query_datafusion::context::QueryContext,
) -> Self {
Self {
query_context: Some(query_context),
..self
}
}

Expand All @@ -74,14 +83,14 @@ where

let rest_state = state::AdminServiceState::new(self.schema_registry, self.bifrost);

let router = self
.query_context
.map(|query_context| {
let query_state = Arc::new(state::QueryServiceState { query_context });
let router = axum::Router::new();

axum::Router::new().merge(storage_query::create_router(query_state))
})
.unwrap_or_default();
#[cfg(feature = "storage-query")]
let router = if let Some(query_context) = self.query_context {
router.merge(crate::storage_query::router(query_context))
} else {
router
};

#[cfg(feature = "metadata-api")]
let router = router.merge(crate::metadata_api::router(
Expand Down
6 changes: 0 additions & 6 deletions crates/admin/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,13 @@

use crate::schema_registry::SchemaRegistry;
use restate_bifrost::Bifrost;
use restate_storage_query_datafusion::context::QueryContext;

#[derive(Clone, derive_builder::Builder)]
pub struct AdminServiceState<V> {
pub schema_registry: SchemaRegistry<V>,
pub bifrost: Bifrost,
}

#[derive(Clone)]
pub struct QueryServiceState {
pub query_context: QueryContext,
}

impl<V> AdminServiceState<V> {
pub fn new(schema_registry: SchemaRegistry<V>, bifrost: Bifrost) -> Self {
Self {
Expand Down
13 changes: 10 additions & 3 deletions crates/admin/src/storage_query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@ mod query;
use axum::{routing::post, Router};
use std::sync::Arc;

use crate::state::QueryServiceState;
use restate_storage_query_datafusion::context::QueryContext;

#[derive(Clone)]
pub struct QueryServiceState {
pub query_context: QueryContext,
}

pub fn router(query_context: QueryContext) -> Router {
let query_state = Arc::new(QueryServiceState { query_context });

pub fn create_router(state: Arc<QueryServiceState>) -> Router<()> {
// Setup the router
axum::Router::new()
.route("/query", post(query::query))
.with_state(state)
.with_state(query_state)
}
2 changes: 1 addition & 1 deletion crates/admin/src/storage_query/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use serde_with::serde_as;

use super::convert::{ConvertRecordBatchStream, V1_CONVERTER};
use super::error::StorageQueryError;
use crate::state::QueryServiceState;
use super::QueryServiceState;

#[serde_as]
#[derive(Debug, Deserialize, JsonSchema)]
Expand Down
4 changes: 2 additions & 2 deletions crates/bifrost/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ xxhash-rust = { workspace = true, features = ["xxh3"] }
restate-core = { workspace = true, features = ["test-util"] }
restate-log-server = { workspace = true }
restate-metadata-server = { workspace = true }
restate-storage-api = { workspace = true }
restate-storage-api = { workspace = true }
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
restate-wal-protocol = { workspace = true, features = ["serde"] }
Expand All @@ -76,7 +76,7 @@ tracing-subscriber = { workspace = true }
tracing-test = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dev-dependencies]
tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms", "profiling"] }
tikv-jemallocator = { workspace = true }


[[bench]]
Expand Down
2 changes: 1 addition & 1 deletion crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ options_schema = [
[dependencies]
workspace-hack = { version = "0.1", path = "../../workspace-hack" }

restate-admin = { workspace = true }
restate-admin = { workspace = true, features = ["storage-query"]}
restate-bifrost = { workspace = true }
restate-core = { workspace = true }
restate-ingress-http = { workspace = true }
Expand Down
4 changes: 2 additions & 2 deletions crates/node/src/roles/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ impl<T: TransportConnect> AdminRole<T> {
config.ingress.clone(),
service_discovery,
config.ingress.experimental_feature_kafka_ingress_next(),
Some(query_context),
);
)
.with_query_context(query_context);

let controller = if config.admin.is_cluster_controller_enabled() {
Some(cluster_controller::Service::new(
Expand Down
4 changes: 4 additions & 0 deletions crates/rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ threadpool = { version = "1.8" }
tokio = { workspace = true }
tracing = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
# rocksdb = { workspace = true } brings this in implicitly, but we want to ensure it uses the workspace features
tikv-jemalloc-sys = { workspace = true }

[dev-dependencies]
restate-test-util = { workspace = true }
restate-types = { workspace = true, features = ["test-util"] }
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ serde_json = { workspace = true }
url = { workspace = true }

[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = { workspace = true, features = ["unprefixed_malloc_on_supported_platforms", "profiling"] }
tikv-jemallocator = { workspace = true }

[build-dependencies]
vergen = { version = "8.0.0", default-features = false, features = [
Expand Down
1 change: 0 additions & 1 deletion tools/xtask/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ async fn generate_rest_api_doc() -> anyhow::Result<()> {
.unwrap(),
),
false,
None,
);

TaskCenter::spawn(
Expand Down
12 changes: 2 additions & 10 deletions workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "http2", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false, features = ["http2", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
ring = { version = "0.17", features = ["std"] }
rustls-647d43efb71741da = { package = "rustls", version = "0.21" }
schemars = { version = "0.8", features = ["bytes", "enumset", "preserve_order"] }
Expand Down Expand Up @@ -185,7 +185,7 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] }
regex = { version = "1" }
regex-automata = { version = "0.4", default-features = false, features = ["dfa", "hybrid", "meta", "nfa", "perf", "unicode"] }
regex-syntax = { version = "0.8" }
reqwest = { version = "0.12", default-features = false, features = ["blocking", "http2", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
reqwest = { version = "0.12", default-features = false, features = ["http2", "json", "rustls-tls", "rustls-tls-native-roots", "stream"] }
ring = { version = "0.17", features = ["std"] }
rustls-647d43efb71741da = { package = "rustls", version = "0.21" }
schemars = { version = "0.8", features = ["bytes", "enumset", "preserve_order"] }
Expand Down Expand Up @@ -235,8 +235,6 @@ prost = { version = "0.13", default-features = false, features = ["no-recursion-
rustix = { version = "0.38", features = ["fs", "stdio", "termios"] }
rustls-2b5c6dc72f624058 = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
signal-hook-mio = { version = "0.2", default-features = false, features = ["support-v0_8", "support-v1_0"] }
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemallocator = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["timeout"] }

[target.x86_64-unknown-linux-gnu.build-dependencies]
Expand All @@ -252,8 +250,6 @@ prost = { version = "0.13", default-features = false, features = ["no-recursion-
rustix = { version = "0.38", features = ["fs", "stdio", "termios"] }
rustls-2b5c6dc72f624058 = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
signal-hook-mio = { version = "0.2", default-features = false, features = ["support-v0_8", "support-v1_0"] }
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemallocator = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["timeout"] }

[target.aarch64-apple-darwin.dependencies]
Expand All @@ -269,8 +265,6 @@ prost = { version = "0.13", default-features = false, features = ["no-recursion-
rustix = { version = "0.38", features = ["fs", "stdio", "termios"] }
rustls-2b5c6dc72f624058 = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
signal-hook-mio = { version = "0.2", default-features = false, features = ["support-v0_8", "support-v1_0"] }
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemallocator = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["timeout"] }

[target.aarch64-apple-darwin.build-dependencies]
Expand All @@ -286,8 +280,6 @@ prost = { version = "0.13", default-features = false, features = ["no-recursion-
rustix = { version = "0.38", features = ["fs", "stdio", "termios"] }
rustls-2b5c6dc72f624058 = { package = "rustls", version = "0.23", default-features = false, features = ["logging", "ring", "std", "tls12"] }
signal-hook-mio = { version = "0.2", default-features = false, features = ["support-v0_8", "support-v1_0"] }
tikv-jemalloc-sys = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
tikv-jemallocator = { version = "0.6", features = ["profiling", "unprefixed_malloc_on_supported_platforms"] }
tower-d8f496e17d97b5cb = { package = "tower", version = "0.5", default-features = false, features = ["timeout"] }

### END HAKARI SECTION

0 comments on commit f7b1e5a

Please sign in to comment.