Skip to content

Commit

Permalink
perf: Generate SQL with fewer CTEs
Browse files Browse the repository at this point in the history
  • Loading branch information
TrevorBergeron committed Aug 1, 2024
1 parent 30aaae5 commit fff9408
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 21 deletions.
69 changes: 50 additions & 19 deletions bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import ibis.backends.bigquery as ibis_bigquery
import ibis.common.deferred # type: ignore
import ibis.expr.datatypes as ibis_dtypes
import ibis.expr.operations as ibis_ops
import ibis.expr.types as ibis_types
import pandas

Expand Down Expand Up @@ -71,19 +72,16 @@ def __init__(
# Allow creating a DataFrame directly from an Ibis table expression.
# TODO(swast): Validate that each column references the same table (or
# no table for literal values).
self._columns = tuple(columns)
self._columns = tuple(
column.resolve(table)
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
# public API to refer to Deferred type.
if isinstance(column, ibis.common.deferred.Deferred) else column
for column in columns
)
# To allow for more efficient lookup by column name, create a
# dictionary mapping names to column values.
self._column_names = {
(
column.resolve(table)
# TODO(https://github.com/ibis-project/ibis/issues/7613): use
# public API to refer to Deferred type.
if isinstance(column, ibis.common.deferred.Deferred)
else column
).get_name(): column
for column in self._columns
}
self._column_names = {column.get_name(): column for column in self._columns}

@property
def columns(self) -> typing.Tuple[ibis_types.Value, ...]:
Expand Down Expand Up @@ -139,10 +137,6 @@ def projection(
for expression, id in expression_id_pairs
]
result = self._select(tuple(values)) # type: ignore

# Need to reproject to convert ibis Scalar to ibis Column object
if any(exp_id[0].is_const for exp_id in expression_id_pairs):
result = result._reproject_to_table()
return result

@abc.abstractmethod
Expand All @@ -166,6 +160,13 @@ def _get_ibis_column(self, key: str) -> ibis_types.Value:
),
)

def is_scalar_expr(self, key: str) -> bool:
# sometimes need to determine if column expression is a scalar expression.
# For instance, cannot filter on an analytic expression, or nest analytic expressions.
# Literals are excluded because ibis itself doesn't work well with them, not because of sql limitations.
ibis_expr = self._get_ibis_column(key)
return not is_literal(ibis_expr) and not is_window(ibis_expr)

def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
ibis_type = typing.cast(
bigframes.core.compile.ibis_types.IbisDtype,
Expand Down Expand Up @@ -355,6 +356,9 @@ def _to_ibis_expr(
return table

def filter(self, predicate: ex.Expression) -> UnorderedIR:
if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))):
# ibis doesn't support qualify syntax
return self._reproject_to_table().filter(predicate)
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
condition = op_compiler.compile_expression(predicate, bindings)
return self._filter(condition)
Expand Down Expand Up @@ -806,7 +810,6 @@ def project_window_op(
output_name=None,
*,
never_skip_nulls=False,
skip_reproject_unsafe: bool = False,
) -> OrderedIR:
"""
Creates a new expression based on this expression with unary operation applied to one column.
Expand All @@ -815,8 +818,18 @@ def project_window_op(
window_spec: a specification of the window over which to apply the operator
output_name: the id to assign to the output of the operator, by default will replace input col if distinct output id not provided
never_skip_nulls: will disable null skipping for operators that would otherwise do so
skip_reproject_unsafe: skips the reprojection step, can be used when performing many non-dependent window operations, user responsible for not nesting window expressions, or using outputs as join, filter or aggregation keys before a reprojection
"""
used_vars = [column_name, *window_spec.all_referenced_columns]
# Cannot nest analytic expressions, so reproject to cte first if needed.
if not all(map(self.is_scalar_expr, used_vars)):
return self._reproject_to_table().project_window_op(
column_name,
op,
window_spec,
output_name,
never_skip_nulls=never_skip_nulls,
)

column = typing.cast(ibis_types.Column, self._get_ibis_column(column_name))
window = self._ibis_window_from_spec(
window_spec, require_total_order=op.uses_total_row_ordering
Expand Down Expand Up @@ -861,8 +874,7 @@ def project_window_op(
window_op = case_statement

result = self._set_or_replace_by_id(output_name or column_name, window_op)
# TODO(tbergeron): Automatically track analytic expression usage and defer reprojection until required for valid query generation.
return result._reproject_to_table() if not skip_reproject_unsafe else result
return result

def _reproject_to_table(self) -> OrderedIR:
table = self._to_ibis_expr(
Expand Down Expand Up @@ -1034,6 +1046,9 @@ def _to_ibis_expr(
return table

def filter(self, predicate: ex.Expression) -> OrderedIR:
if any(map(is_window, map(self._get_ibis_column, predicate.unbound_variables))):
# ibis doesn't support qualify syntax
return self._reproject_to_table().filter(predicate)
bindings = {col: self._get_ibis_column(col) for col in self.column_ids}
condition = op_compiler.compile_expression(predicate, bindings)
return self._filter(condition)
Expand Down Expand Up @@ -1328,6 +1343,22 @@ def build(self) -> OrderedIR:
)


def is_literal(column: ibis_types.Value) -> bool:
# Unfortunately, Literals in ibis are not "Columns"s and therefore can't be aggregated.
return not isinstance(column, ibis_types.Column)


def is_window(column: ibis_types.Value) -> bool:
matches = (
(column)
.op()
.find_topmost(
lambda x: isinstance(x, (ibis_ops.WindowFunction, ibis_ops.Relation))
)
)
return any(isinstance(op, ibis_ops.WindowFunction) for op in matches)


def _reduce_predicate_list(
predicate_list: typing.Collection[ibis_types.BooleanValue],
) -> ibis_types.BooleanValue:
Expand Down
1 change: 0 additions & 1 deletion bigframes/core/compile/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,6 @@ def compile_window(self, node: nodes.WindowOpNode, ordered: bool = True):
node.window_spec,
node.output_name,
never_skip_nulls=node.never_skip_nulls,
skip_reproject_unsafe=node.skip_reproject_unsafe,
)
return result if ordered else result.to_unordered()

Expand Down
14 changes: 13 additions & 1 deletion bigframes/core/window_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Optional, Tuple, Union
import itertools
from typing import Optional, Set, Tuple, Union

import bigframes.core.ordering as orderings

Expand Down Expand Up @@ -162,3 +163,14 @@ def row_bounded(self):
to calculate deterministically.
"""
return isinstance(self.bounds, RowsWindowBounds)

@property
def all_referenced_columns(self) -> Set[str]:
"""
Return list of all variables reference ind the window.
"""
ordering_vars = itertools.chain.from_iterable(
item.scalar_expression.unbound_variables for item in self.ordering
)
itertools.chain(self.grouping_keys, ordering_vars)
return set(itertools.chain(self.grouping_keys, ordering_vars))

0 comments on commit fff9408

Please sign in to comment.