diff --git a/query/src/interpreters/interpreter_kill.rs b/query/src/interpreters/interpreter_kill.rs index d4de47ce7c01f..999cbe58e7f20 100644 --- a/query/src/interpreters/interpreter_kill.rs +++ b/query/src/interpreters/interpreter_kill.rs @@ -22,6 +22,7 @@ use common_meta_types::UserPrivilegeType; use common_planners::KillPlan; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; +use common_tracing::tracing; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterPtr; @@ -54,6 +55,18 @@ impl Interpreter for KillInterpreter { .await?; let id = &self.plan.id; + //TODO: TCeason this code need delete on final pr. + let conn_id = self.ctx.get_current_session().get_mysql_conn_id(); + let get_id = self.ctx.get_id_by_mysql_conn_id(&conn_id).await; + match get_id { + Some(get) => { + tracing::info!("=== test get id is {}, plan id is {}", get, id); + } + None => { + tracing::info!("=== test get id is None, plan id is {}", id); + } + } + match self.ctx.get_session_by_id(id).await { None => Err(ErrorCode::UnknownSession(format!( "Not found session id {}", diff --git a/query/src/servers/mysql/mysql_interactive_worker.rs b/query/src/servers/mysql/mysql_interactive_worker.rs index 988c10fea3e5b..d30ca0d6ca51c 100644 --- a/query/src/servers/mysql/mysql_interactive_worker.rs +++ b/query/src/servers/mysql/mysql_interactive_worker.rs @@ -71,7 +71,13 @@ impl AsyncMysqlShim for InteractiveWorker } fn connect_id(&self) -> u32 { - self.session.get_mysql_conn_id() + match self.session.get_mysql_conn_id() { + Some(conn_id) => conn_id, + None => { + //default conn id + u32::from_le_bytes([0x08, 0x00, 0x00, 0x00]) + } + } } fn default_auth_plugin(&self) -> &str { diff --git a/query/src/sessions/query_ctx.rs b/query/src/sessions/query_ctx.rs index a4d1c2a9b281c..0f633235b962b 100644 --- a/query/src/sessions/query_ctx.rs +++ b/query/src/sessions/query_ctx.rs @@ -351,6 +351,18 @@ impl QueryContext { .await } + // Get session id by mysql connection id. + pub async fn get_id_by_mysql_conn_id( + self: &Arc, + conn_id: &Option, + ) -> Option { + self.shared + .session + .get_session_manager() + .get_id_by_mysql_conn_id(conn_id) + .await + } + // Get all the processes list info. pub async fn get_processes_info(self: &Arc) -> Vec { self.shared diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index 294fea2f62e66..54e7efde94242 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -89,14 +89,8 @@ impl Session { } } - pub fn get_mysql_conn_id(self: &Arc) -> u32 { - match self.mysql_connection_id { - Some(conn_id) => conn_id, - None => { - //default conn id - u32::from_le_bytes([0x08, 0x00, 0x00, 0x00]) - } - } + pub fn get_mysql_conn_id(self: &Arc) -> Option { + self.mysql_connection_id.clone() } pub fn get_id(self: &Arc) -> String { diff --git a/query/src/sessions/session_mgr.rs b/query/src/sessions/session_mgr.rs index eae5b9d10dbe5..4958d0205b7ce 100644 --- a/query/src/sessions/session_mgr.rs +++ b/query/src/sessions/session_mgr.rs @@ -63,6 +63,8 @@ pub struct SessionManager { storage_operator: RwLock, storage_runtime: Arc, _guards: Vec, + // When typ is MySQL, insert into this map, the id is key, MySQL connection id is val. + pub(in crate::sessions) mysql_conn_map: Arc, String>>>, } impl SessionManager { @@ -101,6 +103,7 @@ impl SessionManager { } else { (Vec::new(), None) }; + let mysql_conn_map = Arc::new(RwLock::new(HashMap::with_capacity(max_sessions))); Ok(Arc::new(SessionManager { conf: RwLock::new(conf), @@ -117,6 +120,7 @@ impl SessionManager { storage_operator: RwLock::new(storage_operator), storage_runtime: Arc::new(storage_runtime), _guards, + mysql_conn_map, })) } @@ -168,6 +172,7 @@ impl SessionManager { )); } } + let session_typ = typ.clone(); let session = Session::try_create( config.clone(), uuid::Uuid::new_v4().to_string(), @@ -176,6 +181,27 @@ impl SessionManager { ) .await?; + match session_typ { + SessionType::MySQL => { + let mut conn_id_query_id = self.mysql_conn_map.write(); + if conn_id_query_id.len() < self.max_sessions { + // force insert + // one session can generate various query id, but the generate is seq. + conn_id_query_id.insert(session.get_mysql_conn_id(), session.get_id()); + } else { + return Err(ErrorCode::TooManyUserConnections( + "The current accept connection has exceeded mysql_handler_thread_num config", + )); + } + } + _ => { + tracing::debug!( + "session type is {}, mysql_conn_map no need to change.", + session_typ + ); + } + } + let mut sessions = self.active_sessions.write(); if sessions.len() < self.max_sessions { label_counter( @@ -245,6 +271,15 @@ impl SessionManager { .map(|session| SessionRef::create(session.clone())) } + #[allow(clippy::ptr_arg)] + pub async fn get_id_by_mysql_conn_id( + self: &Arc, + mysql_conn_id: &Option, + ) -> Option { + let sessions = self.mysql_conn_map.read(); + sessions.get(mysql_conn_id).map(|id| id.clone()) + } + #[allow(clippy::ptr_arg)] pub fn destroy_session(self: &Arc, session_id: &String) { let config = self.get_conf();