refactor: SchemaApi::get_database_history() (#16487)
drmingdrmer committed Sep 22, 2024
1 parent dd286da commit 8b3c37a
Showing 3 changed files with 77 additions and 145 deletions.
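In summary: get_database_history() drops its hand-written mget batching and its two include_drop_db code paths in favor of list_pb_vec()/get_pb_vec(), a BTreeMap merge, and one final retention filter; the negated helper is_drop_time_out_of_retention_time() is replaced by the positive is_drop_time_retainable() plus get_retention_boundary(); DatabaseInfoFilter::IncludeDropped is renamed to IncludeNonRetainable; and the DatabaseIdHistoryIdent marker struct Resource is renamed to DatabaseIdHistoryRsc.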
src/meta/api/src/schema_api_impl.rs: 69 additions & 137 deletions
@@ -642,144 +642,68 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
     ) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
         debug!(req :? =(&req); "SchemaApi: {}", func_name!());
 
-        // List tables by tenant, db_id, table_name.
-        let dbid_tbname_idlist = DatabaseIdHistoryIdent::new(&req.tenant, "dummy");
-        let dir_name = DirName::new(dbid_tbname_idlist);
-        let db_id_list_keys = list_keys(self, &dir_name).await?;
+        let name_ident = DatabaseIdHistoryIdent::new(&req.tenant, "dummy");
+        let dir_name = DirName::new(name_ident);
 
-        let mut db_info_list = vec![];
-        let now = Utc::now();
-        let keys: Vec<String> = db_id_list_keys
-            .iter()
-            .map(|db_id_list_key| db_id_list_key.to_string_key())
-            .collect();
-        let mut db_id_list_keys_iter = db_id_list_keys.into_iter();
-        let include_drop_db = matches!(&req.filter, Some(DatabaseInfoFilter::IncludeDropped));
-        for c in keys.chunks(DEFAULT_MGET_SIZE) {
-            let db_id_list_seq_and_list: Vec<(u64, Option<DbIdList>)> =
-                mget_pb_values(self, c).await?;
+        let name_idlists = self.list_pb_vec(&dir_name).await?;
 
-            for (db_id_list_seq, db_id_list_opt) in db_id_list_seq_and_list {
-                let db_id_list_key = db_id_list_keys_iter.next().unwrap();
-                let db_id_list = if db_id_list_seq == 0 {
-                    continue;
-                } else {
-                    match db_id_list_opt {
-                        Some(list) => list,
-                        None => {
-                            continue;
-                        }
-                    }
-                };
+        let mut dbs = BTreeMap::new();
 
-                let inner_keys: Vec<String> = db_id_list
-                    .id_list
-                    .iter()
-                    .map(|db_id| DatabaseId { db_id: *db_id }.to_string_key())
-                    .collect();
-                let mut db_id_list_iter = db_id_list.id_list.into_iter();
-                for c in inner_keys.chunks(DEFAULT_MGET_SIZE) {
-                    let db_meta_seq_meta_vec: Vec<(u64, Option<DatabaseMeta>)> =
-                        mget_pb_values(self, c).await?;
-
-                    for (db_meta_seq, db_meta) in db_meta_seq_meta_vec {
-                        let db_id = db_id_list_iter.next().unwrap();
-                        if db_meta_seq == 0 || db_meta.is_none() {
-                            error!("get_database_history cannot find {:?} db_meta", db_id);
-                            continue;
-                        }
-                        let db_meta = db_meta.unwrap();
-                        // if include drop db, then no need to fill out of retention time db
-                        if !include_drop_db
-                            && is_drop_time_out_of_retention_time(&db_meta.drop_on, &now)
-                        {
-                            continue;
-                        }
+        for (db_id_list_key, db_id_list) in name_idlists {
+            let ids = db_id_list
+                .id_list
+                .iter()
+                .map(|db_id| DatabaseId { db_id: *db_id })
+                .collect::<Vec<_>>();
 
-                        let db = DatabaseInfo {
-                            database_id: DatabaseId::new(db_id),
-                            name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()),
-                            meta: SeqV::new(db_meta_seq, db_meta),
-                        };
+            for db_ids in ids.chunks(DEFAULT_MGET_SIZE) {
+                let id_metas = self.get_pb_vec(db_ids.iter().cloned()).await?;
 
-                        db_info_list.push(Arc::new(db));
-                    }
+                for (db_id, db_meta) in id_metas {
+                    let Some(db_meta) = db_meta else {
+                        error!("get_database_history cannot find {:?} db_meta", db_id);
+                        continue;
+                    };
+
+                    let db = DatabaseInfo {
+                        database_id: db_id,
+                        name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()),
+                        meta: db_meta,
+                    };
+
+                    dbs.insert(db_id.db_id, Arc::new(db));
                 }
             }
         }
 
-        // `list_database` can list db which has no `DbIdListKey`
-        if include_drop_db {
-            // if `include_drop_db` is true, return all db info which not exist in db_info_list
-            let db_id_set: HashSet<u64> = db_info_list
-                .iter()
-                .map(|db_info| db_info.database_id.db_id)
-                .collect();
+        // Find out dbs that are not included in any DbIdListKey.
+        // Because the DbIdListKey function was added after the first release of
+        // the system, there may be dbs that do not have a corresponding DbIdListKey.
 
-            let all_dbs = self.list_databases(req).await?;
-            for db_info in all_dbs {
-                if !db_id_set.contains(&db_info.database_id.db_id) {
-                    warn!(
-                        "get db history db:{:?}, db_id:{:?} has no DbIdListKey",
-                        db_info.name_ident, db_info.database_id.db_id
-                    );
-                    db_info_list.push(db_info);
-                }
-            }
-        } else {
-            // if `include_drop_db` is false, filter out db which drop_on time out of retention time
-            let db_id_set: HashSet<u64> = db_info_list
-                .iter()
-                .map(|db_info| db_info.database_id.db_id)
-                .collect();
+        let list_dbs = self.list_databases(req.clone()).await?;
+        for db_info in list_dbs {
+            dbs.entry(db_info.database_id.db_id).or_insert_with(|| {
+                warn!(
+                    "get db history db:{:?}, db_id:{:?} has no DbIdListKey",
+                    db_info.name_ident, db_info.database_id.db_id
+                );
 
-            let all_dbs = self.list_databases(req).await?;
-            let mut add_dbinfo_map = HashMap::new();
-            let mut db_id_list = Vec::new();
-            for db_info in all_dbs {
-                if !db_id_set.contains(&db_info.database_id.db_id) {
-                    warn!(
-                        "get db history db:{:?}, db_id:{:?} has no DbIdListKey",
-                        db_info.name_ident, db_info.database_id.db_id
-                    );
-                    db_id_list.push(DatabaseId {
-                        db_id: db_info.database_id.db_id,
-                    });
-                    add_dbinfo_map.insert(db_info.database_id.db_id, db_info);
-                }
-            }
-            let inner_keys: Vec<String> = db_id_list
-                .iter()
-                .map(|db_id| db_id.to_string_key())
-                .collect();
-            let mut db_id_list_iter = db_id_list.into_iter();
-            for c in inner_keys.chunks(DEFAULT_MGET_SIZE) {
-                let db_meta_seq_meta_vec: Vec<(u64, Option<DatabaseMeta>)> =
-                    mget_pb_values(self, c).await?;
-
-                for (db_meta_seq, db_meta) in db_meta_seq_meta_vec {
-                    let db_id = db_id_list_iter.next().unwrap().db_id;
-                    if db_meta_seq == 0 || db_meta.is_none() {
-                        error!("get_database_history cannot find {:?} db_meta", db_id);
-                        continue;
-                    }
-                    let db_meta = db_meta.unwrap();
-                    // if include drop db, then no need to fill out of retention time db
-                    if is_drop_time_out_of_retention_time(&db_meta.drop_on, &now) {
-                        continue;
-                    }
-                    if let Some(db_info) = add_dbinfo_map.get(&db_id) {
-                        warn!(
-                            "get db history db:{:?}, db_id:{:?} has no DbIdListKey",
-                            db_info.name_ident, db_info.database_id.db_id
-                        );
-                        db_info_list.push(db_info.clone());
-                    }
-                }
-            }
-        }
+                db_info
+            });
+        }
 
-        return Ok(db_info_list);
+        let now = Utc::now();
+
+        // Return not only retainable databases but also non-retainable ones
+        // when the filter asks for them.
+        let include_non_retainable =
+            matches!(&req.filter, Some(DatabaseInfoFilter::IncludeNonRetainable));
+
+        let dbs = dbs
+            .into_values()
+            .filter(|x| include_non_retainable || is_drop_time_retainable(x.meta.drop_on, now))
+            .collect();
+
+        return Ok(dbs);
     }
 
     #[logcall::logcall]
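The net effect of this hunk: the hand-rolled key stringification, the paged mget_pb_values() calls, and the two include_drop_db branches are replaced by list_pb_vec()/get_pb_vec(), a BTreeMap merge, and a single retention filter applied once at the end. Reduced to a runnable sketch (the Db struct, RETENTION_SECS, and the two input vectors are stand-ins invented for illustration; the real code operates on DatabaseInfo values fetched in chunks of DEFAULT_MGET_SIZE):

use std::collections::BTreeMap;

// Hypothetical stand-in for DatabaseInfo, for this sketch only.
#[derive(Clone, Debug)]
struct Db {
    id: u64,
    dropped_secs_ago: Option<i64>, // None: the db was never dropped
}

// Assumed retention window; the real code uses DEFAULT_DATA_RETENTION_SECONDS.
const RETENTION_SECS: i64 = 24 * 3600;

fn history(from_id_lists: Vec<Db>, from_listing: Vec<Db>, include_non_retainable: bool) -> Vec<Db> {
    // 1) Collect every db found via the id-list keys, keyed by id.
    let mut dbs: BTreeMap<u64, Db> = BTreeMap::new();
    for db in from_id_lists {
        dbs.insert(db.id, db);
    }
    // 2) Backfill dbs that have no DbIdListKey; entry().or_insert() keeps
    //    the version from step 1 when both sources know the same id.
    for db in from_listing {
        dbs.entry(db.id).or_insert(db);
    }
    // 3) A single retention filter replaces the old per-branch checks.
    dbs.into_values()
        .filter(|d| include_non_retainable || d.dropped_secs_ago.map_or(true, |s| s < RETENTION_SECS))
        .collect()
}

fn main() {
    let kept = history(
        vec![Db { id: 1, dropped_secs_ago: Some(999_999) }], // dropped long ago
        vec![Db { id: 2, dropped_secs_ago: None }],          // still live
        false,
    );
    assert_eq!(kept.len(), 1); // only db 2 survives the retention filter
}

The or_insert branch reproduces the old warn-and-backfill path for databases that predate DbIdListKey, now without a separate db_id_set.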
@@ -2781,7 +2705,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
             .get_database_history(ListDatabaseReq {
                 tenant: req.inner.tenant().clone(),
                 // need to get all db(include drop db)
-                filter: Some(DatabaseInfoFilter::IncludeDropped),
+                filter: Some(DatabaseInfoFilter::IncludeNonRetainable),
             })
             .await?;
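The only change at this call site is the filter variant: IncludeDropped is gone, and IncludeNonRetainable expresses the same intent, namely to return every database in history, including those dropped longer ago than the retention window.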

@@ -3255,7 +3179,7 @@ async fn get_table_meta_history(
                 continue;
             };
 
-            if is_drop_time_out_of_retention_time(&table_meta.drop_on, now) {
+            if !is_drop_time_retainable(table_meta.drop_on, *now) {
                 continue;
            }
             tb_metas.push((k, table_meta));
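Note the polarity flip here: the old code skipped tables that were out of retention, the new code skips tables that are not retainable. For Some(drop_on) the two conditions coincide up to sub-second rounding (the old check compared whole-second timestamps), and a missing drop time is kept by both, so this hunk should be behavior-preserving.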
@@ -3561,17 +3485,25 @@ async fn list_table_copied_files(
     Ok(copied_files)
 }
 
-// Return true if drop time is out of `DATA_RETENTION_TIME_IN_DAYS option,
-// use DEFAULT_DATA_RETENTION_SECONDS by default.
-fn is_drop_time_out_of_retention_time(
-    drop_on: &Option<DateTime<Utc>>,
-    now: &DateTime<Utc>,
-) -> bool {
-    if let Some(drop_on) = drop_on {
-        return now.timestamp() - drop_on.timestamp() >= DEFAULT_DATA_RETENTION_SECONDS;
-    }
+/// Get the retention boundary time before which the data can be permanently removed.
+fn get_retention_boundary(now: DateTime<Utc>) -> DateTime<Utc> {
+    now - Duration::from_secs(DEFAULT_DATA_RETENTION_SECONDS as u64)
+}
 
-    false
+/// Determines if an item is within the retention period based on its drop time.
+///
+/// # Arguments
+/// * `drop_on` - The optional timestamp when the item was marked for deletion.
+/// * `now` - The current timestamp used as a reference point.
+///
+/// Items without a drop time (`None`) are always considered retainable.
+/// The retention period is defined by `DATA_RETENTION_TIME_IN_DAYS`.
+fn is_drop_time_retainable(drop_on: Option<DateTime<Utc>>, now: DateTime<Utc>) -> bool {
+    let retention_boundary = get_retention_boundary(now);
+
+    // If it is None, treat it as a time far in the future: never out of retention.
+    let drop_on = drop_on.unwrap_or(DateTime::<Utc>::MAX_UTC);
+    drop_on > retention_boundary
 }
 
 /// Get db id and its seq by name, returns (db_id_seq, db_id)
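For intuition about the new predicate, a self-contained replica (the 1-day window and the free function name is_retainable are assumptions for the demo; the real function derives the boundary from DEFAULT_DATA_RETENTION_SECONDS and substitutes DateTime::<Utc>::MAX_UTC for a missing drop time, which gives the same "None is always retainable" behavior):

use chrono::{DateTime, Duration, Utc};

// Retainable means: never dropped, or dropped after the retention boundary.
fn is_retainable(drop_on: Option<DateTime<Utc>>, now: DateTime<Utc>) -> bool {
    let boundary = now - Duration::days(1); // assumed 1-day retention window
    drop_on.map_or(true, |t| t > boundary)
}

fn main() {
    let now = Utc::now();
    assert!(is_retainable(None, now)); // never dropped
    assert!(is_retainable(Some(now - Duration::hours(1)), now)); // dropped recently
    assert!(!is_retainable(Some(now - Duration::days(30)), now)); // past the boundary
}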
src/meta/app/src/schema/database.rs: 3 additions & 3 deletions
@@ -313,10 +313,10 @@ impl GetDatabaseReq {
     }
 }
 
-#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Debug, PartialEq, Eq)]
 pub enum DatabaseInfoFilter {
-    // include all dropped databases
-    IncludeDropped,
+    /// Include all databases, even those whose drop time is before the retention boundary.
+    IncludeNonRetainable,
 }
 
 #[derive(Clone, Debug, PartialEq, Eq)]
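Besides the rename, the serde::Serialize/Deserialize derives are dropped, which suggests the filter only ever travels in-process; if the variant were persisted or sent over the wire, renaming IncludeDropped would be a compatibility hazard. The plain comment is also promoted to a doc comment.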
src/meta/app/src/schema/database_id_history_ident.rs: 5 additions & 5 deletions
@@ -15,10 +15,10 @@
 use crate::tenant_key::ident::TIdent;
 
 /// The key of the list of database ids that have been used in history by a database name.
-pub type DatabaseIdHistoryIdent = TIdent<Resource>;
-pub type DatabaseIdHistoryIdentRaw = TIdentRaw<Resource>;
+pub type DatabaseIdHistoryIdent = TIdent<DatabaseIdHistoryRsc>;
+pub type DatabaseIdHistoryIdentRaw = TIdentRaw<DatabaseIdHistoryRsc>;
 
-pub use kvapi_impl::Resource;
+pub use kvapi_impl::DatabaseIdHistoryRsc;
 
 use crate::tenant_key::raw::TIdentRaw;
 
@@ -44,8 +44,8 @@ mod kvapi_impl {
     use crate::schema::DbIdList;
     use crate::tenant_key::resource::TenantResource;
 
-    pub struct Resource;
-    impl TenantResource for Resource {
+    pub struct DatabaseIdHistoryRsc;
+    impl TenantResource for DatabaseIdHistoryRsc {
         const PREFIX: &'static str = "__fd_db_id_list";
         const TYPE: &'static str = "DatabaseIdHistoryIdent";
         const HAS_TENANT: bool = true;
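A plausible motivation for the rename: each tenant-key module defines its own marker type, and when every one of them is called Resource, code touching several key types cannot re-export or import more than one without aliasing; DatabaseIdHistoryRsc is unambiguous. The PREFIX and TYPE constants are unchanged, so the on-disk key layout stays the same.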
