Skip to content

Commit

Permalink
refactor: add KVPbCrudApi::crud_try_insert() and crud_try_upsert() (#โ€ฆ
Browse files Browse the repository at this point in the history
โ€ฆ16484)

* refactor: add KVPbCrudApi::crud_remove()

* refactor: add KVPbCrudApi::crud_try_insert() and crud_try_upsert()
  • Loading branch information
drmingdrmer committed Sep 20, 2024
1 parent 207ee9e commit dffe40f
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 104 deletions.
138 changes: 136 additions & 2 deletions src/meta/api/src/kv_pb_crud_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::Change;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::With;
use databend_common_proto_conv::FromToProto;
use fastrace::func_name;
Expand All @@ -27,7 +30,10 @@ use log::debug;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::meta_txn_error::MetaTxnError;
use crate::send_txn;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_eq_seq;
use crate::util::txn_op_put_pb;

/// [`KVPbCrudApi`] provide generic meta-service access pattern implementations for `name -> value` mapping.
///
Expand All @@ -39,7 +45,113 @@ where
K: kvapi::Key + Clone + Send + Sync + 'static,
K::ValueType: FromToProto + Clone + Send + Sync + 'static,
{
/// Update or insert a `name -> value` mapping.
/// Attempts to insert a new key-value pair in it does not exist, without CAS loop.
///
/// See: [`KVPbCrudApi::crud_try_upsert`]
async fn crud_try_insert<E>(
&self,
key: &K,
value: K::ValueType,
ttl: Option<Duration>,
on_exist: impl FnOnce() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
self.crud_try_upsert(key, MatchSeq::Exact(0), value, ttl, on_exist)
.await
}

/// Attempts to insert or update a new key-value pair, without CAS loop.
///
/// # Arguments
/// * `key` - The identifier for the new entry.
/// * `value` - The value to be associated with the key.
/// * `ttl` - Optional time-to-live for the entry.
/// * `on_exist` - Callback function invoked if the key already exists.
///
/// # Returns
/// * `Ok(Ok(()))` if the insertion was successful.
/// * `Ok(Err(E))` if the key already exists and `on_exist` returned an error.
/// * `Err(MetaTxnError)` for transaction-related or meta-service errors.
async fn crud_try_upsert<E>(
&self,
key: &K,
match_seq: MatchSeq,
value: K::ValueType,
ttl: Option<Duration>,
on_exist: impl FnOnce() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(name_ident :? =key; "KVPbCrudApi: {}", func_name!());

let upsert = UpsertPB::insert(key.clone(), value).with(match_seq);
let upsert = if let Some(ttl) = ttl {
upsert.with_ttl(ttl)
} else {
upsert
};

let transition = self.upsert_pb(&upsert).await?;

if transition.is_changed() {
Ok(Ok(()))
} else {
Ok(on_exist())
}
}

/// Updates an existing key-value mapping with CAS loop.
///
/// # Arguments
/// * `name_ident` - The identifier of the key to update.
/// * `update` - A function that takes the current value and returns an optional tuple of
/// (new_value, ttl). If None is returned, the update is cancelled.
/// * `not_found` - A function called when the key doesn't exist. It should either return
/// an error or Ok(()) to cancel the update.
///
/// # Returns
/// * `Ok(Ok(()))` if the update was successful or cancelled.
/// * `Ok(Err(E))` if `not_found` returned an error.
/// * `Err(MetaTxnError)` for transaction-related errors.
///
/// # Note
/// This method uses optimistic locking and will retry on conflicts.
async fn crud_update_existing<E>(
&self,
name_ident: &K,
update: impl Fn(K::ValueType) -> Option<(K::ValueType, Option<Duration>)> + Send,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(name_ident :? =name_ident; "KVPbCrudApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let mut txn = TxnRequest::default();

let seq_meta = self.get_pb(name_ident).await?;
let seq = seq_meta.seq();

let updated = match seq_meta.into_value() {
Some(x) => update(x),
None => return Ok(not_found()),
};

let Some((updated, ttl)) = updated else {
// update is cancelled
return Ok(Ok(()));
};

txn.condition.push(txn_cond_eq_seq(name_ident, seq));
txn.if_then.push(txn_op_put_pb(name_ident, &updated, ttl)?);

let (succ, _responses) = send_txn(self, txn).await?;

if succ {
return Ok(Ok(()));
}
}
}

/// Update or insert a `name -> value` mapping, with CAS loop.
///
/// The `update` function is called with the previous value and should output the updated to write back.
/// - Ok(Some(x)): write back `x`.
Expand All @@ -49,7 +161,7 @@ where
/// This function returns an embedded result,
/// - the outer result is for underlying kvapi error,
/// - the inner result is for business logic error.
async fn upsert_with<E>(
async fn crud_upsert_with<E>(
&self,
name_ident: &K,
update: impl Fn(Option<SeqV<K::ValueType>>) -> Result<Option<K::ValueType>, E> + Send,
Expand Down Expand Up @@ -80,6 +192,28 @@ where
}
}
}

/// Remove the `name -> value` mapping by name.
///
/// `not_found` is called when the name does not exist.
/// And this function decide to:
/// - cancel update by returning `Ok(())`
/// - or return an error when the name does not exist.
async fn crud_remove<E>(
&self,
name_ident: &K,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(key :? =name_ident; "NameValueApi: {}", func_name!());

let upsert = UpsertPB::delete(name_ident.clone());
let transition = self.upsert_pb(&upsert).await?;
if !transition.is_changed() {
return Ok(not_found());
}

Ok(Ok(()))
}
}

impl<K, T> KVPbCrudApi<K> for T
Expand Down
98 changes: 6 additions & 92 deletions src/meta/api/src/name_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,15 @@ use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::KeyCodec;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqValue;
use databend_common_meta_types::TxnRequest;
use databend_common_meta_types::With;
use databend_common_proto_conv::FromToProto;
use fastrace::func_name;
use log::debug;

use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_api::UpsertPB;
use crate::kv_pb_crud_api::KVPbCrudApi;
use crate::meta_txn_error::MetaTxnError;
use crate::send_txn;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_eq_seq;
use crate::util::txn_op_put_pb;

/// NameValueApi provide generic meta-service access pattern implementations for `name -> value` mapping.
///
Expand All @@ -53,25 +48,16 @@ where
/// Create a `name -> value` mapping.
async fn insert_name_value(
&self,
name_ident: TIdent<R, N>,
name_ident: &TIdent<R, N>,
value: R::ValueType,
ttl: Option<Duration>,
) -> Result<Result<(), ExistError<R, N>>, MetaTxnError> {
debug!(name_ident :? =name_ident; "NameValueApi: {}", func_name!());

let upsert = UpsertPB::insert(name_ident.clone(), value);
let upsert = if let Some(ttl) = ttl {
upsert.with_ttl(ttl)
} else {
upsert
};

let transition = self.upsert_pb(&upsert).await?;

if !transition.is_changed() {
return Ok(Err(name_ident.exist_error(func_name!())));
}
Ok(Ok(()))
self.crud_try_insert(name_ident, value, ttl, || {
Err(name_ident.exist_error(func_name!()))
})
.await
}

/// Create a `name -> value` mapping, with `CreateOption` support
Expand All @@ -96,78 +82,6 @@ where
}
Ok(Ok(()))
}

/// Update an existent `name -> value` mapping.
///
/// The `update` function is called with the previous value
/// and should output the updated to write back,
/// with an optional time-to-last value.
///
/// `not_found` is called when the name does not exist.
/// And this function decide to:
/// - cancel update by returning `Ok(())`
/// - or return an error when the name does not exist.
async fn update_existent_name_value<E>(
&self,
name_ident: &TIdent<R, N>,
update: impl Fn(R::ValueType) -> Option<(R::ValueType, Option<Duration>)> + Send,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(name_ident :? =name_ident; "NameValueApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

let mut txn = TxnRequest::default();

let seq_meta = self.get_pb(name_ident).await?;
let seq = seq_meta.seq();

let updated = match seq_meta.into_value() {
Some(x) => update(x),
None => return Ok(not_found()),
};

let Some((updated, ttl)) = updated else {
// update is cancelled
return Ok(Ok(()));
};

txn.condition.push(txn_cond_eq_seq(name_ident, seq));
txn.if_then.push(txn_op_put_pb(name_ident, &updated, ttl)?);

let (succ, _responses) = send_txn(self, txn).await?;

if succ {
return Ok(Ok(()));
}
}
}

/// Remove the `name -> id -> value` mapping by name, along with associated records, such `id->name` reverse index.
///
/// Returns the removed `SeqV<id>` and `SeqV<value>`, if the name exists.
/// Otherwise, returns None.
///
/// `associated_records` is used to generate additional key-values to remove along with the main operation.
/// Such operations do not have any condition constraints.
/// For example, a `name -> id` mapping can have a reverse `id -> name` mapping.
async fn remove_name_value<E>(
&self,
name_ident: &TIdent<R, N>,
not_found: impl Fn() -> Result<(), E> + Send,
) -> Result<Result<(), E>, MetaTxnError> {
debug!(key :? =name_ident; "NameValueApi: {}", func_name!());

let upsert = UpsertPB::delete(name_ident.clone());
let transition = self.upsert_pb(&upsert).await?;
if !transition.is_changed() {
return Ok(not_found());
}

Ok(Ok(()))
}
}

impl<R, N, T> NameValueApi<R, N> for T
Expand Down
18 changes: 8 additions & 10 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
};

self.update_existent_name_value(
self.crud_update_existing(
&req.name_ident,
|mut meta| {
meta.virtual_columns = req.virtual_columns.clone();
Expand All @@ -1007,7 +1007,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
};

self.remove_name_value(&req.name_ident, not_found).await??;
self.crud_remove(&req.name_ident, not_found).await??;

Ok(())
}
Expand Down Expand Up @@ -2306,7 +2306,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
table_id: req.table_id,
};

self.upsert_with(&tbid, |seq_meta: Option<SeqV<TableMeta>>| {
self.crud_upsert_with(&tbid, |seq_meta: Option<SeqV<TableMeta>>| {
let Some(seq_meta) = seq_meta else {
return Err(AppError::UnknownTableId(UnknownTableId::new(
req.table_id,
Expand Down Expand Up @@ -2964,8 +2964,8 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// Revision is unique. if it presents, consider it as success.
// Thus, we could just ignore create result
let _create_res = self
.insert_name_value(key, lock_meta, Some(req.ttl))
let _ = self
.crud_try_insert(&key, lock_meta, Some(req.ttl), || Ok::<(), Infallible>(()))
.await?;

Ok(CreateLockRevReply { revision })
Expand All @@ -2982,7 +2982,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let table_id = lock_key.get_table_id();
let key = lock_key.gen_key(req.revision);

self.update_existent_name_value(
self.crud_update_existing(
&key,
|mut lock_meta| {
// Set `acquire_lock = true` to initialize `acquired_on` when the
Expand Down Expand Up @@ -3014,9 +3014,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let revision = req.revision;
let key = lock_key.gen_key(revision);

self.remove_name_value(&key, || Ok::<(), ()>(()))
.await?
.unwrap();
self.crud_remove(&key, || Ok::<(), ()>(())).await?.unwrap();

Ok(())
}
Expand Down Expand Up @@ -3130,7 +3128,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
debug!(req :? =(&name_ident, &value); "SchemaApi: {}", func_name!());

let transition = self
.upsert_with::<Infallible>(name_ident, |t: Option<SeqV<LeastVisibleTime>>| {
.crud_upsert_with::<Infallible>(name_ident, |t: Option<SeqV<LeastVisibleTime>>| {
let curr = t.into_value().unwrap_or_default();
if curr.time >= value.time {
Ok(None)
Expand Down

0 comments on commit dffe40f

Please sign in to comment.