Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Support Struct field selection in the SQL engine, RENAME and REPLACE select wildcard options #17109

Merged
merged 7 commits into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/conversion/expr_expansion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ fn find_flags(expr: &Expr) -> PolarsResult<ExpansionFlags> {

/// In the case of a single `col(*)`, do nothing: making no selection is the same as selecting all columns.
/// In all other cases, replace the wildcard with an expression that contains all columns.
pub(crate) fn rewrite_projections(
pub fn rewrite_projections(
exprs: Vec<Expr>,
schema: &Schema,
keys: &[Expr],
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/conversion/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod convert_utils;
mod dsl_to_ir;
mod expr_expansion;
pub(crate) mod expr_expansion;
mod expr_to_ir;
mod ir_to_dsl;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv"))]
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-plan/src/plans/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub(crate) mod ir;
mod apply;
mod builder_dsl;
mod builder_ir;
pub(crate) mod conversion;
pub mod conversion;
#[cfg(feature = "debugging")]
pub(crate) mod debug;
pub mod expr_ir;
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub(crate) use polars_time::prelude::*;
pub use polars_utils::arena::{Arena, Node};

pub use crate::dsl::*;
pub use crate::plans::conversion::expr_expansion::rewrite_projections;
#[cfg(feature = "debugging")]
pub use crate::plans::debug::*;
pub use crate::plans::options::*;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ description = "SQL transpiler for Polars. Converts SQL to Polars logical plans"
arrow = { workspace = true }
polars-core = { workspace = true, features = ["rows"] }
polars-error = { workspace = true }
polars-lazy = { workspace = true, features = ["abs", "binary_encoding", "concat_str", "cross_join", "cum_agg", "dtype-date", "dtype-decimal", "is_in", "list_eval", "log", "meta", "regex", "round_series", "sign", "string_reverse", "strings", "timezones", "trigonometry"] }
polars-lazy = { workspace = true, features = ["abs", "binary_encoding", "concat_str", "cross_join", "cum_agg", "dtype-date", "dtype-decimal", "dtype-struct", "is_in", "list_eval", "log", "meta", "regex", "round_series", "sign", "string_reverse", "strings", "timezones", "trigonometry"] }
polars-ops = { workspace = true }
polars-plan = { workspace = true }
polars-time = { workspace = true }
Expand Down
106 changes: 63 additions & 43 deletions crates/polars-sql/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use polars_lazy::prelude::*;
use polars_ops::frame::JoinCoalesce;
use polars_plan::prelude::*;
use sqlparser::ast::{
Distinct, ExcludeSelectItem, Expr as SQLExpr, FunctionArg, GroupByExpr, JoinConstraint,
Distinct, ExcludeSelectItem, Expr as SQLExpr, FunctionArg, GroupByExpr, Ident, JoinConstraint,
JoinOperator, ObjectName, ObjectType, Offset, OrderByExpr, Query, Select, SelectItem, SetExpr,
SetOperator, SetQuantifier, Statement, TableAlias, TableFactor, TableWithJoins, UnaryOperator,
Value as SQLValue, Values, WildcardAdditionalOptions,
Expand Down Expand Up @@ -200,8 +200,9 @@ impl SQLContext {
fn expr_or_ordinal(
&mut self,
e: &SQLExpr,
schema: Option<&Schema>,
exprs: &[Expr],
selected: Option<&[Expr]>,
schema: Option<&Schema>,
clause: &str,
) -> PolarsResult<Expr> {
match e {
Expand Down Expand Up @@ -230,7 +231,12 @@ impl SQLContext {
idx
)
})?;
Ok(exprs
let cols = if let Some(cols) = selected {
cols
} else {
exprs
};
Ok(cols
.get(idx - 1)
.ok_or_else(|| {
polars_err!(
Expand Down Expand Up @@ -600,44 +606,54 @@ impl SQLContext {
lf = self.process_where(lf, &select_stmt.selection)?;

// Column projections.
let projections: Vec<_> = select_stmt
let projections: Vec<Expr> = select_stmt
.projection
.iter()
.map(|select_item| {
Ok(match select_item {
SelectItem::UnnamedExpr(expr) => parse_sql_expr(expr, self, schema.as_deref())?,
SelectItem::UnnamedExpr(expr) => {
vec![parse_sql_expr(expr, self, schema.as_deref())?]
},
SelectItem::ExprWithAlias { expr, alias } => {
let expr = parse_sql_expr(expr, self, schema.as_deref())?;
expr.alias(&alias.value)
vec![expr.alias(&alias.value)]
},
SelectItem::QualifiedWildcard(oname, wildcard_options) => self
.process_qualified_wildcard(
oname,
SelectItem::QualifiedWildcard(obj_name, wildcard_options) => {
let expanded = self.process_qualified_wildcard(
obj_name,
wildcard_options,
&mut contains_wildcard_exclude,
)?,
schema.as_deref(),
)?;
rewrite_projections(vec![expanded], &(schema.clone().unwrap()), &[])?
alexander-beedie marked this conversation as resolved.
Show resolved Hide resolved
},
SelectItem::Wildcard(wildcard_options) => {
contains_wildcard = true;
let e = col("*");
self.process_wildcard_additional_options(
vec![self.process_wildcard_additional_options(
e,
wildcard_options,
&mut contains_wildcard_exclude,
)?
)?]
},
})
})
.collect::<PolarsResult<_>>()?;
.collect::<PolarsResult<Vec<Vec<_>>>>()?
.into_iter()
.flatten()
.collect();

// Check for "GROUP BY ..." (after projections, as there may be ordinal/position ints).
// Check for "GROUP BY ..." (after determining projections)
let mut group_by_keys: Vec<Expr> = Vec::new();
match &select_stmt.group_by {
// Standard "GROUP BY x, y, z" syntax (also recognising ordinal values)
GroupByExpr::Expressions(group_by_exprs) => {
// translate the group expressions, allowing ordinal values
group_by_keys = group_by_exprs
.iter()
.map(|e| self.expr_or_ordinal(e, schema.as_deref(), &projections, "GROUP BY"))
.map(|e| {
self.expr_or_ordinal(e, &projections, None, schema.as_deref(), "GROUP BY")
})
.collect::<PolarsResult<_>>()?
},
// "GROUP BY ALL" syntax; automatically adds expressions that do not contain
Expand Down Expand Up @@ -704,8 +720,9 @@ impl SQLContext {
});
let retained_columns: Vec<_> =
retained_names.into_iter().map(|name| col(&name)).collect();

lf = lf.with_columns(projections);
lf = self.process_order_by(lf, &query.order_by)?;
lf = self.process_order_by(lf, &query.order_by, Some(retained_columns.as_ref()))?;
lf.select(&retained_columns)
} else if contains_wildcard_exclude {
let mut dropped_names = Vec::with_capacity(projections.len());
Expand All @@ -723,19 +740,19 @@ impl SQLContext {
});
if exclude_expr.is_some() {
lf = lf.with_columns(projections);
lf = self.process_order_by(lf, &query.order_by)?;
lf = self.process_order_by(lf, &query.order_by, None)?;
lf.drop(dropped_names)
} else {
lf = lf.select(projections);
self.process_order_by(lf, &query.order_by)?
self.process_order_by(lf, &query.order_by, None)?
}
} else {
lf = lf.select(projections);
self.process_order_by(lf, &query.order_by)?
self.process_order_by(lf, &query.order_by, None)?
}
} else {
lf = self.process_group_by(lf, contains_wildcard, &group_by_keys, &projections)?;
lf = self.process_order_by(lf, &query.order_by)?;
lf = self.process_order_by(lf, &query.order_by, None)?;

// Apply optional 'having' clause, post-aggregation.
let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
Expand Down Expand Up @@ -765,7 +782,7 @@ impl SQLContext {

// DISTINCT ON applies the ORDER BY before the operation.
if !query.order_by.is_empty() {
lf = self.process_order_by(lf, &query.order_by)?;
lf = self.process_order_by(lf, &query.order_by, None)?;
}
return Ok(lf.unique_stable(Some(cols), UniqueKeepStrategy::First));
},
Expand Down Expand Up @@ -994,13 +1011,14 @@ impl SQLContext {
&mut self,
mut lf: LazyFrame,
order_by: &[OrderByExpr],
selected: Option<&[Expr]>,
) -> PolarsResult<LazyFrame> {
let mut by = Vec::with_capacity(order_by.len());
let mut descending = Vec::with_capacity(order_by.len());
let mut nulls_last = Vec::with_capacity(order_by.len());

let schema = Some(lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?);
let column_names = schema
let columns = schema
.clone()
.unwrap()
.iter_names()
Expand All @@ -1015,7 +1033,13 @@ impl SQLContext {
descending.push(desc_order);

// translate order expression, allowing ordinal values
by.push(self.expr_or_ordinal(&ob.expr, schema.as_deref(), &column_names, "ORDER BY")?)
by.push(self.expr_or_ordinal(
&ob.expr,
&columns,
selected,
schema.as_deref(),
"ORDER BY",
)?)
}
Ok(lf.sort_by_exprs(
&by,
Expand Down Expand Up @@ -1152,25 +1176,13 @@ impl SQLContext {
ObjectName(idents): &ObjectName,
options: &WildcardAdditionalOptions,
contains_wildcard_exclude: &mut bool,
schema: Option<&Schema>,
) -> PolarsResult<Expr> {
let idents = idents.as_slice();
let e = match idents {
[tbl_name] => {
let lf = self.table_map.get_mut(&tbl_name.value).ok_or_else(|| {
polars_err!(
SQLInterface: "no table named '{}' found",
tbl_name
)
})?;
let schema = lf.schema_with_arenas(&mut self.lp_arena, &mut self.expr_arena)?;
cols(schema.iter_names())
},
e => polars_bail!(
SQLSyntax: "invalid wildcard expression ({:?})",
e
),
};
self.process_wildcard_additional_options(e, options, contains_wildcard_exclude)
let mut new_idents = idents.clone();
new_idents.push(Ident::new("*"));
let identifier = SQLExpr::CompoundIdentifier(new_idents);
let expr = parse_sql_expr(&identifier, self, schema)?;
self.process_wildcard_additional_options(expr, options, contains_wildcard_exclude)
}

fn process_wildcard_additional_options(
Expand All @@ -1179,9 +1191,17 @@ impl SQLContext {
options: &WildcardAdditionalOptions,
contains_wildcard_exclude: &mut bool,
) -> PolarsResult<Expr> {
if options.opt_except.is_some() {
polars_bail!(SQLSyntax: "EXCEPT not supported (use EXCLUDE instead)")
// bail on unsupported wildcard options
if options.opt_ilike.is_some() {
polars_bail!(SQLSyntax: "ILIKE wildcard option is unsupported")
} else if options.opt_rename.is_some() {
polars_bail!(SQLSyntax: "RENAME wildcard option is unsupported")
} else if options.opt_replace.is_some() {
polars_bail!(SQLSyntax: "REPLACE wildcard option is unsupported")
} else if options.opt_except.is_some() {
polars_bail!(SQLSyntax: "EXCEPT wildcard option is unsupported (use EXCLUDE instead)")
}

Ok(match &options.opt_exclude {
Some(ExcludeSelectItem::Single(ident)) => {
*contains_wildcard_exclude = true;
Expand Down
79 changes: 46 additions & 33 deletions crates/polars-sql/src/sql_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,42 +375,55 @@ impl SQLExprVisitor<'_> {

/// Visit a compound SQL identifier
///
/// e.g. df.column or "df"."column"
/// e.g. tbl.column, struct.field, tbl.struct.field (inc. nested struct fields)
fn visit_compound_identifier(&mut self, idents: &[Ident]) -> PolarsResult<Expr> {
match idents {
[tbl_name, column_name] => {
let mut lf = self
.ctx
.get_table_from_current_scope(&tbl_name.value)
.ok_or_else(|| {
polars_err!(
SQLInterface: "no table or alias named '{}' found",
tbl_name
)
})?;

let schema =
lf.schema_with_arenas(&mut self.ctx.lp_arena, &mut self.ctx.expr_arena)?;
if let Some((_, name, _)) = schema.get_full(&column_name.value) {
let resolved = &self.ctx.resolve_name(&tbl_name.value, &column_name.value);
Ok(if name != resolved {
col(resolved).alias(name)
} else {
col(name)
})
// inference priority: table > struct > column
let ident_root = &idents[0];
let mut remaining_idents = idents.iter().skip(1);
let mut lf = self.ctx.get_table_from_current_scope(&ident_root.value);

let schema = if let Some(ref mut lf) = lf {
lf.schema_with_arenas(&mut self.ctx.lp_arena, &mut self.ctx.expr_arena)
} else {
Ok(Arc::new(if let Some(active_schema) = self.active_schema {
active_schema.clone()
} else {
Schema::new()
}))
}?;

let mut column: PolarsResult<Expr> = if lf.is_none() && schema.is_empty() {
Ok(col(&ident_root.value))
} else {
let name = &remaining_idents.next().unwrap().value;
if lf.is_some() && name == "*" {
Ok(cols(schema.iter_names()))
} else if let Some((_, name, _)) = schema.get_full(name) {
let resolved = &self.ctx.resolve_name(&ident_root.value, name);
Ok(if name != resolved {
col(resolved).alias(name)
} else {
polars_bail!(
SQLInterface: "no column named '{}' found in table '{}'",
column_name,
tbl_name
)
}
},
_ => polars_bail!(
SQLInterface: "invalid identifier {:?}",
idents
),
col(name)
})
} else if lf.is_none() {
remaining_idents = idents.iter().skip(1);
Ok(col(&ident_root.value))
} else {
polars_bail!(
SQLInterface: "no column named '{}' found in table '{}'",
name,
ident_root
)
}
};
// additional ident levels index into struct fields
for ident in remaining_idents {
column = Ok(column
.unwrap()
.struct_()
.field_by_name(ident.value.as_str()));
}
column
}

fn visit_interval(&self, interval: &Interval) -> PolarsResult<Expr> {
Expand Down
Loading