diff --git a/bigframes/_config/compute_options.py b/bigframes/_config/compute_options.py index fb708b844cb..2b849c558ab 100644 --- a/bigframes/_config/compute_options.py +++ b/bigframes/_config/compute_options.py @@ -40,7 +40,11 @@ class ComputeOptions: bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default. See `maximum_bytes_billed `_. - + enable_multi_query_execution (bool, Options): + If enabled, large queries may be factored into multiple smaller queries + in order to avoid generating queries that are too complex for the query + engine to handle. However this comes at the cost of increase cost and latency. """ maximum_bytes_billed: Optional[int] = None + enable_multi_query_execution: bool = False diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 0b6e50cfa3e..c7b41e93eb8 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1873,6 +1873,10 @@ def cached(self, *, optimize_offsets=False, force: bool = False) -> Block: expr = self.session._cache_with_cluster_cols( self.expr, cluster_cols=self.index_columns ) + return self.swap_array_expr(expr) + + def swap_array_expr(self, expr: core.ArrayValue) -> Block: + # TODO: Validate schema unchanged return Block( expr, index_columns=self.index_columns, diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py index 8c3f52d22bd..4980f5369de 100644 --- a/bigframes/core/expression.py +++ b/bigframes/core/expression.py @@ -108,6 +108,11 @@ def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression: def is_bijective(self) -> bool: return False + @property + def is_identity(self) -> bool: + """True for identity operation that does not transform input.""" + return False + @dataclasses.dataclass(frozen=True) class ScalarConstantExpression(Expression): @@ -173,6 +178,10 @@ def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression: def is_bijective(self) -> bool: return True + @property + def is_identity(self) -> bool: + return True + @dataclasses.dataclass(frozen=True) class OpExpression(Expression): diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index d740605a56f..a1072b0d687 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -15,11 +15,11 @@ from __future__ import annotations import abc -from dataclasses import dataclass, field, fields +from dataclasses import dataclass, field, fields, replace import functools import itertools import typing -from typing import Tuple +from typing import Callable, Tuple import pandas @@ -39,6 +39,10 @@ import bigframes.session +# A fixed number of variable to assume for overhead on some operations +OVERHEAD_VARIABLES = 5 + + @dataclass(frozen=True) class BigFrameNode: """ @@ -102,6 +106,60 @@ def roots(self) -> typing.Set[BigFrameNode]: def schema(self) -> schemata.ArraySchema: ... + @property + @abc.abstractmethod + def variables_introduced(self) -> int: + """ + Defines number of values created by the current node. Helps represent the "width" of a query + """ + ... + + @property + def relation_ops_created(self) -> int: + """ + Defines the number of relational ops generated by the current node. Used to estimate query planning complexity. + """ + return 1 + + @property + def joins(self) -> bool: + """ + Defines whether the node joins data. + """ + return False + + @functools.cached_property + def total_variables(self) -> int: + return self.variables_introduced + sum( + map(lambda x: x.total_variables, self.child_nodes) + ) + + @functools.cached_property + def total_relational_ops(self) -> int: + return self.relation_ops_created + sum( + map(lambda x: x.total_relational_ops, self.child_nodes) + ) + + @functools.cached_property + def total_joins(self) -> int: + return int(self.joins) + sum(map(lambda x: x.total_joins, self.child_nodes)) + + @property + def planning_complexity(self) -> int: + """ + Empirical heuristic measure of planning complexity. + + Used to determine when to decompose overly complex computations. May require tuning. + """ + return self.total_variables * self.total_relational_ops * (1 + self.total_joins) + + @abc.abstractmethod + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + """Apply a function to each child node.""" + ... + @dataclass(frozen=True) class UnaryNode(BigFrameNode): @@ -115,6 +173,11 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def schema(self) -> schemata.ArraySchema: return self.child.schema + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return replace(self, child=t(self.child)) + @dataclass(frozen=True) class JoinNode(BigFrameNode): @@ -154,6 +217,22 @@ def join_mapping_to_schema_item(mapping: JoinColumnMapping): ) return schemata.ArraySchema(items) + @functools.cached_property + def variables_introduced(self) -> int: + """Defines the number of variables generated by the current node. Used to estimate query planning complexity.""" + return OVERHEAD_VARIABLES + + @property + def joins(self) -> bool: + return True + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return replace( + self, left_child=t(self.left_child), right_child=t(self.right_child) + ) + @dataclass(frozen=True) class ConcatNode(BigFrameNode): @@ -182,6 +261,16 @@ def schema(self) -> schemata.ArraySchema: ) return schemata.ArraySchema(items) + @functools.cached_property + def variables_introduced(self) -> int: + """Defines the number of variables generated by the current node. Used to estimate query planning complexity.""" + return len(self.schema.items) + OVERHEAD_VARIABLES + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return replace(self, children=tuple(t(child) for child in self.children)) + # Input Nodex @dataclass(frozen=True) @@ -201,6 +290,16 @@ def roots(self) -> typing.Set[BigFrameNode]: def schema(self) -> schemata.ArraySchema: return self.data_schema + @functools.cached_property + def variables_introduced(self) -> int: + """Defines the number of variables generated by the current node. Used to estimate query planning complexity.""" + return len(self.schema.items) + 1 + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return self + # TODO: Refactor to take raw gbq object reference @dataclass(frozen=True) @@ -233,6 +332,20 @@ def schema(self) -> schemata.ArraySchema: ) return schemata.ArraySchema(items) + @functools.cached_property + def variables_introduced(self) -> int: + return len(self.columns) + len(self.hidden_ordering_columns) + + @property + def relation_ops_created(self) -> int: + # Assume worst case, where readgbq actually has baked in analytic operation to generate index + return 2 + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return self + # Unary nodes @dataclass(frozen=True) @@ -252,6 +365,14 @@ def schema(self) -> schemata.ArraySchema: schemata.SchemaItem(self.col_id, bigframes.dtypes.INT_DTYPE) ) + @property + def relation_ops_created(self) -> int: + return 2 + + @functools.cached_property + def variables_introduced(self) -> int: + return 1 + @dataclass(frozen=True) class FilterNode(UnaryNode): @@ -264,6 +385,10 @@ def row_preserving(self) -> bool: def __hash__(self): return self._node_hash + @property + def variables_introduced(self) -> int: + return 1 + @dataclass(frozen=True) class OrderByNode(UnaryNode): @@ -281,6 +406,15 @@ def __post_init__(self): def __hash__(self): return self._node_hash + @property + def variables_introduced(self) -> int: + return 0 + + @property + def relation_ops_created(self) -> int: + # Doesnt directly create any relational operations + return 0 + @dataclass(frozen=True) class ReversedNode(UnaryNode): @@ -290,6 +424,15 @@ class ReversedNode(UnaryNode): def __hash__(self): return self._node_hash + @property + def variables_introduced(self) -> int: + return 0 + + @property + def relation_ops_created(self) -> int: + # Doesnt directly create any relational operations + return 0 + @dataclass(frozen=True) class ProjectionNode(UnaryNode): @@ -315,6 +458,12 @@ def schema(self) -> schemata.ArraySchema: ) return schemata.ArraySchema(items) + @property + def variables_introduced(self) -> int: + # ignore passthrough expressions + new_vars = sum(1 for i in self.assignments if not i[0].is_identity) + return new_vars + # TODO: Merge RowCount into Aggregate Node? # Row count can be compute from table metadata sometimes, so it is a bit special. @@ -334,6 +483,10 @@ def schema(self) -> schemata.ArraySchema: (schemata.SchemaItem("count", bigframes.dtypes.INT_DTYPE),) ) + @property + def variables_introduced(self) -> int: + return 1 + @dataclass(frozen=True) class AggregateNode(UnaryNode): @@ -367,6 +520,10 @@ def schema(self) -> schemata.ArraySchema: ) return schemata.ArraySchema(tuple([*by_items, *agg_items])) + @property + def variables_introduced(self) -> int: + return len(self.aggregations) + len(self.by_column_ids) + @dataclass(frozen=True) class WindowOpNode(UnaryNode): @@ -396,12 +553,31 @@ def schema(self) -> schemata.ArraySchema: schemata.SchemaItem(self.output_name, new_item_dtype) ) + @property + def variables_introduced(self) -> int: + return 1 + + @property + def relation_ops_created(self) -> int: + # Assume that if not reprojecting, that there is a sequence of window operations sharing the same window + return 0 if self.skip_reproject_unsafe else 4 + +# TODO: Remove this op @dataclass(frozen=True) class ReprojectOpNode(UnaryNode): def __hash__(self): return self._node_hash + @property + def variables_introduced(self) -> int: + return 0 + + @property + def relation_ops_created(self) -> int: + # This op is not a real transformation, just a hint to the sql generator + return 0 + @dataclass(frozen=True) class UnpivotNode(UnaryNode): @@ -428,6 +604,10 @@ def row_preserving(self) -> bool: def non_local(self) -> bool: return True + @property + def joins(self) -> bool: + return True + @functools.cached_property def schema(self) -> schemata.ArraySchema: def infer_dtype( @@ -469,6 +649,17 @@ def infer_dtype( ] return schemata.ArraySchema((*index_items, *value_items, *passthrough_items)) + @property + def variables_introduced(self) -> int: + return ( + len(self.schema.items) - len(self.passthrough_columns) + OVERHEAD_VARIABLES + ) + + @property + def relation_ops_created(self) -> int: + # Unpivot is essentially a cross join and a projection. + return 2 + @dataclass(frozen=True) class RandomSampleNode(UnaryNode): @@ -485,6 +676,10 @@ def row_preserving(self) -> bool: def __hash__(self): return self._node_hash + @property + def variables_introduced(self) -> int: + return 1 + @dataclass(frozen=True) class ExplodeNode(UnaryNode): @@ -511,3 +706,11 @@ def schema(self) -> schemata.ArraySchema: for name in self.child.schema.names ) return schemata.ArraySchema(items) + + @property + def relation_ops_created(self) -> int: + return 3 + + @functools.cached_property + def variables_introduced(self) -> int: + return len(self.column_ids) + 1 diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index bc29f115f6d..125a7e6bff7 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -11,12 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations +import functools +import itertools +from typing import Dict import bigframes.core.nodes as nodes -# TODO: Convert these functions to iterative or enforce hard limit on tree depth. The below algorithms can cause stack to exceed limit. - def is_trivially_executable(node: nodes.BigFrameNode) -> bool: if local_only(node): @@ -36,3 +38,48 @@ def peekable(node: nodes.BigFrameNode) -> bool: children_peekable = all(peekable(child) for child in node.child_nodes) self_peekable = not node.non_local return children_peekable and self_peekable + + +def count_complex_nodes( + root: nodes.BigFrameNode, min_complexity: float, max_complexity: float +) -> Dict[nodes.BigFrameNode, int]: + @functools.cache + def _node_counts_inner( + subtree: nodes.BigFrameNode, + ) -> Dict[nodes.BigFrameNode, int]: + """Helper function to count occurences of duplicate nodes in a subtree. Considers only nodes in a complexity range""" + empty_counts: Dict[nodes.BigFrameNode, int] = {} + if subtree.planning_complexity >= min_complexity: + child_counts = [_node_counts_inner(child) for child in subtree.child_nodes] + node_counts = functools.reduce(_combine_counts, child_counts, empty_counts) + if subtree.planning_complexity <= max_complexity: + return _combine_counts(node_counts, {subtree: 1}) + else: + return node_counts + return empty_counts + + return _node_counts_inner(root) + + +def replace_nodes( + root: nodes.BigFrameNode, + to_replace: nodes.BigFrameNode, + replacemenet: nodes.BigFrameNode, +): + @functools.cache + def apply_substition(n: nodes.BigFrameNode) -> nodes.BigFrameNode: + if n == to_replace: + return replacemenet + else: + return n.transform_children(apply_substition) + + return root.transform_children(apply_substition) + + +def _combine_counts( + left: Dict[nodes.BigFrameNode, int], right: Dict[nodes.BigFrameNode, int] +) -> Dict[nodes.BigFrameNode, int]: + return { + key: left.get(key, 0) + right.get(key, 0) + for key in itertools.chain(left.keys(), right.keys()) + } diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 9687bc7ff18..0ac9beb9add 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1123,7 +1123,7 @@ def to_pandas( downsampled rows and all columns of this DataFrame. """ # TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job - + self._optimize_query_complexity() df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, @@ -1135,6 +1135,7 @@ def to_pandas( def to_pandas_batches(self) -> Iterable[pandas.DataFrame]: """Stream DataFrame results to an iterable of pandas DataFrame""" + self._optimize_query_complexity() return self._block.to_pandas_batches() def _compute_dry_run(self) -> bigquery.QueryJob: @@ -3133,6 +3134,7 @@ def _run_io_query( """Executes a query job presenting this dataframe and returns the destination table.""" session = self._block.expr.session + self._optimize_query_complexity() export_array, id_overrides = self._prepare_export( index=index, ordering_id=ordering_id ) @@ -3269,6 +3271,14 @@ def _cached(self, *, force: bool = False) -> DataFrame: self._set_block(self._block.cached(force=force)) return self + def _optimize_query_complexity(self): + """Reduce query complexity by caching repeated subtrees and recursively materializing maximum-complexity subtrees. + May generate many queries and take substantial time to execute. + """ + # TODO: Move all this to session + new_expr = self._session._simplify_with_caching(self._block.expr) + self._set_block(self._block.swap_array_expr(new_expr)) + _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: diff --git a/bigframes/series.py b/bigframes/series.py index e4d48904b0b..185891bc010 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -176,6 +176,7 @@ def __len__(self): return self.shape[0] def __iter__(self) -> typing.Iterator: + self._optimize_query_complexity() return itertools.chain.from_iterable( map(lambda x: x.squeeze(axis=1), self._block.to_pandas_batches()) ) @@ -328,6 +329,7 @@ def to_pandas( pandas.Series: A pandas Series with all rows of this Series if the data_sampling_threshold_mb is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame. """ + self._optimize_query_complexity() df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, @@ -1603,6 +1605,14 @@ def _cached(self, *, force: bool = True) -> Series: self._set_block(self._block.cached(force=force)) return self + def _optimize_query_complexity(self): + """Reduce query complexity by caching repeated subtrees and recursively materializing maximum-complexity subtrees. + May generate many queries and take substantial time to execute. + """ + # TODO: Move all this to session + new_expr = self._block.session._simplify_with_caching(self._block.expr) + self._set_block(self._block.swap_array_expr(new_expr)) + def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]: return pandas.api.types.is_list_like(obj) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 671a3d65e73..354352f1c9f 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -80,6 +80,7 @@ import bigframes.core.blocks as blocks import bigframes.core.compile import bigframes.core.guid as guid +import bigframes.core.nodes as nodes from bigframes.core.ordering import IntegerEncoding import bigframes.core.ordering as order import bigframes.core.tree_properties as traversals @@ -120,6 +121,11 @@ # Also must assume that text encoding as literals is much less efficient than in-memory representation. MAX_INLINE_DF_BYTES = 5000 +# Max complexity that should be executed as a single query +QUERY_COMPLEXITY_LIMIT = 1e7 +# Number of times to factor out subqueries before giving up. +MAX_SUBTREE_FACTORINGS = 5 + logger = logging.getLogger(__name__) # Excludes geography, bytes, and nested (array, struct) datatypes @@ -1851,6 +1857,52 @@ def _cache_with_offsets(self, array_value: core.ArrayValue) -> core.ArrayValue: ordering=order.ExpressionOrdering.from_offset_col("bigframes_offsets"), ) + def _simplify_with_caching(self, array_value: core.ArrayValue) -> core.ArrayValue: + """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" + if not bigframes.options.compute.enable_multi_query_execution: + return array_value + node = array_value.node + if node.planning_complexity < QUERY_COMPLEXITY_LIMIT: + return array_value + + for _ in range(MAX_SUBTREE_FACTORINGS): + updated = self._cache_most_complex_subtree(node) + if updated is None: + return core.ArrayValue(node) + else: + node = updated + + return core.ArrayValue(node) + + def _cache_most_complex_subtree( + self, node: nodes.BigFrameNode + ) -> Optional[nodes.BigFrameNode]: + # TODO: If query fails, retry with lower complexity limit + valid_candidates = traversals.count_complex_nodes( + node, + min_complexity=(QUERY_COMPLEXITY_LIMIT / 500), + max_complexity=QUERY_COMPLEXITY_LIMIT, + ).items() + # Heuristic: subtree_compleixty * (copies of subtree)^2 + best_candidate = max( + valid_candidates, + key=lambda i: i[0].planning_complexity + (i[1] ** 2), + default=None, + ) + + if best_candidate is None: + # No good subtrees to cache, just return original tree + return None + + # TODO: Add clustering columns based on access patterns + materialized = self._cache_with_cluster_cols( + core.ArrayValue(best_candidate[0]), [] + ).node + + return traversals.replace_nodes( + node, to_replace=best_candidate[0], replacemenet=materialized + ) + def _is_trivially_executable(self, array_value: core.ArrayValue): """ Can the block be evaluated very cheaply? diff --git a/tests/system/conftest.py b/tests/system/conftest.py index a108ff4a8e9..70ff6eee39d 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -953,6 +953,14 @@ def restore_sampling_settings(): bigframes.options.sampling.max_download_size = max_download_size +@pytest.fixture() +def with_multiquery_execution(): + original_setting = bigframes.options.compute.enable_multi_query_execution + bigframes.options.compute.enable_multi_query_execution = True + yield + bigframes.options.compute.enable_multi_query_execution = original_setting + + @pytest.fixture() def weird_strings_pd(): df = pd.DataFrame( diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index b15447e450c..9ce46878a91 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4173,6 +4173,56 @@ def test_recursion_limit(scalars_df_index): scalars_df_index.to_pandas() +def test_query_complexity_repeated_joins( + scalars_df_index, scalars_pandas_df_index, with_multiquery_execution +): + pd_df = scalars_pandas_df_index + bf_df = scalars_df_index + for _ in range(6): + # recursively join, resuling in 2^6 - 1 = 63 joins + pd_df = pd_df.merge(pd_df, on="int64_col").head(30) + pd_df = pd_df[pd_df.columns[:20]] + bf_df = bf_df.merge(bf_df, on="int64_col").head(30) + bf_df = bf_df[bf_df.columns[:20]] + + bf_result = bf_df.to_pandas() + pd_result = pd_df + assert_pandas_df_equal(bf_result, pd_result, check_index_type=False) + + +def test_query_complexity_repeated_subtrees( + scalars_df_index, scalars_pandas_df_index, with_multiquery_execution +): + # Recursively union the data, if fully inlined has 10^5 identical root tables. + pd_df = scalars_pandas_df_index + bf_df = scalars_df_index + for _ in range(5): + pd_df = pd.concat(10 * [pd_df]).head(5) + bf_df = bigframes.pandas.concat(10 * [bf_df]).head(5) + bf_result = bf_df.to_pandas() + pd_result = pd_df + assert_pandas_df_equal(bf_result, pd_result) + + +@pytest.mark.skipif( + sys.version_info >= (3, 12), + # See: https://github.com/python/cpython/issues/112282 + reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", +) +def test_query_complexity_repeated_analytic( + scalars_df_index, scalars_pandas_df_index, with_multiquery_execution +): + bf_df = scalars_df_index[["int64_col", "int64_too"]] + pd_df = scalars_pandas_df_index[["int64_col", "int64_too"]] + # Uses LAG analytic operator, each in a new SELECT + for _ in range(50): + bf_df = bf_df.diff() + pd_df = pd_df.diff() + bf_result = bf_df.to_pandas() + pd_result = pd_df + assert_pandas_df_equal(bf_result, pd_result) + + def test_to_pandas_downsampling_option_override(session): df = session.read_gbq("bigframes-dev.bigframes_tests_sys.batting") download_size = 1