From b2a7f450c816ad6a3ef28e34cfbc339cc7e18b68 Mon Sep 17 00:00:00 2001 From: Sean Smith Date: Mon, 2 Oct 2023 11:55:09 -0500 Subject: [PATCH] chore: Remove temp catalog, session vars (mostly) from remote session Goal: Try to make the remote session as stateless as possible to allow for it to execute any plan for a specific database. There's still some need for `SessionVars` when creating the datafusion runtime/context, but I just used the default for now. It's likely that we'll just be provided a well-typed struct for session configuration soonish. --- crates/rpcsrv/src/handler.rs | 9 ++------- crates/sqlexec/src/context/mod.rs | 2 ++ crates/sqlexec/src/context/remote.rs | 15 ++++++++++----- crates/sqlexec/src/engine.rs | 6 ++---- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/crates/rpcsrv/src/handler.rs b/crates/rpcsrv/src/handler.rs index 6d96e43e9..4236bb43c 100644 --- a/crates/rpcsrv/src/handler.rs +++ b/crates/rpcsrv/src/handler.rs @@ -4,10 +4,9 @@ use crate::{ }; use async_trait::async_trait; use dashmap::DashMap; +use datafusion::arrow::ipc::writer::FileWriter as IpcFileWriter; use datafusion::arrow::record_batch::RecordBatch; use datafusion::physical_plan::SendableRecordBatchStream; -use datafusion::{arrow::ipc::writer::FileWriter as IpcFileWriter, variable::VarType}; -use datafusion_ext::vars::SessionVars; use futures::{Stream, StreamExt}; use protogen::{ gen::rpcsrv::service::{self, BroadcastExchangeResponse}, @@ -92,13 +91,9 @@ impl RpcHandler { let conn_id = Uuid::new_v4(); info!(session_id=%conn_id, "initializing remote session"); - let vars = SessionVars::default() - .with_database_id(db_id, VarType::System) - .with_connection_id(conn_id, VarType::System); - let context = self .engine - .new_remote_session_context(vars, storage_conf) + .new_remote_session_context(conn_id, db_id, storage_conf) .await?; let sess = RemoteSession::new(context); diff --git a/crates/sqlexec/src/context/mod.rs b/crates/sqlexec/src/context/mod.rs index 38110655f..b85d48842 100644 --- a/crates/sqlexec/src/context/mod.rs +++ b/crates/sqlexec/src/context/mod.rs @@ -34,6 +34,7 @@ use crate::{errors::Result, metastore::catalog::SessionCatalog}; /// /// If `memory_limit_bytes` in session varables is non-zero, a new memory pool /// will be created with the max set to this value. +// TODO: Remove `vars`. pub(crate) fn new_datafusion_runtime_env( vars: &SessionVars, catalog: &SessionCatalog, @@ -76,6 +77,7 @@ pub(crate) fn new_datafusion_runtime_env( /// Create a new datafusion config opts common to both local and remote /// sessions. +// TODO: Remove `vars`. pub(crate) fn new_datafusion_session_config_opts(vars: &SessionVars) -> ConfigOptions { // NOTE: We handle catalog/schema defaults and information schemas // ourselves. diff --git a/crates/sqlexec/src/context/remote.rs b/crates/sqlexec/src/context/remote.rs index f3d844799..19b2d0687 100644 --- a/crates/sqlexec/src/context/remote.rs +++ b/crates/sqlexec/src/context/remote.rs @@ -17,7 +17,7 @@ use crate::{ dispatch::external::ExternalDispatcher, errors::{ExecError, Result}, extension_codec::GlareDBExtensionCodec, - metastore::catalog::{CatalogMutator, SessionCatalog, TempCatalog}, + metastore::catalog::{CatalogMutator, SessionCatalog}, remote::{provider_cache::ProviderCache, staged_stream::StagedClientStreams}, }; @@ -26,6 +26,10 @@ use super::{new_datafusion_runtime_env, new_datafusion_session_config_opts}; /// A lightweight session context used during remote execution of physical /// plans. /// +/// This context should be stateless in that it should not be tied to any one +/// specific session. This context should be able to execute physical plans from +/// any session for this partition database. +/// /// Datafusion extensions: /// - StagedClientStreams pub struct RemoteSessionContext { @@ -44,13 +48,16 @@ pub struct RemoteSessionContext { impl RemoteSessionContext { /// Create a new remote session context. pub fn new( - vars: SessionVars, catalog: SessionCatalog, catalog_mutator: CatalogMutator, native_tables: NativeTableStorage, background_jobs: JobRunner, spill_path: Option, ) -> Result { + // TODO: We'll want to remove this eventually. We should be able to + // create a datafusion context/runtime without needing these vars. + let vars = SessionVars::default(); + let runtime = new_datafusion_runtime_env(&vars, &catalog, spill_path)?; let opts = new_datafusion_session_config_opts(&vars); let mut conf: SessionConfig = opts.into(); @@ -59,13 +66,11 @@ impl RemoteSessionContext { conf = conf .with_extension(Arc::new(StagedClientStreams::default())) .with_extension(Arc::new(catalog_mutator)) - .with_extension(Arc::new(native_tables.clone())) - .with_extension(Arc::new(TempCatalog::default())); + .with_extension(Arc::new(native_tables.clone())); // TODO: Query planners for handling custom plans. let df_ctx = DfSessionContext::with_config_rt(conf, Arc::new(runtime)); - df_ctx.register_variable(datafusion::variable::VarType::UserDefined, Arc::new(vars)); Ok(RemoteSessionContext { catalog, diff --git a/crates/sqlexec/src/engine.rs b/crates/sqlexec/src/engine.rs index 0deeeee20..a01b1ef5e 100644 --- a/crates/sqlexec/src/engine.rs +++ b/crates/sqlexec/src/engine.rs @@ -172,11 +172,10 @@ impl Engine { /// since we don't guarantee that sessions get closed). pub async fn new_remote_session_context( &self, - vars: SessionVars, + conn_id: Uuid, + database_id: Uuid, storage: SessionStorageConfig, ) -> Result { - let conn_id = vars.connection_id(); - let database_id = vars.database_id(); let metastore = self.supervisor.init_client(conn_id, database_id).await?; let native = self .storage @@ -186,7 +185,6 @@ impl Engine { let catalog = SessionCatalog::new(state); let context = RemoteSessionContext::new( - vars, catalog, metastore.into(), native,