Skip to content

Commit

Permalink
feat(query): Procedure (Part1) (#16348)
Browse files Browse the repository at this point in the history
* feat(query): support procedure

1. support parse

* Part2: support interpreter

* refactor procedure mgr: use DataId

* remove ProcedureIdList, refactor procedure_mgr

* fix ut

* remove serde::Serialize

* fix conflict

* add ProcedureIdentity

* eaiser ProcedureIdentity en/decode

* chore: fix list_dictionaries

* fix conflicts
  • Loading branch information
TCeason authored Sep 5, 2024
1 parent 015778c commit 8b19e5e
Show file tree
Hide file tree
Showing 47 changed files with 2,446 additions and 7 deletions.
5 changes: 5 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ build_exceptions! {
UnsupportedDictionarySource(3117),
MissingDictionaryOption(3118),
WrongDictionaryFieldExpr(3119),

// Procedure
UnknownProcedure(3130),
ProcedureAlreadyExists(3131),
IllegalProcedureFormat(3132),
}

// Storage errors [3001, 4000].
Expand Down
2 changes: 1 addition & 1 deletion src/meta/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ mod schema_api_test_suite;
mod sequence_api;
pub(crate) mod testing;
pub mod txn_backoff;
pub(crate) mod util;
pub mod util;

pub mod crud;
mod sequence_api_impl;
Expand Down
2 changes: 1 addition & 1 deletion src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3689,7 +3689,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
req.tenant.clone(),
DictionaryIdentity::new(req.db_id, "dummy".to_string()),
);
let dir = DirName::new(dictionary_ident);
let dir = DirName::new_with_level(dictionary_ident, 2);

let name_id_values = self.list_id_value(&dir).await?;
Ok(name_id_values
Expand Down
15 changes: 15 additions & 0 deletions src/meta/app/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use databend_common_meta_types::MatchSeq;

use crate::background::job_ident;
use crate::data_mask::data_mask_name_ident;
use crate::principal::procedure_name_ident;
use crate::principal::ProcedureIdentity;
use crate::schema::catalog_name_ident;
use crate::schema::dictionary_name_ident;
use crate::schema::index_name_ident;
Expand Down Expand Up @@ -1170,6 +1172,15 @@ pub enum AppError {
UnknownDictionary(
#[from] UnknownError<dictionary_name_ident::DictionaryNameRsc, DictionaryIdentity>,
),

// Procedure
#[error(transparent)]
ProcedureAlreadyExists(
#[from] ExistError<procedure_name_ident::ProcedureName, ProcedureIdentity>,
),

#[error(transparent)]
UnknownProcedure(#[from] UnknownError<procedure_name_ident::ProcedureName, ProcedureIdentity>),
}

impl AppError {
Expand Down Expand Up @@ -1699,6 +1710,10 @@ impl From<AppError> for ErrorCode {
ErrorCode::DictionaryAlreadyExists(err.message())
}
AppError::UnknownDictionary(err) => ErrorCode::UnknownDictionary(err.message()),
AppError::UnknownProcedure(err) => ErrorCode::UnknownProcedure(err.message()),
AppError::ProcedureAlreadyExists(err) => {
ErrorCode::ProcedureAlreadyExists(err.message())
}
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions src/meta/app/src/id_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub(crate) const ID_GEN_SHARE_ENDPOINT: &str = "share_endpoint_id";
pub(crate) const ID_GEN_DATA_MASK: &str = "data_mask";
pub(crate) const ID_GEN_BACKGROUND_JOB: &str = "background_job";

pub(crate) const ID_GEN_PROCEDURE: &str = "procedure_id";

/// Key for resource id generator
///
/// This is a special key for an application to generate unique id with kvapi::KVApi.
Expand Down Expand Up @@ -111,6 +113,13 @@ impl IdGenerator {
resource: ID_GEN_CATALOG.to_string(),
}
}

/// Create a key for generating procedure id with kvapi::KVApi
pub fn procedure_id() -> Self {
Self {
resource: ID_GEN_PROCEDURE.to_string(),
}
}
}

impl kvapi::KeyCodec for IdGenerator {
Expand Down Expand Up @@ -225,6 +234,16 @@ mod t {
assert_eq!(g1, g2);
}

// Procedure id generator
{
let g = IdGenerator::procedure_id();
let k = g.to_string_key();
assert_eq!("__fd_id_gen/procedure_id", k);

let t2 = IdGenerator::from_str_key(&k)?;
assert_eq!(g, t2);
}

Ok(())
}

Expand Down
20 changes: 20 additions & 0 deletions src/meta/app/src/principal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ pub mod client_session_ident;
pub mod connection_ident;
pub mod network_policy_ident;
pub mod password_policy_ident;
pub mod procedure;
pub mod procedure_id_ident;
pub mod procedure_id_to_name;
pub mod procedure_identity;
pub mod procedure_name_ident;
pub mod stage_file_ident;
pub mod tenant_ownership_object_ident;
pub mod tenant_user_ident;
Expand All @@ -60,6 +65,21 @@ pub use ownership_object::OwnershipObject;
pub use password_policy::PasswordPolicy;
pub use password_policy_ident::PasswordPolicyIdent;
pub use principal_identity::PrincipalIdentity;
pub use procedure::CreateProcedureReply;
pub use procedure::CreateProcedureReq;
pub use procedure::DropProcedureReply;
pub use procedure::DropProcedureReq;
pub use procedure::GetProcedureReply;
pub use procedure::GetProcedureReq;
pub use procedure::ListProcedureReq;
pub use procedure::ProcedureInfoFilter;
pub use procedure::ProcedureMeta;
pub use procedure::RenameProcedureReply;
pub use procedure::RenameProcedureReq;
pub use procedure_id_ident::ProcedureId;
pub use procedure_id_to_name::ProcedureIdToNameIdent;
pub use procedure_identity::ProcedureIdentity;
pub use procedure_name_ident::ProcedureNameIdent;
pub use role_ident::RoleIdent;
pub use role_ident::RoleIdentRaw;
pub use role_info::RoleInfo;
Expand Down
196 changes: 196 additions & 0 deletions src/meta/app/src/principal/procedure.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt;
use std::fmt::Display;
use std::fmt::Formatter;
use std::ops::Deref;

use chrono::DateTime;
use chrono::Utc;
use databend_common_expression::types::DataType;

use crate::principal::procedure_id_ident::ProcedureIdIdent;
use crate::principal::procedure_name_ident::ProcedureNameIdent;
use crate::principal::ProcedureIdentity;
use crate::schema::CreateOption;
use crate::tenant::Tenant;
use crate::tenant::ToTenant;
use crate::KeyWithTenant;

#[derive(Clone, Debug, PartialEq)]
pub struct ProcedureInfo {
pub ident: ProcedureIdIdent,
pub name_ident: ProcedureNameIdent,
pub meta: ProcedureMeta,
}

#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ProcedureIdent {
pub procedure_id: u64,
pub seq: u64,
}

#[derive(Clone, Debug, PartialEq)]
pub struct ProcedureMeta {
pub return_types: Vec<DataType>,
pub created_on: DateTime<Utc>,
pub updated_on: DateTime<Utc>,
pub script: String,
pub comment: String,
pub procedure_language: String,
}

impl Default for ProcedureMeta {
fn default() -> Self {
ProcedureMeta {
return_types: vec![],
created_on: Utc::now(),
updated_on: Utc::now(),
script: "".to_string(),
comment: "".to_string(),
procedure_language: "SQL".to_string(),
}
}
}

impl Display for ProcedureMeta {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(
f,
"Lanuage: {:?}, return_type: {:?}, CreatedOn: {:?}, Script: {:?}, Comment: {:?}",
self.procedure_language, self.return_types, self.created_on, self.script, self.comment
)
}
}

#[derive(Clone, Debug, PartialEq)]
pub struct CreateProcedureReq {
pub create_option: CreateOption,
pub name_ident: ProcedureNameIdent,
pub meta: ProcedureMeta,
}

impl Display for CreateProcedureReq {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
let typ = match self.create_option {
CreateOption::Create => "create_procedure",
CreateOption::CreateIfNotExists => "create_procedure_if_not_exists",
CreateOption::CreateOrReplace => "create_or_replace_procedure",
};
write!(
f,
"{}:{}/{}={:?}",
typ,
self.name_ident.tenant_name(),
self.name_ident.procedure_name(),
self.meta
)
}
}

#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CreateProcedureReply {
pub procedure_id: u64,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RenameProcedureReq {
pub if_exists: bool,
pub name_ident: ProcedureNameIdent,
pub new_procedure_name: String,
}

impl Display for RenameProcedureReq {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"rename_procedure:{}/{}=>{}",
self.name_ident.tenant_name(),
self.name_ident.procedure_name(),
self.new_procedure_name
)
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RenameProcedureReply {}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DropProcedureReq {
pub if_exists: bool,
pub name_ident: ProcedureNameIdent,
}

impl Display for DropProcedureReq {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"drop_procedure(if_exists={}):{}/{}",
self.if_exists,
self.name_ident.tenant_name(),
self.name_ident.procedure_name(),
)
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DropProcedureReply {
pub procedure_id: u64,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct GetProcedureReq {
pub inner: ProcedureNameIdent,
}

impl Deref for GetProcedureReq {
type Target = ProcedureNameIdent;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

impl GetProcedureReq {
pub fn new(tenant: impl ToTenant, procedure_name: ProcedureIdentity) -> GetProcedureReq {
GetProcedureReq {
inner: ProcedureNameIdent::new(tenant, procedure_name),
}
}
}

#[derive(Clone, Debug, PartialEq)]
pub struct GetProcedureReply {
pub id: u64,
pub procedure_meta: ProcedureMeta,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ProcedureInfoFilter {
// include all dropped procedures
IncludeDropped,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ListProcedureReq {
pub tenant: Tenant,
pub filter: Option<ProcedureInfoFilter>,
}

impl ListProcedureReq {
pub fn tenant(&self) -> &Tenant {
&self.tenant
}
}
Loading

0 comments on commit 8b19e5e

Please sign in to comment.