Merge pull request databendlabs#8307 from lichuang/copy_optimize
fix: Copy Version mismatch bug
mergify[bot] authored Oct 19, 2022
2 parents bb8a27b + c40b048 commit 4a99f21
Showing 2 changed files with 49 additions and 44 deletions.
9 changes: 2 additions & 7 deletions src/meta/api/src/schema_api_impl.rs
@@ -1859,14 +1859,9 @@ impl<KV: KVApi> SchemaApi for KV {
                 Some(lock) => lock,
                 None => TableCopiedFileLock {},
             };
-            let mut condition = vec![
-                txn_cond_seq(&tbid, Eq, tb_meta_seq),
-                txn_cond_seq(&lock_key, Eq, lock_key_seq),
-            ];
+            let mut condition = vec![txn_cond_seq(&lock_key, Eq, lock_key_seq)];
             let mut if_then = vec![
-                // every copied files changed, change tbid seq to make all table child consistent.
-                txn_op_put(&tbid, serialize_struct(&tb_meta.unwrap())?), /* (tenant, db_id, tb_id) -> tb_meta */
-                txn_op_put(&lock_key, serialize_struct(&lock)?), // copied file lock key
+                txn_op_put(&lock_key, serialize_struct(&lock)?), // copied file lock key
             ];
             for (file, file_info) in req.file_info.iter() {
                 let key = TableCopiedFileNameIdent {
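Note on the hunk above: the copied-file upsert transaction no longer conditions on the table's meta seq and no longer rewrites tb_meta; the only guard left is the copied-file lock key. Read together with the commit title, the removed txn_cond_seq(&tbid, Eq, tb_meta_seq) guard appears to be what failed with a version mismatch whenever the table meta moved concurrently. A minimal annotated sketch of the narrowed transaction, using only calls that appear in the hunk (the surrounding setup is assumed):

    // Sketch only; same calls as the hunk above, with the reasoning as comments.
    let mut condition = vec![
        // Guard solely on the copied-file lock key. A concurrent bump of the table
        // meta seq (the removed `txn_cond_seq(&tbid, Eq, tb_meta_seq)` guard) can no
        // longer abort this transaction with a version mismatch.
        txn_cond_seq(&lock_key, Eq, lock_key_seq),
    ];
    let mut if_then = vec![
        // Only the lock key is rewritten; `tb_meta` itself is left untouched.
        txn_op_put(&lock_key, serialize_struct(&lock)?), // copied file lock key
    ];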
84 changes: 47 additions & 37 deletions src/query/service/src/interpreters/interpreter_copy_v2.rs
@@ -29,6 +29,7 @@ use common_meta_types::UserStageInfo;
 use regex::Regex;
 
 use super::append2table;
+use crate::catalogs::Catalog;
 use crate::interpreters::interpreter_common::list_files;
 use crate::interpreters::interpreter_common::stat_file;
 use crate::interpreters::Interpreter;
@@ -77,20 +78,19 @@ impl CopyInterpreterV2 {
     }
 
     async fn do_upsert_copied_files_info(
-        &self,
-        catalog_name: String,
+        tenant: String,
         database_name: String,
         table_id: u64,
         copy_stage_files: &mut BTreeMap<String, TableCopiedFileInfo>,
+        catalog: Arc<dyn Catalog>,
     ) -> Result<()> {
         let req = UpsertTableCopiedFileReq {
             table_id,
             file_info: copy_stage_files.clone(),
             expire_at: None,
         };
-        let catalog = self.ctx.get_catalog(&catalog_name)?;
         catalog
-            .upsert_table_copied_file_info(&self.ctx.get_tenant(), &database_name, req)
+            .upsert_table_copied_file_info(&tenant, &database_name, req)
             .await?;
         copy_stage_files.clear();
         Ok(())
@@ -179,11 +179,11 @@ impl CopyInterpreterV2 {
     }
 
     async fn upsert_copied_files_info(
-        &self,
-        catalog_name: &str,
-        database_name: &str,
+        tenant: String,
+        database_name: String,
         table_id: u64,
         copy_stage_files: BTreeMap<String, TableCopiedFileInfo>,
+        catalog: Arc<dyn Catalog>,
     ) -> Result<()> {
         tracing::info!("upsert_copied_files_info: {:?}", copy_stage_files);
 
@@ -195,21 +195,23 @@
         for (file_name, file_info) in copy_stage_files {
             do_copy_stage_files.insert(file_name.clone(), file_info);
             if do_copy_stage_files.len() > MAX_QUERY_COPIED_FILES_NUM {
-                self.do_upsert_copied_files_info(
-                    catalog_name.to_string(),
-                    database_name.to_string(),
+                CopyInterpreterV2::do_upsert_copied_files_info(
+                    tenant.clone(),
+                    database_name.clone(),
                     table_id,
                     &mut do_copy_stage_files,
+                    catalog.clone(),
                 )
                 .await?;
             }
         }
         if !do_copy_stage_files.is_empty() {
-            self.do_upsert_copied_files_info(
-                catalog_name.to_string(),
-                database_name.to_string(),
+            CopyInterpreterV2::do_upsert_copied_files_info(
+                tenant.clone(),
+                database_name.clone(),
                 table_id,
                 &mut do_copy_stage_files,
+                catalog.clone(),
             )
             .await?;
         }
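The two helpers above, do_upsert_copied_files_info and upsert_copied_files_info, drop &self and the catalog_name string in favour of an explicitly passed tenant and catalog: Arc<dyn Catalog>. The later hunks call them from the pipeline's on-finished closure, which (judging by the clones made just before it) has to own everything it captures, so the helpers become associated functions. A self-contained sketch of that constraint in plain Rust (generic std types only, not Databend's APIs; all names here are illustrative):

    use std::sync::Arc;

    struct Interp {
        tenant: String,
    }

    impl Interp {
        // Associated fn: no `&self`; every input arrives as an owned value or an Arc.
        fn do_upsert(tenant: String, table_id: u64, catalog: Arc<String>) {
            println!("upsert copied files for {tenant}/{table_id} via catalog {catalog}");
        }

        // The returned callback must be 'static, so it cannot borrow `self`:
        // clone what it needs up front and move the clones in.
        fn on_finished(&self, table_id: u64, catalog: Arc<String>) -> Box<dyn FnOnce() + 'static> {
            let tenant = self.tenant.clone();
            Box::new(move || Interp::do_upsert(tenant, table_id, catalog))
        }
    }

    fn main() {
        let callback = Interp { tenant: "tenant1".into() }.on_finished(42, Arc::new("default".into()));
        callback(); // runs after the Interp value is already gone
    }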
@@ -267,9 +269,12 @@ impl CopyInterpreterV2 {
         catalog_name: &String,
         db_name: &String,
         tbl_name: &String,
+        table_id: u64,
         from: &ReadDataSourcePlan,
-        files: Vec<String>,
+        // files: Vec<String>,
+        copy_stage_files: BTreeMap<String, TableCopiedFileInfo>,
     ) -> Result<PipelineBuildResult> {
+        let files: Vec<String> = copy_stage_files.keys().cloned().collect();
         let mut build_res = PipelineBuildResult::create();
 
         let read_source_plan = Self::rewrite_read_plan_file_name(from.clone(), &files);
@@ -290,6 +295,10 @@
         let ctx = self.ctx.clone();
         let files = files.clone();
         let from = from.clone();
+        let catalog_name = catalog_name.clone();
+        let db_name = db_name.clone();
+        let catalog = self.ctx.get_catalog(&catalog_name)?;
+        let tenant = self.ctx.get_tenant();
 
         build_res.main_pipeline.set_on_finished(move |may_error| {
             if may_error.is_none() {
@@ -298,6 +307,10 @@
                 let files = files.clone();
                 let from = from.clone();
                 let to_table = to_table.clone();
+                let copy_stage_files = copy_stage_files.clone();
+                let db_name = db_name.clone();
+                let catalog = catalog.clone();
+                let tenant = tenant.clone();
 
                 return GlobalIORuntime::instance().block_on(async move {
                     // Commit
@@ -307,7 +320,17 @@
                         .await?;
 
                     // Purge
-                    CopyInterpreterV2::purge_files(ctx, &from, &files).await
+                    CopyInterpreterV2::purge_files(ctx, &from, &files).await?;
+
+                    // Upsert table copied file info.
+                    CopyInterpreterV2::upsert_copied_files_info(
+                        tenant,
+                        db_name,
+                        table_id,
+                        copy_stage_files,
+                        catalog,
+                    )
+                    .await
                 });
             }
 
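With the wiring above, the copied-file bookkeeping moves out of the interpreter and into the pipeline's finish callback, after the commit to the target table and the purge of the stage files. In the old code, removed in the final hunk of this file below, the interpreter upserted the file info as soon as copy_files_to_table had returned its PipelineBuildResult, which could record files before the pipeline had actually run and committed the data. Condensed, the new on-finished flow is as follows (only names introduced by this patch; the commit step is visible in the hunk only as the leading `.await?` context line):

    return GlobalIORuntime::instance().block_on(async move {
        // 1. Commit the appended data to the target table (context elided in the hunk).
        // 2. Purge the stage files; now `.await?` because it is no longer the last step.
        CopyInterpreterV2::purge_files(ctx, &from, &files).await?;
        // 3. Only after that, record the copied files in the meta service.
        CopyInterpreterV2::upsert_copied_files_info(tenant, db_name, table_id, copy_stage_files, catalog)
            .await
    });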
@@ -451,28 +474,15 @@ impl Interpreter for CopyInterpreterV2 {
                     return Ok(PipelineBuildResult::create());
                 }
 
-                let result = self
-                    .copy_files_to_table(
-                        catalog_name,
-                        database_name,
-                        table_name,
-                        from,
-                        copy_stage_files.keys().cloned().collect(),
-                    )
-                    .await;
-
-                if result.is_ok() {
-                    let _ = self
-                        .upsert_copied_files_info(
-                            catalog_name,
-                            database_name,
-                            table_id,
-                            copy_stage_files,
-                        )
-                        .await?;
-                }
-
-                result
+                self.copy_files_to_table(
+                    catalog_name,
+                    database_name,
+                    table_name,
+                    table_id,
+                    from,
+                    copy_stage_files,
+                )
+                .await
             }
             other => Err(ErrorCode::LogicalError(format!(
                 "Cannot list files for the source info: {:?}",
