From fe01c43d9521c7e01727eb915293d8cb11e352ea Mon Sep 17 00:00:00 2001 From: youngsofun Date: Tue, 22 Mar 2022 15:49:07 +0800 Subject: [PATCH 1/4] http handler support session --- .../interpreter_factory_interceptor.rs | 1 + .../servers/http/v1/http_query_handlers.rs | 8 +- query/src/servers/http/v1/mod.rs | 2 + .../servers/http/v1/query/execute_state.rs | 14 +- query/src/servers/http/v1/query/expirable.rs | 35 ++++ .../src/servers/http/v1/query/expiring_map.rs | 172 ++++++++++++++++++ query/src/servers/http/v1/query/http_query.rs | 64 ++++++- .../http/v1/query/http_query_manager.rs | 21 +++ query/src/servers/http/v1/query/mod.rs | 3 + query/src/servers/http/v1/statement.rs | 4 +- query/src/sessions/mod.rs | 6 +- query/src/sessions/session.rs | 15 ++ query/src/sessions/session_mgr.rs | 4 +- .../{status.rs => session_mgr_status.rs} | 8 +- query/src/sessions/session_status.rs | 40 ++++ .../it/servers/http/http_query_handlers.rs | 51 ++++++ 16 files changed, 420 insertions(+), 28 deletions(-) create mode 100644 query/src/servers/http/v1/query/expirable.rs create mode 100644 query/src/servers/http/v1/query/expiring_map.rs rename query/src/sessions/{status.rs => session_mgr_status.rs} (91%) create mode 100644 query/src/sessions/session_status.rs diff --git a/query/src/interpreters/interpreter_factory_interceptor.rs b/query/src/interpreters/interpreter_factory_interceptor.rs index 50aba2aa4e35..ae021834a1ab 100644 --- a/query/src/interpreters/interpreter_factory_interceptor.rs +++ b/query/src/interpreters/interpreter_factory_interceptor.rs @@ -81,6 +81,7 @@ impl Interpreter for InterceptorInterpreter { async fn finish(&self) -> Result<()> { let session = self.ctx.get_current_session(); let now = SystemTime::now(); + session.get_status().write().query_finish(); if session.get_type().is_user_session() { session .get_session_manager() diff --git a/query/src/servers/http/v1/http_query_handlers.rs b/query/src/servers/http/v1/http_query_handlers.rs index b0a789d2bc63..cc53a74dd8b9 100644 --- a/query/src/servers/http/v1/http_query_handlers.rs +++ b/query/src/servers/http/v1/http_query_handlers.rs @@ -78,6 +78,7 @@ pub struct QueryStats { #[derive(Serialize, Deserialize, Debug)] pub struct QueryResponse { pub id: String, + pub session_id: Option, pub schema: Option, pub data: JsonBlockRef, pub state: ExecuteStateName, @@ -99,7 +100,8 @@ impl QueryResponse { ), None => (Arc::new(vec![]), None), }; - let columns = r.initial_state.as_ref().and_then(|v| v.schema.clone()); + let schema = r.initial_state.as_ref().and_then(|v| v.schema.clone()); + let session_id = r.initial_state.as_ref().map(|v| v.session_id.clone()); let stats = QueryStats { scan_progress: r.state.scan_progress.clone(), running_time_ms: r.state.running_time_ms, @@ -107,7 +109,8 @@ impl QueryResponse { QueryResponse { data, state: r.state.state, - schema: columns, + schema, + session_id, stats, id: id.clone(), next_uri: next_url, @@ -124,6 +127,7 @@ impl QueryResponse { state: ExecuteStateName::Failed, data: Arc::new(vec![]), schema: None, + session_id: None, next_uri: None, stats_uri: None, final_uri: None, diff --git a/query/src/servers/http/v1/mod.rs b/query/src/servers/http/v1/mod.rs index 5def7987aaa6..c82ec031892b 100644 --- a/query/src/servers/http/v1/mod.rs +++ b/query/src/servers/http/v1/mod.rs @@ -34,6 +34,8 @@ pub use load::LoadResponse; pub use query::ExecuteStateName; pub use query::HttpQueryHandle; pub use query::HttpQueryManager; +pub use query::HttpSession; +pub use query::HttpSessionConf; pub use stage::upload_to_stage; pub use stage::UploadToStageResponse; pub use statement::statement_handler; diff --git a/query/src/servers/http/v1/query/execute_state.rs b/query/src/servers/http/v1/query/execute_state.rs index 492cae445468..88a97f35fafe 100644 --- a/query/src/servers/http/v1/query/execute_state.rs +++ b/query/src/servers/http/v1/query/execute_state.rs @@ -25,7 +25,6 @@ use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; -use common_meta_types::UserInfo; use common_tracing::tracing; use futures::StreamExt; use serde::Deserialize; @@ -36,9 +35,7 @@ use super::http_query::HttpQueryRequest; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterFactory; use crate::sessions::QueryContext; -use crate::sessions::SessionManager; use crate::sessions::SessionRef; -use crate::sessions::SessionType; use crate::sql::PlanParser; #[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)] @@ -137,22 +134,13 @@ pub(crate) struct ExecuteRunning { impl ExecuteState { pub(crate) async fn try_create( request: &HttpQueryRequest, - session_manager: &Arc, - user_info: &UserInfo, + session: SessionRef, block_tx: mpsc::Sender, ) -> Result<(Arc>, DataSchemaRef)> { let sql = &request.sql; let start_time = Instant::now(); - let session = session_manager - .create_session(SessionType::HTTPQuery) - .await?; let ctx = session.create_query_context().await?; - if let Some(db) = &request.session.database { - ctx.set_current_database(db.clone()).await?; - }; ctx.attach_query_str(sql); - session.set_current_user(user_info.clone()); - let plan = PlanParser::parse(ctx.clone(), sql).await?; let schema = plan.schema(); diff --git a/query/src/servers/http/v1/query/expirable.rs b/query/src/servers/http/v1/query/expirable.rs new file mode 100644 index 000000000000..28a4117d80a7 --- /dev/null +++ b/query/src/servers/http/v1/query/expirable.rs @@ -0,0 +1,35 @@ +use std::time::Instant; + +use crate::sessions::SessionRef; + +#[derive(PartialEq)] +pub enum ExpiringState { + InUse, + Idle { since: Instant }, + Aborted { need_cleanup: bool }, +} + +pub trait Expirable { + fn expire_state(&self) -> ExpiringState; + fn on_expire(&self); +} + +impl Expirable for SessionRef { + fn expire_state(&self) -> ExpiringState { + if self.is_aborting() { + ExpiringState::Aborted { + need_cleanup: false, + } + } else if !self.query_context_shared_is_none() { + ExpiringState::InUse + } else { + let status = self.get_status(); + let status = status.read(); + ExpiringState::Idle { + since: status.last_access(), + } + } + } + + fn on_expire(&self) {} +} diff --git a/query/src/servers/http/v1/query/expiring_map.rs b/query/src/servers/http/v1/query/expiring_map.rs new file mode 100644 index 000000000000..3fc80183e9eb --- /dev/null +++ b/query/src/servers/http/v1/query/expiring_map.rs @@ -0,0 +1,172 @@ +// Copyright 2021 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::borrow::Borrow; +use std::collections::HashMap; +use std::hash::Hash; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +use common_base::tokio::task; +use common_base::tokio::time::sleep; +use common_infallible::RwLock; + +use crate::servers::http::v1::query::expirable::Expirable; +use crate::servers::http::v1::query::expirable::ExpiringState; + +// todo(youngsofun): use ExpiringMap for HttpQuery + +struct MaybeExpiring +where V: Expirable +{ + task: Option>, + pub value: V, +} + +// on insert:start task +// 1. check V for expire +// 2. call remove if expire +// on remove: +// 1. call on_expire +// 2. Cancel task + +pub struct ExpiringMap +where V: Expirable +{ + inner: Arc>>, +} + +async fn run_check(e: &T, max_idle: Duration) -> bool { + loop { + match e.expire_state() { + ExpiringState::InUse => sleep(max_idle).await, + ExpiringState::Idle { since } => { + let now = Instant::now(); + if now - since > max_idle { + return true; + } else { + sleep(max_idle - (now - since)).await; + continue; + } + } + ExpiringState::Aborted { need_cleanup } => return need_cleanup, + } + } +} + +impl Default for ExpiringMap +where V: Expirable +{ + fn default() -> Self { + Self { + inner: Arc::new(RwLock::new(Inner::default())), + } + } +} + +struct Inner +where V: Expirable +{ + map: HashMap>, +} + +impl Default for Inner +where V: Expirable +{ + fn default() -> Self { + Self { + map: HashMap::new(), + } + } +} + +impl Inner +where + K: Hash + Eq, + V: Expirable, +{ + pub fn remove(&mut self, k: &Q) + where + K: Borrow, + Q: Hash + Eq, + { + if let Some(mut checker) = self.map.remove(k) { + if let Some(t) = checker.task.take() { + t.abort() + } + checker.value.on_expire(); + } + } + pub fn insert(&mut self, k: K, v: MaybeExpiring) { + self.map.insert(k, v); + } + + pub fn get(&self, k: &Q) -> Option<&V> + where + K: Borrow, + Q: Hash + Eq, + { + self.map.get(k).map(|i| &i.value) + } +} + +impl ExpiringMap +where + K: Hash + Eq, + V: Expirable + Clone, +{ + pub fn insert(&mut self, k: K, v: V, max_idle_time: Option) + where + K: Clone + Send + Sync + 'static, + V: Send + Sync + 'static, + { + let mut inner = self.inner.write(); + let task = match max_idle_time { + Some(d) => { + let inner = self.inner.clone(); + let v_clone = v.clone(); + let k_clone = k.clone(); + let task = task::spawn(async move { + if run_check(&v_clone, d).await { + let mut inner = inner.write(); + inner.remove(&k_clone); + } + }); + Some(task) + } + None => None, + }; + let i = MaybeExpiring { task, value: v }; + inner.insert(k, i); + } + + pub fn get(&self, k: &Q) -> Option + where + K: Borrow, + Q: Hash + Eq, + { + let inner = self.inner.read(); + inner.get(k).cloned() + } + + pub fn remove(&mut self, k: &Q) + where + K: Borrow, + Q: Hash + Eq, + { + let mut map = self.inner.write(); + map.remove(k) + } +} diff --git a/query/src/servers/http/v1/query/http_query.rs b/query/src/servers/http/v1/query/http_query.rs index c36b1c9dfd53..537d5f00e2f9 100644 --- a/query/src/servers/http/v1/query/http_query.rs +++ b/query/src/servers/http/v1/query/http_query.rs @@ -27,6 +27,8 @@ use common_exception::Result; use common_meta_types::UserInfo; use serde::Deserialize; +use crate::servers::http::v1::query::expirable::Expirable; +use crate::servers::http::v1::query::expirable::ExpiringState; use crate::servers::http::v1::query::http_query_manager::HttpQueryConfig; use crate::servers::http::v1::query::ExecuteState; use crate::servers::http::v1::query::ExecuteStateName; @@ -35,12 +37,13 @@ use crate::servers::http::v1::query::ResponseData; use crate::servers::http::v1::query::ResultDataManager; use crate::servers::http::v1::query::Wait; use crate::sessions::SessionManager; +use crate::sessions::SessionType; #[derive(Deserialize, Debug)] #[serde(deny_unknown_fields)] pub struct HttpQueryRequest { #[serde(default)] - pub session: HttpSessionConf, + pub session: HttpSession, pub sql: String, #[serde(default)] pub pagination: PaginationConf, @@ -68,13 +71,30 @@ impl PaginationConf { } } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq)] +#[serde(deny_unknown_fields)] pub struct HttpSessionConf { pub database: Option, + pub max_idle_time: Option, +} + +#[derive(Deserialize, Debug, PartialEq)] +#[serde(untagged)] +pub enum HttpSession { + // keep New before old, so it deserialize to New when empty + New(HttpSessionConf), + Old { id: String }, +} + +impl Default for HttpSession { + fn default() -> Self { + HttpSession::New(Default::default()) + } } pub struct ResponseInitialState { pub schema: Option, + pub session_id: String, } pub struct ResponseState { @@ -92,6 +112,7 @@ pub struct HttpQueryResponseInternal { pub struct HttpQuery { pub(crate) id: String, + pub(crate) session_id: String, #[allow(dead_code)] request: HttpQueryRequest, state: Arc>, @@ -108,14 +129,48 @@ impl HttpQuery { user_info: &UserInfo, config: HttpQueryConfig, ) -> Result> { + let http_query_manager = session_manager.get_http_query_manager(); + let session = match &request.session { + HttpSession::New(session_conf) => { + let session = session_manager + .create_session(SessionType::HTTPQuery) + .await?; + if let Some(db) = &session_conf.database { + session.set_current_database(db.clone()); + } + if let Some(secs) = session_conf.max_idle_time { + if secs > 0 { + http_query_manager + .add_session(session.clone(), Duration::from_secs(secs)) + .await; + } + } + session + } + HttpSession::Old { id } => { + let session = http_query_manager + .get_session(id) + .await + .ok_or_else(|| ErrorCode::UnknownSession(id))?; + if session.expire_state() == ExpiringState::InUse { + return Err(ErrorCode::BadArguments( + "last query on the session not finished", + )); + }; + session + } + }; + session.set_current_user(user_info.clone()); + let session_id = session.get_id().clone(); + //TODO(youngsofun): support config/set channel size let (block_tx, block_rx) = mpsc::channel(10); - let (state, schema) = - ExecuteState::try_create(&request, session_manager, user_info, block_tx).await?; + let (state, schema) = ExecuteState::try_create(&request, session, block_tx).await?; let data = Arc::new(TokioMutex::new(ResultDataManager::new(schema, block_rx))); let query = HttpQuery { id: id.to_string(), + session_id, request, state, data, @@ -158,6 +213,7 @@ impl HttpQuery { let data = self.data.lock().await; ResponseInitialState { schema: Some(data.schema.clone()), + session_id: self.session_id.clone(), } } diff --git a/query/src/servers/http/v1/query/http_query_manager.rs b/query/src/servers/http/v1/query/http_query_manager.rs index fbed7ffbf98d..6b0afa6814bb 100644 --- a/query/src/servers/http/v1/query/http_query_manager.rs +++ b/query/src/servers/http/v1/query/http_query_manager.rs @@ -14,18 +14,22 @@ use std::collections::HashMap; use std::sync::Arc; +use std::time::Duration; use common_base::tokio; use common_base::tokio::sync::RwLock; use common_base::tokio::time::sleep; use common_exception::Result; +use common_infallible::Mutex; use common_meta_types::UserInfo; use common_tracing::tracing; +use super::expiring_map::ExpiringMap; use crate::configs::Config; use crate::servers::http::v1::query::http_query::HttpQuery; use crate::servers::http::v1::query::HttpQueryRequest; use crate::sessions::SessionManager; +use crate::sessions::SessionRef; // TODO(youngsofun): may need refactor later for 2 reasons: // 1. some can be both configured and overwritten by http query request @@ -37,6 +41,7 @@ pub(crate) struct HttpQueryConfig { pub struct HttpQueryManager { pub(crate) queries: Arc>>>, + pub(crate) sessions: Mutex>, pub(crate) config: HttpQueryConfig, } @@ -44,6 +49,7 @@ impl HttpQueryManager { pub async fn create_global(cfg: Config) -> Result> { Ok(Arc::new(HttpQueryManager { queries: Arc::new(RwLock::new(HashMap::new())), + sessions: Mutex::new(ExpiringMap::default()), config: HttpQueryConfig { result_timeout_millis: cfg.query.http_handler_result_timeout_millis, }, @@ -104,4 +110,19 @@ impl HttpQueryManager { } q } + + pub(crate) async fn get_session(self: &Arc, session_id: &str) -> Option { + let sessions = self.sessions.lock(); + sessions.get(session_id) + } + + pub(crate) async fn add_session(self: &Arc, session: SessionRef, timeout: Duration) { + let mut sessions = self.sessions.lock(); + sessions.insert(session.get_id(), session.clone(), Some(timeout)); + } + + pub(crate) fn kill_session(self: &Arc, session_id: &str) { + let mut sessions = self.sessions.lock(); + sessions.remove(session_id); + } } diff --git a/query/src/servers/http/v1/query/mod.rs b/query/src/servers/http/v1/query/mod.rs index ed0bb05cd225..d4d825b24534 100644 --- a/query/src/servers/http/v1/query/mod.rs +++ b/query/src/servers/http/v1/query/mod.rs @@ -13,6 +13,8 @@ // limitations under the License. mod execute_state; +mod expirable; +mod expiring_map; mod http_query; mod http_query_manager; mod result_data_manager; @@ -24,6 +26,7 @@ pub use execute_state::HttpQueryHandle; pub use http_query::HttpQuery; pub use http_query::HttpQueryRequest; pub use http_query::HttpQueryResponseInternal; +pub use http_query::HttpSession; pub use http_query::HttpSessionConf; pub use http_query::PaginationConf; pub use http_query::ResponseInitialState; diff --git a/query/src/servers/http/v1/statement.rs b/query/src/servers/http/v1/statement.rs index 7abf4f2ff5b3..1b08ea3a4325 100644 --- a/query/src/servers/http/v1/statement.rs +++ b/query/src/servers/http/v1/statement.rs @@ -26,6 +26,7 @@ use poem::Route; use serde::Deserialize; use super::query::HttpQueryRequest; +use super::query::HttpSession; use super::query::HttpSessionConf; use super::query::PaginationConf; use super::QueryResponse; @@ -48,10 +49,11 @@ pub async fn statement_handler( let query_id = http_query_manager.next_query_id(); let session = HttpSessionConf { database: params.db.filter(|x| !x.is_empty()), + max_idle_time: None, }; let req = HttpQueryRequest { sql, - session, + session: HttpSession::New(session), pagination: PaginationConf { wait_time_secs: -1 }, }; let query = http_query_manager diff --git a/query/src/sessions/mod.rs b/query/src/sessions/mod.rs index f0c36f0e7711..8c18176d36c0 100644 --- a/query/src/sessions/mod.rs +++ b/query/src/sessions/mod.rs @@ -20,10 +20,11 @@ mod session_ctx; mod session_info; #[allow(clippy::module_inception)] mod session_mgr; +mod session_mgr_status; mod session_ref; mod session_settings; +mod session_status; mod session_type; -mod status; pub use query_ctx::QueryContext; pub use query_ctx_shared::QueryContextShared; @@ -31,7 +32,8 @@ pub use session::Session; pub use session_ctx::SessionContext; pub use session_info::ProcessInfo; pub use session_mgr::SessionManager; +pub use session_mgr_status::SessionManagerStatus; pub use session_ref::SessionRef; pub use session_settings::Settings; +pub use session_status::SessionStatus; pub use session_type::SessionType; -pub use status::Status; diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index 8968789c9bc7..91e74cd2bf84 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -18,6 +18,7 @@ use std::sync::Arc; use common_exception::ErrorCode; use common_exception::Result; +use common_infallible::RwLock; use common_macros::MallocSizeOf; use common_mem_allocator::malloc_size; use common_meta_types::GrantObject; @@ -32,6 +33,7 @@ use crate::sessions::QueryContext; use crate::sessions::QueryContextShared; use crate::sessions::SessionContext; use crate::sessions::SessionManager; +use crate::sessions::SessionStatus; use crate::sessions::SessionType; use crate::sessions::Settings; @@ -46,6 +48,8 @@ pub struct Session { pub(in crate::sessions) session_ctx: Arc, #[ignore_malloc_size_of = "insignificant"] session_settings: Settings, + #[ignore_malloc_size_of = "insignificant"] + status: Arc>, } impl Session { @@ -59,6 +63,7 @@ impl Session { let session_settings = Settings::try_create(&conf, session_ctx.clone(), session_mgr.get_user_manager())?; let ref_count = Arc::new(AtomicUsize::new(0)); + let status = Arc::new(Default::default()); Ok(Arc::new(Session { id, @@ -67,6 +72,7 @@ impl Session { ref_count, session_ctx, session_settings, + status, })) } @@ -94,6 +100,7 @@ impl Session { } } } + self.session_mgr.http_query_manager.kill_session(&self.id); } pub fn force_kill_session(self: &Arc) { @@ -142,6 +149,10 @@ impl Session { }) } + pub fn query_context_shared_is_none(&self) -> bool { + self.session_ctx.query_context_shared_is_none() + } + pub fn attach(self: &Arc, host: Option, io_shutdown: F) where F: FnOnce() + Send + 'static { let (tx, rx) = futures::channel::oneshot::channel(); @@ -234,4 +245,8 @@ impl Session { pub fn get_config(&self) -> Config { self.session_mgr.get_config() } + + pub fn get_status(self: &Arc) -> Arc> { + self.status.clone() + } } diff --git a/query/src/sessions/session_mgr.rs b/query/src/sessions/session_mgr.rs index 683ca3761a3f..af7cc16229f1 100644 --- a/query/src/sessions/session_mgr.rs +++ b/query/src/sessions/session_mgr.rs @@ -46,8 +46,8 @@ use crate::servers::http::v1::HttpQueryManager; use crate::sessions::session::Session; use crate::sessions::session_ref::SessionRef; use crate::sessions::ProcessInfo; +use crate::sessions::SessionManagerStatus; use crate::sessions::SessionType; -use crate::sessions::Status; use crate::storages::cache::CacheManager; use crate::users::auth::auth_mgr::AuthMgr; use crate::users::UserApiProvider; @@ -65,7 +65,7 @@ pub struct SessionManager { pub(in crate::sessions) storage_cache_manager: RwLock>, pub(in crate::sessions) query_logger: RwLock>>, - pub status: Arc>, + pub status: Arc>, storage_operator: RwLock, storage_runtime: Runtime, _guards: Vec, diff --git a/query/src/sessions/status.rs b/query/src/sessions/session_mgr_status.rs similarity index 91% rename from query/src/sessions/status.rs rename to query/src/sessions/session_mgr_status.rs index 9cf3f3a4ba5e..dcf0d35c13fd 100644 --- a/query/src/sessions/status.rs +++ b/query/src/sessions/session_mgr_status.rs @@ -15,14 +15,14 @@ use std::time::SystemTime; #[derive(Clone)] -pub struct Status { +pub struct SessionManagerStatus { pub running_queries_count: u64, pub last_query_started_at: Option, pub last_query_finished_at: Option, pub instance_started_at: SystemTime, } -impl Status { +impl SessionManagerStatus { pub(crate) fn query_start(&mut self, now: SystemTime) { self.running_queries_count += 1; self.last_query_started_at = Some(now) @@ -36,9 +36,9 @@ impl Status { } } -impl Default for Status { +impl Default for SessionManagerStatus { fn default() -> Self { - Status { + SessionManagerStatus { running_queries_count: 0, last_query_started_at: None, last_query_finished_at: None, diff --git a/query/src/sessions/session_status.rs b/query/src/sessions/session_status.rs new file mode 100644 index 000000000000..67a4582b1dcb --- /dev/null +++ b/query/src/sessions/session_status.rs @@ -0,0 +1,40 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Instant; + +pub struct SessionStatus { + pub session_started_at: Instant, + pub last_query_finished_at: Option, +} + +impl SessionStatus { + pub(crate) fn query_finish(&mut self) { + self.last_query_finished_at = Some(Instant::now()) + } + + pub(crate) fn last_access(&self) -> Instant { + self.last_query_finished_at + .unwrap_or(self.session_started_at) + } +} + +impl Default for SessionStatus { + fn default() -> Self { + SessionStatus { + session_started_at: Instant::now(), + last_query_finished_at: None, + } + } +} diff --git a/query/tests/it/servers/http/http_query_handlers.rs b/query/tests/it/servers/http/http_query_handlers.rs index 772bfc01ba2d..35c7e2ef3884 100644 --- a/query/tests/it/servers/http/http_query_handlers.rs +++ b/query/tests/it/servers/http/http_query_handlers.rs @@ -31,6 +31,7 @@ use databend_query::servers::http::v1::middleware::HTTPSessionEndpoint; use databend_query::servers::http::v1::middleware::HTTPSessionMiddleware; use databend_query::servers::http::v1::query_route; use databend_query::servers::http::v1::ExecuteStateName; +use databend_query::servers::http::v1::HttpSession; use databend_query::servers::http::v1::QueryResponse; use databend_query::servers::HttpHandler; use headers::Header; @@ -158,6 +159,56 @@ async fn test_async() -> Result<()> { Ok(()) } +#[test] +fn test_http_session_serde() { + { + let json = r#"{"id": "abc"}"#; + assert_eq!( + serde_json::from_str::(json).unwrap(), + HttpSession::Old { + id: "abc".to_string() + } + ); + } + + { + let json = r#"{}"#; + assert_eq!( + serde_json::from_str::(json).unwrap(), + HttpSession::New(Default::default()) + ); + } + + { + let json = r#"{"unexpected": ""}"#; + assert!(serde_json::from_str::(json).is_err()); + } +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_http_session() -> Result<()> { + let ep = create_endpoint(); + let json = serde_json::json!({"sql": "use system", "session": {"max_idle_time": 10}}); + + let (status, result) = post_json_to_endpoint(&ep, &json).await?; + assert_eq!(status, StatusCode::OK); + assert!(result.error.is_none(), "{:?}", result); + assert_eq!(result.data.len(), 0); + assert_eq!(result.next_uri, None, "{:?}", result); + assert!(result.stats.scan_progress.is_some()); + assert!(result.schema.is_some()); + assert_eq!(result.state, ExecuteStateName::Succeeded); + let session_id = &result.session_id.unwrap(); + + let json = serde_json::json!({"sql": "select database()", "session": {"id": session_id}}); + let (status, result) = post_json_to_endpoint(&ep, &json).await?; + assert!(result.error.is_none(), "{:?}", result); + assert_eq!(status, StatusCode::OK); + assert_eq!(result.data.len(), 1, "{:?}", result); + assert_eq!(result.data[0][0], "system",); + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_result_timeout() -> Result<()> { let session_manager = SessionManagerBuilder::create() From 549e392feabd1f20f7e1dbf25331268d60ae2934 Mon Sep 17 00:00:00 2001 From: youngsofun Date: Tue, 22 Mar 2022 16:11:45 +0800 Subject: [PATCH 2/4] refine Expirable. --- query/src/servers/http/v1/query/expirable.rs | 6 ++++-- query/src/servers/http/v1/query/expiring_map.rs | 8 +++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/query/src/servers/http/v1/query/expirable.rs b/query/src/servers/http/v1/query/expirable.rs index 28a4117d80a7..dc4da1f3aec4 100644 --- a/query/src/servers/http/v1/query/expirable.rs +++ b/query/src/servers/http/v1/query/expirable.rs @@ -1,3 +1,4 @@ +use std::time::Duration; use std::time::Instant; use crate::sessions::SessionRef; @@ -5,7 +6,8 @@ use crate::sessions::SessionRef; #[derive(PartialEq)] pub enum ExpiringState { InUse, - Idle { since: Instant }, + // return Duration, so user can choose to use Systime or Instance + Idle { idle_time: Duration }, Aborted { need_cleanup: bool }, } @@ -26,7 +28,7 @@ impl Expirable for SessionRef { let status = self.get_status(); let status = status.read(); ExpiringState::Idle { - since: status.last_access(), + idle_time: Instant::now() - status.last_access(), } } } diff --git a/query/src/servers/http/v1/query/expiring_map.rs b/query/src/servers/http/v1/query/expiring_map.rs index 3fc80183e9eb..7e29b5e4e1dd 100644 --- a/query/src/servers/http/v1/query/expiring_map.rs +++ b/query/src/servers/http/v1/query/expiring_map.rs @@ -17,7 +17,6 @@ use std::collections::HashMap; use std::hash::Hash; use std::sync::Arc; use std::time::Duration; -use std::time::Instant; use common_base::tokio::task; use common_base::tokio::time::sleep; @@ -52,12 +51,11 @@ async fn run_check(e: &T, max_idle: Duration) -> bool { loop { match e.expire_state() { ExpiringState::InUse => sleep(max_idle).await, - ExpiringState::Idle { since } => { - let now = Instant::now(); - if now - since > max_idle { + ExpiringState::Idle { idle_time } => { + if idle_time > max_idle { return true; } else { - sleep(max_idle - (now - since)).await; + sleep(max_idle - idle_time).await; continue; } } From d4a7b08ef772e6cde191f5d9a05fe657a601e4aa Mon Sep 17 00:00:00 2001 From: youngsofun Date: Tue, 22 Mar 2022 16:39:04 +0800 Subject: [PATCH 3/4] refine ExpiringMap. --- .../src/servers/http/v1/query/expiring_map.rs | 91 +++++++------------ 1 file changed, 34 insertions(+), 57 deletions(-) diff --git a/query/src/servers/http/v1/query/expiring_map.rs b/query/src/servers/http/v1/query/expiring_map.rs index 7e29b5e4e1dd..4005ed3651ff 100644 --- a/query/src/servers/http/v1/query/expiring_map.rs +++ b/query/src/servers/http/v1/query/expiring_map.rs @@ -34,6 +34,17 @@ where V: Expirable pub value: V, } +impl MaybeExpiring +where V: Expirable +{ + pub fn on_expire(&mut self) { + if let Some(t) = self.task.take() { + t.abort() + } + self.value.on_expire(); + } +} + // on insert:start task // 1. check V for expire // 2. call remove if expire @@ -44,7 +55,7 @@ where V: Expirable pub struct ExpiringMap where V: Expirable { - inner: Arc>>, + map: Arc>>>, } async fn run_check(e: &T, max_idle: Duration) -> bool { @@ -69,55 +80,9 @@ where V: Expirable { fn default() -> Self { Self { - inner: Arc::new(RwLock::new(Inner::default())), - } - } -} - -struct Inner -where V: Expirable -{ - map: HashMap>, -} - -impl Default for Inner -where V: Expirable -{ - fn default() -> Self { - Self { - map: HashMap::new(), - } - } -} - -impl Inner -where - K: Hash + Eq, - V: Expirable, -{ - pub fn remove(&mut self, k: &Q) - where - K: Borrow, - Q: Hash + Eq, - { - if let Some(mut checker) = self.map.remove(k) { - if let Some(t) = checker.task.take() { - t.abort() - } - checker.value.on_expire(); + map: Arc::new(RwLock::new(HashMap::default())), } } - pub fn insert(&mut self, k: K, v: MaybeExpiring) { - self.map.insert(k, v); - } - - pub fn get(&self, k: &Q) -> Option<&V> - where - K: Borrow, - Q: Hash + Eq, - { - self.map.get(k).map(|i| &i.value) - } } impl ExpiringMap @@ -130,16 +95,15 @@ where K: Clone + Send + Sync + 'static, V: Send + Sync + 'static, { - let mut inner = self.inner.write(); + let mut map = self.map.write(); let task = match max_idle_time { Some(d) => { - let inner = self.inner.clone(); + let map_clone = self.map.clone(); let v_clone = v.clone(); let k_clone = k.clone(); let task = task::spawn(async move { if run_check(&v_clone, d).await { - let mut inner = inner.write(); - inner.remove(&k_clone); + Self::remove_inner(&map_clone, &k_clone); } }); Some(task) @@ -147,7 +111,7 @@ where None => None, }; let i = MaybeExpiring { task, value: v }; - inner.insert(k, i); + map.insert(k, i); } pub fn get(&self, k: &Q) -> Option @@ -155,8 +119,8 @@ where K: Borrow, Q: Hash + Eq, { - let inner = self.inner.read(); - inner.get(k).cloned() + let map = self.map.read(); + map.get(k).map(|i| &i.value).cloned() } pub fn remove(&mut self, k: &Q) @@ -164,7 +128,20 @@ where K: Borrow, Q: Hash + Eq, { - let mut map = self.inner.write(); - map.remove(k) + Self::remove_inner(&self.map, k) + } + + fn remove_inner(map: &Arc>>>, k: &Q) + where + K: Borrow, + Q: Hash + Eq, + { + let checker = { + let mut map = map.write(); + map.remove(k) + }; + if let Some(mut checker) = checker { + checker.on_expire() + } } } From edd3fa11e6616cb4e210f56f29f9650d04858b74 Mon Sep 17 00:00:00 2001 From: youngsofun Date: Tue, 22 Mar 2022 17:07:19 +0800 Subject: [PATCH 4/4] fix license. --- query/src/servers/http/v1/query/expirable.rs | 14 ++++++++++++++ query/src/servers/http/v1/query/expiring_map.rs | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/query/src/servers/http/v1/query/expirable.rs b/query/src/servers/http/v1/query/expirable.rs index dc4da1f3aec4..244ef98fe7c7 100644 --- a/query/src/servers/http/v1/query/expirable.rs +++ b/query/src/servers/http/v1/query/expirable.rs @@ -1,3 +1,17 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use std::time::Duration; use std::time::Instant; diff --git a/query/src/servers/http/v1/query/expiring_map.rs b/query/src/servers/http/v1/query/expiring_map.rs index 4005ed3651ff..33c63addf6cd 100644 --- a/query/src/servers/http/v1/query/expiring_map.rs +++ b/query/src/servers/http/v1/query/expiring_map.rs @@ -1,4 +1,4 @@ -// Copyright 2021 Datafuse Labs. +// Copyright 2022 Datafuse Labs. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.