Skip to content

Commit

Permalink
fix: Fix erroneous window bounds removal during compilation (#1163)
Browse files Browse the repository at this point in the history
  • Loading branch information
TrevorBergeron authored and Shuowei Li committed Jan 16, 2025
1 parent c7a2c06 commit 75b4596
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 815 deletions.
3 changes: 2 additions & 1 deletion bigframes/core/block_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,10 @@ def indicate_duplicates(
# Discard this value if there are copies ANYWHERE
window_spec = windows.unbound(grouping_keys=tuple(columns))
block, dummy = block.create_constant(1)
# use row number, as it will work even with partial ordering
block, val_count_col_id = block.apply_window_op(
dummy,
agg_ops.count_op,
agg_ops.sum_op,
window_spec=window_spec,
)
block, duplicate_indicator = block.project_expr(
Expand Down
9 changes: 9 additions & 0 deletions bigframes/core/compile/aggregate_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,15 @@ def _(
return _apply_window_if_present(column.dense_rank(), window) + 1


@compile_unary_agg.register
def _(
    op: agg_ops.RowNumberOp, column: ibis_types.Column, window=None
) -> ibis_types.IntegerValue:
    """Compile ``RowNumberOp`` into an ibis ``row_number()`` expression.

    Args:
        op: The row-number op; not read in the body (presumably only used by
            ``compile_unary_agg`` to select this implementation -- confirm).
        column: Input column; not referenced, since the expression is built
            from ``ibis_api.row_number()`` alone.
        window: Optional window forwarded to ``_apply_window_if_present``.

    Returns:
        Whatever ``_apply_window_if_present`` produces for the row-number
        expression and ``window``.
    """
    row_index = ibis_api.row_number()
    return _apply_window_if_present(row_index, window)


@compile_unary_agg.register
def _(
    op: agg_ops.FirstOp,
    column: ibis_types.Column,
    window=None,
) -> ibis_types.Value:
    """Compile ``FirstOp`` into ``column.first()``.

    Args:
        op: The first-value op; not read in the body.
        column: Column whose ``first()`` expression is built.
        window: Optional window forwarded to ``_apply_window_if_present``.

    Returns:
        Whatever ``_apply_window_if_present`` produces for ``column.first()``
        and ``window``.
    """
    first_value = column.first()
    return _apply_window_if_present(first_value, window)
Expand Down
2 changes: 1 addition & 1 deletion bigframes/core/compile/compiled.py
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ def _ibis_window_from_spec(
if require_total_order or isinstance(window_spec.bounds, RowsWindowBounds):
# Some operators need an unambiguous ordering, so the table's total ordering is appended
order_by = tuple([*order_by, *self._ibis_order])
elif isinstance(window_spec.bounds, RowsWindowBounds):
elif require_total_order or isinstance(window_spec.bounds, RowsWindowBounds):
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
order_by = tuple(self._ibis_order)
else:
Expand Down
13 changes: 13 additions & 0 deletions bigframes/operations/aggregations.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,19 @@ def skips_nulls(self):
return True


# This should really be a NullaryWindowOp, but APIs don't support that yet.
@dataclasses.dataclass(frozen=True)
class RowNumberOp(UnaryWindowOp):
    """Window op named ``"rownumber"`` that always reports an integer output type."""

    name: ClassVar[str] = "rownumber"

    def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
        """Return ``dtypes.INT_DTYPE`` regardless of the input types given."""
        return dtypes.INT_DTYPE

    @property
    def skips_nulls(self):
        """Always ``False`` for this op."""
        return False


@dataclasses.dataclass(frozen=True)
class RankOp(UnaryWindowOp):
name: ClassVar[str] = "rank"
Expand Down
21 changes: 21 additions & 0 deletions tests/system/small/test_unordered.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,27 @@ def test_unordered_merge(unordered_session):
assert_pandas_df_equal(bf_result.to_pandas(), pd_result, ignore_order=True)


def test_unordered_drop_duplicates_ambiguous(unordered_session):
    """Check ``drop_duplicates`` after a self-merge and sort against pandas.

    The same merge / sort / drop_duplicates chain is run on a bigframes
    DataFrame built with ``unordered_session`` and on the source pandas
    DataFrame; the results are compared with row order ignored.
    """
    source = {"a": [1, 1, 1], "b": [4, 4, 6], "c": [1, 1, 3]}
    pd_df = pd.DataFrame(source, dtype=pd.Int64Dtype())
    bf_df = bpd.DataFrame(pd_df, session=unordered_session)

    def merge_sort_dedupe(frame):
        # merge first to discard original ordering
        merged = frame.merge(frame, left_on="a", right_on="c")
        return merged.sort_values("c_y").drop_duplicates()

    bf_deduped = merge_sort_dedupe(bf_df)
    pd_expected = merge_sort_dedupe(pd_df)

    assert_pandas_df_equal(bf_deduped.to_pandas(), pd_expected, ignore_order=True)


def test_unordered_mode_cache_preserves_order(unordered_session):
pd_df = pd.DataFrame(
{"a": [1, 2, 3, 4, 5, 6], "b": [4, 5, 9, 3, 1, 6]}, dtype=pd.Int64Dtype()
Expand Down
Loading

0 comments on commit 75b4596

Please sign in to comment.