Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SESSIONS-3933: Refine the file and field name #4015

Merged
merged 2 commits into from
Jan 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion query/benches/suites/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub mod bench_sort_query_sql;
pub async fn select_executor(sql: &str) -> Result<()> {
let sessions = SessionManager::from_conf(Config::default()).await?;
let executor_session = sessions.create_session("Benches")?;
let ctx = executor_session.create_context().await?;
let ctx = executor_session.create_query_context().await?;

if let PlanNode::Select(plan) = PlanParser::parse(sql, ctx.clone()).await? {
let executor = SelectInterpreter::try_create(ctx, plan)?;
Expand Down
2 changes: 1 addition & 1 deletion query/src/api/http/v1/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ pub async fn cluster_list_handler(

async fn list_nodes(sessions: &Arc<SessionManager>) -> Result<Vec<Arc<NodeInfo>>> {
let watch_cluster_session = sessions.create_session("WatchCluster")?;
let watch_cluster_context = watch_cluster_session.create_context().await?;
let watch_cluster_context = watch_cluster_session.create_query_context().await?;
Ok(watch_cluster_context.get_cluster().get_nodes())
}
2 changes: 1 addition & 1 deletion query/src/api/http/v1/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub async fn logs_handler(

async fn select_table(sessions: &Arc<SessionManager>) -> Result<Body> {
let session = sessions.create_session("WatchLogs")?;
let query_context = session.create_context().await?;
let query_context = session.create_query_context().await?;
let mut tracing_table_stream = execute_query(query_context).await?;

let stream = async_stream::try_stream! {
Expand Down
4 changes: 2 additions & 2 deletions query/src/api/rpc/flight_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl DatabendQueryFlightDispatcher {

#[tracing::instrument(level = "debug", skip_all, fields(session.id = session.get_id().as_str()))]
async fn one_sink_action(&self, session: SessionRef, action: &FlightAction) -> Result<()> {
let query_context = session.create_context().await?;
let query_context = session.create_query_context().await?;
let action_context = QueryContext::create_from(query_context.clone());
let pipeline_builder = PipelineBuilder::create(action_context.clone());

Expand Down Expand Up @@ -184,7 +184,7 @@ impl DatabendQueryFlightDispatcher {
where
T: FlightScatter + Send + 'static,
{
let query_context = session.create_context().await?;
let query_context = session.create_query_context().await?;
let action_context = QueryContext::create_from(query_context.clone());
let pipeline_builder = PipelineBuilder::create(action_context.clone());

Expand Down
2 changes: 1 addition & 1 deletion query/src/servers/clickhouse/interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl ClickHouseSession for InteractiveWorker {
};
let user_info_auth = self
.session
.get_sessions_manager()
.get_session_manager()
.get_auth_manager()
.auth(&credential)
.await;
Expand Down
2 changes: 1 addition & 1 deletion query/src/servers/clickhouse/interactive_worker_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl InteractiveWorkerBase {
let query = &ch_ctx.state.query;
tracing::debug!("{}", query);

let ctx = session.create_context().await?;
let ctx = session.create_query_context().await?;
ctx.attach_query_str(query);

let plan = PlanParser::parse(query, ctx.clone()).await?;
Expand Down
2 changes: 1 addition & 1 deletion query/src/servers/http/v1/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ pub async fn streaming_load(
session.set_current_user(user_info.0.clone());

let context = session
.create_context()
.create_query_context()
.await
.map_err(InternalServerError)?;
let insert_sql = req
Expand Down
2 changes: 1 addition & 1 deletion query/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl ExecuteState {
) -> Result<(ExecutorRef, DataSchemaRef)> {
let sql = &request.sql;
let session = session_manager.create_session("http-statement")?;
let context = session.create_context().await?;
let context = session.create_query_context().await?;
if let Some(db) = &request.session.database {
context.set_current_database(db.clone()).await?;
};
Expand Down
4 changes: 2 additions & 2 deletions query/src/servers/mysql/mysql_interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {
let user_manager = self.session.get_user_manager();
let client_ip = info.user_client_address.split(':').collect::<Vec<_>>()[0];

let ctx = self.session.create_context().await?;
let ctx = self.session.create_query_context().await?;
let user_info = user_manager
.get_user_with_client_ip(&ctx.get_tenant(), user_name, client_ip)
.await?;
Expand Down Expand Up @@ -272,7 +272,7 @@ impl<W: std::io::Write> InteractiveWorkerBase<W> {
if self.federated_server_setup_set_or_jdbc_command(query) {
Ok((vec![DataBlock::empty()], String::from("")))
} else {
let context = self.session.create_context().await?;
let context = self.session.create_query_context().await?;
context.attach_query_str(query);
let (plan, hints) = PlanParser::parse_with_hint(query, context.clone()).await;

Expand Down
19 changes: 9 additions & 10 deletions query/src/sessions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,22 @@
#[macro_use]
mod macros;

mod context;
mod context_shared;
mod metrics;
mod query_ctx;
mod query_ctx_shared;
mod session;
mod session_ctx;
mod session_info;
mod session_ref;
mod session_status;
#[allow(clippy::module_inception)]
mod sessions;
mod sessions_info;
mod session_mgr;
mod session_ref;
mod settings;

pub use context::QueryContext;
pub use context_shared::QueryContextShared;
pub use query_ctx::QueryContext;
pub use query_ctx_shared::QueryContextShared;
pub use session::Session;
pub use session_ctx::SessionContext;
pub use session_info::ProcessInfo;
pub use session_mgr::SessionManager;
pub use session_ref::SessionRef;
pub use session_status::MutableStatus;
pub use sessions::SessionManager;
pub use settings::Settings;
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ impl QueryContext {

// Get user manager api.
pub fn get_user_manager(self: &Arc<Self>) -> Arc<UserApiProvider> {
self.shared
.session
.get_sessions_manager()
.get_user_manager()
self.shared.session.get_session_manager().get_user_manager()
}

// Get the current session.
Expand All @@ -268,13 +265,13 @@ impl QueryContext {
pub fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
self.shared
.session
.get_sessions_manager()
.get_session_manager()
.get_session_by_id(id)
}

// Get all the processes list info.
pub fn get_processes_info(self: &Arc<Self>) -> Vec<ProcessInfo> {
self.shared.session.get_sessions_manager().processes_info()
self.shared.session.get_session_manager().processes_info()
}

/// Get the data accessor metrics.
Expand All @@ -289,12 +286,12 @@ impl QueryContext {

/// Get the client socket address.
pub fn get_client_address(&self) -> Option<SocketAddr> {
self.shared.session.mutable_state.get_client_host()
self.shared.session.session_ctx.get_client_host()
}

/// Get the storage cache manager
pub fn get_storage_cache_manager(&self) -> &CacheManager {
self.shared.session.sessions.get_storage_cache_manager()
self.shared.session.session_mgr.get_storage_cache_manager()
}

// Get the storage data accessor by config.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,6 @@ impl QueryContextShared {

impl Session {
pub(in crate::sessions) fn destroy_context_shared(&self) {
self.mutable_state.take_context_shared();
self.session_ctx.take_context_shared();
}
}
68 changes: 34 additions & 34 deletions query/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use futures::channel::*;

use crate::catalogs::DatabaseCatalog;
use crate::configs::Config;
use crate::sessions::context_shared::QueryContextShared;
use crate::sessions::MutableStatus;
use crate::sessions::QueryContext;
use crate::sessions::QueryContextShared;
use crate::sessions::SessionContext;
use crate::sessions::SessionManager;
use crate::sessions::Settings;
use crate::users::UserApiProvider;
Expand All @@ -41,25 +41,25 @@ pub struct Session {
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) config: Config,
#[ignore_malloc_size_of = "insignificant"]
pub(in crate::sessions) sessions: Arc<SessionManager>,
pub(in crate::sessions) session_mgr: Arc<SessionManager>,
pub(in crate::sessions) ref_count: Arc<AtomicUsize>,
pub(in crate::sessions) mutable_state: Arc<MutableStatus>,
pub(in crate::sessions) session_ctx: Arc<SessionContext>,
}

impl Session {
pub fn try_create(
config: Config,
id: String,
typ: String,
sessions: Arc<SessionManager>,
session_mgr: Arc<SessionManager>,
) -> Result<Arc<Session>> {
Ok(Arc::new(Session {
id,
typ,
config,
sessions,
session_mgr,
ref_count: Arc::new(AtomicUsize::new(0)),
mutable_state: Arc::new(MutableStatus::try_create()?),
session_ctx: Arc::new(SessionContext::try_create()?),
}))
}

Expand All @@ -72,14 +72,14 @@ impl Session {
}

pub fn is_aborting(self: &Arc<Self>) -> bool {
self.mutable_state.get_abort()
self.session_ctx.get_abort()
}

pub fn kill(self: &Arc<Self>) {
let mutable_state = self.mutable_state.clone();
mutable_state.set_abort(true);
if mutable_state.context_shared_is_none() {
if let Some(io_shutdown) = mutable_state.take_io_shutdown_tx() {
let session_ctx = self.session_ctx.clone();
session_ctx.set_abort(true);
if session_ctx.context_shared_is_none() {
if let Some(io_shutdown) = session_ctx.take_io_shutdown_tx() {
let (tx, rx) = oneshot::channel();
if io_shutdown.send(tx).is_ok() {
// We ignore this error because the receiver is return cancelled error.
Expand All @@ -95,34 +95,34 @@ impl Session {
}

pub fn force_kill_query(self: &Arc<Self>) {
let mutable_state = self.mutable_state.clone();
let session_ctx = self.session_ctx.clone();

if let Some(context_shared) = mutable_state.take_context_shared() {
if let Some(context_shared) = session_ctx.take_context_shared() {
context_shared.kill(/* shutdown executing query */);
}
}

/// Create a query context for query.
/// For a query, execution environment(e.g cluster) should be immutable.
/// We can bind the environment to the context in create_context method.
pub async fn create_context(self: &Arc<Self>) -> Result<Arc<QueryContext>> {
let context_shared = self.mutable_state.get_context_shared();
pub async fn create_query_context(self: &Arc<Self>) -> Result<Arc<QueryContext>> {
let query_ctx = self.session_ctx.get_context_shared();

Ok(match context_shared.as_ref() {
Ok(match query_ctx.as_ref() {
Some(shared) => QueryContext::create_from_shared(shared.clone()),
None => {
let config = self.config.clone();
let discovery = self.sessions.get_cluster_discovery();
let discovery = self.session_mgr.get_cluster_discovery();

let session = self.clone();
let cluster = discovery.discover().await?;
let shared = QueryContextShared::try_create(config, session, cluster)?;

let ctx_shared = self.mutable_state.get_context_shared();
match ctx_shared.as_ref() {
let query_ctx = self.session_ctx.get_context_shared();
match query_ctx.as_ref() {
Some(shared) => QueryContext::create_from_shared(shared.clone()),
None => {
self.mutable_state.set_context_shared(Some(shared.clone()));
self.session_ctx.set_context_shared(Some(shared.clone()));
QueryContext::create_from_shared(shared)
}
}
Expand All @@ -133,8 +133,8 @@ impl Session {
pub fn attach<F>(self: &Arc<Self>, host: Option<SocketAddr>, io_shutdown: F)
where F: FnOnce() + Send + 'static {
let (tx, rx) = futures::channel::oneshot::channel();
self.mutable_state.set_client_host(host);
self.mutable_state.set_io_shutdown_tx(Some(tx));
self.session_ctx.set_client_host(host);
self.session_ctx.set_io_shutdown_tx(Some(tx));

common_base::tokio::spawn(async move {
if let Ok(tx) = rx.await {
Expand All @@ -145,29 +145,29 @@ impl Session {
}

pub fn set_current_database(self: &Arc<Self>, database_name: String) {
self.mutable_state.set_current_database(database_name);
self.session_ctx.set_current_database(database_name);
}

pub fn get_current_database(self: &Arc<Self>) -> String {
self.mutable_state.get_current_database()
self.session_ctx.get_current_database()
}

pub fn get_current_tenant(self: &Arc<Self>) -> String {
self.mutable_state.get_current_tenant()
self.session_ctx.get_current_tenant()
}

pub fn set_current_tenant(self: &Arc<Self>, tenant: String) {
self.mutable_state.set_current_tenant(tenant);
self.session_ctx.set_current_tenant(tenant);
}

pub fn get_current_user(self: &Arc<Self>) -> Result<UserInfo> {
self.mutable_state
self.session_ctx
.get_current_user()
.ok_or_else(|| ErrorCode::AuthenticateFailure("unauthenticated"))
}

pub fn set_current_user(self: &Arc<Self>, user: UserInfo) {
self.mutable_state.set_current_user(user)
self.session_ctx.set_current_user(user)
}

pub fn validate_privilege(
Expand All @@ -193,19 +193,19 @@ impl Session {
}

pub fn get_settings(self: &Arc<Self>) -> Arc<Settings> {
self.mutable_state.get_settings()
self.session_ctx.get_settings()
}

pub fn get_sessions_manager(self: &Arc<Self>) -> Arc<SessionManager> {
self.sessions.clone()
pub fn get_session_manager(self: &Arc<Self>) -> Arc<SessionManager> {
self.session_mgr.clone()
}

pub fn get_catalog(self: &Arc<Self>) -> Arc<DatabaseCatalog> {
self.sessions.get_catalog()
self.session_mgr.get_catalog()
}

pub fn get_user_manager(self: &Arc<Self>) -> Arc<UserApiProvider> {
self.sessions.get_user_manager()
self.session_mgr.get_user_manager()
}

pub fn get_memory_usage(self: &Arc<Self>) -> usize {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ use common_macros::MallocSizeOf;
use common_meta_types::UserInfo;
use futures::channel::oneshot::Sender;

use crate::sessions::context_shared::QueryContextShared;
use crate::sessions::QueryContextShared;
use crate::sessions::Settings;

#[derive(MallocSizeOf)]
pub struct MutableStatus {
pub struct SessionContext {
abort: AtomicBool,
current_database: RwLock<String>,
current_tenant: RwLock<String>,
Expand All @@ -42,9 +42,9 @@ pub struct MutableStatus {
context_shared: RwLock<Option<Arc<QueryContextShared>>>,
}

impl MutableStatus {
impl SessionContext {
pub fn try_create() -> Result<Self> {
Ok(MutableStatus {
Ok(SessionContext {
abort: Default::default(),
current_user: Default::default(),
current_tenant: Default::default(),
Expand Down
Loading