Skip to content

Commit

Permalink
chore: Remove temp catalog, session vars (mostly) from remote session (
Browse files Browse the repository at this point in the history
…#1866)

Related to distributed execution.

Goal: Try to make the remote session as stateless as possible to allow
for it to execute any plan for a specific database.

There's still some need for `SessionVars` when creating the datafusion
runtime/context, but I just used the default for now. It's likely that
we'll just be provided a well-typed struct for session configuration
soonish.
  • Loading branch information
scsmithr authored Oct 3, 2023
1 parent 7a72492 commit d810ced
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 16 deletions.
9 changes: 2 additions & 7 deletions crates/rpcsrv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ use crate::{
};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion::arrow::ipc::writer::FileWriter as IpcFileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::{arrow::ipc::writer::FileWriter as IpcFileWriter, variable::VarType};
use datafusion_ext::vars::SessionVars;
use futures::{Stream, StreamExt};
use protogen::{
gen::rpcsrv::service::{self, BroadcastExchangeResponse},
Expand Down Expand Up @@ -92,13 +91,9 @@ impl RpcHandler {
let conn_id = Uuid::new_v4();
info!(session_id=%conn_id, "initializing remote session");

let vars = SessionVars::default()
.with_database_id(db_id, VarType::System)
.with_connection_id(conn_id, VarType::System);

let context = self
.engine
.new_remote_session_context(vars, storage_conf)
.new_remote_session_context(conn_id, db_id, storage_conf)
.await?;

let sess = RemoteSession::new(context);
Expand Down
2 changes: 2 additions & 0 deletions crates/sqlexec/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{errors::Result, metastore::catalog::SessionCatalog};
///
/// If `memory_limit_bytes` in session varables is non-zero, a new memory pool
/// will be created with the max set to this value.
// TODO: Remove `vars`.
pub(crate) fn new_datafusion_runtime_env(
vars: &SessionVars,
catalog: &SessionCatalog,
Expand Down Expand Up @@ -76,6 +77,7 @@ pub(crate) fn new_datafusion_runtime_env(

/// Create a new datafusion config opts common to both local and remote
/// sessions.
// TODO: Remove `vars`.
pub(crate) fn new_datafusion_session_config_opts(vars: &SessionVars) -> ConfigOptions {
// NOTE: We handle catalog/schema defaults and information schemas
// ourselves.
Expand Down
15 changes: 10 additions & 5 deletions crates/sqlexec/src/context/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
dispatch::external::ExternalDispatcher,
errors::{ExecError, Result},
extension_codec::GlareDBExtensionCodec,
metastore::catalog::{CatalogMutator, SessionCatalog, TempCatalog},
metastore::catalog::{CatalogMutator, SessionCatalog},
remote::{provider_cache::ProviderCache, staged_stream::StagedClientStreams},
};

Expand All @@ -26,6 +26,10 @@ use super::{new_datafusion_runtime_env, new_datafusion_session_config_opts};
/// A lightweight session context used during remote execution of physical
/// plans.
///
/// This context should be stateless in that it should not be tied to any one
/// specific session. This context should be able to execute physical plans from
/// any session for this partition database.
///
/// Datafusion extensions:
/// - StagedClientStreams
pub struct RemoteSessionContext {
Expand All @@ -44,13 +48,16 @@ pub struct RemoteSessionContext {
impl RemoteSessionContext {
/// Create a new remote session context.
pub fn new(
vars: SessionVars,
catalog: SessionCatalog,
catalog_mutator: CatalogMutator,
native_tables: NativeTableStorage,
background_jobs: JobRunner,
spill_path: Option<PathBuf>,
) -> Result<Self> {
// TODO: We'll want to remove this eventually. We should be able to
// create a datafusion context/runtime without needing these vars.
let vars = SessionVars::default();

let runtime = new_datafusion_runtime_env(&vars, &catalog, spill_path)?;
let opts = new_datafusion_session_config_opts(&vars);
let mut conf: SessionConfig = opts.into();
Expand All @@ -59,13 +66,11 @@ impl RemoteSessionContext {
conf = conf
.with_extension(Arc::new(StagedClientStreams::default()))
.with_extension(Arc::new(catalog_mutator))
.with_extension(Arc::new(native_tables.clone()))
.with_extension(Arc::new(TempCatalog::default()));
.with_extension(Arc::new(native_tables.clone()));

// TODO: Query planners for handling custom plans.

let df_ctx = DfSessionContext::with_config_rt(conf, Arc::new(runtime));
df_ctx.register_variable(datafusion::variable::VarType::UserDefined, Arc::new(vars));

Ok(RemoteSessionContext {
catalog,
Expand Down
6 changes: 2 additions & 4 deletions crates/sqlexec/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ impl Engine {
/// since we don't guarantee that sessions get closed).
pub async fn new_remote_session_context(
&self,
vars: SessionVars,
conn_id: Uuid,
database_id: Uuid,
storage: SessionStorageConfig,
) -> Result<RemoteSessionContext> {
let conn_id = vars.connection_id();
let database_id = vars.database_id();
let metastore = self.supervisor.init_client(conn_id, database_id).await?;
let native = self
.storage
Expand All @@ -186,7 +185,6 @@ impl Engine {
let catalog = SessionCatalog::new(state);

let context = RemoteSessionContext::new(
vars,
catalog,
metastore.into(),
native,
Expand Down

0 comments on commit d810ced

Please sign in to comment.