diff --git a/query/src/interpreters/async_insert_queue.rs b/query/src/interpreters/async_insert_queue.rs index af77565006e10..b1e8b711eebe4 100644 --- a/query/src/interpreters/async_insert_queue.rs +++ b/query/src/interpreters/async_insert_queue.rs @@ -101,16 +101,20 @@ impl InsertKey { #[derive(Clone)] pub struct Entry { block: DataBlock, - finished: Arc>, notify: Arc, + finished: Arc>, + timeout: Arc>, + error: Arc>, } impl Entry { pub fn try_create(block: DataBlock) -> Self { Self { block, - finished: Arc::new(RwLock::new(false)), notify: Arc::new(Notify::new()), + finished: Arc::new(RwLock::new(false)), + timeout: Arc::new(RwLock::new(false)), + error: Arc::new(RwLock::new(ErrorCode::Ok(""))), } } @@ -120,21 +124,39 @@ impl Entry { self.notify.notify_one(); } + pub fn finish_with_err(&self, err: ErrorCode) { + let mut error = self.error.write(); + *error = err; + self.notify.notify_one(); + } + pub fn finish_with_timeout(&self) { + let mut timeout = self.timeout.write(); + *timeout = true; self.notify.notify_one(); } - pub async fn wait(&self) -> bool { + pub async fn wait(&self) -> Result<()> { if *self.finished.read() { - return true; + return Ok(()); } self.notify.clone().notified().await; - self.is_finished() + match self.is_finished() { + true => Ok(()), + false => match self.is_timeout() { + true => Err(ErrorCode::AsyncInsertTimeoutError("Async insert timeout.")), + false => Err((*self.error.read()).clone()), + }, + } } pub fn is_finished(&self) -> bool { return *self.finished.read(); } + + pub fn is_timeout(&self) -> bool { + return *self.timeout.read(); + } } #[derive(Clone)] @@ -234,10 +256,9 @@ impl AsyncInsertQueue { common_planners::InsertInputSource::SelectPlan(plan) => { let select_interpreter = SelectInterpreter::try_create(ctx.clone(), SelectPlan { input: Arc::new((**plan).clone()), - }) - .unwrap(); + })?; - let mut pipeline = select_interpreter.create_new_pipeline().unwrap(); + let mut pipeline = select_interpreter.create_new_pipeline()?; let mut sink_pipeline_builder = SinkPipeBuilder::create(); for _ in 0..pipeline.output_len() { @@ -250,7 +271,7 @@ impl AsyncInsertQueue { pipeline.add_pipe(sink_pipeline_builder.finalize()); let executor = PipelineCompleteExecutor::try_create(self.runtime.clone(), pipeline).unwrap(); - executor.execute().unwrap(); + executor.execute()?; drop(executor); let blocks = ctx.consume_precommit_blocks(); DataBlock::concat_blocks(&blocks)? @@ -298,15 +319,20 @@ impl AsyncInsertQueue { } } } - let mut current_processing_insert = self.current_processing_insert.write(); current_processing_insert.insert(ctx.get_id(), entry); Ok(()) } - pub fn get_entry(&self, query_id: String) -> EntryPtr { + pub fn get_entry(&self, query_id: &str) -> Result { let current_processing_insert = self.current_processing_insert.read(); - return current_processing_insert.get(&query_id).unwrap().clone(); + Ok(current_processing_insert.get(query_id).unwrap().clone()) + } + + pub fn delete_entry(&self, query_id: &str) -> Result<()> { + let mut current_processing_insert = self.current_processing_insert.write(); + current_processing_insert.remove(query_id); + Ok(()) } pub async fn wait_for_processing_insert( @@ -314,7 +340,7 @@ impl AsyncInsertQueue { query_id: String, time_out: Duration, ) -> Result<()> { - let entry = self.get_entry(query_id); + let entry = self.get_entry(&query_id)?; let e = entry.clone(); self.runtime.as_ref().inner().spawn(async move { let mut intv = interval(time_out); @@ -322,51 +348,59 @@ impl AsyncInsertQueue { intv.tick().await; e.finish_with_timeout(); }); - let finished = entry.wait().await; - match finished { - true => Ok(()), - false => Err(ErrorCode::AsyncInsertTimeoutError("Async insert timeout.")), + match entry.wait().await { + Ok(_) => { + self.delete_entry(&query_id)?; + Ok(()) + } + Err(err) => { + self.delete_entry(&query_id)?; + Err(err) + } } } fn schedule(self: Arc, key: InsertKey, data: InsertData) { self.runtime.as_ref().inner().spawn(async { - self.process(key, data).await; + match self.process(key, data.clone()).await { + Ok(_) => { + for entry in data.entries.into_iter() { + entry.finish(); + } + } + Err(err) => { + for entry in data.entries.into_iter() { + entry.finish_with_err(err.clone()); + } + } + } }); } - async fn process(self: Arc, key: InsertKey, data: InsertData) { + async fn process(self: Arc, key: InsertKey, data: InsertData) -> Result<()> { let insert_plan = key.plan; let session_mgr = self.session_mgr.read().clone().unwrap(); let session = session_mgr.create_session(SessionType::HTTPQuery).await; - let ctx = session.unwrap().create_query_context().await.unwrap(); - - ctx.apply_changed_settings(key.changed_settings.clone()) - .unwrap(); + let ctx = session.unwrap().create_query_context().await?; + ctx.apply_changed_settings(key.changed_settings.clone())?; let interpreter = - InsertInterpreter::try_create(ctx.clone(), insert_plan.as_ref().clone(), true).unwrap(); + InsertInterpreter::try_create(ctx.clone(), insert_plan.as_ref().clone(), true)?; let output_port = OutputPort::create(); let blocks = Arc::new(Mutex::new(VecDeque::from_iter( data.entries.iter().map(|x| x.block.clone()), ))); - let source = BlocksSource::create(ctx, output_port.clone(), blocks); - + let source = BlocksSource::create(ctx, output_port.clone(), blocks)?; let mut builder = SourcePipeBuilder::create(); - - builder.add_source(output_port.clone(), source.unwrap()); + builder.add_source(output_port.clone(), source); interpreter .as_ref() - .set_source_pipe_builder(Some(builder)) - .unwrap(); - interpreter.execute(None).await.unwrap(); - - for entry in data.entries.into_iter() { - entry.finish(); - } + .set_source_pipe_builder(Some(builder))?; + interpreter.execute(None).await?; + Ok(()) } fn busy_check(self: Arc) -> Duration {