Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: simplify SchemaApi::truncate_table() #16506

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,9 +211,16 @@ pub trait SchemaApi: Send + Sync {
async fn get_tables_history(
&self,
req: ListTableReq,
db_name: &str,
) -> Result<Vec<Arc<TableInfo>>, KVAppError>;

async fn list_tables(&self, req: ListTableReq) -> Result<Vec<Arc<TableInfo>>, KVAppError>;
/// List all tables in the database.
///
/// Returns a list of `(table_name, table_id, table_meta)` tuples.
async fn list_tables(
&self,
req: ListTableReq,
) -> Result<Vec<(String, TableId, SeqV<TableMeta>)>, KVAppError>;

/// Return TableMeta by table_id.
///
Expand Down
172 changes: 52 additions & 120 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ use databend_common_meta_app::schema::RenameTableReq;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyAction;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
use databend_common_meta_app::schema::TableCopiedFileInfo;
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableIdHistoryIdent;
Expand Down Expand Up @@ -202,6 +201,7 @@ use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_crud_api::KVPbCrudApi;
use crate::list_keys;
use crate::list_u64_value;
use crate::meta_txn_error::MetaTxnError;
use crate::name_id_value_api::NameIdValueApi;
use crate::name_value_api::NameValueApi;
Expand All @@ -217,8 +217,6 @@ use crate::txn_op_put;
use crate::util::db_id_has_to_exist;
use crate::util::deserialize_id_get_response;
use crate::util::deserialize_struct_get_response;
use crate::util::get_table_by_id_or_err;
use crate::util::list_tables_from_unshare_db;
use crate::util::mget_pb_values;
use crate::util::txn_delete_exact;
use crate::util::txn_op_put_pb;
Expand Down Expand Up @@ -1569,29 +1567,13 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
async fn get_tables_history(
&self,
req: ListTableReq,
db_name: &str,
) -> Result<Vec<Arc<TableInfo>>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tenant_dbname = &req.inner;

// Get db by name to ensure presence
let res = get_db_or_err(
self,
tenant_dbname,
format!("get_tables_history: {}", tenant_dbname.display()),
)
.await;

let (seq_db_id, _db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
}
};

// List tables by tenant, db_id, table_name.
let table_id_history_ident = TableIdHistoryIdent {
database_id: *seq_db_id.data,
database_id: req.database_id.db_id,
table_name: "dummy".to_string(),
};

Expand All @@ -1605,7 +1587,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.iter()
.map(|table_id_list_key| {
TableIdHistoryIdent {
database_id: *seq_db_id.data,
database_id: req.database_id.db_id,
table_name: table_id_list_key.table_name.clone(),
}
.to_string_key()
Expand Down Expand Up @@ -1640,11 +1622,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
.map(|(table_id, seqv)| {
Arc::new(TableInfo {
ident: TableIdent::new(table_id.table_id, seqv.seq()),
desc: format!(
"'{}'.'{}'",
tenant_dbname.database_name(),
table_id_list_key.table_name,
),
desc: format!("'{}'.'{}'", db_name, table_id_list_key.table_name,),
name: table_id_list_key.table_name.to_string(),
meta: seqv.data,
db_type: DatabaseType::NormalDB,
Expand All @@ -1661,29 +1639,38 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn list_tables(&self, req: ListTableReq) -> Result<Vec<Arc<TableInfo>>, KVAppError> {
async fn list_tables(
&self,
req: ListTableReq,
) -> Result<Vec<(String, TableId, SeqV<TableMeta>)>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let tenant_dbname = &req.inner;
let dbid_tbname = DBIdTableName {
db_id: req.database_id.db_id,
// Use empty name to scan all tables
table_name: "".to_string(),
};

// Get db by name to ensure presence
let res = get_db_or_err(
self,
tenant_dbname,
format!("list_tables: {}", tenant_dbname.display()),
)
.await;
let (names, ids) = list_u64_value(self, &dbid_tbname).await?;

let (seq_db_id, _db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
}
};
let ids = ids
.into_iter()
.map(|id| TableId { table_id: id })
.collect::<Vec<_>>();

let tb_infos = list_tables_from_unshare_db(self, *seq_db_id.data, tenant_dbname).await?;
let mut seq_metas = vec![];
for chunk in ids.chunks(DEFAULT_MGET_SIZE) {
let got = self.get_pb_values_vec(chunk.to_vec()).await?;
seq_metas.extend(got);
}

Ok(tb_infos)
let res = names
.into_iter()
.zip(ids)
.zip(seq_metas)
.filter_map(|((n, id), seq_meta)| seq_meta.map(|x| (n.table_name, id, x)))
.collect::<Vec<_>>();
Ok(res)
}

#[logcall::logcall]
Expand Down Expand Up @@ -2105,8 +2092,6 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let ctx = func_name!();

let table_id = TableId {
table_id: req.table_id,
};
Expand All @@ -2118,20 +2103,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
// If the table seq has not changed between before and after the listing, we can be sure the
// list of copied files is consistent with this version of the table.

let (mut seq_1, _tb_meta) = get_table_by_id_or_err(self, &table_id, ctx).await?;
let mut seq_1 = self.get_seq(&table_id).await?;

let mut trials = txn_backoff(None, func_name!());
let copied_files = loop {
trials.next().unwrap()?.await;

let copied_files = list_table_copied_files(self, table_id.table_id).await?;
let copied_file_ident = TableCopiedFileNameIdent {
table_id: table_id.table_id,
file: "dummy".to_string(),
};
let dir_name = DirName::new(copied_file_ident);
let copied_files = self.list_pb_vec(&dir_name).await?;

let (seq_2, _tb_meta) = get_table_by_id_or_err(self, &table_id, ctx).await?;
let seq_2 = self.get_seq(&table_id).await?;

if seq_1 == seq_2 {
debug!(
"list all copied file of table {}: {:?}",
table_id.table_id, copied_files
"list all copied file of table {}: {} files",
table_id.table_id,
copied_files.len()
);
break copied_files;
} else {
Expand All @@ -2141,64 +2132,22 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// 2. Remove the copied files only when the seq of a copied file has not changed.
//
// While this step runs as several small transactions, other transactions may be
// modifying the table.
//
// - We assert the table seq is not changed in each transaction.
// - We do not assert the seq of each copied file in each transaction, since we only delete
// non-changed ones.

for chunk in copied_files.chunks(chunk_size as usize) {
let str_keys: Vec<_> = chunk.iter().map(|f| f.to_string_key()).collect();

// Load the `seq` of every copied file
let seqs = {
let seq_infos: Vec<(u64, Option<TableCopiedFileInfo>)> =
mget_pb_values(self, &str_keys).await?;

seq_infos.into_iter().map(|(seq, _)| seq)
let txn = TxnRequest {
condition: vec![],
if_then: chunk
.iter()
.map(|(name, seq_file)| {
TxnOp::delete_exact(name.to_string_key(), Some(seq_file.seq()))
})
.collect(),
else_then: vec![],
};

let mut if_then = vec![];
for (copied_seq, copied_str_key) in seqs.zip(str_keys) {
if copied_seq == 0 {
continue;
}

if_then.push(TxnOp::delete_exact(copied_str_key, Some(copied_seq)));
}

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let (tb_meta_seq, tb_meta) = get_table_by_id_or_err(self, &table_id, ctx).await?;

let mut if_then = if_then.clone();

// Update to increase the table meta seq, so that we can assert no other process has modified the table
if_then.push(txn_op_put(&table_id, serialize_struct(&tb_meta)?));

let txn_req = TxnRequest {
condition: vec![txn_cond_seq(&table_id, Eq, tb_meta_seq)],
if_then,
else_then: vec![],
};

debug!("submit chunk delete copied files: {:?}", txn_req);

let (succ, _responses) = send_txn(self, txn_req).await?;
debug!(
id :? =(&table_id),
succ = succ,
ctx = ctx;
""
);

if succ {
break;
}
}
let (_succ, _responses) = send_txn(self, txn).await?;
}

Ok(TruncateTableReply {})
Expand Down Expand Up @@ -3472,23 +3421,6 @@ async fn remove_copied_files_for_dropped_table(
unreachable!()
}

/// Enumerate the copied-file identities recorded for the table `table_id`.
///
/// A `TableCopiedFileNameIdent` carrying `table_id` is wrapped in a [`DirName`] and handed to
/// `list_keys`; whatever keys that call yields are returned as-is.
///
/// # Errors
///
/// Propagates any `MetaError` returned by `list_keys`.
async fn list_table_copied_files(
    kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
    table_id: u64,
) -> Result<Vec<TableCopiedFileNameIdent>, MetaError> {
    // NOTE(review): the `file` value looks like a placeholder that `DirName` does not use for
    // listing — confirm against `DirName`'s definition.
    let dir_name = DirName::new(TableCopiedFileNameIdent {
        table_id,
        file: "dummy".to_string(),
    });

    let idents = list_keys(kv_api, &dir_name).await?;
    Ok(idents)
}

/// Get the retention boundary time before which the data can be permanently removed.
fn get_retention_boundary(now: DateTime<Utc>) -> DateTime<Utc> {
now - Duration::from_secs(DEFAULT_DATA_RETENTION_SECONDS as u64)
Expand Down
Loading
Loading