diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs
index d9368fbd949a0..6dcce288bf3b4 100644
--- a/src/meta/api/src/schema_api_impl.rs
+++ b/src/meta/api/src/schema_api_impl.rs
@@ -1859,14 +1859,9 @@ impl<KV: KVApi> SchemaApi for KV {
             Some(lock) => lock,
             None => TableCopiedFileLock {},
         };
-        let mut condition = vec![
-            txn_cond_seq(&tbid, Eq, tb_meta_seq),
-            txn_cond_seq(&lock_key, Eq, lock_key_seq),
-        ];
+        let mut condition = vec![txn_cond_seq(&lock_key, Eq, lock_key_seq)];
         let mut if_then = vec![
-            // every copied files changed, change tbid seq to make all table child consistent.
-            txn_op_put(&tbid, serialize_struct(&tb_meta.unwrap())?), /* (tenant, db_id, tb_id) -> tb_meta */
-            txn_op_put(&lock_key, serialize_struct(&lock)?), // copied file lock key
+            txn_op_put(&lock_key, serialize_struct(&lock)?), // copied file lock key
         ];
         for (file, file_info) in req.file_info.iter() {
             let key = TableCopiedFileNameIdent {
diff --git a/src/query/service/src/interpreters/interpreter_copy_v2.rs b/src/query/service/src/interpreters/interpreter_copy_v2.rs
index 464d38faab6bb..12b585eb9edaf 100644
--- a/src/query/service/src/interpreters/interpreter_copy_v2.rs
+++ b/src/query/service/src/interpreters/interpreter_copy_v2.rs
@@ -29,6 +29,7 @@ use common_meta_types::UserStageInfo;
 use regex::Regex;
 
 use super::append2table;
+use crate::catalogs::Catalog;
 use crate::interpreters::interpreter_common::list_files;
 use crate::interpreters::interpreter_common::stat_file;
 use crate::interpreters::Interpreter;
@@ -77,20 +78,19 @@ impl CopyInterpreterV2 {
     }
 
     async fn do_upsert_copied_files_info(
-        &self,
-        catalog_name: String,
+        tenant: String,
         database_name: String,
         table_id: u64,
         copy_stage_files: &mut BTreeMap<String, TableCopiedFileInfo>,
+        catalog: Arc<dyn Catalog>,
     ) -> Result<()> {
         let req = UpsertTableCopiedFileReq {
             table_id,
             file_info: copy_stage_files.clone(),
             expire_at: None,
         };
-        let catalog = self.ctx.get_catalog(&catalog_name)?;
         catalog
-            .upsert_table_copied_file_info(&self.ctx.get_tenant(), &database_name, req)
+            .upsert_table_copied_file_info(&tenant, &database_name, req)
             .await?;
         copy_stage_files.clear();
         Ok(())
@@ -179,11 +179,11 @@ impl CopyInterpreterV2 {
     }
 
     async fn upsert_copied_files_info(
-        &self,
-        catalog_name: &str,
-        database_name: &str,
+        tenant: String,
+        database_name: String,
         table_id: u64,
-        copy_stage_files: BTreeMap<String, TableCopiedFileInfo>,
+        copy_stage_files: BTreeMap<String, TableCopiedFileInfo>,
+        catalog: Arc<dyn Catalog>,
     ) -> Result<()> {
         tracing::info!("upsert_copied_files_info: {:?}", copy_stage_files);
 
@@ -195,21 +195,23 @@ impl CopyInterpreterV2 {
         for (file_name, file_info) in copy_stage_files {
             do_copy_stage_files.insert(file_name.clone(), file_info);
             if do_copy_stage_files.len() > MAX_QUERY_COPIED_FILES_NUM {
-                self.do_upsert_copied_files_info(
-                    catalog_name.to_string(),
-                    database_name.to_string(),
+                CopyInterpreterV2::do_upsert_copied_files_info(
+                    tenant.clone(),
+                    database_name.clone(),
                     table_id,
                     &mut do_copy_stage_files,
+                    catalog.clone(),
                 )
                 .await?;
             }
         }
         if !do_copy_stage_files.is_empty() {
-            self.do_upsert_copied_files_info(
-                catalog_name.to_string(),
-                database_name.to_string(),
+            CopyInterpreterV2::do_upsert_copied_files_info(
+                tenant.clone(),
+                database_name.clone(),
                 table_id,
                 &mut do_copy_stage_files,
+                catalog.clone(),
             )
             .await?;
         }
@@ -267,9 +269,12 @@ impl CopyInterpreterV2 {
         catalog_name: &String,
         db_name: &String,
         tbl_name: &String,
+        table_id: u64,
         from: &ReadDataSourcePlan,
-        files: Vec<String>,
+        // files: Vec<String>,
+        copy_stage_files: BTreeMap<String, TableCopiedFileInfo>,
     ) -> Result<PipelineBuildResult> {
+        let files: Vec<String> = copy_stage_files.keys().cloned().collect();
         let mut build_res = PipelineBuildResult::create();
 
         let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files);
@@ -290,6 +295,10 @@ impl CopyInterpreterV2 {
             let ctx = self.ctx.clone();
             let files = files.clone();
             let from = from.clone();
+            let catalog_name = catalog_name.clone();
+            let db_name = db_name.clone();
+            let catalog = self.ctx.get_catalog(&catalog_name)?;
+            let tenant = self.ctx.get_tenant();
 
             build_res.main_pipeline.set_on_finished(move |may_error| {
                 if may_error.is_none() {
@@ -298,6 +307,10 @@ impl CopyInterpreterV2 {
                     let files = files.clone();
                     let from = from.clone();
                     let to_table = to_table.clone();
+                    let copy_stage_files = copy_stage_files.clone();
+                    let db_name = db_name.clone();
+                    let catalog = catalog.clone();
+                    let tenant = tenant.clone();
 
                     return GlobalIORuntime::instance().block_on(async move {
                         // Commit
@@ -307,7 +320,17 @@ impl CopyInterpreterV2 {
                             .await?;
 
                         // Purge
-                        CopyInterpreterV2::purge_files(ctx, &from, &files).await
+                        CopyInterpreterV2::purge_files(ctx, &from, &files).await?;
+
+                        // Upsert table copied file info.
+                        CopyInterpreterV2::upsert_copied_files_info(
+                            tenant,
+                            db_name,
+                            table_id,
+                            copy_stage_files,
+                            catalog,
+                        )
+                        .await
                     });
                 }
 
@@ -451,28 +474,15 @@ impl Interpreter for CopyInterpreterV2 {
                     return Ok(PipelineBuildResult::create());
                 }
 
-                let result = self
-                    .copy_files_to_table(
-                        catalog_name,
-                        database_name,
-                        table_name,
-                        from,
-                        copy_stage_files.keys().cloned().collect(),
-                    )
-                    .await;
-
-                if result.is_ok() {
-                    let _ = self
-                        .upsert_copied_files_info(
-                            catalog_name,
-                            database_name,
-                            table_id,
-                            copy_stage_files,
-                        )
-                        .await?;
-                }
-
-                result
+                self.copy_files_to_table(
+                    catalog_name,
+                    database_name,
+                    table_name,
+                    table_id,
+                    from,
+                    copy_stage_files,
+                )
+                .await
             }
             other => Err(ErrorCode::LogicalError(format!(
                 "Cannot list files for the source info: {:?}",