Skip to content

Commit

Permalink
Merge commit 'fe71aa242d44ad337aba1e373b336da234ed4f8c' into chunchun…
Browse files Browse the repository at this point in the history
…/update-df-apr-week-1-3
  • Loading branch information
appletreeisyellow committed Apr 23, 2024
2 parents 5b989ed + fe71aa2 commit 059ce1a
Show file tree
Hide file tree
Showing 25 changed files with 777 additions and 290 deletions.
2 changes: 1 addition & 1 deletion datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions datafusion/common/src/tree_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,23 @@ pub trait TreeNode: Sized {
)
}

/// Returns true if `f` returns true for any node in the tree.
///
/// Nodes are visited via [`Self::apply`]; the recursion is stopped as soon
/// as a matching node is found, so `f` is not called on the remaining nodes.
///
/// # Panics
///
/// Panics if [`Self::apply`] returns an error. The visitor passed to it
/// here always returns `Ok`, so this is not expected to happen.
fn exists<F: FnMut(&Self) -> bool>(&self, mut f: F) -> bool {
    let mut found = false;
    self.apply(&mut |n| {
        // Record the match and ask the traversal to stop; otherwise keep going.
        Ok(if f(n) {
            found = true;
            TreeNodeRecursion::Stop
        } else {
            TreeNodeRecursion::Continue
        })
    })
    .expect("exists: visitor closure never returns Err");
    found
}

/// Apply the closure `F` to the node's children.
fn apply_children<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
&self,
Expand Down
27 changes: 16 additions & 11 deletions datafusion/core/src/physical_optimizer/pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,23 @@ pub trait PruningStatistics {
fn num_containers(&self) -> usize;

/// Return the number of null values for the named column as an
/// `Option<UInt64Array>`.
/// [`UInt64Array`]
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn null_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Return the number of rows for the named column in each container
/// as an `Option<UInt64Array>`.
/// as an [`UInt64Array`].
///
/// See [`Self::min_values`] for when to return `None` and null values.
///
/// Note: the returned array must contain [`Self::num_containers`] rows
///
/// [`UInt64Array`]: arrow::array::UInt64Array
fn row_counts(&self, column: &Column) -> Option<ArrayRef>;

/// Returns [`BooleanArray`] where each row represents information known
Expand Down Expand Up @@ -1519,6 +1523,7 @@ mod tests {
array::{BinaryArray, Int32Array, Int64Array, StringArray},
datatypes::{DataType, TimeUnit},
};
use arrow_array::UInt64Array;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr::InList;
Expand Down Expand Up @@ -1684,10 +1689,10 @@ mod tests {
/// there are containers
fn with_null_counts(
mut self,
counts: impl IntoIterator<Item = Option<i64>>,
counts: impl IntoIterator<Item = Option<u64>>,
) -> Self {
let null_counts: ArrayRef =
Arc::new(counts.into_iter().collect::<Int64Array>());
Arc::new(counts.into_iter().collect::<UInt64Array>());

self.assert_invariants();
self.null_counts = Some(null_counts);
Expand All @@ -1698,10 +1703,10 @@ mod tests {
/// there are containers
fn with_row_counts(
mut self,
counts: impl IntoIterator<Item = Option<i64>>,
counts: impl IntoIterator<Item = Option<u64>>,
) -> Self {
let row_counts: ArrayRef =
Arc::new(counts.into_iter().collect::<Int64Array>());
Arc::new(counts.into_iter().collect::<UInt64Array>());

self.assert_invariants();
self.row_counts = Some(row_counts);
Expand Down Expand Up @@ -1753,13 +1758,13 @@ mod tests {
self
}

/// Add null counts for the specified columm.
/// Add null counts for the specified column.
/// There must be the same number of null counts as
/// there are containers
fn with_null_counts(
mut self,
name: impl Into<String>,
counts: impl IntoIterator<Item = Option<i64>>,
counts: impl IntoIterator<Item = Option<u64>>,
) -> Self {
let col = Column::from_name(name.into());

Expand All @@ -1775,13 +1780,13 @@ mod tests {
self
}

/// Add row counts for the specified columm.
/// Add row counts for the specified column.
/// There must be the same number of row counts as
/// there are containers
fn with_row_counts(
mut self,
name: impl Into<String>,
counts: impl IntoIterator<Item = Option<i64>>,
counts: impl IntoIterator<Item = Option<u64>>,
) -> Self {
let col = Column::from_name(name.into());

Expand All @@ -1797,7 +1802,7 @@ mod tests {
self
}

/// Add contained information for the specified columm.
/// Add contained information for the specified column.
fn with_contained(
mut self,
name: impl Into<String>,
Expand Down
15 changes: 8 additions & 7 deletions datafusion/core/tests/simplification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::logical_plan::builder::table_scan_with_filters;
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_expr::{
expr, table_scan, BuiltinScalarFunction, Cast, ColumnarValue, Expr, ExprSchemable,
LogicalPlan, LogicalPlanBuilder, ScalarUDF, Volatility,
expr, table_scan, Cast, ColumnarValue, Expr, ExprSchemable, LogicalPlan,
LogicalPlanBuilder, ScalarUDF, Volatility,
};
use datafusion_functions::math;
use datafusion_optimizer::simplify_expressions::{ExprSimplifier, SimplifyExpressions};
use datafusion_optimizer::{OptimizerContext, OptimizerRule};
use std::sync::Arc;
Expand Down Expand Up @@ -383,17 +384,17 @@ fn test_const_evaluator_scalar_functions() {

// volatile / stable functions should not be evaluated
// rand() + (1 + 2) --> rand() + 3
let fun = BuiltinScalarFunction::Random;
assert_eq!(fun.volatility(), Volatility::Volatile);
let rand = Expr::ScalarFunction(ScalarFunction::new(fun, vec![]));
let fun = math::random();
assert_eq!(fun.signature().volatility, Volatility::Volatile);
let rand = Expr::ScalarFunction(ScalarFunction::new_udf(fun, vec![]));
let expr = rand.clone() + (lit(1) + lit(2));
let expected = rand + lit(3);
test_evaluate(expr, expected);

// parenthesization matters: can't rewrite
// (rand() + 1) + 2 --> (rand() + 1) + 2
let fun = BuiltinScalarFunction::Random;
let rand = Expr::ScalarFunction(ScalarFunction::new(fun, vec![]));
let fun = math::random();
let rand = Expr::ScalarFunction(ScalarFunction::new_udf(fun, vec![]));
let expr = (rand + lit(1)) + lit(2);
test_evaluate(expr.clone(), expr);
}
Expand Down
21 changes: 0 additions & 21 deletions datafusion/expr/src/built_in_function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,6 @@ pub enum BuiltinScalarFunction {
Exp,
/// factorial
Factorial,
/// nanvl
Nanvl,
// string functions
/// concat
Concat,
Expand All @@ -56,8 +54,6 @@ pub enum BuiltinScalarFunction {
EndsWith,
/// initcap
InitCap,
/// random
Random,
}

/// Maps the sql function name to `BuiltinScalarFunction`
Expand Down Expand Up @@ -114,14 +110,10 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Coalesce => Volatility::Immutable,
BuiltinScalarFunction::Exp => Volatility::Immutable,
BuiltinScalarFunction::Factorial => Volatility::Immutable,
BuiltinScalarFunction::Nanvl => Volatility::Immutable,
BuiltinScalarFunction::Concat => Volatility::Immutable,
BuiltinScalarFunction::ConcatWithSeparator => Volatility::Immutable,
BuiltinScalarFunction::EndsWith => Volatility::Immutable,
BuiltinScalarFunction::InitCap => Volatility::Immutable,

// Volatile builtin functions
BuiltinScalarFunction::Random => Volatility::Volatile,
}
}

Expand Down Expand Up @@ -152,16 +144,10 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::InitCap => {
utf8_to_str_type(&input_expr_types[0], "initcap")
}
BuiltinScalarFunction::Random => Ok(Float64),
BuiltinScalarFunction::EndsWith => Ok(Boolean),

BuiltinScalarFunction::Factorial => Ok(Int64),

BuiltinScalarFunction::Nanvl => match &input_expr_types[0] {
Float32 => Ok(Float32),
_ => Ok(Float64),
},

BuiltinScalarFunction::Ceil | BuiltinScalarFunction::Exp => {
match input_expr_types[0] {
Float32 => Ok(Float32),
Expand Down Expand Up @@ -199,11 +185,6 @@ impl BuiltinScalarFunction {
],
self.volatility(),
),
BuiltinScalarFunction::Random => Signature::exact(vec![], self.volatility()),
BuiltinScalarFunction::Nanvl => Signature::one_of(
vec![Exact(vec![Float32, Float32]), Exact(vec![Float64, Float64])],
self.volatility(),
),
BuiltinScalarFunction::Factorial => {
Signature::uniform(1, vec![Int64], self.volatility())
}
Expand Down Expand Up @@ -240,8 +221,6 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Ceil => &["ceil"],
BuiltinScalarFunction::Exp => &["exp"],
BuiltinScalarFunction::Factorial => &["factorial"],
BuiltinScalarFunction::Nanvl => &["nanvl"],
BuiltinScalarFunction::Random => &["random"],

// conditional functions
BuiltinScalarFunction::Coalesce => &["coalesce"],
Expand Down
15 changes: 4 additions & 11 deletions datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::sync::Arc;

use crate::expr_fn::binary_expr;
use crate::logical_plan::Subquery;
use crate::utils::{expr_to_columns, find_out_reference_exprs};
use crate::utils::expr_to_columns;
use crate::window_frame;
use crate::{
aggregate_function, built_in_function, built_in_window_function, udaf,
Expand Down Expand Up @@ -1232,7 +1232,7 @@ impl Expr {

/// Return true when the expression contains outer reference (correlated) expressions.
pub fn contains_outer(&self) -> bool {
!find_out_reference_exprs(self).is_empty()
self.exists(|expr| matches!(expr, Expr::OuterReferenceColumn { .. }))
}

/// Recursively find all [`Expr::Placeholder`] expressions, and
Expand Down Expand Up @@ -1903,8 +1903,8 @@ mod test {
use crate::expr::Cast;
use crate::expr_fn::col;
use crate::{
case, lit, BuiltinScalarFunction, ColumnarValue, Expr, ScalarFunctionDefinition,
ScalarUDF, ScalarUDFImpl, Signature, Volatility,
case, lit, ColumnarValue, Expr, ScalarFunctionDefinition, ScalarUDF,
ScalarUDFImpl, Signature, Volatility,
};
use arrow::datatypes::DataType;
use datafusion_common::Column;
Expand Down Expand Up @@ -2018,13 +2018,6 @@ mod test {

#[test]
fn test_is_volatile_scalar_func_definition() {
// BuiltIn
assert!(
ScalarFunctionDefinition::BuiltIn(BuiltinScalarFunction::Random)
.is_volatile()
.unwrap()
);

// UDF
#[derive(Debug)]
struct TestScalarUDF {
Expand Down
7 changes: 0 additions & 7 deletions datafusion/expr/src/expr_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,11 +297,6 @@ pub fn concat_ws(sep: Expr, values: Vec<Expr>) -> Expr {
))
}

/// Creates an expression that calls the built-in `random` scalar function
/// with no arguments.
///
/// The resulting value is in the range 0.0 <= x < 1.0.
pub fn random() -> Expr {
    let call = ScalarFunction::new(BuiltinScalarFunction::Random, vec![]);
    Expr::ScalarFunction(call)
}

/// Returns the approximate number of distinct input values.
/// This function provides an approximation of count(DISTINCT x).
/// Zero is returned if all input values are null.
Expand Down Expand Up @@ -550,7 +545,6 @@ nary_scalar_expr!(
"concatenates several strings, placing a seperator between each one"
);
nary_scalar_expr!(Concat, concat_expr, "concatenates several strings");
scalar_expr!(Nanvl, nanvl, x y, "returns x if x is not NaN otherwise returns y");

/// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
pub fn case(expr: Expr) -> CaseBuilder {
Expand Down Expand Up @@ -922,7 +916,6 @@ mod test {
test_unary_scalar_expr!(Factorial, factorial);
test_unary_scalar_expr!(Ceil, ceil);
test_unary_scalar_expr!(Exp, exp);
test_scalar_expr!(Nanvl, nanvl, x, y);

test_scalar_expr!(InitCap, initcap, string);
test_scalar_expr!(EndsWith, ends_with, string, characters);
Expand Down
19 changes: 17 additions & 2 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,21 @@ impl LogicalPlan {
| LogicalPlan::Extension(_) => None,
}
}

/// Returns true if any of this node's expressions contains a reference to
/// an outer query, as reported by `contains_outer` on each expression.
///
/// Stops visiting expressions as soon as one such expression is found.
///
/// # Panics
///
/// Panics if `apply_expressions` returns an error. The visitor passed to it
/// here always returns `Ok`, so this is not expected to happen.
pub fn contains_outer_reference(&self) -> bool {
    let mut contains = false;
    self.apply_expressions(|expr| {
        // Record the hit and ask the traversal to stop; otherwise keep going.
        Ok(if expr.contains_outer() {
            contains = true;
            TreeNodeRecursion::Stop
        } else {
            TreeNodeRecursion::Continue
        })
    })
    .expect("contains_outer_reference: visitor closure never returns Err");
    contains
}
}

/// This macro is used to determine continuation during combined transforming
Expand Down Expand Up @@ -1446,7 +1461,7 @@ impl LogicalPlan {

/// Calls `f` on all subqueries referenced in expressions of the current
/// `LogicalPlan` node.
fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
pub fn apply_subqueries<F: FnMut(&Self) -> Result<TreeNodeRecursion>>(
&self,
mut f: F,
) -> Result<TreeNodeRecursion> {
Expand All @@ -1469,7 +1484,7 @@ impl LogicalPlan {
/// using `f`.
///
/// Returns the current node.
fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
pub fn map_subqueries<F: FnMut(Self) -> Result<Transformed<Self>>>(
self,
mut f: F,
) -> Result<Transformed<Self>> {
Expand Down
Loading

0 comments on commit 059ce1a

Please sign in to comment.