perf: Add multi-query execution capability for complex dataframes (#427)
TrevorBergeron authored and Genesis929 committed Apr 9, 2024
1 parent 9917e58 commit adc4d97
Showing 10 changed files with 403 additions and 6 deletions.
6 changes: 5 additions & 1 deletion bigframes/_config/compute_options.py
@@ -40,7 +40,11 @@ class ComputeOptions:
bytes billed beyond this limit will fail (without incurring a
charge). If unspecified, this will be set to your project default.
See `maximum_bytes_billed <https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_maximum_bytes_billed>`_.
enable_multi_query_execution (bool, Optional):
If enabled, large queries may be factored into multiple smaller queries
in order to avoid generating queries that are too complex for the query
engine to handle. However, this may increase cost and latency.
"""

maximum_bytes_billed: Optional[int] = None
enable_multi_query_execution: bool = False
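
The option is off by default. A minimal usage sketch, assuming ComputeOptions is surfaced through bigframes.pandas.options.compute the same way maximum_bytes_billed is:

import bigframes.pandas as bpd

# Assumed surface: the new flag sits next to maximum_bytes_billed on
# bpd.options.compute.
bpd.options.compute.enable_multi_query_execution = True

# Subsequent complex dataframe operations may now be split into several
# smaller queries instead of one deeply nested query, trading extra cost
# and latency for plannability.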
4 changes: 4 additions & 0 deletions bigframes/core/blocks.py
@@ -1873,6 +1873,10 @@ def cached(self, *, optimize_offsets=False, force: bool = False) -> Block:
expr = self.session._cache_with_cluster_cols(
self.expr, cluster_cols=self.index_columns
)
return self.swap_array_expr(expr)

def swap_array_expr(self, expr: core.ArrayValue) -> Block:
# TODO: Validate schema unchanged
return Block(
expr,
index_columns=self.index_columns,
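The cached() path above is the hook that multi-query execution relies on: a complex expression tree is materialized, and swap_array_expr re-points the block at the result without changing its schema. A rough sketch of that flow, reusing the names from this diff (block is a bigframes.core.blocks.Block; the session method is internal):

# Sketch only: materialize a block's expression, then swap it in place.
cached_expr = block.session._cache_with_cluster_cols(
    block.expr, cluster_cols=block.index_columns
)
block = block.swap_array_expr(cached_expr)  # same schema, now backed by the cached table
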
9 changes: 9 additions & 0 deletions bigframes/core/expression.py
@@ -108,6 +108,11 @@ def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression:
def is_bijective(self) -> bool:
return False

@property
def is_identity(self) -> bool:
"""True for identity operation that does not transform input."""
return False


@dataclasses.dataclass(frozen=True)
class ScalarConstantExpression(Expression):
@@ -173,6 +178,10 @@ def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression:
def is_bijective(self) -> bool:
return True

@property
def is_identity(self) -> bool:
return True


@dataclasses.dataclass(frozen=True)
class OpExpression(Expression):
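The new is_identity flag lets ProjectionNode (in nodes.py below) skip pure passthroughs when counting variables. A sketch of the distinction, assuming the plain variable-reference class here is UnboundVariableExpression and that OpExpression wraps an op plus its input expressions:

# Assumed names from bigframes.core.expression / bigframes.operations.
col = UnboundVariableExpression("col_a")   # refers to an input column unchanged
assert col.is_identity                     # passthrough: adds no planning "width"

doubled = OpExpression(ops.add_op, (col, col))  # col_a + col_a
assert not doubled.is_identity                  # a real transformation is counted
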
207 changes: 205 additions & 2 deletions bigframes/core/nodes.py
@@ -15,11 +15,11 @@
from __future__ import annotations

import abc
from dataclasses import dataclass, field, fields
from dataclasses import dataclass, field, fields, replace
import functools
import itertools
import typing
from typing import Tuple
from typing import Callable, Tuple

import pandas

@@ -39,6 +39,10 @@
import bigframes.session


# A fixed number of variables to assume for overhead on some operations
OVERHEAD_VARIABLES = 5


@dataclass(frozen=True)
class BigFrameNode:
"""
@@ -102,6 +106,60 @@ def roots(self) -> typing.Set[BigFrameNode]:
def schema(self) -> schemata.ArraySchema:
...

@property
@abc.abstractmethod
def variables_introduced(self) -> int:
"""
Defines the number of values created by the current node. Helps represent the "width" of a query.
"""
...

@property
def relation_ops_created(self) -> int:
"""
Defines the number of relational ops generated by the current node. Used to estimate query planning complexity.
"""
return 1

@property
def joins(self) -> bool:
"""
Defines whether the node joins data.
"""
return False

@functools.cached_property
def total_variables(self) -> int:
return self.variables_introduced + sum(
map(lambda x: x.total_variables, self.child_nodes)
)

@functools.cached_property
def total_relational_ops(self) -> int:
return self.relation_ops_created + sum(
map(lambda x: x.total_relational_ops, self.child_nodes)
)

@functools.cached_property
def total_joins(self) -> int:
return int(self.joins) + sum(map(lambda x: x.total_joins, self.child_nodes))

@property
def planning_complexity(self) -> int:
"""
Empirical heuristic measure of planning complexity.
Used to determine when to decompose overly complex computations. May require tuning.
"""
return self.total_variables * self.total_relational_ops * (1 + self.total_joins)

@abc.abstractmethod
def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
"""Apply a function to each child node."""
...
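
To make the heuristic concrete: joins scale planning_complexity multiplicatively, not additively. A worked example using the per-node numbers defined later in this diff, for a JoinNode over two 3-column ReadGbqNode scans with no hidden ordering columns:

# Each scan: variables_introduced = 3 columns, relation_ops_created = 2 (worst case).
# The join: OVERHEAD_VARIABLES = 5, one relational op, one join.
total_variables      = 5 + 3 + 3    # = 11
total_relational_ops = 1 + 2 + 2    # = 5
total_joins          = 1
assert total_variables * total_relational_ops * (1 + total_joins) == 110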


@dataclass(frozen=True)
class UnaryNode(BigFrameNode):
@@ -115,6 +173,11 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def schema(self) -> schemata.ArraySchema:
return self.child.schema

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return replace(self, child=t(self.child))


@dataclass(frozen=True)
class JoinNode(BigFrameNode):
@@ -154,6 +217,22 @@ def join_mapping_to_schema_item(mapping: JoinColumnMapping):
)
return schemata.ArraySchema(items)

@functools.cached_property
def variables_introduced(self) -> int:
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
return OVERHEAD_VARIABLES

@property
def joins(self) -> bool:
return True

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return replace(
self, left_child=t(self.left_child), right_child=t(self.right_child)
)


@dataclass(frozen=True)
class ConcatNode(BigFrameNode):
@@ -182,6 +261,16 @@ def schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(items)

@functools.cached_property
def variables_introduced(self) -> int:
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
return len(self.schema.items) + OVERHEAD_VARIABLES

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return replace(self, children=tuple(t(child) for child in self.children))


# Input Nodes
@dataclass(frozen=True)
@@ -201,6 +290,16 @@ def roots(self) -> typing.Set[BigFrameNode]:
def schema(self) -> schemata.ArraySchema:
return self.data_schema

@functools.cached_property
def variables_introduced(self) -> int:
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
return len(self.schema.items) + 1

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self


# TODO: Refactor to take raw gbq object reference
@dataclass(frozen=True)
@@ -233,6 +332,20 @@ def schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(items)

@functools.cached_property
def variables_introduced(self) -> int:
return len(self.columns) + len(self.hidden_ordering_columns)

@property
def relation_ops_created(self) -> int:
# Assume worst case, where read_gbq actually has a baked-in analytic operation to generate the index
return 2

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self


# Unary nodes
@dataclass(frozen=True)
@@ -252,6 +365,14 @@ def schema(self) -> schemata.ArraySchema:
schemata.SchemaItem(self.col_id, bigframes.dtypes.INT_DTYPE)
)

@property
def relation_ops_created(self) -> int:
return 2

@functools.cached_property
def variables_introduced(self) -> int:
return 1


@dataclass(frozen=True)
class FilterNode(UnaryNode):
@@ -264,6 +385,10 @@ def row_preserving(self) -> bool:
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 1


@dataclass(frozen=True)
class OrderByNode(UnaryNode):
@@ -281,6 +406,15 @@ def __post_init__(self):
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 0

@property
def relation_ops_created(self) -> int:
# Doesn't directly create any relational operations
return 0


@dataclass(frozen=True)
class ReversedNode(UnaryNode):
@@ -290,6 +424,15 @@ class ReversedNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 0

@property
def relation_ops_created(self) -> int:
# Doesn't directly create any relational operations
return 0


@dataclass(frozen=True)
class ProjectionNode(UnaryNode):
@@ -315,6 +458,12 @@ def schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(items)

@property
def variables_introduced(self) -> int:
# ignore passthrough expressions
new_vars = sum(1 for i in self.assignments if not i[0].is_identity)
return new_vars


# TODO: Merge RowCount into Aggregate Node?
# Row count can sometimes be computed from table metadata, so it is a bit special.
@@ -334,6 +483,10 @@ def schema(self) -> schemata.ArraySchema:
(schemata.SchemaItem("count", bigframes.dtypes.INT_DTYPE),)
)

@property
def variables_introduced(self) -> int:
return 1


@dataclass(frozen=True)
class AggregateNode(UnaryNode):
@@ -367,6 +520,10 @@ def schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(tuple([*by_items, *agg_items]))

@property
def variables_introduced(self) -> int:
return len(self.aggregations) + len(self.by_column_ids)


@dataclass(frozen=True)
class WindowOpNode(UnaryNode):
@@ -396,12 +553,31 @@ def schema(self) -> schemata.ArraySchema:
schemata.SchemaItem(self.output_name, new_item_dtype)
)

@property
def variables_introduced(self) -> int:
return 1

@property
def relation_ops_created(self) -> int:
# Assume that if not reprojecting, there is a sequence of window operations sharing the same window
return 0 if self.skip_reproject_unsafe else 4


# TODO: Remove this op
@dataclass(frozen=True)
class ReprojectOpNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 0

@property
def relation_ops_created(self) -> int:
# This op is not a real transformation, just a hint to the SQL generator
return 0


@dataclass(frozen=True)
class UnpivotNode(UnaryNode):
@@ -428,6 +604,10 @@ def row_preserving(self) -> bool:
def non_local(self) -> bool:
return True

@property
def joins(self) -> bool:
return True

@functools.cached_property
def schema(self) -> schemata.ArraySchema:
def infer_dtype(
@@ -469,6 +649,17 @@ def infer_dtype(
]
return schemata.ArraySchema((*index_items, *value_items, *passthrough_items))

@property
def variables_introduced(self) -> int:
return (
len(self.schema.items) - len(self.passthrough_columns) + OVERHEAD_VARIABLES
)

@property
def relation_ops_created(self) -> int:
# Unpivot is essentially a cross join and a projection.
return 2


@dataclass(frozen=True)
class RandomSampleNode(UnaryNode):
@@ -485,6 +676,10 @@ def row_preserving(self) -> bool:
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 1


@dataclass(frozen=True)
class ExplodeNode(UnaryNode):
@@ -511,3 +706,11 @@ def schema(self) -> schemata.ArraySchema:
for name in self.child.schema.names
)
return schemata.ArraySchema(items)

@property
def relation_ops_created(self) -> int:
return 3

@functools.cached_property
def variables_introduced(self) -> int:
return len(self.column_ids) + 1
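
Taken together, transform_children and planning_complexity give a decomposition pass everything it needs: walk the tree bottom-up and replace any subtree that has grown too complex with a materialized equivalent. A sketch of such a pass; the threshold and the caching helper are hypothetical, not part of this diff:

COMPLEXITY_THRESHOLD = 1_000_000  # hypothetical tuning constant

def decompose(node: BigFrameNode) -> BigFrameNode:
    # Simplify children first so complexity is measured on rewritten subtrees.
    node = node.transform_children(decompose)
    if node.planning_complexity > COMPLEXITY_THRESHOLD:
        node = materialize_as_table(node)  # hypothetical: cache subtree, read result back
    return node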
