diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index fd0c11ed0ab0..677685b2c65b 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -36,7 +36,7 @@ use crate::cast::{ as_decimal128_array, as_decimal256_array, as_dictionary_array, as_fixed_size_binary_array, as_fixed_size_list_array, }; -use crate::error::{DataFusionError, Result, _internal_err, _not_impl_err}; +use crate::error::{DataFusionError, Result, _exec_err, _internal_err, _not_impl_err}; use crate::hash_utils::create_hashes; use crate::utils::{ array_into_fixed_size_list_array, array_into_large_list_array, array_into_list_array, @@ -1707,9 +1707,7 @@ impl ScalarValue { // figure out the type based on the first element let data_type = match scalars.peek() { None => { - return _internal_err!( - "Empty iterator passed to ScalarValue::iter_to_array" - ); + return _exec_err!("Empty iterator passed to ScalarValue::iter_to_array"); } Some(sv) => sv.data_type(), }; @@ -1723,7 +1721,7 @@ impl ScalarValue { if let ScalarValue::$SCALAR_TY(v) = sv { Ok(v) } else { - _internal_err!( + _exec_err!( "Inconsistent types in ScalarValue::iter_to_array. \ Expected {:?}, got {:?}", data_type, sv @@ -1743,7 +1741,7 @@ impl ScalarValue { if let ScalarValue::$SCALAR_TY(v, _) = sv { Ok(v) } else { - _internal_err!( + _exec_err!( "Inconsistent types in ScalarValue::iter_to_array. \ Expected {:?}, got {:?}", data_type, sv @@ -1765,7 +1763,7 @@ impl ScalarValue { if let ScalarValue::$SCALAR_TY(v) = sv { Ok(v) } else { - _internal_err!( + _exec_err!( "Inconsistent types in ScalarValue::iter_to_array. \ Expected {:?}, got {:?}", data_type, sv @@ -1908,11 +1906,11 @@ impl ScalarValue { if &inner_key_type == key_type { Ok(*scalar) } else { - _internal_err!("Expected inner key type of {key_type} but found: {inner_key_type}, value was ({scalar:?})") + _exec_err!("Expected inner key type of {key_type} but found: {inner_key_type}, value was ({scalar:?})") } } _ => { - _internal_err!( + _exec_err!( "Expected scalar of type {value_type} but found: {scalar} {scalar:?}" ) } @@ -1940,7 +1938,7 @@ impl ScalarValue { if let ScalarValue::FixedSizeBinary(_, v) = sv { Ok(v) } else { - _internal_err!( + _exec_err!( "Inconsistent types in ScalarValue::iter_to_array. 
\ Expected {data_type:?}, got {sv:?}" ) @@ -1965,7 +1963,7 @@ impl ScalarValue { | DataType::RunEndEncoded(_, _) | DataType::ListView(_) | DataType::LargeListView(_) => { - return _internal_err!( + return _not_impl_err!( "Unsupported creation of {:?} array from ScalarValue {:?}", data_type, scalars.peek() diff --git a/datafusion/core/src/dataframe/mod.rs b/datafusion/core/src/dataframe/mod.rs index 04101fce2f5d..442dc7caf5a8 100644 --- a/datafusion/core/src/dataframe/mod.rs +++ b/datafusion/core/src/dataframe/mod.rs @@ -1712,11 +1712,13 @@ mod tests { use datafusion_expr::window_function::row_number; use datafusion_expr::{ cast, create_udf, expr, lit, BuiltInWindowFunction, ExprFunctionExt, - ScalarFunctionImplementation, Volatility, WindowFunctionDefinition, + ScalarFunctionImplementation, Volatility, WindowFrame, WindowFrameBound, + WindowFrameUnits, WindowFunctionDefinition, }; use datafusion_functions_aggregate::expr_fn::{array_agg, count_distinct}; use datafusion_physical_expr::expressions::Column; use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; + use sqlparser::ast::NullTreatment; // Get string representation of the plan async fn assert_physical_plan(df: &DataFrame, expected: Vec<&str>) { @@ -2362,6 +2364,90 @@ mod tests { Ok(()) } + #[tokio::test] + async fn window_using_aggregates() -> Result<()> { + // build plan using DataFrame API + let df = test_table().await?.filter(col("c1").eq(lit("a")))?; + let mut aggr_expr = vec![ + ( + datafusion_functions_aggregate::first_last::first_value_udaf(), + "first_value", + ), + ( + datafusion_functions_aggregate::first_last::last_value_udaf(), + "last_val", + ), + ( + datafusion_functions_aggregate::approx_distinct::approx_distinct_udaf(), + "approx_distinct", + ), + ( + datafusion_functions_aggregate::approx_median::approx_median_udaf(), + "approx_median", + ), + ( + datafusion_functions_aggregate::median::median_udaf(), + "median", + ), + (datafusion_functions_aggregate::min_max::max_udaf(), "max"), + (datafusion_functions_aggregate::min_max::min_udaf(), "min"), + ] + .into_iter() + .map(|(func, name)| { + let w = WindowFunction::new( + WindowFunctionDefinition::AggregateUDF(func), + vec![col("c3")], + ); + + Expr::WindowFunction(w) + .null_treatment(NullTreatment::IgnoreNulls) + .order_by(vec![col("c2").sort(true, true), col("c3").sort(true, true)]) + .window_frame(WindowFrame::new_bounds( + WindowFrameUnits::Rows, + WindowFrameBound::Preceding(ScalarValue::UInt64(None)), + WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1))), + )) + .build() + .unwrap() + .alias(name) + }) + .collect::>(); + aggr_expr.extend_from_slice(&[col("c2"), col("c3")]); + + let df: Vec = df.select(aggr_expr)?.collect().await?; + + assert_batches_sorted_eq!( + ["+-------------+----------+-----------------+---------------+--------+-----+------+----+------+", + "| first_value | last_val | approx_distinct | approx_median | median | max | min | c2 | c3 |", + "+-------------+----------+-----------------+---------------+--------+-----+------+----+------+", + "| | | | | | | | 1 | -85 |", + "| -85 | -101 | 14 | -12 | -101 | 83 | -101 | 4 | -54 |", + "| -85 | -101 | 17 | -25 | -101 | 83 | -101 | 5 | -31 |", + "| -85 | -12 | 10 | -32 | -12 | 83 | -85 | 3 | 13 |", + "| -85 | -25 | 3 | -56 | -25 | -25 | -85 | 1 | -5 |", + "| -85 | -31 | 18 | -29 | -31 | 83 | -101 | 5 | 36 |", + "| -85 | -38 | 16 | -25 | -38 | 83 | -101 | 4 | 65 |", + "| -85 | -43 | 7 | -43 | -43 | 83 | -85 | 2 | 45 |", + "| -85 | -48 | 6 | -35 | -48 | 83 | -85 | 2 | -43 |", + "| 
-85 | -5 | 4 | -37 | -5 | -5 | -85 | 1 | 83 |", + "| -85 | -54 | 15 | -17 | -54 | 83 | -101 | 4 | -38 |", + "| -85 | -56 | 2 | -70 | -56 | -56 | -85 | 1 | -25 |", + "| -85 | -72 | 9 | -43 | -72 | 83 | -85 | 3 | -12 |", + "| -85 | -85 | 1 | -85 | -85 | -85 | -85 | 1 | -56 |", + "| -85 | 13 | 11 | -17 | 13 | 83 | -85 | 3 | 14 |", + "| -85 | 13 | 11 | -25 | 13 | 83 | -85 | 3 | 13 |", + "| -85 | 14 | 12 | -12 | 14 | 83 | -85 | 3 | 17 |", + "| -85 | 17 | 13 | -11 | 17 | 83 | -85 | 4 | -101 |", + "| -85 | 45 | 8 | -34 | 45 | 83 | -85 | 3 | -72 |", + "| -85 | 65 | 17 | -17 | 65 | 83 | -101 | 5 | -101 |", + "| -85 | 83 | 5 | -25 | 83 | 83 | -85 | 2 | -48 |", + "+-------------+----------+-----------------+---------------+--------+-----+------+----+------+"], + &df + ); + + Ok(()) + } + // Test issue: https://github.com/apache/datafusion/issues/10346 #[tokio::test] async fn test_select_over_aggregate_schema() -> Result<()> { diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index 0a057d6f1417..e9c876291845 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -987,8 +987,24 @@ impl SessionStateBuilder { /// Returns a new [SessionStateBuilder] based on an existing [SessionState] /// The session id for the new builder will be unset; all other fields will - /// be cloned from what is set in the provided session state + /// be cloned from what is set in the provided session state. If the default + /// catalog already exists in the existing session state, the new session state + /// will not recreate the default catalog and schema. pub fn new_from_existing(existing: SessionState) -> Self { + let default_catalog_exists = existing + .catalog_list() + .catalog(&existing.config.options().catalog.default_catalog) + .is_some(); + // The new `with_create_default_catalog_and_schema` should be false if the default catalog exists + let create_default_catalog_and_schema = existing + .config + .options() + .catalog + .create_default_catalog_and_schema + && !default_catalog_exists; + let new_config = existing + .config + .with_create_default_catalog_and_schema(create_default_catalog_and_schema); Self { session_id: None, analyzer: Some(existing.analyzer), @@ -1005,7 +1021,7 @@ impl SessionStateBuilder { window_functions: Some(existing.window_functions.into_values().collect_vec()), serializer_registry: Some(existing.serializer_registry), file_formats: Some(existing.file_formats.into_values().collect_vec()), - config: Some(existing.config), + config: Some(new_config), table_options: Some(existing.table_options), execution_props: Some(existing.execution_props), table_factories: Some(existing.table_factories), @@ -1801,17 +1817,19 @@ impl<'a> SimplifyInfo for SessionSimplifyProvider<'a> { #[cfg(test)] mod tests { - use std::collections::HashMap; - + use super::{SessionContextProvider, SessionStateBuilder}; + use crate::catalog_common::MemoryCatalogProviderList; + use crate::datasource::MemTable; + use crate::execution::context::SessionState; + use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType, Field, Schema}; use datafusion_common::DFSchema; use datafusion_common::Result; + use datafusion_execution::config::SessionConfig; use datafusion_expr::Expr; use datafusion_sql::planner::{PlannerContext, SqlToRel}; - - use crate::execution::context::SessionState; - - use super::{SessionContextProvider, SessionStateBuilder}; + use std::collections::HashMap; + use std::sync::Arc; #[test] fn 
test_session_state_with_default_features() { @@ -1841,4 +1859,68 @@ mod tests { assert!(sql_to_expr(&state).is_err()) } + + #[test] + fn test_from_existing() -> Result<()> { + fn employee_batch() -> RecordBatch { + let name: ArrayRef = + Arc::new(StringArray::from_iter_values(["Andy", "Andrew"])); + let age: ArrayRef = Arc::new(Int32Array::from(vec![11, 22])); + RecordBatch::try_from_iter(vec![("name", name), ("age", age)]).unwrap() + } + let batch = employee_batch(); + let table = MemTable::try_new(batch.schema(), vec![vec![batch]])?; + + let session_state = SessionStateBuilder::new() + .with_catalog_list(Arc::new(MemoryCatalogProviderList::new())) + .build(); + let table_ref = session_state.resolve_table_ref("employee").to_string(); + session_state + .schema_for_ref(&table_ref)? + .register_table("employee".to_string(), Arc::new(table))?; + + let default_catalog = session_state + .config + .options() + .catalog + .default_catalog + .clone(); + let default_schema = session_state + .config + .options() + .catalog + .default_schema + .clone(); + let is_exist = session_state + .catalog_list() + .catalog(default_catalog.as_str()) + .unwrap() + .schema(default_schema.as_str()) + .unwrap() + .table_exist("employee"); + assert!(is_exist); + let new_state = SessionStateBuilder::new_from_existing(session_state).build(); + assert!(new_state + .catalog_list() + .catalog(default_catalog.as_str()) + .unwrap() + .schema(default_schema.as_str()) + .unwrap() + .table_exist("employee")); + + // if `with_create_default_catalog_and_schema` is disabled, the new one shouldn't create default catalog and schema + let disable_create_default = + SessionConfig::default().with_create_default_catalog_and_schema(false); + let without_default_state = SessionStateBuilder::new() + .with_config(disable_create_default) + .build(); + assert!(without_default_state + .catalog_list() + .catalog(&default_catalog) + .is_none()); + let new_state = + SessionStateBuilder::new_from_existing(without_default_state).build(); + assert!(new_state.catalog_list().catalog(&default_catalog).is_none()); + Ok(()) + } } diff --git a/datafusion/core/tests/memory_limit/mod.rs b/datafusion/core/tests/memory_limit/mod.rs index 5c712af80192..e6a51eae1337 100644 --- a/datafusion/core/tests/memory_limit/mod.rs +++ b/datafusion/core/tests/memory_limit/mod.rs @@ -76,8 +76,7 @@ async fn group_by_none() { TestCase::new() .with_query("select median(request_bytes) from t") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "AggregateStream", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: AggregateStream" ]) .with_memory_limit(2_000) .run() @@ -89,8 +88,7 @@ async fn group_by_row_hash() { TestCase::new() .with_query("select count(*) from t GROUP BY response_bytes") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "GroupedHashAggregateStream", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream" ]) .with_memory_limit(2_000) .run() @@ -103,8 +101,7 @@ async fn group_by_hash() { // group by dict column .with_query("select count(*) from t GROUP BY service, host, pod, container") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "GroupedHashAggregateStream", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: GroupedHashAggregateStream" ]) .with_memory_limit(1_000) 
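// Illustrative only: the full runtime message these expected-error substrings
// match looks roughly like "Resources exhausted: Additional allocation failed
// with top memory consumers (across reservations) as: GroupedHashAggregateStream
// consumed N bytes. Error: Failed to allocate additional M bytes ..." with
// sizes that vary per run (the pool.rs tests below show exact examples).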
.run() @@ -117,8 +114,7 @@ async fn join_by_key_multiple_partitions() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "HashJoinInput[0]", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[0]", ]) .with_memory_limit(1_000) .with_config(config) @@ -132,8 +128,7 @@ async fn join_by_key_single_partition() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service = t2.service") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "HashJoinInput", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput", ]) .with_memory_limit(1_000) .with_config(config) @@ -146,8 +141,7 @@ async fn join_by_expression() { TestCase::new() .with_query("select t1.* from t t1 JOIN t t2 ON t1.service != t2.service") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "NestedLoopJoinLoad[0]", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]", ]) .with_memory_limit(1_000) .run() @@ -159,8 +153,7 @@ async fn cross_join() { TestCase::new() .with_query("select t1.* from t t1 CROSS JOIN t t2") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "CrossJoinExec", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec", ]) .with_memory_limit(1_000) .run() @@ -216,8 +209,7 @@ async fn symmetric_hash_join() { "select t1.* from t t1 JOIN t t2 ON t1.pod = t2.pod AND t1.time = t2.time", ) .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "SymmetricHashJoinStream", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SymmetricHashJoinStream", ]) .with_memory_limit(1_000) .with_scenario(Scenario::AccessLogStreaming) @@ -235,8 +227,7 @@ async fn sort_preserving_merge() { // so only a merge is needed .with_query("select * from t ORDER BY a ASC NULLS LAST, b ASC NULLS LAST LIMIT 10") .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "SortPreservingMergeExec", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: SortPreservingMergeExec", ]) // provide insufficient memory to merge .with_memory_limit(partition_size / 2) @@ -313,8 +304,7 @@ async fn sort_spill_reservation() { test.clone() .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "ExternalSorterMerge", // merging in sort fails + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: ExternalSorterMerge", ]) .with_config(config) .run() @@ -343,8 +333,7 @@ async fn oom_recursive_cte() { SELECT * FROM nodes;", ) .with_expected_errors(vec![ - "Resources exhausted: Failed to allocate additional", - "RecursiveQuery", + "Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: RecursiveQuery", ]) .with_memory_limit(2_000) .run() @@ -396,7 +385,7 @@ async fn oom_with_tracked_consumer_pool() { .with_expected_errors(vec![ "Failed to allocate additional", "for ParquetSink(ArrowColumnWriter)", - "Resources exhausted with top memory consumers (across reservations) are: 
ParquetSink(ArrowColumnWriter)" + "Additional allocation failed with top memory consumers (across reservations) as: ParquetSink(ArrowColumnWriter)" ]) .with_memory_pool(Arc::new( TrackConsumersPool::new( diff --git a/datafusion/execution/src/memory_pool/pool.rs b/datafusion/execution/src/memory_pool/pool.rs index 4a41602bd961..d3cd93979baf 100644 --- a/datafusion/execution/src/memory_pool/pool.rs +++ b/datafusion/execution/src/memory_pool/pool.rs @@ -392,7 +392,7 @@ fn provide_top_memory_consumers_to_error_msg( error_msg: String, top_consumers: String, ) -> String { - format!("Resources exhausted with top memory consumers (across reservations) are: {}. Error: {}", top_consumers, error_msg) + format!("Additional allocation failed with top memory consumers (across reservations) as: {}. Error: {}", top_consumers, error_msg) } #[cfg(test)] @@ -501,7 +501,7 @@ mod tests { // Test: reports if new reservation causes error // using the previously set sizes for other consumers let mut r5 = MemoryConsumer::new("r5").register(&pool); - let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool"; let res = r5.try_grow(150); assert!( matches!( @@ -524,7 +524,7 @@ mod tests { // Test: see error message when no consumers recorded yet let mut r0 = MemoryConsumer::new(same_name).register(&pool); - let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool"; let res = r0.try_grow(150); assert!( matches!( @@ -543,7 +543,7 @@ mod tests { let mut r1 = new_consumer_same_name.clone().register(&pool); // TODO: the insufficient_capacity_err() message is per reservation, not per consumer. // a followup PR will clarify this message "0 bytes already allocated for this reservation" - let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 10 bytes. 
Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool"; let res = r1.try_grow(150); assert!( matches!( @@ -555,7 +555,7 @@ mod tests { // Test: will accumulate size changes per consumer, not per reservation r1.grow(20); - let expected = "Resources exhausted with top memory consumers (across reservations) are: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; let res = r1.try_grow(150); assert!( matches!( @@ -570,7 +570,7 @@ mod tests { let consumer_with_same_name_but_different_hash = MemoryConsumer::new(same_name).with_can_spill(true); let mut r2 = consumer_with_same_name_but_different_hash.register(&pool); - let expected = "Resources exhausted with top memory consumers (across reservations) are: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; let res = r2.try_grow(150); assert!( matches!( @@ -590,7 +590,7 @@ mod tests { let r1_consumer = MemoryConsumer::new("r1"); let mut r1 = r1_consumer.clone().register(&pool); r1.grow(20); - let expected = "Resources exhausted with top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; + let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 20 bytes, r0 consumed 10 bytes. 
Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool"; let res = r0.try_grow(150); assert!( matches!( @@ -604,7 +604,7 @@ mod tests { // Test: unregister one // only the remaining one should be listed pool.unregister(&r1_consumer); - let expected_consumers = "Resources exhausted with top memory consumers (across reservations) are: r0 consumed 10 bytes"; + let expected_consumers = "Additional allocation failed with top memory consumers (across reservations) as: r0 consumed 10 bytes"; let res = r0.try_grow(150); assert!( matches!( diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index 25573d915959..420246595558 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -20,16 +20,21 @@ use crate::{ disk_manager::{DiskManager, DiskManagerConfig}, - memory_pool::{GreedyMemoryPool, MemoryPool, UnboundedMemoryPool}, + memory_pool::{ + GreedyMemoryPool, MemoryPool, TrackConsumersPool, UnboundedMemoryPool, + }, object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, }; use crate::cache::cache_manager::{CacheManager, CacheManagerConfig}; use datafusion_common::{DataFusionError, Result}; use object_store::ObjectStore; -use std::fmt::{Debug, Formatter}; use std::path::PathBuf; use std::sync::Arc; +use std::{ + fmt::{Debug, Formatter}, + num::NonZeroUsize, +}; use url::Url; #[derive(Clone)] @@ -213,7 +218,10 @@ impl RuntimeConfig { /// Note DataFusion does not yet respect this limit in all cases. pub fn with_memory_limit(self, max_memory: usize, memory_fraction: f64) -> Self { let pool_size = (max_memory as f64 * memory_fraction) as usize; - self.with_memory_pool(Arc::new(GreedyMemoryPool::new(pool_size))) + self.with_memory_pool(Arc::new(TrackConsumersPool::new( + GreedyMemoryPool::new(pool_size), + NonZeroUsize::new(5).unwrap(), + ))) } /// Use the specified path to create any needed temporary files diff --git a/datafusion/expr-common/src/type_coercion/binary.rs b/datafusion/expr-common/src/type_coercion/binary.rs index 251ac6cb8c0e..6d2fb660f669 100644 --- a/datafusion/expr-common/src/type_coercion/binary.rs +++ b/datafusion/expr-common/src/type_coercion/binary.rs @@ -1045,14 +1045,27 @@ pub fn like_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option Option { + use arrow::datatypes::DataType::*; + match (lhs_type, rhs_type) { + (DataType::Null, Utf8View | Utf8 | LargeUtf8) => Some(rhs_type.clone()), + (Utf8View | Utf8 | LargeUtf8, DataType::Null) => Some(lhs_type.clone()), + (DataType::Null, DataType::Null) => Some(Utf8), + _ => None, + } +} + /// coercion rules for regular expression comparison operations. 
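/// (Illustrative summary of the null rules added above: `NULL ~ 'a'` takes the
/// non-null side's string type, and `NULL ~ NULL` falls back to `Utf8`.)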
/// This is a union of string coercion rules and dictionary coercion rules pub fn regex_coercion(lhs_type: &DataType, rhs_type: &DataType) -> Option<DataType> { string_coercion(lhs_type, rhs_type) .or_else(|| dictionary_coercion(lhs_type, rhs_type, false)) + .or_else(|| regex_null_coercion(lhs_type, rhs_type)) } /// Checks if the TimeUnit associated with a Time32 or Time64 type is consistent, diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs index be2b5e48a8db..149312e5a9c0 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/bool_op.rs @@ -17,6 +17,7 @@ use std::sync::Arc; +use crate::aggregate::groups_accumulator::nulls::filtered_null_mask; use arrow::array::{ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder}; use arrow::buffer::BooleanBuffer; use datafusion_common::Result; @@ -46,17 +47,22 @@ where /// Function that computes the output bool_fn: F, + + /// The identity element for the boolean operation. + /// Any value combined with this returns the original value. + identity: bool, } impl<F> BooleanGroupsAccumulator<F> where F: Fn(bool, bool) -> bool + Send + Sync, { - pub fn new(bitop_fn: F) -> Self { + pub fn new(bool_fn: F, identity: bool) -> Self { Self { values: BooleanBufferBuilder::new(0), null_state: NullState::new(), - bool_fn: bitop_fn, + bool_fn, + identity, } } } @@ -77,7 +83,9 @@ where if self.values.len() < total_num_groups { let new_groups = total_num_groups - self.values.len(); - self.values.append_n(new_groups, Default::default()); + // Fill with the identity element, so that when the first non-null value is encountered, + // it will combine with the identity and the result will be the first non-null value itself. 
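+ // Aside: downstream, `bool_and` passes `true` and `bool_or` passes `false`
+ // (see bool_and_or.rs below), since `true && x == x` and `false || x == x`
+ // for every `x`, so pre-filling with the identity never changes the result.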
+ self.values.append_n(new_groups, self.identity); } // NullState dispatches / handles tracking nulls and groups that saw no values @@ -135,4 +143,22 @@ where // capacity is in bits, so convert to bytes self.values.capacity() / 8 + self.null_state.size() } + + fn convert_to_state( + &self, + values: &[ArrayRef], + opt_filter: Option<&BooleanArray>, + ) -> Result> { + let values = values[0].as_boolean().clone(); + + let values_null_buffer_filtered = filtered_null_mask(opt_filter, &values); + let (values_buf, _) = values.into_parts(); + let values_filtered = BooleanArray::new(values_buf, values_null_buffer_filtered); + + Ok(vec![Arc::new(values_filtered)]) + } + + fn supports_convert_to_state(&self) -> bool { + true + } } diff --git a/datafusion/functions-aggregate/src/bool_and_or.rs b/datafusion/functions-aggregate/src/bool_and_or.rs index b993b2a4979c..7cc7d9ff7fec 100644 --- a/datafusion/functions-aggregate/src/bool_and_or.rs +++ b/datafusion/functions-aggregate/src/bool_and_or.rs @@ -151,7 +151,7 @@ impl AggregateUDFImpl for BoolAnd { ) -> Result> { match args.return_type { DataType::Boolean => { - Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y))) + Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x && y, true))) } _ => not_impl_err!( "GroupsAccumulator not supported for {} with {}", @@ -270,9 +270,10 @@ impl AggregateUDFImpl for BoolOr { args: AccumulatorArgs, ) -> Result> { match args.return_type { - DataType::Boolean => { - Ok(Box::new(BooleanGroupsAccumulator::new(|x, y| x || y))) - } + DataType::Boolean => Ok(Box::new(BooleanGroupsAccumulator::new( + |x, y| x || y, + false, + ))), _ => not_impl_err!( "GroupsAccumulator not supported for {} with {}", args.name, diff --git a/datafusion/functions-aggregate/src/min_max.rs b/datafusion/functions-aggregate/src/min_max.rs index f9a08631bfb9..4dcd5ac0e951 100644 --- a/datafusion/functions-aggregate/src/min_max.rs +++ b/datafusion/functions-aggregate/src/min_max.rs @@ -48,7 +48,9 @@ use arrow::datatypes::{ Int32Type, Int64Type, Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type, }; use arrow_schema::IntervalUnit; -use datafusion_common::{downcast_value, internal_err, DataFusionError, Result}; +use datafusion_common::{ + downcast_value, exec_err, internal_err, DataFusionError, Result, +}; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; use std::fmt::Debug; @@ -68,7 +70,12 @@ use std::ops::Deref; fn get_min_max_result_type(input_types: &[DataType]) -> Result> { // make sure that the input types only has one element. - assert_eq!(input_types.len(), 1); + if input_types.len() != 1 { + return exec_err!( + "min/max was called with {} arguments. 
It requires only 1.", + input_types.len() + ); + } // min and max support the dictionary data type // unpack the dictionary to get the value match &input_types[0] { diff --git a/datafusion/functions/Cargo.toml b/datafusion/functions/Cargo.toml index 688563baecfa..2b3f80fc930b 100644 --- a/datafusion/functions/Cargo.toml +++ b/datafusion/functions/Cargo.toml @@ -151,3 +151,8 @@ required-features = ["string_expressions"] harness = false name = "pad" required-features = ["unicode_expressions"] + +[[bench]] +harness = false +name = "repeat" +required-features = ["string_expressions"] diff --git a/datafusion/functions/benches/repeat.rs b/datafusion/functions/benches/repeat.rs new file mode 100644 index 000000000000..916c8374e5fb --- /dev/null +++ b/datafusion/functions/benches/repeat.rs @@ -0,0 +1,136 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +extern crate criterion; + +use arrow::array::{ArrayRef, Int64Array, OffsetSizeTrait}; +use arrow::util::bench_util::{ + create_string_array_with_len, create_string_view_array_with_len, +}; +use criterion::{black_box, criterion_group, criterion_main, Criterion, SamplingMode}; +use datafusion_expr::ColumnarValue; +use datafusion_functions::string; +use std::sync::Arc; +use std::time::Duration; + +fn create_args( + size: usize, + str_len: usize, + repeat_times: i64, + use_string_view: bool, +) -> Vec { + let number_array = Arc::new(Int64Array::from( + (0..size).map(|_| repeat_times).collect::>(), + )); + + if use_string_view { + let string_array = + Arc::new(create_string_view_array_with_len(size, 0.1, str_len, false)); + vec![ + ColumnarValue::Array(string_array), + ColumnarValue::Array(number_array), + ] + } else { + let string_array = + Arc::new(create_string_array_with_len::(size, 0.1, str_len)); + + vec![ + ColumnarValue::Array(string_array), + ColumnarValue::Array(Arc::clone(&number_array) as ArrayRef), + ] + } +} + +fn criterion_benchmark(c: &mut Criterion) { + let repeat = string::repeat(); + for size in [1024, 4096] { + // REPEAT 3 TIMES + let repeat_times = 3; + let mut group = c.benchmark_group(format!("repeat {} times", repeat_times)); + group.sampling_mode(SamplingMode::Flat); + group.sample_size(10); + group.measurement_time(Duration::from_secs(10)); + + let args = create_args::(size, 32, repeat_times, true); + group.bench_function( + &format!( + "repeat_string_view [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + 
&format!( + "repeat_large_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + group.finish(); + + // REPEAT 30 TIMES + let repeat_times = 30; + let mut group = c.benchmark_group(format!("repeat {} times", repeat_times)); + group.sampling_mode(SamplingMode::Flat); + group.sample_size(10); + group.measurement_time(Duration::from_secs(10)); + + let args = create_args::(size, 32, repeat_times, true); + group.bench_function( + &format!( + "repeat_string_view [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + let args = create_args::(size, 32, repeat_times, false); + group.bench_function( + &format!( + "repeat_large_string [size={}, repeat_times={}]", + size, repeat_times + ), + |b| b.iter(|| black_box(repeat.invoke(&args))), + ); + + group.finish(); + } +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/datafusion/functions/src/core/mod.rs b/datafusion/functions/src/core/mod.rs index 062a4a104d54..af340930eabc 100644 --- a/datafusion/functions/src/core/mod.rs +++ b/datafusion/functions/src/core/mod.rs @@ -86,6 +86,7 @@ pub mod expr_fn { } } +/// Returns all DataFusion functions defined in this package pub fn functions() -> Vec> { vec![ nullif(), @@ -94,6 +95,13 @@ pub fn functions() -> Vec> { nvl2(), arrow_typeof(), named_struct(), + // Note: most users invoke `get_field` indirectly via field access + // syntax like `my_struct_col['field_name']`, which results in a call to + // `get_field(my_struct_col, "field_name")`. 
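+ // For example (illustrative only), these two queries plan identically:
+ //   SELECT my_struct_col['field_name'] FROM t;
+ //   SELECT get_field(my_struct_col, 'field_name') FROM t;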
+ // + // However, it is also exposed directly for use cases such as + // serializing / deserializing plans with the field access desugared to + // calls to `get_field` get_field(), coalesce(), ] diff --git a/datafusion/functions/src/crypto/mod.rs b/datafusion/functions/src/crypto/mod.rs index 497c1af62a72..46177fc22b60 100644 --- a/datafusion/functions/src/crypto/mod.rs +++ b/datafusion/functions/src/crypto/mod.rs @@ -62,6 +62,7 @@ pub mod expr_fn { )); } +/// Returns all DataFusion functions defined in this package pub fn functions() -> Vec<Arc<ScalarUDF>> { vec![digest(), md5(), sha224(), sha256(), sha384(), sha512()] } diff --git a/datafusion/functions/src/datetime/mod.rs b/datafusion/functions/src/datetime/mod.rs index a7e9827d6ca6..db4e365267dd 100644 --- a/datafusion/functions/src/datetime/mod.rs +++ b/datafusion/functions/src/datetime/mod.rs @@ -272,7 +272,7 @@ pub mod expr_fn { } } -/// Return a list of all functions in this package +/// Returns all DataFusion functions defined in this package pub fn functions() -> Vec<Arc<ScalarUDF>> { vec![ current_date(), diff --git a/datafusion/functions/src/encoding/mod.rs b/datafusion/functions/src/encoding/mod.rs index 24e11e5d635f..48171370ad58 100644 --- a/datafusion/functions/src/encoding/mod.rs +++ b/datafusion/functions/src/encoding/mod.rs @@ -37,6 +37,7 @@ pub mod expr_fn { )); } +/// Returns all DataFusion functions defined in this package pub fn functions() -> Vec<Arc<ScalarUDF>> { vec![encode(), decode()] } diff --git a/datafusion/functions/src/math/mod.rs b/datafusion/functions/src/math/mod.rs index 1e41fff289a4..b221fb900cfa 100644 --- a/datafusion/functions/src/math/mod.rs +++ b/datafusion/functions/src/math/mod.rs @@ -276,6 +276,7 @@ pub mod expr_fn { ); } +/// Returns all DataFusion functions defined in this package pub fn functions() -> Vec<Arc<ScalarUDF>> { vec![ abs(), diff --git a/datafusion/functions/src/regex/mod.rs b/datafusion/functions/src/regex/mod.rs index 884db24d9ec8..4ac162290ddb 100644 --- a/datafusion/functions/src/regex/mod.rs +++ b/datafusion/functions/src/regex/mod.rs @@ -65,7 +65,7 @@ pub mod expr_fn { } } -#[doc = r" Return a list of all functions in this package"] +/// Returns all DataFusion functions defined in this package pub fn functions() -> Vec<Arc<ScalarUDF>> { vec![regexp_match(), regexp_like(), regexp_replace()] } diff --git a/datafusion/functions/src/string/common.rs b/datafusion/functions/src/string/common.rs index 7037c1d1c3c3..54aebb039046 100644 --- a/datafusion/functions/src/string/common.rs +++ b/datafusion/functions/src/string/common.rs @@ -19,8 +19,9 @@ use std::fmt::{Display, Formatter}; use std::sync::Arc; use arrow::array::{ - new_null_array, Array, ArrayDataBuilder, ArrayRef, GenericStringArray, - GenericStringBuilder, OffsetSizeTrait, StringArray, + new_null_array, Array, ArrayAccessor, ArrayDataBuilder, ArrayIter, ArrayRef, + GenericStringArray, GenericStringBuilder, OffsetSizeTrait, StringArray, + StringViewArray, }; use arrow::buffer::{Buffer, MutableBuffer, NullBuffer}; use arrow::datatypes::DataType; @@ -251,6 +252,22 @@ impl<'a> ColumnarValueRef<'a> { } } +pub trait StringArrayType<'a>: ArrayAccessor<Item = &'a str> + Sized { + fn iter(&self) -> ArrayIter<Self>; +} + +impl<'a, T: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray<T> { + fn iter(&self) -> ArrayIter<Self> { + GenericStringArray::<T>::iter(self) + } +} + +impl<'a> StringArrayType<'a> for &'a StringViewArray { + fn iter(&self) -> ArrayIter<Self> { + StringViewArray::iter(self) + } +} + /// Optimized version of the StringBuilder in Arrow that: /// 1. 
Precalculates the expected length of the result, avoiding reallocations. /// 2. Avoids creating / incrementally creating a `NullBufferBuilder` diff --git a/datafusion/functions/src/string/mod.rs b/datafusion/functions/src/string/mod.rs index 9a19151a85e2..622802f0142b 100644 --- a/datafusion/functions/src/string/mod.rs +++ b/datafusion/functions/src/string/mod.rs @@ -167,7 +167,7 @@ pub mod expr_fn { } } -/// Return a list of all functions in this package +/// Returns all DataFusion functions defined in this package pub fn functions() -> Vec<Arc<ScalarUDF>> { vec![ ascii(), diff --git a/datafusion/functions/src/string/overlay.rs b/datafusion/functions/src/string/overlay.rs index 772b04136129..e285bd85b197 100644 --- a/datafusion/functions/src/string/overlay.rs +++ b/datafusion/functions/src/string/overlay.rs @@ -21,7 +21,9 @@ use std::sync::Arc; use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; use arrow::datatypes::DataType; -use datafusion_common::cast::{as_generic_string_array, as_int64_array}; +use datafusion_common::cast::{ + as_generic_string_array, as_int64_array, as_string_view_array, +}; use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::*; use datafusion_expr::{ColumnarValue, Volatility}; @@ -46,8 +48,10 @@ impl OverlayFunc { Self { signature: Signature::one_of( vec![ + Exact(vec![Utf8View, Utf8View, Int64, Int64]), Exact(vec![Utf8, Utf8, Int64, Int64]), Exact(vec![LargeUtf8, LargeUtf8, Int64, Int64]), + Exact(vec![Utf8View, Utf8View, Int64]), Exact(vec![Utf8, Utf8, Int64]), Exact(vec![LargeUtf8, LargeUtf8, Int64]), ], @@ -76,54 +80,107 @@ impl ScalarUDFImpl for OverlayFunc { fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> { match args[0].data_type() { - DataType::Utf8 => make_scalar_function(overlay::<i32>, vec![])(args), + DataType::Utf8View | DataType::Utf8 => { + make_scalar_function(overlay::<i32>, vec![])(args) + } DataType::LargeUtf8 => make_scalar_function(overlay::<i64>, vec![])(args), other => exec_err!("Unsupported data type {other:?} for function overlay"), } } } +macro_rules! 
process_overlay { + // For the three-argument case + ($string_array:expr, $characters_array:expr, $pos_num:expr) => {{ + $string_array + .iter() + .zip($characters_array.iter()) + .zip($pos_num.iter()) + .map(|((string, characters), start_pos)| { + match (string, characters, start_pos) { + (Some(string), Some(characters), Some(start_pos)) => { + let string_len = string.chars().count(); + let characters_len = characters.chars().count(); + let replace_len = characters_len as i64; + let mut res = + String::with_capacity(string_len.max(characters_len)); + + //as sql replace index start from 1 while string index start from 0 + if start_pos > 1 && start_pos - 1 < string_len as i64 { + let start = (start_pos - 1) as usize; + res.push_str(&string[..start]); + } + res.push_str(characters); + // if start + replace_len - 1 >= string_length, just to string end + if start_pos + replace_len - 1 < string_len as i64 { + let end = (start_pos + replace_len - 1) as usize; + res.push_str(&string[end..]); + } + Ok(Some(res)) + } + _ => Ok(None), + } + }) + .collect::>>() + }}; + + // For the four-argument case + ($string_array:expr, $characters_array:expr, $pos_num:expr, $len_num:expr) => {{ + $string_array + .iter() + .zip($characters_array.iter()) + .zip($pos_num.iter()) + .zip($len_num.iter()) + .map(|(((string, characters), start_pos), len)| { + match (string, characters, start_pos, len) { + (Some(string), Some(characters), Some(start_pos), Some(len)) => { + let string_len = string.chars().count(); + let characters_len = characters.chars().count(); + let replace_len = len.min(string_len as i64); + let mut res = + String::with_capacity(string_len.max(characters_len)); + + //as sql replace index start from 1 while string index start from 0 + if start_pos > 1 && start_pos - 1 < string_len as i64 { + let start = (start_pos - 1) as usize; + res.push_str(&string[..start]); + } + res.push_str(characters); + // if start + replace_len - 1 >= string_length, just to string end + if start_pos + replace_len - 1 < string_len as i64 { + let end = (start_pos + replace_len - 1) as usize; + res.push_str(&string[end..]); + } + Ok(Some(res)) + } + _ => Ok(None), + } + }) + .collect::>>() + }}; +} + /// OVERLAY(string1 PLACING string2 FROM integer FOR integer2) /// Replaces a substring of string1 with string2 starting at the integer bit /// pgsql overlay('Txxxxas' placing 'hom' from 2 for 4) → Thomas /// overlay('Txxxxas' placing 'hom' from 2) -> Thomxas, without for option, str2's len is instead -pub fn overlay(args: &[ArrayRef]) -> Result { +fn overlay(args: &[ArrayRef]) -> Result { + let use_string_view = args[0].data_type() == &DataType::Utf8View; + if use_string_view { + string_view_overlay::(args) + } else { + string_overlay::(args) + } +} + +pub fn string_overlay(args: &[ArrayRef]) -> Result { match args.len() { 3 => { let string_array = as_generic_string_array::(&args[0])?; let characters_array = as_generic_string_array::(&args[1])?; let pos_num = as_int64_array(&args[2])?; - let result = string_array - .iter() - .zip(characters_array.iter()) - .zip(pos_num.iter()) - .map(|((string, characters), start_pos)| { - match (string, characters, start_pos) { - (Some(string), Some(characters), Some(start_pos)) => { - let string_len = string.chars().count(); - let characters_len = characters.chars().count(); - let replace_len = characters_len as i64; - let mut res = - String::with_capacity(string_len.max(characters_len)); - - //as sql replace index start from 1 while string index start from 0 - if start_pos > 1 && start_pos - 1 
< string_len as i64 { - let start = (start_pos - 1) as usize; - res.push_str(&string[..start]); - } - res.push_str(characters); - // if start + replace_len - 1 >= string_length, just to string end - if start_pos + replace_len - 1 < string_len as i64 { - let end = (start_pos + replace_len - 1) as usize; - res.push_str(&string[end..]); - } - Ok(Some(res)) - } - _ => Ok(None), - } - }) - .collect::>>()?; + let result = process_overlay!(string_array, characters_array, pos_num)?; Ok(Arc::new(result) as ArrayRef) } 4 => { @@ -132,37 +189,34 @@ pub fn overlay(args: &[ArrayRef]) -> Result { let pos_num = as_int64_array(&args[2])?; let len_num = as_int64_array(&args[3])?; - let result = string_array - .iter() - .zip(characters_array.iter()) - .zip(pos_num.iter()) - .zip(len_num.iter()) - .map(|(((string, characters), start_pos), len)| { - match (string, characters, start_pos, len) { - (Some(string), Some(characters), Some(start_pos), Some(len)) => { - let string_len = string.chars().count(); - let characters_len = characters.chars().count(); - let replace_len = len.min(string_len as i64); - let mut res = - String::with_capacity(string_len.max(characters_len)); - - //as sql replace index start from 1 while string index start from 0 - if start_pos > 1 && start_pos - 1 < string_len as i64 { - let start = (start_pos - 1) as usize; - res.push_str(&string[..start]); - } - res.push_str(characters); - // if start + replace_len - 1 >= string_length, just to string end - if start_pos + replace_len - 1 < string_len as i64 { - let end = (start_pos + replace_len - 1) as usize; - res.push_str(&string[end..]); - } - Ok(Some(res)) - } - _ => Ok(None), - } - }) - .collect::>>()?; + let result = + process_overlay!(string_array, characters_array, pos_num, len_num)?; + Ok(Arc::new(result) as ArrayRef) + } + other => { + exec_err!("overlay was called with {other} arguments. 
It requires 3 or 4.") + } + } +} + +pub fn string_view_overlay(args: &[ArrayRef]) -> Result { + match args.len() { + 3 => { + let string_array = as_string_view_array(&args[0])?; + let characters_array = as_string_view_array(&args[1])?; + let pos_num = as_int64_array(&args[2])?; + + let result = process_overlay!(string_array, characters_array, pos_num)?; + Ok(Arc::new(result) as ArrayRef) + } + 4 => { + let string_array = as_string_view_array(&args[0])?; + let characters_array = as_string_view_array(&args[1])?; + let pos_num = as_int64_array(&args[2])?; + let len_num = as_int64_array(&args[3])?; + + let result = + process_overlay!(string_array, characters_array, pos_num, len_num)?; Ok(Arc::new(result) as ArrayRef) } other => { diff --git a/datafusion/functions/src/string/repeat.rs b/datafusion/functions/src/string/repeat.rs index a377dee06f41..20e4462784b8 100644 --- a/datafusion/functions/src/string/repeat.rs +++ b/datafusion/functions/src/string/repeat.rs @@ -18,17 +18,20 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait, StringArray}; +use arrow::array::{ + ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array, + OffsetSizeTrait, StringViewArray, +}; use arrow::datatypes::DataType; +use arrow::datatypes::DataType::{Int64, LargeUtf8, Utf8, Utf8View}; -use datafusion_common::cast::{ - as_generic_string_array, as_int64_array, as_string_view_array, -}; +use datafusion_common::cast::as_int64_array; use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::*; use datafusion_expr::{ColumnarValue, Volatility}; use datafusion_expr::{ScalarUDFImpl, Signature}; +use crate::string::common::StringArrayType; use crate::utils::{make_scalar_function, utf8_to_str_type}; #[derive(Debug)] @@ -44,7 +47,6 @@ impl Default for RepeatFunc { impl RepeatFunc { pub fn new() -> Self { - use DataType::*; Self { signature: Signature::one_of( vec![ @@ -79,51 +81,53 @@ impl ScalarUDFImpl for RepeatFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - match args[0].data_type() { - DataType::Utf8View => make_scalar_function(repeat_utf8view, vec![])(args), - DataType::Utf8 => make_scalar_function(repeat::, vec![])(args), - DataType::LargeUtf8 => make_scalar_function(repeat::, vec![])(args), - other => exec_err!("Unsupported data type {other:?} for function repeat. Expected Utf8, Utf8View or LargeUtf8"), - } + make_scalar_function(repeat, vec![])(args) } } /// Repeats string the specified number of times. /// repeat('Pg', 4) = 'PgPgPgPg' -fn repeat(args: &[ArrayRef]) -> Result { - let string_array = as_generic_string_array::(&args[0])?; +fn repeat(args: &[ArrayRef]) -> Result { let number_array = as_int64_array(&args[1])?; - - let result = string_array - .iter() - .zip(number_array.iter()) - .map(|(string, number)| repeat_common(string, number)) - .collect::>(); - - Ok(Arc::new(result) as ArrayRef) + match args[0].data_type() { + Utf8View => { + let string_view_array = args[0].as_string_view(); + repeat_impl::(string_view_array, number_array) + } + Utf8 => { + let string_array = args[0].as_string::(); + repeat_impl::>(string_array, number_array) + } + LargeUtf8 => { + let string_array = args[0].as_string::(); + repeat_impl::>(string_array, number_array) + } + other => exec_err!( + "Unsupported data type {other:?} for function repeat. \ + Expected Utf8, Utf8View or LargeUtf8." 
+ ), + } } -fn repeat_utf8view(args: &[ArrayRef]) -> Result { - let string_view_array = as_string_view_array(&args[0])?; - let number_array = as_int64_array(&args[1])?; - - let result = string_view_array +fn repeat_impl<'a, T, S>(string_array: S, number_array: &Int64Array) -> Result +where + T: OffsetSizeTrait, + S: StringArrayType<'a>, +{ + let mut builder: GenericStringBuilder = GenericStringBuilder::new(); + string_array .iter() .zip(number_array.iter()) - .map(|(string, number)| repeat_common(string, number)) - .collect::(); - - Ok(Arc::new(result) as ArrayRef) -} - -fn repeat_common(string: Option<&str>, number: Option) -> Option { - match (string, number) { - (Some(string), Some(number)) if number >= 0 => { - Some(string.repeat(number as usize)) - } - (Some(_), Some(_)) => Some("".to_string()), - _ => None, - } + .for_each(|(string, number)| match (string, number) { + (Some(string), Some(number)) if number >= 0 => { + builder.append_value(string.repeat(number as usize)) + } + (Some(_), Some(_)) => builder.append_value(""), + _ => builder.append_null(), + }); + let array = builder.finish(); + + Ok(Arc::new(array) as ArrayRef) } #[cfg(test)] diff --git a/datafusion/functions/src/string/split_part.rs b/datafusion/functions/src/string/split_part.rs index d6f7bb4a4d4a..19721f0fad28 100644 --- a/datafusion/functions/src/string/split_part.rs +++ b/datafusion/functions/src/string/split_part.rs @@ -21,7 +21,9 @@ use std::sync::Arc; use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; use arrow::datatypes::DataType; -use datafusion_common::cast::{as_generic_string_array, as_int64_array}; +use datafusion_common::cast::{ + as_generic_string_array, as_int64_array, as_string_view_array, +}; use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::*; use datafusion_expr::{ColumnarValue, Volatility}; @@ -46,7 +48,12 @@ impl SplitPartFunc { Self { signature: Signature::one_of( vec![ + Exact(vec![Utf8View, Utf8View, Int64]), + Exact(vec![Utf8View, Utf8, Int64]), + Exact(vec![Utf8View, LargeUtf8, Int64]), + Exact(vec![Utf8, Utf8View, Int64]), Exact(vec![Utf8, Utf8, Int64]), + Exact(vec![LargeUtf8, Utf8View, Int64]), Exact(vec![LargeUtf8, Utf8, Int64]), Exact(vec![Utf8, LargeUtf8, Int64]), Exact(vec![LargeUtf8, LargeUtf8, Int64]), @@ -75,50 +82,101 @@ impl ScalarUDFImpl for SplitPartFunc { } fn invoke(&self, args: &[ColumnarValue]) -> Result { - match args[0].data_type() { - DataType::Utf8 => make_scalar_function(split_part::, vec![])(args), - DataType::LargeUtf8 => make_scalar_function(split_part::, vec![])(args), - other => { - exec_err!("Unsupported data type {other:?} for function split_part") + match (args[0].data_type(), args[1].data_type()) { + ( + DataType::Utf8 | DataType::Utf8View, + DataType::Utf8 | DataType::Utf8View, + ) => make_scalar_function(split_part::, vec![])(args), + (DataType::LargeUtf8, DataType::LargeUtf8) => { + make_scalar_function(split_part::, vec![])(args) } + (_, DataType::LargeUtf8) => { + make_scalar_function(split_part::, vec![])(args) + } + (DataType::LargeUtf8, _) => { + make_scalar_function(split_part::, vec![])(args) + } + (first_type, second_type) => exec_err!( + "unsupported first type {} and second type {} for split_part function", + first_type, + second_type + ), } } } +macro_rules! 
process_split_part { + ($string_array: expr, $delimiter_array: expr, $n_array: expr) => {{ + let result = $string_array + .iter() + .zip($delimiter_array.iter()) + .zip($n_array.iter()) + .map(|((string, delimiter), n)| match (string, delimiter, n) { + (Some(string), Some(delimiter), Some(n)) => { + let split_string: Vec<&str> = string.split(delimiter).collect(); + let len = split_string.len(); + + let index = match n.cmp(&0) { + std::cmp::Ordering::Less => len as i64 + n, + std::cmp::Ordering::Equal => { + return exec_err!("field position must not be zero"); + } + std::cmp::Ordering::Greater => n - 1, + } as usize; + + if index < len { + Ok(Some(split_string[index])) + } else { + Ok(Some("")) + } + } + _ => Ok(None), + }) + .collect::>>()?; + Ok(Arc::new(result) as ArrayRef) + }}; +} + /// Splits string at occurrences of delimiter and returns the n'th field (counting from one). /// split_part('abc~@~def~@~ghi', '~@~', 2) = 'def' -fn split_part(args: &[ArrayRef]) -> Result { - let string_array = as_generic_string_array::(&args[0])?; - let delimiter_array = as_generic_string_array::(&args[1])?; +fn split_part( + args: &[ArrayRef], +) -> Result { let n_array = as_int64_array(&args[2])?; - let result = string_array - .iter() - .zip(delimiter_array.iter()) - .zip(n_array.iter()) - .map(|((string, delimiter), n)| match (string, delimiter, n) { - (Some(string), Some(delimiter), Some(n)) => { - let split_string: Vec<&str> = string.split(delimiter).collect(); - let len = split_string.len(); - - let index = match n.cmp(&0) { - std::cmp::Ordering::Less => len as i64 + n, - std::cmp::Ordering::Equal => { - return exec_err!("field position must not be zero"); - } - std::cmp::Ordering::Greater => n - 1, - } as usize; - - if index < len { - Ok(Some(split_string[index])) - } else { - Ok(Some("")) + match (args[0].data_type(), args[1].data_type()) { + (DataType::Utf8View, _) => { + let string_array = as_string_view_array(&args[0])?; + match args[1].data_type() { + DataType::Utf8View => { + let delimiter_array = as_string_view_array(&args[1])?; + process_split_part!(string_array, delimiter_array, n_array) + } + _ => { + let delimiter_array = + as_generic_string_array::(&args[1])?; + process_split_part!(string_array, delimiter_array, n_array) } } - _ => Ok(None), - }) - .collect::>>()?; - - Ok(Arc::new(result) as ArrayRef) + } + (_, DataType::Utf8View) => { + let delimiter_array = as_string_view_array(&args[1])?; + match args[0].data_type() { + DataType::Utf8View => { + let string_array = as_string_view_array(&args[0])?; + process_split_part!(string_array, delimiter_array, n_array) + } + _ => { + let string_array = as_generic_string_array::(&args[0])?; + process_split_part!(string_array, delimiter_array, n_array) + } + } + } + (_, _) => { + let string_array = as_generic_string_array::(&args[0])?; + let delimiter_array = as_generic_string_array::(&args[1])?; + process_split_part!(string_array, delimiter_array, n_array) + } + } } #[cfg(test)] diff --git a/datafusion/functions/src/unicode/lpad.rs b/datafusion/functions/src/unicode/lpad.rs index 521cdc5d0ff0..e102673c4253 100644 --- a/datafusion/functions/src/unicode/lpad.rs +++ b/datafusion/functions/src/unicode/lpad.rs @@ -20,8 +20,8 @@ use std::fmt::Write; use std::sync::Arc; use arrow::array::{ - Array, ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, - GenericStringBuilder, Int64Array, OffsetSizeTrait, StringViewArray, + Array, ArrayRef, AsArray, GenericStringArray, GenericStringBuilder, Int64Array, + OffsetSizeTrait, StringViewArray, }; 
use arrow::datatypes::DataType; use unicode_segmentation::UnicodeSegmentation; @@ -32,6 +32,7 @@ use datafusion_common::{exec_err, Result}; use datafusion_expr::TypeSignature::Exact; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use crate::string::common::StringArrayType; use crate::utils::{make_scalar_function, utf8_to_str_type}; #[derive(Debug)] @@ -248,20 +249,6 @@ where Ok(Arc::new(array) as ArrayRef) } -trait StringArrayType<'a>: ArrayAccessor<Item = &'a str> + Sized { - fn iter(&self) -> ArrayIter<Self>; -} -impl<'a, T: OffsetSizeTrait> StringArrayType<'a> for &'a GenericStringArray<T> { - fn iter(&self) -> ArrayIter<Self> { - GenericStringArray::<T>::iter(self) - } -} -impl<'a> StringArrayType<'a> for &'a StringViewArray { - fn iter(&self) -> ArrayIter<Self> { - StringViewArray::iter(self) - } -} - #[cfg(test)] mod tests { use crate::unicode::lpad::LPadFunc; diff --git a/datafusion/functions/src/unicode/mod.rs b/datafusion/functions/src/unicode/mod.rs index 9e8c07cd36ed..40915bc9efde 100644 --- a/datafusion/functions/src/unicode/mod.rs +++ b/datafusion/functions/src/unicode/mod.rs @@ -125,7 +125,7 @@ pub mod expr_fn { } } -/// Return a list of all functions in this package +/// Returns all DataFusion functions defined in this package pub fn functions() -> Vec<Arc<ScalarUDF>> { vec![ character_length(), diff --git a/datafusion/functions/src/unicode/reverse.rs b/datafusion/functions/src/unicode/reverse.rs index 52666cc57059..da16d3ee3752 100644 --- a/datafusion/functions/src/unicode/reverse.rs +++ b/datafusion/functions/src/unicode/reverse.rs @@ -18,12 +18,14 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait}; +use arrow::array::{ + Array, ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, + OffsetSizeTrait, +}; use arrow::datatypes::DataType; - -use datafusion_common::cast::as_generic_string_array; use datafusion_common::{exec_err, Result}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility}; +use DataType::{LargeUtf8, Utf8, Utf8View}; use crate::utils::{make_scalar_function, utf8_to_str_type}; @@ -44,7 +46,7 @@ impl ReverseFunc { Self { signature: Signature::uniform( 1, - vec![Utf8, LargeUtf8], + vec![Utf8View, Utf8, LargeUtf8], Volatility::Immutable, ), } @@ -70,8 +72,8 @@ impl ScalarUDFImpl for ReverseFunc { fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> { match args[0].data_type() { - DataType::Utf8 => make_scalar_function(reverse::<i32>, vec![])(args), - DataType::LargeUtf8 => make_scalar_function(reverse::<i64>, vec![])(args), + Utf8 | Utf8View => make_scalar_function(reverse::<i32>, vec![])(args), + LargeUtf8 => make_scalar_function(reverse::<i64>, vec![])(args), other => { exec_err!("Unsupported data type {other:?} for function reverse") } @@ -83,10 +85,17 @@ impl ScalarUDFImpl for ReverseFunc { /// reverse('abcde') = 'edcba' /// The implementation uses UTF-8 code points as characters pub fn reverse<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> { - let string_array = as_generic_string_array::<T>(&args[0])?; + if args[0].data_type() == &Utf8View { + reverse_impl::<T, _>(args[0].as_string_view()) + } else { + reverse_impl::<T, _>(args[0].as_string::<T>()) + } +} - let result = string_array - .iter() +fn reverse_impl<'a, T: OffsetSizeTrait, V: ArrayAccessor<Item = &'a str>>( + string_array: V, +) -> Result<ArrayRef> { + let result = ArrayIter::new(string_array) .map(|string| string.map(|string: &str| string.chars().rev().collect::<String>())) .collect::<GenericStringArray<T>>(); Ok(Arc::new(result) as ArrayRef) } @@ -95,8 +104,8 @@ #[cfg(test)] mod tests { - use arrow::array::{Array, StringArray}; - use 
arrow::datatypes::DataType::Utf8; + use arrow::array::{Array, LargeStringArray, StringArray}; + use arrow::datatypes::DataType::{LargeUtf8, Utf8}; use datafusion_common::{Result, ScalarValue}; use datafusion_expr::{ColumnarValue, ScalarUDFImpl}; @@ -104,50 +113,49 @@ mod tests { use crate::unicode::reverse::ReverseFunc; use crate::utils::test::test_function; + macro_rules! test_reverse { + ($INPUT:expr, $EXPECTED:expr) => { + test_function!( + ReverseFunc::new(), + &[ColumnarValue::Scalar(ScalarValue::Utf8($INPUT))], + $EXPECTED, + &str, + Utf8, + StringArray + ); + + test_function!( + ReverseFunc::new(), + &[ColumnarValue::Scalar(ScalarValue::LargeUtf8($INPUT))], + $EXPECTED, + &str, + LargeUtf8, + LargeStringArray + ); + + test_function!( + ReverseFunc::new(), + &[ColumnarValue::Scalar(ScalarValue::Utf8View($INPUT))], + $EXPECTED, + &str, + Utf8, + StringArray + ); + }; + } + #[test] fn test_functions() -> Result<()> { - test_function!( - ReverseFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::from("abcde"))], - Ok(Some("edcba")), - &str, - Utf8, - StringArray - ); - test_function!( - ReverseFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::from("loẅks"))], - Ok(Some("sk̈wol")), - &str, - Utf8, - StringArray - ); - test_function!( - ReverseFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::from("loẅks"))], - Ok(Some("sk̈wol")), - &str, - Utf8, - StringArray - ); - test_function!( - ReverseFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::Utf8(None))], - Ok(None), - &str, - Utf8, - StringArray - ); + test_reverse!(Some("abcde".into()), Ok(Some("edcba"))); + test_reverse!(Some("loẅks".into()), Ok(Some("sk̈wol"))); + test_reverse!(Some("loẅks".into()), Ok(Some("sk̈wol"))); + test_reverse!(None, Ok(None)); #[cfg(not(feature = "unicode_expressions"))] - test_function!( - ReverseFunc::new(), - &[ColumnarValue::Scalar(ScalarValue::from("abcde"))], + test_reverse!( + Some("abcde".into()), internal_err!( "function reverse requires compilation with feature flag: unicode_expressions." 
             ),
-            &str,
-            Utf8,
-            StringArray
         );
 
         Ok(())
diff --git a/datafusion/functions/src/unicode/substr.rs b/datafusion/functions/src/unicode/substr.rs
index 9d15920bb655..9fd8c75eab23 100644
--- a/datafusion/functions/src/unicode/substr.rs
+++ b/datafusion/functions/src/unicode/substr.rs
@@ -19,10 +19,12 @@ use std::any::Any;
 use std::cmp::max;
 use std::sync::Arc;
 
-use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait};
+use arrow::array::{
+    ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, OffsetSizeTrait,
+};
 use arrow::datatypes::DataType;
 
-use datafusion_common::cast::{as_generic_string_array, as_int64_array};
+use datafusion_common::cast::as_int64_array;
 use datafusion_common::{exec_err, Result};
 use datafusion_expr::TypeSignature::Exact;
 use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
@@ -51,6 +53,8 @@ impl SubstrFunc {
                     Exact(vec![LargeUtf8, Int64]),
                     Exact(vec![Utf8, Int64, Int64]),
                     Exact(vec![LargeUtf8, Int64, Int64]),
+                    Exact(vec![Utf8View, Int64]),
+                    Exact(vec![Utf8View, Int64, Int64]),
                 ],
                 Volatility::Immutable,
             ),
@@ -77,11 +81,7 @@ impl ScalarUDFImpl for SubstrFunc {
     }
 
     fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        match args[0].data_type() {
-            DataType::Utf8 => make_scalar_function(substr::<i32>, vec![])(args),
-            DataType::LargeUtf8 => make_scalar_function(substr::<i64>, vec![])(args),
-            other => exec_err!("Unsupported data type {other:?} for function substr"),
-        }
+        make_scalar_function(substr, vec![])(args)
     }
 
     fn aliases(&self) -> &[String] {
@@ -89,18 +89,39 @@ impl ScalarUDFImpl for SubstrFunc {
     }
 }
 
+pub fn substr(args: &[ArrayRef]) -> Result<ArrayRef> {
+    match args[0].data_type() {
+        DataType::Utf8 => {
+            let string_array = args[0].as_string::<i32>();
+            calculate_substr::<_, i32>(string_array, &args[1..])
+        }
+        DataType::LargeUtf8 => {
+            let string_array = args[0].as_string::<i64>();
+            calculate_substr::<_, i64>(string_array, &args[1..])
+        }
+        DataType::Utf8View => {
+            let string_array = args[0].as_string_view();
+            calculate_substr::<_, i32>(string_array, &args[1..])
+        }
+        other => exec_err!("Unsupported data type {other:?} for function substr"),
+    }
+}
+
 /// Extracts the substring of string starting at the start'th character, and extending for count characters if that is specified. (Same as substring(string from start for count).)
 /// substr('alphabet', 3) = 'phabet'
 /// substr('alphabet', 3, 2) = 'ph'
 /// The implementation uses UTF-8 code points as characters
-pub fn substr<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
+fn calculate_substr<'a, V, T>(string_array: V, args: &[ArrayRef]) -> Result<ArrayRef>
+where
+    V: ArrayAccessor<Item = &'a str>,
+    T: OffsetSizeTrait,
+{
     match args.len() {
-        2 => {
-            let string_array = as_generic_string_array::<T>(&args[0])?;
-            let start_array = as_int64_array(&args[1])?;
+        1 => {
+            let iter = ArrayIter::new(string_array);
+            let start_array = as_int64_array(&args[0])?;
 
-            let result = string_array
-                .iter()
+            let result = iter
                 .zip(start_array.iter())
                 .map(|(string, start)| match (string, start) {
                     (Some(string), Some(start)) => {
@@ -113,16 +134,14 @@ pub fn substr<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
                     _ => None,
                 })
                 .collect::<GenericStringArray<T>>();
-
             Ok(Arc::new(result) as ArrayRef)
         }
-        3 => {
-            let string_array = as_generic_string_array::<T>(&args[0])?;
-            let start_array = as_int64_array(&args[1])?;
-            let count_array = as_int64_array(&args[2])?;
+        2 => {
+            let iter = ArrayIter::new(string_array);
+            let start_array = as_int64_array(&args[0])?;
+            let count_array = as_int64_array(&args[1])?;
 
-            let result = string_array
-                .iter()
+            let result = iter
                 .zip(start_array.iter())
                 .zip(count_array.iter())
                 .map(|((string, start), count)| match (string, start, count) {
@@ -162,6 +181,71 @@ mod tests {
 
     #[test]
     fn test_functions() -> Result<()> {
+        test_function!(
+            SubstrFunc::new(),
+            &[
+                ColumnarValue::Scalar(ScalarValue::Utf8View(None)),
+                ColumnarValue::Scalar(ScalarValue::from(1i64)),
+            ],
+            Ok(None),
+            &str,
+            Utf8,
+            StringArray
+        );
+        test_function!(
+            SubstrFunc::new(),
+            &[
+                ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
+                    "alphabet"
+                )))),
+                ColumnarValue::Scalar(ScalarValue::from(0i64)),
+            ],
+            Ok(Some("alphabet")),
+            &str,
+            Utf8,
+            StringArray
+        );
+        test_function!(
+            SubstrFunc::new(),
+            &[
+                ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
+                    "joséésoj"
+                )))),
+                ColumnarValue::Scalar(ScalarValue::from(5i64)),
+            ],
+            Ok(Some("ésoj")),
+            &str,
+            Utf8,
+            StringArray
+        );
+        test_function!(
+            SubstrFunc::new(),
+            &[
+                ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
+                    "alphabet"
+                )))),
+                ColumnarValue::Scalar(ScalarValue::from(3i64)),
+                ColumnarValue::Scalar(ScalarValue::from(2i64)),
+            ],
+            Ok(Some("ph")),
+            &str,
+            Utf8,
+            StringArray
+        );
+        test_function!(
+            SubstrFunc::new(),
+            &[
+                ColumnarValue::Scalar(ScalarValue::Utf8View(Some(String::from(
+                    "alphabet"
+                )))),
+                ColumnarValue::Scalar(ScalarValue::from(3i64)),
+                ColumnarValue::Scalar(ScalarValue::from(20i64)),
+            ],
+            Ok(Some("phabet")),
+            &str,
+            Utf8,
+            StringArray
+        );
         test_function!(
             SubstrFunc::new(),
             &[
diff --git a/datafusion/functions/src/unicode/translate.rs b/datafusion/functions/src/unicode/translate.rs
index 5f64d8875bf5..a42b9c6cb857 100644
--- a/datafusion/functions/src/unicode/translate.rs
+++ b/datafusion/functions/src/unicode/translate.rs
@@ -18,18 +18,18 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow::array::{ArrayRef, GenericStringArray, OffsetSizeTrait};
+use arrow::array::{
+    ArrayAccessor, ArrayIter, ArrayRef, AsArray, GenericStringArray, OffsetSizeTrait,
+};
 use arrow::datatypes::DataType;
 use hashbrown::HashMap;
 use unicode_segmentation::UnicodeSegmentation;
 
-use datafusion_common::cast::as_generic_string_array;
+use crate::utils::{make_scalar_function, utf8_to_str_type};
 use datafusion_common::{exec_err, Result};
 use datafusion_expr::TypeSignature::Exact;
 use datafusion_expr::{ColumnarValue, ScalarUDFImpl, Signature, Volatility};
 
-use crate::utils::{make_scalar_function, utf8_to_str_type};
-
 #[derive(Debug)]
 pub struct TranslateFunc {
     signature: Signature,
@@ -46,7 +46,10 @@ impl TranslateFunc {
         use DataType::*;
         Self {
             signature: Signature::one_of(
-                vec![Exact(vec![Utf8, Utf8, Utf8])],
+                vec![
+                    Exact(vec![Utf8View, Utf8, Utf8]),
+                    Exact(vec![Utf8, Utf8, Utf8]),
+                ],
                 Volatility::Immutable,
            ),
         }
@@ -71,27 +74,54 @@ impl ScalarUDFImpl for TranslateFunc {
     }
 
     fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
-        match args[0].data_type() {
-            DataType::Utf8 => make_scalar_function(translate::<i32>, vec![])(args),
-            DataType::LargeUtf8 => make_scalar_function(translate::<i64>, vec![])(args),
-            other => {
-                exec_err!("Unsupported data type {other:?} for function translate")
-            }
+        make_scalar_function(invoke_translate, vec![])(args)
+    }
+}
+
+fn invoke_translate(args: &[ArrayRef]) -> Result<ArrayRef> {
+    match args[0].data_type() {
+        DataType::Utf8View => {
+            let string_array = args[0].as_string_view();
+            let from_array = args[1].as_string::<i32>();
+            let to_array = args[2].as_string::<i32>();
+            translate::<i32, _, _>(string_array, from_array, to_array)
+        }
+        DataType::Utf8 => {
+            let string_array = args[0].as_string::<i32>();
+            let from_array = args[1].as_string::<i32>();
+            let to_array = args[2].as_string::<i32>();
+            translate::<i32, _, _>(string_array, from_array, to_array)
+        }
+        DataType::LargeUtf8 => {
+            let string_array = args[0].as_string::<i64>();
+            let from_array = args[1].as_string::<i64>();
+            let to_array = args[2].as_string::<i64>();
+            translate::<i64, _, _>(string_array, from_array, to_array)
+        }
+        other => {
+            exec_err!("Unsupported data type {other:?} for function translate")
         }
     }
 }
 
 /// Replaces each character in string that matches a character in the from set with the corresponding character in the to set. If from is longer than to, occurrences of the extra characters in from are deleted.
 /// translate('12345', '143', 'ax') = 'a2x5'
-fn translate<T: OffsetSizeTrait>(args: &[ArrayRef]) -> Result<ArrayRef> {
-    let string_array = as_generic_string_array::<T>(&args[0])?;
-    let from_array = as_generic_string_array::<T>(&args[1])?;
-    let to_array = as_generic_string_array::<T>(&args[2])?;
-
-    let result = string_array
-        .iter()
-        .zip(from_array.iter())
-        .zip(to_array.iter())
+fn translate<'a, T: OffsetSizeTrait, V, B>(
+    string_array: V,
+    from_array: B,
+    to_array: B,
+) -> Result<ArrayRef>
+where
+    V: ArrayAccessor<Item = &'a str>,
+    B: ArrayAccessor<Item = &'a str>,
+{
+    let string_array_iter = ArrayIter::new(string_array);
+    let from_array_iter = ArrayIter::new(from_array);
+    let to_array_iter = ArrayIter::new(to_array);
+
+    let result = string_array_iter
+        .zip(from_array_iter)
+        .zip(to_array_iter)
         .map(|((string, from), to)| match (string, from, to) {
             (Some(string), Some(from), Some(to)) => {
                 // create a hashmap of [char, index] to change from O(n) to O(1) for from list
diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs
index 347a5d82dbec..06f54481a6fa 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -2498,6 +2498,111 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn regex_with_nulls() -> Result<()> {
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Utf8, true),
+            Field::new("b", DataType::Utf8, true),
+        ]);
+        let a = Arc::new(StringArray::from(vec![
+            Some("abc"),
+            None,
+            Some("abc"),
+            None,
+            Some("abc"),
+        ])) as ArrayRef;
+        let b = Arc::new(StringArray::from(vec![
+            Some("^a"),
+            Some("^A"),
+            None,
+            None,
+            Some("^(b|c)"),
+        ])) as ArrayRef;
+
+        let regex_expected =
+            BooleanArray::from(vec![Some(true), None, None, None, Some(false)]);
+        let regex_not_expected =
+            BooleanArray::from(vec![Some(false), None, None, None, Some(true)]);
+        apply_logic_op(
+            &Arc::new(schema.clone()),
+            &a,
+            &b,
+            Operator::RegexMatch,
+            regex_expected.clone(),
+        )?;
+        apply_logic_op(
+            &Arc::new(schema.clone()),
+            &a,
+            &b,
+            Operator::RegexIMatch,
+            regex_expected.clone(),
+        )?;
+        apply_logic_op(
+            &Arc::new(schema.clone()),
+            &a,
+            &b,
+            Operator::RegexNotMatch,
+            regex_not_expected.clone(),
+        )?;
+        apply_logic_op(
+            &Arc::new(schema),
+            &a,
+            &b,
+            Operator::RegexNotIMatch,
+            regex_not_expected.clone(),
+        )?;
+
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::LargeUtf8, true),
+            Field::new("b", DataType::LargeUtf8, true),
+        ]);
+        let a = Arc::new(LargeStringArray::from(vec![
+            Some("abc"),
+            None,
+            Some("abc"),
+            None,
+            Some("abc"),
+        ])) as ArrayRef;
+        let b = Arc::new(LargeStringArray::from(vec![
+            Some("^a"),
+            Some("^A"),
+            None,
+            None,
+            Some("^(b|c)"),
+        ])) as ArrayRef;
+
+        apply_logic_op(
+            &Arc::new(schema.clone()),
+            &a,
+            &b,
+            Operator::RegexMatch,
+            regex_expected.clone(),
+        )?;
+        apply_logic_op(
+            &Arc::new(schema.clone()),
+            &a,
+            &b,
+            Operator::RegexIMatch,
+            regex_expected.clone(),
+        )?;
+        apply_logic_op(
+            &Arc::new(schema.clone()),
+            &a,
+            &b,
+            Operator::RegexNotMatch,
+            regex_not_expected.clone(),
+        )?;
+        apply_logic_op(
+            &Arc::new(schema),
+            &a,
+            &b,
+            Operator::RegexNotIMatch,
+            regex_not_expected.clone(),
+        )?;
+
+        Ok(())
+    }
+
     #[test]
     fn or_with_nulls_op() -> Result<()> {
         let schema = Schema::new(vec![
diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs
index 2840d3f62bf9..0868ee721665 100644
--- a/datafusion/physical-plan/src/joins/cross_join.rs
+++ b/datafusion/physical-plan/src/joins/cross_join.rs
@@ -693,9 +693,8 @@ mod tests {
         assert_contains!(
             err.to_string(),
-            "External error: Resources exhausted: Failed to allocate additional"
+            "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: CrossJoinExec"
         );
-        assert_contains!(err.to_string(), "CrossJoinExec");
 
         Ok(())
     }
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 14835f717ea3..e40a07cf6220 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -3821,13 +3821,11 @@ mod tests {
             let stream = join.execute(0, task_ctx)?;
             let err = common::collect(stream).await.unwrap_err();
 
+            // Assert that the operator-level reservation attempts to overallocate
             assert_contains!(
                 err.to_string(),
-                "External error: Resources exhausted: Failed to allocate additional"
+                "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput"
             );
-
-            // Asserting that operator-level reservation attempting to overallocate
-            assert_contains!(err.to_string(), "HashJoinInput");
         }
 
         Ok(())
@@ -3902,13 +3900,12 @@ mod tests {
             let stream = join.execute(1, task_ctx)?;
             let err = common::collect(stream).await.unwrap_err();
 
+            // Assert that the stream-level reservation attempts to overallocate
             assert_contains!(
                 err.to_string(),
-                "External error: Resources exhausted: Failed to allocate additional"
-            );
+                "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: HashJoinInput[1]"
-
-            // Asserting that stream-level reservation attempting to overallocate
-            assert_contains!(err.to_string(), "HashJoinInput[1]");
+            );
         }
 
         Ok(())
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index d69d818331be..04a025c93288 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -1039,9 +1039,8 @@ mod tests {
             assert_contains!(
                 err.to_string(),
-                "External error: Resources exhausted: Failed to allocate additional"
+                "External error: Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as: NestedLoopJoinLoad[0]"
             );
-            assert_contains!(err.to_string(), "NestedLoopJoinLoad[0]");
         }
 
         Ok(())
diff --git a/datafusion/physical-plan/src/windows/mod.rs b/datafusion/physical-plan/src/windows/mod.rs
index 1fd0ca36b1eb..03090faf3efd 100644
--- a/datafusion/physical-plan/src/windows/mod.rs
+++ b/datafusion/physical-plan/src/windows/mod.rs
@@ -113,7 +113,6 @@ pub fn create_window_expr(
             let aggregate = AggregateExprBuilder::new(Arc::clone(fun), args.to_vec())
                 .schema(Arc::new(input_schema.clone()))
                 .alias(name)
-                .order_by(order_by.to_vec())
                 .with_ignore_nulls(ignore_nulls)
                 .build()?;
             window_expr_from_aggregate_expr(
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 322ddcdb047b..0cda24d6ff5e 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -1881,6 +1881,12 @@ SELECT MIN(c1), MIN(c2) FROM test
 ----
 0 1
 
+query error min/max was called with 2 arguments. It requires only 1.
+SELECT MIN(c1, c2) FROM test
+
+query error min/max was called with 2 arguments. It requires only 1.
+SELECT MAX(c1, c2) FROM test
+
 # aggregate_grouped
 query II
 SELECT c1, SUM(c2) FROM test GROUP BY c1 order by c1
@@ -3724,6 +3730,51 @@ SELECT bool_or(distinct c1), bool_or(distinct c2), bool_or(distinct c3), bool_or
 ----
 true true true false true true false NULL
 
+# Test issue: https://github.com/apache/datafusion/issues/11846
+statement ok
+create table t1(v1 int, v2 boolean);
+
+statement ok
+insert into t1 values (1, true), (1, true);
+
+statement ok
+insert into t1 values (3, null), (3, true);
+
+statement ok
+insert into t1 values (2, false), (2, true);
+
+statement ok
+insert into t1 values (6, false), (6, false);
+
+statement ok
+insert into t1 values (4, null), (4, null);
+
+statement ok
+insert into t1 values (5, false), (5, null);
+
+query IB
+select v1, bool_and(v2) from t1 group by v1 order by v1;
+----
+1 true
+2 false
+3 true
+4 NULL
+5 false
+6 false
+
+query IB
+select v1, bool_or(v2) from t1 group by v1 order by v1;
+----
+1 true
+2 true
+3 true
+4 NULL
+5 false
+6 false
+
+statement ok
+drop table t1;
+
 # All supported timestamp types
 
 # "nanos" --> TimestampNanosecondArray
diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
index ba378f4230f8..ab1c7e78f1ff 100644
--- a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
+++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -40,6 +40,22 @@ STORED AS CSV
 LOCATION '../../testing/data/csv/aggregate_test_100.csv'
 OPTIONS ('format.has_header' 'true');
 
+# Table to test `bool_and()`, `bool_or()` aggregate functions
+statement ok
+CREATE TABLE aggregate_test_100_bool (
+  v1 VARCHAR NOT NULL,
+  v2 BOOLEAN,
+  v3 BOOLEAN
+);
+
+statement ok
+INSERT INTO aggregate_test_100_bool
+SELECT
+  c1 as v1,
+  CASE WHEN c2 > 3 THEN TRUE WHEN c2 > 1 THEN FALSE ELSE NULL END as v2,
+  CASE WHEN c1='a' OR c1='b' THEN TRUE WHEN c1='c' OR c1='d' THEN FALSE ELSE NULL END as v3
+FROM aggregate_test_100;
+
 # Prepare settings to skip partial aggregation from the beginning
 statement ok
 set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 0;
@@ -117,6 +133,33 @@ GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
 -2117946883 d -2117946883 NULL NULL NULL
 -2098805236 c -2098805236 NULL NULL NULL
 
+# FIXME: add bool_and(v3) column when the issue is fixed
+# ISSUE https://github.com/apache/datafusion/issues/11846
+query TBBB rowsort
+select v1, bool_or(v2), bool_and(v2), bool_or(v3)
+from aggregate_test_100_bool
+group by v1
+----
+a true false true
+b true false true
+c true false false
+d true false false
+e true false NULL
+
+query TBBB rowsort
+select v1,
+  bool_or(v2) FILTER (WHERE v1 = 'a' OR v1 = 'c' OR v1 = 'e'),
+  bool_or(v2) FILTER (WHERE v2 = false),
+  bool_or(v2) FILTER (WHERE v2 = NULL)
+from aggregate_test_100_bool
+group by v1
+----
+a true false NULL
+b NULL false NULL
+c true false NULL
+d NULL false NULL
+e true false NULL
+
 # Prepare settings to always skip aggregation after a couple of batches
 statement ok
 set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 10;
@@ -223,6 +266,32 @@ c 2.666666666667 0.425241138254
 d 2.444444444444 0.541519476308
 e 3 0.505440263521
 
+# FIXME: add bool_and(v3) column when the issue is fixed
+# ISSUE https://github.com/apache/datafusion/issues/11846
+query TBBB rowsort
+select v1, bool_or(v2), bool_and(v2), bool_or(v3)
+from aggregate_test_100_bool
+group by v1
+----
+a true false true
+b true false true
+c true false false
+d true false false
+e true false NULL
+
+query TBBB rowsort
+select v1,
+  bool_or(v2) FILTER (WHERE v1 = 'a' OR v1 = 'c' OR v1 = 'e'),
+  bool_or(v2) FILTER (WHERE v2 = false),
+  bool_or(v2) FILTER (WHERE v2 = NULL)
+from aggregate_test_100_bool
+group by v1
+----
+a true false NULL
+b NULL false NULL
+c true false NULL
+d NULL false NULL
+e true false NULL
 
 # Enabling PG dialect for filtered aggregates tests
 statement ok
@@ -377,3 +446,48 @@ ORDER BY i;
 
 statement ok
 DROP TABLE decimal_table;
+
+# Extra tests for 'bool_*()' edge cases
+statement ok
+set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 0;
+
+statement ok
+set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0.0;
+
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+statement ok
+set datafusion.execution.batch_size = 1;
+
+statement ok
+create table bool_aggregate_functions (
+  c1 boolean not null,
+  c2 boolean not null,
+  c3 boolean not null,
+  c4 boolean not null,
+  c5 boolean,
+  c6 boolean,
+  c7 boolean,
+  c8 boolean
+)
+as values
+  (true, true, false, false, true, true, null, null),
+  (true, false, true, false, false, null, false, null),
+  (true, true, false, false, null, true, false, null);
+
+query BBBBBBBB
+SELECT bool_and(c1), bool_and(c2), bool_and(c3), bool_and(c4), bool_and(c5), bool_and(c6), bool_and(c7), bool_and(c8) FROM bool_aggregate_functions
+----
+true false false false false true false NULL
+
+statement ok
+set datafusion.execution.skip_partial_aggregation_probe_rows_threshold = 2;
+
+query BBBBBBBB
+SELECT bool_and(c1), bool_and(c2), bool_and(c3), bool_and(c4), bool_and(c5), bool_and(c6), bool_and(c7), bool_and(c8) FROM bool_aggregate_functions
+----
+true false false false false true false NULL
+
+statement ok
+DROP TABLE aggregate_test_100_bool
diff --git a/datafusion/sqllogictest/test_files/functions.slt b/datafusion/sqllogictest/test_files/functions.slt
index 04ab0d76e65f..cb592fdda0c8 100644
--- a/datafusion/sqllogictest/test_files/functions.slt
+++ b/datafusion/sqllogictest/test_files/functions.slt
@@ -234,6 +234,16 @@ SELECT reverse('abcde')
 ----
 edcba
 
+query T
+SELECT reverse(arrow_cast('abcde', 'LargeUtf8'))
+----
+edcba
+
+query T
+SELECT reverse(arrow_cast('abcde', 'Utf8View'))
+----
+edcba
+
 query T
 SELECT reverse(arrow_cast('abcde', 'Dictionary(Int32, Utf8)'))
 ----
@@ -244,11 +254,31 @@ SELECT reverse('loẅks')
 ----
 sk̈wol
 
+query T
+SELECT reverse(arrow_cast('loẅks', 'LargeUtf8'))
+----
+sk̈wol
+
+query T
+SELECT reverse(arrow_cast('loẅks', 'Utf8View'))
+----
+sk̈wol
+
 query T
 SELECT reverse(NULL)
 ----
 NULL
 
+query T
+SELECT reverse(arrow_cast(NULL, 'LargeUtf8'))
+----
+NULL
+
+query T
+SELECT reverse(arrow_cast(NULL, 'Utf8View'))
+----
+NULL
+
 query T
 SELECT right('abcde', -2)
 ----
@@ -816,6 +846,38 @@ SELECT split_part(arrow_cast('foo_bar', 'Dictionary(Int32, Utf8)'), '_', 2)
 ----
 bar
 
+# test LargeUtf8 and Utf8View for split_part
+query T
+SELECT split_part(arrow_cast('large_apple_large_orange_large_banana', 'LargeUtf8'), '_', 3)
+----
+large
+
+query T
+SELECT split_part(arrow_cast('view_apple_view_orange_view_banana', 'Utf8View'), '_', 3);
+----
+view
+
+query T
+SELECT split_part('test_large_split_large_case', arrow_cast('_large', 'LargeUtf8'), 2)
+----
+_split
+
+query T
+SELECT split_part(arrow_cast('huge_large_apple_large_orange_large_banana', 'LargeUtf8'), arrow_cast('_', 'Utf8View'), 2)
+----
+large
+
+query T
+SELECT split_part(arrow_cast('view_apple_view_large_banana', 'Utf8View'), arrow_cast('_large', 'LargeUtf8'), 2)
+----
+_banana
+
+query T
+SELECT split_part(NULL, '_', 2)
+----
+NULL
+
+
 query B
 SELECT starts_with('foobar', 'foo')
 ----
@@ -925,7 +987,7 @@ SELECT products.* REPLACE (price*2 AS price, product_id+1000 AS product_id) FROM
 1003 OldBrand Product 3 79.98
 1004 OldBrand Product 4 99.98
 
-#overlay tests
+# overlay tests
 statement ok
 CREATE TABLE over_test(
   str TEXT,
@@ -967,6 +1029,31 @@ NULL
 Thomxas
 NULL
 
+# overlay tests with utf8view
+query T
+SELECT overlay(arrow_cast(str, 'Utf8View') placing arrow_cast(characters, 'Utf8View') from pos for len) from over_test
+----
+abc
+qwertyasdfg
+ijkz
+Thomas
+NULL
+NULL
+NULL
+NULL
+
+query T
+SELECT overlay(arrow_cast(str, 'Utf8View') placing arrow_cast(characters, 'Utf8View') from pos) from over_test
+----
+abc
+qwertyasdfg
+ijk
+Thomxas
+NULL
+NULL
+Thomxas
+NULL
+
 query I
 SELECT levenshtein('kitten', 'sitting')
 ----
diff --git a/datafusion/sqllogictest/test_files/regexp.slt b/datafusion/sqllogictest/test_files/regexp.slt
index 149ad7f6fdcd..c04021651a50 100644
--- a/datafusion/sqllogictest/test_files/regexp.slt
+++ b/datafusion/sqllogictest/test_files/regexp.slt
@@ -230,6 +230,66 @@ SELECT regexp_match('aaa-555', '.*-(\d*)');
 ----
 [555]
 
+query B
+select 'abc' ~ null;
+----
+NULL
+
+query B
+select null ~ null;
+----
+NULL
+
+query B
+select null ~ 'abc';
+----
+NULL
+
+query B
+select 'abc' ~* null;
+----
+NULL
+
+query B
+select null ~* null;
+----
+NULL
+
+query B
+select null ~* 'abc';
+----
+NULL
+
+query B
+select 'abc' !~ null;
+----
+NULL
+
+query B
+select null !~ null;
+----
+NULL
+
+query B
+select null !~ 'abc';
+----
+NULL
+
+query B
+select 'abc' !~* null;
+----
+NULL
+
+query B
+select null !~* null;
+----
+NULL
+
+query B
+select null !~* 'abc';
+----
+NULL
+
 #
 # regexp_replace tests
 #
@@ -335,6 +395,26 @@ SELECT 'foo\nbar\nbaz' LIKE '%bar%';
 ----
 true
 
+query B
+SELECT NULL LIKE NULL;
+----
+NULL
+
+query B
+SELECT NULL iLIKE NULL;
+----
+NULL
+
+query B
+SELECT NULL not LIKE NULL;
+----
+NULL
+
+query B
+SELECT NULL not iLIKE NULL;
+----
+NULL
+
 statement ok
 drop table t;
 
diff --git a/datafusion/sqllogictest/test_files/string_view.slt b/datafusion/sqllogictest/test_files/string_view.slt
index e1d4a96620f8..82a714a432ba 100644
--- a/datafusion/sqllogictest/test_files/string_view.slt
+++ b/datafusion/sqllogictest/test_files/string_view.slt
@@ -425,6 +425,43 @@ logical_plan
 01)Projection: starts_with(test.column1_utf8view, Utf8View("äöüß")) AS c1, starts_with(test.column1_utf8view, Utf8View("")) AS c2, starts_with(test.column1_utf8view, Utf8View(NULL)) AS c3, starts_with(Utf8View(NULL), test.column1_utf8view) AS c4
 02)--TableScan: test projection=[column1_utf8view]
 
+### Test TRANSLATE
+
+# Should run TRANSLATE using utf8view column successfully
+query T
+SELECT
+  TRANSLATE(column1_utf8view, 'foo', 'bar') as c
+FROM test;
+----
+Andrew
+Xiangpeng
+Raphael
+NULL
+
+# Should run TRANSLATE using utf8 column successfully
+query T
+SELECT
+  TRANSLATE(column1_utf8, 'foo', 'bar') as c
+FROM test;
+----
+Andrew
+Xiangpeng
+Raphael
+NULL
+
+# Should run TRANSLATE using large_utf8 column successfully
+query T
+SELECT
+  TRANSLATE(column1_large_utf8, 'foo', 'bar') as c
+FROM test;
+----
+Andrew
+Xiangpeng
+Raphael
+NULL
+
+
+
 ### Initcap
 
 query TT
@@ -484,7 +521,30 @@ logical_plan
 01)Projection: test.column1_utf8view LIKE Utf8View("foo") AS like, test.column1_utf8view ILIKE Utf8View("foo") AS ilike
 02)--TableScan: test projection=[column1_utf8view]
 
+## Ensure no casts for SUBSTR
+query TT
+EXPLAIN SELECT
+  SUBSTR(column1_utf8view, 1, 3) as c1,
+  SUBSTR(column2_utf8, 1, 3) as c2,
+  SUBSTR(column2_large_utf8, 1, 3) as c3
+FROM test;
+----
+logical_plan
+01)Projection: substr(test.column1_utf8view, Int64(1), Int64(3)) AS c1, substr(test.column2_utf8, Int64(1), Int64(3)) AS c2, substr(test.column2_large_utf8, Int64(1), Int64(3)) AS c3
+02)--TableScan: test projection=[column2_utf8, column2_large_utf8, column1_utf8view]
+
+query TTT
+SELECT
+  SUBSTR(column1_utf8view, 1, 3) as c1,
+  SUBSTR(column2_utf8, 1, 3) as c2,
+  SUBSTR(column2_large_utf8, 1, 3) as c3
+FROM test;
+----
+And X X
+Xia Xia Xia
+Rap R R
+NULL R R
 
 ## Ensure no casts for ASCII
 
@@ -818,16 +878,23 @@ logical_plan
 02)--TableScan: test projection=[column1_utf8view]
 
 ## Ensure no casts for OVERLAY
-## TODO file ticket
 query TT
 EXPLAIN SELECT
   OVERLAY(column1_utf8view PLACING 'foo' FROM 2 ) as c1
 FROM test;
 ----
 logical_plan
-01)Projection: overlay(CAST(test.column1_utf8view AS Utf8), Utf8("foo"), Int64(2)) AS c1
+01)Projection: overlay(test.column1_utf8view, Utf8View("foo"), Int64(2)) AS c1
 02)--TableScan: test projection=[column1_utf8view]
 
+query T
+SELECT OVERLAY(column1_utf8view PLACING 'foo' FROM 2 ) as c1 FROM test;
+----
+Afooew
+Xfoogpeng
+Rfooael
+NULL
+
 ## Ensure no casts for REGEXP_LIKE
 query TT
 EXPLAIN SELECT
@@ -883,14 +950,13 @@ logical_plan
 03)----TableScan: test projection=[column1_utf8view, column2_utf8view]
 
 ## Ensure no casts for REVERSE
-## TODO file ticket
 query TT
 EXPLAIN SELECT
   REVERSE(column1_utf8view) as c1
 FROM test;
 ----
 logical_plan
-01)Projection: reverse(CAST(test.column1_utf8view AS Utf8)) AS c1
+01)Projection: reverse(test.column1_utf8view) AS c1
 02)--TableScan: test projection=[column1_utf8view]
 
@@ -974,11 +1040,12 @@ logical_plan
 ## TODO file ticket
 query TT
 EXPLAIN SELECT
-  SPLIT_PART(column1_utf8view, 'f', 1) as c
+  SPLIT_PART(column1_utf8view, 'f', 1) as c1,
+  SPLIT_PART('testtesttest',column1_utf8view, 1) as c2
 FROM test;
 ----
 logical_plan
-01)Projection: split_part(CAST(test.column1_utf8view AS Utf8), Utf8("f"), Int64(1)) AS c
+01)Projection: split_part(test.column1_utf8view, Utf8("f"), Int64(1)) AS c1, split_part(Utf8("testtesttest"), test.column1_utf8view, Int64(1)) AS c2
 02)--TableScan: test projection=[column1_utf8view]
 
 ## Ensure no casts for STRPOS
@@ -1003,9 +1070,8 @@ EXPLAIN SELECT
 FROM test;
 ----
 logical_plan
-01)Projection: substr(__common_expr_1, Int64(1)) AS c, substr(__common_expr_1, Int64(1), Int64(2)) AS c2
-02)--Projection: CAST(test.column1_utf8view AS Utf8) AS __common_expr_1
-03)----TableScan: test projection=[column1_utf8view]
+01)Projection: substr(test.column1_utf8view, Int64(1)) AS c, substr(test.column1_utf8view, Int64(1), Int64(2)) AS c2
+02)--TableScan: test projection=[column1_utf8view]
 
 ## Ensure no casts for SUBSTRINDEX
 query TT
@@ -1041,14 +1107,13 @@ logical_plan
 02)--TableScan: test projection=[column1_utf8view, column2_utf8view]
 
 ## Ensure no casts for TRANSLATE
-## TODO file ticket
 query TT
 EXPLAIN SELECT
   TRANSLATE(column1_utf8view, 'foo', 'bar') as c
FROM test;
 ----
 logical_plan
-01)Projection: translate(CAST(test.column1_utf8view AS Utf8), Utf8("foo"), Utf8("bar")) AS c
+01)Projection: translate(test.column1_utf8view, Utf8("foo"), Utf8("bar")) AS c
 02)--TableScan: test projection=[column1_utf8view]
 
 ## Ensure no casts for FIND_IN_SET
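
The reverse, substr, and translate changes above all lean on the same arrow-rs abstraction: rather than taking a concrete `GenericStringArray<T>`, the inner kernel accepts any `V: ArrayAccessor<Item = &'a str>` and walks it with `ArrayIter`, so `StringArray`, `LargeStringArray`, and `StringViewArray` share a single implementation, and the `DataType` dispatch happens once at the boundary (as `invoke_translate` does). Below is a minimal, self-contained sketch of that pattern; the `reverse_kernel` name and the `main` harness are illustrative only, not part of the patch, and it assumes just the `arrow` crate:

```rust
use std::sync::Arc;

use arrow::array::{
    Array, ArrayAccessor, ArrayIter, ArrayRef, GenericStringArray, OffsetSizeTrait,
    StringArray, StringViewArray,
};

// One kernel for every string encoding: anything that yields `&str` items
// through `ArrayAccessor` can be iterated with `ArrayIter`, nulls included.
fn reverse_kernel<'a, T, V>(input: V) -> ArrayRef
where
    T: OffsetSizeTrait,
    V: ArrayAccessor<Item = &'a str>,
{
    let reversed: GenericStringArray<T> = ArrayIter::new(input)
        .map(|opt| opt.map(|s| s.chars().rev().collect::<String>()))
        .collect();
    Arc::new(reversed)
}

fn main() {
    // An offset-based UTF-8 array (DataType::Utf8) ...
    let utf8 = StringArray::from(vec![Some("abcde"), None]);
    // ... and a view-based array (DataType::Utf8View) go through the same kernel.
    let utf8_view = StringViewArray::from(vec![Some("abcde"), None]);

    let expected = StringArray::from(vec![Some("edcba"), None]);
    let a = reverse_kernel::<i32, _>(&utf8);
    let b = reverse_kernel::<i32, _>(&utf8_view);
    assert_eq!(a.as_any().downcast_ref::<StringArray>().unwrap(), &expected);
    assert_eq!(b.as_any().downcast_ref::<StringArray>().unwrap(), &expected);
}
```

The monomorphized kernel is generated once per string representation rather than once per function, which is why the patch can delete the per-file `StringArrayType` trait and handle `Utf8View` without duplicating any string logic.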
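The sqllogictest additions exercise the new `Utf8View` paths end to end; the same check can be scripted against the SQL API. A sketch, assuming a binary that depends on the `datafusion` and `tokio` crates (`arrow_cast` is registered by default in `SessionContext`, and the SQL mirrors the `string_view.slt` tests above):

```rust
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // reverse() now accepts a Utf8View argument directly; before this patch
    // the planner inserted a CAST back to Utf8, visible in the EXPLAIN output.
    let df = ctx
        .sql("SELECT reverse(arrow_cast('abcde', 'Utf8View')) AS r")
        .await?;
    df.show().await?; // prints a single row: "edcba"
    Ok(())
}
```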