Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Remove temp catalog, session vars (mostly) from remote session #1866

Merged
merged 1 commit into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions crates/rpcsrv/src/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ use crate::{
};
use async_trait::async_trait;
use dashmap::DashMap;
use datafusion::arrow::ipc::writer::FileWriter as IpcFileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::SendableRecordBatchStream;
use datafusion::{arrow::ipc::writer::FileWriter as IpcFileWriter, variable::VarType};
use datafusion_ext::vars::SessionVars;
use futures::{Stream, StreamExt};
use protogen::{
gen::rpcsrv::service::{self, BroadcastExchangeResponse},
Expand Down Expand Up @@ -92,13 +91,9 @@ impl RpcHandler {
let conn_id = Uuid::new_v4();
info!(session_id=%conn_id, "initializing remote session");

let vars = SessionVars::default()
.with_database_id(db_id, VarType::System)
.with_connection_id(conn_id, VarType::System);

let context = self
.engine
.new_remote_session_context(vars, storage_conf)
.new_remote_session_context(conn_id, db_id, storage_conf)
.await?;

let sess = RemoteSession::new(context);
Expand Down
2 changes: 2 additions & 0 deletions crates/sqlexec/src/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use crate::{errors::Result, metastore::catalog::SessionCatalog};
///
/// If `memory_limit_bytes` in session varables is non-zero, a new memory pool
/// will be created with the max set to this value.
// TODO: Remove `vars`.
pub(crate) fn new_datafusion_runtime_env(
vars: &SessionVars,
catalog: &SessionCatalog,
Expand Down Expand Up @@ -76,6 +77,7 @@ pub(crate) fn new_datafusion_runtime_env(

/// Create a new datafusion config opts common to both local and remote
/// sessions.
// TODO: Remove `vars`.
pub(crate) fn new_datafusion_session_config_opts(vars: &SessionVars) -> ConfigOptions {
// NOTE: We handle catalog/schema defaults and information schemas
// ourselves.
Expand Down
15 changes: 10 additions & 5 deletions crates/sqlexec/src/context/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
dispatch::external::ExternalDispatcher,
errors::{ExecError, Result},
extension_codec::GlareDBExtensionCodec,
metastore::catalog::{CatalogMutator, SessionCatalog, TempCatalog},
metastore::catalog::{CatalogMutator, SessionCatalog},
remote::{provider_cache::ProviderCache, staged_stream::StagedClientStreams},
};

Expand All @@ -26,6 +26,10 @@ use super::{new_datafusion_runtime_env, new_datafusion_session_config_opts};
/// A lightweight session context used during remote execution of physical
/// plans.
///
/// This context should be stateless in that it should not be tied to any one
/// specific session. This context should be able to execute physical plans from
/// any session for this partition database.
///
/// Datafusion extensions:
/// - StagedClientStreams
pub struct RemoteSessionContext {
Expand All @@ -44,13 +48,16 @@ pub struct RemoteSessionContext {
impl RemoteSessionContext {
/// Create a new remote session context.
pub fn new(
vars: SessionVars,
catalog: SessionCatalog,
catalog_mutator: CatalogMutator,
native_tables: NativeTableStorage,
background_jobs: JobRunner,
spill_path: Option<PathBuf>,
) -> Result<Self> {
// TODO: We'll want to remove this eventually. We should be able to
// create a datafusion context/runtime without needing these vars.
let vars = SessionVars::default();

let runtime = new_datafusion_runtime_env(&vars, &catalog, spill_path)?;
let opts = new_datafusion_session_config_opts(&vars);
let mut conf: SessionConfig = opts.into();
Expand All @@ -59,13 +66,11 @@ impl RemoteSessionContext {
conf = conf
.with_extension(Arc::new(StagedClientStreams::default()))
.with_extension(Arc::new(catalog_mutator))
.with_extension(Arc::new(native_tables.clone()))
.with_extension(Arc::new(TempCatalog::default()));
.with_extension(Arc::new(native_tables.clone()));

// TODO: Query planners for handling custom plans.

let df_ctx = DfSessionContext::with_config_rt(conf, Arc::new(runtime));
df_ctx.register_variable(datafusion::variable::VarType::UserDefined, Arc::new(vars));

Ok(RemoteSessionContext {
catalog,
Expand Down
6 changes: 2 additions & 4 deletions crates/sqlexec/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,10 @@ impl Engine {
/// since we don't guarantee that sessions get closed).
pub async fn new_remote_session_context(
&self,
vars: SessionVars,
conn_id: Uuid,
database_id: Uuid,
storage: SessionStorageConfig,
) -> Result<RemoteSessionContext> {
let conn_id = vars.connection_id();
let database_id = vars.database_id();
let metastore = self.supervisor.init_client(conn_id, database_id).await?;
let native = self
.storage
Expand All @@ -186,7 +185,6 @@ impl Engine {
let catalog = SessionCatalog::new(state);

let context = RemoteSessionContext::new(
vars,
catalog,
metastore.into(),
native,
Expand Down