diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index a7a958a0e2bd..7e1d75228939 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -391,6 +391,11 @@ build_exceptions! { UnsupportedDictionarySource(3117), MissingDictionaryOption(3118), WrongDictionaryFieldExpr(3119), + + // Procedure + UnknownProcedure(3130), + ProcedureAlreadyExists(3131), + IllegalProcedureFormat(3132), } // Storage errors [3001, 4000]. diff --git a/src/meta/api/src/lib.rs b/src/meta/api/src/lib.rs index d36bec898385..676e7237ad42 100644 --- a/src/meta/api/src/lib.rs +++ b/src/meta/api/src/lib.rs @@ -34,7 +34,7 @@ mod schema_api_test_suite; mod sequence_api; pub(crate) mod testing; pub mod txn_backoff; -pub(crate) mod util; +pub mod util; pub mod crud; mod sequence_api_impl; diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index d2adc527abe8..ec74e977bb8a 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -3689,7 +3689,7 @@ impl + ?Sized> SchemaApi for KV { req.tenant.clone(), DictionaryIdentity::new(req.db_id, "dummy".to_string()), ); - let dir = DirName::new(dictionary_ident); + let dir = DirName::new_with_level(dictionary_ident, 2); let name_id_values = self.list_id_value(&dir).await?; Ok(name_id_values diff --git a/src/meta/app/src/app_error.rs b/src/meta/app/src/app_error.rs index fbae4a8c0fc2..ce56c1b26dfa 100644 --- a/src/meta/app/src/app_error.rs +++ b/src/meta/app/src/app_error.rs @@ -19,6 +19,8 @@ use databend_common_meta_types::MatchSeq; use crate::background::job_ident; use crate::data_mask::data_mask_name_ident; +use crate::principal::procedure_name_ident; +use crate::principal::ProcedureIdentity; use crate::schema::catalog_name_ident; use crate::schema::dictionary_name_ident; use crate::schema::index_name_ident; @@ -1170,6 +1172,15 @@ pub enum AppError { UnknownDictionary( #[from] UnknownError, 
), + + // Procedure + #[error(transparent)] + ProcedureAlreadyExists( + #[from] ExistError, + ), + + #[error(transparent)] + UnknownProcedure(#[from] UnknownError), } impl AppError { @@ -1699,6 +1710,10 @@ impl From for ErrorCode { ErrorCode::DictionaryAlreadyExists(err.message()) } AppError::UnknownDictionary(err) => ErrorCode::UnknownDictionary(err.message()), + AppError::UnknownProcedure(err) => ErrorCode::UnknownProcedure(err.message()), + AppError::ProcedureAlreadyExists(err) => { + ErrorCode::ProcedureAlreadyExists(err.message()) + } } } } diff --git a/src/meta/app/src/id_generator.rs b/src/meta/app/src/id_generator.rs index 5d08f98fac47..89dbb8b4ce86 100644 --- a/src/meta/app/src/id_generator.rs +++ b/src/meta/app/src/id_generator.rs @@ -29,6 +29,8 @@ pub(crate) const ID_GEN_SHARE_ENDPOINT: &str = "share_endpoint_id"; pub(crate) const ID_GEN_DATA_MASK: &str = "data_mask"; pub(crate) const ID_GEN_BACKGROUND_JOB: &str = "background_job"; +pub(crate) const ID_GEN_PROCEDURE: &str = "procedure_id"; + /// Key for resource id generator /// /// This is a special key for an application to generate unique id with kvapi::KVApi. 
@@ -111,6 +113,13 @@ impl IdGenerator { resource: ID_GEN_CATALOG.to_string(), } } + + /// Create a key for generating procedure id with kvapi::KVApi + pub fn procedure_id() -> Self { + Self { + resource: ID_GEN_PROCEDURE.to_string(), + } + } } impl kvapi::KeyCodec for IdGenerator { @@ -225,6 +234,16 @@ mod t { assert_eq!(g1, g2); } + // Procedure id generator + { + let g = IdGenerator::procedure_id(); + let k = g.to_string_key(); + assert_eq!("__fd_id_gen/procedure_id", k); + + let t2 = IdGenerator::from_str_key(&k)?; + assert_eq!(g, t2); + } + Ok(()) } diff --git a/src/meta/app/src/principal/mod.rs b/src/meta/app/src/principal/mod.rs index e114945bd9c2..80fc40dcf4ca 100644 --- a/src/meta/app/src/principal/mod.rs +++ b/src/meta/app/src/principal/mod.rs @@ -42,6 +42,11 @@ pub mod client_session_ident; pub mod connection_ident; pub mod network_policy_ident; pub mod password_policy_ident; +pub mod procedure; +pub mod procedure_id_ident; +pub mod procedure_id_to_name; +pub mod procedure_identity; +pub mod procedure_name_ident; pub mod stage_file_ident; pub mod tenant_ownership_object_ident; pub mod tenant_user_ident; @@ -60,6 +65,21 @@ pub use ownership_object::OwnershipObject; pub use password_policy::PasswordPolicy; pub use password_policy_ident::PasswordPolicyIdent; pub use principal_identity::PrincipalIdentity; +pub use procedure::CreateProcedureReply; +pub use procedure::CreateProcedureReq; +pub use procedure::DropProcedureReply; +pub use procedure::DropProcedureReq; +pub use procedure::GetProcedureReply; +pub use procedure::GetProcedureReq; +pub use procedure::ListProcedureReq; +pub use procedure::ProcedureInfoFilter; +pub use procedure::ProcedureMeta; +pub use procedure::RenameProcedureReply; +pub use procedure::RenameProcedureReq; +pub use procedure_id_ident::ProcedureId; +pub use procedure_id_to_name::ProcedureIdToNameIdent; +pub use procedure_identity::ProcedureIdentity; +pub use procedure_name_ident::ProcedureNameIdent; pub use role_ident::RoleIdent; pub use 
role_ident::RoleIdentRaw; pub use role_info::RoleInfo; diff --git a/src/meta/app/src/principal/procedure.rs b/src/meta/app/src/principal/procedure.rs new file mode 100644 index 000000000000..003d3a01dcad --- /dev/null +++ b/src/meta/app/src/principal/procedure.rs @@ -0,0 +1,196 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt; +use std::fmt::Display; +use std::fmt::Formatter; +use std::ops::Deref; + +use chrono::DateTime; +use chrono::Utc; +use databend_common_expression::types::DataType; + +use crate::principal::procedure_id_ident::ProcedureIdIdent; +use crate::principal::procedure_name_ident::ProcedureNameIdent; +use crate::principal::ProcedureIdentity; +use crate::schema::CreateOption; +use crate::tenant::Tenant; +use crate::tenant::ToTenant; +use crate::KeyWithTenant; + +#[derive(Clone, Debug, PartialEq)] +pub struct ProcedureInfo { + pub ident: ProcedureIdIdent, + pub name_ident: ProcedureNameIdent, + pub meta: ProcedureMeta, +} + +#[derive(Clone, Debug, Default, Eq, PartialEq)] +pub struct ProcedureIdent { + pub procedure_id: u64, + pub seq: u64, +} + +#[derive(Clone, Debug, PartialEq)] +pub struct ProcedureMeta { + pub return_types: Vec, + pub created_on: DateTime, + pub updated_on: DateTime, + pub script: String, + pub comment: String, + pub procedure_language: String, +} + +impl Default for ProcedureMeta { + fn default() -> Self { + ProcedureMeta { + return_types: vec![], + created_on: Utc::now(), 
+ updated_on: Utc::now(), + script: "".to_string(), + comment: "".to_string(), + procedure_language: "SQL".to_string(), + } + } +} + +impl Display for ProcedureMeta { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "Language: {:?}, return_type: {:?}, CreatedOn: {:?}, Script: {:?}, Comment: {:?}", + self.procedure_language, self.return_types, self.created_on, self.script, self.comment + ) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct CreateProcedureReq { + pub create_option: CreateOption, + pub name_ident: ProcedureNameIdent, + pub meta: ProcedureMeta, +} + +impl Display for CreateProcedureReq { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + let typ = match self.create_option { + CreateOption::Create => "create_procedure", + CreateOption::CreateIfNotExists => "create_procedure_if_not_exists", + CreateOption::CreateOrReplace => "create_or_replace_procedure", + }; + write!( + f, + "{}:{}/{}={:?}", + typ, + self.name_ident.tenant_name(), + self.name_ident.procedure_name(), + self.meta + ) + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct CreateProcedureReply { + pub procedure_id: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RenameProcedureReq { + pub if_exists: bool, + pub name_ident: ProcedureNameIdent, + pub new_procedure_name: String, +} + +impl Display for RenameProcedureReq { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!( + f, + "rename_procedure:{}/{}=>{}", + self.name_ident.tenant_name(), + self.name_ident.procedure_name(), + self.new_procedure_name + ) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RenameProcedureReply {} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DropProcedureReq { + pub if_exists: bool, + pub name_ident: ProcedureNameIdent, +} + +impl Display for DropProcedureReq { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!( + f, + "drop_procedure(if_exists={}):{}/{}", + self.if_exists, + self.name_ident.tenant_name(), + 
self.name_ident.procedure_name(), + ) + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct DropProcedureReply { + pub procedure_id: u64, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct GetProcedureReq { + pub inner: ProcedureNameIdent, +} + +impl Deref for GetProcedureReq { + type Target = ProcedureNameIdent; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl GetProcedureReq { + pub fn new(tenant: impl ToTenant, procedure_name: ProcedureIdentity) -> GetProcedureReq { + GetProcedureReq { + inner: ProcedureNameIdent::new(tenant, procedure_name), + } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct GetProcedureReply { + pub id: u64, + pub procedure_meta: ProcedureMeta, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum ProcedureInfoFilter { + // include all dropped procedures + IncludeDropped, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct ListProcedureReq { + pub tenant: Tenant, + pub filter: Option, +} + +impl ListProcedureReq { + pub fn tenant(&self) -> &Tenant { + &self.tenant + } +} diff --git a/src/meta/app/src/principal/procedure_id_ident.rs b/src/meta/app/src/principal/procedure_id_ident.rs new file mode 100644 index 000000000000..83ed4f563999 --- /dev/null +++ b/src/meta/app/src/principal/procedure_id_ident.rs @@ -0,0 +1,102 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::tenant_key::ident::TIdent; + +pub type ProcedureId = DataId; +pub type ProcedureIdIdent = TIdent; +pub type ProcedureIdIdentRaw = TIdentRaw; + +pub use kvapi_impl::Resource; + +use crate::data_id::DataId; +use crate::tenant::ToTenant; +use crate::tenant_key::raw::TIdentRaw; + +impl ProcedureIdIdent { + pub fn new(tenant: impl ToTenant, procedure_id: u64) -> Self { + Self::new_generic(tenant, ProcedureId::new(procedure_id)) + } + + pub fn procedure_id(&self) -> ProcedureId { + *self.name() + } +} + +impl ProcedureIdIdentRaw { + pub fn procedure_name(&self) -> ProcedureId { + *self.name() + } +} + +mod kvapi_impl { + + use databend_common_meta_kvapi::kvapi; + + use crate::principal::procedure_id_ident::ProcedureIdIdent; + use crate::principal::ProcedureMeta; + use crate::tenant_key::resource::TenantResource; + + pub struct Resource; + impl TenantResource for Resource { + const PREFIX: &'static str = "__fd_procedure_by_id"; + const TYPE: &'static str = "ProcedureIdIdent"; + const HAS_TENANT: bool = false; + type ValueType = ProcedureMeta; + } + + impl kvapi::Value for ProcedureMeta { + type KeyType = ProcedureIdIdent; + + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } + + // // Use these error types to replace usage of ErrorCode if possible. 
+ // impl From> for ErrorCode { + // impl From> for ErrorCode { +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::ProcedureId; + use super::ProcedureIdIdent; + use crate::tenant::Tenant; + + #[test] + fn test_procedure_id_ident() { + let tenant = Tenant::new_literal("dummy"); + let ident = ProcedureIdIdent::new_generic(tenant, ProcedureId::new(3)); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_procedure_by_id/3"); + + assert_eq!(ident, ProcedureIdIdent::from_str_key(&key).unwrap()); + } + + #[test] + fn test_procedure_id_ident_with_key_space() { + // TODO(xp): implement this test + // let tenant = Tenant::new_literal("test"); + // let ident = ProcedureIdIdent::new(tenant, 3); + // + // let key = ident.to_string_key(); + // assert_eq!(key, "__fd_procedure_by_id/3"); + // + // assert_eq!(ident, ProcedureIdIdent::from_str_key(&key).unwrap()); + } +} diff --git a/src/meta/app/src/principal/procedure_id_to_name.rs b/src/meta/app/src/principal/procedure_id_to_name.rs new file mode 100644 index 000000000000..52a125731419 --- /dev/null +++ b/src/meta/app/src/principal/procedure_id_to_name.rs @@ -0,0 +1,96 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use crate::tenant_key::ident::TIdent; +use crate::tenant_key::raw::TIdentRaw; + +pub type ProcedureIdToNameIdent = TIdent; +pub type ProcedureIdToNameIdentRaw = TIdentRaw; + +pub use kvapi_impl::Resource; + +use crate::principal::procedure_id_ident::ProcedureId; + +impl ProcedureIdToNameIdent { + pub fn procedure_id(&self) -> ProcedureId { + *self.name() + } +} + +impl ProcedureIdToNameIdentRaw { + pub fn procedure_id(&self) -> ProcedureId { + *self.name() + } +} + +mod kvapi_impl { + + use databend_common_meta_kvapi::kvapi; + + use crate::principal::procedure_id_to_name::ProcedureIdToNameIdent; + use crate::principal::ProcedureIdentity; + use crate::tenant_key::resource::TenantResource; + + // TODO(TIdent): parent should return Some(ProcedureIdIdent::new(self.procedure_id).to_string_key()) + pub struct Resource; + impl TenantResource for Resource { + const PREFIX: &'static str = "__fd_procedure_id_to_name"; + const TYPE: &'static str = "ProcedureIdToNameIdent"; + const HAS_TENANT: bool = false; + type ValueType = ProcedureIdentity; + } + + impl kvapi::Value for ProcedureIdentity { + type KeyType = ProcedureIdToNameIdent; + fn dependency_keys(&self, _key: &Self::KeyType) -> impl IntoIterator { + [] + } + } + + // // Use these error types to replace usage of ErrorCode if possible. 
+ // impl From> for ErrorCode { + // impl From> for ErrorCode { +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::ProcedureIdToNameIdent; + use crate::principal::procedure_id_ident::ProcedureId; + use crate::tenant::Tenant; + + #[test] + fn test_procedure_id_ident() { + let tenant = Tenant::new_literal("dummy"); + let ident = ProcedureIdToNameIdent::new_generic(tenant, ProcedureId::new(3)); + + let key = ident.to_string_key(); + assert_eq!(key, "__fd_procedure_id_to_name/3"); + + assert_eq!(ident, ProcedureIdToNameIdent::from_str_key(&key).unwrap()); + } + + #[test] + fn test_procedure_id_ident_with_key_space() { + // TODO(xp): implement this test + // let tenant = Tenant::new_literal("test"); + // let ident = ProcedureIdToNameIdent::new(tenant, 3); + // + // let key = ident.to_string_key(); + // assert_eq!(key, "__fd_procedure_id_to_name/3"); + // + // assert_eq!(ident, ProcedureIdToNameIdent::from_str_key(&key).unwrap()); + } +} diff --git a/src/meta/app/src/principal/procedure_identity.rs b/src/meta/app/src/principal/procedure_identity.rs new file mode 100644 index 000000000000..2ce09629e670 --- /dev/null +++ b/src/meta/app/src/principal/procedure_identity.rs @@ -0,0 +1,69 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::fmt::Display; +use std::fmt::Formatter; + +use databend_common_meta_kvapi::kvapi::KeyBuilder; +use databend_common_meta_kvapi::kvapi::KeyCodec; +use databend_common_meta_kvapi::kvapi::KeyError; +use databend_common_meta_kvapi::kvapi::KeyParser; + +/// Uniquely identifies a procedure by its name and its argument types (kept as one string). +#[derive(Clone, Debug, Eq, PartialEq, Hash, Default)] +pub struct ProcedureIdentity { + pub name: String, + pub args: String, +} + +impl ProcedureIdentity { + const ESCAPE_CHARS: [u8; 1] = [b'\'']; + + pub fn new(name: impl ToString, args: impl ToString) -> Self { + Self { + name: name.to_string(), + args: args.to_string(), + } + } +} + +impl Display for ProcedureIdentity { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}({})", + KeyBuilder::escape_specified(&self.name, &Self::ESCAPE_CHARS), + KeyBuilder::escape_specified(&self.args, &Self::ESCAPE_CHARS), + ) + } +} + +impl KeyCodec for ProcedureIdentity { + fn encode_key(&self, b: KeyBuilder) -> KeyBuilder { + b.push_str(&self.name).push_str(&self.args) + } + + fn decode_key(parser: &mut KeyParser) -> Result + where Self: Sized { + let name = parser.next_str()?; + let args = parser.next_str()?; + Ok(Self { name, args }) + } +} + +impl From for ProcedureIdentity { + fn from(procedure: databend_common_ast::ast::ProcedureIdentity) -> Self { + ProcedureIdentity::new(procedure.name, procedure.args_type) + } +} diff --git a/src/meta/app/src/principal/procedure_name_ident.rs b/src/meta/app/src/principal/procedure_name_ident.rs new file mode 100644 index 000000000000..d5b900b1b1c2 --- /dev/null +++ b/src/meta/app/src/principal/procedure_name_ident.rs @@ -0,0 +1,110 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::tenant_key::ident::TIdent; +pub type ProcedureNameIdent = TIdent; + +pub use kvapi_impl::ProcedureName; + +use crate::principal::procedure_identity::ProcedureIdentity; +use crate::tenant::ToTenant; + +impl ProcedureNameIdent { + pub fn new(tenant: impl ToTenant, name: ProcedureIdentity) -> Self { + Self::new_generic(tenant, name) + } + + pub fn new_procedure(tenant: impl ToTenant, name: impl ToString, args: impl ToString) -> Self { + Self::new(tenant, ProcedureIdentity::new(name, args)) + } + + pub fn procedure_name(&self) -> &ProcedureIdentity { + self.name() + } +} + +mod kvapi_impl { + use databend_common_meta_kvapi::kvapi; + use databend_common_meta_kvapi::kvapi::Key; + + use crate::principal::procedure_id_ident::ProcedureId; + use crate::principal::ProcedureNameIdent; + use crate::tenant_key::resource::TenantResource; + use crate::KeyWithTenant; + + pub struct ProcedureName; + impl TenantResource for ProcedureName { + const PREFIX: &'static str = "__fd_procedure"; + const HAS_TENANT: bool = true; + type ValueType = ProcedureId; + } + + impl kvapi::Value for ProcedureId { + type KeyType = ProcedureNameIdent; + fn dependency_keys(&self, key: &Self::KeyType) -> impl IntoIterator { + [self.into_t_ident(key.tenant()).to_string_key()] + } + } + + // // Use these error types to replace usage of ErrorCode if possible. 
+ // impl From> for ErrorCode { + // impl From> for ErrorCode { +} + +#[cfg(test)] +mod tests { + use databend_common_meta_kvapi::kvapi::Key; + + use super::ProcedureNameIdent; + use crate::principal::ProcedureIdentity; + use crate::tenant::Tenant; + + fn test_format_parse(procedure: &str, args: &str, expect: &str) { + let tenant = Tenant::new_literal("test_tenant"); + let procedure_ident = ProcedureIdentity::new(procedure, args); + let tenant_procedure_ident = ProcedureNameIdent::new(tenant, procedure_ident); + + let key = tenant_procedure_ident.to_string_key(); + assert_eq!(key, expect, "'{procedure}' '{args}' '{expect}'"); + + let tenant_procedure_ident_parsed = ProcedureNameIdent::from_str_key(&key).unwrap(); + assert_eq!( + tenant_procedure_ident, tenant_procedure_ident_parsed, + "'{procedure}' '{args}' '{expect}'" + ); + } + + #[test] + fn test_tenant_procedure_ident_as_kvapi_key() { + test_format_parse("procedure", "", "__fd_procedure/test_tenant/procedure/"); + test_format_parse( + "procedure'", + "int,timestamp,string", + "__fd_procedure/test_tenant/procedure%27/int%2ctimestamp%2cstring", + ); + + // With correct encoding the following two pair should not be encoded into the same string. 
+ + test_format_parse( + "p1'@'string", + "string", + "__fd_procedure/test_tenant/p1%27%40%27string/string", + ); + test_format_parse( + "p2", + "int'@'string", + "__fd_procedure/test_tenant/p2/int%27%40%27string", + ); + } +} diff --git a/src/meta/proto-conv/src/lib.rs b/src/meta/proto-conv/src/lib.rs index 0d7e9f42dd50..32103739da65 100644 --- a/src/meta/proto-conv/src/lib.rs +++ b/src/meta/proto-conv/src/lib.rs @@ -78,6 +78,7 @@ mod least_visible_time_from_to_protobuf_impl; mod lock_from_to_protobuf_impl; mod owner_from_to_protobuf_impl; mod ownership_from_to_protobuf_impl; +mod procedure_from_to_protobuf_impl; mod role_from_to_protobuf_impl; mod schema_from_to_protobuf_impl; mod sequence_from_to_protobuf_impl; diff --git a/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs new file mode 100644 index 000000000000..4554585a0988 --- /dev/null +++ b/src/meta/proto-conv/src/procedure_from_to_protobuf_impl.rs @@ -0,0 +1,105 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This mod is the key point about compatibility. +//! Everytime update anything in this file, update the `VER` and let the tests pass. 
+ +use chrono::DateTime; +use chrono::Utc; +use databend_common_expression::infer_schema_type; +use databend_common_expression::types::DataType; +use databend_common_expression::TableDataType; +use databend_common_meta_app as mt; +use databend_common_protos::pb; + +use crate::reader_check_msg; +use crate::FromToProto; +use crate::Incompatible; +use crate::MIN_READER_VER; +use crate::VER; + +impl FromToProto for mt::principal::ProcedureIdentity { + type PB = pb::ProcedureIdentity; + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + fn from_pb(p: pb::ProcedureIdentity) -> Result + where Self: Sized { + reader_check_msg(p.ver, p.min_reader_ver)?; + + Ok(mt::principal::ProcedureIdentity { + name: p.name, + args: p.args, + }) + } + + fn to_pb(&self) -> Result { + Ok(pb::ProcedureIdentity { + ver: VER, + min_reader_ver: MIN_READER_VER, + name: self.name.clone(), + args: self.args.clone(), + }) + } +} + +impl FromToProto for mt::principal::ProcedureMeta { + type PB = pb::ProcedureMeta; + fn get_pb_ver(p: &Self::PB) -> u64 { + p.ver + } + fn from_pb(p: pb::ProcedureMeta) -> Result { + reader_check_msg(p.ver, p.min_reader_ver)?; + + let mut return_types = Vec::with_capacity(p.return_types.len()); + for arg_type in p.return_types { + let arg_type = DataType::from(&TableDataType::from_pb(arg_type)?); + return_types.push(arg_type); + } + + let v = Self { + return_types, + created_on: DateTime::::from_pb(p.created_on)?, + updated_on: DateTime::::from_pb(p.updated_on)?, + script: p.script, + comment: p.comment, + procedure_language: p.language, + }; + Ok(v) + } + + fn to_pb(&self) -> Result { + let mut return_types = Vec::with_capacity(self.return_types.len()); + for arg_type in self.return_types.iter() { + let arg_type = infer_schema_type(arg_type) + .map_err(|e| Incompatible { + reason: format!("Convert DataType to TableDataType failed: {}", e.message()), + })? 
+ .to_pb()?; + return_types.push(arg_type); + } + + let p = pb::ProcedureMeta { + ver: VER, + min_reader_ver: MIN_READER_VER, + return_types, + created_on: self.created_on.to_pb()?, + updated_on: self.updated_on.to_pb()?, + script: self.script.to_string(), + comment: self.comment.to_string(), + language: self.procedure_language.to_string(), + }; + Ok(p) + } +} diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index fdeefa56fd1d..ca7380806287 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -136,7 +136,8 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (104, "2024-08-02: Add: add share catalog into Catalog meta"), (105, "2024-08-05: Add: add Dictionary meta"), (106, "2024-08-08: Add: add QueryTokenInfo"), - (107, "2024-08-09: Add: datatype.proto/DataType Geography type") + (107, "2024-08-09: Add: datatype.proto/DataType Geography type"), + (108, "2024-08-29: Add: procedure.proto: ProcedureMeta and ProcedureIdentity") // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index 82b15aa973c1..0ef441368036 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -105,3 +105,4 @@ mod v102_user_must_change_password; mod v105_dictionary_meta; mod v106_query_token; mod v107_geography_datatype; +mod v108_procedure; diff --git a/src/meta/proto-conv/tests/it/v108_procedure.rs b/src/meta/proto-conv/tests/it/v108_procedure.rs new file mode 100644 index 000000000000..442397c9146a --- /dev/null +++ b/src/meta/proto-conv/tests/it/v108_procedure.rs @@ -0,0 +1,64 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use chrono::TimeZone; +use chrono::Utc; +use databend_common_expression::types::DataType; +use databend_common_meta_app::principal as mt; +use fastrace::func_name; + +use crate::common; + +#[test] +fn v108_procedure_meta() -> anyhow::Result<()> { + let procedure_meta_v108: Vec = vec![ + 34, 9, 146, 2, 0, 160, 6, 108, 168, 6, 24, 82, 23, 50, 48, 49, 52, 45, 49, 49, 45, 50, 56, + 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 90, 23, 50, 48, 49, 52, 45, 49, 49, 45, + 50, 57, 32, 49, 50, 58, 48, 48, 58, 48, 57, 32, 85, 84, 67, 98, 7, 102, 111, 111, 32, 98, + 97, 114, 114, 3, 83, 81, 76, 160, 6, 108, 168, 6, 24, + ]; + + let want = || mt::ProcedureMeta { + return_types: vec![DataType::String], + created_on: Utc.with_ymd_and_hms(2014, 11, 28, 12, 0, 9).unwrap(), + updated_on: Utc.with_ymd_and_hms(2014, 11, 29, 12, 0, 9).unwrap(), + script: "".to_string(), + comment: "foo bar".to_string(), + procedure_language: "SQL".to_string(), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), procedure_meta_v108.as_slice(), 108, want()) +} + +#[test] +fn v108_procedure_identity() -> anyhow::Result<()> { + let procedure_identity_v108 = vec![ + 10, 2, 112, 49, 18, 10, 115, 116, 114, 105, 110, 103, 44, 105, 110, 116, 160, 6, 108, 168, + 6, 24, + ]; + + let want = || mt::ProcedureIdentity { + name: "p1".to_string(), + args: "string,int".to_string(), + }; + + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old( + func_name!(), + procedure_identity_v108.as_slice(), + 108, + want(), + ) +} diff --git 
a/src/meta/protos/proto/procedure.proto b/src/meta/protos/proto/procedure.proto new file mode 100644 index 000000000000..6824c88d18c9 --- /dev/null +++ b/src/meta/protos/proto/procedure.proto @@ -0,0 +1,53 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Protobuf messages for stored procedures. +// ProcedureIdentity carries the name and args of a procedure; +// ProcedureMeta carries everything else. + +syntax = "proto3"; + +package databend_proto; + +import "datatype.proto"; + +message ProcedureIdentity { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + string name = 1; + string args = 2; +} + +// ProcedureMeta is a container of all non-identity information. +message ProcedureMeta { + uint64 ver = 100; + uint64 min_reader_ver = 101; + + // Procedure return type + repeated DataType return_types = 4; + + // The time this procedure was created. + string created_on = 10; + + // The time this procedure was last updated. + string updated_on = 11; + + // Comment about this procedure. + string comment = 12; + + // The script of this procedure. 
+ string script = 13; + string language = 14; +} diff --git a/src/query/ast/src/ast/statements/procedure.rs b/src/query/ast/src/ast/statements/procedure.rs index 62bca697eddb..1ebf8e82ab7a 100644 --- a/src/query/ast/src/ast/statements/procedure.rs +++ b/src/query/ast/src/ast/statements/procedure.rs @@ -18,6 +18,11 @@ use std::fmt::Formatter; use derive_visitor::Drive; use derive_visitor::DriveMut; +use crate::ast::write_comma_separated_list; +use crate::ast::write_comma_separated_string_list; +use crate::ast::CreateOption; +use crate::ast::TypeName; + #[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] pub struct ExecuteImmediateStmt { pub script: String, @@ -29,3 +34,152 @@ impl Display for ExecuteImmediateStmt { Ok(()) } } + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct ProcedureType { + pub name: Option, + pub data_type: TypeName, +} + +impl Display for ProcedureType { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + if let Some(name) = &self.name { + write!(f, "{} {}", name, self.data_type) + } else { + write!(f, "{}", self.data_type) + } + } +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub enum ProcedureLanguage { + SQL, +} + +impl Display for ProcedureLanguage { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + ProcedureLanguage::SQL => write!(f, "LANGUAGE SQL "), + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq, Drive, DriveMut)] +pub struct ProcedureIdentity { + pub name: String, + pub args_type: String, +} + +impl Display for ProcedureIdentity { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{}({})", &self.name, &self.args_type,) + } +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct CreateProcedureStmt { + pub create_option: CreateOption, + pub name: ProcedureIdentity, + pub language: ProcedureLanguage, + // TODO(eason): Now args is always none, but maybe we also need to consider arg name? 
+ pub args: Option>, + pub return_type: Vec, + pub comment: Option, + pub script: String, +} + +impl Display for CreateProcedureStmt { + // CREATE [ OR REPLACE ] PROCEDURE () + // RETURNS { }[ NOT NULL ] + // LANGUAGE SQL + // [ COMMENT = '' ] AS + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "CREATE ")?; + if let CreateOption::CreateOrReplace = self.create_option { + write!(f, "OR REPLACE ")?; + } + write!(f, "PROCEDURE {}", self.name.name)?; + if let Some(args) = &self.args { + if args.is_empty() { + write!(f, "() ")?; + } else { + write!(f, "(")?; + write_comma_separated_list(f, args.clone())?; + write!(f, ") ")?; + } + } else { + write!(f, "() ")?; + } + if self.return_type.len() == 1 { + if let Some(name) = &self.return_type[0].name { + write!( + f, + "RETURNS TABLE({} {}) ", + name, self.return_type[0].data_type + )?; + } else { + write!(f, "RETURNS {} ", self.return_type[0].data_type)?; + } + } else { + write!(f, "RETURNS TABLE(")?; + write_comma_separated_list(f, self.return_type.clone())?; + write!(f, ") ")?; + } + + write!(f, "{}", self.language)?; + if let Some(comment) = &self.comment { + write!(f, "COMMENT='{}' ", comment)?; + } + write!(f, "AS $$\n{}\n$$", self.script)?; + Ok(()) + } +} + +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct DropProcedureStmt { + pub name: ProcedureIdentity, +} + +impl Display for DropProcedureStmt { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "DROP PROCEDURE {}", self.name)?; + + Ok(()) + } +} +#[derive(Debug, Clone, PartialEq, Drive, DriveMut)] +pub struct DescProcedureStmt { + pub name: String, + pub args: Vec, +} + +impl Display for DescProcedureStmt { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "DESCRIBE PROCEDURE {}", self.name)?; + + if self.args.is_empty() { + write!(f, "() ")?; + } else { + write!(f, "(")?; + write_comma_separated_list(f, self.args.clone())?; + write!(f, ") ")?; + } + + Ok(()) + } +} + +#[derive(Debug, 
Clone, PartialEq, Eq, Drive, DriveMut)] +pub struct CallProcedureStmt { + pub name: String, + pub args: Vec, +} + +impl Display for CallProcedureStmt { + fn fmt(&self, f: &mut Formatter) -> std::fmt::Result { + write!(f, "CALL PROCEDURE {}(", self.name)?; + write_comma_separated_string_list(f, self.args.clone())?; + write!(f, ")")?; + Ok(()) + } +} diff --git a/src/query/ast/src/ast/statements/statement.rs b/src/query/ast/src/ast/statements/statement.rs index 7b8aae78dcd6..050be0264e5f 100644 --- a/src/query/ast/src/ast/statements/statement.rs +++ b/src/query/ast/src/ast/statements/statement.rs @@ -309,6 +309,13 @@ pub enum Statement { // Stored procedures ExecuteImmediate(ExecuteImmediateStmt), + CreateProcedure(CreateProcedureStmt), + DropProcedure(DropProcedureStmt), + ShowProcedures { + show_options: Option, + }, + DescProcedure(DescProcedureStmt), + CallProcedure(CallProcedureStmt), // Sequence CreateSequence(CreateSequenceStmt), @@ -769,6 +776,15 @@ impl Display for Statement { Statement::DropNotification(stmt) => write!(f, "{stmt}")?, Statement::DescribeNotification(stmt) => write!(f, "{stmt}")?, Statement::ExecuteImmediate(stmt) => write!(f, "{stmt}")?, + Statement::CreateProcedure(stmt) => write!(f, "{stmt}")?, + Statement::DropProcedure(stmt) => write!(f, "{stmt}")?, + Statement::DescProcedure(stmt) => write!(f, "{stmt}")?, + Statement::ShowProcedures { show_options } => { + write!(f, "SHOW PROCEDURES")?; + if let Some(show_options) = show_options { + write!(f, " {show_options}")?; + } + } Statement::CreateSequence(stmt) => write!(f, "{stmt}")?, Statement::DropSequence(stmt) => write!(f, "{stmt}")?, Statement::CreateDynamicTable(stmt) => write!(f, "{stmt}")?, @@ -781,6 +797,7 @@ impl Display for Statement { write!(f, " '{object_id}'")?; } Statement::System(stmt) => write!(f, "{stmt}")?, + Statement::CallProcedure(stmt) => write!(f, "{stmt}")?, } Ok(()) } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index 
a63bb1993b2f..be3b9a70eac9 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -2012,6 +2012,157 @@ pub fn statement_body(i: Input) -> IResult { |(_, action)| Statement::System(SystemStmt { action }), ); + pub fn procedure_type(i: Input) -> IResult { + map(rule! { #ident ~ #type_name }, |(name, data_type)| { + ProcedureType { + name: Some(name.to_string()), + data_type, + } + })(i) + } + + fn procedure_return(i: Input) -> IResult> { + let procedure_table_return = map( + rule! { + TABLE ~ "(" ~ #comma_separated_list1(procedure_type) ~ ")" + }, + |(_, _, test, _)| test, + ); + let procedure_single_return = map(rule! { #type_name }, |data_type| { + vec![ProcedureType { + name: None, + data_type, + }] + }); + rule!(#procedure_single_return: "" + | #procedure_table_return: "TABLE( , ...)")(i) + } + + fn procedure_arg(i: Input) -> IResult>> { + let procedure_args = map( + rule! { + "(" ~ #comma_separated_list1(procedure_type) ~ ")" + }, + |(_, args, _)| Some(args), + ); + let procedure_empty_args = map( + rule! { + "(" ~ ")" + }, + |(_, _)| None, + ); + rule!(#procedure_empty_args: "()" + | #procedure_args: "( , ...)")(i) + } + + // CREATE [ OR REPLACE ] PROCEDURE () + // RETURNS { }[ NOT NULL ] + // LANGUAGE SQL + // [ COMMENT = '' ] AS + let create_procedure = map_res( + rule! { + CREATE ~ ( OR ~ ^REPLACE )? ~ PROCEDURE ~ #ident ~ #procedure_arg ~ RETURNS ~ #procedure_return ~ LANGUAGE ~ SQL ~ (COMMENT ~ "=" ~ #literal_string)? 
~ AS ~ #code_string + }, + |(_, opt_or_replace, _, name, args, _, return_type, _, _, opt_comment, _, script)| { + let create_option = parse_create_option(opt_or_replace.is_some(), false)?; + + let name = ProcedureIdentity { + name: name.to_string(), + args_type: if let Some(args) = &args { + args.iter() + .map(|arg| arg.data_type.to_string()) + .collect::>() + .join(",") + } else { + "".to_string() + }, + }; + let stmt = CreateProcedureStmt { + create_option, + name, + args, + return_type, + language: ProcedureLanguage::SQL, + comment: match opt_comment { + Some(opt) => Some(opt.2), + None => None, + }, + script, + }; + Ok(Statement::CreateProcedure(stmt)) + }, + ); + + let show_procedures = map( + rule! { + SHOW ~ PROCEDURES ~ #show_options? + }, + |(_, _, show_options)| Statement::ShowProcedures { show_options }, + ); + + fn procedure_type_name(i: Input) -> IResult> { + let procedure_type_names = map( + rule! { + "(" ~ #comma_separated_list1(type_name) ~ ")" + }, + |(_, args, _)| args, + ); + let procedure_empty_types = map( + rule! { + "(" ~ ")" + }, + |(_, _)| vec![], + ); + rule!(#procedure_empty_types: "()" + | #procedure_type_names: "(, ...)")(i) + } + + let call_procedure = map( + rule! { + CALL ~ PROCEDURE ~ #ident ~ "(" ~ ")" + }, + |(_, _, name, _, _)| { + Statement::CallProcedure(CallProcedureStmt { + name: name.to_string(), + args: vec![], + }) + }, + ); + + let drop_procedure = map( + rule! { + DROP ~ PROCEDURE ~ #ident ~ #procedure_type_name + }, + |(_, _, name, args)| { + Statement::DropProcedure(DropProcedureStmt { + name: ProcedureIdentity { + name: name.to_string(), + args_type: if args.is_empty() { + "".to_string() + } else { + args.iter() + .map(|arg| arg.to_string()) + .collect::>() + .join(",") + }, + }, + }) + }, + ); + + let describe_procedure = map( + rule! 
{ + ( DESC | DESCRIBE ) ~ PROCEDURE ~ #ident ~ #procedure_type_name + }, + |(_, _, name, args)| { + // TODO: modify to ProcedureIdentify + Statement::DescProcedure(DescProcedureStmt { + name: name.to_string(), + args, + }) + }, + ); + alt(( // query, explain,show rule!( @@ -2215,6 +2366,11 @@ AS | #desc_connection: "`DESC | DESCRIBE CONNECTION `" | #show_connections: "`SHOW CONNECTIONS`" | #execute_immediate : "`EXECUTE IMMEDIATE $$