Skip to content

Commit

Permalink
refactor: remove get_pb_value()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 25, 2024
1 parent b4e3025 commit 9e8b998
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 53 deletions.
1 change: 0 additions & 1 deletion src/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ pub use util::db_has_to_exist;
pub use util::deserialize_struct;
pub use util::deserialize_u64;
pub use util::fetch_id;
pub use util::get_pb_value;
pub use util::get_u64_value;
pub use util::list_keys;
pub use util::list_u64_value;
Expand Down
54 changes: 18 additions & 36 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ use databend_common_meta_app::app_error::UnknownTable;
use databend_common_meta_app::app_error::UnknownTableId;
use databend_common_meta_app::app_error::ViewAlreadyExists;
use databend_common_meta_app::data_mask::MaskPolicyTableIdListIdent;
use databend_common_meta_app::data_mask::MaskpolicyTableIdList;
use databend_common_meta_app::id_generator::IdGenerator;
use databend_common_meta_app::schema::catalog_id_ident::CatalogId;
use databend_common_meta_app::schema::catalog_name_ident::CatalogNameIdentRaw;
Expand Down Expand Up @@ -195,7 +194,6 @@ use crate::assert_table_exist;
use crate::db_has_to_exist;
use crate::deserialize_struct;
use crate::fetch_id;
use crate::get_pb_value;
use crate::get_u64_value;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
Expand All @@ -220,6 +218,7 @@ use crate::util::deserialize_struct_get_response;
use crate::util::mget_pb_values;
use crate::util::txn_delete_exact;
use crate::util::txn_op_put_pb;
use crate::util::txn_replace_exact;
use crate::util::unknown_database_error;
use crate::SchemaApi;
use crate::DEFAULT_MGET_SIZE;
Expand Down Expand Up @@ -3154,17 +3153,16 @@ async fn construct_drop_table_txn_operations(
let tbid = TableId { table_id };

// Check if table exists.
let (tb_meta_seq, tb_meta): (_, Option<TableMeta>) = get_pb_value(kv_api, &tbid).await?;
if tb_meta_seq == 0 || tb_meta.is_none() {
let (tb_meta_seq, tb_meta) = kv_api.get_pb_seq_and_value(&tbid).await?;
if tb_meta_seq == 0 {
return Err(KVAppError::AppError(AppError::UnknownTableId(
UnknownTableId::new(table_id, "drop_table_by_id failed to find valid tb_meta"),
)));
}

// Get db name, tenant name and related info for tx.
let table_id_to_name = TableIdToName { table_id };
let (_, table_name_opt): (_, Option<DBIdTableName>) =
get_pb_value(kv_api, &table_id_to_name).await?;
let (_, table_name_opt) = kv_api.get_pb_seq_and_value(&table_id_to_name).await?;

let dbid_tbname = if let Some(db_id_table_name) = table_name_opt {
db_id_table_name
Expand Down Expand Up @@ -3246,8 +3244,8 @@ async fn construct_drop_table_txn_operations(
database_id: db_id,
table_name: dbid_tbname.table_name.clone(),
};
let (tb_id_list_seq, _tb_id_list_opt): (_, Option<TableIdList>) =
get_pb_value(kv_api, &dbid_tbname_idlist).await?;
let (tb_id_list_seq, _tb_id_list_opt) =
kv_api.get_pb_seq_and_value(&dbid_tbname_idlist).await?;
if tb_id_list_seq == 0 {
let mut tb_id_list = TableIdList::new();
tb_id_list.append(table_id);
Expand Down Expand Up @@ -3334,8 +3332,7 @@ async fn drop_database_meta(
// add DbIdListKey if not exists
let dbid_idlist =
DatabaseIdHistoryIdent::new(tenant_dbname.tenant(), tenant_dbname.database_name());
let (db_id_list_seq, db_id_list_opt): (_, Option<DbIdList>) =
get_pb_value(kv_api, &dbid_idlist).await?;
let (db_id_list_seq, db_id_list_opt) = kv_api.get_pb_seq_and_value(&dbid_idlist).await?;

if db_id_list_seq == 0 || db_id_list_opt.is_none() {
warn!(
Expand Down Expand Up @@ -3482,7 +3479,7 @@ pub(crate) async fn get_db_by_id_or_err(
) -> Result<(u64, DatabaseMeta), KVAppError> {
let id_key = DatabaseId { db_id };

let (db_meta_seq, db_meta) = get_pb_value(kv_api, &id_key).await?;
let (db_meta_seq, db_meta) = kv_api.get_pb_seq_and_value(&id_key).await?;
db_id_has_to_exist(db_meta_seq, db_id, msg)?;

Ok((
Expand Down Expand Up @@ -3712,8 +3709,7 @@ async fn gc_dropped_db_by_id(
) -> Result<(), KVAppError> {
// List tables by tenant, db_id, table_name.
let dbid_idlist = DatabaseIdHistoryIdent::new(tenant, db_name);
let (db_id_list_seq, db_id_list_opt): (_, Option<DbIdList>) =
get_pb_value(kv_api, &dbid_idlist).await?;
let (db_id_list_seq, db_id_list_opt) = kv_api.get_pb_seq_and_value(&dbid_idlist).await?;

let mut db_id_list = match db_id_list_opt {
Some(list) => list,
Expand All @@ -3724,14 +3720,12 @@ async fn gc_dropped_db_by_id(
continue;
}
let dbid = DatabaseId { db_id };
let (db_meta_seq, _db_meta): (_, Option<DatabaseMeta>) =
get_pb_value(kv_api, &dbid).await?;
let (db_meta_seq, _db_meta) = kv_api.get_pb_seq_and_value(&dbid).await?;
if db_meta_seq == 0 {
return Ok(());
}
let id_to_name = DatabaseIdToName { db_id };
let (name_ident_seq, _name_ident): (_, Option<DatabaseNameIdentRaw>) =
get_pb_value(kv_api, &id_to_name).await?;
let (name_ident_seq, _name_ident) = kv_api.get_pb_seq_and_value(&id_to_name).await?;
if name_ident_seq == 0 {
return Ok(());
}
Expand Down Expand Up @@ -3946,18 +3940,13 @@ async fn update_mask_policy(
key: MaskPolicyTableIdListIdent,
f: impl FnOnce(&mut BTreeSet<u64>),
) -> Result<(), KVAppError> {
let (id_list_seq, id_list_opt): (_, Option<MaskpolicyTableIdList>) =
get_pb_value(kv_api, &key).await?;
let Some(mut seq_list) = kv_api.get_pb(&key).await? else {
return Ok(());
};

if let Some(mut id_list) = id_list_opt {
f(&mut id_list.id_list);
f(&mut seq_list.data.id_list);

txn_req.condition.push(txn_cond_seq(&key, Eq, id_list_seq));

txn_req
.if_then
.push(txn_op_put(&key, serialize_struct(&id_list)?));
}
txn_replace_exact(txn_req, &key, seq_list.seq, &seq_list.data)?;

Ok(())
}
Expand Down Expand Up @@ -4127,20 +4116,13 @@ async fn handle_undrop_table(
database_id: db_id,
table_name: tenant_dbname_tbname.table_name.clone(),
};
let (tb_id_list_seq, tb_id_list_opt): (_, Option<TableIdList>) =
get_pb_value(kv_api, &dbid_tbname_idlist).await?;

let mut tb_id_list = if tb_id_list_seq == 0 {
let Some(seq_list) = kv_api.get_pb(&dbid_tbname_idlist).await? else {
return Err(KVAppError::AppError(AppError::UndropTableHasNoHistory(
UndropTableHasNoHistory::new(&tenant_dbname_tbname.table_name),
)));
} else {
tb_id_list_opt.ok_or_else(|| {
KVAppError::AppError(AppError::UndropTableHasNoHistory(
UndropTableHasNoHistory::new(&tenant_dbname_tbname.table_name),
))
})?
};
let mut tb_id_list = seq_list.data;

let table_id = req.extract_and_validate_table_id(&mut tb_id_list)?;

Expand Down
16 changes: 0 additions & 16 deletions src/meta/api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::seq_value::SeqValue;
use databend_common_meta_types::txn_condition::Target;
use databend_common_meta_types::ConditionResult;
use databend_common_meta_types::InvalidArgument;
Expand Down Expand Up @@ -122,21 +121,6 @@ where K: kvapi::Key {
}
}

/// Get value that are encoded with FromToProto.
///
/// It returns seq number and the data.
pub async fn get_pb_value<K>(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
k: &K,
) -> Result<(u64, Option<K::ValueType>), MetaError>
where
K: kvapi::Key,
K::ValueType: FromToProto,
{
let res = kv_api.get_pb(k).await?;
Ok((res.seq(), res.into_value()))
}

/// Batch get values that are encoded with FromToProto.
pub async fn mget_pb_values<T>(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
Expand Down

0 comments on commit 9e8b998

Please sign in to comment.