diff --git a/crates/catalog/src/session_catalog.rs b/crates/catalog/src/session_catalog.rs index a05737e52..226260adb 100644 --- a/crates/catalog/src/session_catalog.rs +++ b/crates/catalog/src/session_catalog.rs @@ -446,6 +446,8 @@ impl TempCatalog { builtin: false, external: false, is_temp: true, + sql_example: None, + description: None, }, options: TableOptions::Internal(TableOptionsInternal { columns }), tunnel_id: None, @@ -487,6 +489,8 @@ impl TempCatalog { builtin: false, external: false, is_temp: true, + sql_example: None, + description: None, }, options: TableOptions::Internal(TableOptionsInternal { columns: Vec::new(), diff --git a/crates/datafusion_ext/src/functions.rs b/crates/datafusion_ext/src/functions.rs index 84c03ba11..7009b3b2a 100644 --- a/crates/datafusion_ext/src/functions.rs +++ b/crates/datafusion_ext/src/functions.rs @@ -1,53 +1,20 @@ -use std::collections::HashMap; -use std::fmt; -use std::fmt::Display; -use std::sync::Arc; +use std::fmt::{self, Display}; use crate::errors::{ExtensionError, Result}; use crate::vars::SessionVars; use async_trait::async_trait; use catalog::session_catalog::SessionCatalog; use datafusion::arrow::datatypes::{Field, Fields}; -use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; -use datafusion::logical_expr::Signature; use datafusion::prelude::SessionContext; use datafusion::scalar::ScalarValue; use decimal::Decimal128; -use protogen::metastore::types::catalog::{EntryType, RuntimePreference}; +use protogen::metastore::types::catalog::EntryType; use protogen::rpcsrv::types::func_param_value::{ FuncParamValue as ProtoFuncParamValue, FuncParamValueArrayVariant, FuncParamValueEnum as ProtoFuncParamValueEnum, }; -#[async_trait] -pub trait TableFunc: Sync + Send { - /// The name for this table function. This name will be used when looking up - /// function implementations. 
- fn name(&self) -> &str; - fn runtime_preference(&self) -> RuntimePreference; - fn detect_runtime( - &self, - _args: &[FuncParamValue], - _parent: RuntimePreference, - ) -> Result { - Ok(self.runtime_preference()) - } - - /// Return a table provider using the provided args. - async fn create_provider( - &self, - ctx: &dyn TableFuncContextProvider, - args: Vec, - opts: HashMap, - ) -> Result>; - /// Return the signature for this function. - /// Defaults to None. - // TODO: Remove the default impl once we have `signature` implemented for all functions - fn signature(&self) -> Option { - None - } -} pub trait TableFuncContextProvider: Sync + Send { /// Get a reference to the session catalog. fn get_session_catalog(&self) -> &SessionCatalog; diff --git a/crates/datasources/src/native/access.rs b/crates/datasources/src/native/access.rs index c2bcfd037..892ef6ff8 100644 --- a/crates/datasources/src/native/access.rs +++ b/crates/datasources/src/native/access.rs @@ -363,6 +363,8 @@ mod tests { builtin: false, external: false, is_temp: false, + description: None, + sql_example: None, }, options: TableOptions::Internal(TableOptionsInternal { columns: vec![InternalColumnDefinition { diff --git a/crates/metastore/src/database.rs b/crates/metastore/src/database.rs index 5d1f1409f..b867bbcc1 100644 --- a/crates/metastore/src/database.rs +++ b/crates/metastore/src/database.rs @@ -5,8 +5,7 @@ use once_cell::sync::Lazy; use pgrepr::oid::FIRST_AVAILABLE_ID; use protogen::metastore::types::catalog::{ CatalogEntry, CatalogState, CredentialsEntry, DatabaseEntry, DeploymentMetadata, EntryMeta, - EntryType, FunctionEntry, FunctionType, RuntimePreference, SchemaEntry, SourceAccessMode, - TableEntry, TunnelEntry, ViewEntry, + EntryType, SchemaEntry, SourceAccessMode, TableEntry, TunnelEntry, ViewEntry, }; use protogen::metastore::types::options::{ DatabaseOptions, DatabaseOptionsInternal, TableOptions, TunnelOptions, @@ -17,7 +16,7 @@ use sqlbuiltins::builtins::{ BuiltinDatabase, 
BuiltinSchema, BuiltinTable, BuiltinView, DATABASE_DEFAULT, DEFAULT_SCHEMA, FIRST_NON_SCHEMA_ID, }; -use sqlbuiltins::functions::{BUILTIN_AGGREGATE_FUNCS, BUILTIN_SCALAR_FUNCS, BUILTIN_TABLE_FUNCS}; +use sqlbuiltins::functions::{BUILTIN_FUNCS, BUILTIN_TABLE_FUNCS}; use sqlbuiltins::validation::{ validate_database_tunnel_support, validate_object_name, validate_table_tunnel_support, }; @@ -670,6 +669,8 @@ impl State { builtin: false, external: true, is_temp: false, + sql_example: None, + description: None, }, options: create_database.options, tunnel_id, @@ -700,6 +701,8 @@ impl State { builtin: false, external: false, is_temp: false, + sql_example: None, + description: None, }, options: create_tunnel.options, }; @@ -735,6 +738,8 @@ impl State { builtin: false, external: false, is_temp: false, + sql_example: None, + description: None, }, options: create_credentials.options, comment: create_credentials.comment, @@ -766,6 +771,8 @@ impl State { builtin: false, external: false, is_temp: false, + sql_example: None, + description: None, }, options: create_credential.options, comment: create_credential.comment, @@ -798,6 +805,8 @@ impl State { builtin: false, external: false, is_temp: false, + sql_example: None, + description: None, }, }; self.entries.insert(oid, CatalogEntry::Schema(ent))?; @@ -821,6 +830,8 @@ impl State { builtin: false, external: false, is_temp: false, + sql_example: None, + description: None, }, sql: create_view.sql, columns: create_view.columns, @@ -851,6 +862,8 @@ impl State { builtin: false, external: false, is_temp: false, + sql_example: None, + description: None, }, options: TableOptions::Internal(create_table.options), tunnel_id: None, @@ -892,6 +905,8 @@ impl State { builtin: false, external: true, is_temp: false, + sql_example: None, + description: None, }, options: create_ext.options, tunnel_id, @@ -1170,6 +1185,8 @@ impl BuiltinCatalog { builtin: true, external: false, is_temp: false, + sql_example: None, + description: None, }, options: 
DatabaseOptions::Internal(DatabaseOptionsInternal {}), tunnel_id: None, @@ -1192,6 +1209,8 @@ impl BuiltinCatalog { builtin: true, external: false, is_temp: false, + sql_example: None, + description: None, }, }), ); @@ -1215,6 +1234,8 @@ impl BuiltinCatalog { builtin: true, external: false, is_temp: false, + sql_example: None, + description: None, }, options: TableOptions::new_internal(table.columns.clone()), tunnel_id: None, @@ -1245,6 +1266,8 @@ impl BuiltinCatalog { builtin: true, external: false, is_temp: false, + sql_example: None, + description: None, }, sql: view.sql.to_string(), columns: Vec::new(), @@ -1264,24 +1287,10 @@ impl BuiltinCatalog { let schema_id = schema_names .get(DEFAULT_SCHEMA) .ok_or_else(|| MetastoreError::MissingNamedSchema(DEFAULT_SCHEMA.to_string()))?; + let mut entry = func.as_function_entry(oid, *schema_id); + entry.runtime_preference = func.runtime_preference(); - entries.insert( - oid, - CatalogEntry::Function(FunctionEntry { - meta: EntryMeta { - entry_type: EntryType::Function, - id: oid, - parent: *schema_id, - name: func.name().to_string(), - builtin: true, - external: false, - is_temp: false, - }, - func_type: FunctionType::TableReturning, - runtime_preference: func.runtime_preference(), - signature: func.signature(), - }), - ); + entries.insert(oid, CatalogEntry::Function(entry)); schema_objects .get_mut(schema_id) .unwrap() @@ -1290,39 +1299,7 @@ impl BuiltinCatalog { oid += 1; } - - for func in BUILTIN_SCALAR_FUNCS.iter_funcs() { - // Put them all in the default schema. 
- let schema_id = schema_names - .get(DEFAULT_SCHEMA) - .ok_or_else(|| MetastoreError::MissingNamedSchema(DEFAULT_SCHEMA.to_string()))?; - - entries.insert( - oid, - CatalogEntry::Function(FunctionEntry { - meta: EntryMeta { - entry_type: EntryType::Function, - id: oid, - parent: *schema_id, - name: func.to_string(), - builtin: true, - external: false, - is_temp: false, - }, - func_type: FunctionType::Scalar, - runtime_preference: RuntimePreference::Unspecified, - signature: Some(func.signature()), - }), - ); - schema_objects - .get_mut(schema_id) - .unwrap() - .functions - .insert(func.to_string(), oid); - - oid += 1; - } - for func in BUILTIN_AGGREGATE_FUNCS.iter_funcs() { + for func in BUILTIN_FUNCS.iter_funcs() { // Put them all in the default schema. let schema_id = schema_names .get(DEFAULT_SCHEMA) @@ -1330,26 +1307,13 @@ impl BuiltinCatalog { entries.insert( oid, - CatalogEntry::Function(FunctionEntry { - meta: EntryMeta { - entry_type: EntryType::Function, - id: oid, - parent: *schema_id, - name: func.to_string(), - builtin: true, - external: false, - is_temp: false, - }, - func_type: FunctionType::Aggregate, - runtime_preference: RuntimePreference::Unspecified, - signature: Some(func.signature()), - }), + CatalogEntry::Function(func.as_function_entry(oid, *schema_id)), ); schema_objects .get_mut(schema_id) .unwrap() .functions - .insert(func.to_string().to_ascii_uppercase(), oid); + .insert(func.name().to_string(), oid); oid += 1; } diff --git a/crates/protogen/proto/metastore/catalog.proto b/crates/protogen/proto/metastore/catalog.proto index 168fd04df..921006e53 100644 --- a/crates/protogen/proto/metastore/catalog.proto +++ b/crates/protogen/proto/metastore/catalog.proto @@ -125,7 +125,11 @@ message EntryMeta { // Temp objects should never be persisted. bool is_temp = 7; - // next: 8 + // Optional sql example string. 
+ optional string sql_example = 8; + // Optional description string + optional string description = 9; + // next: 10 } // Defines what kind of access is allowed on the data source. diff --git a/crates/protogen/src/metastore/types/catalog.rs b/crates/protogen/src/metastore/types/catalog.rs index 74c21ce7a..0d5468088 100644 --- a/crates/protogen/src/metastore/types/catalog.rs +++ b/crates/protogen/src/metastore/types/catalog.rs @@ -266,6 +266,8 @@ pub struct EntryMeta { pub builtin: bool, pub external: bool, pub is_temp: bool, + pub sql_example: Option, + pub description: Option, } impl From for catalog::EntryMeta { @@ -279,6 +281,8 @@ impl From for catalog::EntryMeta { builtin: value.builtin, external: value.external, is_temp: value.is_temp, + sql_example: value.sql_example, + description: value.description, } } } @@ -294,6 +298,8 @@ impl TryFrom for EntryMeta { builtin: value.builtin, external: value.external, is_temp: value.is_temp, + sql_example: value.sql_example, + description: value.description, }) } } diff --git a/crates/sqlbuiltins/src/builtins.rs b/crates/sqlbuiltins/src/builtins.rs index c59106a53..81f0327ce 100644 --- a/crates/sqlbuiltins/src/builtins.rs +++ b/crates/sqlbuiltins/src/builtins.rs @@ -13,11 +13,20 @@ //! database node will be able to see it, but will not be able to execute //! appropriately. We can revisit this if this isn't acceptable long-term. 
-use datafusion::arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}; +use async_trait::async_trait; +use datafusion::{ + arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema}, + datasource::TableProvider, + logical_expr::Signature, +}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use once_cell::sync::Lazy; use pgrepr::oid::FIRST_GLAREDB_BUILTIN_ID; -use protogen::metastore::types::options::InternalColumnDefinition; -use std::sync::Arc; +use protogen::metastore::types::{ + catalog::{EntryMeta, EntryType, FunctionEntry, FunctionType, RuntimePreference}, + options::InternalColumnDefinition, +}; +use std::{collections::HashMap, sync::Arc}; /// The default catalog that exists in all GlareDB databases. pub const DEFAULT_CATALOG: &str = "default"; @@ -171,6 +180,8 @@ pub static GLARE_FUNCTIONS: Lazy = Lazy::new(|| BuiltinTable { false, ), ("builtin", DataType::Boolean, false), + ("example", DataType::Utf8, true), + ("description", DataType::Utf8, true), ]), }); @@ -601,6 +612,108 @@ impl BuiltinView { } } +#[async_trait] +/// A builtin table function. +/// Table functions are ones that are used in the FROM clause. +/// e.g. `SELECT * FROM my_table_func(...)` +pub trait TableFunc: Sync + Send + BuiltinFunction { + fn runtime_preference(&self) -> RuntimePreference; + fn detect_runtime( + &self, + _args: &[FuncParamValue], + _parent: RuntimePreference, + ) -> datafusion_ext::errors::Result { + Ok(self.runtime_preference()) + } + + /// Return a table provider using the provided args. + async fn create_provider( + &self, + ctx: &dyn TableFuncContextProvider, + args: Vec, + opts: HashMap, + ) -> datafusion_ext::errors::Result>; +} + +/// The same as `BuiltinFunction`, but with the values provided as associated constants. 
+pub trait ConstBuiltinFunction: Sync + Send { + const NAME: &'static str; + const DESCRIPTION: &'static str; + const EXAMPLE: &'static str; + const FUNCTION_TYPE: FunctionType; + fn signature(&self) -> Option { + None + } +} + +impl BuiltinFunction for T +where + T: ConstBuiltinFunction, +{ + fn name(&self) -> &'static str { + Self::NAME + } + fn sql_example(&self) -> Option { + Some(Self::EXAMPLE.to_string()) + } + fn description(&self) -> Option { + Some(Self::DESCRIPTION.to_string()) + } + fn function_type(&self) -> FunctionType { + Self::FUNCTION_TYPE + } + fn signature(&self) -> Option { + self.signature() + } +} +/// A builtin function. +/// This trait is implemented by all builtin functions. +pub trait BuiltinFunction: Sync + Send { + /// The name for this function. This name will be used when looking up + /// function implementations. + fn name(&self) -> &'static str; + /// Return the signature for this function. + /// Defaults to None. + // TODO: Remove the default impl once we have `signature` implemented for all functions + fn signature(&self) -> Option { + None + } + /// Return a sql example for this function. + /// Defaults to None. + fn sql_example(&self) -> Option { + None + } + /// Return a description for this function. + /// Defaults to None. + fn description(&self) -> Option { + None + } + // Returns the function type. 
'aggregate', 'scalar', or 'table' + fn function_type(&self) -> FunctionType; + + // Convert to a builtin `FunctionEntry`. + fn as_function_entry(&self, id: u32, parent: u32) -> FunctionEntry { + let meta = EntryMeta { + entry_type: EntryType::Function, + id, + parent, + name: self.name().to_string(), + builtin: true, + external: false, + is_temp: false, + sql_example: self.sql_example(), + description: self.description(), + }; + + FunctionEntry { + meta, + func_type: self.function_type(), + runtime_preference: RuntimePreference::Unspecified, + signature: self.signature(), + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/crates/sqlbuiltins/src/functions/aggregates.rs b/crates/sqlbuiltins/src/functions/aggregates.rs new file mode 100644 index 000000000..5e14a1159 --- /dev/null +++ b/crates/sqlbuiltins/src/functions/aggregates.rs @@ -0,0 +1,337 @@ +// We use the `document!` macro to generate the documentation for the builtin functions. +// Specifically, the `stringify!` macro is used to get the name of the function, so the +// names below are lowercase (hence the lint allowance): `Abs` would otherwise yield `Abs` instead of `abs`, and so on. +#![allow(non_camel_case_types)] + +use crate::{builtins::BuiltinFunction, document}; +use datafusion::logical_expr::AggregateFunction; +use protogen::metastore::types::catalog::FunctionType; + +document! { + doc => "Gives the approximate count of distinct elements using HyperLogLog", + example => "approx_distinct(a)", + name => approx_distinct +} + +document! { + doc => "Gives the approximate median of a column", + example => "approx_median(a)", + name => approx_median +} + +document! { + doc => "Gives the approximate percentile of a column", + example => "approx_percentile_cont(a)", + name => approx_percentile_cont +} + +document! { + doc => "Gives the approximate percentile of a column with a weight column", + example => "approx_percentile_cont_with_weight(a)", + name => approx_percentile_cont_with_weight +} + +document! 
{ + doc => "Returns a list containing all the values of a column", + example => "array_agg(a)", + name => array_agg +} +document! { + doc => "Returns the average of a column", + example => "avg(a)", + name => avg +} +document! { + doc => "Returns the bitwise AND of a column", + example => "bit_and(a)", + name => bit_and +} +document! { + doc => "Returns the bitwise OR of a column", + example => "bit_or(a)", + name => bit_or +} +document! { + doc => "Returns the bitwise XOR of a column", + example => "bit_xor(a)", + name => bit_xor +} +document!( + doc => "Returns the boolean AND of a column", + example => "bool_and(a)", + name => bool_and +); +document! { + doc => "Returns the boolean OR of a column", + example => "bool_or(a)", + name => bool_or +} + +document! { + doc => "Returns the correlation coefficient of two columns", + example => "correlation(x, y)", + name => correlation +} +document! { + doc => "Returns the number of rows in a column", + example => "count(a)", + name => count +} +document! { + doc => "Returns the covariance of two columns", + example => "covariance(x, y)", + name => covariance +} +document! { + doc => "Returns the population covariance of two columns", + example => "covariance_pop(x, y)", + name => covariance_pop +} +document! { + doc => "Returns the first value in a column", + example => "first_value(a)", + name => first_value +} +document! { + doc => "Returns 1 if a column is aggregated, 0 otherwise", + example => "grouping(a)", + name => grouping +} +document! { + doc => "Returns the last value in a column", + example => "last_value(a)", + name => last_value +} +document! { + doc => "Returns the maximum value in a column", + example => "max(a)", + name => max +} +document! { + doc => "Returns the median value in a column", + example => "median(a)", + name => median +} +document! { + doc => "Returns the minimum value in a column", + example => "min(a)", + name => min +} +document! 
{ + doc => "Returns the average of the independent variable for non-null pairs in a group, where x is the independent variable and y is the dependent variable", + example => "regr_avgx(y, x)", + name => regr_avgx +} + +document! { + doc => "Returns the average of the dependent variable for non-null pairs in a group, where x is the independent variable and y is the dependent variable", + example => "regr_avgy(y, x)", + name => regr_avgy +} + +document! { + doc => "Returns the number of non-null number pairs in a group", + example => "regr_count(y, x)", + name => regr_count +} + +document! { + doc => "Returns the intercept of the univariate linear regression line for non-null pairs in a group", + example => "regr_intercept(y, x)", + name => regr_intercept +} + +document! { + doc => "Returns the coefficient of determination (R-squared) for non-null pairs in a group", + example => "regr_r2(y, x)", + name => regr_r2 +} + +document! { + doc => "Returns the slope of the linear regression line for non-null pairs in a group", + example => "regr_slope(y, x)", + name => regr_slope +} + +document! { + doc => "Returns the sum of squares of the independent variable for non-null pairs in a group", + example => "regr_sxx(y, x)", + name => regr_sxx +} + +document! { + doc => "Returns the sum of products of independent times dependent variable for non-null pairs in a group", + example => "regr_sxy(y, x)", + name => regr_sxy +} + +document! { + doc => "Returns the sum of squares of the dependent variable for non-null pairs in a group", + example => "regr_syy(y, x)", + name => regr_syy +} + +document! { + doc => "Returns the sample standard deviation of a column", + example => "stddev(a)", + name => stddev +} + +document! { + doc => "Returns the population standard deviation of a column", + example => "stddev_pop(a)", + name => stddev_pop +} + +document! { + doc => "Returns the sum of a column", + example => "sum(a)", + name => sum +} +document! 
{ + doc => "Returns the sample variance of a column", + example => "variance(a)", + name => variance +} + +document! { + doc => "Returns the population variance of a column", + example => "variance_pop(a)", + name => variance_pop +} + +impl BuiltinFunction for AggregateFunction { + fn function_type(&self) -> FunctionType { + FunctionType::Aggregate + } + fn name(&self) -> &'static str { + use AggregateFunction::*; + match self { + ApproxDistinct => approx_distinct::NAME, + ApproxMedian => approx_median::NAME, + ApproxPercentileCont => approx_percentile_cont::NAME, + ApproxPercentileContWithWeight => approx_percentile_cont_with_weight::NAME, + ArrayAgg => array_agg::NAME, + Avg => avg::NAME, + BitAnd => bit_and::NAME, + BitOr => bit_or::NAME, + BitXor => bit_xor::NAME, + BoolAnd => bool_and::NAME, + BoolOr => bool_or::NAME, + Correlation => correlation::NAME, + Count => count::NAME, + Covariance => covariance::NAME, + CovariancePop => covariance_pop::NAME, + FirstValue => first_value::NAME, + Grouping => grouping::NAME, + LastValue => last_value::NAME, + Max => max::NAME, + Median => median::NAME, + Min => min::NAME, + RegrAvgx => regr_avgx::NAME, + RegrAvgy => regr_avgy::NAME, + RegrCount => regr_count::NAME, + RegrIntercept => regr_intercept::NAME, + RegrR2 => regr_r2::NAME, + RegrSlope => regr_slope::NAME, + RegrSXX => regr_sxx::NAME, + RegrSXY => regr_sxy::NAME, + RegrSYY => regr_syy::NAME, + Stddev => stddev::NAME, + StddevPop => stddev_pop::NAME, + Sum => sum::NAME, + Variance => variance::NAME, + VariancePop => variance_pop::NAME, + } + } + + fn signature(&self) -> Option { + Some(AggregateFunction::signature(self)) + } + fn sql_example(&self) -> Option { + use AggregateFunction::*; + Some( + match self { + ApproxDistinct => approx_distinct::EXAMPLE, + ApproxMedian => approx_median::EXAMPLE, + ApproxPercentileCont => approx_percentile_cont::EXAMPLE, + ApproxPercentileContWithWeight => approx_percentile_cont_with_weight::EXAMPLE, + ArrayAgg => 
array_agg::EXAMPLE, + Avg => avg::EXAMPLE, + BitAnd => bit_and::EXAMPLE, + BitOr => bit_or::EXAMPLE, + BitXor => bit_xor::EXAMPLE, + BoolAnd => bool_and::EXAMPLE, + BoolOr => bool_or::EXAMPLE, + Correlation => correlation::EXAMPLE, + Count => count::EXAMPLE, + Covariance => covariance::EXAMPLE, + CovariancePop => covariance_pop::EXAMPLE, + FirstValue => first_value::EXAMPLE, + Grouping => grouping::EXAMPLE, + LastValue => last_value::EXAMPLE, + Max => max::EXAMPLE, + Median => median::EXAMPLE, + Min => min::EXAMPLE, + RegrAvgx => regr_avgx::EXAMPLE, + RegrAvgy => regr_avgy::EXAMPLE, + RegrCount => regr_count::EXAMPLE, + RegrIntercept => regr_intercept::EXAMPLE, + RegrR2 => regr_r2::EXAMPLE, + RegrSlope => regr_slope::EXAMPLE, + RegrSXX => regr_sxx::EXAMPLE, + RegrSXY => regr_sxy::EXAMPLE, + RegrSYY => regr_syy::EXAMPLE, + Stddev => stddev::EXAMPLE, + StddevPop => stddev_pop::EXAMPLE, + Sum => sum::EXAMPLE, + Variance => variance::EXAMPLE, + VariancePop => variance_pop::EXAMPLE, + } + .to_string(), + ) + } + fn description(&self) -> Option { + use AggregateFunction::*; + Some( + match self { + ApproxDistinct => approx_distinct::DESCRIPTION, + ApproxMedian => approx_median::DESCRIPTION, + ApproxPercentileCont => approx_percentile_cont::DESCRIPTION, + ApproxPercentileContWithWeight => approx_percentile_cont_with_weight::DESCRIPTION, + ArrayAgg => array_agg::DESCRIPTION, + Avg => avg::DESCRIPTION, + BitAnd => bit_and::DESCRIPTION, + BitOr => bit_or::DESCRIPTION, + BitXor => bit_xor::DESCRIPTION, + BoolAnd => bool_and::DESCRIPTION, + BoolOr => bool_or::DESCRIPTION, + Correlation => correlation::DESCRIPTION, + Count => count::DESCRIPTION, + Covariance => covariance::DESCRIPTION, + CovariancePop => covariance_pop::DESCRIPTION, + FirstValue => first_value::DESCRIPTION, + Grouping => grouping::DESCRIPTION, + LastValue => last_value::DESCRIPTION, + Max => max::DESCRIPTION, + Median => median::DESCRIPTION, + Min => min::DESCRIPTION, + RegrAvgx => regr_avgx::DESCRIPTION, + 
RegrAvgy => regr_avgy::DESCRIPTION, + RegrCount => regr_count::DESCRIPTION, + RegrIntercept => regr_intercept::DESCRIPTION, + RegrR2 => regr_r2::DESCRIPTION, + RegrSlope => regr_slope::DESCRIPTION, + RegrSXX => regr_sxx::DESCRIPTION, + RegrSXY => regr_sxy::DESCRIPTION, + RegrSYY => regr_syy::DESCRIPTION, + Stddev => stddev::DESCRIPTION, + StddevPop => stddev_pop::DESCRIPTION, + Sum => sum::DESCRIPTION, + Variance => variance::DESCRIPTION, + VariancePop => variance_pop::DESCRIPTION, + } + .to_string(), + ) + } +} diff --git a/crates/sqlbuiltins/src/functions/iceberg.rs b/crates/sqlbuiltins/src/functions/iceberg.rs deleted file mode 100644 index a8b8893af..000000000 --- a/crates/sqlbuiltins/src/functions/iceberg.rs +++ /dev/null @@ -1,204 +0,0 @@ -use std::collections::HashMap; -use std::sync::Arc; - -use crate::functions::table_location_and_opts; -use async_trait::async_trait; -use datafusion::arrow::array::{Int32Builder, Int64Builder, StringBuilder, UInt64Builder}; -use datafusion::arrow::datatypes::{DataType, Field, Schema}; -use datafusion::arrow::record_batch::RecordBatch; -use datafusion::datasource::{MemTable, TableProvider}; -use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; -use datasources::lake::iceberg::table::IcebergTable; -use datasources::lake::storage_options_into_object_store; -use protogen::metastore::types::catalog::RuntimePreference; - -/// Scan an iceberg table. 
-#[derive(Debug, Clone, Copy)] -pub struct IcebergScan; - -#[async_trait] -impl TableFunc for IcebergScan { - fn runtime_preference(&self) -> RuntimePreference { - // TODO: Detect runtime - RuntimePreference::Remote - } - - fn name(&self) -> &str { - "iceberg_scan" - } - - async fn create_provider( - &self, - ctx: &dyn TableFuncContextProvider, - args: Vec, - mut opts: HashMap, - ) -> Result> { - // TODO: Reduce duplication - let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; - - let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; - let table = IcebergTable::open(loc.clone(), store) - .await - .map_err(box_err)?; - let reader = table.table_reader().await.map_err(box_err)?; - - Ok(reader) - } -} - -/// Scan snapshot information for an iceberg tables. Will not attempt to read -/// data files. -#[derive(Debug, Clone, Copy)] -pub struct IcebergSnapshots; - -#[async_trait] -impl TableFunc for IcebergSnapshots { - fn runtime_preference(&self) -> RuntimePreference { - RuntimePreference::Remote - } - fn name(&self) -> &str { - "iceberg_snapshots" - } - - async fn create_provider( - &self, - ctx: &dyn TableFuncContextProvider, - args: Vec, - mut opts: HashMap, - ) -> Result> { - let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; - - let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; - let table = IcebergTable::open(loc, store).await.map_err(box_err)?; - - let snapshots = &table.metadata().snapshots; - - let schema = Arc::new(Schema::new(vec![ - Field::new("snapshot_id", DataType::Int64, false), - Field::new("timestamp_ms", DataType::Int64, false), - Field::new("manifest_list", DataType::Utf8, false), - Field::new("schema_id", DataType::Int32, false), - ])); - - let mut snapshot_id = Int64Builder::new(); - let mut timestamp_ms = Int64Builder::new(); - let mut manifest_list = StringBuilder::new(); - let mut schema_id = Int32Builder::new(); - - for snapshot in snapshots { - 
snapshot_id.append_value(snapshot.snapshot_id); - timestamp_ms.append_value(snapshot.timestamp_ms); - manifest_list.append_value(&snapshot.manifest_list); - schema_id.append_value(snapshot.schema_id); - } - - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(snapshot_id.finish()), - Arc::new(timestamp_ms.finish()), - Arc::new(manifest_list.finish()), - Arc::new(schema_id.finish()), - ], - )?; - - Ok(Arc::new( - MemTable::try_new(schema, vec![vec![batch]]).unwrap(), - )) - } -} - -/// Scan data file metadata for the current snapshot of an iceberg table. Will -/// not attempt to read data files. -#[derive(Debug, Clone, Copy)] -pub struct IcebergDataFiles; - -#[async_trait] -impl TableFunc for IcebergDataFiles { - fn runtime_preference(&self) -> RuntimePreference { - RuntimePreference::Remote - } - fn name(&self) -> &str { - "iceberg_data_files" - } - - async fn create_provider( - &self, - ctx: &dyn TableFuncContextProvider, - args: Vec, - mut opts: HashMap, - ) -> Result> { - let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; - - let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; - let table = IcebergTable::open(loc, store).await.map_err(box_err)?; - - let manifests = table.read_manifests().await.map_err(box_err)?; - - let schema = Arc::new(Schema::new(vec![ - Field::new("manifest_index", DataType::UInt64, false), - Field::new("manifest_content", DataType::Utf8, false), - Field::new("snapshot_id", DataType::Int64, true), - Field::new("sequence_number", DataType::Int64, true), - Field::new("file_sequence_number", DataType::Int64, true), - Field::new("file_path", DataType::Utf8, false), - Field::new("file_format", DataType::Utf8, false), - Field::new("record_count", DataType::Int64, false), - Field::new("file_size_bytes", DataType::Int64, false), - ])); - - let mut manifest_index = UInt64Builder::new(); - let mut manifest_content = StringBuilder::new(); - let mut snapshot_id = Int64Builder::new(); - let mut 
sequence_number = Int64Builder::new(); - let mut file_sequence_number = Int64Builder::new(); - let mut file_path = StringBuilder::new(); - let mut file_format = StringBuilder::new(); - let mut record_count = Int64Builder::new(); - let mut file_size_bytes = Int64Builder::new(); - - for (idx, manifest) in manifests.into_iter().enumerate() { - for entry in manifest.entries { - // Manifest metadata - manifest_index.append_value(idx as u64); - manifest_content.append_value(manifest.metadata.content.to_string()); - - // Entry data - snapshot_id.append_option(entry.snapshot_id); - sequence_number.append_option(entry.sequence_number); - file_sequence_number.append_option(entry.file_sequence_number); - file_path.append_value(&entry.data_file.file_path); - file_format.append_value(&entry.data_file.file_format); - record_count.append_value(entry.data_file.record_count); - file_size_bytes.append_value(entry.data_file.file_size_in_bytes); - } - } - - let batch = RecordBatch::try_new( - schema.clone(), - vec![ - Arc::new(manifest_index.finish()), - Arc::new(manifest_content.finish()), - Arc::new(snapshot_id.finish()), - Arc::new(sequence_number.finish()), - Arc::new(file_sequence_number.finish()), - Arc::new(file_path.finish()), - Arc::new(file_format.finish()), - Arc::new(record_count.finish()), - Arc::new(file_size_bytes.finish()), - ], - )?; - - Ok(Arc::new( - MemTable::try_new(schema, vec![vec![batch]]).unwrap(), - )) - } -} - -fn box_err(err: E) -> ExtensionError -where - E: std::error::Error + Send + Sync + 'static, -{ - ExtensionError::Access(Box::new(err)) -} diff --git a/crates/sqlbuiltins/src/functions/mod.rs b/crates/sqlbuiltins/src/functions/mod.rs index bb5bbc443..c2d27b8fe 100644 --- a/crates/sqlbuiltins/src/functions/mod.rs +++ b/crates/sqlbuiltins/src/functions/mod.rs @@ -1,242 +1,89 @@ //! Builtin table returning functions. 
+mod aggregates; +mod scalars; +mod table; -mod bigquery; -mod delta; -mod excel; -mod generate_series; -mod iceberg; -mod lance; -mod mongo; -mod mysql; -mod object_store; -mod postgres; -mod snowflake; -mod virtual_listing; - -use ::object_store::aws::AmazonS3ConfigKey; -use ::object_store::azure::AzureConfigKey; -use ::object_store::gcp::GoogleConfigKey; -use datafusion::logical_expr::{AggregateFunction, BuiltinScalarFunction}; use std::collections::HashMap; -use std::sync::Arc; -use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, IdentValue, TableFunc, TableFuncContextProvider}; -use datasources::common::url::{DatasourceUrl, DatasourceUrlType}; -use once_cell::sync::Lazy; -use protogen::metastore::types::options::{CredentialsOptions, StorageOptions}; - -use self::bigquery::ReadBigQuery; -use self::delta::DeltaScan; -use self::excel::ExcelScan; -use self::generate_series::GenerateSeries; -use self::iceberg::{IcebergDataFiles, IcebergScan, IcebergSnapshots}; -use self::lance::LanceScan; -use self::mongo::ReadMongoDb; -use self::mysql::ReadMysql; -use self::object_store::{CSV_SCAN, JSON_SCAN, PARQUET_SCAN, READ_CSV, READ_JSON, READ_PARQUET}; -use self::postgres::ReadPostgres; -use self::snowflake::ReadSnowflake; -use self::virtual_listing::{ListColumns, ListSchemas, ListTables}; +use crate::builtins::BuiltinFunction; +use self::scalars::ArrowCastFunction; +use self::table::BuiltinTableFuncs; +use datafusion::logical_expr::{AggregateFunction, BuiltinScalarFunction}; +use once_cell::sync::Lazy; +use std::sync::Arc; /// Builtin table returning functions available for all sessions. pub static BUILTIN_TABLE_FUNCS: Lazy = Lazy::new(BuiltinTableFuncs::new); -pub static BUILTIN_SCALAR_FUNCS: Lazy = Lazy::new(BuiltinScalarFuncs::new); -pub static BUILTIN_AGGREGATE_FUNCS: Lazy = - Lazy::new(BuiltinAggregateFuncs::new); -/// All builtin aggregate functions. 
-pub struct BuiltinAggregateFuncs { - funcs: HashMap, -} - -impl BuiltinAggregateFuncs { - pub fn new() -> Self { - use strum::IntoEnumIterator; - let funcs = AggregateFunction::iter() - .map(|f| (f.to_string(), f)) - .collect::>(); +pub static ARROW_CAST_FUNC: Lazy = Lazy::new(|| ArrowCastFunction {}); +pub static BUILTIN_FUNCS: Lazy = Lazy::new(BuiltinFuncs::new); - BuiltinAggregateFuncs { funcs } - } - pub fn iter_funcs(&self) -> impl Iterator { - self.funcs.values() - } -} - -impl Default for BuiltinAggregateFuncs { - fn default() -> Self { - Self::new() - } -} - -/// All builtin scalar functions. -pub struct BuiltinScalarFuncs { - funcs: HashMap, +pub struct BuiltinFuncs { + funcs: HashMap>, } -impl BuiltinScalarFuncs { +impl BuiltinFuncs { pub fn new() -> Self { use strum::IntoEnumIterator; - let funcs = BuiltinScalarFunction::iter() - .map(|f| (f.to_string(), f)) - .collect::>(); - - BuiltinScalarFuncs { funcs } - } - pub fn iter_funcs(&self) -> impl Iterator { - self.funcs.values() - } -} -impl Default for BuiltinScalarFuncs { - fn default() -> Self { - Self::new() - } -} - -/// All builtin table functions. 
-pub struct BuiltinTableFuncs { - funcs: HashMap>, -} - -impl BuiltinTableFuncs { - pub fn new() -> BuiltinTableFuncs { - let funcs: Vec> = vec![ - // Databases/warehouses - Arc::new(ReadPostgres), - Arc::new(ReadBigQuery), - Arc::new(ReadMongoDb), - Arc::new(ReadMysql), - Arc::new(ReadSnowflake), - // Object store - Arc::new(PARQUET_SCAN), - Arc::new(READ_PARQUET), - Arc::new(CSV_SCAN), - Arc::new(READ_CSV), - Arc::new(JSON_SCAN), - Arc::new(READ_JSON), - // Data lakes - Arc::new(DeltaScan), - Arc::new(IcebergScan), - Arc::new(IcebergSnapshots), - Arc::new(IcebergDataFiles), - Arc::new(ExcelScan), - Arc::new(LanceScan), - // Listing - Arc::new(ListSchemas), - Arc::new(ListTables), - Arc::new(ListColumns), - // Series generating - Arc::new(GenerateSeries), - ]; - let funcs: HashMap> = funcs - .into_iter() - .map(|f| (f.name().to_string(), f)) - .collect(); - - BuiltinTableFuncs { funcs } - } - - pub fn find_function(&self, name: &str) -> Option> { - self.funcs.get(name).cloned() + let scalars = BuiltinScalarFunction::iter().map(|f| { + let key = f.to_string(); + let value: Arc = Arc::new(f); + (key, value) + }); + let aggregates = AggregateFunction::iter().map(|f| { + let key = f.to_string(); + let value: Arc = Arc::new(f); + (key, value) + }); + let arrow_cast: Arc = Arc::new(ArrowCastFunction {}); + let arrow_cast = (arrow_cast.name().to_string(), arrow_cast); + let arrow_cast = std::iter::once(arrow_cast); + + let funcs: HashMap> = + scalars.chain(aggregates).chain(arrow_cast).collect(); + + BuiltinFuncs { funcs } } - - pub fn iter_funcs(&self) -> impl Iterator> { + pub fn iter_funcs(&self) -> impl Iterator> { self.funcs.values() } } -impl Default for BuiltinTableFuncs { +impl Default for BuiltinFuncs { fn default() -> Self { Self::new() } } -// Parse the data lake table location and object store options from the provided function arguments -fn table_location_and_opts( - ctx: &dyn TableFuncContextProvider, - args: Vec, - opts: &mut HashMap, -) -> 
Result<(DatasourceUrl, StorageOptions)> { - let mut args = args.into_iter(); - let first = args.next().unwrap(); - let url: String = first.param_into()?; - let source_url = - DatasourceUrl::try_new(url).map_err(|e| ExtensionError::Access(Box::new(e)))?; - - let mut maybe_cred_opts = None; - // Check if a credentials object has been supplied - if let Some(func_param) = args.next() { - let creds: IdentValue = func_param.param_into()?; - maybe_cred_opts = Some( - ctx.get_session_catalog() - .resolve_credentials(creds.as_str()) - .cloned() - .ok_or(ExtensionError::String(format!( - "missing credentials object: {creds}" - )))? - .options, - ); - } - - let mut storage_options = StorageOptions::default(); - match (source_url.datasource_url_type(), maybe_cred_opts) { - (DatasourceUrlType::File, None) => {} // no options fine in this case - (DatasourceUrlType::File, _) => { - return Err(ExtensionError::String( - "Credentials incorrectly provided when accessing local delta table".to_string(), - )) - } - (DatasourceUrlType::Gcs, Some(CredentialsOptions::Gcp(creds))) => { - storage_options.inner.insert( - GoogleConfigKey::ServiceAccountKey.as_ref().to_string(), - creds.service_account_key, - ); - } - (DatasourceUrlType::S3, Some(CredentialsOptions::Aws(creds))) => { - const REGION_KEY: &str = "region"; - let region = opts - .remove(REGION_KEY) - .ok_or(ExtensionError::MissingNamedArgument(REGION_KEY))? 
- .param_into()?; - - storage_options.inner.insert( - AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(), - creds.access_key_id, - ); - storage_options.inner.insert( - AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(), - creds.secret_access_key, - ); - storage_options - .inner - .insert(AmazonS3ConfigKey::Region.as_ref().to_string(), region); - } - (DatasourceUrlType::Azure, Some(CredentialsOptions::Azure(creds))) => { - storage_options.inner.insert( - AzureConfigKey::AccountName.as_ref().to_string(), - creds.account_name, - ); - storage_options.inner.insert( - AzureConfigKey::AccessKey.as_ref().to_string(), - creds.access_key, - ); +// Define a macro to associate doc strings and examples with items +// The macro helps preserve the line wrapping. rustfmt will otherwise collapse the lines. +#[macro_export] +macro_rules! document { + (doc => $doc:expr, example => $example:expr, name => $item:ident) => { + #[doc = $doc] + pub struct $item; + + impl $item { + const DESCRIPTION: &'static str = $doc; + const EXAMPLE: &'static str = $example; + const NAME: &'static str = stringify!($item); } - (DatasourceUrlType::Http, _) => { - return Err(ExtensionError::String( - "Accessing delta tables over http not supported".to_string(), - )) + }; + (doc => $doc:expr, example => $example:expr, $name:expr => $item:ident) => { + #[doc = $doc] + pub struct $item; + + impl $item { + const DESCRIPTION: &'static str = $doc; + const EXAMPLE: &'static str = $example; + const NAME: &'static str = $name; } - (datasource, creds) => { - return Err(ExtensionError::String(format!( - "Invalid credentials for {datasource}, got {} creds", - if let Some(o) = creds { - o.as_str() - } else { - "no" - } - ))) + }; + // uses an existing struct + ($doc:expr, $example:expr, name => $name:expr, implementation => $item:ident) => { + impl $item { + const DESCRIPTION: &'static str = $doc; + const EXAMPLE: &'static str = $example; + const NAME: &'static str = $name; } }; - - Ok((source_url, 
storage_options)) } diff --git a/crates/sqlbuiltins/src/functions/scalars.rs b/crates/sqlbuiltins/src/functions/scalars.rs new file mode 100644 index 000000000..4655dc23f --- /dev/null +++ b/crates/sqlbuiltins/src/functions/scalars.rs @@ -0,0 +1,994 @@ +// we make use of the document! macro to generate the documentation for the builtin functions. +// specifically the `stringify!` macro is used to get the name of the function. +// `Abs` would otherwise be `Abs` instead of `abs`. and so on. +#![allow(non_camel_case_types)] + +use crate::{ + builtins::{BuiltinFunction, ConstBuiltinFunction}, + document, +}; +use datafusion::logical_expr::BuiltinScalarFunction; +use protogen::metastore::types::catalog::FunctionType; + +pub struct ArrowCastFunction {} + +impl ConstBuiltinFunction for ArrowCastFunction { + const NAME: &'static str = "arrow_cast"; + const DESCRIPTION: &'static str = "Casts a value to a specified arrow type."; + const EXAMPLE: &'static str = "arrow_cast(1, 'Int32')"; + const FUNCTION_TYPE: FunctionType = FunctionType::Scalar; +} + +document! { + doc => "Compute the absolute value of a number", + example => "abs(-1)", + name => abs +} +document! { + doc => "Compute the inverse cosine (arc cosine) of a number", + example => "acos(0.5)", + name => acos +} +document! { + doc => "Compute the inverse hyperbolic cosine of a number", + example => "acosh(1)", + name => acosh +} +document! { + doc => "Compute the inverse sine (arc sine) of a number", + example => "asin(0.5)", + name => asin +} +document! { + doc => "Compute the inverse hyperbolic sine of a number", + example => "asinh(1)", + name => asinh +} +document! { + doc => "Computes the arctangent of a number", + example => "atan(1)", + name => atan +} +document! { + doc => "Computes the arctangent of y/x given y and x", + example => "atan2(1, 1)", + name => atan2 +} +document! { + doc => "Compute the inverse hyperbolic tangent of a number", + example => "atanh(0.5)", + name => atanh +} +document! 
{ + doc => "Compute the cube root of a number", + example => "cbrt(27)", + name => cbrt +} +document! { + doc => "Compute the smallest integer greater than or equal to a number", + example => "ceil(1.1)", + name => ceil +} +document! { + doc => "Compute the cosine of a number", + example => "cos(0)", + name => cos +} +document! { + doc => "Compute the cotangent of a number", + example => "cot(1)", + name => cot +} +document! { + doc => "Compute the hyperbolic cosine of a number", + example => "cosh(0)", + name => cosh +} +document! { + doc => "Converts an angle measured in radians to an approximately equivalent angle measured in degrees", + example => "degrees(1)", + name => degrees +} +document! { + doc => "Compute the base-e exponential of a number", + example => "exp(1)", + name => exp +} +document! { + doc => "Compute the factorial of a number", + example => "factorial(5)", + name => factorial +} +document! { + doc => "Compute the largest integer less than or equal to a number", + example => "floor(1.1)", + name => floor +} +document! { + doc => "Compute the greatest common divisor of two integers", + example => "gcd(10, 15)", + name => gcd +} +document! { + doc => "Returns true if the argument is NaN, false otherwise", + example => "isnan(1)", + name => isnan +} +document! { + doc => "Returns true if the argument is zero, false otherwise", + example => "iszero(0)", + name => iszero +} +document! { + doc => "Compute the least common multiple of two integers", + example => "lcm(10, 15)", + name => lcm +} +document! { + doc => "Compute the natural logarithm of a number", + example => "ln(1)", + name => ln +} +document! { + doc => "Compute the logarithm of a number in a specified base", + example => "log(1)", + name => log +} +document! { + doc => "Compute the base-10 logarithm of a number", + example => "log10(1)", + name => log10 +} +document! { + doc => "Compute the base-2 logarithm of a number", + example => "log2(1)", + name => log2 +} +document! 
{ + doc => "Returns the first argument if it is not NaN, otherwise returns the second argument", + example => "nanvl(1, 2)", + name => nanvl +} +document! { + doc => "Compute the constant π", + example => "pi()", + name => pi +} + +document! { + doc => "Compute the power of a number", + example => "pow(2, 3)", + name => pow +} +document! { + doc => "Converts an angle measured in degrees to an approximately equivalent angle measured in radians", + example => "radians(1)", + name => radians +} +document! { + doc => "Compute a pseudo-random number between 0 and 1", + example => "random()", + name => random +} +document! { + doc => "Round a number to the nearest integer", + example => "round(1.1)", + name => round +} +document! { + doc => "Compute the sign of a number", + example => "signum(1)", + name => signum +} +document! { + doc => "Compute the sine of a number", + example => "sin(0)", + name => sin +} +document! { + doc => "Compute the hyperbolic sine of a number", + example => "sinh(0)", + name => sinh +} +document! { + doc => "Compute the square root of a number", + example => "sqrt(4)", + name => sqrt +} +document! { + doc => "Compute the tangent of a number", + example => "tan(0)", + name => tan +} +document! { + doc => "Compute the hyperbolic tangent of a number", + example => "tanh(0)", + name => tanh +} +document! { + doc => "Truncate a number to the nearest integer. If a second argument is provided, truncate to the specified number of decimal places", + example => "trunc(1.1111, 2)", + name => trunc +} +document! { + doc => "Returns the first non-null argument, or null if all arguments are null", + example => "coalesce(null, 1)", + name => coalesce +} +document! { + doc => "Returns null if the arguments are equal, otherwise returns the first argument", + example => "nullif(1, 1)", + name => nullif +} +document! { + doc => "Compute the ASCII code of the first character of a string", + example => "ascii('a')", + name => ascii +} +document! 
{ + doc => "Compute the number of bits in a string", + example => "bit_length('hello')", + name => bit_length +} +document! { + doc => "Remove the longest string containing only characters from a set of characters from the start and end of a string", + example => "btrim('hello', 'eh')", + name => btrim +} +document! { + doc => "Compute the number of characters in a string", + example => "character_length('hello')", + name => character_length +} +document! { + doc => "Concatenate two strings", + example => "concat('hello', 'world')", + name => concat +} +document! { + doc => "Concatenate two strings with a separator", + example => "concat_ws(',', 'hello', 'world')", + name => concat_ws +} +document! { + doc => "Compute the character with the given ASCII code", + example => "chr(97)", + name => chr +} +document! { + doc => "Capitalize the first letter of each word in a string", + example => "initcap('hello world')", + name => initcap +} +document! { + doc => "Extract a substring from the start of a string with the given length", + example => "left('hello', 2)", + name => left +} +document! { + doc => "Convert a string to lowercase", + example => "lower('HELLO')", + name => lower +} +document! { + doc => "Pad a string to the left to a specified length with a sequence of characters", + example => "lpad('hello', 10, '12')", + name => lpad +} +document! { + doc => "Remove all spaces from the start of a string", + example => "ltrim(' hello ')", + name => ltrim +} +document! { + doc => "Compute the number of bytes in a string", + example => "octet_length('hello')", + name => octet_length +} +document! { + doc => "Repeat a string a specified number of times", + example => "repeat('hello', 2)", + name => repeat +} +document! { + doc => "Replace all occurrences of a substring in a string with a new substring", + example => "replace('hello', 'l', 'r')", + name => replace +} +document! 
{ + doc => "Reverse the characters in a string", + example => "reverse('hello')", + name => reverse +} +document! { + doc => "Extract a substring from the end of a string with the given length", + example => "right('hello', 2)", + name => right +} +document! { + doc => "Pad a string to the right to a specified length with a sequence of characters", + example => "rpad('hello', 10, '12')", + name => rpad +} +document! { + doc => "Remove all spaces from the end of a string", + example => "rtrim(' hello ')", + name => rtrim +} +document! { + doc => "Split a string on a delimiter and return the nth field", + example => "split_part('hello.world', '.', 2)", + name => split_part +} +document! { + doc => "Split a string on a delimiter and return an array of the fields", + example => "string_to_array('hello world', ' ')", + name => string_to_array +} +document! { + doc => "Returns true if the first string starts with the second string, false otherwise", + example => "starts_with('hello world', 'hello')", + name => starts_with +} +document! { + doc => "Find the position of the first occurrence of a substring in a string", + example => "strpos('hello world', 'world')", + name => strpos +} +document! { + doc => "Extract a substring from a string with the given start position and length", + example => "substr('hello', 2, 2)", + name => substr +} +document! { + doc => "Convert a number or binary value to a hexadecimal string", + example => "to_hex(99999)", + name => to_hex +} +document! { + doc => "Replace all occurrences of a set of characters in a string with a new set of characters", + example => "translate('hello', 'el', '12')", + name => translate +} +document! { + doc => "Remove all spaces from the beginning and end of a string", + example => "trim(' hello ')", + name => trim +} +document! { + doc => "Convert a string to uppercase", + example => "upper('hello')", + name => upper +} +document! 
{ + doc => "Generate a random UUID", + example => "uuid()", + name => uuid +} +document! { + doc => "Returns true if the first string matches the second string as a regular expression, false otherwise", + example => "regexp_match('hello world', 'hello')", + name => regexp_match +} +document! { + doc => "Replace all occurrences of a substring in a string with a new substring using a regular expression", + example => "regexp_replace('hello world', 'hello', 'goodbye')", + name => regexp_replace +} +document! { + doc => "Returns the current timestamp", + example => "now()", + name => now +} +document! { + doc => "Returns the current date", + example => "current_date()", + name => current_date +} +document! { + doc => "Returns the current time in UTC", + example => "current_time()", + name => current_time +} +document! { + doc => "Returns the date binned to the specified interval", + example => "date_bin('15 minutes', TIMESTAMP '2022-01-01 15:07:00', TIMESTAMP '2020-01-01')", + name => date_bin +} +document! { + doc => "Returns the date truncated to the specified unit", + example => "date_trunc('day', '2020-01-01')", + name => date_trunc +} +document! { + doc => "Returns the specified part of a date", + example => "date_part('year', '2020-01-01')", + name => date_part +} +document! { + doc => "Converts a string to a timestamp (Timestamp<ns, UTC>). Alias for `TIMESTAMP <string>`", + example => "to_timestamp('2020-09-08T12:00:00+00:00')", + name => to_timestamp +} +document! { + doc => "Converts a string to a timestamp with millisecond precision (Timestamp<ms, UTC>)", + example => "to_timestamp_millis('2020-09-08T12:00:00+00:00')", + name => to_timestamp_millis +} +document! { + doc => "Converts a string to a timestamp with microsecond precision (Timestamp<µs, UTC>)", + example => "to_timestamp_micros('2020-09-08T12:00:00+00:00')", + name => to_timestamp_micros +} +document! 
{ + doc => "Converts a string to a timestamp with second precision (Timestamp<s, UTC>)", + example => "to_timestamp_seconds('2020-09-08T12:00:00+00:00')", + name => to_timestamp_seconds +} +document! { + doc => "Converts a unix timestamp (seconds since 1970-01-01 00:00:00 UTC) to a timestamp (Timestamp<s, UTC>)", + example => "from_unixtime(1600000000)", + name => from_unixtime +} +document! { + doc => "Compute the digest of a string using the specified algorithm. Valid algorithms are: md5, sha224, sha256, sha384, sha512, blake2s, blake2b, blake3", + example => "digest('hello', 'sha256')", + name => digest +} +document! { + doc => "Compute the MD5 digest of a string. Alias for `digest(<string>, 'md5')`", + example => "md5('hello')", + name => md5 +} +document! { + doc => "Compute the SHA-224 digest of a string. Alias for `digest(<string>, 'sha224')`", + example => "sha224('hello')", + name => sha224 +} +document! { + doc => "Compute the SHA-256 digest of a string. Alias for `digest(<string>, 'sha256')`", + example => "sha256('hello')", + name => sha256 +} +document! { + doc => "Compute the SHA-384 digest of a string. Alias for `digest(<string>, 'sha384')`", + example => "sha384('hello')", + name => sha384 +} +document! { + doc => "Compute the SHA-512 digest of a string. Alias for `digest(<string>, 'sha512')`", + example => "sha512('hello')", + name => sha512 +} +document! { + doc => "Encode a string using the specified encoding. Valid encodings are: hex, base64", + example => "encode('hello', 'hex')", + name => encode +} +document! { + doc => "Decode a string using the specified encoding. Valid encodings are: hex, base64", + example => "decode('68656c6c6f', 'hex')", + name => decode +} +document! { + doc => "Returns the Arrow type of the argument", + example => "arrow_typeof(1)", + name => arrow_typeof +} +document! { + doc => "Append an element to the end of an array", + example => "array_append([1, 2], 3)", + name => array_append +} +document! 
{ + doc => "Concatenate two arrays", + example => "array_concat([1, 2], [3, 4])", + name => array_concat +} +document! { + doc => "Returns the dimensions of an array", + example => "array_dims([[[1]]])", + name => array_dims +} +document! { + doc => "Returns a boolean indicating whether the array is empty", + example => "empty([])", + name => empty +} +document! { + doc => "Returns the element of an array at the specified index (using one-based indexing)", + example => "array_element([1, 2], 1)", + name => array_element +} +document! { + doc => "Flatten an array of arrays", + example => "flatten([[1, 2], [3, 4]])", + name => flatten +} +document! { + doc => "Returns true if the first array contains all elements of the second array", + example => "array_has_all([1, 2], [1, 2, 3])", + name => array_has_all +} +document! { + doc => "Returns true if the first array contains any elements of the second array", + example => "array_has_any([1, 2], [1, 2, 3])", + name => array_has_any +} +document! { + doc => "Returns true if the array contains the specified element", + example => "array_contains([1, 2], 1)", + name => array_contains +} +document! { + doc => "Returns the length of an array", + example => "array_length([1, 2])", + name => array_length +} +document! { + doc => "Returns the number of dimensions of an array", + example => "array_ndims([ [1, 2], [3, 4] ])", + name => array_ndims +} +document! { + doc => "Remove the last element of an array", + example => "array_pop_back([1, 2])", + name => array_pop_back +} +document! { + doc => "Find the position of the first occurrence of an element in an array", + example => "array_position([1, 2], 2)", + name => array_position +} +document! { + doc => "Find the positions of all occurrences of an element in an array", + example => "array_positions([1, 2, 1], 1)", + name => array_positions +} +document! 
{ + doc => "Prepend an element to the start of an array", + example => "array_prepend(3, [1, 2])", + name => array_prepend +} +document! { + doc => "Repeat an element a specified number of times to create an array", + example => "array_repeat(1, 2)", + name => array_repeat +} +document! { + doc => "Remove the first occurrence of an element from an array", + example => "array_remove([1, 2, 1], 1)", + name => array_remove +} +document! { + doc => "Remove the first n occurrences of an element from an array", + example => "array_remove_n([1, 2, 1], 1, 2)", + name => array_remove_n +} +document! { + doc => "Remove all occurrences of an element from an array", + example => "array_remove_all([1, 2, 1], 1)", + name => array_remove_all +} +document! { + doc => "Replace the first occurrence of an element in an array with a new element", + example => "array_replace([1, 2, 1], 1, 3)", + name => array_replace +} +document! { + doc => "Replace the first n occurrences of an element in an array with a new element", + example => "array_replace_n([1, 2, 1], 1, 3, 2)", + name => array_replace_n +} +document! { + doc => "Replace all occurrences of an element in an array with a new element", + example => "array_replace_all([1, 2, 1], 1, 3)", + name => array_replace_all +} +document! { + doc => "Extract a slice from an array", + example => "array_slice([1, 2, 3, 4], 1, 2)", + name => array_slice +} +document! { + doc => "Convert an array to a string with a separator", + example => "array_to_string([1, 2, 3], ',')", + name => array_to_string +} +document! { + doc => "Returns the number of elements in an array", + example => "cardinality([1, 2, 3])", + name => cardinality +} +document! { + doc => "Create an array from a list of elements", + example => "make_array(1, 2, 3)", + name => make_array +} +document! { + doc => "Create a struct from a list of elements. 
The field names will always be `cN` where N is the index of the element", + example => "struct(1, 'hello')", + "struct" => struct_ +} + +impl BuiltinFunction for BuiltinScalarFunction { + fn function_type(&self) -> FunctionType { + FunctionType::Scalar + } + + fn name(&self) -> &'static str { + use BuiltinScalarFunction::*; + match self { + Abs => abs::NAME, + Acos => acos::NAME, + Acosh => acosh::NAME, + Asin => asin::NAME, + Asinh => asinh::NAME, + Atan => atan::NAME, + Atanh => atanh::NAME, + Atan2 => atan2::NAME, + Cbrt => cbrt::NAME, + Ceil => ceil::NAME, + Cos => cos::NAME, + Cot => cot::NAME, + Cosh => cosh::NAME, + Degrees => degrees::NAME, + Exp => exp::NAME, + Factorial => factorial::NAME, + Floor => floor::NAME, + Gcd => gcd::NAME, + Isnan => isnan::NAME, + Iszero => iszero::NAME, + Lcm => lcm::NAME, + Ln => ln::NAME, + Log => log::NAME, + Log10 => log10::NAME, + Log2 => log2::NAME, + Nanvl => nanvl::NAME, + Pi => pi::NAME, + Power => pow::NAME, + Radians => radians::NAME, + Random => random::NAME, + Round => round::NAME, + Signum => signum::NAME, + Sin => sin::NAME, + Sinh => sinh::NAME, + Sqrt => sqrt::NAME, + Tan => tan::NAME, + Tanh => tanh::NAME, + Trunc => trunc::NAME, + Coalesce => coalesce::NAME, + NullIf => nullif::NAME, + Ascii => ascii::NAME, + BitLength => bit_length::NAME, + Btrim => btrim::NAME, + CharacterLength => character_length::NAME, + Concat => concat::NAME, + ConcatWithSeparator => concat_ws::NAME, + Chr => chr::NAME, + InitCap => initcap::NAME, + Left => left::NAME, + Lower => lower::NAME, + Lpad => lpad::NAME, + Ltrim => ltrim::NAME, + OctetLength => octet_length::NAME, + Repeat => repeat::NAME, + Replace => replace::NAME, + Reverse => reverse::NAME, + Right => right::NAME, + Rpad => rpad::NAME, + Rtrim => rtrim::NAME, + SplitPart => split_part::NAME, + StringToArray => string_to_array::NAME, + StartsWith => starts_with::NAME, + Strpos => strpos::NAME, + Substr => substr::NAME, + ToHex => to_hex::NAME, + Translate => 
translate::NAME, + Trim => trim::NAME, + Upper => upper::NAME, + Uuid => uuid::NAME, + RegexpMatch => regexp_match::NAME, + RegexpReplace => regexp_replace::NAME, + Now => now::NAME, + CurrentDate => current_date::NAME, + CurrentTime => current_time::NAME, + DateBin => date_bin::NAME, + DateTrunc => date_trunc::NAME, + DatePart => date_part::NAME, + ToTimestamp => to_timestamp::NAME, + ToTimestampMillis => to_timestamp_millis::NAME, + ToTimestampMicros => to_timestamp_micros::NAME, + ToTimestampSeconds => to_timestamp_seconds::NAME, + FromUnixtime => from_unixtime::NAME, + Digest => digest::NAME, + MD5 => md5::NAME, + SHA224 => sha224::NAME, + SHA256 => sha256::NAME, + SHA384 => sha384::NAME, + SHA512 => sha512::NAME, + Encode => encode::NAME, + Decode => decode::NAME, + ArrowTypeof => arrow_typeof::NAME, + ArrayAppend => array_append::NAME, + ArrayConcat => array_concat::NAME, + ArrayDims => array_dims::NAME, + ArrayEmpty => empty::NAME, + ArrayElement => array_element::NAME, + Flatten => flatten::NAME, + ArrayHasAll => array_has_all::NAME, + ArrayHasAny => array_has_any::NAME, + ArrayHas => array_contains::NAME, + ArrayLength => array_length::NAME, + ArrayNdims => array_ndims::NAME, + ArrayPopBack => array_pop_back::NAME, + ArrayPosition => array_position::NAME, + ArrayPositions => array_positions::NAME, + ArrayPrepend => array_prepend::NAME, + ArrayRepeat => array_repeat::NAME, + ArrayRemove => array_remove::NAME, + ArrayRemoveN => array_remove_n::NAME, + ArrayRemoveAll => array_remove_all::NAME, + ArrayReplace => array_replace::NAME, + ArrayReplaceN => array_replace_n::NAME, + ArrayReplaceAll => array_replace_all::NAME, + ArraySlice => array_slice::NAME, + ArrayToString => array_to_string::NAME, + Cardinality => cardinality::NAME, + MakeArray => make_array::NAME, + Struct => struct_::NAME, + } + } + fn sql_example(&self) -> Option { + use BuiltinScalarFunction::*; + Some( + match self { + Abs => abs::EXAMPLE, + Acos => acos::EXAMPLE, + Acosh => acosh::EXAMPLE, 
+ Asin => asin::EXAMPLE, + Asinh => asinh::EXAMPLE, + Atan => atan::EXAMPLE, + Atanh => atanh::EXAMPLE, + Atan2 => atan2::EXAMPLE, + Cbrt => cbrt::EXAMPLE, + Ceil => ceil::EXAMPLE, + Cos => cos::EXAMPLE, + Cot => cot::EXAMPLE, + Cosh => cosh::EXAMPLE, + Degrees => degrees::EXAMPLE, + Exp => exp::EXAMPLE, + Factorial => factorial::EXAMPLE, + Floor => floor::EXAMPLE, + Gcd => gcd::EXAMPLE, + Isnan => isnan::EXAMPLE, + Iszero => iszero::EXAMPLE, + Lcm => lcm::EXAMPLE, + Ln => ln::EXAMPLE, + Log => log::EXAMPLE, + Log10 => log10::EXAMPLE, + Log2 => log2::EXAMPLE, + Nanvl => nanvl::EXAMPLE, + Pi => pi::EXAMPLE, + Power => pow::EXAMPLE, + Radians => radians::EXAMPLE, + Random => random::EXAMPLE, + Round => round::EXAMPLE, + Signum => signum::EXAMPLE, + Sin => sin::EXAMPLE, + Sinh => sinh::EXAMPLE, + Sqrt => sqrt::EXAMPLE, + Tan => tan::EXAMPLE, + Tanh => tanh::EXAMPLE, + Trunc => trunc::EXAMPLE, + Coalesce => coalesce::EXAMPLE, + NullIf => nullif::EXAMPLE, + Ascii => ascii::EXAMPLE, + BitLength => bit_length::EXAMPLE, + Btrim => btrim::EXAMPLE, + CharacterLength => character_length::EXAMPLE, + Concat => concat::EXAMPLE, + ConcatWithSeparator => concat_ws::EXAMPLE, + Chr => chr::EXAMPLE, + InitCap => initcap::EXAMPLE, + Left => left::EXAMPLE, + Lower => lower::EXAMPLE, + Lpad => lpad::EXAMPLE, + Ltrim => ltrim::EXAMPLE, + OctetLength => octet_length::EXAMPLE, + Repeat => repeat::EXAMPLE, + Replace => replace::EXAMPLE, + Reverse => reverse::EXAMPLE, + Right => right::EXAMPLE, + Rpad => rpad::EXAMPLE, + Rtrim => rtrim::EXAMPLE, + SplitPart => split_part::EXAMPLE, + StringToArray => string_to_array::EXAMPLE, + StartsWith => starts_with::EXAMPLE, + Strpos => strpos::EXAMPLE, + Substr => substr::EXAMPLE, + ToHex => to_hex::EXAMPLE, + Translate => translate::EXAMPLE, + Trim => trim::EXAMPLE, + Upper => upper::EXAMPLE, + Uuid => uuid::EXAMPLE, + RegexpMatch => regexp_match::EXAMPLE, + RegexpReplace => regexp_replace::EXAMPLE, + Now => now::EXAMPLE, + CurrentDate => 
current_date::EXAMPLE, + CurrentTime => current_time::EXAMPLE, + DateBin => date_bin::EXAMPLE, + DateTrunc => date_trunc::EXAMPLE, + DatePart => date_part::EXAMPLE, + ToTimestamp => to_timestamp::EXAMPLE, + ToTimestampMillis => to_timestamp_millis::EXAMPLE, + ToTimestampMicros => to_timestamp_micros::EXAMPLE, + ToTimestampSeconds => to_timestamp_seconds::EXAMPLE, + FromUnixtime => from_unixtime::EXAMPLE, + Digest => digest::EXAMPLE, + MD5 => md5::EXAMPLE, + SHA224 => sha224::EXAMPLE, + SHA256 => sha256::EXAMPLE, + SHA384 => sha384::EXAMPLE, + SHA512 => sha512::EXAMPLE, + Encode => encode::EXAMPLE, + Decode => decode::EXAMPLE, + ArrowTypeof => arrow_typeof::EXAMPLE, + ArrayAppend => array_append::EXAMPLE, + ArrayConcat => array_concat::EXAMPLE, + ArrayDims => array_dims::EXAMPLE, + ArrayEmpty => empty::EXAMPLE, + ArrayElement => array_element::EXAMPLE, + Flatten => flatten::EXAMPLE, + ArrayHasAll => array_has_all::EXAMPLE, + ArrayHasAny => array_has_any::EXAMPLE, + ArrayHas => array_contains::EXAMPLE, + ArrayLength => array_length::EXAMPLE, + ArrayNdims => array_ndims::EXAMPLE, + ArrayPopBack => array_pop_back::EXAMPLE, + ArrayPosition => array_position::EXAMPLE, + ArrayPositions => array_positions::EXAMPLE, + ArrayPrepend => array_prepend::EXAMPLE, + ArrayRepeat => array_repeat::EXAMPLE, + ArrayRemove => array_remove::EXAMPLE, + ArrayRemoveN => array_remove_n::EXAMPLE, + ArrayRemoveAll => array_remove_all::EXAMPLE, + ArrayReplace => array_replace::EXAMPLE, + ArrayReplaceN => array_replace_n::EXAMPLE, + ArrayReplaceAll => array_replace_all::EXAMPLE, + ArraySlice => array_slice::EXAMPLE, + ArrayToString => array_to_string::EXAMPLE, + Cardinality => cardinality::EXAMPLE, + MakeArray => make_array::EXAMPLE, + Struct => struct_::EXAMPLE, + } + .to_string(), + ) + } + fn description(&self) -> Option { + use BuiltinScalarFunction::*; + Some( + match self { + Abs => abs::DESCRIPTION, + Acos => acos::DESCRIPTION, + Acosh => acosh::DESCRIPTION, + Asin => asin::DESCRIPTION, + 
Asinh => asinh::DESCRIPTION, + Atan => atan::DESCRIPTION, + Atanh => atanh::DESCRIPTION, + Atan2 => atan2::DESCRIPTION, + Cbrt => cbrt::DESCRIPTION, + Ceil => ceil::DESCRIPTION, + Cos => cos::DESCRIPTION, + Cot => cot::DESCRIPTION, + Cosh => cosh::DESCRIPTION, + Degrees => degrees::DESCRIPTION, + Exp => exp::DESCRIPTION, + Factorial => factorial::DESCRIPTION, + Floor => floor::DESCRIPTION, + Gcd => gcd::DESCRIPTION, + Isnan => isnan::DESCRIPTION, + Iszero => iszero::DESCRIPTION, + Lcm => lcm::DESCRIPTION, + Ln => ln::DESCRIPTION, + Log => log::DESCRIPTION, + Log10 => log10::DESCRIPTION, + Log2 => log2::DESCRIPTION, + Nanvl => nanvl::DESCRIPTION, + Pi => pi::DESCRIPTION, + Power => pow::DESCRIPTION, + Radians => radians::DESCRIPTION, + Random => random::DESCRIPTION, + Round => round::DESCRIPTION, + Signum => signum::DESCRIPTION, + Sin => sin::DESCRIPTION, + Sinh => sinh::DESCRIPTION, + Sqrt => sqrt::DESCRIPTION, + Tan => tan::DESCRIPTION, + Tanh => tanh::DESCRIPTION, + Trunc => trunc::DESCRIPTION, + Coalesce => coalesce::DESCRIPTION, + NullIf => nullif::DESCRIPTION, + Ascii => ascii::DESCRIPTION, + BitLength => bit_length::DESCRIPTION, + Btrim => btrim::DESCRIPTION, + CharacterLength => character_length::DESCRIPTION, + Concat => concat::DESCRIPTION, + ConcatWithSeparator => concat_ws::DESCRIPTION, + Chr => chr::DESCRIPTION, + InitCap => initcap::DESCRIPTION, + Left => left::DESCRIPTION, + Lower => lower::DESCRIPTION, + Lpad => lpad::DESCRIPTION, + Ltrim => ltrim::DESCRIPTION, + OctetLength => octet_length::DESCRIPTION, + Repeat => repeat::DESCRIPTION, + Replace => replace::DESCRIPTION, + Reverse => reverse::DESCRIPTION, + Right => right::DESCRIPTION, + Rpad => rpad::DESCRIPTION, + Rtrim => rtrim::DESCRIPTION, + SplitPart => split_part::DESCRIPTION, + StringToArray => string_to_array::DESCRIPTION, + StartsWith => starts_with::DESCRIPTION, + Strpos => strpos::DESCRIPTION, + Substr => substr::DESCRIPTION, + ToHex => to_hex::DESCRIPTION, + Translate => 
translate::DESCRIPTION, + Trim => trim::DESCRIPTION, + Upper => upper::DESCRIPTION, + Uuid => uuid::DESCRIPTION, + RegexpMatch => regexp_match::DESCRIPTION, + RegexpReplace => regexp_replace::DESCRIPTION, + Now => now::DESCRIPTION, + CurrentDate => current_date::DESCRIPTION, + CurrentTime => current_time::DESCRIPTION, + DateBin => date_bin::DESCRIPTION, + DateTrunc => date_trunc::DESCRIPTION, + DatePart => date_part::DESCRIPTION, + ToTimestamp => to_timestamp::DESCRIPTION, + ToTimestampMillis => to_timestamp_millis::DESCRIPTION, + ToTimestampMicros => to_timestamp_micros::DESCRIPTION, + ToTimestampSeconds => to_timestamp_seconds::DESCRIPTION, + FromUnixtime => from_unixtime::DESCRIPTION, + Digest => digest::DESCRIPTION, + MD5 => md5::DESCRIPTION, + SHA224 => sha224::DESCRIPTION, + SHA256 => sha256::DESCRIPTION, + SHA384 => sha384::DESCRIPTION, + SHA512 => sha512::DESCRIPTION, + Encode => encode::DESCRIPTION, + Decode => decode::DESCRIPTION, + ArrowTypeof => arrow_typeof::DESCRIPTION, + ArrayAppend => array_append::DESCRIPTION, + ArrayConcat => array_concat::DESCRIPTION, + ArrayDims => array_dims::DESCRIPTION, + ArrayEmpty => empty::DESCRIPTION, + ArrayElement => array_element::DESCRIPTION, + Flatten => flatten::DESCRIPTION, + ArrayHasAll => array_has_all::DESCRIPTION, + ArrayHasAny => array_has_any::DESCRIPTION, + ArrayHas => array_contains::DESCRIPTION, + ArrayLength => array_length::DESCRIPTION, + ArrayNdims => array_ndims::DESCRIPTION, + ArrayPopBack => array_pop_back::DESCRIPTION, + ArrayPosition => array_position::DESCRIPTION, + ArrayPositions => array_positions::DESCRIPTION, + ArrayPrepend => array_prepend::DESCRIPTION, + ArrayRepeat => array_repeat::DESCRIPTION, + ArrayRemove => array_remove::DESCRIPTION, + ArrayRemoveN => array_remove_n::DESCRIPTION, + ArrayRemoveAll => array_remove_all::DESCRIPTION, + ArrayReplace => array_replace::DESCRIPTION, + ArrayReplaceN => array_replace_n::DESCRIPTION, + ArrayReplaceAll => array_replace_all::DESCRIPTION, + 
ArraySlice => array_slice::DESCRIPTION, + ArrayToString => array_to_string::DESCRIPTION, + Cardinality => cardinality::DESCRIPTION, + MakeArray => make_array::DESCRIPTION, + Struct => struct_::DESCRIPTION, + } + .to_string(), + ) + } +} diff --git a/crates/sqlbuiltins/src/functions/bigquery.rs b/crates/sqlbuiltins/src/functions/table/bigquery.rs similarity index 78% rename from crates/sqlbuiltins/src/functions/bigquery.rs rename to crates/sqlbuiltins/src/functions/table/bigquery.rs index b109f0fa0..80178185d 100644 --- a/crates/sqlbuiltins/src/functions/bigquery.rs +++ b/crates/sqlbuiltins/src/functions/table/bigquery.rs @@ -7,21 +7,21 @@ use datafusion::arrow::datatypes::DataType; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{Signature, Volatility}; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use datasources::bigquery::{BigQueryAccessor, BigQueryTableAccess}; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::builtins::{ConstBuiltinFunction, TableFunc}; #[derive(Debug, Clone, Copy)] pub struct ReadBigQuery; -#[async_trait] -impl TableFunc for ReadBigQuery { - fn runtime_preference(&self) -> RuntimePreference { - RuntimePreference::Remote - } - fn name(&self) -> &str { - "read_bigquery" - } +impl ConstBuiltinFunction for ReadBigQuery { + const NAME: &'static str = "read_bigquery"; + const DESCRIPTION: &'static str = "Reads a BigQuery table"; + const EXAMPLE: &'static str = + "SELECT * FROM read_bigquery('service_account', 'project_id', 'dataset_id', 'table_id')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; fn signature(&self) -> Option { Some(Signature::uniform( 4, @@ -29,6 +29,13 @@ impl TableFunc for ReadBigQuery { Volatility::Stable, )) } +} 
+ +#[async_trait] +impl TableFunc for ReadBigQuery { + fn runtime_preference(&self) -> RuntimePreference { + RuntimePreference::Remote + } async fn create_provider( &self, _: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/delta.rs b/crates/sqlbuiltins/src/functions/table/delta.rs similarity index 70% rename from crates/sqlbuiltins/src/functions/delta.rs rename to crates/sqlbuiltins/src/functions/table/delta.rs index 7c4ddae0a..a4d0b1587 100644 --- a/crates/sqlbuiltins/src/functions/delta.rs +++ b/crates/sqlbuiltins/src/functions/table/delta.rs @@ -1,13 +1,14 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::functions::table_location_and_opts; +use super::table_location_and_opts; +use crate::builtins::{ConstBuiltinFunction, TableFunc}; use async_trait::async_trait; use datafusion::datasource::TableProvider; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use datasources::lake::delta::access::load_table_direct; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; /// Function for scanning delta tables. /// @@ -20,17 +21,19 @@ use protogen::metastore::types::catalog::RuntimePreference; #[derive(Debug, Clone, Copy)] pub struct DeltaScan; +impl ConstBuiltinFunction for DeltaScan { + const NAME: &'static str = "delta_scan"; + const DESCRIPTION: &'static str = "Scans a delta table"; + const EXAMPLE: &'static str = "SELECT * FROM delta_scan('file:///path/to/table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; +} + #[async_trait] impl TableFunc for DeltaScan { fn runtime_preference(&self) -> RuntimePreference { // TODO: Detect runtime. 
RuntimePreference::Remote } - - fn name(&self) -> &str { - "delta_scan" - } - async fn create_provider( &self, ctx: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/excel.rs b/crates/sqlbuiltins/src/functions/table/excel.rs similarity index 75% rename from crates/sqlbuiltins/src/functions/excel.rs rename to crates/sqlbuiltins/src/functions/table/excel.rs index 4b609baff..4f72f42b6 100644 --- a/crates/sqlbuiltins/src/functions/excel.rs +++ b/crates/sqlbuiltins/src/functions/table/excel.rs @@ -4,27 +4,33 @@ use std::sync::Arc; use async_trait::async_trait; use datafusion::datasource::TableProvider; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use datasources::common::url::DatasourceUrl; use datasources::excel::read_excel_impl; use ioutil::resolve_path; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::builtins::{ConstBuiltinFunction, TableFunc}; use super::table_location_and_opts; #[derive(Debug, Clone, Copy)] pub struct ExcelScan; +impl ConstBuiltinFunction for ExcelScan { + const NAME: &'static str = "read_excel"; + const DESCRIPTION: &'static str = "Reads an Excel file from the local filesystem"; + const EXAMPLE: &'static str = + "SELECT * FROM read_excel('file:///path/to/file.xlsx', sheet_name => 'Sheet1')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; +} + #[async_trait] impl TableFunc for ExcelScan { fn runtime_preference(&self) -> RuntimePreference { RuntimePreference::Local } - fn name(&self) -> &str { - "read_excel" - } - async fn create_provider( &self, ctx: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/generate_series.rs b/crates/sqlbuiltins/src/functions/table/generate_series.rs similarity index 94% rename 
from crates/sqlbuiltins/src/functions/generate_series.rs rename to crates/sqlbuiltins/src/functions/table/generate_series.rs index e2f538d24..874588db2 100644 --- a/crates/sqlbuiltins/src/functions/generate_series.rs +++ b/crates/sqlbuiltins/src/functions/table/generate_series.rs @@ -17,17 +17,33 @@ use datafusion::logical_expr::{Signature, TypeSignature, Volatility}; use datafusion::physical_plan::streaming::PartitionStream; use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream}; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{ - FromFuncParamValue, FuncParamValue, TableFunc, TableFuncContextProvider, -}; +use datafusion_ext::functions::{FromFuncParamValue, FuncParamValue, TableFuncContextProvider}; use decimal::Decimal128; use futures::Stream; use num_traits::Zero; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::builtins::{ConstBuiltinFunction, TableFunc}; #[derive(Debug, Clone, Copy)] pub struct GenerateSeries; +impl ConstBuiltinFunction for GenerateSeries { + const NAME: &'static str = "generate_series"; + const DESCRIPTION: &'static str = "Generate a series of values"; + const EXAMPLE: &'static str = "SELECT * FROM generate_series(1, 10, 2)"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; + fn signature(&self) -> Option { + Some(Signature::new( + TypeSignature::OneOf(vec![ + TypeSignature::Uniform(2, vec![DataType::Int64, DataType::Decimal128(38, 0)]), + TypeSignature::Uniform(3, vec![DataType::Int64, DataType::Decimal128(38, 0)]), + ]), + Volatility::Immutable, + )) + } +} + #[async_trait] impl TableFunc for GenerateSeries { fn runtime_preference(&self) -> RuntimePreference { @@ -43,9 +59,6 @@ impl TableFunc for GenerateSeries { other => Ok(other), } } - fn name(&self) -> &str { - "generate_series" - } async fn create_provider( &self, @@ -124,16 +137,6 @@ impl TableFunc for 
GenerateSeries { _ => return Err(ExtensionError::InvalidNumArgs), } } - - fn signature(&self) -> Option { - Some(Signature::new( - TypeSignature::OneOf(vec![ - TypeSignature::Uniform(2, vec![DataType::Int64, DataType::Decimal128(38, 0)]), - TypeSignature::Uniform(3, vec![DataType::Int64, DataType::Decimal128(38, 0)]), - ]), - Volatility::Immutable, - )) - } } fn create_straming_table( diff --git a/crates/sqlbuiltins/src/functions/table/iceberg.rs b/crates/sqlbuiltins/src/functions/table/iceberg.rs new file mode 100644 index 000000000..0d73848d8 --- /dev/null +++ b/crates/sqlbuiltins/src/functions/table/iceberg.rs @@ -0,0 +1,29 @@ +mod data_files; +mod scan; +mod snapshots; + +use std::collections::HashMap; +use std::sync::Arc; + +use super::table_location_and_opts; +use crate::builtins::TableFunc; +use async_trait::async_trait; +pub(crate) use data_files::*; +use datafusion::arrow::array::{Int32Builder, Int64Builder, StringBuilder, UInt64Builder}; +use datafusion::arrow::datatypes::{DataType, Field, Schema}; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::datasource::{MemTable, TableProvider}; +use datafusion_ext::errors::{ExtensionError, Result}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; +use datasources::lake::iceberg::table::IcebergTable; +use datasources::lake::storage_options_into_object_store; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; +pub(crate) use scan::*; +pub(crate) use snapshots::*; + +fn box_err(err: E) -> ExtensionError +where + E: std::error::Error + Send + Sync + 'static, +{ + ExtensionError::Access(Box::new(err)) +} diff --git a/crates/sqlbuiltins/src/functions/table/iceberg/data_files.rs b/crates/sqlbuiltins/src/functions/table/iceberg/data_files.rs new file mode 100644 index 000000000..c9533a434 --- /dev/null +++ b/crates/sqlbuiltins/src/functions/table/iceberg/data_files.rs @@ -0,0 +1,94 @@ +use crate::builtins::ConstBuiltinFunction; + +use super::*; + +/// 
Scan data file metadata for the current snapshot of an iceberg table. Will +/// not attempt to read data files. +#[derive(Debug, Clone, Copy)] +pub struct IcebergDataFiles; + +impl ConstBuiltinFunction for IcebergDataFiles { + const NAME: &'static str = "iceberg_data_files"; + const DESCRIPTION: &'static str = "Scans data file metadata for an iceberg table"; + const EXAMPLE: &'static str = "SELECT * FROM iceberg_data_files('file:///path/to/table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; +} + +#[async_trait] +impl TableFunc for IcebergDataFiles { + fn runtime_preference(&self) -> RuntimePreference { + RuntimePreference::Remote + } + + async fn create_provider( + &self, + ctx: &dyn TableFuncContextProvider, + args: Vec, + mut opts: HashMap, + ) -> Result> { + let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; + + let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; + let table = IcebergTable::open(loc, store).await.map_err(box_err)?; + + let manifests = table.read_manifests().await.map_err(box_err)?; + + let schema = Arc::new(Schema::new(vec![ + Field::new("manifest_index", DataType::UInt64, false), + Field::new("manifest_content", DataType::Utf8, false), + Field::new("snapshot_id", DataType::Int64, true), + Field::new("sequence_number", DataType::Int64, true), + Field::new("file_sequence_number", DataType::Int64, true), + Field::new("file_path", DataType::Utf8, false), + Field::new("file_format", DataType::Utf8, false), + Field::new("record_count", DataType::Int64, false), + Field::new("file_size_bytes", DataType::Int64, false), + ])); + + let mut manifest_index = UInt64Builder::new(); + let mut manifest_content = StringBuilder::new(); + let mut snapshot_id = Int64Builder::new(); + let mut sequence_number = Int64Builder::new(); + let mut file_sequence_number = Int64Builder::new(); + let mut file_path = StringBuilder::new(); + let mut file_format = StringBuilder::new(); + let mut record_count = 
Int64Builder::new(); + let mut file_size_bytes = Int64Builder::new(); + + for (idx, manifest) in manifests.into_iter().enumerate() { + for entry in manifest.entries { + // Manifest metadata + manifest_index.append_value(idx as u64); + manifest_content.append_value(manifest.metadata.content.to_string()); + + // Entry data + snapshot_id.append_option(entry.snapshot_id); + sequence_number.append_option(entry.sequence_number); + file_sequence_number.append_option(entry.file_sequence_number); + file_path.append_value(&entry.data_file.file_path); + file_format.append_value(&entry.data_file.file_format); + record_count.append_value(entry.data_file.record_count); + file_size_bytes.append_value(entry.data_file.file_size_in_bytes); + } + } + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(manifest_index.finish()), + Arc::new(manifest_content.finish()), + Arc::new(snapshot_id.finish()), + Arc::new(sequence_number.finish()), + Arc::new(file_sequence_number.finish()), + Arc::new(file_path.finish()), + Arc::new(file_format.finish()), + Arc::new(record_count.finish()), + Arc::new(file_size_bytes.finish()), + ], + )?; + + Ok(Arc::new( + MemTable::try_new(schema, vec![vec![batch]]).unwrap(), + )) + } +} diff --git a/crates/sqlbuiltins/src/functions/table/iceberg/scan.rs b/crates/sqlbuiltins/src/functions/table/iceberg/scan.rs new file mode 100644 index 000000000..44f98fde1 --- /dev/null +++ b/crates/sqlbuiltins/src/functions/table/iceberg/scan.rs @@ -0,0 +1,40 @@ +use crate::builtins::ConstBuiltinFunction; + +use super::*; + +/// Scan an iceberg table. 
+#[derive(Debug, Clone, Copy)] +pub struct IcebergScan; + +impl ConstBuiltinFunction for IcebergScan { + const NAME: &'static str = "iceberg_scan"; + const DESCRIPTION: &'static str = "Scans an iceberg table"; + const EXAMPLE: &'static str = "SELECT * FROM iceberg_scan('file:///path/to/table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; +} + +#[async_trait] +impl TableFunc for IcebergScan { + fn runtime_preference(&self) -> RuntimePreference { + // TODO: Detect runtime + RuntimePreference::Remote + } + + async fn create_provider( + &self, + ctx: &dyn TableFuncContextProvider, + args: Vec, + mut opts: HashMap, + ) -> Result> { + // TODO: Reduce duplication + let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; + + let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; + let table = IcebergTable::open(loc.clone(), store) + .await + .map_err(box_err)?; + let reader = table.table_reader().await.map_err(box_err)?; + + Ok(reader) + } +} diff --git a/crates/sqlbuiltins/src/functions/table/iceberg/snapshots.rs b/crates/sqlbuiltins/src/functions/table/iceberg/snapshots.rs new file mode 100644 index 000000000..4f8ae77d1 --- /dev/null +++ b/crates/sqlbuiltins/src/functions/table/iceberg/snapshots.rs @@ -0,0 +1,69 @@ +use crate::builtins::ConstBuiltinFunction; + +use super::*; + +/// Scan snapshot information for an iceberg tables. Will not attempt to read +/// data files. 
+#[derive(Debug, Clone, Copy)] +pub struct IcebergSnapshots; + +impl ConstBuiltinFunction for IcebergSnapshots { + const NAME: &'static str = "iceberg_snapshots"; + const DESCRIPTION: &'static str = "Scans snapshot information for an iceberg table"; + const EXAMPLE: &'static str = "SELECT * FROM iceberg_snapshots('file:///path/to/table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; +} + +#[async_trait] +impl TableFunc for IcebergSnapshots { + fn runtime_preference(&self) -> RuntimePreference { + RuntimePreference::Remote + } + + async fn create_provider( + &self, + ctx: &dyn TableFuncContextProvider, + args: Vec, + mut opts: HashMap, + ) -> Result> { + let (loc, opts) = table_location_and_opts(ctx, args, &mut opts)?; + + let store = storage_options_into_object_store(&loc, &opts).map_err(box_err)?; + let table = IcebergTable::open(loc, store).await.map_err(box_err)?; + + let snapshots = &table.metadata().snapshots; + + let schema = Arc::new(Schema::new(vec![ + Field::new("snapshot_id", DataType::Int64, false), + Field::new("timestamp_ms", DataType::Int64, false), + Field::new("manifest_list", DataType::Utf8, false), + Field::new("schema_id", DataType::Int32, false), + ])); + + let mut snapshot_id = Int64Builder::new(); + let mut timestamp_ms = Int64Builder::new(); + let mut manifest_list = StringBuilder::new(); + let mut schema_id = Int32Builder::new(); + + for snapshot in snapshots { + snapshot_id.append_value(snapshot.snapshot_id); + timestamp_ms.append_value(snapshot.timestamp_ms); + manifest_list.append_value(&snapshot.manifest_list); + schema_id.append_value(snapshot.schema_id); + } + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(snapshot_id.finish()), + Arc::new(timestamp_ms.finish()), + Arc::new(manifest_list.finish()), + Arc::new(schema_id.finish()), + ], + )?; + + Ok(Arc::new( + MemTable::try_new(schema, vec![vec![batch]]).unwrap(), + )) + } +} diff --git a/crates/sqlbuiltins/src/functions/lance.rs 
b/crates/sqlbuiltins/src/functions/table/lance.rs similarity index 69% rename from crates/sqlbuiltins/src/functions/lance.rs rename to crates/sqlbuiltins/src/functions/table/lance.rs index 4240556d4..1761a5c66 100644 --- a/crates/sqlbuiltins/src/functions/lance.rs +++ b/crates/sqlbuiltins/src/functions/table/lance.rs @@ -1,13 +1,14 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::functions::table_location_and_opts; +use super::table_location_and_opts; +use crate::builtins::{ConstBuiltinFunction, TableFunc}; use async_trait::async_trait; use datafusion::datasource::TableProvider; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use datasources::lance::scan_lance_table; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; /// Function for scanning delta tables. /// @@ -15,11 +16,16 @@ use protogen::metastore::types::catalog::RuntimePreference; /// initializing object storage happens within the lance lib. We're /// responsible for providing credentials, then it's responsible for creating /// the store. 
-/// -/// See #[derive(Debug, Clone, Copy)] pub struct LanceScan; +impl ConstBuiltinFunction for LanceScan { + const NAME: &'static str = "lance_scan"; + const DESCRIPTION: &'static str = "Scans a Lance table"; + const EXAMPLE: &'static str = "SELECT * FROM lance_scan('file:///path/to/table.lance')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; +} + #[async_trait] impl TableFunc for LanceScan { fn runtime_preference(&self) -> RuntimePreference { @@ -27,10 +33,6 @@ impl TableFunc for LanceScan { RuntimePreference::Remote } - fn name(&self) -> &str { - "lance_scan" - } - async fn create_provider( &self, ctx: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/table/mod.rs b/crates/sqlbuiltins/src/functions/table/mod.rs new file mode 100644 index 000000000..af7113902 --- /dev/null +++ b/crates/sqlbuiltins/src/functions/table/mod.rs @@ -0,0 +1,186 @@ +//! Builtin table returning functions. +mod bigquery; +mod delta; +mod excel; +mod generate_series; +mod iceberg; +mod lance; +mod mongo; +mod mysql; +mod object_store; +mod postgres; +mod snowflake; +mod virtual_listing; + +use ::object_store::aws::AmazonS3ConfigKey; +use ::object_store::azure::AzureConfigKey; +use ::object_store::gcp::GoogleConfigKey; +use std::collections::HashMap; +use std::sync::Arc; + +use datafusion_ext::errors::{ExtensionError, Result}; +use datafusion_ext::functions::{FuncParamValue, IdentValue, TableFuncContextProvider}; +use datasources::common::url::{DatasourceUrl, DatasourceUrlType}; +use protogen::metastore::types::options::{CredentialsOptions, StorageOptions}; + +use self::bigquery::ReadBigQuery; +use self::delta::DeltaScan; +use self::excel::ExcelScan; +use self::generate_series::GenerateSeries; +use self::iceberg::{IcebergDataFiles, IcebergScan, IcebergSnapshots}; +use self::lance::LanceScan; +use self::mongo::ReadMongoDb; +use self::mysql::ReadMysql; +use self::object_store::{CSV_SCAN, JSON_SCAN, PARQUET_SCAN, READ_CSV, READ_JSON, 
READ_PARQUET}; +use self::postgres::ReadPostgres; +use self::snowflake::ReadSnowflake; +use self::virtual_listing::{ListColumns, ListSchemas, ListTables}; +use crate::builtins::TableFunc; + +/// All builtin table functions. +pub struct BuiltinTableFuncs { + funcs: HashMap>, +} + +impl BuiltinTableFuncs { + pub fn new() -> BuiltinTableFuncs { + let funcs: Vec> = vec![ + // Databases/warehouses + Arc::new(ReadPostgres), + Arc::new(ReadBigQuery), + Arc::new(ReadMongoDb), + Arc::new(ReadMysql), + Arc::new(ReadSnowflake), + // Object store + Arc::new(PARQUET_SCAN), + Arc::new(READ_PARQUET), + Arc::new(CSV_SCAN), + Arc::new(READ_CSV), + Arc::new(JSON_SCAN), + Arc::new(READ_JSON), + // Data lakes + Arc::new(DeltaScan), + Arc::new(IcebergScan), + Arc::new(IcebergSnapshots), + Arc::new(IcebergDataFiles), + Arc::new(ExcelScan), + Arc::new(LanceScan), + // Listing + Arc::new(ListSchemas), + Arc::new(ListTables), + Arc::new(ListColumns), + // Series generating + Arc::new(GenerateSeries), + ]; + let funcs: HashMap> = funcs + .into_iter() + .map(|f| (f.name().to_string(), f)) + .collect(); + + BuiltinTableFuncs { funcs } + } + + pub fn find_function(&self, name: &str) -> Option> { + self.funcs.get(name).cloned() + } + + pub fn iter_funcs(&self) -> impl Iterator> { + self.funcs.values() + } +} + +impl Default for BuiltinTableFuncs { + fn default() -> Self { + Self::new() + } +} + +// Parse the data lake table location and object store options from the provided function arguments +fn table_location_and_opts( + ctx: &dyn TableFuncContextProvider, + args: Vec, + opts: &mut HashMap, +) -> Result<(DatasourceUrl, StorageOptions)> { + let mut args = args.into_iter(); + let first = args.next().unwrap(); + let url: String = first.param_into()?; + let source_url = + DatasourceUrl::try_new(url).map_err(|e| ExtensionError::Access(Box::new(e)))?; + + let mut maybe_cred_opts = None; + // Check if a credentials object has been supplied + if let Some(func_param) = args.next() { + let creds: 
IdentValue = func_param.param_into()?; + maybe_cred_opts = Some( + ctx.get_session_catalog() + .resolve_credentials(creds.as_str()) + .cloned() + .ok_or(ExtensionError::String(format!( + "missing credentials object: {creds}" + )))? + .options, + ); + } + + let mut storage_options = StorageOptions::default(); + match (source_url.datasource_url_type(), maybe_cred_opts) { + (DatasourceUrlType::File, None) => {} // no options fine in this case + (DatasourceUrlType::File, _) => { + return Err(ExtensionError::String( + "Credentials incorrectly provided when accessing local delta table".to_string(), + )) + } + (DatasourceUrlType::Gcs, Some(CredentialsOptions::Gcp(creds))) => { + storage_options.inner.insert( + GoogleConfigKey::ServiceAccountKey.as_ref().to_string(), + creds.service_account_key, + ); + } + (DatasourceUrlType::S3, Some(CredentialsOptions::Aws(creds))) => { + const REGION_KEY: &str = "region"; + let region = opts + .remove(REGION_KEY) + .ok_or(ExtensionError::MissingNamedArgument(REGION_KEY))? 
+ .param_into()?; + + storage_options.inner.insert( + AmazonS3ConfigKey::AccessKeyId.as_ref().to_string(), + creds.access_key_id, + ); + storage_options.inner.insert( + AmazonS3ConfigKey::SecretAccessKey.as_ref().to_string(), + creds.secret_access_key, + ); + storage_options + .inner + .insert(AmazonS3ConfigKey::Region.as_ref().to_string(), region); + } + (DatasourceUrlType::Azure, Some(CredentialsOptions::Azure(creds))) => { + storage_options.inner.insert( + AzureConfigKey::AccountName.as_ref().to_string(), + creds.account_name, + ); + storage_options.inner.insert( + AzureConfigKey::AccessKey.as_ref().to_string(), + creds.access_key, + ); + } + (DatasourceUrlType::Http, _) => { + return Err(ExtensionError::String( + "Accessing delta tables over http not supported".to_string(), + )) + } + (datasource, creds) => { + return Err(ExtensionError::String(format!( + "Invalid credentials for {datasource}, got {} creds", + if let Some(o) = creds { + o.as_str() + } else { + "no" + } + ))) + } + }; + + Ok((source_url, storage_options)) +} diff --git a/crates/sqlbuiltins/src/functions/mongo.rs b/crates/sqlbuiltins/src/functions/table/mongo.rs similarity index 76% rename from crates/sqlbuiltins/src/functions/mongo.rs rename to crates/sqlbuiltins/src/functions/table/mongo.rs index ecf9270ba..d6110cb28 100644 --- a/crates/sqlbuiltins/src/functions/mongo.rs +++ b/crates/sqlbuiltins/src/functions/table/mongo.rs @@ -6,21 +6,22 @@ use datafusion::arrow::datatypes::DataType; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{Signature, Volatility}; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use datasources::mongodb::{MongoAccessor, MongoTableAccessInfo}; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + 
+use crate::builtins::{ConstBuiltinFunction, TableFunc}; #[derive(Debug, Clone, Copy)] pub struct ReadMongoDb; -#[async_trait] -impl TableFunc for ReadMongoDb { - fn runtime_preference(&self) -> RuntimePreference { - RuntimePreference::Remote - } - fn name(&self) -> &str { - "read_mongodb" - } +impl ConstBuiltinFunction for ReadMongoDb { + const NAME: &'static str = "read_mongodb"; + const DESCRIPTION: &'static str = "Reads a MongoDB table"; + const EXAMPLE: &'static str = + "SELECT * FROM read_mongodb('mongodb://localhost:27017', 'database', 'collection')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; + fn signature(&self) -> Option { Some(Signature::uniform( 3, @@ -28,6 +29,14 @@ impl TableFunc for ReadMongoDb { Volatility::Stable, )) } +} + +#[async_trait] +impl TableFunc for ReadMongoDb { + fn runtime_preference(&self) -> RuntimePreference { + RuntimePreference::Remote + } + async fn create_provider( &self, _: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/mysql.rs b/crates/sqlbuiltins/src/functions/table/mysql.rs similarity index 77% rename from crates/sqlbuiltins/src/functions/mysql.rs rename to crates/sqlbuiltins/src/functions/table/mysql.rs index f34dbd2b3..e9fb94f76 100644 --- a/crates/sqlbuiltins/src/functions/mysql.rs +++ b/crates/sqlbuiltins/src/functions/table/mysql.rs @@ -6,21 +6,21 @@ use datafusion::arrow::datatypes::DataType; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{Signature, Volatility}; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use datasources::mysql::{MysqlAccessor, MysqlTableAccess}; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::builtins::{ConstBuiltinFunction, TableFunc}; 
#[derive(Debug, Clone, Copy)] pub struct ReadMysql; -#[async_trait] -impl TableFunc for ReadMysql { - fn runtime_preference(&self) -> RuntimePreference { - RuntimePreference::Remote - } - fn name(&self) -> &str { - "read_mysql" - } +impl ConstBuiltinFunction for ReadMysql { + const NAME: &'static str = "read_mysql"; + const DESCRIPTION: &'static str = "Reads a MySQL table"; + const EXAMPLE: &'static str = + "SELECT * FROM read_mysql('mysql://localhost:3306', 'database', 'table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; fn signature(&self) -> Option { Some(Signature::uniform( 3, @@ -28,6 +28,14 @@ impl TableFunc for ReadMysql { Volatility::Stable, )) } +} + +#[async_trait] +impl TableFunc for ReadMysql { + fn runtime_preference(&self) -> RuntimePreference { + RuntimePreference::Remote + } + async fn create_provider( &self, _: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/object_store.rs b/crates/sqlbuiltins/src/functions/table/object_store.rs similarity index 94% rename from crates/sqlbuiltins/src/functions/object_store.rs rename to crates/sqlbuiltins/src/functions/table/object_store.rs index 6fa931246..8307ade39 100644 --- a/crates/sqlbuiltins/src/functions/object_store.rs +++ b/crates/sqlbuiltins/src/functions/table/object_store.rs @@ -15,7 +15,7 @@ use datafusion::logical_expr::{Signature, Volatility}; use datafusion::scalar::ScalarValue; use datafusion_ext::errors::{ExtensionError, Result}; use datafusion_ext::functions::{ - FromFuncParamValue, FuncParamValue, IdentValue, TableFunc, TableFuncContextProvider, + FromFuncParamValue, FuncParamValue, IdentValue, TableFuncContextProvider, }; use datasources::common::url::{DatasourceUrl, DatasourceUrlType}; @@ -27,9 +27,11 @@ use datasources::object_store::s3::S3StoreAccess; use datasources::object_store::{MultiSourceTableProvider, ObjStoreAccess}; use futures::TryStreamExt; use object_store::azure::AzureConfigKey; -use 
protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; use protogen::metastore::types::options::{CredentialsOptions, StorageOptions}; +use crate::builtins::{BuiltinFunction, TableFunc}; + pub const PARQUET_SCAN: ObjScanTableFunc = ObjScanTableFunc(FileType::PARQUET, "parquet_scan"); pub const READ_PARQUET: ObjScanTableFunc = ObjScanTableFunc(FileType::PARQUET, "read_parquet"); @@ -42,10 +44,27 @@ pub const READ_JSON: ObjScanTableFunc = ObjScanTableFunc(FileType::JSON, "read_n #[derive(Debug, Clone)] pub struct ObjScanTableFunc(FileType, &'static str); -#[async_trait] -impl TableFunc for ObjScanTableFunc { - fn runtime_preference(&self) -> RuntimePreference { - RuntimePreference::Unspecified +impl BuiltinFunction for ObjScanTableFunc { + fn name(&self) -> &'static str { + self.1 + } + fn function_type(&self) -> FunctionType { + FunctionType::TableReturning + } + fn sql_example(&self) -> Option { + fn build_example(extension: &str) -> String { + format!( + "SELECT * FROM {ext}_scan('./my_data.{ext}')", + ext = extension + ) + } + Some(build_example(self.0.to_string().as_str())) + } + fn description(&self) -> Option { + Some(format!( + "Returns a table by scanning the given {ext} file(s).", + ext = self.0.to_string().to_lowercase() + )) } fn signature(&self) -> Option { @@ -58,6 +77,14 @@ impl TableFunc for ObjScanTableFunc { Volatility::Stable, )) } +} + +#[async_trait] +impl TableFunc for ObjScanTableFunc { + fn runtime_preference(&self) -> RuntimePreference { + RuntimePreference::Unspecified + } + fn detect_runtime( &self, args: &[FuncParamValue], @@ -88,11 +115,6 @@ impl TableFunc for ObjScanTableFunc { } } - fn name(&self) -> &str { - let Self(_, name) = self; - name - } - async fn create_provider( &self, ctx: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/postgres.rs b/crates/sqlbuiltins/src/functions/table/postgres.rs similarity index 76% rename from 
crates/sqlbuiltins/src/functions/postgres.rs rename to crates/sqlbuiltins/src/functions/table/postgres.rs index 5ae9700fe..6714f9e2b 100644 --- a/crates/sqlbuiltins/src/functions/postgres.rs +++ b/crates/sqlbuiltins/src/functions/table/postgres.rs @@ -6,21 +6,21 @@ use datafusion::arrow::datatypes::DataType; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{Signature, Volatility}; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use datasources::postgres::{PostgresAccess, PostgresTableProvider, PostgresTableProviderConfig}; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::builtins::{ConstBuiltinFunction, TableFunc}; #[derive(Debug, Clone, Copy)] pub struct ReadPostgres; -#[async_trait] -impl TableFunc for ReadPostgres { - fn runtime_preference(&self) -> RuntimePreference { - RuntimePreference::Remote - } - fn name(&self) -> &str { - "read_postgres" - } +impl ConstBuiltinFunction for ReadPostgres { + const NAME: &'static str = "read_postgres"; + const DESCRIPTION: &'static str = "Reads a Postgres table"; + const EXAMPLE: &'static str = + "SELECT * FROM read_postgres('postgres://localhost:5432', 'database', 'table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; fn signature(&self) -> Option { Some(Signature::uniform( 3, @@ -28,6 +28,13 @@ impl TableFunc for ReadPostgres { Volatility::Stable, )) } +} + +#[async_trait] +impl TableFunc for ReadPostgres { + fn runtime_preference(&self) -> RuntimePreference { + RuntimePreference::Remote + } async fn create_provider( &self, _: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/snowflake.rs b/crates/sqlbuiltins/src/functions/table/snowflake.rs similarity index 81% rename from 
crates/sqlbuiltins/src/functions/snowflake.rs rename to crates/sqlbuiltins/src/functions/table/snowflake.rs index f26bf32b1..303fcc0b4 100644 --- a/crates/sqlbuiltins/src/functions/snowflake.rs +++ b/crates/sqlbuiltins/src/functions/table/snowflake.rs @@ -6,21 +6,22 @@ use datafusion::arrow::datatypes::DataType; use datafusion::datasource::TableProvider; use datafusion::logical_expr::{Signature, Volatility}; use datafusion_ext::errors::{ExtensionError, Result}; -use datafusion_ext::functions::{FuncParamValue, TableFunc, TableFuncContextProvider}; +use datafusion_ext::functions::{FuncParamValue, TableFuncContextProvider}; use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection, SnowflakeTableAccess}; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; + +use crate::builtins::{ConstBuiltinFunction, TableFunc}; #[derive(Debug, Clone, Copy)] pub struct ReadSnowflake; -#[async_trait] -impl TableFunc for ReadSnowflake { - fn runtime_preference(&self) -> RuntimePreference { - RuntimePreference::Remote - } - fn name(&self) -> &str { - "read_snowflake" - } +impl ConstBuiltinFunction for ReadSnowflake { + const NAME: &'static str = "read_snowflake"; + const DESCRIPTION: &'static str = "Reads a Snowflake table"; + const EXAMPLE: &'static str = + "SELECT * FROM read_snowflake('account', 'username', 'password', 'database', 'warehouse', 'role', 'schema', 'table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; + fn signature(&self) -> Option { Some(Signature::uniform( 8, @@ -28,6 +29,13 @@ impl TableFunc for ReadSnowflake { Volatility::Stable, )) } +} + +#[async_trait] +impl TableFunc for ReadSnowflake { + fn runtime_preference(&self) -> RuntimePreference { + RuntimePreference::Remote + } async fn create_provider( &self, _: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/functions/virtual_listing.rs 
b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs similarity index 87% rename from crates/sqlbuiltins/src/functions/virtual_listing.rs rename to crates/sqlbuiltins/src/functions/table/virtual_listing.rs index 2bb2977a1..2d9c87af9 100644 --- a/crates/sqlbuiltins/src/functions/virtual_listing.rs +++ b/crates/sqlbuiltins/src/functions/table/virtual_listing.rs @@ -9,7 +9,7 @@ use datafusion::datasource::{MemTable, TableProvider}; use datafusion::logical_expr::{Signature, Volatility}; use datafusion_ext::errors::{ExtensionError, Result}; use datafusion_ext::functions::{ - FuncParamValue, IdentValue, TableFunc, TableFuncContextProvider, VirtualLister, + FuncParamValue, IdentValue, TableFuncContextProvider, VirtualLister, }; use datasources::bigquery::BigQueryAccessor; use datasources::debug::DebugVirtualLister; @@ -17,26 +17,29 @@ use datasources::mongodb::MongoAccessor; use datasources::mysql::MysqlAccessor; use datasources::postgres::PostgresAccess; use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection}; -use protogen::metastore::types::catalog::RuntimePreference; +use protogen::metastore::types::catalog::{FunctionType, RuntimePreference}; use protogen::metastore::types::options::{ DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsMongo, DatabaseOptionsMysql, DatabaseOptionsPostgres, DatabaseOptionsSnowflake, }; +use crate::builtins::{ConstBuiltinFunction, TableFunc}; + #[derive(Debug, Clone, Copy)] pub struct ListSchemas; +impl ConstBuiltinFunction for ListSchemas { + const NAME: &'static str = "list_schemas"; + const DESCRIPTION: &'static str = "Lists schemas in a database"; + const EXAMPLE: &'static str = "SELECT * FROM list_schemas('database')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; +} #[async_trait] impl TableFunc for ListSchemas { fn runtime_preference(&self) -> RuntimePreference { - // Currently all of our db's are "external" so it'd never be preferred to run this locally. 
+ // Currently all of our db's are "external" so it'd never be preferred to run this locally. RuntimePreference::Remote } - - fn name(&self) -> &str { - "list_schemas" - } - async fn create_provider( &self, ctx: &dyn TableFuncContextProvider, @@ -73,6 +76,20 @@ impl TableFunc for ListSchemas { #[derive(Debug, Clone, Copy)] pub struct ListTables; +impl ConstBuiltinFunction for ListTables { + const NAME: &'static str = "list_tables"; + const DESCRIPTION: &'static str = "Lists tables in a schema"; + const EXAMPLE: &'static str = "SELECT * FROM list_tables('database', 'schema')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; + fn signature(&self) -> Option { + Some(Signature::uniform( + 3, + vec![DataType::Utf8], + Volatility::Stable, + )) + } +} + #[async_trait] impl TableFunc for ListTables { fn runtime_preference(&self) -> RuntimePreference { @@ -80,10 +97,6 @@ impl TableFunc for ListTables { RuntimePreference::Remote } - fn name(&self) -> &str { - "list_tables" - } - async fn create_provider( &self, ctx: &dyn TableFuncContextProvider, @@ -121,16 +134,12 @@ impl TableFunc for ListTables { #[derive(Debug, Clone, Copy)] pub struct ListColumns; -#[async_trait] -impl TableFunc for ListColumns { - fn runtime_preference(&self) -> RuntimePreference { - // Currently all of our db's are "external" so it'd never be preferred to run this locally.
- RuntimePreference::Remote - } +impl ConstBuiltinFunction for ListColumns { + const NAME: &'static str = "list_columns"; + const DESCRIPTION: &'static str = "Lists columns in a table"; + const EXAMPLE: &'static str = "SELECT * FROM list_columns('database', 'schema', 'table')"; + const FUNCTION_TYPE: FunctionType = FunctionType::TableReturning; - fn name(&self) -> &str { - "list_columns" - } fn signature(&self) -> Option { Some(Signature::uniform( 3, @@ -138,6 +147,14 @@ impl TableFunc for ListColumns { Volatility::Stable, )) } +} + +#[async_trait] +impl TableFunc for ListColumns { + fn runtime_preference(&self) -> RuntimePreference { + // Currently all of our db's are "external" so it'd never be preferred to run this locally. + RuntimePreference::Remote + } async fn create_provider( &self, ctx: &dyn TableFuncContextProvider, diff --git a/crates/sqlbuiltins/src/selectors.rs b/crates/sqlbuiltins/src/selectors.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/sqlexec/src/dispatch/system.rs b/crates/sqlexec/src/dispatch/system.rs index 1d226d051..413cc20e5 100644 --- a/crates/sqlexec/src/dispatch/system.rs +++ b/crates/sqlexec/src/dispatch/system.rs @@ -425,6 +425,8 @@ impl<'a> SystemTableDispatcher<'a> { let mut function_type = StringBuilder::new(); let mut parameters = ListBuilder::new(StringBuilder::new()); let mut builtin = BooleanBuilder::new(); + let mut sql_examples = StringBuilder::new(); + let mut descriptions = StringBuilder::new(); for func in self .catalog @@ -440,6 +442,8 @@ impl<'a> SystemTableDispatcher<'a> { schema_oid.append_value(ent.meta.parent); function_name.append_value(&ent.meta.name); function_type.append_value(ent.func_type.as_str()); + sql_examples.append_option(ent.meta.sql_example.as_ref()); + descriptions.append_option(ent.meta.description.as_ref()); const EMPTY: [Option<&'static str>; 0] = []; if let Some(sig) = &ent.signature { @@ -464,6 +468,8 @@ impl<'a> SystemTableDispatcher<'a> { Arc::new(function_type.finish()), 
Arc::new(parameters.finish()), Arc::new(builtin.finish()), + Arc::new(sql_examples.finish()), + Arc::new(descriptions.finish()), ], ) .unwrap(); diff --git a/testdata/sqllogictests/catalog/functions.slt b/testdata/sqllogictests/catalog/functions.slt new file mode 100644 index 000000000..9bf79862a --- /dev/null +++ b/testdata/sqllogictests/catalog/functions.slt @@ -0,0 +1,14 @@ +# Test the builtin 'functions' table `glare_catalog.functions` + +statement ok +select * from glare_catalog.functions; + +query I rowsort +select distinct function_type from glare_catalog.functions; +---- +aggregate +scalar +table + +statement ok +select description, example from glare_catalog.functions \ No newline at end of file