Skip to content

Commit

Permalink
refactor: Improve gc_dropped_table_by_id() method (#16528)
Browse files Browse the repository at this point in the history
- Unify arguments using typed values:
  - Replace separate table id and name with &DBIdTableName
  - Use TableId type for table identification
- Reorganize method structure for better clarity and maintainability
  • Loading branch information
drmingdrmer committed Sep 26, 2024
1 parent 7e14ac0 commit 27157a3
Showing 1 changed file with 64 additions and 35 deletions.
99 changes: 64 additions & 35 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2775,14 +2775,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
tables: _,
} => gc_dropped_db_by_id(self, db_id, &req.tenant, db_name).await?,
DroppedId::Table { name, id } => {
gc_dropped_table_by_id(
self,
&req.tenant,
name.db_id,
id.table_id,
name.table_name.clone(),
)
.await?
gc_dropped_table_by_id(self, &req.tenant, &name, &id).await?
}
}
}
Expand Down Expand Up @@ -3805,61 +3798,97 @@ async fn gc_dropped_db_by_id(
Ok(())
}

/// Permanently remove a dropped table from the meta-service.
///
/// The data of the table should already have been removed before calling this method.
async fn gc_dropped_table_by_id(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
tenant: &Tenant,
db_id: u64,
table_id: u64,
table_name: String,
db_id_table_name: &DBIdTableName,
table_id_ident: &TableId,
) -> Result<(), KVAppError> {
// first get TableIdList
let table_id_history_ident = TableIdHistoryIdent {
database_id: db_id,
table_name,
};
// First remove all copied files for the dropped table.
// These markers are not part of the table and can be removed in separate transactions.
remove_copied_files_for_dropped_table(kv_api, table_id_ident).await?;

let seq_id_list = kv_api.get_pb(&table_id_history_ident).await?;
let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let mut txn = TxnRequest::default();

// 1)
remove_data_for_dropped_table(kv_api, table_id_ident, &mut txn).await?;

// 2)
let table_id_history_ident = TableIdHistoryIdent {
database_id: db_id_table_name.db_id,
table_name: db_id_table_name.table_name.clone(),
};

update_txn_to_remove_table_history(
kv_api,
&mut txn,
&table_id_history_ident,
table_id_ident,
)
.await?;

// 3)
remove_index_for_dropped_table(kv_api, tenant, table_id_ident, &mut txn).await?;

let (succ, _responses) = send_txn(kv_api, txn).await?;

if succ {
return Ok(());
}
}
}

/// Fill in condition and operations to TxnRequest to remove table history.
///
/// If the table history does not exist or does not include the table id, do nothing.
///
/// This function does not submit the txn.
async fn update_txn_to_remove_table_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
txn: &mut TxnRequest,
table_id_history_ident: &TableIdHistoryIdent,
table_id_ident: &TableId,
) -> Result<(), MetaError> {
let seq_id_list = kv_api.get_pb(table_id_history_ident).await?;

let Some(seq_id_list) = seq_id_list else {
return Ok(());
};

let seq = seq_id_list.seq;
let mut tb_id_list = seq_id_list.data;
let mut history = seq_id_list.data;

// remove table_id from tb_id_list:
{
let index = tb_id_list.id_list.iter().position(|&x| x == table_id);
let Some(index) = index else {
let table_id = table_id_ident.table_id;
let pos = history.id_list.iter().position(|&x| x == table_id);
let Some(index) = pos else {
return Ok(());
};

tb_id_list.id_list.remove(index);
history.id_list.remove(index);
}

let mut txn = TxnRequest::default();

// construct the txn request
txn.condition.push(
// condition: table id list not changed
txn_cond_eq_seq(&table_id_history_ident, seq),
txn_cond_eq_seq(table_id_history_ident, seq),
);

if tb_id_list.id_list.is_empty() {
txn.if_then.push(txn_op_del(&table_id_history_ident));
if history.id_list.is_empty() {
txn.if_then.push(txn_op_del(table_id_history_ident));
} else {
// save new table id list
txn.if_then
.push(txn_op_put_pb(&table_id_history_ident, &tb_id_list, None)?);
.push(txn_op_put_pb(table_id_history_ident, &history, None)?);
}

let table_id_ident = TableId { table_id };
remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?;
remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?;
remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn).await?;

let _resp = kv_api.transaction(txn).await?;

Ok(())
}

Expand Down

0 comments on commit 27157a3

Please sign in to comment.