Skip to content

Commit

Permalink
Merge pull request #4538 from youngsofun/session
Browse files Browse the repository at this point in the history
support server-side-session for HTTP handler
  • Loading branch information
BohuTANG authored Mar 22, 2022
2 parents 0ee7bef + edd3fa1 commit b67c9f6
Show file tree
Hide file tree
Showing 16 changed files with 411 additions and 28 deletions.
1 change: 1 addition & 0 deletions query/src/interpreters/interpreter_factory_interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ impl Interpreter for InterceptorInterpreter {
async fn finish(&self) -> Result<()> {
let session = self.ctx.get_current_session();
let now = SystemTime::now();
session.get_status().write().query_finish();
if session.get_type().is_user_session() {
session
.get_session_manager()
Expand Down
8 changes: 6 additions & 2 deletions query/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub struct QueryStats {
#[derive(Serialize, Deserialize, Debug)]
pub struct QueryResponse {
pub id: String,
pub session_id: Option<String>,
pub schema: Option<DataSchemaRef>,
pub data: JsonBlockRef,
pub state: ExecuteStateName,
Expand All @@ -99,15 +100,17 @@ impl QueryResponse {
),
None => (Arc::new(vec![]), None),
};
let columns = r.initial_state.as_ref().and_then(|v| v.schema.clone());
let schema = r.initial_state.as_ref().and_then(|v| v.schema.clone());
let session_id = r.initial_state.as_ref().map(|v| v.session_id.clone());
let stats = QueryStats {
scan_progress: r.state.scan_progress.clone(),
running_time_ms: r.state.running_time_ms,
};
QueryResponse {
data,
state: r.state.state,
schema: columns,
schema,
session_id,
stats,
id: id.clone(),
next_uri: next_url,
Expand All @@ -124,6 +127,7 @@ impl QueryResponse {
state: ExecuteStateName::Failed,
data: Arc::new(vec![]),
schema: None,
session_id: None,
next_uri: None,
stats_uri: None,
final_uri: None,
Expand Down
2 changes: 2 additions & 0 deletions query/src/servers/http/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ pub use load::LoadResponse;
pub use query::ExecuteStateName;
pub use query::HttpQueryHandle;
pub use query::HttpQueryManager;
pub use query::HttpSession;
pub use query::HttpSessionConf;
pub use stage::upload_to_stage;
pub use stage::UploadToStageResponse;
pub use statement::statement_handler;
Expand Down
14 changes: 1 addition & 13 deletions query/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use common_datablocks::DataBlock;
use common_datavalues::DataSchemaRef;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_types::UserInfo;
use common_tracing::tracing;
use futures::StreamExt;
use serde::Deserialize;
Expand All @@ -36,9 +35,7 @@ use super::http_query::HttpQueryRequest;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterFactory;
use crate::sessions::QueryContext;
use crate::sessions::SessionManager;
use crate::sessions::SessionRef;
use crate::sessions::SessionType;
use crate::sql::PlanParser;

#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq)]
Expand Down Expand Up @@ -137,22 +134,13 @@ pub(crate) struct ExecuteRunning {
impl ExecuteState {
pub(crate) async fn try_create(
request: &HttpQueryRequest,
session_manager: &Arc<SessionManager>,
user_info: &UserInfo,
session: SessionRef,
block_tx: mpsc::Sender<DataBlock>,
) -> Result<(Arc<RwLock<Executor>>, DataSchemaRef)> {
let sql = &request.sql;
let start_time = Instant::now();
let session = session_manager
.create_session(SessionType::HTTPQuery)
.await?;
let ctx = session.create_query_context().await?;
if let Some(db) = &request.session.database {
ctx.set_current_database(db.clone()).await?;
};
ctx.attach_query_str(sql);
session.set_current_user(user_info.clone());

let plan = PlanParser::parse(ctx.clone(), sql).await?;
let schema = plan.schema();

Expand Down
51 changes: 51 additions & 0 deletions query/src/servers/http/v1/query/expirable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;
use std::time::Instant;

use crate::sessions::SessionRef;

#[derive(PartialEq)]
pub enum ExpiringState {
InUse,
// return Duration, so user can choose to use Systime or Instance
Idle { idle_time: Duration },
Aborted { need_cleanup: bool },
}

pub trait Expirable {
fn expire_state(&self) -> ExpiringState;
fn on_expire(&self);
}

impl Expirable for SessionRef {
fn expire_state(&self) -> ExpiringState {
if self.is_aborting() {
ExpiringState::Aborted {
need_cleanup: false,
}
} else if !self.query_context_shared_is_none() {
ExpiringState::InUse
} else {
let status = self.get_status();
let status = status.read();
ExpiringState::Idle {
idle_time: Instant::now() - status.last_access(),
}
}
}

fn on_expire(&self) {}
}
147 changes: 147 additions & 0 deletions query/src/servers/http/v1/query/expiring_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::Hash;
use std::sync::Arc;
use std::time::Duration;

use common_base::tokio::task;
use common_base::tokio::time::sleep;
use common_infallible::RwLock;

use crate::servers::http::v1::query::expirable::Expirable;
use crate::servers::http::v1::query::expirable::ExpiringState;

// todo(youngsofun): use ExpiringMap for HttpQuery

struct MaybeExpiring<V>
where V: Expirable
{
task: Option<task::JoinHandle<()>>,
pub value: V,
}

impl<V> MaybeExpiring<V>
where V: Expirable
{
pub fn on_expire(&mut self) {
if let Some(t) = self.task.take() {
t.abort()
}
self.value.on_expire();
}
}

// on insert:start task
// 1. check V for expire
// 2. call remove if expire
// on remove:
// 1. call on_expire
// 2. Cancel task

pub struct ExpiringMap<K, V>
where V: Expirable
{
map: Arc<RwLock<HashMap<K, MaybeExpiring<V>>>>,
}

async fn run_check<T: Expirable>(e: &T, max_idle: Duration) -> bool {
loop {
match e.expire_state() {
ExpiringState::InUse => sleep(max_idle).await,
ExpiringState::Idle { idle_time } => {
if idle_time > max_idle {
return true;
} else {
sleep(max_idle - idle_time).await;
continue;
}
}
ExpiringState::Aborted { need_cleanup } => return need_cleanup,
}
}
}

impl<K, V> Default for ExpiringMap<K, V>
where V: Expirable
{
fn default() -> Self {
Self {
map: Arc::new(RwLock::new(HashMap::default())),
}
}
}

impl<K, V> ExpiringMap<K, V>
where
K: Hash + Eq,
V: Expirable + Clone,
{
pub fn insert(&mut self, k: K, v: V, max_idle_time: Option<Duration>)
where
K: Clone + Send + Sync + 'static,
V: Send + Sync + 'static,
{
let mut map = self.map.write();
let task = match max_idle_time {
Some(d) => {
let map_clone = self.map.clone();
let v_clone = v.clone();
let k_clone = k.clone();
let task = task::spawn(async move {
if run_check(&v_clone, d).await {
Self::remove_inner(&map_clone, &k_clone);
}
});
Some(task)
}
None => None,
};
let i = MaybeExpiring { task, value: v };
map.insert(k, i);
}

pub fn get<Q: ?Sized>(&self, k: &Q) -> Option<V>
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let map = self.map.read();
map.get(k).map(|i| &i.value).cloned()
}

pub fn remove<Q: ?Sized>(&mut self, k: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq,
{
Self::remove_inner(&self.map, k)
}

fn remove_inner<Q: ?Sized>(map: &Arc<RwLock<HashMap<K, MaybeExpiring<V>>>>, k: &Q)
where
K: Borrow<Q>,
Q: Hash + Eq,
{
let checker = {
let mut map = map.write();
map.remove(k)
};
if let Some(mut checker) = checker {
checker.on_expire()
}
}
}
Loading

0 comments on commit b67c9f6

Please sign in to comment.