Skip to content

Commit

Permalink
refactor: Simplify UdfMgr::add_udf() (#14631)
Browse files Browse the repository at this point in the history
* refactor: Add KVPbApi::list_pb_values() returns values without keys

* refactor: add KVPbApi::upsert_pb() to update or insert

* refactor: Simplify UdfMgr::add_udf()
  • Loading branch information
drmingdrmer committed Feb 6, 2024
1 parent a9b8926 commit 4b157cd
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 53 deletions.
58 changes: 58 additions & 0 deletions src/meta/api/src/kv_pb_api/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_meta_types::Change;
use databend_common_meta_types::Operation;
use databend_common_meta_types::SeqV;
use databend_common_proto_conv::FromToProto;

use crate::kv_pb_api::PbDecodeError;
use crate::kv_pb_api::PbEncodeError;

/// Encode an upsert Operation of T into protobuf encoded value.
pub fn encode_operation<T>(value: &Operation<T>) -> Result<Operation<Vec<u8>>, PbEncodeError>
where T: FromToProto {
match value {
Operation::Update(t) => {
let p = t.to_pb()?;
let mut buf = vec![];
prost::Message::encode(&p, &mut buf)?;
Ok(Operation::Update(buf))
}
Operation::Delete => Ok(Operation::Delete),
Operation::AsIs => Ok(Operation::AsIs),
}
}

/// Decode Change<Vec<u8>> into Change<T>, with FromToProto.
pub fn decode_transition<T>(seqv: Change<Vec<u8>>) -> Result<Change<T>, PbDecodeError>
where T: FromToProto {
let c = Change {
ident: seqv.ident,
prev: seqv.prev.map(decode_seqv::<T>).transpose()?,
result: seqv.result.map(decode_seqv::<T>).transpose()?,
};

Ok(c)
}

/// Deserialize SeqV<Vec<u8>> into SeqV<T>, with FromToProto.
pub fn decode_seqv<T>(seqv: SeqV) -> Result<SeqV<T>, PbDecodeError>
where T: FromToProto {
let buf = &seqv.data;
let p: T::PB = prost::Message::decode(buf.as_ref())?;
let v: T = FromToProto::from_pb(p)?;

Ok(SeqV::with_meta(seqv.seq, seqv.meta, v))
}
60 changes: 60 additions & 0 deletions src/meta/api/src/kv_pb_api/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Defines errors used by protobuf based API.

use databend_common_meta_types::InvalidArgument;
use databend_common_meta_types::MetaError;
use databend_common_proto_conv::Incompatible;

use crate::kv_pb_api::PbDecodeError;

/// An error occurred when encoding with FromToProto.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("PbEncodeError: {0}")]
pub enum PbEncodeError {
EncodeError(#[from] prost::EncodeError),
Incompatible(#[from] Incompatible),
}

impl From<PbEncodeError> for MetaError {
fn from(value: PbEncodeError) -> Self {
match value {
PbEncodeError::EncodeError(e) => MetaError::from(InvalidArgument::new(e, "")),
PbEncodeError::Incompatible(e) => MetaError::from(InvalidArgument::new(e, "")),
}
}
}

/// An error occurs when writing protobuf encoded value to kv store.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("PbApiWriteError: {0}")]
pub enum PbApiWriteError<E> {
PbEncodeError(#[from] PbEncodeError),
/// upsert reads the state transition after the operation.
PbDecodeError(#[from] PbDecodeError),
/// Error returned from KVApi.
KvApiError(E),
}

impl From<PbApiWriteError<MetaError>> for MetaError {
/// For KVApi that returns MetaError, convert protobuf related error to MetaError directly.
fn from(value: PbApiWriteError<MetaError>) -> Self {
match value {
PbApiWriteError::PbEncodeError(e) => MetaError::from(e),
PbApiWriteError::PbDecodeError(e) => MetaError::from(e),
PbApiWriteError::KvApiError(e) => e,
}
}
}
97 changes: 86 additions & 11 deletions src/meta/api/src/kv_pb_api.rs → src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

//! Kv API with `kvapi::Key` type key and protobuf encoded value.

mod codec;
mod errors;
mod upsert_pb;

use std::future::Future;

use databend_common_meta_kvapi::kvapi;
Expand All @@ -22,26 +26,47 @@ use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::NonEmptyItem;
use databend_common_meta_types::protobuf::StreamItem;
use databend_common_meta_types::Change;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaNetworkError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::UpsertKV;
use databend_common_proto_conv::FromToProto;
use databend_common_proto_conv::Incompatible;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use futures::TryStreamExt;
use PbApiReadError::KvApiError;

/// An error occurs when decoding protobuf encoded value.
pub(crate) use self::codec::decode_seqv;
pub(crate) use self::codec::decode_transition;
pub(crate) use self::codec::encode_operation;
pub use self::errors::PbApiWriteError;
pub use self::errors::PbEncodeError;
pub use self::upsert_pb::UpsertPB;

// TODO: move error to separate file

/// An error occurred when decoding protobuf encoded value.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("PbDecodeError: {0}")]
pub enum PbDecodeError {
DecodeError(#[from] prost::DecodeError),
Incompatible(#[from] Incompatible),
}

impl From<PbDecodeError> for MetaError {
fn from(value: PbDecodeError) -> Self {
match value {
PbDecodeError::DecodeError(e) => MetaError::from(InvalidReply::new("", &e)),
PbDecodeError::Incompatible(e) => MetaError::from(InvalidReply::new("", &e)),
}
}
}

/// An error occurs when found an unexpected None value.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("NoneValue: unexpected None value of key: '{key}'")]
Expand Down Expand Up @@ -112,6 +137,50 @@ impl From<PbApiReadError<MetaError>> for MetaError {

/// This trait provides a way to access a kv store with `kvapi::Key` type key and protobuf encoded value.
pub trait KVPbApi: KVApi {
/// Update or insert a protobuf encoded value by kvapi::Key.
///
/// The key will be converted to string and the value is encoded by `FromToProto`.
/// It returns the transition before and after executing the operation.
/// The state before and after will be the same if the seq does not match.
fn upsert_pb<K>(
&self,
req: &UpsertPB<K>,
) -> impl Future<Output = Result<Change<K::ValueType>, Self::Error>> + Send
where
K: kvapi::Key + Send,
K::ValueType: FromToProto,
Self::Error: From<PbApiWriteError<Self::Error>>,
{
self.upsert_pb_low(req).map_err(Self::Error::from)
}

/// Same as `upsert_pb` but returns [`PbApiWriteError`]. No require of `From<PbApiWriteError>` for `Self::Error`.
fn upsert_pb_low<K>(
&self,
req: &UpsertPB<K>,
) -> impl Future<Output = Result<Change<K::ValueType>, PbApiWriteError<Self::Error>>> + Send
where
K: kvapi::Key,
K::ValueType: FromToProto,
{
// leave it out of async move block to avoid requiring Send
let k = req.key.to_string_key();
let v = encode_operation(&req.value);
let seq = req.seq;
let value_meta = req.value_meta.clone();

async move {
let v = v?;
let req = UpsertKV::new(k, seq, v, value_meta);
let reply = self
.upsert_kv(req)
.await
.map_err(PbApiWriteError::KvApiError)?;
let transition = decode_transition(reply)?;
Ok(transition)
}
}

/// Get protobuf encoded value by kvapi::Key.
///
/// The key will be converted to string and the returned value is decoded by `FromToProto`.
Expand Down Expand Up @@ -146,6 +215,22 @@ pub trait KVPbApi: KVApi {
}
}

/// Same as `list_pb` but does not return key, only values.
fn list_pb_values<K>(
&self,
prefix: &DirName<K>,
) -> impl Future<
Output = Result<BoxStream<'static, Result<K::ValueType, Self::Error>>, Self::Error>,
> + Send
where
K: kvapi::Key + 'static,
K::ValueType: FromToProto,
Self::Error: From<PbApiReadError<Self::Error>>,
{
self.list_pb(prefix)
.map_ok(|strm| strm.map_ok(|x| x.seqv.data).boxed())
}

/// List protobuf encoded values by prefix and returns a stream.
///
/// The returned value is decoded by `FromToProto`.
Expand Down Expand Up @@ -216,16 +301,6 @@ where
}
}

/// Deserialize SeqV<Vec<u8>> into SeqV<T>, with FromToProto.
fn decode_seqv<T>(seqv: SeqV) -> Result<SeqV<T>, PbDecodeError>
where T: FromToProto {
let buf = &seqv.data;
let p: T::PB = prost::Message::decode(buf.as_ref())?;
let v: T = FromToProto::from_pb(p)?;

Ok(SeqV::with_meta(seqv.seq, seqv.meta, v))
}

#[cfg(test)]
mod tests {
use crate::kv_pb_api::PbDecodeError;
Expand Down
105 changes: 105 additions & 0 deletions src/meta/api/src/kv_pb_api/upsert_pb.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaSpec;
use databend_common_meta_types::Operation;
use databend_common_meta_types::With;

#[derive(Clone, Debug)]
pub struct UpsertPB<K: kvapi::Key> {
pub key: K,

/// Since a sequence number is always positive, using Exact(0) to perform an add-if-absent operation.
/// - GE(1) to perform an update-any operation.
/// - Exact(n) to perform an update on some specified version.
/// - Any to perform an update or insert that always takes effect.
pub seq: MatchSeq,

/// The value to set. A `None` indicates to delete it.
pub value: Operation<K::ValueType>,

/// Meta data of a value.
pub value_meta: Option<MetaSpec>,
}

impl<K: kvapi::Key> UpsertPB<K> {
pub fn new(
key: K,
seq: MatchSeq,
value: Operation<K::ValueType>,
value_meta: Option<MetaSpec>,
) -> Self {
Self {
key,
seq,
value,
value_meta,
}
}

pub fn insert(key: K, value: K::ValueType) -> Self {
Self {
key,
seq: MatchSeq::Exact(0),
value: Operation::Update(value),
value_meta: None,
}
}

pub fn update(key: K, value: K::ValueType) -> Self {
Self {
key,
seq: MatchSeq::GE(0),
value: Operation::Update(value),
value_meta: None,
}
}

pub fn delete(key: K) -> Self {
Self {
key,
seq: MatchSeq::GE(0),
value: Operation::Delete,
value_meta: None,
}
}

pub fn with_expire_sec(self, expire_at_sec: u64) -> Self {
self.with(MetaSpec::new_expire(expire_at_sec))
}

/// Set the time to last for the value.
/// When the ttl is passed, the value is deleted.
pub fn with_ttl(self, ttl: Duration) -> Self {
self.with(MetaSpec::new_ttl(ttl))
}
}

impl<K: kvapi::Key> With<MatchSeq> for UpsertPB<K> {
fn with(mut self, seq: MatchSeq) -> Self {
self.seq = seq;
self
}
}

impl<K: kvapi::Key> With<MetaSpec> for UpsertPB<K> {
fn with(mut self, meta: MetaSpec) -> Self {
self.value_meta = Some(meta);
self
}
}
6 changes: 2 additions & 4 deletions src/meta/types/src/change.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,7 @@ use crate::SeqValue;
/// the `result` could also be possible to be None.
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, derive_more::From)]
pub struct Change<T, ID = u64>
where
ID: Clone + PartialEq,
T: Clone + PartialEq,
where ID: Clone + PartialEq
{
/// identity of the resource that is changed.
pub ident: Option<ID>,
Expand All @@ -42,7 +40,7 @@ where
impl<T, ID> Change<T, ID>
where
ID: Clone + PartialEq + Debug,
T: Clone + PartialEq + Debug,
T: PartialEq + Debug,
{
pub fn new(prev: Option<SeqV<T>>, result: Option<SeqV<T>>) -> Self {
Change {
Expand Down
Loading

0 comments on commit 4b157cd

Please sign in to comment.