Skip to content

Commit

Permalink
Support DuckDB style struct syntax (apache#11214)
Browse files Browse the repository at this point in the history
* struct literal

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* add nested

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* fmt

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* rm useless comment

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>

* switch to NYI error, derive debug/clone

* improve documentation strings

* Avoid stack overflow by putting code in a new function

---------

Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
  • Loading branch information
jayzhan211 and alamb authored Jul 3, 2024
1 parent ecc1c01 commit fe66daa
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 27 deletions.
30 changes: 10 additions & 20 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,26 +231,16 @@ impl SessionState {
);
}

let mut user_defined_sql_planners = vec![];

// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
{
let array_planner =
Arc::new(functions_array::planner::ArrayFunctionPlanner) as _;

let field_access_planner =
Arc::new(functions_array::planner::FieldAccessPlanner) as _;

user_defined_sql_planners.extend(vec![array_planner, field_access_planner]);
}
#[cfg(feature = "datetime_expressions")]
{
let extract_planner =
Arc::new(functions::datetime::planner::ExtractPlanner::default()) as _;

user_defined_sql_planners.push(extract_planner);
}
let user_defined_sql_planners: Vec<Arc<dyn UserDefinedSQLPlanner>> = vec![
Arc::new(functions::core::planner::CoreFunctionPlanner::default()),
// register crate of array expressions (if enabled)
#[cfg(feature = "array_expressions")]
Arc::new(functions_array::planner::ArrayFunctionPlanner),
#[cfg(feature = "array_expressions")]
Arc::new(functions_array::planner::FieldAccessPlanner),
#[cfg(feature = "datetime_expressions")]
Arc::new(functions::datetime::planner::ExtractPlanner),
];

let mut new_self = SessionState {
session_id,
Expand Down
37 changes: 32 additions & 5 deletions datafusion/expr/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ pub trait ContextProvider {

/// This trait allows users to customize the behavior of the SQL planner
pub trait UserDefinedSQLPlanner: Send + Sync {
/// Plan the binary operation between two expressions, returns OriginalBinaryExpr if not possible
/// Plan the binary operation between two expressions, returns original
/// BinaryExpr if not possible
fn plan_binary_op(
&self,
expr: RawBinaryExpr,
Expand All @@ -93,7 +94,9 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
Ok(PlannerResult::Original(expr))
}

/// Plan the field access expression, returns OriginalFieldAccessExpr if not possible
/// Plan the field access expression
///
/// returns original FieldAccessExpr if not possible
fn plan_field_access(
&self,
expr: RawFieldAccessExpr,
Expand All @@ -102,7 +105,9 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
Ok(PlannerResult::Original(expr))
}

// Plan the array literal, returns OriginalArray if not possible
/// Plan the array literal, returns OriginalArray if not possible
///
/// Returns origin expression arguments if not possible
fn plan_array_literal(
&self,
exprs: Vec<Expr>,
Expand All @@ -111,8 +116,20 @@ pub trait UserDefinedSQLPlanner: Send + Sync {
Ok(PlannerResult::Original(exprs))
}

// Plan the Extract expression, e.g., EXTRACT(month FROM foo)
// returns origin expression arguments if not possible
/// Plan the dictionary literal `{ key: value, ...}`
///
/// Returns origin expression arguments if not possible
fn plan_dictionary_literal(
&self,
expr: RawDictionaryExpr,
_schema: &DFSchema,
) -> Result<PlannerResult<RawDictionaryExpr>> {
Ok(PlannerResult::Original(expr))
}

/// Plan an extract expression, e.g., `EXTRACT(month FROM foo)`
///
/// Returns origin expression arguments if not possible
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Ok(PlannerResult::Original(args))
}
Expand Down Expand Up @@ -142,6 +159,16 @@ pub struct RawFieldAccessExpr {
pub expr: Expr,
}

/// A Dictionary literal expression `{ key: value, ...}`
///
/// This structure is used by [`UserDefinedSQLPlanner`] to plan operators with
/// custom expressions.
#[derive(Debug, Clone)]
pub struct RawDictionaryExpr {
pub keys: Vec<Expr>,
pub values: Vec<Expr>,
}

/// Result of planning a raw expr with [`UserDefinedSQLPlanner`]
#[derive(Debug, Clone)]
pub enum PlannerResult<T> {
Expand Down
1 change: 1 addition & 0 deletions datafusion/functions/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod named_struct;
pub mod nullif;
pub mod nvl;
pub mod nvl2;
pub mod planner;
pub mod r#struct;

// create UDFs
Expand Down
40 changes: 40 additions & 0 deletions datafusion/functions/src/core/planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use datafusion_common::DFSchema;
use datafusion_common::Result;
use datafusion_expr::planner::{PlannerResult, RawDictionaryExpr, UserDefinedSQLPlanner};

use super::named_struct;

#[derive(Default)]
pub struct CoreFunctionPlanner {}

impl UserDefinedSQLPlanner for CoreFunctionPlanner {
fn plan_dictionary_literal(
&self,
expr: RawDictionaryExpr,
_schema: &DFSchema,
) -> Result<PlannerResult<RawDictionaryExpr>> {
let mut args = vec![];
for (k, v) in expr.keys.into_iter().zip(expr.values.into_iter()) {
args.push(k);
args.push(v);
}
Ok(PlannerResult::Planned(named_struct().call(args)))
}
}
2 changes: 1 addition & 1 deletion datafusion/functions/src/datetime/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use datafusion_expr::{
};

#[derive(Default)]
pub struct ExtractPlanner {}
pub struct ExtractPlanner;

impl UserDefinedSQLPlanner for ExtractPlanner {
fn plan_extract(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
Expand Down
37 changes: 36 additions & 1 deletion datafusion/sql/src/expr/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
use arrow_schema::DataType;
use arrow_schema::TimeUnit;
use datafusion_expr::planner::PlannerResult;
use datafusion_expr::planner::RawDictionaryExpr;
use datafusion_expr::planner::RawFieldAccessExpr;
use sqlparser::ast::{CastKind, Expr as SQLExpr, Subscript, TrimWhereField, Value};
use sqlparser::ast::{
CastKind, DictionaryField, Expr as SQLExpr, Subscript, TrimWhereField, Value,
};

use datafusion_common::{
internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, Result,
Expand Down Expand Up @@ -619,10 +622,42 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
}
},
))),
SQLExpr::Dictionary(fields) => {
self.try_plan_dictionary_literal(fields, schema, planner_context)
}
_ => not_impl_err!("Unsupported ast node in sqltorel: {sql:?}"),
}
}

fn try_plan_dictionary_literal(
&self,
fields: Vec<DictionaryField>,
schema: &DFSchema,
planner_context: &mut PlannerContext,
) -> Result<Expr> {
let mut keys = vec![];
let mut values = vec![];
for field in fields {
let key = lit(field.key.value);
let value =
self.sql_expr_to_logical_expr(*field.value, schema, planner_context)?;
keys.push(key);
values.push(value);
}

let mut raw_expr = RawDictionaryExpr { keys, values };

for planner in self.planners.iter() {
match planner.plan_dictionary_literal(raw_expr, schema)? {
PlannerResult::Planned(expr) => {
return Ok(expr);
}
PlannerResult::Original(expr) => raw_expr = expr,
}
}
not_impl_err!("Unsupported dictionary literal: {raw_expr:?}")
}

/// Parses a struct(..) expression
fn parse_struct(
&self,
Expand Down
25 changes: 25 additions & 0 deletions datafusion/sqllogictest/test_files/struct.slt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ select named_struct('scalar', 27, 'array', values.a, 'null', NULL) from values;
{scalar: 27, array: 2, null: }
{scalar: 27, array: 3, null: }

query ?
select {'scalar': 27, 'array': values.a, 'null': NULL} from values;
----
{scalar: 27, array: 1, null: }
{scalar: 27, array: 2, null: }
{scalar: 27, array: 3, null: }

# named_struct with mixed scalar and array values #2
query ?
select named_struct('array', values.a, 'scalar', 27, 'null', NULL) from values;
Expand All @@ -170,6 +177,13 @@ select named_struct('array', values.a, 'scalar', 27, 'null', NULL) from values;
{array: 2, scalar: 27, null: }
{array: 3, scalar: 27, null: }

query ?
select {'array': values.a, 'scalar': 27, 'null': NULL} from values;
----
{array: 1, scalar: 27, null: }
{array: 2, scalar: 27, null: }
{array: 3, scalar: 27, null: }

# named_struct with mixed scalar and array values #3
query ?
select named_struct('null', NULL, 'array', values.a, 'scalar', 27) from values;
Expand Down Expand Up @@ -207,3 +221,14 @@ query T
select arrow_typeof(named_struct('first', 1, 'second', 2, 'third', 3));
----
Struct([Field { name: "first", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "second", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "third", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])

query T
select arrow_typeof({'first': 1, 'second': 2, 'third': 3});
----
Struct([Field { name: "first", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "second", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "third", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])

# test nested struct literal
query ?
select {'animal': {'cat': 1, 'dog': 2, 'bird': {'parrot': 3, 'canary': 1}}, 'genre': {'fiction': ['mystery', 'sci-fi', 'fantasy'], 'non-fiction': {'biography': 5, 'history': 7, 'science': {'physics': 2, 'biology': 3}}}, 'vehicle': {'car': {'sedan': 4, 'suv': 2}, 'bicycle': 3, 'boat': ['sailboat', 'motorboat']}, 'weather': {'sunny': True, 'temperature': 25.5, 'wind': {'speed': 10, 'direction': 'NW'}}};
----
{animal: {cat: 1, dog: 2, bird: {parrot: 3, canary: 1}}, genre: {fiction: [mystery, sci-fi, fantasy], non-fiction: {biography: 5, history: 7, science: {physics: 2, biology: 3}}}, vehicle: {car: {sedan: 4, suv: 2}, bicycle: 3, boat: [sailboat, motorboat]}, weather: {sunny: true, temperature: 25.5, wind: {speed: 10, direction: NW}}}

0 comments on commit fe66daa

Please sign in to comment.