From ef84d36568bcd6765acc012907c6e108e2485879 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Fri, 28 Jan 2022 17:33:39 +0800 Subject: [PATCH 1/2] SESSIONS-3933: Refine the file and field name --- query/benches/suites/mod.rs | 2 +- query/src/api/http/v1/cluster.rs | 2 +- query/src/api/http/v1/logs.rs | 2 +- query/src/api/rpc/flight_dispatcher.rs | 4 +- .../servers/clickhouse/interactive_worker.rs | 2 +- .../clickhouse/interactive_worker_base.rs | 2 +- query/src/servers/http/v1/load.rs | 2 +- .../servers/http/v1/query/execute_state.rs | 2 +- .../servers/mysql/mysql_interactive_worker.rs | 4 +- query/src/sessions/mod.rs | 19 +++--- .../src/sessions/{context.rs => query_ctx.rs} | 13 ++-- ...{context_shared.rs => query_ctx_shared.rs} | 2 +- query/src/sessions/session.rs | 68 +++++++++---------- .../{session_status.rs => session_ctx.rs} | 8 +-- query/src/sessions/session_info.rs | 20 +++--- .../sessions/{sessions.rs => session_mgr.rs} | 9 +++ query/src/sessions/session_ref.rs | 2 +- query/src/sessions/sessions_info.rs | 29 -------- query/tests/it/sessions/session_status.rs | 36 +++++----- 19 files changed, 102 insertions(+), 126 deletions(-) rename query/src/sessions/{context.rs => query_ctx.rs} (97%) rename query/src/sessions/{context_shared.rs => query_ctx_shared.rs} (99%) rename query/src/sessions/{session_status.rs => session_ctx.rs} (97%) rename query/src/sessions/{sessions.rs => session_mgr.rs} (97%) delete mode 100644 query/src/sessions/sessions_info.rs diff --git a/query/benches/suites/mod.rs b/query/benches/suites/mod.rs index 179b3853e507..51cbde0556a5 100644 --- a/query/benches/suites/mod.rs +++ b/query/benches/suites/mod.rs @@ -30,7 +30,7 @@ pub mod bench_sort_query_sql; pub async fn select_executor(sql: &str) -> Result<()> { let sessions = SessionManager::from_conf(Config::default()).await?; let executor_session = sessions.create_session("Benches")?; - let ctx = executor_session.create_context().await?; + let ctx = executor_session.create_query_context().await?; if let PlanNode::Select(plan) = PlanParser::parse(sql, ctx.clone()).await? { let executor = SelectInterpreter::try_create(ctx, plan)?; diff --git a/query/src/api/http/v1/cluster.rs b/query/src/api/http/v1/cluster.rs index 8a525b005767..2b1f2613d9a8 100644 --- a/query/src/api/http/v1/cluster.rs +++ b/query/src/api/http/v1/cluster.rs @@ -43,6 +43,6 @@ pub async fn cluster_list_handler( async fn list_nodes(sessions: &Arc) -> Result>> { let watch_cluster_session = sessions.create_session("WatchCluster")?; - let watch_cluster_context = watch_cluster_session.create_context().await?; + let watch_cluster_context = watch_cluster_session.create_query_context().await?; Ok(watch_cluster_context.get_cluster().get_nodes()) } diff --git a/query/src/api/http/v1/logs.rs b/query/src/api/http/v1/logs.rs index 5a844f10b00e..db996248f743 100644 --- a/query/src/api/http/v1/logs.rs +++ b/query/src/api/http/v1/logs.rs @@ -43,7 +43,7 @@ pub async fn logs_handler( async fn select_table(sessions: &Arc) -> Result { let session = sessions.create_session("WatchLogs")?; - let query_context = session.create_context().await?; + let query_context = session.create_query_context().await?; let mut tracing_table_stream = execute_query(query_context).await?; let stream = async_stream::try_stream! { diff --git a/query/src/api/rpc/flight_dispatcher.rs b/query/src/api/rpc/flight_dispatcher.rs index 9e3cd063e67b..eaaa3a7fcd20 100644 --- a/query/src/api/rpc/flight_dispatcher.rs +++ b/query/src/api/rpc/flight_dispatcher.rs @@ -128,7 +128,7 @@ impl DatabendQueryFlightDispatcher { #[tracing::instrument(level = "debug", skip_all, fields(session.id = session.get_id().as_str()))] async fn one_sink_action(&self, session: SessionRef, action: &FlightAction) -> Result<()> { - let query_context = session.create_context().await?; + let query_context = session.create_query_context().await?; let action_context = QueryContext::create_from(query_context.clone()); let pipeline_builder = PipelineBuilder::create(action_context.clone()); @@ -184,7 +184,7 @@ impl DatabendQueryFlightDispatcher { where T: FlightScatter + Send + 'static, { - let query_context = session.create_context().await?; + let query_context = session.create_query_context().await?; let action_context = QueryContext::create_from(query_context.clone()); let pipeline_builder = PipelineBuilder::create(action_context.clone()); diff --git a/query/src/servers/clickhouse/interactive_worker.rs b/query/src/servers/clickhouse/interactive_worker.rs index cd416b538425..8921d31a9649 100644 --- a/query/src/servers/clickhouse/interactive_worker.rs +++ b/query/src/servers/clickhouse/interactive_worker.rs @@ -110,7 +110,7 @@ impl ClickHouseSession for InteractiveWorker { }; let user_info_auth = self .session - .get_sessions_manager() + .get_session_manager() .get_auth_manager() .auth(&credential) .await; diff --git a/query/src/servers/clickhouse/interactive_worker_base.rs b/query/src/servers/clickhouse/interactive_worker_base.rs index e8be63d99aa2..d7e2acf51761 100644 --- a/query/src/servers/clickhouse/interactive_worker_base.rs +++ b/query/src/servers/clickhouse/interactive_worker_base.rs @@ -63,7 +63,7 @@ impl InteractiveWorkerBase { let query = &ch_ctx.state.query; tracing::debug!("{}", query); - let ctx = session.create_context().await?; + let ctx = session.create_query_context().await?; ctx.attach_query_str(query); let plan = PlanParser::parse(query, ctx.clone()).await?; diff --git a/query/src/servers/http/v1/load.rs b/query/src/servers/http/v1/load.rs index 013124057f7f..dd7e99b65d6a 100644 --- a/query/src/servers/http/v1/load.rs +++ b/query/src/servers/http/v1/load.rs @@ -63,7 +63,7 @@ pub async fn streaming_load( session.set_current_user(user_info.0.clone()); let context = session - .create_context() + .create_query_context() .await .map_err(InternalServerError)?; let insert_sql = req diff --git a/query/src/servers/http/v1/query/execute_state.rs b/query/src/servers/http/v1/query/execute_state.rs index f5c7639ef180..17227d0e3150 100644 --- a/query/src/servers/http/v1/query/execute_state.rs +++ b/query/src/servers/http/v1/query/execute_state.rs @@ -154,7 +154,7 @@ impl ExecuteState { ) -> Result<(ExecutorRef, DataSchemaRef)> { let sql = &request.sql; let session = session_manager.create_session("http-statement")?; - let context = session.create_context().await?; + let context = session.create_query_context().await?; if let Some(db) = &request.session.database { context.set_current_database(db.clone()).await?; }; diff --git a/query/src/servers/mysql/mysql_interactive_worker.rs b/query/src/servers/mysql/mysql_interactive_worker.rs index be1f7b015938..eb64c212159e 100644 --- a/query/src/servers/mysql/mysql_interactive_worker.rs +++ b/query/src/servers/mysql/mysql_interactive_worker.rs @@ -214,7 +214,7 @@ impl InteractiveWorkerBase { let user_manager = self.session.get_user_manager(); let client_ip = info.user_client_address.split(':').collect::>()[0]; - let ctx = self.session.create_context().await?; + let ctx = self.session.create_query_context().await?; let user_info = user_manager .get_user_with_client_ip(&ctx.get_tenant(), user_name, client_ip) .await?; @@ -272,7 +272,7 @@ impl InteractiveWorkerBase { if self.federated_server_setup_set_or_jdbc_command(query) { Ok((vec![DataBlock::empty()], String::from(""))) } else { - let context = self.session.create_context().await?; + let context = self.session.create_query_context().await?; context.attach_query_str(query); let (plan, hints) = PlanParser::parse_with_hint(query, context.clone()).await; diff --git a/query/src/sessions/mod.rs b/query/src/sessions/mod.rs index a2c6a01aa925..22dead186988 100644 --- a/query/src/sessions/mod.rs +++ b/query/src/sessions/mod.rs @@ -15,23 +15,22 @@ #[macro_use] mod macros; -mod context; -mod context_shared; mod metrics; +mod query_ctx; +mod query_ctx_shared; mod session; +mod session_ctx; mod session_info; -mod session_ref; -mod session_status; #[allow(clippy::module_inception)] -mod sessions; -mod sessions_info; +mod session_mgr; +mod session_ref; mod settings; -pub use context::QueryContext; -pub use context_shared::QueryContextShared; +pub use query_ctx::QueryContext; +pub use query_ctx_shared::QueryContextShared; pub use session::Session; +pub use session_ctx::SessionContext; pub use session_info::ProcessInfo; +pub use session_mgr::SessionManager; pub use session_ref::SessionRef; -pub use session_status::MutableStatus; -pub use sessions::SessionManager; pub use settings::Settings; diff --git a/query/src/sessions/context.rs b/query/src/sessions/query_ctx.rs similarity index 97% rename from query/src/sessions/context.rs rename to query/src/sessions/query_ctx.rs index 146bcda2b807..808c2650a6e4 100644 --- a/query/src/sessions/context.rs +++ b/query/src/sessions/query_ctx.rs @@ -253,10 +253,7 @@ impl QueryContext { // Get user manager api. pub fn get_user_manager(self: &Arc) -> Arc { - self.shared - .session - .get_sessions_manager() - .get_user_manager() + self.shared.session.get_session_manager().get_user_manager() } // Get the current session. @@ -268,13 +265,13 @@ impl QueryContext { pub fn get_session_by_id(self: &Arc, id: &str) -> Option { self.shared .session - .get_sessions_manager() + .get_session_manager() .get_session_by_id(id) } // Get all the processes list info. pub fn get_processes_info(self: &Arc) -> Vec { - self.shared.session.get_sessions_manager().processes_info() + self.shared.session.get_session_manager().processes_info() } /// Get the data accessor metrics. @@ -289,12 +286,12 @@ impl QueryContext { /// Get the client socket address. pub fn get_client_address(&self) -> Option { - self.shared.session.mutable_state.get_client_host() + self.shared.session.session_ctx.get_client_host() } /// Get the storage cache manager pub fn get_storage_cache_manager(&self) -> &CacheManager { - self.shared.session.sessions.get_storage_cache_manager() + self.shared.session.session_mgr.get_storage_cache_manager() } // Get the storage data accessor by config. diff --git a/query/src/sessions/context_shared.rs b/query/src/sessions/query_ctx_shared.rs similarity index 99% rename from query/src/sessions/context_shared.rs rename to query/src/sessions/query_ctx_shared.rs index b020e056b2b7..989dffea8f6f 100644 --- a/query/src/sessions/context_shared.rs +++ b/query/src/sessions/query_ctx_shared.rs @@ -217,6 +217,6 @@ impl QueryContextShared { impl Session { pub(in crate::sessions) fn destroy_context_shared(&self) { - self.mutable_state.take_context_shared(); + self.session_ctx.take_context_shared(); } } diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index eb84547069b5..26659d28be8f 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -27,9 +27,9 @@ use futures::channel::*; use crate::catalogs::DatabaseCatalog; use crate::configs::Config; -use crate::sessions::context_shared::QueryContextShared; -use crate::sessions::MutableStatus; use crate::sessions::QueryContext; +use crate::sessions::QueryContextShared; +use crate::sessions::SessionContext; use crate::sessions::SessionManager; use crate::sessions::Settings; use crate::users::UserApiProvider; @@ -41,9 +41,9 @@ pub struct Session { #[ignore_malloc_size_of = "insignificant"] pub(in crate::sessions) config: Config, #[ignore_malloc_size_of = "insignificant"] - pub(in crate::sessions) sessions: Arc, + pub(in crate::sessions) session_mgr: Arc, pub(in crate::sessions) ref_count: Arc, - pub(in crate::sessions) mutable_state: Arc, + pub(in crate::sessions) session_ctx: Arc, } impl Session { @@ -51,15 +51,15 @@ impl Session { config: Config, id: String, typ: String, - sessions: Arc, + session_mgr: Arc, ) -> Result> { Ok(Arc::new(Session { id, typ, config, - sessions, + session_mgr, ref_count: Arc::new(AtomicUsize::new(0)), - mutable_state: Arc::new(MutableStatus::try_create()?), + session_ctx: Arc::new(SessionContext::try_create()?), })) } @@ -72,14 +72,14 @@ impl Session { } pub fn is_aborting(self: &Arc) -> bool { - self.mutable_state.get_abort() + self.session_ctx.get_abort() } pub fn kill(self: &Arc) { - let mutable_state = self.mutable_state.clone(); - mutable_state.set_abort(true); - if mutable_state.context_shared_is_none() { - if let Some(io_shutdown) = mutable_state.take_io_shutdown_tx() { + let session_ctx = self.session_ctx.clone(); + session_ctx.set_abort(true); + if session_ctx.context_shared_is_none() { + if let Some(io_shutdown) = session_ctx.take_io_shutdown_tx() { let (tx, rx) = oneshot::channel(); if io_shutdown.send(tx).is_ok() { // We ignore this error because the receiver is return cancelled error. @@ -95,9 +95,9 @@ impl Session { } pub fn force_kill_query(self: &Arc) { - let mutable_state = self.mutable_state.clone(); + let session_ctx = self.session_ctx.clone(); - if let Some(context_shared) = mutable_state.take_context_shared() { + if let Some(context_shared) = session_ctx.take_context_shared() { context_shared.kill(/* shutdown executing query */); } } @@ -105,24 +105,24 @@ impl Session { /// Create a query context for query. /// For a query, execution environment(e.g cluster) should be immutable. /// We can bind the environment to the context in create_context method. - pub async fn create_context(self: &Arc) -> Result> { - let context_shared = self.mutable_state.get_context_shared(); + pub async fn create_query_context(self: &Arc) -> Result> { + let query_ctx = self.session_ctx.get_context_shared(); - Ok(match context_shared.as_ref() { + Ok(match query_ctx.as_ref() { Some(shared) => QueryContext::create_from_shared(shared.clone()), None => { let config = self.config.clone(); - let discovery = self.sessions.get_cluster_discovery(); + let discovery = self.session_mgr.get_cluster_discovery(); let session = self.clone(); let cluster = discovery.discover().await?; let shared = QueryContextShared::try_create(config, session, cluster)?; - let ctx_shared = self.mutable_state.get_context_shared(); - match ctx_shared.as_ref() { + let query_ctx = self.session_ctx.get_context_shared(); + match query_ctx.as_ref() { Some(shared) => QueryContext::create_from_shared(shared.clone()), None => { - self.mutable_state.set_context_shared(Some(shared.clone())); + self.session_ctx.set_context_shared(Some(shared.clone())); QueryContext::create_from_shared(shared) } } @@ -133,8 +133,8 @@ impl Session { pub fn attach(self: &Arc, host: Option, io_shutdown: F) where F: FnOnce() + Send + 'static { let (tx, rx) = futures::channel::oneshot::channel(); - self.mutable_state.set_client_host(host); - self.mutable_state.set_io_shutdown_tx(Some(tx)); + self.session_ctx.set_client_host(host); + self.session_ctx.set_io_shutdown_tx(Some(tx)); common_base::tokio::spawn(async move { if let Ok(tx) = rx.await { @@ -145,29 +145,29 @@ impl Session { } pub fn set_current_database(self: &Arc, database_name: String) { - self.mutable_state.set_current_database(database_name); + self.session_ctx.set_current_database(database_name); } pub fn get_current_database(self: &Arc) -> String { - self.mutable_state.get_current_database() + self.session_ctx.get_current_database() } pub fn get_current_tenant(self: &Arc) -> String { - self.mutable_state.get_current_tenant() + self.session_ctx.get_current_tenant() } pub fn set_current_tenant(self: &Arc, tenant: String) { - self.mutable_state.set_current_tenant(tenant); + self.session_ctx.set_current_tenant(tenant); } pub fn get_current_user(self: &Arc) -> Result { - self.mutable_state + self.session_ctx .get_current_user() .ok_or_else(|| ErrorCode::AuthenticateFailure("unauthenticated")) } pub fn set_current_user(self: &Arc, user: UserInfo) { - self.mutable_state.set_current_user(user) + self.session_ctx.set_current_user(user) } pub fn validate_privilege( @@ -193,19 +193,19 @@ impl Session { } pub fn get_settings(self: &Arc) -> Arc { - self.mutable_state.get_settings() + self.session_ctx.get_settings() } - pub fn get_sessions_manager(self: &Arc) -> Arc { - self.sessions.clone() + pub fn get_session_manager(self: &Arc) -> Arc { + self.session_mgr.clone() } pub fn get_catalog(self: &Arc) -> Arc { - self.sessions.get_catalog() + self.session_mgr.get_catalog() } pub fn get_user_manager(self: &Arc) -> Arc { - self.sessions.get_user_manager() + self.session_mgr.get_user_manager() } pub fn get_memory_usage(self: &Arc) -> usize { diff --git a/query/src/sessions/session_status.rs b/query/src/sessions/session_ctx.rs similarity index 97% rename from query/src/sessions/session_status.rs rename to query/src/sessions/session_ctx.rs index b85e0cc473ef..e571aae44540 100644 --- a/query/src/sessions/session_status.rs +++ b/query/src/sessions/session_ctx.rs @@ -23,11 +23,11 @@ use common_macros::MallocSizeOf; use common_meta_types::UserInfo; use futures::channel::oneshot::Sender; -use crate::sessions::context_shared::QueryContextShared; +use crate::sessions::QueryContextShared; use crate::sessions::Settings; #[derive(MallocSizeOf)] -pub struct MutableStatus { +pub struct SessionContext { abort: AtomicBool, current_database: RwLock, current_tenant: RwLock, @@ -42,9 +42,9 @@ pub struct MutableStatus { context_shared: RwLock>>, } -impl MutableStatus { +impl SessionContext { pub fn try_create() -> Result { - Ok(MutableStatus { + Ok(SessionContext { abort: Default::default(), current_user: Default::default(), current_tenant: Default::default(), diff --git a/query/src/sessions/session_info.rs b/query/src/sessions/session_info.rs index 39ff6679efb0..8c3b249ca986 100644 --- a/query/src/sessions/session_info.rs +++ b/query/src/sessions/session_info.rs @@ -19,8 +19,8 @@ use common_base::ProgressValues; use common_dal::DalMetrics; use common_meta_types::UserInfo; -use crate::sessions::MutableStatus; use crate::sessions::Session; +use crate::sessions::SessionContext; use crate::sessions::Settings; pub struct ProcessInfo { @@ -39,11 +39,11 @@ pub struct ProcessInfo { impl Session { pub fn process_info(self: &Arc) -> ProcessInfo { - let session_mutable_state = self.mutable_state.clone(); - self.to_process_info(&session_mutable_state) + let session_ctx = self.session_ctx.clone(); + self.to_process_info(&session_ctx) } - fn to_process_info(self: &Arc, status: &MutableStatus) -> ProcessInfo { + fn to_process_info(self: &Arc, status: &SessionContext) -> ProcessInfo { let mut memory_usage = 0; if let Some(shared) = &status.get_context_shared() { @@ -69,7 +69,7 @@ impl Session { } } - fn process_state(self: &Arc, status: &MutableStatus) -> String { + fn process_state(self: &Arc, status: &SessionContext) -> String { match status.get_context_shared() { _ if status.get_abort() => String::from("Aborting"), None => String::from("Idle"), @@ -77,34 +77,34 @@ impl Session { } } - fn process_extra_info(self: &Arc, status: &MutableStatus) -> Option { + fn process_extra_info(self: &Arc, status: &SessionContext) -> Option { match self.typ.as_str() { "RPCSession" => Session::rpc_extra_info(status), _ => Session::query_extra_info(status), } } - fn rpc_extra_info(status: &MutableStatus) -> Option { + fn rpc_extra_info(status: &SessionContext) -> Option { status .get_context_shared() .map(|_| String::from("Partial cluster query stage")) } - fn query_extra_info(status: &MutableStatus) -> Option { + fn query_extra_info(status: &SessionContext) -> Option { status .get_context_shared() .as_ref() .map(|context_shared| context_shared.get_query_str()) } - fn query_dal_metrics(status: &MutableStatus) -> Option { + fn query_dal_metrics(status: &SessionContext) -> Option { status .get_context_shared() .as_ref() .map(|context_shared| context_shared.dal_ctx.get_metrics()) } - fn query_scan_progress_value(status: &MutableStatus) -> Option { + fn query_scan_progress_value(status: &SessionContext) -> Option { status .get_context_shared() .as_ref() diff --git a/query/src/sessions/sessions.rs b/query/src/sessions/session_mgr.rs similarity index 97% rename from query/src/sessions/sessions.rs rename to query/src/sessions/session_mgr.rs index 05dae2d41c87..acda70590a09 100644 --- a/query/src/sessions/sessions.rs +++ b/query/src/sessions/session_mgr.rs @@ -35,6 +35,7 @@ use crate::configs::Config; use crate::servers::http::v1::HttpQueryManager; use crate::sessions::session::Session; use crate::sessions::session_ref::SessionRef; +use crate::sessions::ProcessInfo; use crate::storages::cache::CacheManager; use crate::users::auth::auth_mgr::AuthMgr; use crate::users::UserApiProvider; @@ -230,4 +231,12 @@ impl SessionManager { } } } + + pub fn processes_info(self: &Arc) -> Vec { + self.active_sessions + .read() + .values() + .map(Session::process_info) + .collect::>() + } } diff --git a/query/src/sessions/session_ref.rs b/query/src/sessions/session_ref.rs index fbd4efe2037a..e8351e58b9f3 100644 --- a/query/src/sessions/session_ref.rs +++ b/query/src/sessions/session_ref.rs @@ -59,7 +59,7 @@ impl Session { if self.ref_count.fetch_sub(1, Ordering::Release) == 1 { std::sync::atomic::fence(Acquire); tracing::debug!("Destroy session {}", self.id); - self.sessions.destroy_session(&self.id); + self.session_mgr.destroy_session(&self.id); } } diff --git a/query/src/sessions/sessions_info.rs b/query/src/sessions/sessions_info.rs deleted file mode 100644 index e3d832f0ad74..000000000000 --- a/query/src/sessions/sessions_info.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::Arc; - -use crate::sessions::ProcessInfo; -use crate::sessions::Session; -use crate::sessions::SessionManager; - -impl SessionManager { - pub fn processes_info(self: &Arc) -> Vec { - self.active_sessions - .read() - .values() - .map(Session::process_info) - .collect::>() - } -} diff --git a/query/tests/it/sessions/session_status.rs b/query/tests/it/sessions/session_status.rs index d9a7a5912fe0..f78439a6b1b5 100644 --- a/query/tests/it/sessions/session_status.rs +++ b/query/tests/it/sessions/session_status.rs @@ -19,32 +19,32 @@ use std::sync::Arc; use common_exception::Result; use common_meta_types::UserInfo; use databend_query::clusters::Cluster; -use databend_query::sessions::MutableStatus; use databend_query::sessions::QueryContextShared; +use databend_query::sessions::SessionContext; use crate::tests::SessionManagerBuilder; #[test] fn test_session_status() -> Result<()> { - let mutable_status = MutableStatus::try_create()?; + let session_ctx = SessionContext::try_create()?; // Abort status. { - mutable_status.set_abort(true); - let val = mutable_status.get_abort(); + session_ctx.set_abort(true); + let val = session_ctx.get_abort(); assert!(val); } // Current database status. { - mutable_status.set_current_database("bend".to_string()); - let val = mutable_status.get_current_database(); + session_ctx.set_current_database("bend".to_string()); + let val = session_ctx.get_current_database(); assert_eq!("bend", val); } // Settings. { - let val = mutable_status.get_settings(); + let val = session_ctx.get_settings(); assert!(val.get_max_threads()? > 0); } @@ -52,30 +52,30 @@ fn test_session_status() -> Result<()> { { let demo = "127.0.0.1:80"; let server: SocketAddr = demo.parse().unwrap(); - mutable_status.set_client_host(Some(server)); + session_ctx.set_client_host(Some(server)); - let val = mutable_status.get_client_host(); + let val = session_ctx.get_client_host(); assert_eq!(Some(server), val); } // Current user. { let user_info = UserInfo::new_no_auth("user1".to_string(), "".to_string()); - mutable_status.set_current_user(user_info); + session_ctx.set_current_user(user_info); - let val = mutable_status.get_current_user().unwrap(); + let val = session_ctx.get_current_user().unwrap(); assert_eq!("user1".to_string(), val.name); } // io shutdown tx. { let (tx, _) = futures::channel::oneshot::channel(); - mutable_status.set_io_shutdown_tx(Some(tx)); + session_ctx.set_io_shutdown_tx(Some(tx)); - let val = mutable_status.take_io_shutdown_tx(); + let val = session_ctx.take_io_shutdown_tx(); assert!(val.is_some()); - let val = mutable_status.take_io_shutdown_tx(); + let val = session_ctx.take_io_shutdown_tx(); assert!(val.is_none()); } @@ -89,14 +89,14 @@ fn test_session_status() -> Result<()> { Cluster::empty(), )?; - mutable_status.set_context_shared(Some(shared.clone())); - let val = mutable_status.get_context_shared(); + session_ctx.set_context_shared(Some(shared.clone())); + let val = session_ctx.get_context_shared(); assert_eq!(shared.conf, val.unwrap().conf); - let val = mutable_status.take_context_shared(); + let val = session_ctx.take_context_shared(); assert_eq!(shared.conf, val.unwrap().conf); - let val = mutable_status.get_context_shared(); + let val = session_ctx.get_context_shared(); assert!(val.is_none()); } From dbd85f079754be59b0486466022366bb455ba01a Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Fri, 28 Jan 2022 17:39:28 +0800 Subject: [PATCH 2/2] query/tests/it/sessions/session_status.rs -> query/tests/it/sessions/session_context.rs --- query/tests/it/sessions/mod.rs | 2 +- .../tests/it/sessions/{session_status.rs => session_context.rs} | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) rename query/tests/it/sessions/{session_status.rs => session_context.rs} (98%) diff --git a/query/tests/it/sessions/mod.rs b/query/tests/it/sessions/mod.rs index 041d44a302e7..63e2fca24019 100644 --- a/query/tests/it/sessions/mod.rs +++ b/query/tests/it/sessions/mod.rs @@ -13,4 +13,4 @@ // limitations under the License. mod session; -mod session_status; +mod session_context; diff --git a/query/tests/it/sessions/session_status.rs b/query/tests/it/sessions/session_context.rs similarity index 98% rename from query/tests/it/sessions/session_status.rs rename to query/tests/it/sessions/session_context.rs index f78439a6b1b5..48f95abce5c4 100644 --- a/query/tests/it/sessions/session_status.rs +++ b/query/tests/it/sessions/session_context.rs @@ -25,7 +25,7 @@ use databend_query::sessions::SessionContext; use crate::tests::SessionManagerBuilder; #[test] -fn test_session_status() -> Result<()> { +fn test_session_context() -> Result<()> { let session_ctx = SessionContext::try_create()?; // Abort status.