Skip to content

Commit

Permalink
SESSIONS-3933: Refine function name [PATCH-2]
Browse files Browse the repository at this point in the history
  • Loading branch information
BohuTANG committed Jan 29, 2022
1 parent ffd5026 commit a9872b3
Show file tree
Hide file tree
Showing 19 changed files with 60 additions and 67 deletions.
8 changes: 4 additions & 4 deletions common/management/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ mod user;

pub use cluster::ClusterApi;
pub use cluster::ClusterMgr;
pub use role::RoleApi;
pub use role::RoleMgr;
pub use role::RoleMgrApi;
pub use stage::StageApi;
pub use stage::StageMgr;
pub use stage::StageMgrApi;
pub use udf::UdfApi;
pub use udf::UdfMgr;
pub use udf::UdfMgrApi;
pub use user::user_api::UserMgrApi;
pub use user::user_api::UserApi;
pub use user::user_mgr::UserMgr;
2 changes: 1 addition & 1 deletion common/management/src/role/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
mod role_api;
mod role_mgr;

pub use role_api::RoleMgrApi;
pub use role_api::RoleApi;
pub use role_mgr::RoleMgr;
2 changes: 1 addition & 1 deletion common/management/src/role/role_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use common_meta_types::SeqV;
use common_meta_types::UserPrivilegeSet;

#[async_trait::async_trait]
pub trait RoleMgrApi: Sync + Send {
pub trait RoleApi: Sync + Send {
async fn add_role(&self, role_info: &RoleInfo) -> Result<u64>;

async fn get_role(&self, role_name: &str, seq: Option<u64>) -> Result<SeqV<RoleInfo>>;
Expand Down
4 changes: 2 additions & 2 deletions common/management/src/role/role_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use common_meta_types::SeqV;
use common_meta_types::UpsertKVAction;
use common_meta_types::UserPrivilegeSet;

use crate::role::role_api::RoleMgrApi;
use crate::role::role_api::RoleApi;

static ROLE_API_KEY_PREFIX: &str = "__fd_roles";

Expand Down Expand Up @@ -79,7 +79,7 @@ impl RoleMgr {
}

#[async_trait::async_trait]
impl RoleMgrApi for RoleMgr {
impl RoleApi for RoleMgr {
async fn add_role(&self, role_info: &RoleInfo) -> common_exception::Result<u64> {
let match_seq = MatchSeq::Exact(0);
let key = format!("{}/{}", self.role_prefix, &role_info.name);
Expand Down
2 changes: 1 addition & 1 deletion common/management/src/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
mod stage_api;
mod stage_mgr;

pub use stage_api::StageMgrApi;
pub use stage_api::StageApi;
pub use stage_mgr::StageMgr;
2 changes: 1 addition & 1 deletion common/management/src/stage/stage_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use common_meta_types::SeqV;
use common_meta_types::UserStageInfo;

#[async_trait::async_trait]
pub trait StageMgrApi: Sync + Send {
pub trait StageApi: Sync + Send {
// Add a stage info to /tenant/stage-name.
async fn add_stage(&self, stage: UserStageInfo) -> Result<u64>;

Expand Down
4 changes: 2 additions & 2 deletions common/management/src/stage/stage_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_meta_types::SeqV;
use common_meta_types::UpsertKVAction;
use common_meta_types::UserStageInfo;

use crate::stage::StageMgrApi;
use crate::stage::StageApi;

static USER_STAGE_API_KEY_PREFIX: &str = "__fd_stages";

Expand All @@ -46,7 +46,7 @@ impl StageMgr {
}

#[async_trait::async_trait]
impl StageMgrApi for StageMgr {
impl StageApi for StageMgr {
async fn add_stage(&self, info: UserStageInfo) -> Result<u64> {
let seq = MatchSeq::Exact(0);
let val = Operation::Update(serde_json::to_vec(&info)?);
Expand Down
2 changes: 1 addition & 1 deletion common/management/src/udf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@
mod udf_api;
mod udf_mgr;

pub use udf_api::UdfMgrApi;
pub use udf_api::UdfApi;
pub use udf_mgr::UdfMgr;
2 changes: 1 addition & 1 deletion common/management/src/udf/udf_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use common_meta_types::SeqV;
use common_meta_types::UserDefinedFunction;

#[async_trait::async_trait]
pub trait UdfMgrApi: Sync + Send {
pub trait UdfApi: Sync + Send {
// Add a UDF to /tenant/udf-name.
async fn add_udf(&self, udf: UserDefinedFunction) -> Result<u64>;

Expand Down
4 changes: 2 additions & 2 deletions common/management/src/udf/udf_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use common_meta_types::SeqV;
use common_meta_types::UpsertKVAction;
use common_meta_types::UserDefinedFunction;

use crate::udf::UdfMgrApi;
use crate::udf::UdfApi;

static UDF_API_KEY_PREFIX: &str = "__fd_udfs";

Expand All @@ -48,7 +48,7 @@ impl UdfMgr {
}

#[async_trait::async_trait]
impl UdfMgrApi for UdfMgr {
impl UdfApi for UdfMgr {
async fn add_udf(&self, info: UserDefinedFunction) -> Result<u64> {
if is_builtin_function(info.name.as_str()) {
return Err(ErrorCode::UdfAlreadyExists(format!(
Expand Down
2 changes: 1 addition & 1 deletion common/management/src/user/user_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_meta_types::UserInfo;
use common_meta_types::UserPrivilegeSet;

#[async_trait::async_trait]
pub trait UserMgrApi: Sync + Send {
pub trait UserApi: Sync + Send {
async fn add_user(&self, user_info: UserInfo) -> Result<u64>;

async fn get_user(
Expand Down
4 changes: 2 additions & 2 deletions common/management/src/user/user_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use common_meta_types::UpsertKVAction;
use common_meta_types::UserInfo;
use common_meta_types::UserPrivilegeSet;

use crate::user::user_api::UserMgrApi;
use crate::user::user_api::UserApi;

static USER_API_KEY_PREFIX: &str = "__fd_users";

Expand Down Expand Up @@ -81,7 +81,7 @@ impl UserMgr {
}

#[async_trait::async_trait]
impl UserMgrApi for UserMgr {
impl UserApi for UserMgr {
async fn add_user(&self, user_info: UserInfo) -> common_exception::Result<u64> {
let match_seq = MatchSeq::Exact(0);
let user_key = format_user_key(&user_info.name, &user_info.hostname);
Expand Down
2 changes: 1 addition & 1 deletion query/src/sessions/query_ctx_shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,6 @@ impl QueryContextShared {

impl Session {
pub(in crate::sessions) fn destroy_context_shared(&self) {
self.session_ctx.take_context_shared();
self.session_ctx.take_query_context_shared();
}
}
11 changes: 6 additions & 5 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl Session {
pub fn kill(self: &Arc<Self>) {
let session_ctx = self.session_ctx.clone();
session_ctx.set_abort(true);
if session_ctx.context_shared_is_none() {
if session_ctx.query_context_shared_is_none() {
if let Some(io_shutdown) = session_ctx.take_io_shutdown_tx() {
let (tx, rx) = oneshot::channel();
if io_shutdown.send(tx).is_ok() {
Expand All @@ -97,7 +97,7 @@ impl Session {
pub fn force_kill_query(self: &Arc<Self>) {
let session_ctx = self.session_ctx.clone();

if let Some(context_shared) = session_ctx.take_context_shared() {
if let Some(context_shared) = session_ctx.take_query_context_shared() {
context_shared.kill(/* shutdown executing query */);
}
}
Expand All @@ -106,7 +106,7 @@ impl Session {
/// For a query, execution environment(e.g cluster) should be immutable.
/// We can bind the environment to the context in create_context method.
pub async fn create_query_context(self: &Arc<Self>) -> Result<Arc<QueryContext>> {
let query_ctx = self.session_ctx.get_context_shared();
let query_ctx = self.session_ctx.get_query_context_shared();

Ok(match query_ctx.as_ref() {
Some(shared) => QueryContext::create_from_shared(shared.clone()),
Expand All @@ -118,11 +118,12 @@ impl Session {
let cluster = discovery.discover().await?;
let shared = QueryContextShared::try_create(config, session, cluster)?;

let query_ctx = self.session_ctx.get_context_shared();
let query_ctx = self.session_ctx.get_query_context_shared();
match query_ctx.as_ref() {
Some(shared) => QueryContext::create_from_shared(shared.clone()),
None => {
self.session_ctx.set_context_shared(Some(shared.clone()));
self.session_ctx
.set_query_context_shared(Some(shared.clone()));
QueryContext::create_from_shared(shared)
}
}
Expand Down
20 changes: 10 additions & 10 deletions query/src/sessions/session_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub struct SessionContext {
#[ignore_malloc_size_of = "insignificant"]
io_shutdown_tx: RwLock<Option<Sender<Sender<()>>>>,
#[ignore_malloc_size_of = "insignificant"]
context_shared: RwLock<Option<Arc<QueryContextShared>>>,
query_context_shared: RwLock<Option<Arc<QueryContextShared>>>,
}

impl SessionContext {
Expand All @@ -52,7 +52,7 @@ impl SessionContext {
current_database: RwLock::new("default".to_string()),
session_settings: RwLock::new(Settings::try_create()?.as_ref().clone()),
io_shutdown_tx: Default::default(),
context_shared: Default::default(),
query_context_shared: Default::default(),
})
}

Expand Down Expand Up @@ -128,24 +128,24 @@ impl SessionContext {
lock.take()
}

pub fn context_shared_is_none(&self) -> bool {
let lock = self.context_shared.read();
pub fn query_context_shared_is_none(&self) -> bool {
let lock = self.query_context_shared.read();
lock.is_none()
}

pub fn get_context_shared(&self) -> Option<Arc<QueryContextShared>> {
let lock = self.context_shared.read();
pub fn get_query_context_shared(&self) -> Option<Arc<QueryContextShared>> {
let lock = self.query_context_shared.read();
lock.clone()
}

pub fn set_context_shared(&self, ctx: Option<Arc<QueryContextShared>>) {
let mut lock = self.context_shared.write();
pub fn set_query_context_shared(&self, ctx: Option<Arc<QueryContextShared>>) {
let mut lock = self.query_context_shared.write();
*lock = ctx
}

// Take the context_shared.
pub fn take_context_shared(&self) -> Option<Arc<QueryContextShared>> {
let mut lock = self.context_shared.write();
pub fn take_query_context_shared(&self) -> Option<Arc<QueryContextShared>> {
let mut lock = self.query_context_shared.write();
lock.take()
}
}
12 changes: 6 additions & 6 deletions query/src/sessions/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl Session {
fn to_process_info(self: &Arc<Self>, status: &SessionContext) -> ProcessInfo {
let mut memory_usage = 0;

if let Some(shared) = &status.get_context_shared() {
if let Some(shared) = &status.get_query_context_shared() {
if let Ok(runtime) = shared.try_get_runtime() {
let runtime_tracker = runtime.get_tracker();
let runtime_memory_tracker = runtime_tracker.get_memory_tracker();
Expand All @@ -70,7 +70,7 @@ impl Session {
}

fn process_state(self: &Arc<Self>, status: &SessionContext) -> String {
match status.get_context_shared() {
match status.get_query_context_shared() {
_ if status.get_abort() => String::from("Aborting"),
None => String::from("Idle"),
Some(_) => String::from("Query"),
Expand All @@ -86,27 +86,27 @@ impl Session {

fn rpc_extra_info(status: &SessionContext) -> Option<String> {
status
.get_context_shared()
.get_query_context_shared()
.map(|_| String::from("Partial cluster query stage"))
}

fn query_extra_info(status: &SessionContext) -> Option<String> {
status
.get_context_shared()
.get_query_context_shared()
.as_ref()
.map(|context_shared| context_shared.get_query_str())
}

fn query_dal_metrics(status: &SessionContext) -> Option<DalMetrics> {
status
.get_context_shared()
.get_query_context_shared()
.as_ref()
.map(|context_shared| context_shared.dal_ctx.get_metrics())
}

fn query_scan_progress_value(status: &SessionContext) -> Option<ProgressValues> {
status
.get_context_shared()
.get_query_context_shared()
.as_ref()
.map(|context_shared| context_shared.scan_progress.get_values())
}
Expand Down
6 changes: 3 additions & 3 deletions query/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct SessionManager {
pub(in crate::sessions) conf: Config,
pub(in crate::sessions) discovery: Arc<ClusterDiscovery>,
pub(in crate::sessions) catalog: Arc<DatabaseCatalog>,
pub(in crate::sessions) user: Arc<UserApiProvider>,
pub(in crate::sessions) user_manager: Arc<UserApiProvider>,
pub(in crate::sessions) auth_manager: Arc<AuthMgr>,
pub(in crate::sessions) http_query_manager: Arc<HttpQueryManager>,

Expand All @@ -71,7 +71,7 @@ impl SessionManager {
catalog,
conf,
discovery,
user,
user_manager: user,
http_query_manager,
auth_manager,
max_sessions: max_active_sessions,
Expand All @@ -98,7 +98,7 @@ impl SessionManager {

/// Get the user api provider.
pub fn get_user_manager(self: &Arc<Self>) -> Arc<UserApiProvider> {
self.user.clone()
self.user_manager.clone()
}

pub fn get_catalog(self: &Arc<Self>) -> Arc<DatabaseCatalog> {
Expand Down
30 changes: 11 additions & 19 deletions query/src/users/user_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
use std::sync::Arc;

use common_exception::Result;
use common_management::RoleApi;
use common_management::RoleMgr;
use common_management::RoleMgrApi;
use common_management::StageApi;
use common_management::StageMgr;
use common_management::StageMgrApi;
use common_management::UdfApi;
use common_management::UdfMgr;
use common_management::UdfMgrApi;
use common_management::UserApi;
use common_management::UserMgr;
use common_management::UserMgrApi;
use common_meta_api::KVApi;

use crate::common::MetaClientProvider;
Expand All @@ -33,35 +33,27 @@ pub struct UserApiProvider {
}

impl UserApiProvider {
async fn create_kv_client(cfg: &Config) -> Result<Arc<dyn KVApi>> {
match MetaClientProvider::new(cfg.meta.to_grpc_client_config())
.try_get_kv_client()
.await
{
Ok(client) => Ok(client),
Err(cause) => Err(cause.add_message_back("(while create user api).")),
}
}

pub async fn create_global(conf: Config) -> Result<Arc<UserApiProvider>> {
let client = UserApiProvider::create_kv_client(&conf).await?;
let client = MetaClientProvider::new(conf.meta.to_grpc_client_config())
.try_get_kv_client()
.await?;

Ok(Arc::new(UserApiProvider { client }))
}

pub fn get_user_api_client(&self, tenant: &str) -> Arc<dyn UserMgrApi> {
pub fn get_user_api_client(&self, tenant: &str) -> Arc<dyn UserApi> {
Arc::new(UserMgr::new(self.client.clone(), tenant))
}

pub fn get_role_api_client(&self, tenant: &str) -> Arc<dyn RoleMgrApi> {
pub fn get_role_api_client(&self, tenant: &str) -> Arc<dyn RoleApi> {
Arc::new(RoleMgr::new(self.client.clone(), tenant))
}

pub fn get_stage_api_client(&self, tenant: &str) -> Arc<dyn StageMgrApi> {
pub fn get_stage_api_client(&self, tenant: &str) -> Arc<dyn StageApi> {
Arc::new(StageMgr::new(self.client.clone(), tenant))
}

pub fn get_udf_api_client(&self, tenant: &str) -> Arc<dyn UdfMgrApi> {
pub fn get_udf_api_client(&self, tenant: &str) -> Arc<dyn UdfApi> {
Arc::new(UdfMgr::new(self.client.clone(), tenant))
}
}
Loading

0 comments on commit a9872b3

Please sign in to comment.