Skip to content

Commit

Permalink
Merge pull request #7064 from ariesdevil/dev
Browse files Browse the repository at this point in the history
refactor(session): Remove `SessionRef`
  • Loading branch information
mergify[bot] authored Aug 23, 2022
2 parents 5d328ba + 541a91a commit d06cc14
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 179 deletions.
6 changes: 3 additions & 3 deletions src/query/service/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_users::JwtAuthenticator;
use common_users::UserApiProvider;
use jwtk::Claims;

use crate::sessions::SessionRef;
use crate::sessions::Session;
pub use crate::Config;

pub struct AuthMgr {
Expand All @@ -49,7 +49,7 @@ impl AuthMgr {
}))
}

pub async fn auth(&self, session: SessionRef, credential: &Credential) -> Result<()> {
pub async fn auth(&self, session: Arc<Session>, credential: &Credential) -> Result<()> {
let user_info = match credential {
Credential::Jwt {
token: t,
Expand Down Expand Up @@ -105,7 +105,7 @@ impl AuthMgr {

async fn process_jwt_claims(
&self,
session: &SessionRef,
session: &Arc<Session>,
claims: &Claims<CustomClaims>,
) -> Result<(String, String)> {
// setup tenant if the JWT claims contain extra.tenant_id
Expand Down
6 changes: 3 additions & 3 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use crate::pipelines::PipelineBuildResult;
use crate::servers::utils::use_planner_v2;
use crate::sessions::QueryAffect;
use crate::sessions::QueryContext;
use crate::sessions::SessionRef;
use crate::sessions::Session;
use crate::sessions::TableContext;
use crate::sql::ColumnBinding;
use crate::sql::DfParser;
Expand Down Expand Up @@ -105,7 +105,7 @@ impl ExecuteState {

pub struct ExecuteRunning {
// used to kill query
session: SessionRef,
session: Arc<Session>,
// mainly used to get progress for now
ctx: Arc<QueryContext>,
interpreter: Arc<dyn Interpreter>,
Expand Down Expand Up @@ -180,7 +180,7 @@ impl Executor {
impl ExecuteState {
pub(crate) async fn try_create(
request: &HttpQueryRequest,
session: SessionRef,
session: Arc<Session>,
ctx: Arc<QueryContext>,
block_buffer: Arc<BlockBuffer>,
) -> Result<Arc<RwLock<Executor>>> {
Expand Down
5 changes: 3 additions & 2 deletions src/query/service/src/servers/http/v1/query/expirable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use crate::sessions::SessionRef;
use crate::sessions::Session;

#[derive(PartialEq, Eq)]
pub enum ExpiringState {
Expand All @@ -30,7 +31,7 @@ pub trait Expirable {
fn on_expire(&self);
}

impl Expirable for SessionRef {
impl Expirable for Arc<Session> {
fn expire_state(&self) -> ExpiringState {
if self.is_aborting() {
ExpiringState::Aborted {
Expand Down
17 changes: 13 additions & 4 deletions src/query/service/src/servers/http/v1/query/http_query_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,29 +12,38 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use poem::FromRequest;
use poem::Request;
use poem::RequestBody;
use poem::Result as PoemResult;

use crate::sessions::SessionRef;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionType;

pub struct HttpQueryContext {
session: SessionRef,
session: Arc<Session>,
}

impl HttpQueryContext {
pub fn new(session: SessionRef) -> Self {
pub fn new(session: Arc<Session>) -> Self {
HttpQueryContext { session }
}

pub fn get_session(&self, session_type: SessionType) -> SessionRef {
pub fn get_session(&self, session_type: SessionType) -> Arc<Session> {
self.session.set_type(session_type);
self.session.clone()
}
}

impl Drop for HttpQueryContext {
fn drop(&mut self) {
SessionManager::instance().destroy_session(&self.session.get_id())
}
}

#[async_trait::async_trait]
impl<'a> FromRequest<'a> for &'a HttpQueryContext {
async fn from_request(req: &'a Request, _body: &mut RequestBody) -> PoemResult<Self> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use super::expiring_map::ExpiringMap;
use super::HttpQueryContext;
use crate::servers::http::v1::query::http_query::HttpQuery;
use crate::servers::http::v1::query::HttpQueryRequest;
use crate::sessions::SessionRef;
use crate::sessions::Session;
use crate::Config;

// TODO(youngsofun): may need refactor later for 2 reasons:
Expand All @@ -42,7 +42,7 @@ pub(crate) struct HttpQueryConfig {

pub struct HttpQueryManager {
pub(crate) queries: Arc<RwLock<HashMap<String, Arc<HttpQuery>>>>,
pub(crate) sessions: Mutex<ExpiringMap<String, SessionRef>>,
pub(crate) sessions: Mutex<ExpiringMap<String, Arc<Session>>>,
pub(crate) config: HttpQueryConfig,
}

Expand Down Expand Up @@ -117,14 +117,14 @@ impl HttpQueryManager {
q
}

pub(crate) async fn get_session(self: &Arc<Self>, session_id: &str) -> Option<SessionRef> {
pub(crate) async fn get_session(self: &Arc<Self>, session_id: &str) -> Option<Arc<Session>> {
let sessions = self.sessions.lock();
sessions.get(session_id)
}

pub(crate) async fn add_session(self: &Arc<Self>, session: SessionRef, timeout: Duration) {
pub(crate) async fn add_session(self: &Arc<Self>, session: Arc<Session>, timeout: Duration) {
let mut sessions = self.sessions.lock();
sessions.insert(session.get_id(), session.clone(), Some(timeout));
sessions.insert(session.get_id(), session, Some(timeout));
}

pub(crate) fn kill_session(self: &Arc<Self>, session_id: &str) {
Expand Down
13 changes: 10 additions & 3 deletions src/query/service/src/servers/mysql/mysql_interactive_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ use crate::servers::mysql::MySQLFederated;
use crate::servers::mysql::MYSQL_VERSION;
use crate::servers::utils::use_planner_v2;
use crate::sessions::QueryContext;
use crate::sessions::SessionRef;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::TableContext;
use crate::sql::plans::Plan;
use crate::sql::DfParser;
Expand Down Expand Up @@ -83,10 +84,16 @@ fn has_result_set_by_plan_node(plan: &PlanNode) -> bool {
}

struct InteractiveWorkerBase<W: AsyncWrite + Send + Unpin> {
session: SessionRef,
session: Arc<Session>,
generic_hold: PhantomData<W>,
}

impl<W: AsyncWrite + Send + Unpin> Drop for InteractiveWorkerBase<W> {
fn drop(&mut self) {
SessionManager::instance().destroy_session(&self.session.get_id())
}
}

pub struct InteractiveWorker<W: AsyncWrite + Send + Unpin> {
base: InteractiveWorkerBase<W>,
version: String,
Expand Down Expand Up @@ -486,7 +493,7 @@ impl<W: AsyncWrite + Send + Unpin> InteractiveWorkerBase<W> {
}

impl<W: AsyncWrite + Send + Unpin> InteractiveWorker<W> {
pub fn create(session: SessionRef, client_addr: String) -> InteractiveWorker<W> {
pub fn create(session: Arc<Session>, client_addr: String) -> InteractiveWorker<W> {
let mut bs = vec![0u8; 20];
let mut rng = rand::thread_rng();
rng.fill_bytes(bs.as_mut());
Expand Down
7 changes: 4 additions & 3 deletions src/query/service/src/servers/mysql/mysql_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::net::Shutdown;
use std::sync::Arc;

use common_base::base::tokio::io::BufWriter;
use common_base::base::tokio::net::TcpStream;
Expand All @@ -27,15 +28,15 @@ use opensrv_mysql::IntermediaryOptions;
use tracing::error;

use crate::servers::mysql::mysql_interactive_worker::InteractiveWorker;
use crate::sessions::SessionRef;
use crate::sessions::Session;

// default size of resultset write buffer: 100KB
const DEFAULT_RESULT_SET_WRITE_BUFFER_SIZE: usize = 100 * 1024;

pub struct MySQLConnection;

impl MySQLConnection {
pub fn run_on_stream(session: SessionRef, stream: TcpStream) -> Result<()> {
pub fn run_on_stream(session: Arc<Session>, stream: TcpStream) -> Result<()> {
let blocking_stream = Self::convert_stream(stream)?;
MySQLConnection::attach_session(&session, &blocking_stream)?;

Expand All @@ -58,7 +59,7 @@ impl MySQLConnection {
Ok(())
}

fn attach_session(session: &SessionRef, blocking_stream: &std::net::TcpStream) -> Result<()> {
fn attach_session(session: &Arc<Session>, blocking_stream: &std::net::TcpStream) -> Result<()> {
let host = blocking_stream.peer_addr().ok();
let blocking_stream_ref = blocking_stream.try_clone()?;
session.attach(host, move || {
Expand Down
2 changes: 0 additions & 2 deletions src/query/service/src/sessions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ mod session_info;
#[allow(clippy::module_inception)]
mod session_mgr;
mod session_mgr_status;
mod session_ref;
mod session_settings;
mod session_status;
mod session_type;
Expand All @@ -36,7 +35,6 @@ pub use session_ctx::SessionContext;
pub use session_info::ProcessInfo;
pub use session_mgr::SessionManager;
pub use session_mgr_status::SessionManagerStatus;
pub use session_ref::SessionRef;
pub use session_settings::Settings;
pub use session_status::SessionStatus;
pub use session_type::SessionType;
8 changes: 4 additions & 4 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ use crate::servers::http::v1::HttpQueryHandle;
use crate::sessions::query_affect::QueryAffect;
use crate::sessions::ProcessInfo;
use crate::sessions::QueryContextShared;
use crate::sessions::Session;
use crate::sessions::SessionManager;
use crate::sessions::SessionRef;
use crate::sessions::Settings;
use crate::sessions::TableContext;
use crate::storages::stage::StageTable;
Expand Down Expand Up @@ -190,12 +190,12 @@ impl QueryContext {
}

// Get the current session.
pub fn get_current_session(self: &Arc<Self>) -> SessionRef {
SessionRef::create(self.shared.session.clone())
pub fn get_current_session(self: &Arc<Self>) -> Arc<Session> {
self.shared.session.clone()
}

// Get one session by session id.
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<Arc<Session>> {
SessionManager::instance().get_session_by_id(id).await
}

Expand Down
4 changes: 0 additions & 4 deletions src/query/service/src/sessions/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use chrono_tz::Tz;
Expand Down Expand Up @@ -41,7 +40,6 @@ use crate::Config;
pub struct Session {
pub(in crate::sessions) id: String,
pub(in crate::sessions) typ: RwLock<SessionType>,
pub(in crate::sessions) ref_count: Arc<AtomicUsize>,
pub(in crate::sessions) session_ctx: Arc<SessionContext>,
status: Arc<RwLock<SessionStatus>>,
pub(in crate::sessions) mysql_connection_id: Option<u32>,
Expand All @@ -54,13 +52,11 @@ impl Session {
session_ctx: Arc<SessionContext>,
mysql_connection_id: Option<u32>,
) -> Result<Arc<Session>> {
let ref_count = Arc::new(AtomicUsize::new(0));
let status = Arc::new(Default::default());
Ok(Arc::new(Session {
id,
typ: RwLock::new(typ),
status,
ref_count,
session_ctx,
mysql_connection_id,
}))
Expand Down
61 changes: 11 additions & 50 deletions src/query/service/src/sessions/session_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use tracing::debug;
use tracing::info;

use crate::sessions::session::Session;
use crate::sessions::session_ref::SessionRef;
use crate::sessions::ProcessInfo;
use crate::sessions::SessionContext;
use crate::sessions::SessionManagerStatus;
Expand Down Expand Up @@ -87,7 +86,7 @@ impl SessionManager {
self.conf.clone()
}

pub async fn create_session(self: &Arc<Self>, typ: SessionType) -> Result<SessionRef> {
pub async fn create_session(self: &Arc<Self>, typ: SessionType) -> Result<Arc<Session>> {
// TODO: maybe deadlock
let config = self.get_conf();
{
Expand Down Expand Up @@ -125,7 +124,7 @@ impl SessionManager {
let user_api = UserApiProvider::instance();
let session_settings = Settings::try_create(&config, user_api, tenant).await?;
let session_ctx = SessionContext::try_create(config.clone(), session_settings)?;
let session = Session::try_create(id, typ, session_ctx, mysql_conn_id)?;
let session = Session::try_create(id, typ.clone(), session_ctx, mysql_conn_id)?;

let mut sessions = self.active_sessions.write();
if sessions.len() < self.max_sessions {
Expand All @@ -135,63 +134,25 @@ impl SessionManager {
&config.query.cluster_id,
);

sessions.insert(session.get_id(), session.clone());
match typ {
SessionType::FlightRPC => {}
_ => {
sessions.insert(session.get_id(), session.clone());
}
}

Ok(SessionRef::create(session))
Ok(session)
} else {
Err(ErrorCode::TooManyUserConnections(
"The current accept connection has exceeded max_active_sessions config",
))
}
}

pub async fn create_rpc_session(
self: &Arc<Self>,
id: String,
aborted: bool,
) -> Result<SessionRef> {
// TODO: maybe deadlock?
let config = self.get_conf();
{
let sessions = self.active_sessions.read();
let v = sessions.get(&id);
if v.is_some() {
return Ok(SessionRef::create(v.unwrap().clone()));
}
}

let tenant = config.query.tenant_id.clone();
let user_api = UserApiProvider::instance();
let session_settings = Settings::try_create(&config, user_api, tenant).await?;
let session_ctx = SessionContext::try_create(config.clone(), session_settings)?;
let session = Session::try_create(id.clone(), SessionType::FlightRPC, session_ctx, None)?;

let mut sessions = self.active_sessions.write();
let v = sessions.get(&id);
if v.is_none() {
if aborted {
return Err(ErrorCode::AbortedSession("Aborting server."));
}

label_counter(
super::metrics::METRIC_SESSION_CONNECT_NUMBERS,
&config.query.tenant_id,
&config.query.cluster_id,
);

sessions.insert(id, session.clone());
Ok(SessionRef::create(session))
} else {
Ok(SessionRef::create(v.unwrap().clone()))
}
}

#[allow(clippy::ptr_arg)]
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<SessionRef> {
pub async fn get_session_by_id(self: &Arc<Self>, id: &str) -> Option<Arc<Session>> {
let sessions = self.active_sessions.read();
sessions
.get(id)
.map(|session| SessionRef::create(session.clone()))
sessions.get(id).cloned()
}

#[allow(clippy::ptr_arg)]
Expand Down
Loading

1 comment on commit d06cc14

@vercel
Copy link

@vercel vercel bot commented on d06cc14 Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.vercel.app
databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.rs

Please sign in to comment.