Skip to content

Commit

Permalink
fix: identify failed vacuum db by id (#16553)
Browse files Browse the repository at this point in the history
* fix: identify failed vacuum db by id

Database-name in string can not be used to identify a database.
A database can be renamed and leads to a undefined behaviors.

In this commit, when a table is failed to vacuum, mark the belonging
database as failed by database-id, instead of by database-name.

* M  src/meta/api/src/schema_api_impl.rs
  • Loading branch information
drmingdrmer committed Sep 30, 2024
1 parent 2ebafba commit e8895e6
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 38 deletions.
18 changes: 5 additions & 13 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2723,25 +2723,17 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
)
.await;

let (seq_db_id, db_meta) = match res {
let (seq_db_id, _db_meta) = match res {
Ok(x) => x,
Err(e) => {
return Err(e);
}
};

let db_info = Arc::new(DatabaseInfo {
database_id: seq_db_id.data,
name_ident: tenant_dbname.clone(),
meta: db_meta,
});
let table_nivs = get_history_tables_for_gc(
self,
drop_time_range.clone(),
db_info.database_id.db_id,
the_limit,
)
.await?;
let database_id = seq_db_id.data;
let table_nivs =
get_history_tables_for_gc(self, drop_time_range.clone(), database_id.db_id, the_limit)
.await?;

let mut drop_ids = vec![];
let mut vacuum_tables = vec![];
Expand Down
15 changes: 5 additions & 10 deletions src/query/ee/src/storages/fuse/operations/vacuum_drop_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,22 +35,19 @@ pub async fn do_vacuum_drop_table(
dry_run_limit: Option<usize>,
) -> VacuumDropTablesResult {
let mut list_files = vec![];
let mut failed_dbs = HashSet::new();
let mut failed_tables = HashSet::new();
for (table_info, operator) in tables {
let result =
vacuum_drop_single_table(&table_info, operator, dry_run_limit, &mut list_files).await;
if result.is_err() {
let db_name = table_info.database_name()?;
let table_id = table_info.ident.table_id;
failed_dbs.insert(db_name.to_string());
failed_tables.insert(table_id);
}
}
Ok(if dry_run_limit.is_some() {
(Some(list_files), failed_dbs, failed_tables)
(Some(list_files), failed_tables)
} else {
(None, failed_dbs, failed_tables)
(None, failed_tables)
})
}

Expand Down Expand Up @@ -182,16 +179,14 @@ pub async fn vacuum_drop_tables_by_table_info(
ret_files.extend(files);
}
}
(Some(ret_files), HashSet::new(), HashSet::new())
(Some(ret_files), HashSet::new())
} else {
let mut failed_dbs = HashSet::new();
let mut failed_tables = HashSet::new();
for res in result {
let (_, db, tbl) = res?;
failed_dbs.extend(db);
let (_, tbl) = res?;
failed_tables.extend(tbl);
}
(None, failed_dbs, failed_tables)
(None, failed_tables)
}
};

Expand Down
4 changes: 0 additions & 4 deletions src/query/ee/tests/it/storages/fuse/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ async fn test_fuse_do_vacuum_drop_table_deletion_error() -> Result<()> {
let tables = vec![(table_info, operator)];
let result = do_vacuum_drop_table(tables, None).await?;
assert!(!result.1.is_empty());
assert!(!result.2.is_empty());
// verify that accessor.delete() was called
assert!(faulty_accessor.hit_delete_operation());

Expand Down Expand Up @@ -341,7 +340,6 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul

// verify that errors of deletions are not swallowed
assert!(!result.1.is_empty());
assert!(!result.2.is_empty());
}

// Case 2: parallel vacuum dropped tables
Expand All @@ -358,7 +356,6 @@ async fn test_fuse_vacuum_drop_tables_in_parallel_with_deletion_error() -> Resul
assert!(faulty_accessor.hit_delete_operation());
// verify that errors of deletions are not swallowed
assert!(!result.1.is_empty());
assert!(!result.2.is_empty());
}

Ok(())
Expand Down Expand Up @@ -433,7 +430,6 @@ async fn test_fuse_do_vacuum_drop_table_external_storage() -> Result<()> {
let tables = vec![(table_info, operator)];
let result = do_vacuum_drop_table(tables, None).await?;
assert!(!result.1.is_empty());
assert!(!result.2.is_empty());

// verify that accessor.delete() was called
assert!(!accessor.hit_delete_operation());
Expand Down
8 changes: 2 additions & 6 deletions src/query/ee_features/vacuum_handler/src/vacuum_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,8 @@ use databend_common_storages_fuse::FuseTable;
// (TableName, file, file size)
pub type VacuumDropFileInfo = (String, String, u64);

// (drop_files, failed_dbs, failed_tables)
pub type VacuumDropTablesResult = Result<(
Option<Vec<VacuumDropFileInfo>>,
HashSet<String>,
HashSet<u64>,
)>;
// (drop_files, failed_tables)
pub type VacuumDropTablesResult = Result<(Option<Vec<VacuumDropFileInfo>>, HashSet<u64>)>;

#[async_trait::async_trait]
pub trait VacuumHandler: Sync + Send {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

use std::cmp::min;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use chrono::Duration;
Expand Down Expand Up @@ -139,6 +141,14 @@ impl Interpreter for VacuumDropTablesInterpreter {
))
.await?;

// map: table id to its belonging db id
let mut containing_db = BTreeMap::new();
for drop_id in drop_ids.iter() {
if let DroppedId::Table { name, id } = drop_id {
containing_db.insert(id.table_id, name.db_id);
}
}

info!(
"vacuum drop table from db {:?}, get_drop_table_infos return tables: {:?}, drop_ids: {:?}",
self.plan.database,
Expand All @@ -156,7 +166,7 @@ impl Interpreter for VacuumDropTablesInterpreter {

let handler = get_vacuum_handler();
let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let (files_opt, failed_dbs, failed_tables) = handler
let (files_opt, failed_tables) = handler
.do_vacuum_drop_tables(
threads_nums,
tables,
Expand All @@ -167,13 +177,20 @@ impl Interpreter for VacuumDropTablesInterpreter {
},
)
.await?;
// gc meta data only when not dry run

let failed_db_ids = failed_tables
.iter()
// Safe unwrap: the map is built from drop_ids
.map(|id| *containing_db.get(id).unwrap())
.collect::<HashSet<_>>();

// gc metadata only when not dry run
if self.plan.option.dry_run.is_none() {
let mut success_dropped_ids = vec![];
for drop_id in drop_ids {
match &drop_id {
DroppedId::Db { db_id: _, db_name } => {
if !failed_dbs.contains(db_name) {
DroppedId::Db { db_id, db_name: _ } => {
if !failed_db_ids.contains(db_id) {
success_dropped_ids.push(drop_id);
}
}
Expand All @@ -186,7 +203,7 @@ impl Interpreter for VacuumDropTablesInterpreter {
}
info!(
"failed dbs:{:?}, failed_tables:{:?}, success_drop_ids:{:?}",
failed_dbs, failed_tables, success_dropped_ids
failed_db_ids, failed_tables, success_dropped_ids
);
self.gc_drop_tables(catalog, success_dropped_ids).await?;
}
Expand Down

0 comments on commit e8895e6

Please sign in to comment.