Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: count literal as leaf columns #11415

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/planner/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ pub(crate) fn create_physical_expr(
let phys_function =
create_physical_expr(function, Context::Aggregation, expr_arena, schema, state)?;
let mut out_name = None;
let mut apply_columns = aexpr_to_leaf_names(function, expr_arena);
let mut apply_columns = aexpr_to_leaf_column_names(function, expr_arena);
// sort and then dedup removes consecutive duplicates == all duplicates
apply_columns.sort();
apply_columns.dedup();
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ fn partitionable_gb(

#[cfg(feature = "object")]
{
for name in aexpr_to_leaf_names(*agg, expr_arena) {
for name in aexpr_to_leaf_column_names(*agg, expr_arena) {
let dtype = _input_schema.get(&name).unwrap();

if let DataType::Object(_) = dtype {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub static MAP_LIST_NAME: &str = "map_list";
pub static CSE_REPLACED: &str = "__POLARS_CSER_";
pub static LITERAL_NAME: &str = "literal";
3 changes: 2 additions & 1 deletion crates/polars-plan/src/logical_plan/aexpr/schema.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use crate::constants::LITERAL_NAME;

fn float_type(field: &mut Field) {
if field.dtype.is_numeric() && !matches!(&field.dtype, DataType::Float32) {
Expand Down Expand Up @@ -51,7 +52,7 @@ impl AExpr {
},
Literal(sv) => Ok(match sv {
LiteralValue::Series(s) => s.field().into_owned(),
_ => Field::new("literal", sv.get_datatype()),
_ => Field::new(LITERAL_NAME, sv.get_datatype()),
}),
BinaryExpr { left, right, op } => {
use DataType::*;
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-plan/src/logical_plan/optimizer/drop_nulls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::*;
use crate::dsl::function_expr::FunctionExpr;
use crate::logical_plan::functions::FunctionNode;
use crate::logical_plan::iterator::*;
use crate::utils::aexpr_to_leaf_names;
use crate::utils::aexpr_to_leaf_column_names;

/// If we realize that a predicate drops nulls on a subset
/// we replace it with an explicit df.drop_nulls call, as this
Expand Down Expand Up @@ -72,7 +72,7 @@ impl OptimizationRule for ReplaceDropNulls {
}
}
if not_null_count == column_count && binary_and_count < column_count {
let subset = Arc::from(aexpr_to_leaf_names(*predicate, expr_arena));
let subset = Arc::from(aexpr_to_leaf_column_names(*predicate, expr_arena));

Some(ALogicalPlan::MapFunction {
input: *input,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use polars_core::prelude::*;
use super::keys::*;
use crate::logical_plan::Context;
use crate::prelude::*;
use crate::utils::{aexpr_to_leaf_names, check_input_node, has_aexpr, rename_aexpr_leaf_names};
use crate::utils::{
aexpr_to_leaf_column_names, check_input_node, has_aexpr, rename_aexpr_leaf_names,
};

trait Dsl {
fn and(self, right: Node, arena: &mut Arena<AExpr>) -> Node;
Expand Down Expand Up @@ -214,7 +216,7 @@ fn rename_predicate_columns_due_to_aliased_projection(
let projection_aexpr = expr_arena.get(projection_node);
if let AExpr::Alias(_, alias_name) = projection_aexpr {
let alias_name = alias_name.as_ref();
let projection_leaves = aexpr_to_leaf_names(projection_node, expr_arena);
let projection_leaves = aexpr_to_leaf_column_names(projection_node, expr_arena);

// this means the leaf is a literal
if projection_leaves.is_empty() {
Expand Down Expand Up @@ -362,7 +364,7 @@ pub(super) fn no_pushdown_preds<F>(
// matching expr are typically explode, shift, etc. expressions that mess up predicates when pushed down
if has_aexpr(node, arena, matches) {
// columns that are projected. We check if we can push down the predicates past this projection
let columns = aexpr_to_leaf_names(node, arena);
let columns = aexpr_to_leaf_column_names(node, arena);

let condition = |name: Arc<str>| columns.contains(&name);
local_predicates.extend(transfer_to_local_by_name(arena, acc_predicates, condition));
Expand All @@ -382,7 +384,7 @@ where
let mut remove_keys = Vec::with_capacity(acc_predicates.len());

for (key, predicate) in &*acc_predicates {
let root_names = aexpr_to_leaf_names(*predicate, expr_arena);
let root_names = aexpr_to_leaf_column_names(*predicate, expr_arena);
for name in root_names {
if condition(name) {
remove_keys.push(key.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub(super) fn process_asof_join(
let mut add_local = if already_added_local_to_local_projected.is_empty() {
true
} else {
let name = aexpr_to_leaf_name(proj, expr_arena);
let name = aexpr_to_leaf_column_name(proj, expr_arena);
!already_added_local_to_local_projected.contains(&name)
};

Expand Down Expand Up @@ -273,7 +273,7 @@ pub(super) fn process_join(
let mut add_local = if already_added_local_to_local_projected.is_empty() {
true
} else {
let name = aexpr_to_leaf_name(proj, expr_arena);
let name = aexpr_to_leaf_column_name(proj, expr_arena);
!already_added_local_to_local_projected.contains(&name)
};

Expand Down Expand Up @@ -363,7 +363,7 @@ fn process_projection(
// this branch tries to pushdown the column without suffix
{
// Column name of the projection without any alias.
let leaf_column_name = aexpr_to_leaf_names(proj, expr_arena).pop().unwrap();
let leaf_column_name = aexpr_to_leaf_column_names(proj, expr_arena).pop().unwrap();

let suffix = options.args.suffix();
// If _right suffix exists we need to push a projection down without this
Expand Down Expand Up @@ -407,7 +407,7 @@ pub(super) fn process_alias(
mut add_local: bool,
) -> bool {
if let AExpr::Alias(expr, name) = expr_arena.get(proj).clone() {
for root_name in aexpr_to_leaf_names(expr, expr_arena) {
for root_name in aexpr_to_leaf_column_names(expr, expr_arena) {
let node = expr_arena.add(AExpr::Column(root_name));
let proj = expr_arena.add(AExpr::Alias(node, name.clone()));
local_projection.push(proj)
Expand Down Expand Up @@ -448,7 +448,7 @@ fn resolve_join_suffixes(
let schema_after_join = alp.schema(lp_arena);

for proj in local_projection {
for name in aexpr_to_leaf_names(*proj, expr_arena) {
for name in aexpr_to_leaf_column_names(*proj, expr_arena) {
if name.contains(suffix) && schema_after_join.get(&name).is_none() {
let new_name = &name.as_ref()[..name.len() - suffix.len()];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::prelude::optimizer::projection_pushdown::projection::process_projecti
use crate::prelude::optimizer::projection_pushdown::rename::process_rename;
use crate::prelude::*;
use crate::utils::{
aexpr_assign_renamed_leaf, aexpr_to_column_nodes, aexpr_to_leaf_names, check_input_node,
aexpr_assign_renamed_leaf, aexpr_to_column_nodes, aexpr_to_leaf_column_names, check_input_node,
expr_is_projected_upstream,
};

Expand All @@ -45,7 +45,7 @@ fn get_scan_columns(
if !acc_projections.is_empty() {
let mut columns = Vec::with_capacity(acc_projections.len());
for expr in acc_projections {
for name in aexpr_to_leaf_names(*expr, expr_arena) {
for name in aexpr_to_leaf_column_names(*expr, expr_arena) {
// we shouldn't project the row-count column, as that is generated
// in the scan
let push = match row_count {
Expand Down Expand Up @@ -86,7 +86,7 @@ fn split_acc_projections(
.partition(|expr| check_input_node(*expr, down_schema, expr_arena));
let mut names = init_set();
for proj in &acc_projections {
for name in aexpr_to_leaf_names(*proj, expr_arena) {
for name in aexpr_to_leaf_column_names(*proj, expr_arena) {
names.insert(name);
}
}
Expand Down Expand Up @@ -132,7 +132,7 @@ fn update_scan_schema(
let mut new_schema = Schema::with_capacity(acc_projections.len());
let mut new_cols = Vec::with_capacity(acc_projections.len());
for node in acc_projections.iter() {
for name in aexpr_to_leaf_names(*node, expr_arena) {
for name in aexpr_to_leaf_column_names(*node, expr_arena) {
let item = schema.get_full(&name).ok_or_else(|| {
polars_err!(ComputeError: "column '{}' not available in schema {:?}", name, schema)
})?;
Expand Down Expand Up @@ -218,7 +218,7 @@ impl ProjectionPushDown {
) -> (bool, bool) {
let mut pushed_at_least_one = false;
let mut already_projected = false;
let names = aexpr_to_leaf_names(proj, expr_arena);
let names = aexpr_to_leaf_column_names(proj, expr_arena);
let root_projections = aexpr_to_column_nodes(proj, expr_arena);

for (name, root_projection) in names.into_iter().zip(root_projections) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub(super) fn process_projection(
if let AExpr::Alias(_, name) = ae {
if projected_names.remove(name) {
acc_projections.retain(|expr| {
!aexpr_to_leaf_names(*expr, expr_arena).contains(name)
!aexpr_to_leaf_column_names(*expr, expr_arena).contains(name)
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl OptimizationRule for SimplifyBooleanRule {
AExpr::Literal(LiteralValue::Boolean(false))
) =>
{
let names = aexpr_to_leaf_names(*truthy, expr_arena);
let names = aexpr_to_leaf_column_names(*truthy, expr_arena);
let name = names.get(0).map(Arc::clone).unwrap_or_else(|| "".into());
Some(AExpr::Alias(*falsy, name))
},
Expand Down
22 changes: 19 additions & 3 deletions crates/polars-plan/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::vec::IntoIter;
use polars_core::prelude::*;
use smartstring::alias::String as SmartString;

use crate::constants::LITERAL_NAME;
use crate::logical_plan::iterator::ArenaExprIter;
use crate::logical_plan::Context;
use crate::prelude::names::COUNT;
Expand Down Expand Up @@ -345,7 +346,7 @@ pub fn expressions_to_schema(
.collect()
}

pub fn aexpr_to_leaf_names_iter(
pub fn aexpr_to_leaf_column_names_iter(
node: Node,
arena: &Arena<AExpr>,
) -> impl Iterator<Item = Arc<str>> + '_ {
Expand All @@ -358,10 +359,25 @@ pub fn aexpr_to_leaf_names_iter(
})
}

pub fn aexpr_to_leaf_names(node: Node, arena: &Arena<AExpr>) -> Vec<Arc<str>> {
aexpr_to_leaf_names_iter(node, arena).collect()
pub fn aexpr_to_leaf_column_names(node: Node, arena: &Arena<AExpr>) -> Vec<Arc<str>> {
aexpr_to_leaf_column_names_iter(node, arena).collect()
}

pub fn aexpr_to_leaf_column_name(node: Node, arena: &Arena<AExpr>) -> Arc<str> {
aexpr_to_leaf_column_names_iter(node, arena).next().unwrap()
}

pub fn aexpr_to_leaf_names_iter(
node: Node,
arena: &Arena<AExpr>,
) -> impl Iterator<Item = Arc<str>> + '_ {
arena.iter(node).flat_map(|(_, ae)| match ae {
// expecting only columns here, wildcards and dtypes should already be replaced
AExpr::Column(name) => Some(name.clone()),
AExpr::Literal(_) => Some(Arc::from(LITERAL_NAME)),
_ => None,
})
}
pub fn aexpr_to_leaf_name(node: Node, arena: &Arena<AExpr>) -> Arc<str> {
aexpr_to_leaf_names_iter(node, arena).next().unwrap()
}
Expand Down
9 changes: 9 additions & 0 deletions py-polars/tests/unit/operations/test_drop.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,12 @@ def test_drop_nan_ignore_null_3525() -> None:
3.0,
4.0,
]


def test_drop_join_lit() -> None:
df = pl.LazyFrame({"date": [1, 2, 3], "symbol": [4, 5, 6]})
dates = df.select("date").unique()
symbols = df.select("symbol").unique()
assert symbols.join(dates, left_on=pl.lit(1), right_on=pl.lit(1)).drop(
"literal"
).collect().columns == ["symbol", "date"]
Loading