diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index c6b84a7cad5b6..897624363c2b9 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -390,6 +390,11 @@ build_exceptions! { UnsupportedDictionarySource(3117), MissingDictionaryOption(3118), WrongDictionaryFieldExpr(3119), + + // Procedure + UnknownProcedure(3130), + ProcedureAlreadyExists(3131), + IllegalProcedureFormat(3132), } // Storage errors [3001, 4000]. diff --git a/src/meta/api/src/lib.rs b/src/meta/api/src/lib.rs index 84a5132428450..406b38c860e34 100644 --- a/src/meta/api/src/lib.rs +++ b/src/meta/api/src/lib.rs @@ -67,6 +67,7 @@ pub use util::is_all_db_data_removed; pub use util::is_db_need_to_be_remove; pub use util::list_keys; pub use util::list_u64_value; +pub use util::procedure_has_to_exist; pub use util::remove_db_from_share; pub use util::send_txn; pub use util::serialize_struct; diff --git a/src/meta/api/src/util.rs b/src/meta/api/src/util.rs index cc51fd7cce371..097752d509775 100644 --- a/src/meta/api/src/util.rs +++ b/src/meta/api/src/util.rs @@ -21,6 +21,7 @@ use databend_common_meta_app::app_error::ShareHasNoGrantedDatabase; use databend_common_meta_app::app_error::ShareHasNoGrantedPrivilege; use databend_common_meta_app::app_error::UnknownDatabase; use databend_common_meta_app::app_error::UnknownDatabaseId; +use databend_common_meta_app::app_error::UnknownProcedure; use databend_common_meta_app::app_error::UnknownShare; use databend_common_meta_app::app_error::UnknownShareAccounts; use databend_common_meta_app::app_error::UnknownShareEndpoint; @@ -31,6 +32,7 @@ use databend_common_meta_app::app_error::UnknownTableId; use databend_common_meta_app::app_error::VirtualColumnNotFound; use databend_common_meta_app::app_error::WrongShareObject; use databend_common_meta_app::primitive::Id; +use databend_common_meta_app::principal::ProcedureNameIdent; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent; use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdentRaw; use databend_common_meta_app::schema::DBIdTableName; @@ -444,6 +446,28 @@ pub fn db_id_has_to_exist(seq: u64, db_id: u64, msg: impl Display) -> Result<(), } } +/// Return OK if a procedure_id or procedure_meta exists by checking the seq. +/// +/// Otherwise returns UnknownProcedure error +pub fn procedure_has_to_exist( + seq: u64, + procedure_name_ident: &ProcedureNameIdent, + msg: impl Display, +) -> Result<(), KVAppError> { + if seq == 0 { + debug!(seq = seq, db_name_ident :? =(procedure_name_ident); "procedure does not exist"); + + Err(KVAppError::AppError(AppError::UnknownProcedure( + UnknownProcedure::new( + procedure_name_ident.procedure_name(), + format!("{}: {}", msg, procedure_name_ident.display()), + ), + ))) + } else { + Ok(()) + } +} + /// Return OK if a `table_name_ident->*` exists by checking the seq. /// /// Otherwise returns [`AppError::UnknownTable`] error diff --git a/src/meta/app/src/app_error.rs b/src/meta/app/src/app_error.rs index d527bf6f59da1..a4d35426aec78 100644 --- a/src/meta/app/src/app_error.rs +++ b/src/meta/app/src/app_error.rs @@ -67,6 +67,22 @@ impl DatabaseAlreadyExists { } } +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +#[error("ProcedureAlreadyExists: `{procedure_name}` while `{context}`")] +pub struct ProcedureAlreadyExists { + procedure_name: String, + context: String, +} + +impl ProcedureAlreadyExists { + pub fn new(procedure_name: impl Into, context: impl Into) -> Self { + Self { + procedure_name: procedure_name.into(), + context: context.into(), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] #[error("CreateDatabaseWithDropTime: `{db_name}` with drop_on")] pub struct CreateDatabaseWithDropTime { @@ -387,6 +403,22 @@ impl UnknownDatabase { } } +#[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] +#[error("UnknownProcedure: `{procedure_name}` while `{context}`")] +pub struct UnknownProcedure { + procedure_name: String, + context: String, +} + +impl UnknownProcedure { + pub fn new(procedure_name: impl Into, context: impl Into) -> Self { + Self { + procedure_name: procedure_name.into(), + context: context.into(), + } + } +} + #[derive(thiserror::Error, Debug, Clone, PartialEq, Eq)] #[error("UnknownCatalog: `{catalog_name}` while `{context}`")] pub struct UnknownCatalog { @@ -1244,6 +1276,13 @@ pub enum AppError { #[error(transparent)] UnknownDictionary(#[from] UnknownDictionary), + + // Procedure + #[error(transparent)] + UnknownProcedure(#[from] UnknownProcedure), + + #[error(transparent)] + ProcedureAlreadyExists(#[from] ProcedureAlreadyExists), } impl AppError { @@ -1296,6 +1335,18 @@ impl AppErrorMessage for UnknownDatabase { } } +impl AppErrorMessage for UnknownProcedure { + fn message(&self) -> String { + format!("Unknown procedure '{}'", self.procedure_name) + } +} + +impl AppErrorMessage for ProcedureAlreadyExists { + fn message(&self) -> String { + format!("Procedure '{}' already exists", self.procedure_name) + } +} + impl AppErrorMessage for DatabaseAlreadyExists { fn message(&self) -> String { format!("Database '{}' already exists", self.db_name) @@ -1805,6 +1856,10 @@ impl From for ErrorCode { ErrorCode::DictionaryAlreadyExists(err.message()) } AppError::UnknownDictionary(err) => ErrorCode::UnknownDictionary(err.message()), + AppError::UnknownProcedure(err) => ErrorCode::UnknownProcedure(err.message()), + AppError::ProcedureAlreadyExists(err) => { + ErrorCode::ProcedureAlreadyExists(err.message()) + } } } } diff --git a/src/meta/app/src/id_generator.rs b/src/meta/app/src/id_generator.rs index d9ab6602e5fa4..5930ccd54a3cd 100644 --- a/src/meta/app/src/id_generator.rs +++ b/src/meta/app/src/id_generator.rs @@ -28,6 +28,8 @@ pub(crate) const ID_GEN_SHARE_ENDPOINT: &str = "share_endpoint_id"; pub(crate) const ID_GEN_DATA_MASK: &str = "data_mask"; pub(crate) const ID_GEN_BACKGROUND_JOB: &str = "background_job"; +pub(crate) const ID_GEN_PROCEDURE: &str = "procedure_id"; + /// Key for resource id generator /// /// This is a special key for an application to generate unique id with kvapi::KVApi. @@ -103,6 +105,13 @@ impl IdGenerator { resource: ID_GEN_CATALOG.to_string(), } } + + /// Create a key for generating procedure id with kvapi::KVApi + pub fn procedure_id() -> Self { + Self { + resource: ID_GEN_PROCEDURE.to_string(), + } + } } impl kvapi::KeyCodec for IdGenerator { @@ -217,6 +226,16 @@ mod t { assert_eq!(g1, g2); } + // Procedure id generator + { + let g = IdGenerator::procedure_id(); + let k = g.to_string_key(); + assert_eq!("__fd_id_gen/procedure_id", k); + + let t2 = IdGenerator::from_str_key(&k)?; + assert_eq!(g, t2); + } + Ok(()) } diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs index e114945bd9c23..9410489a44b44 100644 --- a/src/meta/app/src/principal/mod.rs +++ b/src/meta/app/src/principal/mod.rs @@ -42,6 +42,9 @@ pub mod client_session_ident; pub mod connection_ident; pub mod network_policy_ident; pub mod password_policy_ident; +pub mod procedur_name_ident; +pub mod procedure; +pub mod procedure_id_ident; pub mod stage_file_ident; pub mod tenant_ownership_object_ident; pub mod tenant_user_ident; @@ -60,6 +63,21 @@ pub use ownership_object::OwnershipObject; pub use password_policy::PasswordPolicy; pub use password_policy_ident::PasswordPolicyIdent; pub use principal_identity::PrincipalIdentity; +pub use procedur_name_ident::ProcedureNameIdent; +pub use procedure::CreateProcedureReply; +pub use procedure::CreateProcedureReq; +pub use procedure::DropProcedureReply; +pub use procedure::DropProcedureReq; +pub use procedure::GetProcedureReply; +pub use procedure::GetProcedureReq; +pub use procedure::ListProcedureReq; +pub use procedure::ProcedureId; +pub use procedure::ProcedureIdList; +pub use procedure::ProcedureIdToName; +pub use procedure::ProcedureInfoFilter; +pub use procedure::ProcedureMeta; +pub use procedure::RenameProcedureReply; +pub use procedure::RenameProcedureReq; pub use role_ident::RoleIdent; pub use role_ident::RoleIdentRaw; pub use role_info::RoleInfo; diff --git a/src/meta/app/src/principal/procedur_name_ident.rs b/src/meta/app/src/principal/procedur_name_ident.rs new file mode 100644 index 0000000000000..83ce02d44026b --- /dev/null +++ b/src/meta/app/src/principal/procedur_name_ident.rs @@ -0,0 +1,82 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::tenant_key::ident::TIdent; +use crate::tenant_key::raw::TIdentRaw; + +pub type ProcedureNameIdent = TIdent; +pub type ProcedureNameIdentRaw = TIdentRaw; + +pub use kvapi_impl::Resource; + +impl ProcedureNameIdent { + pub fn procedure_name(&self) -> &str { + self.name() + } +} + +impl ProcedureNameIdentRaw { + pub fn procedure_name(&self) -> &str { + self.name() + } +} + +mod kvapi_impl { + + use databend_common_meta_kvapi::kvapi; + use databend_common_meta_kvapi::kvapi::Key; + + use crate::primitive::Id; + use crate::principal::ProcedureId; + use crate::principal::ProcedureNameIdent; + use crate::tenant_key::resource::TenantResource; + + pub struct Resource; + impl TenantResource for Resource { + const PREFIX: &'static str = "__fd_procedure"; + const TYPE: &'static str = "ProcedureNameIdent"; + const HAS_TENANT: bool = true; + type ValueType = Id; + } + + impl kvapi::Value for Id { + type KeyType = ProcedureNameIdent; + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [self.inner().to_string_key()] + } + } + + // // Use these error types to replace usage of ErrorCode if possible. + // impl From> for ErrorCode { + // impl From> for ErrorCode { +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::ProcedureNameIdent; + use crate::tenant::Tenant; + + #[test] + fn test_ident() { + let tenant = Tenant::new_literal("test"); + let ident = ProcedureNameIdent::new(tenant, "test1"); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_procedure/test/test1"); + + assert_eq!(ident, ProcedureNameIdent::from_str_key(&key).unwrap()); + } +} diff --git a/src/meta/app/src/principal/procedure.rs b/src/meta/app/src/principal/procedure.rs new file mode 100644 index 0000000000000..00a603a507a29 --- /dev/null +++ b/src/meta/app/src/principal/procedure.rs @@ -0,0 +1,384 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::fmt::Display; +use std::fmt::Formatter; +use std::ops::Deref; + +use chrono::DateTime; +use chrono::Utc; +use databend_common_expression::types::DataType; + +use crate::principal::procedur_name_ident::ProcedureNameIdent; +use crate::schema::CreateOption; +use crate::tenant::Tenant; +use crate::tenant::ToTenant; +use crate::KeyWithTenant; + +#[derive(Clone, Debug, PartialEq)] +pub struct ProcedureInfo { + pub ident: ProcedureIdent, + pub name_ident: ProcedureNameIdent, + pub meta: ProcedureMeta, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, Eq, PartialEq)] +pub struct ProcedureIdent { + pub procedure_id: u64, + pub seq: u64, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq, PartialOrd, Ord)] +pub struct ProcedureId { + pub procedure_id: u64, +} + +impl ProcedureId { + pub fn new(procedure_id: u64) -> Self { + ProcedureId { procedure_id } + } +} + +impl From for ProcedureId { + fn from(procedure_id: u64) -> Self { + ProcedureId { procedure_id } + } +} + +impl Display for ProcedureId { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}", self.procedure_id) + } +} + +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct ProcedureIdToName { + pub procedure_id: u64, +} + +impl Display for ProcedureIdToName { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}", self.procedure_id) + } +} + +impl ProcedureIdToName { + pub fn new(procedure_id: u64) -> Self { + ProcedureIdToName { procedure_id } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct ProcedureMeta { + pub return_types: Vec, + pub created_on: DateTime, + pub updated_on: DateTime, + pub script: String, + pub comment: String, + pub procedure_language: String, +} + +impl Default for ProcedureMeta { + fn default() -> Self { + ProcedureMeta { + return_types: vec![], + created_on: Utc::now(), + updated_on: Utc::now(), + script: "".to_string(), + comment: "".to_string(), + procedure_language: "SQL".to_string(), + } + } +} + +impl Display for ProcedureMeta { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Lanuage: {:?}, return_type: {:?}, CreatedOn: {:?}, Script: {:?}, Comment: {:?}", + self.procedure_language, self.return_types, self.created_on, self.script, self.comment + ) + } +} + +/// Save procedure name id list history. +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, Default, PartialEq)] +pub struct ProcedureIdList { + pub id_list: Vec, +} + +impl ProcedureIdList { + pub fn new() -> ProcedureIdList { + ProcedureIdList::default() + } + + pub fn len(&self) -> usize { + self.id_list.len() + } + + pub fn id_list(&self) -> &Vec { + &self.id_list + } + + pub fn append(&mut self, table_id: u64) { + self.id_list.push(table_id); + } + + pub fn is_empty(&self) -> bool { + self.id_list.is_empty() + } + + pub fn pop(&mut self) -> Option { + self.id_list.pop() + } + + pub fn last(&mut self) -> Option<&u64> { + self.id_list.last() + } +} + +impl Display for ProcedureIdList { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!(f, "DB id list: {:?}", self.id_list) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct CreateProcedureReq { + pub create_option: CreateOption, + pub name_ident: ProcedureNameIdent, + pub meta: ProcedureMeta, +} + +impl Display for CreateProcedureReq { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + match self.create_option { + CreateOption::Create => write!( + f, + "create_procedure:{}/{}={:?}", + self.name_ident.tenant_name(), + self.name_ident.procedure_name(), + self.meta + ), + CreateOption::CreateIfNotExists => write!( + f, + "create_procedure_if_not_exists:{}/{}={:?}", + self.name_ident.tenant_name(), + self.name_ident.procedure_name(), + self.meta + ), + + CreateOption::CreateOrReplace => write!( + f, + "create_or_replace_procedure:{}/{}={:?}", + self.name_ident.tenant_name(), + self.name_ident.procedure_name(), + self.meta + ), + } + } +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Eq, PartialEq)] +pub struct CreateProcedureReply { + pub procedure_id: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RenameProcedureReq { + pub if_exists: bool, + pub name_ident: ProcedureNameIdent, + pub new_procedure_name: String, +} + +impl Display for RenameProcedureReq { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!( + f, + "rename_procedure:{}/{}=>{}", + self.name_ident.tenant_name(), + self.name_ident.procedure_name(), + self.new_procedure_name + ) + } +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct RenameProcedureReply {} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DropProcedureReq { + pub if_exists: bool, + pub name_ident: ProcedureNameIdent, +} + +impl Display for DropProcedureReq { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!( + f, + "drop_procedure(if_exists={}):{}/{}", + self.if_exists, + self.name_ident.tenant_name(), + self.name_ident.procedure_name(), + ) + } +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct DropProcedureReply { + pub procedure_id: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct UndropProcedureReq { + pub name_ident: ProcedureNameIdent, +} + +impl Display for UndropProcedureReq { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!( + f, + "undrop_procedure:{}/{}", + self.name_ident.tenant_name(), + self.name_ident.procedure_name(), + ) + } +} + +impl UndropProcedureReq { + pub fn tenant(&self) -> &Tenant { + self.name_ident.tenant() + } + pub fn procedure_name(&self) -> &str { + self.name_ident.procedure_name() + } +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +pub struct UndropProcedureReply {} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct GetProcedureReq { + pub inner: ProcedureNameIdent, +} + +impl Deref for GetProcedureReq { + type Target = ProcedureNameIdent; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl GetProcedureReq { + pub fn new(tenant: impl ToTenant, procedure_name: impl ToString) -> GetProcedureReq { + GetProcedureReq { + inner: ProcedureNameIdent::new(tenant, procedure_name), + } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct GetProcedureReply { + pub id: u64, + pub index_meta: ProcedureMeta, +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum ProcedureInfoFilter { + // include all dropped procedures + IncludeDropped, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ListProcedureReq { + pub tenant: Tenant, + pub filter: Option, +} + +impl ListProcedureReq { + pub fn tenant(&self) -> &Tenant { + &self.tenant + } +} + +mod kvapi_key_impl { + use databend_common_meta_kvapi::kvapi; + + use crate::principal::procedur_name_ident::ProcedureNameIdentRaw; + use crate::principal::ProcedureId; + use crate::principal::ProcedureIdToName; + use crate::principal::ProcedureMeta; + + impl kvapi::KeyCodec for ProcedureId { + fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder { + b.push_u64(self.procedure_id) + } + + fn decode_key(parser: &mut kvapi::KeyParser) -> Result { + let procedure_id = parser.next_u64()?; + Ok(Self { procedure_id }) + } + } + + /// "__fd_procedure_by_id/" + impl kvapi::Key for ProcedureId { + const PREFIX: &'static str = "__fd_procedure_by_id"; + + type ValueType = ProcedureMeta; + + fn parent(&self) -> Option { + None + } + } + + impl kvapi::KeyCodec for ProcedureIdToName { + fn encode_key(&self, b: kvapi::KeyBuilder) -> kvapi::KeyBuilder { + b.push_u64(self.procedure_id) + } + + fn decode_key(parser: &mut kvapi::KeyParser) -> Result { + let procedure_id = parser.next_u64()?; + Ok(Self { procedure_id }) + } + } + + /// "__fd_procedure_id_to_name/ -> ProcedureNameIdent" + impl kvapi::Key for ProcedureIdToName { + const PREFIX: &'static str = "__fd_procedure_id_to_name"; + + type ValueType = ProcedureNameIdentRaw; + + fn parent(&self) -> Option { + Some(ProcedureId::new(self.procedure_id).to_string_key()) + } + } + + impl kvapi::Value for ProcedureMeta { + type KeyType = ProcedureId; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } + + impl kvapi::Value for ProcedureNameIdentRaw { + type KeyType = ProcedureIdToName; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } +} diff --git a/src/meta/app/src/principal/procedure_id_ident.rs b/src/meta/app/src/principal/procedure_id_ident.rs new file mode 100644 index 0000000000000..4d55a080a321d --- /dev/null +++ b/src/meta/app/src/principal/procedure_id_ident.rs @@ -0,0 +1,87 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::tenant_key::ident::TIdent; + +/// The key of the list of database ids that have been used in history by a database name. +pub type ProcedureIdIdent = TIdent; +pub type ProcedureIdIdentRaw = TIdentRaw; + +pub use kvapi_impl::Resource; + +use crate::tenant_key::raw::TIdentRaw; + +impl ProcedureIdIdent { + pub fn database_name(&self) -> &str { + self.name().as_str() + } +} + +impl ProcedureIdIdentRaw { + pub fn database_name(&self) -> &str { + self.name().as_str() + } +} + +mod kvapi_impl { + + use databend_common_meta_kvapi::kvapi; + use databend_common_meta_kvapi::kvapi::Key; + + use crate::principal::procedure_id_ident::ProcedureIdIdent; + use crate::principal::ProcedureId; + use crate::principal::ProcedureIdList; + use crate::tenant_key::resource::TenantResource; + + pub struct Resource; + impl TenantResource for Resource { + const PREFIX: &'static str = "__fd_procedure_id_list"; + const TYPE: &'static str = "ProcedureIdIdent"; + const HAS_TENANT: bool = true; + type ValueType = ProcedureIdList; + } + + impl kvapi::Value for ProcedureIdList { + type KeyType = ProcedureIdIdent; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + self.id_list + .iter() + .map(|id| ProcedureId::new(*id).to_string_key()) + } + } + + // // Use these error types to replace usage of ErrorCode if possible. + // impl From> for ErrorCode { + // impl From> for ErrorCode { +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::ProcedureIdIdent; + use crate::tenant::Tenant; + + #[test] + fn test_database_id_history_ident() { + let tenant = Tenant::new_literal("test"); + let ident = ProcedureIdIdent::new(tenant, "3"); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_procedure_id_list/test/3"); + + assert_eq!(ident, ProcedureIdIdent::from_str_key(&key).unwrap()); + } +} diff --git a/src/meta/proto-conv/src/lib.rs b/src/meta/proto-conv/src/lib.rs index 86b22f0832598..d5a35e4125cd6 100644 --- a/src/meta/proto-conv/src/lib.rs +++ b/src/meta/proto-conv/src/lib.rs @@ -78,6 +78,7 @@ mod least_visible_time_from_to_protobuf_impl; mod lock_from_to_protobuf_impl; mod owner_from_to_protobuf_impl; mod ownership_from_to_protobuf_impl; +mod procedure_from_to_protobuf_impl; mod role_from_to_protobuf_impl; mod schema_from_to_protobuf_impl; mod sequence_from_to_protobuf_impl; diff --git a/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs new file mode 100644 index 0000000000000..b1edc1d4b614e --- /dev/null +++ b/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs @@ -0,0 +1,104 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This mod is the key point about compatibility. +//! Everytime update anything in this file, update the `VER` and let the tests pass. + +use chrono::DateTime; +use chrono::Utc; +use databend_common_expression::infer_schema_type; +use databend_common_expression::types::DataType; +use databend_common_expression::TableDataType; +use databend_common_meta_app as mt; +use databend_common_protos::pb; + +use crate::reader_check_msg; +use crate::FromToProto; +use crate::Incompatible; +use crate::MIN_READER_VER; +use crate::VER; + +impl FromToProto for mt::principal::ProcedureMeta { + type PB = pb::ProcedureMeta; + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + fn from_pb(p: pb::ProcedureMeta) -> Result { + reader_check_msg(p.ver, p.min_reader_ver)?; + + reader_check_msg(p.ver, p.min_reader_ver)?; + + let mut return_types = Vec::with_capacity(p.return_types.len()); + for arg_type in p.return_types { + let arg_type = DataType::from(&TableDataType::from_pb(arg_type)?); + return_types.push(arg_type); + } + + let v = Self { + return_types, + created_on: DateTime::::from_pb(p.created_on)?, + updated_on: DateTime::::from_pb(p.updated_on)?, + script: p.script, + comment: p.comment, + procedure_language: p.language, + }; + Ok(v) + } + + fn to_pb(&self) -> Result { + let mut return_types = Vec::with_capacity(self.return_types.len()); + for arg_type in self.return_types.iter() { + let arg_type = infer_schema_type(arg_type) + .map_err(|e| Incompatible { + reason: format!("Convert DataType to TableDataType failed: {}", e.message()), + })? + .to_pb()?; + return_types.push(arg_type); + } + + let p = pb::ProcedureMeta { + ver: VER, + min_reader_ver: MIN_READER_VER, + return_types, + created_on: self.created_on.to_pb()?, + updated_on: self.updated_on.to_pb()?, + script: self.script.to_string(), + comment: self.comment.to_string(), + language: self.procedure_language.to_string(), + }; + Ok(p) + } +} + +impl FromToProto for mt::principal::ProcedureIdList { + type PB = pb::ProcedureIdList; + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + fn from_pb(p: pb::ProcedureIdList) -> Result { + reader_check_msg(p.ver, p.min_reader_ver)?; + + let v = Self { id_list: p.ids }; + Ok(v) + } + + fn to_pb(&self) -> Result { + let p = pb::ProcedureIdList { + ver: VER, + min_reader_ver: MIN_READER_VER, + ids: self.id_list.clone(), + }; + Ok(p) + } +} diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index fdeefa56fd1de..18f8d3a65ce7f 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -136,7 +136,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (104, "2024-08-02: Add: add share catalog into Catalog meta"), (105, "2024-08-05: Add: add Dictionary meta"), (106, "2024-08-08: Add: add QueryTokenInfo"), - (107, "2024-08-09: Add: datatype.proto/DataType Geography type") + (107, "2024-08-09: Add: datatype.proto/DataType Geography type"), + (108, "2024-08-29: Add: procedure.proto: ProcedureMeta and ProcedureIdList") // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index c6231cf731fde..15e9a8deeca00 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -111,3 +111,4 @@ mod v104_share_catalog; mod v105_dictionary_meta; mod v106_query_token; mod v107_geography_datatype; +mod v108_procedure; diff --git a/src/meta/proto-conv/tests/it/v108_procedure.rs b/src/meta/proto-conv/tests/it/v108_procedure.rs new file mode 100644 index 0000000000000..4027631384b0e --- /dev/null +++ b/src/meta/proto-conv/tests/it/v108_procedure.rs @@ -0,0 +1,43 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::TimeZone; +use chrono::Utc; +use databend_common_expression::types::DataType; +use databend_common_meta_app::principal as mt; +use fastrace::func_name; + +use crate::common; + +#[test] +fn v108_procedure_meta() -> anyhow::Result<()> { + let procedure_meta_v108: Vec = vec![ + 34, 9, 146, 2, 0, 160, 6, 108, 168, 6, 24, 82, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, + 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 90, 23, 50, 48, 49, 52, 45, 49, 49, 45, + 50, 57, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 98, 7, 102, 111, 111, 32, 98, + 97, 114, 114, 3, 83, 81, 76, 160, 6, 108, 168, 6, 24, + ]; + + let want = || mt::ProcedureMeta { + return_types: vec![DataType::String], + created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), + updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 9).unwrap(), + script: "".to_string(), + comment: "foo bar".to_string(), + procedure_language: "SQL".to_string(), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), procedure_meta_v108.as_slice(), 108, want()) +} diff --git a/src/meta/protos/proto/procedure.proto b/src/meta/protos/proto/procedure.proto new file mode 100644 index 0000000000000..e87242b1f8006 --- /dev/null +++ b/src/meta/protos/proto/procedure.proto @@ -0,0 +1,53 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// The identifier of a database by name. Names can be changed. +// There is no guarantee that two get-database request by name will return the +// same instance. + +syntax = "proto3"; + +package databend_proto; + +import "datatype.proto"; + +// ProcedureMeta is a container of all non-identity information. +message ProcedureMeta { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + // Procedure return type + repeated DataType return_types = 4; + + // The time database created. + string created_on = 10; + + // The time database updated. + string updated_on = 11; + + // Comment about this database. + string comment = 12; + + // Comment about this database. + string script = 13; + string language = 14; +} + +// Save procedure name id list history. +message ProcedureIdList { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + repeated uint64 ids = 1; +} diff --git a/src/query/ast/src/ast/statements/procedure.rs b/src/query/ast/src/ast/statements/procedure.rs index d578efd14f6cc..6558b70858d27 100644 --- a/src/query/ast/src/ast/statements/procedure.rs +++ b/src/query/ast/src/ast/statements/procedure.rs @@ -19,6 +19,7 @@ use derive_visitor::Drive; use derive_visitor::DriveMut; use crate::ast::write_comma_separated_list; +use crate::ast::write_comma_separated_string_list; use crate::ast::CreateOption; use crate::ast::TypeName; @@ -68,6 +69,7 @@ pub struct CreateProcedureStmt { pub create_option: CreateOption, pub name: String, pub language: ProcedureLanguage, + // TODO(eason): Now args is alwarys none, but maybe we also need to consider arg name? pub args: Option>, pub return_type: Vec, pub comment: Option, @@ -173,3 +175,18 @@ impl Display for DescProcedureStmt { Ok(()) } } + +#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] +pub struct CallProcedureStmt { + pub name: String, + pub args: Vec, +} + +impl Display for CallProcedureStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "CALL PROCEDURE {}(", self.name)?; + write_comma_separated_string_list(f, self.args.clone())?; + write!(f, ")")?; + Ok(()) + } +} diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs index 914988a6258b3..cf4baddc6f39e 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -329,6 +329,7 @@ pub enum Statement { show_options: Option, }, DescProcedure(DescProcedureStmt), + CallProcedure(CallProcedureStmt), // Sequence CreateSequence(CreateSequenceStmt), @@ -822,6 +823,7 @@ impl Display for Statement { write!(f, " '{object_id}'")?; } Statement::System(stmt) => write!(f, "{stmt}")?, + Statement::CallProcedure(stmt) => write!(f, "{stmt}")?, } Ok(()) } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index f8a07a5ea1f5a..4237d0789f4df 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -2247,6 +2247,18 @@ pub fn statement_body(i: Input) -> IResult { |(_, _, show_options)| Statement::ShowProcedures { show_options }, ); + let call_procedure = map( + rule! { + CALL ~ PROCEDURE ~ #ident ~ "(" ~ ")" + }, + |(_, _, name, _, _)| { + Statement::CallProcedure(CallProcedureStmt { + name: name.to_string(), + args: vec![], + }) + }, + ); + let describe_procedure = map( rule! { ( DESC | DESCRIBE ) ~ PROCEDURE ~ #ident ~ "(" ~ ")" @@ -2479,6 +2491,7 @@ AS | #drop_procedure : "`DROP PROCEDURE ()`" | #show_procedures : "`SHOW PROCEDURES []()`" | #describe_procedure : "`DESC PROCEDURE ()`" + | #call_procedure : "`CALL PROCEDURE ()`" ), ))(i) } diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index 3ebddbea701c1..3e49556f8bc76 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -844,6 +844,7 @@ fn test_statement() { // Stored Procedure r#"describe PROCEDURE p1()"#, r#"drop PROCEDURE p1()"#, + r#"call PROCEDURE p1()"#, r#"show PROCEDURES like 'p1%'"#, r#"create PROCEDURE p1() returns string not null language sql comment = 'test' as $$ BEGIN @@ -984,6 +985,7 @@ fn test_statement_error() { // Stored Procedure r#"desc procedure p1"#, r#"drop procedure p1"#, + r#"call procedure p1"#, r#"create PROCEDURE p1() returns table(string not null, int null) language sql comment = 'test' as $$ BEGIN LET sum := 0; diff --git a/src/query/ast/tests/it/testdata/stmt-error.txt b/src/query/ast/tests/it/testdata/stmt-error.txt index 6f029ab1ddaae..06c1a09a19ede 100644 --- a/src/query/ast/tests/it/testdata/stmt-error.txt +++ b/src/query/ast/tests/it/testdata/stmt-error.txt @@ -976,6 +976,18 @@ error: | while parsing `DROP PROCEDURE ()` +---------- Input ---------- +call procedure p1 +---------- Output --------- +error: + --> SQL:1:18 + | +1 | call procedure p1 + | ---- ^ unexpected end of input, expecting `(` + | | + | while parsing `CALL PROCEDURE ()` + + ---------- Input ---------- create PROCEDURE p1() returns table(string not null, int null) language sql comment = 'test' as $$ BEGIN @@ -991,7 +1003,7 @@ error: --> SQL:1:44 | 1 | create PROCEDURE p1() returns table(string not null, int null) language sql comment = 'test' as $$ - | ------ ----- ^^^ unexpected `not`, expecting `INT8`, `INT16`, `INT32`, `INT64`, `UINT16`, `UINT32`, `UINT64`, `INTEGER`, `FLOAT32`, `FLOAT64`, `GEOMETRY`, `INT`, `BOOL`, `DATE`, `BLOB`, `TEXT`, `JSON`, `UINT8`, `FLOAT`, `TUPLE`, `DOUBLE`, `BITMAP`, `BINARY`, `STRING`, `BOOLEAN`, `UNSIGNED`, `DATETIME`, `NULLABLE`, `TIMESTAMP`, `TINYINT`, `LONGBLOB`, `TINYBLOB`, `SMALLINT`, `BIGINT`, `SIGNED`, `DECIMAL`, `ARRAY`, `MAP`, `VARBINARY`, `MEDIUMBLOB`, `VARCHAR`, `CHAR`, `CHARACTER`, or `VARIANT` + | ------ ----- ^^^ unexpected `not`, expecting `INT8`, `INT16`, `INT32`, `INT64`, `UINT16`, `UINT32`, `UINT64`, `INTEGER`, `FLOAT32`, `FLOAT64`, `GEOMETRY`, `INT`, `BOOL`, `DATE`, `BLOB`, `TEXT`, `JSON`, `UINT8`, `FLOAT`, `TUPLE`, `DOUBLE`, `BITMAP`, `BINARY`, `STRING`, `BOOLEAN`, `UNSIGNED`, `DATETIME`, `NULLABLE`, `TIMESTAMP`, `GEOGRAPHY`, `TINYINT`, `LONGBLOB`, `TINYBLOB`, `SMALLINT`, `BIGINT`, `SIGNED`, `DECIMAL`, `ARRAY`, `MAP`, `VARBINARY`, `MEDIUMBLOB`, `VARCHAR`, `CHAR`, `CHARACTER`, or `VARIANT` | | | | | while parsing TABLE( , ...) | while parsing `CREATE [ OR REPLACE ] PROCEDURE () RETURNS { [ NOT NULL ] | TABLE( , ...)} LANGUAGE SQL [ COMMENT = '' ] AS ` diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 7731d69c58dc4..cf55c5fa35577 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -23445,6 +23445,19 @@ DropProcedure( ) +---------- Input ---------- +call PROCEDURE p1() +---------- Output --------- +CALL PROCEDURE p1() +---------- AST ------------ +CallProcedure( + CallProcedureStmt { + name: "p1", + args: [], + }, +) + + ---------- Input ---------- show PROCEDURES like 'p1%' ---------- Output --------- diff --git a/src/query/management/src/lib.rs b/src/query/management/src/lib.rs index 7cc18effb4336..07d9ef0fcd472 100644 --- a/src/query/management/src/lib.rs +++ b/src/query/management/src/lib.rs @@ -29,6 +29,7 @@ mod user; mod client_session; pub mod errors; +mod procedure; pub use client_session::ClientSessionMgr; pub use cluster::ClusterApi; @@ -37,6 +38,7 @@ pub use connection::ConnectionMgr; pub use file_format::FileFormatMgr; pub use network_policy::NetworkPolicyMgr; pub use password_policy::PasswordPolicyMgr; +pub use procedure::ProcedureMgr; pub use quota::QuotaApi; pub use quota::QuotaMgr; pub use role::RoleApi; diff --git a/src/query/management/src/procedure/mod.rs b/src/query/management/src/procedure/mod.rs new file mode 100644 index 0000000000000..1f6fbbac35100 --- /dev/null +++ b/src/query/management/src/procedure/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod procedure_mgr; + +pub use procedure_mgr::ProcedureMgr; diff --git a/src/query/management/src/procedure/procedure_mgr.rs b/src/query/management/src/procedure/procedure_mgr.rs new file mode 100644 index 0000000000000..a8b6524aa21cb --- /dev/null +++ b/src/query/management/src/procedure/procedure_mgr.rs @@ -0,0 +1,414 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Display; +use std::sync::Arc; + +use databend_common_meta_api::deserialize_struct; +use databend_common_meta_api::fetch_id; +use databend_common_meta_api::get_pb_value; +use databend_common_meta_api::get_u64_value; +use databend_common_meta_api::kv_app_error::KVAppError; +use databend_common_meta_api::list_u64_value; +use databend_common_meta_api::procedure_has_to_exist; +use databend_common_meta_api::send_txn; +use databend_common_meta_api::serialize_struct; +use databend_common_meta_api::serialize_u64; +use databend_common_meta_api::txn_backoff::txn_backoff; +use databend_common_meta_api::txn_cond_seq; +use databend_common_meta_api::txn_op_del; +use databend_common_meta_api::txn_op_put; +use databend_common_meta_app::app_error::AppError; +use databend_common_meta_app::app_error::ProcedureAlreadyExists; +use databend_common_meta_app::id_generator::IdGenerator; +use databend_common_meta_app::principal::procedur_name_ident::ProcedureNameIdentRaw; +use databend_common_meta_app::principal::procedure::ProcedureIdent; +use databend_common_meta_app::principal::procedure::ProcedureInfo; +use databend_common_meta_app::principal::procedure_id_ident::ProcedureIdIdent; +use databend_common_meta_app::principal::CreateProcedureReply; +use databend_common_meta_app::principal::CreateProcedureReq; +use databend_common_meta_app::principal::DropProcedureReply; +use databend_common_meta_app::principal::DropProcedureReq; +use databend_common_meta_app::principal::GetProcedureReq; +use databend_common_meta_app::principal::ListProcedureReq; +use databend_common_meta_app::principal::ProcedureId; +use databend_common_meta_app::principal::ProcedureIdList; +use databend_common_meta_app::principal::ProcedureIdToName; +use databend_common_meta_app::principal::ProcedureMeta; +use databend_common_meta_app::principal::ProcedureNameIdent; +use databend_common_meta_app::schema::CreateOption; +use databend_common_meta_app::tenant::Tenant; +use databend_common_meta_app::KeyWithTenant; +use databend_common_meta_kvapi::kvapi; +use databend_common_meta_kvapi::kvapi::Key; +use databend_common_meta_types::ConditionResult::Eq; +use databend_common_meta_types::MetaError; +use databend_common_meta_types::TxnCondition; +use databend_common_meta_types::TxnOp; +use databend_common_meta_types::TxnRequest; +use fastrace::func_name; +use log::debug; +use log::info; +use log::warn; + +pub struct ProcedureMgr { + kv_api: Arc>, + tenant: Tenant, +} + +impl ProcedureMgr { + pub fn create(kv_api: Arc>, tenant: &Tenant) -> Self { + ProcedureMgr { + kv_api, + tenant: tenant.clone(), + } + } + + /// Add a PROCEDURE to /tenant/procedure-name. + #[async_backtrace::framed] + pub async fn create_procedure( + &self, + req: CreateProcedureReq, + ) -> Result { + debug!(req :? =(&req); "SchemaApi: {}", func_name!()); + + let name_key = &req.name_ident; + + let mut trials = txn_backoff(None, func_name!()); + loop { + trials.next().unwrap()?.await; + + // Get procedure by name to ensure absence + let (procedure_id_seq, procedure_id) = + get_u64_value(self.kv_api.as_ref(), name_key).await?; + debug!(procedure_id_seq = procedure_id_seq, procedure_id = procedure_id, name_key :? =(name_key); "get_procedure"); + + let mut condition = vec![]; + let mut if_then = vec![]; + + if procedure_id_seq > 0 { + match req.create_option { + CreateOption::Create => { + return Err(KVAppError::AppError(AppError::ProcedureAlreadyExists( + ProcedureAlreadyExists::new( + name_key.procedure_name(), + format!("create procedure: tenant: {}", self.tenant.tenant_name()), + ), + ))); + } + CreateOption::CreateIfNotExists => { + return Ok(CreateProcedureReply { procedure_id }); + } + CreateOption::CreateOrReplace => { + // TODO + // let (_, share_specs) = drop_procedure_meta( + // self, + // name_key, + // false, + // false, + // &mut condition, + // &mut if_then, + // ) + // .await?; + // share_specs + } + } + } + + // get procedure id list from _fd_procedure_id_list/procedure_id + let procedureid_idlist = + ProcedureIdIdent::new(name_key.tenant(), name_key.procedure_name()); + let (procedure_id_list_seq, procedure_id_list_opt): (_, Option) = + get_pb_value(self.kv_api.as_ref(), &procedureid_idlist).await?; + + let mut procedure_id_list = if procedure_id_list_seq == 0 { + ProcedureIdList::new() + } else { + procedure_id_list_opt.unwrap_or(ProcedureIdList::new()) + }; + + // Create procedure by inserting these record: + // (tenant, procedure_name) -> procedure_id + // (procedure_id) -> procedure_meta + // append procedure_id into _fd_procedure_id_list// + // (procedure_id) -> (tenant,procedure_name) + + let procedure_id = fetch_id(self.kv_api.as_ref(), IdGenerator::procedure_id()).await?; + let id_key = ProcedureId { procedure_id }; + let id_to_name_key = ProcedureIdToName { procedure_id }; + + debug!(procedure_id = procedure_id, name_key :? =(name_key); "new procedure id"); + + { + // append procedure_id into procedure_id_list + procedure_id_list.append(procedure_id); + + condition.extend(vec![ + txn_cond_seq(name_key, Eq, procedure_id_seq), + txn_cond_seq(&id_to_name_key, Eq, 0), + txn_cond_seq(&procedureid_idlist, Eq, procedure_id_list_seq), + ]); + if_then.extend(vec![ + txn_op_put(name_key, serialize_u64(procedure_id)?), // (tenant, procedure_name) -> procedure_id + txn_op_put(&id_key, serialize_struct(&req.meta)?), // (procedure_id) -> procedure_meta + txn_op_put(&procedureid_idlist, serialize_struct(&procedure_id_list)?), /* _fd_procedure_id_list// -> procedure_id_list */ + txn_op_put(&id_to_name_key, serialize_struct(&ProcedureNameIdentRaw::from(name_key))?), /* __fd_procedure_id_to_name/ -> (tenant,procedure_name) */ + ]); + + let txn_req = TxnRequest { + condition, + if_then, + else_then: vec![], + }; + + let (succ, _responses) = send_txn(self.kv_api.as_ref(), txn_req).await?; + + debug!( + name :? =(name_key), + id :? =(&id_key), + succ = succ; + "create_procedure" + ); + + if succ { + info!( + "procedure name: {}, meta: {}", + req.name_ident.procedure_name(), + &req.meta + ); + return Ok(CreateProcedureReply { procedure_id }); + } + } + } + } + + /// Drop the tenant's PROCEDURE by name, return the dropped one or None if nothing is dropped. + #[async_backtrace::framed] + #[fastrace::trace] + pub async fn drop_procedure( + &self, + req: DropProcedureReq, + ) -> Result { + debug!(req :? =(&req); "SchemaApi: {}", func_name!()); + + let tenant_procedure_name = &req.name_ident; + + let mut trials = txn_backoff(None, func_name!()); + loop { + trials.next().unwrap()?.await; + + let mut condition = vec![]; + let mut if_then = vec![]; + + let procedure_id = drop_procedure_meta( + self.kv_api.as_ref(), + tenant_procedure_name, + req.if_exists, + true, + &mut condition, + &mut if_then, + ) + .await?; + let txn_req = TxnRequest { + condition, + if_then, + else_then: vec![], + }; + + let (succ, _responses) = send_txn(self.kv_api.as_ref(), txn_req).await?; + + debug!( + name :? =(tenant_procedure_name), + succ = succ; + "drop_procedure" + ); + + if succ { + return Ok(DropProcedureReply { procedure_id }); + } + } + } + + #[fastrace::trace] + pub async fn get_procedure(&self, req: GetProcedureReq) -> Result { + debug!(req :? =(&req); "SchemaApi: {}", func_name!()); + + let name_key = &req.inner; + + let (_, procedure_id, procedure_meta_seq, procedure_meta) = + get_procedure_or_err(self.kv_api.as_ref(), name_key, "get_procedure").await?; + + let procedure = ProcedureInfo { + ident: ProcedureIdent { + procedure_id, + seq: procedure_meta_seq, + }, + name_ident: name_key.clone(), + meta: procedure_meta, + }; + + Ok(procedure) + } + #[fastrace::trace] + pub async fn list_procedures( + &self, + req: ListProcedureReq, + ) -> Result>, KVAppError> { + debug!(req :? =(&req); "SchemaApi: {}", func_name!()); + + // Using a empty procedure to to list all + let name_key = ProcedureNameIdent::new(req.tenant(), ""); + + // Pairs of procedure-name and procedure_id with seq + let (tenant_procedure_names, procedure_ids) = + list_u64_value(self.kv_api.as_ref(), &name_key).await?; + + // Keys for fetching serialized ProcedureMeta from kvapi::KVApi + let mut kv_keys = Vec::with_capacity(procedure_ids.len()); + + for procedure_id in procedure_ids.iter() { + let k = ProcedureId { + procedure_id: *procedure_id, + } + .to_string_key(); + kv_keys.push(k); + } + + // Batch get all procedure-metas. + // - A procedure-meta may be already deleted. It is Ok. Just ignore it. + let seq_metas = self.kv_api.mget_kv(&kv_keys).await?; + let mut procedure_infos = Vec::with_capacity(kv_keys.len()); + + for (i, seq_meta_opt) in seq_metas.iter().enumerate() { + if let Some(seq_meta) = seq_meta_opt { + let procedure_meta: ProcedureMeta = deserialize_struct(&seq_meta.data)?; + + let procedure_info = ProcedureInfo { + ident: ProcedureIdent { + procedure_id: procedure_ids[i], + seq: seq_meta.seq, + }, + name_ident: ProcedureNameIdent::new( + name_key.tenant(), + tenant_procedure_names[i].procedure_name(), + ), + meta: procedure_meta, + }; + procedure_infos.push(Arc::new(procedure_info)); + } else { + debug!( + k = &kv_keys[i]; + "procedure_meta not found, maybe just deleted after listing names and before listing meta" + ); + } + } + + Ok(procedure_infos) + } +} + +/// Returns (procedure_id_seq, procedure_id, procedure_meta_seq, procedure_meta) +pub(crate) async fn get_procedure_or_err( + kv_api: &(impl kvapi::KVApi + ?Sized), + name_key: &ProcedureNameIdent, + msg: impl Display, +) -> Result<(u64, u64, u64, ProcedureMeta), KVAppError> { + let (procedure_id_seq, procedure_id) = get_u64_value(kv_api, name_key).await?; + procedure_has_to_exist(procedure_id_seq, name_key, &msg)?; + + let id_key = ProcedureId { procedure_id }; + + let (procedure_meta_seq, procedure_meta) = get_pb_value(kv_api, &id_key).await?; + procedure_has_to_exist(procedure_meta_seq, name_key, msg)?; + + Ok(( + procedure_id_seq, + procedure_id, + procedure_meta_seq, + // Safe unwrap(): procedure_meta_seq > 0 implies procedure_meta is not None. + procedure_meta.unwrap(), + )) +} + +async fn drop_procedure_meta( + kv_api: &(impl kvapi::KVApi + ?Sized), + tenant_procedurename: &ProcedureNameIdent, + if_exists: bool, + drop_name_key: bool, + condition: &mut Vec, + if_then: &mut Vec, +) -> Result { + let res = get_procedure_or_err( + kv_api, + tenant_procedurename, + format!("drop_procedure: {}", tenant_procedurename.display()), + ) + .await; + + let (procedure_id_seq, procedure_id, _, _) = match res { + Ok(x) => x, + Err(e) => { + if let KVAppError::AppError(AppError::UnknownProcedure(_)) = e { + if if_exists { + return Ok(0); + } + } + + return Err(e); + } + }; + + // remove procedure_name -> procedure id + if drop_name_key { + condition.push(txn_cond_seq(tenant_procedurename, Eq, procedure_id_seq)); + if_then.push(txn_op_del(tenant_procedurename)); // (tenant, procedure_name) -> procedure_id + } + + { + // Delete procedure by these operations: + // del (tenant, procedure_name) -> procedure_id + debug!( + procedure_id = procedure_id, + name_key :? =(tenant_procedurename); + "drop_procedure" + ); + + // add procedureIdListKey if not exists + let procedureid_idlist = ProcedureIdIdent::new( + tenant_procedurename.tenant(), + tenant_procedurename.procedure_name(), + ); + let (procedure_id_list_seq, procedure_id_list_opt): (_, Option) = + get_pb_value(kv_api, &procedureid_idlist).await?; + + if procedure_id_list_seq == 0 || procedure_id_list_opt.is_none() { + warn!( + "drop procedure:{:?}, procedure_id:{:?} has no procedureIdListKey", + tenant_procedurename, procedure_id + ); + + let mut procedure_id_list = ProcedureIdList::new(); + procedure_id_list.append(procedure_id); + + condition.push(txn_cond_seq(&procedureid_idlist, Eq, procedure_id_list_seq)); + // _fd_procedure_id_list// -> procedure_id_list + if_then.push(txn_op_put( + &procedureid_idlist, + serialize_struct(&procedure_id_list)?, + )); + }; + } + + Ok(procedure_id) +} diff --git a/src/query/service/src/databases/system/system_database.rs b/src/query/service/src/databases/system/system_database.rs index 7eb10ea42a4fa..10ed59833cc67 100644 --- a/src/query/service/src/databases/system/system_database.rs +++ b/src/query/service/src/databases/system/system_database.rs @@ -47,6 +47,7 @@ use databend_common_storages_system::NotificationHistoryTable; use databend_common_storages_system::NotificationsTable; use databend_common_storages_system::OneTable; use databend_common_storages_system::PasswordPoliciesTable; +use databend_common_storages_system::ProceduresTable; use databend_common_storages_system::ProcessesTable; use databend_common_storages_system::QueriesProfilingTable; use databend_common_storages_system::QueryCacheTable; @@ -140,6 +141,7 @@ impl SystemDatabase { NotificationHistoryTable::create(sys_db_meta.next_table_id()), ViewsTableWithHistory::create(sys_db_meta.next_table_id()), ViewsTableWithoutHistory::create(sys_db_meta.next_table_id()), + ProceduresTable::create(sys_db_meta.next_table_id()), ]; let disable_tables = Self::disable_system_tables(); diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index c24e31a6c7f2e..db990426ef978 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -1271,9 +1271,16 @@ impl AccessChecker for PrivilegeAccess { Plan::ExistsTable(_) => {} Plan::DescDatamaskPolicy(_) => {} Plan::Begin => {} + Plan::ExecuteImmediate(_) + | Plan::CreateProcedure(_) + | Plan::DropProcedure(_) + /*| Plan::ShowCreateProcedure(_) + | Plan::RenameProcedure(_)*/ => { + self.validate_access(&GrantObject::Global, UserPrivilegeType::Super, false, false) + .await?; + } Plan::Commit => {} Plan::Abort => {} - Plan::ExecuteImmediate(_) => {} } Ok(()) diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs index 7ed7f0fc1248f..f95c2ac848e7e 100644 --- a/src/query/service/src/interpreters/interpreter_factory.rs +++ b/src/query/service/src/interpreters/interpreter_factory.rs @@ -52,6 +52,8 @@ use crate::interpreters::interpreter_notification_create::CreateNotificationInte use crate::interpreters::interpreter_notification_desc::DescNotificationInterpreter; use crate::interpreters::interpreter_notification_drop::DropNotificationInterpreter; use crate::interpreters::interpreter_presign::PresignInterpreter; +use crate::interpreters::interpreter_procedure_create::CreateProcedureInterpreter; +use crate::interpreters::interpreter_procedure_drop::DropProcedureInterpreter; use crate::interpreters::interpreter_role_show::ShowRolesInterpreter; use crate::interpreters::interpreter_set_priority::SetPriorityInterpreter; use crate::interpreters::interpreter_system_action::SystemActionInterpreter; @@ -627,6 +629,20 @@ impl InterpreterFactory { ctx, *drop_dict.clone(), )?)), + Plan::CreateProcedure(p) => Ok(Arc::new(CreateProcedureInterpreter::try_create( + ctx, + *p.clone(), + )?)), + Plan::DropProcedure(p) => Ok(Arc::new(DropProcedureInterpreter::try_create( + ctx, + *p.clone(), + )?)), + // Plan::ShowCreateProcedure(_) => {} + // + // Plan::RenameProcedure(p) => Ok(Arc::new(RenameProcedureInterpreter::try_create( + // ctx, + // p.clone(), + // )?)), } } } diff --git a/src/query/service/src/interpreters/interpreter_procedure_create.rs b/src/query/service/src/interpreters/interpreter_procedure_create.rs new file mode 100644 index 0000000000000..31f7a70228108 --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_procedure_create.rs @@ -0,0 +1,64 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_meta_app::principal::CreateProcedureReq; +use databend_common_sql::plans::CreateProcedurePlan; +use databend_common_users::UserApiProvider; +use log::debug; + +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; +use crate::sessions::TableContext; + +#[derive(Debug)] +pub struct CreateProcedureInterpreter { + ctx: Arc, + plan: CreateProcedurePlan, +} + +impl CreateProcedureInterpreter { + pub fn try_create(ctx: Arc, plan: CreateProcedurePlan) -> Result { + Ok(CreateProcedureInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for CreateProcedureInterpreter { + fn name(&self) -> &str { + "CreateProcedureInterpreter" + } + + fn is_ddl(&self) -> bool { + true + } + + #[fastrace::trace] + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + debug!("ctx.id" = self.ctx.get_id().as_str(); "create_procedure_execute"); + + let tenant = self.plan.tenant.clone(); + + let create_procedure_req: CreateProcedureReq = self.plan.clone().into(); + let _ = UserApiProvider::instance() + .add_procedure(&tenant, create_procedure_req) + .await?; + + Ok(PipelineBuildResult::create()) + } +} diff --git a/src/query/service/src/interpreters/interpreter_procedure_drop.rs b/src/query/service/src/interpreters/interpreter_procedure_drop.rs new file mode 100644 index 0000000000000..abb710155fa46 --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_procedure_drop.rs @@ -0,0 +1,64 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_meta_app::principal::DropProcedureReq; +use databend_common_sql::plans::DropProcedurePlan; +use databend_common_users::UserApiProvider; +use log::debug; + +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; +use crate::sessions::TableContext; + +#[derive(Debug)] +pub struct DropProcedureInterpreter { + ctx: Arc, + pub(crate) plan: DropProcedurePlan, +} + +impl DropProcedureInterpreter { + pub fn try_create(ctx: Arc, plan: DropProcedurePlan) -> Result { + Ok(DropProcedureInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for DropProcedureInterpreter { + fn name(&self) -> &str { + "DropProcedureInterpreter" + } + + fn is_ddl(&self) -> bool { + true + } + + #[fastrace::trace] + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + debug!("ctx.id" = self.ctx.get_id().as_str(); "drop_procedure_execute"); + + let tenant = self.plan.tenant.clone(); + + let drop_procedure_req: DropProcedureReq = self.plan.clone().into(); + let _ = UserApiProvider::instance() + .drop_procedure(&tenant, drop_procedure_req) + .await?; + + Ok(PipelineBuildResult::create()) + } +} diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index b1fff3e7a59d4..5090aa00bbdd8 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -72,6 +72,8 @@ mod interpreter_password_policy_drop; mod interpreter_presign; mod interpreter_privilege_grant; mod interpreter_privilege_revoke; +mod interpreter_procedure_create; +mod interpreter_procedure_drop; mod interpreter_replace; mod interpreter_role_create; mod interpreter_role_drop; diff --git a/src/query/service/tests/it/storages/testdata/columns_table.txt b/src/query/service/tests/it/storages/testdata/columns_table.txt index a5f816174388c..e9025100e1571 100644 --- a/src/query/service/tests/it/storages/testdata/columns_table.txt +++ b/src/query/service/tests/it/storages/testdata/columns_table.txt @@ -12,6 +12,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'after' | 'system' | 'tasks' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'agg_spilled_bytes' | 'system' | 'query_log' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | | 'agg_spilled_rows' | 'system' | 'query_log' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | +| 'arguments' | 'system' | 'procedures' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'arguments' | 'system' | 'user_functions' | 'Variant' | 'VARIANT' | '' | '' | 'NO' | '' | | 'attempt_number' | 'system' | 'task_history' | 'Int32' | 'INT' | '' | '' | 'NO' | '' | | 'auth_type' | 'system' | 'users' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | @@ -60,6 +61,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'comment' | 'system' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'comment' | 'system' | 'notifications' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' | | 'comment' | 'system' | 'password_policies' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | +| 'comment' | 'system' | 'procedures' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'comment' | 'system' | 'stages' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'comment' | 'system' | 'streams' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'comment' | 'system' | 'tables' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | @@ -85,6 +87,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'created_on' | 'system' | 'notification_history' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'notifications' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'password_policies' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | +| 'created_on' | 'system' | 'procedures' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'roles' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'stages' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | | 'created_on' | 'system' | 'streams' | 'Timestamp' | 'TIMESTAMP' | '' | '' | 'NO' | '' | @@ -140,6 +143,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'definition' | 'system' | 'user_functions' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'description' | 'system' | 'configs' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'description' | 'system' | 'functions' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | +| 'description' | 'system' | 'procedures' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'description' | 'system' | 'settings' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'description' | 'system' | 'user_functions' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'disabled' | 'system' | 'users' | 'Boolean' | 'BOOLEAN' | '' | '' | 'NO' | '' | @@ -264,6 +268,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'name' | 'system' | 'malloc_stats_totals' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'notifications' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'password_policies' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | +| 'name' | 'system' | 'procedures' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'roles' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'settings' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'name' | 'system' | 'stages' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | @@ -324,6 +329,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'port' | 'system' | 'clusters' | 'UInt16' | 'SMALLINT UNSIGNED' | '' | '' | 'NO' | '' | | 'position_in_unique_constraint' | 'information_schema' | 'key_column_usage' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | | 'privileges' | 'information_schema' | 'columns' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | +| 'procedure_id' | 'system' | 'procedures' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | | 'processed' | 'system' | 'notification_history' | 'Nullable(Timestamp)' | 'TIMESTAMP' | '' | '' | 'YES' | '' | | 'projections' | 'system' | 'query_log' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'query_duration_ms' | 'system' | 'query_log' | 'Int64' | 'BIGINT' | '' | '' | 'NO' | '' | diff --git a/src/query/sql/src/planner/binder/binder.rs b/src/query/sql/src/planner/binder/binder.rs index db41f8e6162d3..0c2a31e00ff7c 100644 --- a/src/query/sql/src/planner/binder/binder.rs +++ b/src/query/sql/src/planner/binder/binder.rs @@ -640,7 +640,7 @@ impl<'a> Binder { return Err(ErrorCode::SyntaxException("DROP PROCEDURE, set enable_experimental_procedure=1")); } } Statement::ShowProcedures { show_options } => { if self.ctx.get_settings().get_enable_experimental_procedure()? { - self.show_procedures(show_options).await? + self.bind_show_procedures(bind_context, show_options).await? } else { return Err(ErrorCode::SyntaxException("SHOW PROCEDURES, set enable_experimental_procedure=1")); } } @@ -649,6 +649,13 @@ impl<'a> Binder { } else { return Err(ErrorCode::SyntaxException("DESC PROCEDURE, set enable_experimental_procedure=1")); } } + Statement::CallProcedure(stmt) => { + if self.ctx.get_settings().get_enable_experimental_procedure()? { + self.bind_call_procedure(stmt).await? + } else { + return Err(ErrorCode::SyntaxException("DESC PROCEDURE, set enable_experimental_procedure=1")); + } + } }; match plan.kind() { diff --git a/src/query/sql/src/planner/binder/ddl/procedure.rs b/src/query/sql/src/planner/binder/ddl/procedure.rs index 425281dcd7480..3073565cf10b7 100644 --- a/src/query/sql/src/planner/binder/ddl/procedure.rs +++ b/src/query/sql/src/planner/binder/ddl/procedure.rs @@ -12,15 +12,30 @@ // See the License for the specific language governing permissions and // limitations under the License. +use chrono::Utc; +use databend_common_ast::ast::CallProcedureStmt; use databend_common_ast::ast::CreateProcedureStmt; use databend_common_ast::ast::DescProcedureStmt; use databend_common_ast::ast::DropProcedureStmt; use databend_common_ast::ast::ExecuteImmediateStmt; +use databend_common_ast::ast::ProcedureLanguage; +use databend_common_ast::ast::ProcedureReturnType; use databend_common_ast::ast::ShowOptions; use databend_common_exception::Result; +use databend_common_expression::types::DataType; +use databend_common_meta_app::principal::GetProcedureReq; +use databend_common_meta_app::principal::ProcedureMeta; +use databend_common_meta_app::principal::ProcedureNameIdent; +use databend_common_users::UserApiProvider; +use crate::binder::show::get_show_options; +use crate::plans::CreateProcedurePlan; +use crate::plans::DropProcedurePlan; use crate::plans::ExecuteImmediatePlan; use crate::plans::Plan; +use crate::plans::RewriteKind; +use crate::resolve_type_name; +use crate::BindContext; use crate::Binder; impl Binder { @@ -40,28 +55,100 @@ impl Binder { create_option, name, language, - args, + args: _args, return_type, comment, script, } = stmt; - todo!() + let tenant = self.ctx.get_tenant(); + // TODO: + // 1. need parser name: ProcedureNameIdent = name + args + // 2. need check script's return type and stmt.return_type + + let meta = self.procedure_meta(return_type, script, comment, language)?; + Ok(Plan::CreateProcedure(Box::new(CreateProcedurePlan { + create_option: create_option.clone().into(), + tenant, + name: name.to_owned(), + meta, + }))) } pub async fn bind_drop_procedure(&mut self, stmt: &DropProcedureStmt) -> Result { - let DropProcedureStmt { name, args } = stmt; + let DropProcedureStmt { name, args: _args } = stmt; + + let tenant = self.ctx.get_tenant(); + // TODO: need parser name: ProcedureNameIdent = name + args + Ok(Plan::DropProcedure(Box::new(DropProcedurePlan { + if_exists: false, + tenant: tenant.to_owned(), + name: ProcedureNameIdent::new(tenant, name), + }))) + } + pub async fn bind_desc_procedure(&mut self, _stmt: &DescProcedureStmt) -> Result { todo!() } - pub async fn bind_desc_procedure(&mut self, stmt: &DescProcedureStmt) -> Result { - let DescProcedureStmt { name, args } = stmt; + pub async fn bind_show_procedures( + &mut self, + bind_context: &mut BindContext, + show_options: &Option, + ) -> Result { + let (show_limit, limit_str) = get_show_options(show_options, None); + let query = format!( + "SELECT name, procedure_id, arguments, comment, description, created_on FROM system.procedures {} ORDER BY name {}", + show_limit, limit_str, + ); - todo!() + self.bind_rewrite_to_query(bind_context, &query, RewriteKind::ShowProcedures) + .await } - pub async fn show_procedures(&mut self, show_options: &Option) -> Result { - todo!() + pub async fn bind_call_procedure(&mut self, stmt: &CallProcedureStmt) -> Result { + let CallProcedureStmt { name, args: _args } = stmt; + let tenant = self.ctx.get_tenant(); + // TODO: ProcedureNameIdent = name + args_type. Need to get type in here. + let req = GetProcedureReq { + inner: ProcedureNameIdent::new(tenant.clone(), name), + }; + let procedure = UserApiProvider::instance() + .get_procedure(&tenant, req) + .await?; + Ok(Plan::ExecuteImmediate(Box::new(ExecuteImmediatePlan { + script: procedure.meta.script, + }))) + } + + fn procedure_meta( + &self, + return_type: &[ProcedureReturnType], + script: &str, + comment: &Option, + language: &ProcedureLanguage, + ) -> Result { + let mut return_types = Vec::with_capacity(return_type.len()); + for arg_type in return_type { + return_types.push(DataType::from(&resolve_type_name( + &arg_type.data_type, + true, + )?)); + } + + Ok(ProcedureMeta { + return_types, + created_on: Utc::now(), + updated_on: Utc::now(), + script: script.to_string(), + comment: if let Some(comment) = comment { + comment.to_owned() + } else { + "".to_string() + }, + procedure_language: match language { + ProcedureLanguage::SQL => "SQL".to_string(), + }, + }) } } diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index f034eb5bd8d42..f8499a94eec5e 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -204,6 +204,10 @@ impl Plan { // Stored procedures Plan::ExecuteImmediate(_) => Ok("ExecuteImmediate".to_string()), + Plan::CreateProcedure(_) => Ok("CreateProcedure".to_string()), + Plan::DropProcedure(_) => Ok("DropProcedure".to_string()), + // Plan::ShowCreateProcedure(_) => Ok("ShowCreateProcedure".to_string()), + // Plan::RenameProcedure(_) => Ok("ProcedureDatabase".to_string()), // sequence Plan::CreateSequence(_) => Ok("CreateSequence".to_string()), diff --git a/src/query/sql/src/planner/plans/ddl/procedure.rs b/src/query/sql/src/planner/plans/ddl/procedure.rs index 8b26fd7c3e06b..3a44b4681502e 100644 --- a/src/query/sql/src/planner/plans/ddl/procedure.rs +++ b/src/query/sql/src/planner/plans/ddl/procedure.rs @@ -16,6 +16,12 @@ use databend_common_expression::types::DataType; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRef; use databend_common_expression::DataSchemaRefExt; +use databend_common_meta_app::principal::CreateProcedureReq; +use databend_common_meta_app::principal::DropProcedureReq; +use databend_common_meta_app::principal::ProcedureMeta; +use databend_common_meta_app::principal::ProcedureNameIdent; +use databend_common_meta_app::schema::CreateOption; +use databend_common_meta_app::tenant::Tenant; #[derive(Clone, Debug, PartialEq)] pub struct ExecuteImmediatePlan { @@ -27,3 +33,56 @@ impl ExecuteImmediatePlan { DataSchemaRefExt::create(vec![DataField::new("Result", DataType::String)]) } } + +#[derive(Debug, Clone)] +pub struct CreateProcedurePlan { + pub create_option: CreateOption, + pub tenant: Tenant, + pub name: String, + pub meta: ProcedureMeta, +} + +impl From for CreateProcedureReq { + fn from(p: CreateProcedurePlan) -> Self { + CreateProcedureReq { + create_option: p.create_option, + name_ident: ProcedureNameIdent::new(&p.tenant, &p.name), + meta: p.meta, + } + } +} + +impl From<&CreateProcedurePlan> for CreateProcedureReq { + fn from(p: &CreateProcedurePlan) -> Self { + CreateProcedureReq { + create_option: p.create_option, + name_ident: ProcedureNameIdent::new(&p.tenant, &p.name), + meta: p.meta.clone(), + } + } +} + +#[derive(Debug, Clone)] +pub struct DropProcedurePlan { + pub if_exists: bool, + pub tenant: Tenant, + pub name: ProcedureNameIdent, +} + +impl From for DropProcedureReq { + fn from(p: DropProcedurePlan) -> Self { + DropProcedureReq { + if_exists: p.if_exists, + name_ident: p.name, + } + } +} + +impl From<&DropProcedurePlan> for DropProcedureReq { + fn from(p: &DropProcedurePlan) -> Self { + DropProcedureReq { + if_exists: p.if_exists, + name_ident: p.name.clone(), + } + } +} diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index c633c1a08d284..c73c299cee81f 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -54,6 +54,7 @@ use crate::plans::CreateIndexPlan; use crate::plans::CreateNetworkPolicyPlan; use crate::plans::CreateNotificationPlan; use crate::plans::CreatePasswordPolicyPlan; +use crate::plans::CreateProcedurePlan; use crate::plans::CreateRolePlan; use crate::plans::CreateSequencePlan; use crate::plans::CreateShareEndpointPlan; @@ -85,6 +86,7 @@ use crate::plans::DropIndexPlan; use crate::plans::DropNetworkPolicyPlan; use crate::plans::DropNotificationPlan; use crate::plans::DropPasswordPolicyPlan; +use crate::plans::DropProcedurePlan; use crate::plans::DropRolePlan; use crate::plans::DropSequencePlan; use crate::plans::DropShareEndpointPlan; @@ -376,6 +378,10 @@ pub enum Plan { // Stored procedures ExecuteImmediate(Box), + // ShowCreateProcedure(Box), + DropProcedure(Box), + CreateProcedure(Box), + // RenameProcedure(Box), // sequence CreateSequence(Box), @@ -419,6 +425,7 @@ pub enum RewriteKind { ShowGrants, Call, + ShowProcedures, } impl Plan { diff --git a/src/query/storages/system/src/lib.rs b/src/query/storages/system/src/lib.rs index 80fefe546266d..ce3d1cfd7d4d7 100644 --- a/src/query/storages/system/src/lib.rs +++ b/src/query/storages/system/src/lib.rs @@ -45,6 +45,7 @@ mod notification_history_table; mod notifications_table; mod one_table; mod password_policies_table; +mod procedures_table; mod processes_table; mod queries_profiling; mod query_cache_table; @@ -94,6 +95,7 @@ pub use notifications_table::parse_notifications_to_datablock; pub use notifications_table::NotificationsTable; pub use one_table::OneTable; pub use password_policies_table::PasswordPoliciesTable; +pub use procedures_table::ProceduresTable; pub use processes_table::ProcessesTable; pub use queries_profiling::ProfilesLogElement; pub use queries_profiling::ProfilesLogQueue; diff --git a/src/query/storages/system/src/procedures_table.rs b/src/query/storages/system/src/procedures_table.rs new file mode 100644 index 0000000000000..bd981df8c73d8 --- /dev/null +++ b/src/query/storages/system/src/procedures_table.rs @@ -0,0 +1,136 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::table::Table; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; +use databend_common_expression::types::UInt64Type; +use databend_common_expression::utils::FromData; +use databend_common_expression::DataBlock; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_common_expression::TableSchemaRefExt; +use databend_common_meta_app::principal::ListProcedureReq; +use databend_common_meta_app::schema::TableIdent; +use databend_common_meta_app::schema::TableInfo; +use databend_common_meta_app::schema::TableMeta; +use databend_common_users::UserApiProvider; +use itertools::Itertools; + +use crate::table::AsyncOneBlockSystemTable; +use crate::table::AsyncSystemTable; + +pub struct ProceduresTable { + table_info: TableInfo, +} + +#[async_trait::async_trait] +impl AsyncSystemTable for ProceduresTable { + const NAME: &'static str = "system.procedures"; + + fn get_table_info(&self) -> &TableInfo { + &self.table_info + } + + #[async_backtrace::framed] + async fn get_full_data( + &self, + ctx: Arc, + _push_downs: Option, + ) -> Result { + let tenant = ctx.get_tenant(); + let user_api = UserApiProvider::instance(); + let mgr = user_api.procedure_api(&tenant); + let procedures = mgr + .list_procedures(ListProcedureReq { + tenant, + filter: None, + }) + .await?; + + let mut names = Vec::with_capacity(procedures.len()); + let mut procedure_ids = Vec::with_capacity(procedures.len()); + let mut languages = Vec::with_capacity(procedures.len()); + let mut descriptions = Vec::with_capacity(procedures.len()); + // TODO: argument = name + arg_type + return_type + // +------------------------------------+ + // | arguments | + // +------------------------------------+ + // | AREA_OF_CIRCLE(FLOAT) RETURN FLOAT | + // +------------------------------------+ + let mut arguments = Vec::with_capacity(procedures.len()); + let mut comments = Vec::with_capacity(procedures.len()); + let mut created_ons = Vec::with_capacity(procedures.len()); + + for procedure in &procedures { + names.push(procedure.name_ident.procedure_name()); + procedure_ids.push(procedure.ident.procedure_id); + arguments.push(format!( + "{} RETURN ({})", + procedure.name_ident.procedure_name(), + procedure.meta.return_types.iter().join(",") + )); + languages.push(procedure.meta.procedure_language.as_str()); + descriptions.push("user-defined procedure"); + comments.push(procedure.meta.comment.as_str()); + + created_ons.push(procedure.meta.created_on.timestamp_micros()); + } + + Ok(DataBlock::new_from_columns(vec![ + StringType::from_data(names), + UInt64Type::from_data(procedure_ids), + StringType::from_data(arguments), + StringType::from_data(comments), + StringType::from_data(descriptions), + TimestampType::from_data(created_ons), + ])) + } +} + +impl ProceduresTable { + pub fn create(table_id: u64) -> Arc { + let schema = TableSchemaRefExt::create(vec![ + TableField::new("name", TableDataType::String), + TableField::new( + "procedure_id", + TableDataType::Number(NumberDataType::UInt64), + ), + TableField::new("arguments", TableDataType::String), + TableField::new("comment", TableDataType::String), + TableField::new("description", TableDataType::String), + TableField::new("created_on", TableDataType::Timestamp), + ]); + + let table_info = TableInfo { + desc: "'system'.'procedures'".to_string(), + name: "procedures".to_string(), + ident: TableIdent::new(table_id, 0), + meta: TableMeta { + schema, + engine: "SystemDatabases".to_string(), + ..Default::default() + }, + ..Default::default() + }; + + AsyncOneBlockSystemTable::create(ProceduresTable { table_info }) + } +} diff --git a/src/query/users/src/lib.rs b/src/query/users/src/lib.rs index 76518d04819ad..2bf6764a271c5 100644 --- a/src/query/users/src/lib.rs +++ b/src/query/users/src/lib.rs @@ -34,6 +34,7 @@ pub mod connection; pub mod file_format; pub mod role_cache_mgr; pub mod role_util; +mod user_procedure; pub use jwt::*; pub use password_policy::*; diff --git a/src/query/users/src/user_api.rs b/src/query/users/src/user_api.rs index e32c9413378c8..72eedc3df4e6a 100644 --- a/src/query/users/src/user_api.rs +++ b/src/query/users/src/user_api.rs @@ -25,6 +25,7 @@ use databend_common_management::ConnectionMgr; use databend_common_management::FileFormatMgr; use databend_common_management::NetworkPolicyMgr; use databend_common_management::PasswordPolicyMgr; +use databend_common_management::ProcedureMgr; use databend_common_management::QuotaApi; use databend_common_management::QuotaMgr; use databend_common_management::RoleApi; @@ -154,6 +155,9 @@ impl UserApiProvider { pub fn setting_api(&self, tenant: &Tenant) -> Arc { Arc::new(SettingMgr::create(self.client.clone(), tenant)) } + pub fn procedure_api(&self, tenant: &Tenant) -> ProcedureMgr { + ProcedureMgr::create(self.client.clone(), tenant) + } pub fn network_policy_api(&self, tenant: &Tenant) -> NetworkPolicyMgr { NetworkPolicyMgr::create(self.client.clone(), tenant) diff --git a/src/query/users/src/user_procedure.rs b/src/query/users/src/user_procedure.rs new file mode 100644 index 0000000000000..11d78a8d11007 --- /dev/null +++ b/src/query/users/src/user_procedure.rs @@ -0,0 +1,52 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_meta_app::principal::procedure::ProcedureInfo; +use databend_common_meta_app::principal::CreateProcedureReq; +use databend_common_meta_app::principal::DropProcedureReq; +use databend_common_meta_app::principal::GetProcedureReq; +use databend_common_meta_app::tenant::Tenant; + +use crate::UserApiProvider; + +/// Procedure operations. +impl UserApiProvider { + // Add a new Procedure. + #[async_backtrace::framed] + pub async fn add_procedure(&self, tenant: &Tenant, req: CreateProcedureReq) -> Result<()> { + let procedure_api = self.procedure_api(tenant); + procedure_api.create_procedure(req).await?; + Ok(()) + } + + #[async_backtrace::framed] + pub async fn get_procedure( + &self, + tenant: &Tenant, + req: GetProcedureReq, + ) -> Result { + let procedure_api = self.procedure_api(tenant); + let procedure = procedure_api.get_procedure(req).await?; + + Ok(procedure) + } + + // Drop a Procedure by name. + #[async_backtrace::framed] + pub async fn drop_procedure(&self, tenant: &Tenant, req: DropProcedureReq) -> Result<()> { + let _ = self.procedure_api(tenant).drop_procedure(req).await?; + Ok(()) + } +} diff --git a/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test b/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test new file mode 100644 index 0000000000000..de02511306c69 --- /dev/null +++ b/tests/sqllogictests/suites/base/15_procedure/15_0002_procedure.test @@ -0,0 +1,33 @@ +statement ok +set global enable_experimental_procedure=1; + +statement ok +CREATE PROCEDURE p1() RETURNS int not null LANGUAGE SQL COMMENT='test' AS $$ +BEGIN + LET x := -1; + LET sum := 0; + FOR x IN x TO x + 3 DO + sum := sum + x; + END FOR; + RETURN sum; +END; +$$; + + +query T +call procedure p1(); +---- +2 + + +query T +select name, arguments from system.procedures where name = 'p1'; +---- +p1 p1 RETURN (Int32) + + +statement ok +drop procedure p1(); + +statement ok +unset global enable_experimental_procedure;