Skip to content

Commit

Permalink
use hashmap store (mysql_conn_id, process_id)
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed May 17, 2022
1 parent 06d21e3 commit 7a39bda
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 9 deletions.
13 changes: 13 additions & 0 deletions query/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use common_meta_types::UserPrivilegeType;
use common_planners::KillPlan;
use common_streams::DataBlockStream;
use common_streams::SendableDataBlockStream;
use common_tracing::tracing;

use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
Expand Down Expand Up @@ -54,6 +55,18 @@ impl Interpreter for KillInterpreter {
.await?;

let id = &self.plan.id;
//TODO: TCeason this code need delete on final pr.
let conn_id = self.ctx.get_current_session().get_mysql_conn_id();
let get_id = self.ctx.get_id_by_mysql_conn_id(&conn_id).await;
match get_id {
Some(get) => {
tracing::info!("=== test get id is {}, plan id is {}", get, id);
}
None => {
tracing::info!("=== test get id is None, plan id is {}", id);
}
}

match self.ctx.get_session_by_id(id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
Expand Down
8 changes: 7 additions & 1 deletion query/src/servers/mysql/mysql_interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ impl<W: std::io::Write + Send + Sync> AsyncMysqlShim<W> for InteractiveWorker<W>
}

fn connect_id(&self) -> u32 {
self.session.get_mysql_conn_id()
match self.session.get_mysql_conn_id() {
Some(conn_id) => conn_id,
None => {
//default conn id
u32::from_le_bytes([0x08, 0x00, 0x00, 0x00])
}
}
}

fn default_auth_plugin(&self) -> &str {
Expand Down
12 changes: 12 additions & 0 deletions query/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,18 @@ impl QueryContext {
.await
}

// Get session id by mysql connection id.
pub async fn get_id_by_mysql_conn_id(
self: &Arc<Self>,
conn_id: &Option<u32>,
) -> Option<String> {
self.shared
.session
.get_session_manager()
.get_id_by_mysql_conn_id(conn_id)
.await
}

// Get all the processes list info.
pub async fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared
Expand Down
10 changes: 2 additions & 8 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,8 @@ impl Session {
}
}

pub fn get_mysql_conn_id(self: &Arc<Self>) -> u32 {
match self.mysql_connection_id {
Some(conn_id) => conn_id,
None => {
//default conn id
u32::from_le_bytes([0x08, 0x00, 0x00, 0x00])
}
}
pub fn get_mysql_conn_id(self: &Arc<Self>) -> Option<u32> {
self.mysql_connection_id.clone()
}

pub fn get_id(self: &Arc<Self>) -> String {
Expand Down
35 changes: 35 additions & 0 deletions query/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ pub struct SessionManager {
storage_operator: RwLock<Operator>,
storage_runtime: Arc<Runtime>,
_guards: Vec<WorkerGuard>,
// When typ is MySQL, insert into this map, the id is key, MySQL connection id is val.
pub(in crate::sessions) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
}

impl SessionManager {
Expand Down Expand Up @@ -101,6 +103,7 @@ impl SessionManager {
} else {
(Vec::new(), None)
};
let mysql_conn_map = Arc::new(RwLock::new(HashMap::with_capacity(max_sessions)));

Ok(Arc::new(SessionManager {
conf: RwLock::new(conf),
Expand All @@ -117,6 +120,7 @@ impl SessionManager {
storage_operator: RwLock::new(storage_operator),
storage_runtime: Arc::new(storage_runtime),
_guards,
mysql_conn_map,
}))
}

Expand Down Expand Up @@ -168,6 +172,7 @@ impl SessionManager {
));
}
}
let session_typ = typ.clone();
let session = Session::try_create(
config.clone(),
uuid::Uuid::new_v4().to_string(),
Expand All @@ -176,6 +181,27 @@ impl SessionManager {
)
.await?;

match session_typ {
SessionType::MySQL => {
let mut conn_id_query_id = self.mysql_conn_map.write();
if conn_id_query_id.len() < self.max_sessions {
// force insert
// one session can generate various query id, but the generate is seq.
conn_id_query_id.insert(session.get_mysql_conn_id(), session.get_id());
} else {
return Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded mysql_handler_thread_num config",
));
}
}
_ => {
tracing::debug!(
"session type is {}, mysql_conn_map no need to change.",
session_typ
);
}
}

let mut sessions = self.active_sessions.write();
if sessions.len() < self.max_sessions {
label_counter(
Expand Down Expand Up @@ -245,6 +271,15 @@ impl SessionManager {
.map(|session| SessionRef::create(session.clone()))
}

#[allow(clippy::ptr_arg)]
pub async fn get_id_by_mysql_conn_id(
self: &Arc<Self>,
mysql_conn_id: &Option<u32>,
) -> Option<String> {
let sessions = self.mysql_conn_map.read();
sessions.get(mysql_conn_id).map(|id| id.clone())
}

#[allow(clippy::ptr_arg)]
pub fn destroy_session(self: &Arc<Self>, session_id: &String) {
let config = self.get_conf();
Expand Down

0 comments on commit 7a39bda

Please sign in to comment.