Skip to content

Commit

Permalink
MySQL Handler Kill Query
Browse files Browse the repository at this point in the history
Link to discussion: #5405
  • Loading branch information
TCeason committed May 19, 2022
1 parent 056431b commit b9e58a9
Show file tree
Hide file tree
Showing 12 changed files with 205 additions and 33 deletions.
2 changes: 1 addition & 1 deletion docs/doc/30-reference/30-sql/60-kill/kill-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ Attempts to forcibly terminate the currently running queries.
## Syntax

```
KILL QUERY|CONNECTION <query_id>
KILL QUERY|CONNECTION <session_id>
```

## Examples
Expand Down
49 changes: 35 additions & 14 deletions query/src/interpreters/interpreter_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,25 @@ impl KillInterpreter {
pub fn try_create(ctx: Arc<QueryContext>, plan: KillPlan) -> Result<InterpreterPtr> {
Ok(Arc::new(KillInterpreter { ctx, plan }))
}

async fn execute_kill(&self, session_id: &String) -> Result<SendableDataBlockStream> {
match self.ctx.get_session_by_id(session_id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
session_id
))),
Some(kill_session) if self.plan.kill_connection => {
kill_session.force_kill_session();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
Some(kill_session) => {
kill_session.force_kill_query();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
}
}
}

#[async_trait::async_trait]
Expand All @@ -54,21 +73,23 @@ impl Interpreter for KillInterpreter {
.await?;

let id = &self.plan.id;
match self.ctx.get_session_by_id(id).await {
None => Err(ErrorCode::UnknownSession(format!(
"Not found session id {}",
id
))),
Some(kill_session) if self.plan.kill_connection => {
kill_session.force_kill_session();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
Some(kill_session) => {
kill_session.force_kill_query();
let schema = Arc::new(DataSchema::empty());
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
// If press Ctrl + C, MySQL Client will create a new session and send query
// `kill query mysql_connection_id` to server.
// the type of connection_id is u32, if parse success get session by connection_id,
// otherwise use the session_id.
// More info Link to: https://github.com/datafuselabs/databend/discussions/5405.
match id.parse::<u32>() {
Ok(mysql_conn_id) => {
let session_id = self.ctx.get_id_by_mysql_conn_id(&Some(mysql_conn_id)).await;
match session_id {
Some(get) => self.execute_kill(&get).await,
None => Err(ErrorCode::UnknownSession(format!(
"MySQL connection id {} not found session id",
mysql_conn_id
))),
}
}
Err(_) => self.execute_kill(id).await,
}
}
}
8 changes: 7 additions & 1 deletion query/src/servers/mysql/mysql_interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,13 @@ impl<W: std::io::Write + Send + Sync> AsyncMysqlShim<W> for InteractiveWorker<W>
}

fn connect_id(&self) -> u32 {
u32::from_le_bytes([0x08, 0x00, 0x00, 0x00])
match self.session.get_mysql_conn_id() {
Some(conn_id) => conn_id,
None => {
//default conn id
u32::from_le_bytes([0x08, 0x00, 0x00, 0x00])
}
}
}

fn default_auth_plugin(&self) -> &str {
Expand Down
12 changes: 12 additions & 0 deletions query/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,18 @@ impl QueryContext {
.await
}

// Get session id by mysql connection id.
pub async fn get_id_by_mysql_conn_id(
self: &Arc<Self>,
conn_id: &Option<u32>,
) -> Option<String> {
self.shared
.session
.get_session_manager()
.get_id_by_mysql_conn_id(conn_id)
.await
}

// Get all the processes list info.
pub async fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared
Expand Down
42 changes: 33 additions & 9 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_base::infallible::RwLock;
Expand Down Expand Up @@ -50,6 +51,7 @@ pub struct Session {
session_settings: Settings,
#[ignore_malloc_size_of = "insignificant"]
status: Arc<RwLock<SessionStatus>>,
pub(in crate::sessions) mysql_connection_id: Option<u32>,
}

impl Session {
Expand All @@ -63,16 +65,38 @@ impl Session {
let session_settings = Settings::try_create(&conf)?;
let ref_count = Arc::new(AtomicUsize::new(0));
let status = Arc::new(Default::default());
match typ {
SessionType::MySQL => {
let mysql_conn_id = session_mgr
.mysql_basic_conn_id
.load(Ordering::Relaxed)
.to_le();
Ok(Arc::new(Session {
id,
typ: RwLock::new(typ),
session_mgr,
ref_count,
session_ctx,
session_settings,
status,
mysql_connection_id: Some(mysql_conn_id),
}))
}
_ => Ok(Arc::new(Session {
id,
typ: RwLock::new(typ),
session_mgr,
ref_count,
session_ctx,
session_settings,
status,
mysql_connection_id: None,
})),
}
}

Ok(Arc::new(Session {
id,
typ: RwLock::new(typ),
session_mgr,
ref_count,
session_ctx,
session_settings,
status,
}))
pub fn get_mysql_conn_id(self: &Arc<Self>) -> Option<u32> {
self.mysql_connection_id
}

pub fn get_id(self: &Arc<Self>) -> String {
Expand Down
2 changes: 2 additions & 0 deletions query/src/sessions/session_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct ProcessInfo {
pub memory_usage: i64,
pub dal_metrics: Option<DalMetrics>,
pub scan_progress_value: Option<ProgressValues>,
pub mysql_connection_id: Option<u32>,
}

impl Session {
Expand Down Expand Up @@ -67,6 +68,7 @@ impl Session {
memory_usage,
dal_metrics: Session::query_dal_metrics(status),
scan_progress_value: Session::query_scan_progress_value(status),
mysql_connection_id: self.mysql_connection_id,
}
}

Expand Down
51 changes: 49 additions & 2 deletions query/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

use std::collections::HashMap;
use std::future::Future;
use std::ops::DerefMut;
use std::sync::atomic::AtomicU32;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -59,6 +62,9 @@ pub struct SessionManager {
storage_operator: RwLock<Operator>,
storage_runtime: Arc<Runtime>,
_guards: Vec<WorkerGuard>,
// When typ is MySQL, insert into this map, key is id, val is MySQL connection id.
pub(crate) mysql_conn_map: Arc<RwLock<HashMap<Option<u32>, String>>>,
pub(in crate::sessions) mysql_basic_conn_id: AtomicU32,
}

impl SessionManager {
Expand Down Expand Up @@ -93,6 +99,7 @@ impl SessionManager {
} else {
(Vec::new(), None)
};
let mysql_conn_map = Arc::new(RwLock::new(HashMap::with_capacity(max_sessions)));

Ok(Arc::new(SessionManager {
conf: RwLock::new(conf),
Expand All @@ -107,6 +114,8 @@ impl SessionManager {
storage_operator: RwLock::new(storage_operator),
storage_runtime: Arc::new(storage_runtime),
_guards,
mysql_conn_map,
mysql_basic_conn_id: AtomicU32::new(9_u32.to_le() as u32),
}))
}

Expand Down Expand Up @@ -145,10 +154,11 @@ impl SessionManager {
let sessions = self.active_sessions.read();
if sessions.len() == self.max_sessions {
return Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded mysql_handler_thread_num config",
"The current accept connection has exceeded max_active_sessions config",
));
}
}
let session_typ = typ.clone();
let session = Session::try_create(
config.clone(),
uuid::Uuid::new_v4().to_string(),
Expand All @@ -157,6 +167,27 @@ impl SessionManager {
)
.await?;

match session_typ {
SessionType::MySQL => {
let mut conn_id_session_id = self.mysql_conn_map.write();
if conn_id_session_id.len() < self.max_sessions {
conn_id_session_id.insert(session.get_mysql_conn_id(), session.get_id());
self.mysql_basic_conn_id.fetch_add(1, Ordering::Relaxed);
self.mysql_basic_conn_id.load(Ordering::Relaxed);
} else {
return Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded max_active_sessions config",
));
}
}
_ => {
tracing::debug!(
"session type is {}, mysql_conn_map no need to change.",
session_typ
);
}
}

let mut sessions = self.active_sessions.write();
if sessions.len() < self.max_sessions {
label_counter(
Expand All @@ -170,7 +201,7 @@ impl SessionManager {
Ok(SessionRef::create(session))
} else {
Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded mysql_handler_thread_num config",
"The current accept connection has exceeded max_active_sessions config",
))
}
}
Expand Down Expand Up @@ -226,6 +257,15 @@ impl SessionManager {
.map(|session| SessionRef::create(session.clone()))
}

#[allow(clippy::ptr_arg)]
pub async fn get_id_by_mysql_conn_id(
self: &Arc<Self>,
mysql_conn_id: &Option<u32>,
) -> Option<String> {
let sessions = self.mysql_conn_map.read();
sessions.get(mysql_conn_id).cloned()
}

#[allow(clippy::ptr_arg)]
pub fn destroy_session(self: &Arc<Self>, session_id: &String) {
let config = self.get_conf();
Expand All @@ -237,6 +277,13 @@ impl SessionManager {

let mut sessions = self.active_sessions.write();
sessions.remove(session_id);
//also need remove mysql_conn_map
let mut mysql_conns_map = self.mysql_conn_map.write();
for (k, v) in mysql_conns_map.deref_mut().clone() {
if &v == session_id {
mysql_conns_map.remove(&k);
}
}
}

pub fn graceful_shutdown(
Expand Down
26 changes: 22 additions & 4 deletions query/src/sql/parsers/parser_kill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
// Borrow from apache/arrow/rust/datafusion/src/sql/sql_parser
// See notice.md

use sqlparser::ast::Ident;
use sqlparser::parser::ParserError;
use sqlparser::tokenizer::Token;

use crate::sql::statements::DfKillStatement;
use crate::sql::DfParser;
Expand All @@ -34,9 +36,25 @@ impl<'a> DfParser<'a> {

// Parse 'KILL statement'.
fn parse_kill<const KILL_QUERY: bool>(&mut self) -> Result<DfStatement<'a>, ParserError> {
Ok(DfStatement::KillStatement(DfKillStatement {
object_id: self.parser.parse_identifier()?,
kill_query: KILL_QUERY,
}))
let token = self.parser.next_token();
match &token {
Token::Word(w) => Ok(DfStatement::KillStatement(DfKillStatement {
object_id: w.to_ident(),
kill_query: KILL_QUERY,
})),
// Sometimes MySQL Client could send `kill query connect_id`
// and the connect_id is a number, so we parse it as a SingleQuotedString.
Token::SingleQuotedString(s) | Token::Number(s, _) => {
Ok(DfStatement::KillStatement(DfKillStatement {
object_id: Ident::with_quote('\'', s),
kill_query: KILL_QUERY,
}))
}
Token::BackQuotedString(s) => Ok(DfStatement::KillStatement(DfKillStatement {
object_id: Ident::with_quote('`', s),
kill_query: KILL_QUERY,
})),
_ => self.expected("identifier", token),
}
}
}
4 changes: 4 additions & 0 deletions query/src/storages/system/processes_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl AsyncSystemTable for ProcessesTable {
let mut processes_dal_metrics_write_bytes = Vec::with_capacity(processes_info.len());
let mut processes_scan_progress_read_rows = Vec::with_capacity(processes_info.len());
let mut processes_scan_progress_read_bytes = Vec::with_capacity(processes_info.len());
let mut processes_mysql_connection_id = Vec::with_capacity(processes_info.len());

for process_info in &processes_info {
processes_id.push(process_info.id.clone().into_bytes());
Expand All @@ -77,6 +78,7 @@ impl AsyncSystemTable for ProcessesTable {
ProcessesTable::process_scan_progress_values(&process_info.scan_progress_value);
processes_scan_progress_read_rows.push(scan_progress_read_rows);
processes_scan_progress_read_bytes.push(scan_progress_read_bytes);
processes_mysql_connection_id.push(process_info.mysql_connection_id);
}

Ok(DataBlock::create(self.table_info.schema(), vec![
Expand All @@ -92,6 +94,7 @@ impl AsyncSystemTable for ProcessesTable {
Series::from_data(processes_dal_metrics_write_bytes),
Series::from_data(processes_scan_progress_read_rows),
Series::from_data(processes_scan_progress_read_bytes),
Series::from_data(processes_mysql_connection_id),
]))
}
}
Expand All @@ -111,6 +114,7 @@ impl ProcessesTable {
DataField::new_nullable("dal_metrics_write_bytes", u64::to_data_type()),
DataField::new_nullable("scan_progress_read_rows", u64::to_data_type()),
DataField::new_nullable("scan_progress_read_bytes", u64::to_data_type()),
DataField::new_nullable("mysql_connection_id", u32::to_data_type()),
]);

let table_info = TableInfo {
Expand Down
4 changes: 2 additions & 2 deletions query/tests/it/servers/mysql/mysql_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn test_rejected_session_with_sequence() -> Result<()> {
Ok(_) => panic!("Expected rejected connection"),
Err(error) => {
assert_eq!(error.code(), 1067);
assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded mysql_handler_thread_num config'");
assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded max_active_sessions config'");
}
};

Expand Down Expand Up @@ -99,7 +99,7 @@ async fn test_rejected_session_with_parallel() -> Result<()> {
Err(error) => {
destroy_barrier.wait().await;
assert_eq!(error.code(), 1067);
assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded mysql_handler_thread_num config'");
assert_eq!(error.message(), "Reject connection, cause: Server error: `ERROR HY000 (1815): The current accept connection has exceeded max_active_sessions config'");
CreateServerResult::Rejected
}
}
Expand Down
Loading

0 comments on commit b9e58a9

Please sign in to comment.