From dda268083f188b218f159789c3dfc5840c64bc0c Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Wed, 10 Aug 2022 17:08:08 +0800 Subject: [PATCH 1/4] refactor session --- src/query/service/src/auth.rs | 6 +++--- .../servers/http/v1/query/execute_state.rs | 6 +++--- .../src/servers/http/v1/query/expirable.rs | 5 +++-- .../http/v1/query/http_query_context.rs | 10 +++++---- .../http/v1/query/http_query_manager.rs | 10 ++++----- .../servers/mysql/mysql_interactive_worker.rs | 13 +++++++++--- .../src/servers/mysql/mysql_session.rs | 7 ++++--- src/query/service/src/sessions/mod.rs | 2 -- src/query/service/src/sessions/query_ctx.rs | 8 +++---- src/query/service/src/sessions/session.rs | 4 ---- src/query/service/src/sessions/session_mgr.rs | 19 +++++++---------- .../service/tests/it/sessions/session.rs | 21 ------------------- src/query/service/tests/it/tests/context.rs | 8 ++----- 13 files changed, 48 insertions(+), 71 deletions(-) diff --git a/src/query/service/src/auth.rs b/src/query/service/src/auth.rs index 9f39fcdb4c7fa..b474e30be442b 100644 --- a/src/query/service/src/auth.rs +++ b/src/query/service/src/auth.rs @@ -23,7 +23,7 @@ use common_users::JwtAuthenticator; use common_users::UserApiProvider; use jwtk::Claims; -use crate::sessions::SessionRef; +use crate::sessions::Session; pub use crate::Config; pub struct AuthMgr { @@ -49,7 +49,7 @@ impl AuthMgr { })) } - pub async fn auth(&self, session: SessionRef, credential: &Credential) -> Result<()> { + pub async fn auth(&self, session: Arc, credential: &Credential) -> Result<()> { let user_info = match credential { Credential::Jwt { token: t, @@ -105,7 +105,7 @@ impl AuthMgr { async fn process_jwt_claims( &self, - session: &SessionRef, + session: &Arc, claims: &Claims, ) -> Result<(String, String)> { // setup tenant if the JWT claims contain extra.tenant_id diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index ab146b7b6f288..727661e2bdfd6 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -49,7 +49,7 @@ use crate::pipelines::PipelineBuildResult; use crate::servers::utils::use_planner_v2; use crate::sessions::QueryAffect; use crate::sessions::QueryContext; -use crate::sessions::SessionRef; +use crate::sessions::Session; use crate::sessions::TableContext; use crate::sql::ColumnBinding; use crate::sql::DfParser; @@ -105,7 +105,7 @@ impl ExecuteState { pub struct ExecuteRunning { // used to kill query - session: SessionRef, + session: Arc, // mainly used to get progress for now ctx: Arc, interpreter: Arc, @@ -180,7 +180,7 @@ impl Executor { impl ExecuteState { pub(crate) async fn try_create( request: &HttpQueryRequest, - session: SessionRef, + session: Arc, ctx: Arc, block_buffer: Arc, ) -> Result>> { diff --git a/src/query/service/src/servers/http/v1/query/expirable.rs b/src/query/service/src/servers/http/v1/query/expirable.rs index 82cc844246fee..f20221c613c94 100644 --- a/src/query/service/src/servers/http/v1/query/expirable.rs +++ b/src/query/service/src/servers/http/v1/query/expirable.rs @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; use std::time::Duration; use std::time::Instant; -use crate::sessions::SessionRef; +use crate::sessions::Session; #[derive(PartialEq, Eq)] pub enum ExpiringState { @@ -30,7 +31,7 @@ pub trait Expirable { fn on_expire(&self); } -impl Expirable for SessionRef { +impl Expirable for Arc { fn expire_state(&self) -> ExpiringState { if self.is_aborting() { ExpiringState::Aborted { diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index 4378931ff6980..2990ea3bd4ccf 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -12,24 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::sync::Arc; + use poem::FromRequest; use poem::Request; use poem::RequestBody; use poem::Result as PoemResult; -use crate::sessions::SessionRef; +use crate::sessions::Session; use crate::sessions::SessionType; pub struct HttpQueryContext { - session: SessionRef, + session: Arc, } impl HttpQueryContext { - pub fn new(session: SessionRef) -> Self { + pub fn new(session: Arc) -> Self { HttpQueryContext { session } } - pub fn get_session(&self, session_type: SessionType) -> SessionRef { + pub fn get_session(&self, session_type: SessionType) -> Arc { self.session.set_type(session_type); self.session.clone() } diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs index 3773857373d3e..5aea3f843b654 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs @@ -29,7 +29,7 @@ use super::expiring_map::ExpiringMap; use super::HttpQueryContext; use crate::servers::http::v1::query::http_query::HttpQuery; use crate::servers::http::v1::query::HttpQueryRequest; -use crate::sessions::SessionRef; +use crate::sessions::Session; use crate::Config; // TODO(youngsofun): may need refactor later for 2 reasons: @@ -42,7 +42,7 @@ pub(crate) struct HttpQueryConfig { pub struct HttpQueryManager { pub(crate) queries: Arc>>>, - pub(crate) sessions: Mutex>, + pub(crate) sessions: Mutex>>, pub(crate) config: HttpQueryConfig, } @@ -117,14 +117,14 @@ impl HttpQueryManager { q } - pub(crate) async fn get_session(self: &Arc, session_id: &str) -> Option { + pub(crate) async fn get_session(self: &Arc, session_id: &str) -> Option> { let sessions = self.sessions.lock(); sessions.get(session_id) } - pub(crate) async fn add_session(self: &Arc, session: SessionRef, timeout: Duration) { + pub(crate) async fn add_session(self: &Arc, session: Arc, timeout: Duration) { let mut sessions = self.sessions.lock(); - sessions.insert(session.get_id(), session.clone(), Some(timeout)); + sessions.insert(session.get_id(), session, Some(timeout)); } pub(crate) fn kill_session(self: &Arc, session_id: &str) { diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index 7acc3dd2ed10c..387cc21d09ab6 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -52,7 +52,8 @@ use crate::servers::mysql::MySQLFederated; use crate::servers::mysql::MYSQL_VERSION; use crate::servers::utils::use_planner_v2; use crate::sessions::QueryContext; -use crate::sessions::SessionRef; +use crate::sessions::Session; +use crate::sessions::SessionManager; use crate::sessions::TableContext; use crate::sql::plans::Plan; use crate::sql::DfParser; @@ -83,10 +84,16 @@ fn has_result_set_by_plan_node(plan: &PlanNode) -> bool { } struct InteractiveWorkerBase { - session: SessionRef, + session: Arc, generic_hold: PhantomData, } +impl Drop for InteractiveWorkerBase { + fn drop(&mut self) { + SessionManager::instance().destroy_session(&self.session.get_id()) + } +} + pub struct InteractiveWorker { base: InteractiveWorkerBase, version: String, @@ -486,7 +493,7 @@ impl InteractiveWorkerBase { } impl InteractiveWorker { - pub fn create(session: SessionRef, client_addr: String) -> InteractiveWorker { + pub fn create(session: Arc, client_addr: String) -> InteractiveWorker { let mut bs = vec![0u8; 20]; let mut rng = rand::thread_rng(); rng.fill_bytes(bs.as_mut()); diff --git a/src/query/service/src/servers/mysql/mysql_session.rs b/src/query/service/src/servers/mysql/mysql_session.rs index 09e8067d1474b..7985eac2b36b7 100644 --- a/src/query/service/src/servers/mysql/mysql_session.rs +++ b/src/query/service/src/servers/mysql/mysql_session.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::net::Shutdown; +use std::sync::Arc; use common_base::base::tokio::io::BufWriter; use common_base::base::tokio::net::TcpStream; @@ -27,7 +28,7 @@ use opensrv_mysql::IntermediaryOptions; use tracing::error; use crate::servers::mysql::mysql_interactive_worker::InteractiveWorker; -use crate::sessions::SessionRef; +use crate::sessions::Session; // default size of resultset write buffer: 100KB const DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE: usize = 100 * 1024; @@ -35,7 +36,7 @@ const DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE: usize = 100 * 1024; pub struct MySQLConnection; impl MySQLConnection { - pub fn run_on_stream(session: SessionRef, stream: TcpStream) -> Result<()> { + pub fn run_on_stream(session: Arc, stream: TcpStream) -> Result<()> { let blocking_stream = Self::convert_stream(stream)?; MySQLConnection::attach_session(&session, &blocking_stream)?; @@ -58,7 +59,7 @@ impl MySQLConnection { Ok(()) } - fn attach_session(session: &SessionRef, blocking_stream: &std::net::TcpStream) -> Result<()> { + fn attach_session(session: &Arc, blocking_stream: &std::net::TcpStream) -> Result<()> { let host = blocking_stream.peer_addr().ok(); let blocking_stream_ref = blocking_stream.try_clone()?; session.attach(host, move || { diff --git a/src/query/service/src/sessions/mod.rs b/src/query/service/src/sessions/mod.rs index b6fdca424ec95..42577229d57d5 100644 --- a/src/query/service/src/sessions/mod.rs +++ b/src/query/service/src/sessions/mod.rs @@ -22,7 +22,6 @@ mod session_info; #[allow(clippy::module_inception)] mod session_mgr; mod session_mgr_status; -mod session_ref; mod session_settings; mod session_status; mod session_type; @@ -36,7 +35,6 @@ pub use session_ctx::SessionContext; pub use session_info::ProcessInfo; pub use session_mgr::SessionManager; pub use session_mgr_status::SessionManagerStatus; -pub use session_ref::SessionRef; pub use session_settings::Settings; pub use session_status::SessionStatus; pub use session_type::SessionType; diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 31e9cd7c372ec..f22f2179fc445 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -61,8 +61,8 @@ use crate::servers::http::v1::HttpQueryHandle; use crate::sessions::query_affect::QueryAffect; use crate::sessions::ProcessInfo; use crate::sessions::QueryContextShared; +use crate::sessions::Session; use crate::sessions::SessionManager; -use crate::sessions::SessionRef; use crate::sessions::Settings; use crate::sessions::TableContext; use crate::storages::stage::StageTable; @@ -190,12 +190,12 @@ impl QueryContext { } // Get the current session. - pub fn get_current_session(self: &Arc) -> SessionRef { - SessionRef::create(self.shared.session.clone()) + pub fn get_current_session(self: &Arc) -> Arc { + self.shared.session.clone() } // Get one session by session id. - pub async fn get_session_by_id(self: &Arc, id: &str) -> Option { + pub async fn get_session_by_id(self: &Arc, id: &str) -> Option> { SessionManager::instance().get_session_by_id(id).await } diff --git a/src/query/service/src/sessions/session.rs b/src/query/service/src/sessions/session.rs index fd0a7f4756c02..d26a8969deaf0 100644 --- a/src/query/service/src/sessions/session.rs +++ b/src/query/service/src/sessions/session.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::net::SocketAddr; -use std::sync::atomic::AtomicUsize; use std::sync::Arc; use chrono_tz::Tz; @@ -41,7 +40,6 @@ use crate::Config; pub struct Session { pub(in crate::sessions) id: String, pub(in crate::sessions) typ: RwLock, - pub(in crate::sessions) ref_count: Arc, pub(in crate::sessions) session_ctx: Arc, status: Arc>, pub(in crate::sessions) mysql_connection_id: Option, @@ -54,13 +52,11 @@ impl Session { session_ctx: Arc, mysql_connection_id: Option, ) -> Result> { - let ref_count = Arc::new(AtomicUsize::new(0)); let status = Arc::new(Default::default()); Ok(Arc::new(Session { id, typ: RwLock::new(typ), status, - ref_count, session_ctx, mysql_connection_id, })) diff --git a/src/query/service/src/sessions/session_mgr.rs b/src/query/service/src/sessions/session_mgr.rs index d4dce56414ae5..fafcd6f425446 100644 --- a/src/query/service/src/sessions/session_mgr.rs +++ b/src/query/service/src/sessions/session_mgr.rs @@ -36,7 +36,6 @@ use tracing::debug; use tracing::info; use crate::sessions::session::Session; -use crate::sessions::session_ref::SessionRef; use crate::sessions::ProcessInfo; use crate::sessions::SessionContext; use crate::sessions::SessionManagerStatus; @@ -87,7 +86,7 @@ impl SessionManager { self.conf.clone() } - pub async fn create_session(self: &Arc, typ: SessionType) -> Result { + pub async fn create_session(self: &Arc, typ: SessionType) -> Result> { // TODO: maybe deadlock let config = self.get_conf(); { @@ -137,7 +136,7 @@ impl SessionManager { sessions.insert(session.get_id(), session.clone()); - Ok(SessionRef::create(session)) + Ok(session) } else { Err(ErrorCode::TooManyUserConnections( "The current accept connection has exceeded max_active_sessions config", @@ -149,14 +148,14 @@ impl SessionManager { self: &Arc, id: String, aborted: bool, - ) -> Result { + ) -> Result> { // TODO: maybe deadlock? let config = self.get_conf(); { let sessions = self.active_sessions.read(); let v = sessions.get(&id); if v.is_some() { - return Ok(SessionRef::create(v.unwrap().clone())); + return Ok(v.unwrap().clone()); } } @@ -180,18 +179,16 @@ impl SessionManager { ); sessions.insert(id, session.clone()); - Ok(SessionRef::create(session)) + Ok(session) } else { - Ok(SessionRef::create(v.unwrap().clone())) + Ok(v.unwrap().clone()) } } #[allow(clippy::ptr_arg)] - pub async fn get_session_by_id(self: &Arc, id: &str) -> Option { + pub async fn get_session_by_id(self: &Arc, id: &str) -> Option> { let sessions = self.active_sessions.read(); - sessions - .get(id) - .map(|session| SessionRef::create(session.clone())) + sessions.get(id).cloned() } #[allow(clippy::ptr_arg)] diff --git a/src/query/service/tests/it/sessions/session.rs b/src/query/service/tests/it/sessions/session.rs index a01b4341ddf95..5e300469ff59e 100644 --- a/src/query/service/tests/it/sessions/session.rs +++ b/src/query/service/tests/it/sessions/session.rs @@ -67,26 +67,5 @@ async fn test_session_in_management_mode() -> Result<()> { assert_eq!(&actual, "tenant2"); } - // test session leak - let leak_id; - { - let leak_session = SessionManager::instance() - .create_session(SessionType::Dummy) - .await?; - leak_id = leak_session.get_id(); - assert!( - SessionManager::instance() - .get_session_by_id(leak_id.as_str()) - .await - .is_some() - ); - } - assert!( - SessionManager::instance() - .get_session_by_id(leak_id.as_str()) - .await - .is_none() - ); - Ok(()) } diff --git a/src/query/service/tests/it/tests/context.rs b/src/query/service/tests/it/tests/context.rs index 22d731d4ff684..4d75945984692 100644 --- a/src/query/service/tests/it/tests/context.rs +++ b/src/query/service/tests/it/tests/context.rs @@ -157,12 +157,8 @@ pub async fn create_query_context_with_cluster( let nodes = desc.cluster_nodes_list; let dummy_query_context = QueryContext::create_from_shared( - QueryContextShared::try_create( - config, - (*dummy_session).clone(), - Cluster::create(nodes, local_id), - ) - .await?, + QueryContextShared::try_create(config, dummy_session, Cluster::create(nodes, local_id)) + .await?, ); dummy_query_context.get_settings().set_max_threads(8)?; From 85c62204d932d7bcb81781711fc9d4439fc91deb Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Mon, 22 Aug 2022 17:42:16 +0800 Subject: [PATCH 2/4] rebase main --- src/query/service/src/sessions/session_ref.rs | 69 ------------------- 1 file changed, 69 deletions(-) delete mode 100644 src/query/service/src/sessions/session_ref.rs diff --git a/src/query/service/src/sessions/session_ref.rs b/src/query/service/src/sessions/session_ref.rs deleted file mode 100644 index fb812f5ddf7a8..0000000000000 --- a/src/query/service/src/sessions/session_ref.rs +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::ops::Deref; -use std::sync::atomic::Ordering; -use std::sync::Arc; - -use tracing::debug; - -use crate::sessions::Session; -use crate::sessions::SessionManager; - -/// SessionRef is the ptr of session. -/// Remove it in session_manager when the current session is not referenced -pub struct SessionRef { - session: Arc, -} - -impl SessionRef { - pub fn create(session: Arc) -> SessionRef { - session.increment_ref_count(); - SessionRef { session } - } -} - -impl Clone for SessionRef { - fn clone(&self) -> Self { - SessionRef::create(self.session.clone()) - } -} - -impl Deref for SessionRef { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.session - } -} - -impl Drop for SessionRef { - fn drop(&mut self) { - self.session.destroy_session_ref(); - } -} - -impl Session { - pub fn destroy_session_ref(self: &Arc) { - if self.ref_count.fetch_sub(1, Ordering::Relaxed) == 1 { - debug!("Destroy session {}", self.id); - SessionManager::instance().destroy_session(&self.id); - self.quit(); - } - } - - pub fn increment_ref_count(self: &Arc) { - self.ref_count.fetch_add(1, Ordering::Relaxed); - } -} From 20a74106547ba7d1696ed3108da4ebb382384978 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Tue, 23 Aug 2022 13:45:17 +0800 Subject: [PATCH 3/4] drop session in httphandler. --- .../src/servers/http/v1/query/http_query_context.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/query/service/src/servers/http/v1/query/http_query_context.rs b/src/query/service/src/servers/http/v1/query/http_query_context.rs index 2990ea3bd4ccf..164c1a3133304 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_context.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_context.rs @@ -20,6 +20,7 @@ use poem::RequestBody; use poem::Result as PoemResult; use crate::sessions::Session; +use crate::sessions::SessionManager; use crate::sessions::SessionType; pub struct HttpQueryContext { @@ -37,6 +38,12 @@ impl HttpQueryContext { } } +impl Drop for HttpQueryContext { + fn drop(&mut self) { + SessionManager::instance().destroy_session(&self.session.get_id()) + } +} + #[async_trait::async_trait] impl<'a> FromRequest<'a> for &'a HttpQueryContext { async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult { From 454694cfb8bf93a4a87581d8cebbbbfff07f9a11 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Tue, 23 Aug 2022 18:12:30 +0800 Subject: [PATCH 4/4] try fix sqllogic test cluster --- src/query/service/src/sessions/session_mgr.rs | 50 +++---------------- 1 file changed, 7 insertions(+), 43 deletions(-) diff --git a/src/query/service/src/sessions/session_mgr.rs b/src/query/service/src/sessions/session_mgr.rs index fafcd6f425446..fd44ef917b60e 100644 --- a/src/query/service/src/sessions/session_mgr.rs +++ b/src/query/service/src/sessions/session_mgr.rs @@ -124,7 +124,7 @@ impl SessionManager { let user_api = UserApiProvider::instance(); let session_settings = Settings::try_create(&config, user_api, tenant).await?; let session_ctx = SessionContext::try_create(config.clone(), session_settings)?; - let session = Session::try_create(id, typ, session_ctx, mysql_conn_id)?; + let session = Session::try_create(id, typ.clone(), session_ctx, mysql_conn_id)?; let mut sessions = self.active_sessions.write(); if sessions.len() < self.max_sessions { @@ -134,7 +134,12 @@ impl SessionManager { &config.query.cluster_id, ); - sessions.insert(session.get_id(), session.clone()); + match typ { + SessionType::FlightRPC => {} + _ => { + sessions.insert(session.get_id(), session.clone()); + } + } Ok(session) } else { @@ -144,47 +149,6 @@ impl SessionManager { } } - pub async fn create_rpc_session( - self: &Arc, - id: String, - aborted: bool, - ) -> Result> { - // TODO: maybe deadlock? - let config = self.get_conf(); - { - let sessions = self.active_sessions.read(); - let v = sessions.get(&id); - if v.is_some() { - return Ok(v.unwrap().clone()); - } - } - - let tenant = config.query.tenant_id.clone(); - let user_api = UserApiProvider::instance(); - let session_settings = Settings::try_create(&config, user_api, tenant).await?; - let session_ctx = SessionContext::try_create(config.clone(), session_settings)?; - let session = Session::try_create(id.clone(), SessionType::FlightRPC, session_ctx, None)?; - - let mut sessions = self.active_sessions.write(); - let v = sessions.get(&id); - if v.is_none() { - if aborted { - return Err(ErrorCode::AbortedSession("Aborting server.")); - } - - label_counter( - super::metrics::METRIC_SESSION_CONNECT_NUMBERS, - &config.query.tenant_id, - &config.query.cluster_id, - ); - - sessions.insert(id, session.clone()); - Ok(session) - } else { - Ok(v.unwrap().clone()) - } - } - #[allow(clippy::ptr_arg)] pub async fn get_session_by_id(self: &Arc, id: &str) -> Option> { let sessions = self.active_sessions.read();