diff --git a/common/management/src/lib.rs b/common/management/src/lib.rs index 42adedd799109..6d3b660e8e6f4 100644 --- a/common/management/src/lib.rs +++ b/common/management/src/lib.rs @@ -20,11 +20,11 @@ mod user; pub use cluster::ClusterApi; pub use cluster::ClusterMgr; +pub use role::RoleApi; pub use role::RoleMgr; -pub use role::RoleMgrApi; +pub use stage::StageApi; pub use stage::StageMgr; -pub use stage::StageMgrApi; +pub use udf::UdfApi; pub use udf::UdfMgr; -pub use udf::UdfMgrApi; -pub use user::user_api::UserMgrApi; +pub use user::user_api::UserApi; pub use user::user_mgr::UserMgr; diff --git a/common/management/src/role/mod.rs b/common/management/src/role/mod.rs index d1a52eccb3d97..2e924ca1bfbbf 100644 --- a/common/management/src/role/mod.rs +++ b/common/management/src/role/mod.rs @@ -15,5 +15,5 @@ mod role_api; mod role_mgr; -pub use role_api::RoleMgrApi; +pub use role_api::RoleApi; pub use role_mgr::RoleMgr; diff --git a/common/management/src/role/role_api.rs b/common/management/src/role/role_api.rs index 31896720104c9..f352d57d04f9f 100644 --- a/common/management/src/role/role_api.rs +++ b/common/management/src/role/role_api.rs @@ -19,7 +19,7 @@ use common_meta_types::SeqV; use common_meta_types::UserPrivilegeSet; #[async_trait::async_trait] -pub trait RoleMgrApi: Sync + Send { +pub trait RoleApi: Sync + Send { async fn add_role(&self, role_info: &RoleInfo) -> Result; async fn get_role(&self, role_name: &str, seq: Option) -> Result>; diff --git a/common/management/src/role/role_mgr.rs b/common/management/src/role/role_mgr.rs index 01a467f204dd1..5e522f13fb13f 100644 --- a/common/management/src/role/role_mgr.rs +++ b/common/management/src/role/role_mgr.rs @@ -29,7 +29,7 @@ use common_meta_types::SeqV; use common_meta_types::UpsertKVAction; use common_meta_types::UserPrivilegeSet; -use crate::role::role_api::RoleMgrApi; +use crate::role::role_api::RoleApi; static ROLE_API_KEY_PREFIX: &str = "__fd_roles"; @@ -79,7 +79,7 @@ impl RoleMgr { } #[async_trait::async_trait] -impl RoleMgrApi for RoleMgr { +impl RoleApi for RoleMgr { async fn add_role(&self, role_info: &RoleInfo) -> common_exception::Result { let match_seq = MatchSeq::Exact(0); let key = format!("{}/{}", self.role_prefix, &role_info.name); diff --git a/common/management/src/stage/mod.rs b/common/management/src/stage/mod.rs index d5567c413f418..37448a73b7a7e 100644 --- a/common/management/src/stage/mod.rs +++ b/common/management/src/stage/mod.rs @@ -15,5 +15,5 @@ mod stage_api; mod stage_mgr; -pub use stage_api::StageMgrApi; +pub use stage_api::StageApi; pub use stage_mgr::StageMgr; diff --git a/common/management/src/stage/stage_api.rs b/common/management/src/stage/stage_api.rs index 2536c054fa085..598e589596c78 100644 --- a/common/management/src/stage/stage_api.rs +++ b/common/management/src/stage/stage_api.rs @@ -17,7 +17,7 @@ use common_meta_types::SeqV; use common_meta_types::UserStageInfo; #[async_trait::async_trait] -pub trait StageMgrApi: Sync + Send { +pub trait StageApi: Sync + Send { // Add a stage info to /tenant/stage-name. async fn add_stage(&self, stage: UserStageInfo) -> Result; diff --git a/common/management/src/stage/stage_mgr.rs b/common/management/src/stage/stage_mgr.rs index 08e7929ec2208..74159fc4d2f9b 100644 --- a/common/management/src/stage/stage_mgr.rs +++ b/common/management/src/stage/stage_mgr.rs @@ -26,7 +26,7 @@ use common_meta_types::SeqV; use common_meta_types::UpsertKVAction; use common_meta_types::UserStageInfo; -use crate::stage::StageMgrApi; +use crate::stage::StageApi; static USER_STAGE_API_KEY_PREFIX: &str = "__fd_stages"; @@ -46,7 +46,7 @@ impl StageMgr { } #[async_trait::async_trait] -impl StageMgrApi for StageMgr { +impl StageApi for StageMgr { async fn add_stage(&self, info: UserStageInfo) -> Result { let seq = MatchSeq::Exact(0); let val = Operation::Update(serde_json::to_vec(&info)?); diff --git a/common/management/src/udf/mod.rs b/common/management/src/udf/mod.rs index 066c75a21b31b..d2e6e2ebb2957 100644 --- a/common/management/src/udf/mod.rs +++ b/common/management/src/udf/mod.rs @@ -15,5 +15,5 @@ mod udf_api; mod udf_mgr; -pub use udf_api::UdfMgrApi; +pub use udf_api::UdfApi; pub use udf_mgr::UdfMgr; diff --git a/common/management/src/udf/udf_api.rs b/common/management/src/udf/udf_api.rs index 8821f463b603e..2357b22c7d953 100644 --- a/common/management/src/udf/udf_api.rs +++ b/common/management/src/udf/udf_api.rs @@ -17,7 +17,7 @@ use common_meta_types::SeqV; use common_meta_types::UserDefinedFunction; #[async_trait::async_trait] -pub trait UdfMgrApi: Sync + Send { +pub trait UdfApi: Sync + Send { // Add a UDF to /tenant/udf-name. async fn add_udf(&self, udf: UserDefinedFunction) -> Result; diff --git a/common/management/src/udf/udf_mgr.rs b/common/management/src/udf/udf_mgr.rs index caff9a6be1aa4..3fc58a7e73ce8 100644 --- a/common/management/src/udf/udf_mgr.rs +++ b/common/management/src/udf/udf_mgr.rs @@ -28,7 +28,7 @@ use common_meta_types::SeqV; use common_meta_types::UpsertKVAction; use common_meta_types::UserDefinedFunction; -use crate::udf::UdfMgrApi; +use crate::udf::UdfApi; static UDF_API_KEY_PREFIX: &str = "__fd_udfs"; @@ -48,7 +48,7 @@ impl UdfMgr { } #[async_trait::async_trait] -impl UdfMgrApi for UdfMgr { +impl UdfApi for UdfMgr { async fn add_udf(&self, info: UserDefinedFunction) -> Result { if is_builtin_function(info.name.as_str()) { return Err(ErrorCode::UdfAlreadyExists(format!( diff --git a/common/management/src/user/user_api.rs b/common/management/src/user/user_api.rs index d18440cad5bea..bc56f02879ef7 100644 --- a/common/management/src/user/user_api.rs +++ b/common/management/src/user/user_api.rs @@ -20,7 +20,7 @@ use common_meta_types::UserInfo; use common_meta_types::UserPrivilegeSet; #[async_trait::async_trait] -pub trait UserMgrApi: Sync + Send { +pub trait UserApi: Sync + Send { async fn add_user(&self, user_info: UserInfo) -> Result; async fn get_user( diff --git a/common/management/src/user/user_mgr.rs b/common/management/src/user/user_mgr.rs index e0cead5e97326..5a6d1f1fee3ca 100644 --- a/common/management/src/user/user_mgr.rs +++ b/common/management/src/user/user_mgr.rs @@ -30,7 +30,7 @@ use common_meta_types::UpsertKVAction; use common_meta_types::UserInfo; use common_meta_types::UserPrivilegeSet; -use crate::user::user_api::UserMgrApi; +use crate::user::user_api::UserApi; static USER_API_KEY_PREFIX: &str = "__fd_users"; @@ -81,7 +81,7 @@ impl UserMgr { } #[async_trait::async_trait] -impl UserMgrApi for UserMgr { +impl UserApi for UserMgr { async fn add_user(&self, user_info: UserInfo) -> common_exception::Result { let match_seq = MatchSeq::Exact(0); let user_key = format_user_key(&user_info.name, &user_info.hostname); diff --git a/query/src/sessions/query_ctx_shared.rs b/query/src/sessions/query_ctx_shared.rs index 989dffea8f6f8..65769c7113ca6 100644 --- a/query/src/sessions/query_ctx_shared.rs +++ b/query/src/sessions/query_ctx_shared.rs @@ -217,6 +217,6 @@ impl QueryContextShared { impl Session { pub(in crate::sessions) fn destroy_context_shared(&self) { - self.session_ctx.take_context_shared(); + self.session_ctx.take_query_context_shared(); } } diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index 26659d28be8fb..886221e3610e5 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -78,7 +78,7 @@ impl Session { pub fn kill(self: &Arc) { let session_ctx = self.session_ctx.clone(); session_ctx.set_abort(true); - if session_ctx.context_shared_is_none() { + if session_ctx.query_context_shared_is_none() { if let Some(io_shutdown) = session_ctx.take_io_shutdown_tx() { let (tx, rx) = oneshot::channel(); if io_shutdown.send(tx).is_ok() { @@ -97,7 +97,7 @@ impl Session { pub fn force_kill_query(self: &Arc) { let session_ctx = self.session_ctx.clone(); - if let Some(context_shared) = session_ctx.take_context_shared() { + if let Some(context_shared) = session_ctx.take_query_context_shared() { context_shared.kill(/* shutdown executing query */); } } @@ -106,7 +106,7 @@ impl Session { /// For a query, execution environment(e.g cluster) should be immutable. /// We can bind the environment to the context in create_context method. pub async fn create_query_context(self: &Arc) -> Result> { - let query_ctx = self.session_ctx.get_context_shared(); + let query_ctx = self.session_ctx.get_query_context_shared(); Ok(match query_ctx.as_ref() { Some(shared) => QueryContext::create_from_shared(shared.clone()), @@ -118,11 +118,12 @@ impl Session { let cluster = discovery.discover().await?; let shared = QueryContextShared::try_create(config, session, cluster)?; - let query_ctx = self.session_ctx.get_context_shared(); + let query_ctx = self.session_ctx.get_query_context_shared(); match query_ctx.as_ref() { Some(shared) => QueryContext::create_from_shared(shared.clone()), None => { - self.session_ctx.set_context_shared(Some(shared.clone())); + self.session_ctx + .set_query_context_shared(Some(shared.clone())); QueryContext::create_from_shared(shared) } } diff --git a/query/src/sessions/session_ctx.rs b/query/src/sessions/session_ctx.rs index e571aae445408..8e89a7cf02cfd 100644 --- a/query/src/sessions/session_ctx.rs +++ b/query/src/sessions/session_ctx.rs @@ -39,7 +39,7 @@ pub struct SessionContext { #[ignore_malloc_size_of = "insignificant"] io_shutdown_tx: RwLock>>>, #[ignore_malloc_size_of = "insignificant"] - context_shared: RwLock>>, + query_context_shared: RwLock>>, } impl SessionContext { @@ -52,7 +52,7 @@ impl SessionContext { current_database: RwLock::new("default".to_string()), session_settings: RwLock::new(Settings::try_create()?.as_ref().clone()), io_shutdown_tx: Default::default(), - context_shared: Default::default(), + query_context_shared: Default::default(), }) } @@ -128,24 +128,24 @@ impl SessionContext { lock.take() } - pub fn context_shared_is_none(&self) -> bool { - let lock = self.context_shared.read(); + pub fn query_context_shared_is_none(&self) -> bool { + let lock = self.query_context_shared.read(); lock.is_none() } - pub fn get_context_shared(&self) -> Option> { - let lock = self.context_shared.read(); + pub fn get_query_context_shared(&self) -> Option> { + let lock = self.query_context_shared.read(); lock.clone() } - pub fn set_context_shared(&self, ctx: Option>) { - let mut lock = self.context_shared.write(); + pub fn set_query_context_shared(&self, ctx: Option>) { + let mut lock = self.query_context_shared.write(); *lock = ctx } // Take the context_shared. - pub fn take_context_shared(&self) -> Option> { - let mut lock = self.context_shared.write(); + pub fn take_query_context_shared(&self) -> Option> { + let mut lock = self.query_context_shared.write(); lock.take() } } diff --git a/query/src/sessions/session_info.rs b/query/src/sessions/session_info.rs index 8c3b249ca986d..99831da75451c 100644 --- a/query/src/sessions/session_info.rs +++ b/query/src/sessions/session_info.rs @@ -46,7 +46,7 @@ impl Session { fn to_process_info(self: &Arc, status: &SessionContext) -> ProcessInfo { let mut memory_usage = 0; - if let Some(shared) = &status.get_context_shared() { + if let Some(shared) = &status.get_query_context_shared() { if let Ok(runtime) = shared.try_get_runtime() { let runtime_tracker = runtime.get_tracker(); let runtime_memory_tracker = runtime_tracker.get_memory_tracker(); @@ -70,7 +70,7 @@ impl Session { } fn process_state(self: &Arc, status: &SessionContext) -> String { - match status.get_context_shared() { + match status.get_query_context_shared() { _ if status.get_abort() => String::from("Aborting"), None => String::from("Idle"), Some(_) => String::from("Query"), @@ -86,27 +86,27 @@ impl Session { fn rpc_extra_info(status: &SessionContext) -> Option { status - .get_context_shared() + .get_query_context_shared() .map(|_| String::from("Partial cluster query stage")) } fn query_extra_info(status: &SessionContext) -> Option { status - .get_context_shared() + .get_query_context_shared() .as_ref() .map(|context_shared| context_shared.get_query_str()) } fn query_dal_metrics(status: &SessionContext) -> Option { status - .get_context_shared() + .get_query_context_shared() .as_ref() .map(|context_shared| context_shared.dal_ctx.get_metrics()) } fn query_scan_progress_value(status: &SessionContext) -> Option { status - .get_context_shared() + .get_query_context_shared() .as_ref() .map(|context_shared| context_shared.scan_progress.get_values()) } diff --git a/query/src/sessions/session_mgr.rs b/query/src/sessions/session_mgr.rs index acda70590a09b..7451cfd1f6df8 100644 --- a/query/src/sessions/session_mgr.rs +++ b/query/src/sessions/session_mgr.rs @@ -44,7 +44,7 @@ pub struct SessionManager { pub(in crate::sessions) conf: Config, pub(in crate::sessions) discovery: Arc, pub(in crate::sessions) catalog: Arc, - pub(in crate::sessions) user: Arc, + pub(in crate::sessions) user_manager: Arc, pub(in crate::sessions) auth_manager: Arc, pub(in crate::sessions) http_query_manager: Arc, @@ -71,7 +71,7 @@ impl SessionManager { catalog, conf, discovery, - user, + user_manager: user, http_query_manager, auth_manager, max_sessions: max_active_sessions, @@ -98,7 +98,7 @@ impl SessionManager { /// Get the user api provider. pub fn get_user_manager(self: &Arc) -> Arc { - self.user.clone() + self.user_manager.clone() } pub fn get_catalog(self: &Arc) -> Arc { diff --git a/query/src/users/user_api.rs b/query/src/users/user_api.rs index 98d0cca47be94..e23e32267603d 100644 --- a/query/src/users/user_api.rs +++ b/query/src/users/user_api.rs @@ -15,14 +15,14 @@ use std::sync::Arc; use common_exception::Result; +use common_management::RoleApi; use common_management::RoleMgr; -use common_management::RoleMgrApi; +use common_management::StageApi; use common_management::StageMgr; -use common_management::StageMgrApi; +use common_management::UdfApi; use common_management::UdfMgr; -use common_management::UdfMgrApi; +use common_management::UserApi; use common_management::UserMgr; -use common_management::UserMgrApi; use common_meta_api::KVApi; use crate::common::MetaClientProvider; @@ -33,35 +33,27 @@ pub struct UserApiProvider { } impl UserApiProvider { - async fn create_kv_client(cfg: &Config) -> Result> { - match MetaClientProvider::new(cfg.meta.to_grpc_client_config()) - .try_get_kv_client() - .await - { - Ok(client) => Ok(client), - Err(cause) => Err(cause.add_message_back("(while create user api).")), - } - } - pub async fn create_global(conf: Config) -> Result> { - let client = UserApiProvider::create_kv_client(&conf).await?; + let client = MetaClientProvider::new(conf.meta.to_grpc_client_config()) + .try_get_kv_client() + .await?; Ok(Arc::new(UserApiProvider { client })) } - pub fn get_user_api_client(&self, tenant: &str) -> Arc { + pub fn get_user_api_client(&self, tenant: &str) -> Arc { Arc::new(UserMgr::new(self.client.clone(), tenant)) } - pub fn get_role_api_client(&self, tenant: &str) -> Arc { + pub fn get_role_api_client(&self, tenant: &str) -> Arc { Arc::new(RoleMgr::new(self.client.clone(), tenant)) } - pub fn get_stage_api_client(&self, tenant: &str) -> Arc { + pub fn get_stage_api_client(&self, tenant: &str) -> Arc { Arc::new(StageMgr::new(self.client.clone(), tenant)) } - pub fn get_udf_api_client(&self, tenant: &str) -> Arc { + pub fn get_udf_api_client(&self, tenant: &str) -> Arc { Arc::new(UdfMgr::new(self.client.clone(), tenant)) } } diff --git a/query/tests/it/sessions/session_context.rs b/query/tests/it/sessions/session_context.rs index 48f95abce5c47..277d560ccd039 100644 --- a/query/tests/it/sessions/session_context.rs +++ b/query/tests/it/sessions/session_context.rs @@ -89,14 +89,14 @@ fn test_session_context() -> Result<()> { Cluster::empty(), )?; - session_ctx.set_context_shared(Some(shared.clone())); - let val = session_ctx.get_context_shared(); + session_ctx.set_query_context_shared(Some(shared.clone())); + let val = session_ctx.get_query_context_shared(); assert_eq!(shared.conf, val.unwrap().conf); - let val = session_ctx.take_context_shared(); + let val = session_ctx.take_query_context_shared(); assert_eq!(shared.conf, val.unwrap().conf); - let val = session_ctx.get_context_shared(); + let val = session_ctx.get_query_context_shared(); assert!(val.is_none()); }