From 4b157cd1fcea75c5dd26693dc4d369cde618dca5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Tue, 6 Feb 2024 22:08:10 +0800 Subject: [PATCH] refactor: Simplify UdfMgr::add_udf() (#14631) * refactor: Add KVPbApi::list_pb_values() returns values without keys * refactor: add KVPbApi::upsert_pb() to update or insert * refactor: Simplify UdfMgr::add_udf() --- src/meta/api/src/kv_pb_api/codec.rs | 58 ++++++++++ src/meta/api/src/kv_pb_api/errors.rs | 60 ++++++++++ .../src/{kv_pb_api.rs => kv_pb_api/mod.rs} | 97 ++++++++++++++-- src/meta/api/src/kv_pb_api/upsert_pb.rs | 105 ++++++++++++++++++ src/meta/types/src/change.rs | 6 +- src/meta/types/src/cmd/mod.rs | 2 +- src/meta/types/src/errors/meta_api_errors.rs | 8 ++ src/meta/types/src/errors/meta_errors.rs | 7 ++ src/query/management/src/udf/udf_mgr.rs | 53 +++------ 9 files changed, 343 insertions(+), 53 deletions(-) create mode 100644 src/meta/api/src/kv_pb_api/codec.rs create mode 100644 src/meta/api/src/kv_pb_api/errors.rs rename src/meta/api/src/{kv_pb_api.rs => kv_pb_api/mod.rs} (73%) create mode 100644 src/meta/api/src/kv_pb_api/upsert_pb.rs diff --git a/src/meta/api/src/kv_pb_api/codec.rs b/src/meta/api/src/kv_pb_api/codec.rs new file mode 100644 index 000000000000..84168daf0ae2 --- /dev/null +++ b/src/meta/api/src/kv_pb_api/codec.rs @@ -0,0 +1,58 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_meta_types::Change; +use databend_common_meta_types::Operation; +use databend_common_meta_types::SeqV; +use databend_common_proto_conv::FromToProto; + +use crate::kv_pb_api::PbDecodeError; +use crate::kv_pb_api::PbEncodeError; + +/// Encode an upsert Operation of T into protobuf encoded value. +pub fn encode_operation(value: &Operation) -> Result>, PbEncodeError> +where T: FromToProto { + match value { + Operation::Update(t) => { + let p = t.to_pb()?; + let mut buf = vec![]; + prost::Message::encode(&p, &mut buf)?; + Ok(Operation::Update(buf)) + } + Operation::Delete => Ok(Operation::Delete), + Operation::AsIs => Ok(Operation::AsIs), + } +} + +/// Decode Change> into Change, with FromToProto. +pub fn decode_transition(seqv: Change>) -> Result, PbDecodeError> +where T: FromToProto { + let c = Change { + ident: seqv.ident, + prev: seqv.prev.map(decode_seqv::).transpose()?, + result: seqv.result.map(decode_seqv::).transpose()?, + }; + + Ok(c) +} + +/// Deserialize SeqV> into SeqV, with FromToProto. +pub fn decode_seqv(seqv: SeqV) -> Result, PbDecodeError> +where T: FromToProto { + let buf = &seqv.data; + let p: T::PB = prost::Message::decode(buf.as_ref())?; + let v: T = FromToProto::from_pb(p)?; + + Ok(SeqV::with_meta(seqv.seq, seqv.meta, v)) +} diff --git a/src/meta/api/src/kv_pb_api/errors.rs b/src/meta/api/src/kv_pb_api/errors.rs new file mode 100644 index 000000000000..527e7cc58a70 --- /dev/null +++ b/src/meta/api/src/kv_pb_api/errors.rs @@ -0,0 +1,60 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Defines errors used by protobuf based API. + +use databend_common_meta_types::InvalidArgument; +use databend_common_meta_types::MetaError; +use databend_common_proto_conv::Incompatible; + +use crate::kv_pb_api::PbDecodeError; + +/// An error occurred when encoding with FromToProto. +#[derive(Clone, Debug, PartialEq, thiserror::Error)] +#[error("PbEncodeError: {0}")] +pub enum PbEncodeError { + EncodeError(#[from] prost::EncodeError), + Incompatible(#[from] Incompatible), +} + +impl From for MetaError { + fn from(value: PbEncodeError) -> Self { + match value { + PbEncodeError::EncodeError(e) => MetaError::from(InvalidArgument::new(e, "")), + PbEncodeError::Incompatible(e) => MetaError::from(InvalidArgument::new(e, "")), + } + } +} + +/// An error occurs when writing protobuf encoded value to kv store. +#[derive(Clone, Debug, PartialEq, thiserror::Error)] +#[error("PbApiWriteError: {0}")] +pub enum PbApiWriteError { + PbEncodeError(#[from] PbEncodeError), + /// upsert reads the state transition after the operation. + PbDecodeError(#[from] PbDecodeError), + /// Error returned from KVApi. + KvApiError(E), +} + +impl From> for MetaError { + /// For KVApi that returns MetaError, convert protobuf related error to MetaError directly. + fn from(value: PbApiWriteError) -> Self { + match value { + PbApiWriteError::PbEncodeError(e) => MetaError::from(e), + PbApiWriteError::PbDecodeError(e) => MetaError::from(e), + PbApiWriteError::KvApiError(e) => e, + } + } +} diff --git a/src/meta/api/src/kv_pb_api.rs b/src/meta/api/src/kv_pb_api/mod.rs similarity index 73% rename from src/meta/api/src/kv_pb_api.rs rename to src/meta/api/src/kv_pb_api/mod.rs index 3df1dd610813..a35894ac234b 100644 --- a/src/meta/api/src/kv_pb_api.rs +++ b/src/meta/api/src/kv_pb_api/mod.rs @@ -14,6 +14,10 @@ //! Kv API with `kvapi::Key` type key and protobuf encoded value. +mod codec; +mod errors; +mod upsert_pb; + use std::future::Future; use databend_common_meta_kvapi::kvapi; @@ -22,19 +26,31 @@ use databend_common_meta_kvapi::kvapi::KVApi; use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_kvapi::kvapi::NonEmptyItem; use databend_common_meta_types::protobuf::StreamItem; +use databend_common_meta_types::Change; use databend_common_meta_types::InvalidReply; use databend_common_meta_types::MetaError; use databend_common_meta_types::MetaNetworkError; use databend_common_meta_types::SeqV; +use databend_common_meta_types::UpsertKV; use databend_common_proto_conv::FromToProto; use databend_common_proto_conv::Incompatible; use futures::future::FutureExt; +use futures::future::TryFutureExt; use futures::stream::BoxStream; use futures::stream::StreamExt; use futures::TryStreamExt; use PbApiReadError::KvApiError; -/// An error occurs when decoding protobuf encoded value. +pub(crate) use self::codec::decode_seqv; +pub(crate) use self::codec::decode_transition; +pub(crate) use self::codec::encode_operation; +pub use self::errors::PbApiWriteError; +pub use self::errors::PbEncodeError; +pub use self::upsert_pb::UpsertPB; + +// TODO: move error to separate file + +/// An error occurred when decoding protobuf encoded value. #[derive(Clone, Debug, PartialEq, thiserror::Error)] #[error("PbDecodeError: {0}")] pub enum PbDecodeError { @@ -42,6 +58,15 @@ pub enum PbDecodeError { Incompatible(#[from] Incompatible), } +impl From for MetaError { + fn from(value: PbDecodeError) -> Self { + match value { + PbDecodeError::DecodeError(e) => MetaError::from(InvalidReply::new("", &e)), + PbDecodeError::Incompatible(e) => MetaError::from(InvalidReply::new("", &e)), + } + } +} + /// An error occurs when found an unexpected None value. #[derive(Clone, Debug, PartialEq, thiserror::Error)] #[error("NoneValue: unexpected None value of key: '{key}'")] @@ -112,6 +137,50 @@ impl From> for MetaError { /// This trait provides a way to access a kv store with `kvapi::Key` type key and protobuf encoded value. pub trait KVPbApi: KVApi { + /// Update or insert a protobuf encoded value by kvapi::Key. + /// + /// The key will be converted to string and the value is encoded by `FromToProto`. + /// It returns the transition before and after executing the operation. + /// The state before and after will be the same if the seq does not match. + fn upsert_pb( + &self, + req: &UpsertPB, + ) -> impl Future, Self::Error>> + Send + where + K: kvapi::Key + Send, + K::ValueType: FromToProto, + Self::Error: From>, + { + self.upsert_pb_low(req).map_err(Self::Error::from) + } + + /// Same as `upsert_pb` but returns [`PbApiWriteError`]. No require of `From` for `Self::Error`. + fn upsert_pb_low( + &self, + req: &UpsertPB, + ) -> impl Future, PbApiWriteError>> + Send + where + K: kvapi::Key, + K::ValueType: FromToProto, + { + // leave it out of async move block to avoid requiring Send + let k = req.key.to_string_key(); + let v = encode_operation(&req.value); + let seq = req.seq; + let value_meta = req.value_meta.clone(); + + async move { + let v = v?; + let req = UpsertKV::new(k, seq, v, value_meta); + let reply = self + .upsert_kv(req) + .await + .map_err(PbApiWriteError::KvApiError)?; + let transition = decode_transition(reply)?; + Ok(transition) + } + } + /// Get protobuf encoded value by kvapi::Key. /// /// The key will be converted to string and the returned value is decoded by `FromToProto`. @@ -146,6 +215,22 @@ pub trait KVPbApi: KVApi { } } + /// Same as `list_pb` but does not return key, only values. + fn list_pb_values( + &self, + prefix: &DirName, + ) -> impl Future< + Output = Result>, Self::Error>, + > + Send + where + K: kvapi::Key + 'static, + K::ValueType: FromToProto, + Self::Error: From>, + { + self.list_pb(prefix) + .map_ok(|strm| strm.map_ok(|x| x.seqv.data).boxed()) + } + /// List protobuf encoded values by prefix and returns a stream. /// /// The returned value is decoded by `FromToProto`. @@ -216,16 +301,6 @@ where } } -/// Deserialize SeqV> into SeqV, with FromToProto. -fn decode_seqv(seqv: SeqV) -> Result, PbDecodeError> -where T: FromToProto { - let buf = &seqv.data; - let p: T::PB = prost::Message::decode(buf.as_ref())?; - let v: T = FromToProto::from_pb(p)?; - - Ok(SeqV::with_meta(seqv.seq, seqv.meta, v)) -} - #[cfg(test)] mod tests { use crate::kv_pb_api::PbDecodeError; diff --git a/src/meta/api/src/kv_pb_api/upsert_pb.rs b/src/meta/api/src/kv_pb_api/upsert_pb.rs new file mode 100644 index 000000000000..a96873ee0a27 --- /dev/null +++ b/src/meta/api/src/kv_pb_api/upsert_pb.rs @@ -0,0 +1,105 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use databend_common_meta_kvapi::kvapi; +use databend_common_meta_types::MatchSeq; +use databend_common_meta_types::MetaSpec; +use databend_common_meta_types::Operation; +use databend_common_meta_types::With; + +#[derive(Clone, Debug)] +pub struct UpsertPB { + pub key: K, + + /// Since a sequence number is always positive, using Exact(0) to perform an add-if-absent operation. + /// - GE(1) to perform an update-any operation. + /// - Exact(n) to perform an update on some specified version. + /// - Any to perform an update or insert that always takes effect. + pub seq: MatchSeq, + + /// The value to set. A `None` indicates to delete it. + pub value: Operation, + + /// Meta data of a value. + pub value_meta: Option, +} + +impl UpsertPB { + pub fn new( + key: K, + seq: MatchSeq, + value: Operation, + value_meta: Option, + ) -> Self { + Self { + key, + seq, + value, + value_meta, + } + } + + pub fn insert(key: K, value: K::ValueType) -> Self { + Self { + key, + seq: MatchSeq::Exact(0), + value: Operation::Update(value), + value_meta: None, + } + } + + pub fn update(key: K, value: K::ValueType) -> Self { + Self { + key, + seq: MatchSeq::GE(0), + value: Operation::Update(value), + value_meta: None, + } + } + + pub fn delete(key: K) -> Self { + Self { + key, + seq: MatchSeq::GE(0), + value: Operation::Delete, + value_meta: None, + } + } + + pub fn with_expire_sec(self, expire_at_sec: u64) -> Self { + self.with(MetaSpec::new_expire(expire_at_sec)) + } + + /// Set the time to last for the value. + /// When the ttl is passed, the value is deleted. + pub fn with_ttl(self, ttl: Duration) -> Self { + self.with(MetaSpec::new_ttl(ttl)) + } +} + +impl With for UpsertPB { + fn with(mut self, seq: MatchSeq) -> Self { + self.seq = seq; + self + } +} + +impl With for UpsertPB { + fn with(mut self, meta: MetaSpec) -> Self { + self.value_meta = Some(meta); + self + } +} diff --git a/src/meta/types/src/change.rs b/src/meta/types/src/change.rs index ec1f309eb57c..e6b74c5541d0 100644 --- a/src/meta/types/src/change.rs +++ b/src/meta/types/src/change.rs @@ -29,9 +29,7 @@ use crate::SeqValue; /// the `result` could also be possible to be None. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, derive_more::From)] pub struct Change -where - ID: Clone + PartialEq, - T: Clone + PartialEq, +where ID: Clone + PartialEq { /// identity of the resource that is changed. pub ident: Option, @@ -42,7 +40,7 @@ where impl Change where ID: Clone + PartialEq + Debug, - T: Clone + PartialEq + Debug, + T: PartialEq + Debug, { pub fn new(prev: Option>, result: Option>) -> Self { Change { diff --git a/src/meta/types/src/cmd/mod.rs b/src/meta/types/src/cmd/mod.rs index e2fa08215204..c014dd30f16a 100644 --- a/src/meta/types/src/cmd/mod.rs +++ b/src/meta/types/src/cmd/mod.rs @@ -111,7 +111,7 @@ impl fmt::Display for UpsertKV { impl UpsertKV { pub fn new( - key: &str, + key: impl ToString, seq: MatchSeq, value: Operation>, value_meta: Option, diff --git a/src/meta/types/src/errors/meta_api_errors.rs b/src/meta/types/src/errors/meta_api_errors.rs index 684c0a0ad5b3..e6e85a20c314 100644 --- a/src/meta/types/src/errors/meta_api_errors.rs +++ b/src/meta/types/src/errors/meta_api_errors.rs @@ -23,6 +23,7 @@ use crate::raft_types::ChangeMembershipError; use crate::raft_types::Fatal; use crate::raft_types::ForwardToLeader; use crate::ClientWriteError; +use crate::InvalidArgument; use crate::InvalidReply; use crate::MetaNetworkError; use crate::RaftError; @@ -173,6 +174,13 @@ impl From for MetaAPIError { } } +impl From for MetaAPIError { + fn from(e: InvalidArgument) -> Self { + let net_err = MetaNetworkError::from(e); + Self::NetworkError(net_err) + } +} + impl From for MetaAPIError { fn from(e: InvalidReply) -> Self { let net_err = MetaNetworkError::from(e); diff --git a/src/meta/types/src/errors/meta_errors.rs b/src/meta/types/src/errors/meta_errors.rs index 79031d721649..306afff04e45 100644 --- a/src/meta/types/src/errors/meta_errors.rs +++ b/src/meta/types/src/errors/meta_errors.rs @@ -19,6 +19,7 @@ use serde::Serialize; use thiserror::Error; use crate::errors; +use crate::InvalidArgument; use crate::InvalidReply; use crate::MetaAPIError; use crate::MetaClientError; @@ -59,6 +60,12 @@ impl From for MetaError { } } +impl From for MetaError { + fn from(e: InvalidArgument) -> Self { + let api_err = MetaAPIError::from(e); + Self::APIError(api_err) + } +} impl From for MetaError { fn from(e: InvalidReply) -> Self { let api_err = MetaAPIError::from(e); diff --git a/src/query/management/src/udf/udf_mgr.rs b/src/query/management/src/udf/udf_mgr.rs index 07b10b95fcdd..44c229b004bb 100644 --- a/src/query/management/src/udf/udf_mgr.rs +++ b/src/query/management/src/udf/udf_mgr.rs @@ -18,21 +18,18 @@ use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_functions::is_builtin_function; use databend_common_meta_api::kv_pb_api::KVPbApi; +use databend_common_meta_api::kv_pb_api::UpsertPB; use databend_common_meta_app::principal::UdfName; use databend_common_meta_app::principal::UserDefinedFunction; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_kvapi::kvapi; use databend_common_meta_kvapi::kvapi::DirName; -use databend_common_meta_kvapi::kvapi::Key; use databend_common_meta_types::MatchSeq; -use databend_common_meta_types::MatchSeqExt; use databend_common_meta_types::MetaError; use databend_common_meta_types::SeqV; -use databend_common_meta_types::UpsertKV; use databend_common_meta_types::With; use futures::stream::TryStreamExt; -use crate::serde::serialize_struct; use crate::udf::UdfApi; pub struct UdfMgr { @@ -70,9 +67,8 @@ impl UdfApi for UdfMgr { let seq = MatchSeq::from(*create_option); let key = UdfName::new(&self.tenant, &info.name); - let value = serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?; - let req = UpsertKV::insert(key.to_string_key(), &value).with(seq); - let res = self.kv_api.upsert_kv(req).await?; + let req = UpsertPB::insert(key, info.clone()).with(seq); + let res = self.kv_api.upsert_pb(&req).await?; if let CreateOption::CreateIfNotExists(false) = create_option { if res.prev.is_some() { @@ -89,6 +85,7 @@ impl UdfApi for UdfMgr { #[async_backtrace::framed] #[minitrace::trace] async fn update_udf(&self, info: UserDefinedFunction, seq: MatchSeq) -> Result { + // TODO: add ensure_not_builtin_function() if is_builtin_function(info.name.as_str()) { return Err(ErrorCode::UdfAlreadyExists(format!( "Cannot add UDF '{}': name conflicts with a built-in function.", @@ -96,33 +93,16 @@ impl UdfApi for UdfMgr { ))); } - // TODO: remove get_udf(), check if the UDF exists after upsert_kv() - // Check if UDF is defined - let seqv = self.get_udf(info.name.as_str()).await?; - - match seq.match_seq(&seqv) { - Ok(_) => {} - Err(_) => { - return Err(ErrorCode::UnknownUDF(format!( - "UDF '{}' does not exist.", - &info.name - ))); - } - } - let key = UdfName::new(&self.tenant, &info.name); - // TODO: these logic are reppeated several times, consider to extract them. - // TODO: add a new trait PBKVApi for the common logic that saves pb values in kvapi. - let value = serialize_struct(&info, ErrorCode::IllegalUDFFormat, || "")?; - let req = UpsertKV::update(key.to_string_key(), &value).with(seq); - let res = self.kv_api.upsert_kv(req).await?; - - match res.result { - Some(SeqV { seq: s, .. }) => Ok(s), - None => Err(ErrorCode::UnknownUDF(format!( + let req = UpsertPB::update(key, info.clone()).with(seq); + let res = self.kv_api.upsert_pb(&req).await?; + if res.is_changed() { + Ok(res.result.unwrap().seq) + } else { + Err(ErrorCode::UnknownUDF(format!( "UDF '{}' does not exist.", - info.name.clone() - ))), + info.name + ))) } } @@ -144,8 +124,7 @@ impl UdfApi for UdfMgr { #[minitrace::trace] async fn get_udfs(&self) -> Result> { let key = DirName::new(UdfName::new(&self.tenant, "")); - let strm = self.kv_api.list_pb(&key).await?; - let strm = strm.map_ok(|item| item.seqv.data); + let strm = self.kv_api.list_pb_values(&key).await?; let udfs = strm.try_collect().await?; Ok(udfs) } @@ -154,10 +133,10 @@ impl UdfApi for UdfMgr { #[minitrace::trace] async fn drop_udf(&self, udf_name: &str, seq: MatchSeq) -> Result<()> { let key = UdfName::new(&self.tenant, udf_name); - let req = UpsertKV::delete(key.to_string_key()).with(seq); - let res = self.kv_api.upsert_kv(req).await?; + let req = UpsertPB::delete(key).with(seq); + let res = self.kv_api.upsert_pb(&req).await?; - if res.prev.is_some() && res.result.is_none() { + if res.is_changed() { Ok(()) } else { Err(ErrorCode::UnknownUDF(format!(