Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(insert): server panic when exceeds max active sessions #5928

Merged
merged 2 commits into from
Jun 13, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 69 additions & 35 deletions query/src/interpreters/async_insert_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,20 @@ impl InsertKey {
#[derive(Clone)]
pub struct Entry {
block: DataBlock,
finished: Arc<RwLock<bool>>,
notify: Arc<Notify>,
finished: Arc<RwLock<bool>>,
timeout: Arc<RwLock<bool>>,
error: Arc<RwLock<ErrorCode>>,
}

impl Entry {
pub fn try_create(block: DataBlock) -> Self {
Self {
block,
finished: Arc::new(RwLock::new(false)),
notify: Arc::new(Notify::new()),
finished: Arc::new(RwLock::new(false)),
timeout: Arc::new(RwLock::new(false)),
error: Arc::new(RwLock::new(ErrorCode::Ok(""))),
}
}

Expand All @@ -120,21 +124,39 @@ impl Entry {
self.notify.notify_one();
}

pub fn finish_with_err(&self, err: ErrorCode) {
let mut error = self.error.write();
*error = err;
self.notify.notify_one();
}

pub fn finish_with_timeout(&self) {
let mut timeout = self.timeout.write();
*timeout = true;
self.notify.notify_one();
}

pub async fn wait(&self) -> bool {
pub async fn wait(&self) -> Result<()> {
if *self.finished.read() {
return true;
return Ok(());
}
self.notify.clone().notified().await;
self.is_finished()
match self.is_finished() {
true => Ok(()),
false => match self.is_timeout() {
true => Err(ErrorCode::AsyncInsertTimeoutError("Async insert timeout.")),
false => Err((*self.error.read()).clone()),
},
}
}

pub fn is_finished(&self) -> bool {
return *self.finished.read();
}

pub fn is_timeout(&self) -> bool {
return *self.timeout.read();
}
}

#[derive(Clone)]
Expand Down Expand Up @@ -234,10 +256,9 @@ impl AsyncInsertQueue {
common_planners::InsertInputSource::SelectPlan(plan) => {
let select_interpreter = SelectInterpreter::try_create(ctx.clone(), SelectPlan {
input: Arc::new((**plan).clone()),
})
.unwrap();
})?;

let mut pipeline = select_interpreter.create_new_pipeline().unwrap();
let mut pipeline = select_interpreter.create_new_pipeline()?;

let mut sink_pipeline_builder = SinkPipeBuilder::create();
for _ in 0..pipeline.output_len() {
Expand All @@ -250,7 +271,7 @@ impl AsyncInsertQueue {
pipeline.add_pipe(sink_pipeline_builder.finalize());
let executor =
PipelineCompleteExecutor::try_create(self.runtime.clone(), pipeline).unwrap();
executor.execute().unwrap();
executor.execute()?;
drop(executor);
let blocks = ctx.consume_precommit_blocks();
DataBlock::concat_blocks(&blocks)?
Expand Down Expand Up @@ -298,75 +319,88 @@ impl AsyncInsertQueue {
}
}
}

let mut current_processing_insert = self.current_processing_insert.write();
current_processing_insert.insert(ctx.get_id(), entry);
Ok(())
}

pub fn get_entry(&self, query_id: String) -> EntryPtr {
pub fn get_entry(&self, query_id: &str) -> Result<EntryPtr> {
let current_processing_insert = self.current_processing_insert.read();
return current_processing_insert.get(&query_id).unwrap().clone();
Ok(current_processing_insert.get(query_id).unwrap().clone())
}

pub fn delete_entry(&self, query_id: &str) -> Result<()> {
let mut current_processing_insert = self.current_processing_insert.write();
current_processing_insert.remove(query_id);
Ok(())
}

pub async fn wait_for_processing_insert(
self: Arc<Self>,
query_id: String,
time_out: Duration,
) -> Result<()> {
let entry = self.get_entry(query_id);
let entry = self.get_entry(&query_id)?;
let e = entry.clone();
self.runtime.as_ref().inner().spawn(async move {
let mut intv = interval(time_out);
intv.tick().await;
intv.tick().await;
e.finish_with_timeout();
});
let finished = entry.wait().await;
match finished {
true => Ok(()),
false => Err(ErrorCode::AsyncInsertTimeoutError("Async insert timeout.")),
match entry.wait().await {
Ok(_) => {
self.delete_entry(&query_id)?;
Ok(())
}
Err(err) => {
self.delete_entry(&query_id)?;
Err(err)
}
}
}

fn schedule(self: Arc<Self>, key: InsertKey, data: InsertData) {
self.runtime.as_ref().inner().spawn(async {
self.process(key, data).await;
match self.process(key, data.clone()).await {
Ok(_) => {
for entry in data.entries.into_iter() {
entry.finish();
}
}
Err(err) => {
for entry in data.entries.into_iter() {
entry.finish_with_err(err.clone());
}
}
}
});
}

async fn process(self: Arc<Self>, key: InsertKey, data: InsertData) {
async fn process(self: Arc<Self>, key: InsertKey, data: InsertData) -> Result<()> {
let insert_plan = key.plan;

let session_mgr = self.session_mgr.read().clone().unwrap();
let session = session_mgr.create_session(SessionType::HTTPQuery).await;
let ctx = session.unwrap().create_query_context().await.unwrap();

ctx.apply_changed_settings(key.changed_settings.clone())
.unwrap();
let ctx = session.unwrap().create_query_context().await?;
ctx.apply_changed_settings(key.changed_settings.clone())?;

let interpreter =
InsertInterpreter::try_create(ctx.clone(), insert_plan.as_ref().clone(), true).unwrap();
InsertInterpreter::try_create(ctx.clone(), insert_plan.as_ref().clone(), true)?;

let output_port = OutputPort::create();
let blocks = Arc::new(Mutex::new(VecDeque::from_iter(
data.entries.iter().map(|x| x.block.clone()),
)));
let source = BlocksSource::create(ctx, output_port.clone(), blocks);

let source = BlocksSource::create(ctx, output_port.clone(), blocks)?;
let mut builder = SourcePipeBuilder::create();

builder.add_source(output_port.clone(), source.unwrap());
builder.add_source(output_port.clone(), source);

interpreter
.as_ref()
.set_source_pipe_builder(Some(builder))
.unwrap();
interpreter.execute(None).await.unwrap();

for entry in data.entries.into_iter() {
entry.finish();
}
.set_source_pipe_builder(Some(builder))?;
interpreter.execute(None).await?;
Ok(())
}

fn busy_check(self: Arc<Self>) -> Duration {
Expand Down