From 18e511596045d3d20eb1aebdc43ff507e117eb1d Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 7 Mar 2024 01:32:05 +0000 Subject: [PATCH 1/7] perf: multi-query execution of complex dataframes --- bigframes/core/blocks.py | 4 + bigframes/core/nodes.py | 114 ++++++++++++++++++++++++++- bigframes/core/traversal.py | 3 + bigframes/dataframe.py | 11 +++ bigframes/series.py | 10 +++ bigframes/session/__init__.py | 98 +++++++++++++++++++++++ tests/system/small/test_dataframe.py | 25 ++++++ 7 files changed, 263 insertions(+), 2 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 993f2caa47..c31123beee 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1808,6 +1808,10 @@ def cached(self, *, optimize_offsets=False, force: bool = False) -> Block: expr = self.session._cache_with_cluster_cols( self.expr, cluster_cols=self.index_columns ) + return self.swap_array_expr(expr) + + def swap_array_expr(self, expr: core.ArrayValue) -> Block: + # TODO: Validate schema unchanged return Block( expr, index_columns=self.index_columns, diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 1cd3277cbc..b12cb48e0d 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -14,11 +14,12 @@ from __future__ import annotations -from dataclasses import dataclass, field, fields +import abc +from dataclasses import dataclass, field, fields, replace import functools import itertools import typing -from typing import Tuple +from typing import Callable, Tuple import pandas @@ -100,6 +101,19 @@ def roots(self) -> typing.Set[BigFrameNode]: ) return set(roots) + @functools.cached_property + @abc.abstractmethod + def complexity(self) -> int: + """A crude measure of the query complexity. Not necessarily predictive of the complexity of the actual computation.""" + ... + + @abc.abstractmethod + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + """Apply a function to each child node.""" + ... 
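The two abstract hooks added here are the interface the rest of the series builds on: `complexity` (renamed `planning_complexity` in a later patch) lets callers rank subtrees, and `transform_children` lets a caller rebuild a tree with selected subtrees swapped out. A minimal standalone sketch of that pattern, using hypothetical toy node classes rather than the real bigframes ones, looks roughly like this:

```python
# Toy stand-ins for BigFrameNode, only to illustrate how the two hooks compose;
# these are not the bigframes classes.
from __future__ import annotations

import dataclasses
import functools
from typing import Callable, Tuple


@dataclasses.dataclass(frozen=True)
class ToyNode:
    name: str
    children: Tuple["ToyNode", ...] = ()

    @functools.cached_property
    def complexity(self) -> int:
        # Leaves cost 1; each operator roughly doubles its children's cost,
        # mirroring the per-node heuristics the concrete nodes define below.
        return 1 + 2 * sum(child.complexity for child in self.children)

    def transform_children(self, t: Callable[["ToyNode"], "ToyNode"]) -> "ToyNode":
        return dataclasses.replace(
            self, children=tuple(t(child) for child in self.children)
        )


def replace_subtree(root: ToyNode, target: ToyNode, replacement: ToyNode) -> ToyNode:
    """Rebuild the tree bottom-up, swapping every occurrence of `target`."""

    @functools.cache
    def visit(node: ToyNode) -> ToyNode:
        return replacement if node == target else node.transform_children(visit)

    return visit(root)
```

This is essentially what `apply_substition` in the session changes below does against the real node types, with `functools.cache` keeping shared subtrees from being rewritten more than once.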
+ @dataclass(frozen=True) class UnaryNode(BigFrameNode): @@ -109,6 +123,15 @@ class UnaryNode(BigFrameNode): def child_nodes(self) -> typing.Sequence[BigFrameNode]: return (self.child,) + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return replace(self, child=t(self.child)) + + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity + 1 + @dataclass(frozen=True) class JoinNode(BigFrameNode): @@ -138,6 +161,18 @@ def peekable(self) -> bool: single_root = len(self.roots) == 1 return children_peekable and single_root + @functools.cached_property + def complexity(self) -> int: + child_complexity = sum(child.complexity for child in self.child_nodes) + return child_complexity * 2 + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return replace( + self, left_child=t(self.left_child), right_child=t(self.right_child) + ) + @dataclass(frozen=True) class ConcatNode(BigFrameNode): @@ -150,6 +185,16 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]: def __hash__(self): return self._node_hash + @functools.cached_property + def complexity(self) -> int: + child_complexity = sum(child.complexity for child in self.child_nodes) + return child_complexity * 2 + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return replace(self, children=tuple(t(child) for child in self.children)) + # Input Nodex @dataclass(frozen=True) @@ -167,6 +212,16 @@ def peekable(self) -> bool: def roots(self) -> typing.Set[BigFrameNode]: return {self} + @functools.cached_property + def complexity(self) -> int: + # TODO: Set to number of columns once this is more readily available + return 500 + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return self + # TODO: Refactor to take raw gbq object reference @dataclass(frozen=True) @@ -192,6 +247,15 @@ def peekable(self) -> bool: def roots(self) -> typing.Set[BigFrameNode]: return {self} + @functools.cached_property + def complexity(self) -> int: + return len(self.columns) + 5 + + def transform_children( + self, t: Callable[[BigFrameNode], BigFrameNode] + ) -> BigFrameNode: + return self + # Unary nodes @dataclass(frozen=True) @@ -209,6 +273,10 @@ def peekable(self) -> bool: def non_local(self) -> bool: return False + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity * 2 + @dataclass(frozen=True) class FilterNode(UnaryNode): @@ -221,6 +289,10 @@ def row_preserving(self) -> bool: def __hash__(self): return self._node_hash + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity + 1 + @dataclass(frozen=True) class OrderByNode(UnaryNode): @@ -229,6 +301,10 @@ class OrderByNode(UnaryNode): def __hash__(self): return self._node_hash + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity + 1 + @dataclass(frozen=True) class ReversedNode(UnaryNode): @@ -238,6 +314,10 @@ class ReversedNode(UnaryNode): def __hash__(self): return self._node_hash + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity + 1 + @dataclass(frozen=True) class ProjectionNode(UnaryNode): @@ -246,6 +326,10 @@ class ProjectionNode(UnaryNode): def __hash__(self): return self._node_hash + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity + 1 + # TODO: Merge RowCount into Aggregate Node? 
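To make the per-node estimates above concrete, here is a hypothetical hand evaluation of the PATCH 1/7 heuristic for a small plan (a 10-column table scan, a projection, a filter, then a self-join); cheap unary ops add one while joins and aggregations double the accumulated cost:

```python
# Hand-evaluating the complexity properties defined above for a hypothetical plan.
read = 10 + 5                        # ReadGbqNode: len(columns) + 5      -> 15
projected = read + 1                 # ProjectionNode: child + 1          -> 16
filtered = projected + 1             # FilterNode: child + 1              -> 17
joined = (filtered + filtered) * 2   # JoinNode: sum(children) * 2        -> 68
assert joined == 68
```

Later patches in the series replace this per-node estimate with one based on total variables, relational ops, and joins, but the exponential blow-up under repeated joins and aggregations is the same, and it is what pushes a tree past the soft complexity limit and triggers subtree caching in the session changes below.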
# Row count can be compute from table metadata sometimes, so it is a bit special. @@ -259,6 +343,10 @@ def row_preserving(self) -> bool: def non_local(self) -> bool: return True + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity + 1 + @dataclass(frozen=True) class AggregateNode(UnaryNode): @@ -281,6 +369,10 @@ def peekable(self) -> bool: def non_local(self) -> bool: return True + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity * 2 + @dataclass(frozen=True) class WindowOpNode(UnaryNode): @@ -302,12 +394,22 @@ def peekable(self) -> bool: def non_local(self) -> bool: return True + @functools.cached_property + def complexity(self) -> int: + if self.skip_reproject_unsafe: + return self.child.complexity + return self.child.complexity * 2 + @dataclass(frozen=True) class ReprojectOpNode(UnaryNode): def __hash__(self): return self._node_hash + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity * 2 + @dataclass(frozen=True) class UnpivotNode(UnaryNode): @@ -337,6 +439,10 @@ def non_local(self) -> bool: def peekable(self) -> bool: return False + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity * 2 + @dataclass(frozen=True) class RandomSampleNode(UnaryNode): @@ -350,5 +456,9 @@ def deterministic(self) -> bool: def row_preserving(self) -> bool: return False + @functools.cached_property + def complexity(self) -> int: + return self.child.complexity + 1 + def __hash__(self): return self._node_hash diff --git a/bigframes/core/traversal.py b/bigframes/core/traversal.py index b038ee6599..a008a8b807 100644 --- a/bigframes/core/traversal.py +++ b/bigframes/core/traversal.py @@ -12,9 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import functools + import bigframes.core.nodes as nodes +@functools.cache def is_trivially_executable(node: nodes.BigFrameNode) -> bool: if local_only(node): return True diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 24c4699473..85cb8233af 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -1060,6 +1060,7 @@ def to_pandas( downsampled rows and all columns of this DataFrame. """ # TODO(orrbradford): Optimize this in future. Potentially some cases where we can return the stored query job + self._optimize_query_complexity() df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, @@ -1071,6 +1072,7 @@ def to_pandas( def to_pandas_batches(self) -> Iterable[pandas.DataFrame]: """Stream DataFrame results to an iterable of pandas DataFrame""" + self._optimize_query_complexity() return self._block.to_pandas_batches() def _compute_dry_run(self) -> bigquery.QueryJob: @@ -2973,6 +2975,7 @@ def _run_io_query( """Executes a query job presenting this dataframe and returns the destination table.""" session = self._block.expr.session + self._optimize_query_complexity() export_array, id_overrides = self._prepare_export( index=index, ordering_id=ordering_id ) @@ -3109,6 +3112,14 @@ def _cached(self, *, force: bool = False) -> DataFrame: self._set_block(self._block.cached(force=force)) return self + def _optimize_query_complexity(self): + """Reduce query complexity by caching repeated subtrees and recursively materializing maximum-complexity subtrees. + May generate many queries and take substantial time to execute. 
+ """ + # TODO: Move all this to session + new_expr = self._session._simplify_with_caching(self._block.expr) + self._set_block(self._block.swap_array_expr(new_expr)) + _DataFrameOrSeries = typing.TypeVar("_DataFrameOrSeries") def dot(self, other: _DataFrameOrSeries) -> _DataFrameOrSeries: diff --git a/bigframes/series.py b/bigframes/series.py index dfa6fa4b0d..18202f5ddd 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -160,6 +160,7 @@ def __len__(self): return self.shape[0] def __iter__(self) -> typing.Iterator: + self._optimize_query_complexity() return itertools.chain.from_iterable( map(lambda x: x.squeeze(axis=1), self._block.to_pandas_batches()) ) @@ -312,6 +313,7 @@ def to_pandas( pandas.Series: A pandas Series with all rows of this Series if the data_sampling_threshold_mb is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame. """ + self._optimize_query_complexity() df, query_job = self._block.to_pandas( max_download_size=max_download_size, sampling_method=sampling_method, @@ -1573,6 +1575,14 @@ def _cached(self, *, force: bool = True) -> Series: self._set_block(self._block.cached(force=force)) return self + def _optimize_query_complexity(self): + """Reduce query complexity by caching repeated subtrees and recursively materializing maximum-complexity subtrees. + May generate many queries and take substantial time to execute. + """ + # TODO: Move all this to session + new_expr = self._block.session._simplify_with_caching(self._block.expr) + self._set_block(self._block.swap_array_expr(new_expr)) + def _is_list_like(obj: typing.Any) -> typing_extensions.TypeGuard[typing.Sequence]: return pandas.api.types.is_list_like(obj) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index b553865ea9..4706589329 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -17,8 +17,10 @@ from __future__ import annotations import datetime +import functools import itertools import logging +import math import os import re import typing @@ -69,6 +71,7 @@ import bigframes.core.blocks as blocks import bigframes.core.compile import bigframes.core.guid as guid +import bigframes.core.nodes as nodes from bigframes.core.ordering import IntegerEncoding, OrderingColumnReference import bigframes.core.ordering as orderings import bigframes.core.traversal as traversals @@ -110,6 +113,16 @@ # TODO(tbergeron): Convert to bytes-based limit MAX_INLINE_DF_SIZE = 5000 +# Chosen to prevent very large expression trees from being directly executed as a single query +# Beyond this limit, expression should be cached or decomposed into multiple queries for execution. +COMPLEXITY_SOFT_LIMIT = 2e5 +# Beyond the hard limite, even decomposing the query is unlikely to succeed +COMPLEXITY_HARD_LIMIT = 1e25 +# Number of times to factor out and cache a repeated subtree +MAX_SUBTREE_FACTORINGS = 10 +# Limits how much bigframes will attempt to decompose complex queries into smaller queries +MAX_DECOMPOSITION_STEPS = 5 + logger = logging.getLogger(__name__) @@ -1671,6 +1684,70 @@ def _cache_with_offsets(self, array_value: core.ArrayValue) -> core.ArrayValue: ordering=orderings.ExpressionOrdering.from_offset_col("bigframes_offsets"), ) + def _cache_complex_subtrees( + self, node: nodes.BigFrameNode, max_iterations: int + ) -> nodes.BigFrameNode: + """If the computaiton is complex, execute subtrees of the compuation. + Main goal is to reduce query planning complexity, not query cost. 
+ """ + root = node + # Identify node that maximizes complexity * (repetitions - 1) + # Recurse into subtrees below complexity limit, then cache each of those + + for _ in range(max_iterations): + updated = self._cache_most_complex_subtree(root) + if updated is not None: + root = updated + else: + return root + return root + + def _cache_most_complex_subtree( + self, node: nodes.BigFrameNode + ) -> Optional[nodes.BigFrameNode]: + node_counts = _node_counts(node) + # Only consider nodes the occur at least twice + # valid_candidates = filter(lambda x: x[1] >= 2, node_counts.items()) + valid_candidates = node_counts.items() + # Heuristic: log(Complexity) * (copies of subtree) + best_candidate = max( + valid_candidates, + key=lambda i: math.log(i[0].complexity) * i[1], + default=None, + ) + + if best_candidate is None: + # No good subtrees to cache, just return original tree + return None + + node_to_replace = best_candidate[0] + + # TODO: Add clustering columns based on access patterns + cached_node = self._cache_with_cluster_cols( + core.ArrayValue(best_candidate[0]), [] + ).node + + @functools.cache + def apply_substition(n: nodes.BigFrameNode) -> nodes.BigFrameNode: + if n == node_to_replace: + return cached_node + else: + return n.transform_children(apply_substition) + + return node.transform_children(apply_substition) + + def _simplify_with_caching(self, array_value: core.ArrayValue) -> core.ArrayValue: + """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" + node = array_value.node + if node.complexity < COMPLEXITY_SOFT_LIMIT: + return array_value + node = self._cache_complex_subtrees(node, max_iterations=MAX_SUBTREE_FACTORINGS) + if node.complexity > COMPLEXITY_HARD_LIMIT: + raise ValueError( + f"This dataframe is too complex to convert to SQL queries. Internal complexity measure: {node.complexity}" + ) + return core.ArrayValue(node) + def _is_trivially_executable(self, array_value: core.ArrayValue): """ Can the block be evaluated very cheaply? @@ -1802,3 +1879,24 @@ def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringVa # Escape backslashes and use backslash as delineator escaped = typing.cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\") # type: ignore return typing.cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped) + + +@functools.cache +def _node_counts(subtree: nodes.BigFrameNode) -> Dict[nodes.BigFrameNode, int]: + """Helper function to count occurences of duplicate nodes in a subtree. 
Considers only nodes in a complexity range""" + if subtree.complexity > COMPLEXITY_SOFT_LIMIT / 50: + child_counts = [_node_counts(child) for child in subtree.child_nodes] + node_counts = functools.reduce( + lambda x, y: dict( + (key, x.get(key, 0) + y.get(key, 0)) + for key in itertools.chain(x.keys(), y.keys()) + ), + child_counts, + ) + + if subtree.complexity < COMPLEXITY_SOFT_LIMIT: + node_count = node_counts.get(subtree, 0) + 1 + node_counts[subtree] = node_count + return node_counts + else: + return dict() diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 9f4e138b73..0444f5c39b 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -27,6 +27,7 @@ import bigframes import bigframes._config.display_options as display_options import bigframes.dataframe as dataframe +import bigframes.pandas import bigframes.series as series from tests.system.utils import ( assert_pandas_df_equal, @@ -3882,6 +3883,30 @@ def test_recursion_limit(scalars_df_index): scalars_df_index.to_pandas() +def test_query_complexity_repeated_subtrees(scalars_df_index, scalars_pandas_df_index): + # Recursively union the data, if fully decorrelated has 10^5 identical root tables. + pd_df = scalars_pandas_df_index + bf_df = scalars_df_index + for _ in range(5): + pd_df = pd.concat(10 * [pd_df]).head(5) + bf_df = bigframes.pandas.concat(10 * [bf_df]).head(5) + bf_result = bf_df.to_pandas() + pd_result = pd_df + assert_pandas_df_equal(bf_result, pd_result) + + +def test_query_complexity_repeated_analytic(scalars_df_index, scalars_pandas_df_index): + bf_df = scalars_df_index[["int64_col", "int64_too"]] + pd_df = scalars_pandas_df_index[["int64_col", "int64_too"]] + # Uses LAG analytic operator, each in a new SELECT + for _ in range(50): + bf_df = bf_df.diff() + pd_df = pd_df.diff() + bf_result = bf_df.to_pandas() + pd_result = pd_df + assert_pandas_df_equal(bf_result, pd_result) + + def test_to_pandas_downsampling_option_override(session): df = session.read_gbq("bigframes-dev.bigframes_tests_sys.batting") download_size = 1 From f3a1671d130d82dc52be2116b5839ca1071ae8cd Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 12 Mar 2024 17:29:41 +0000 Subject: [PATCH 2/7] add config flag for multi query execution --- bigframes/_config/compute_options.py | 6 +++++- bigframes/session/__init__.py | 4 +++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/bigframes/_config/compute_options.py b/bigframes/_config/compute_options.py index 20c31d3906..5d5a74c245 100644 --- a/bigframes/_config/compute_options.py +++ b/bigframes/_config/compute_options.py @@ -29,7 +29,11 @@ class ComputeOptions: bytes billed beyond this limit will fail (without incurring a charge). If unspecified, this will be set to your project default. See `maximum_bytes_billed `_. - + enable_multi_query_execution (bool, Options): + If enabled, large queries may be factored into multiple smaller queries + in order to avoid generating queries that are too complex for the query + engine to handle. However this comes at the cost of increase cost and latency. 
""" maximum_bytes_billed: Optional[int] = None + enable_multi_query_execution: bool = True diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 4706589329..66d9a1dd2b 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1712,7 +1712,7 @@ def _cache_most_complex_subtree( # Heuristic: log(Complexity) * (copies of subtree) best_candidate = max( valid_candidates, - key=lambda i: math.log(i[0].complexity) * i[1], + key=lambda i: math.log(i[0].complexity) + i[1], default=None, ) @@ -1738,6 +1738,8 @@ def apply_substition(n: nodes.BigFrameNode) -> nodes.BigFrameNode: def _simplify_with_caching(self, array_value: core.ArrayValue) -> core.ArrayValue: """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" + if not bigframes.options.compute.enable_multi_query_execution: + return array_value node = array_value.node if node.complexity < COMPLEXITY_SOFT_LIMIT: return array_value From bbdbdd38fff82aa0e61a393e32e1d03bace8d509 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 1 Apr 2024 21:46:24 +0000 Subject: [PATCH 3/7] refactor algorithm --- bigframes/core/nodes.py | 2 +- bigframes/core/tree_properties.py | 51 +++++++++++- bigframes/session/__init__.py | 116 +++++++++------------------ tests/system/small/test_dataframe.py | 2 +- 4 files changed, 87 insertions(+), 84 deletions(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 78af0e15bd..aee28fd221 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -136,7 +136,7 @@ def total_relational_ops(self) -> int: @property def planning_complexity(self) -> int: """Heuristic measure of planning complexity. Used to determine when to decompose overly complex computations.""" - return self.total_variables * self.total_relational_ops + return self.total_variables * (self.total_relational_ops**2) @abc.abstractmethod def transform_children( diff --git a/bigframes/core/tree_properties.py b/bigframes/core/tree_properties.py index bc29f115f6..125a7e6bff 100644 --- a/bigframes/core/tree_properties.py +++ b/bigframes/core/tree_properties.py @@ -11,12 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import annotations +import functools +import itertools +from typing import Dict import bigframes.core.nodes as nodes -# TODO: Convert these functions to iterative or enforce hard limit on tree depth. The below algorithms can cause stack to exceed limit. - def is_trivially_executable(node: nodes.BigFrameNode) -> bool: if local_only(node): @@ -36,3 +38,48 @@ def peekable(node: nodes.BigFrameNode) -> bool: children_peekable = all(peekable(child) for child in node.child_nodes) self_peekable = not node.non_local return children_peekable and self_peekable + + +def count_complex_nodes( + root: nodes.BigFrameNode, min_complexity: float, max_complexity: float +) -> Dict[nodes.BigFrameNode, int]: + @functools.cache + def _node_counts_inner( + subtree: nodes.BigFrameNode, + ) -> Dict[nodes.BigFrameNode, int]: + """Helper function to count occurences of duplicate nodes in a subtree. 
Considers only nodes in a complexity range""" + empty_counts: Dict[nodes.BigFrameNode, int] = {} + if subtree.planning_complexity >= min_complexity: + child_counts = [_node_counts_inner(child) for child in subtree.child_nodes] + node_counts = functools.reduce(_combine_counts, child_counts, empty_counts) + if subtree.planning_complexity <= max_complexity: + return _combine_counts(node_counts, {subtree: 1}) + else: + return node_counts + return empty_counts + + return _node_counts_inner(root) + + +def replace_nodes( + root: nodes.BigFrameNode, + to_replace: nodes.BigFrameNode, + replacemenet: nodes.BigFrameNode, +): + @functools.cache + def apply_substition(n: nodes.BigFrameNode) -> nodes.BigFrameNode: + if n == to_replace: + return replacemenet + else: + return n.transform_children(apply_substition) + + return root.transform_children(apply_substition) + + +def _combine_counts( + left: Dict[nodes.BigFrameNode, int], right: Dict[nodes.BigFrameNode, int] +) -> Dict[nodes.BigFrameNode, int]: + return { + key: left.get(key, 0) + right.get(key, 0) + for key in itertools.chain(left.keys(), right.keys()) + } diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 186f104049..82555d0b5b 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -18,10 +18,8 @@ import copy import datetime -import functools import itertools import logging -import math import os import re import typing @@ -121,15 +119,12 @@ # TODO(tbergeron): Convert to bytes-based limit MAX_INLINE_DF_SIZE = 5000 -# Chosen to prevent very large expression trees from being directly executed as a single query -# Beyond this limit, expression should be cached or decomposed into multiple queries for execution. -COMPLEXITY_SOFT_LIMIT = 2e5 +# Max complexity that should be executed as a single query +COMPLEXITY_SOFT_LIMIT = 1e7 # Beyond the hard limite, even decomposing the query is unlikely to succeed -COMPLEXITY_HARD_LIMIT = 1e25 -# Number of times to factor out and cache a repeated subtree -MAX_SUBTREE_FACTORINGS = 10 -# Limits how much bigframes will attempt to decompose complex queries into smaller queries -MAX_DECOMPOSITION_STEPS = 5 +COMPLEXITY_HARD_LIMIT = 1e9 +# Number of times to factor out subqueries before giving up. 
+MAX_SUBTREE_FACTORINGS = 5 logger = logging.getLogger(__name__) @@ -1828,17 +1823,38 @@ def _cache_with_offsets(self, array_value: core.ArrayValue) -> core.ArrayValue: ordering=order.ExpressionOrdering.from_offset_col("bigframes_offsets"), ) + def _simplify_with_caching(self, array_value: core.ArrayValue) -> core.ArrayValue: + """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" + if not bigframes.options.compute.enable_multi_query_execution: + return array_value + print(f"Query has complexity: {array_value.node.planning_complexity}") + node = array_value.node + if node.planning_complexity < COMPLEXITY_SOFT_LIMIT: + return array_value + + for _ in range(MAX_SUBTREE_FACTORINGS): + updated = self._cache_most_complex_subtree(node) + if updated is None: + print(f"Could not further factor query: {node.planning_complexity}") + return core.ArrayValue(node) + else: + print(f"Refactored query has complexity: {node.planning_complexity}") + node = updated + + return core.ArrayValue(node) + def _cache_most_complex_subtree( self, node: nodes.BigFrameNode ) -> Optional[nodes.BigFrameNode]: - node_counts = _node_counts(node) - # Only consider nodes the occur at least twice - # valid_candidates = filter(lambda x: x[1] >= 2, node_counts.items()) - valid_candidates = node_counts.items() - # Heuristic: log(Complexity) * (copies of subtree) + valid_candidates = traversals.count_complex_nodes( + node, + min_complexity=(COMPLEXITY_SOFT_LIMIT / 500), + max_complexity=COMPLEXITY_SOFT_LIMIT, + ).items() + # Heuristic: subtree_compleixty * (copies of subtree)^2 best_candidate = max( valid_candidates, - key=lambda i: math.log(i[0].planning_complexity) + i[1], + key=lambda i: i[0].planning_complexity + (i[1] ** 2), default=None, ) @@ -1846,53 +1862,14 @@ def _cache_most_complex_subtree( # No good subtrees to cache, just return original tree return None - node_to_replace = best_candidate[0] - # TODO: Add clustering columns based on access patterns - cached_node = self._cache_with_cluster_cols( + materialized = self._cache_with_cluster_cols( core.ArrayValue(best_candidate[0]), [] ).node - @functools.cache - def apply_substition(n: nodes.BigFrameNode) -> nodes.BigFrameNode: - if n == node_to_replace: - return cached_node - else: - return n.transform_children(apply_substition) - - return node.transform_children(apply_substition) - - def _simplify_with_caching(self, array_value: core.ArrayValue) -> core.ArrayValue: - """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" - if not bigframes.options.compute.enable_multi_query_execution: - return array_value - node = array_value.node - if node.planning_complexity < COMPLEXITY_SOFT_LIMIT: - return array_value - node = self._cache_complex_subtrees(node, max_iterations=MAX_SUBTREE_FACTORINGS) - if node.planning_complexity > COMPLEXITY_HARD_LIMIT: - raise ValueError( - f"This dataframe is too complex to convert to SQL queries. Internal complexity measure: {node.planning_complexity}" - ) - return core.ArrayValue(node) - - def _cache_complex_subtrees( - self, node: nodes.BigFrameNode, max_iterations: int - ) -> nodes.BigFrameNode: - """If the computaiton is complex, execute subtrees of the compuation. - Main goal is to reduce query planning complexity, not query cost. 
- """ - root = node - # Identify node that maximizes complexity * (repetitions - 1) - # Recurse into subtrees below complexity limit, then cache each of those - - for _ in range(max_iterations): - updated = self._cache_most_complex_subtree(root) - if updated is not None: - root = updated - else: - return root - return root + return traversals.replace_nodes( + node, to_replace=best_candidate[0], replacemenet=materialized + ) def _is_trivially_executable(self, array_value: core.ArrayValue): """ @@ -2049,24 +2026,3 @@ def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict: configuration["jobTimeoutMs"] = timeout_ms return configuration - - -@functools.cache -def _node_counts(subtree: nodes.BigFrameNode) -> Dict[nodes.BigFrameNode, int]: - """Helper function to count occurences of duplicate nodes in a subtree. Considers only nodes in a complexity range""" - if subtree.planning_complexity > COMPLEXITY_SOFT_LIMIT / 50: - child_counts = [_node_counts(child) for child in subtree.child_nodes] - node_counts = functools.reduce( - lambda x, y: dict( - (key, x.get(key, 0) + y.get(key, 0)) - for key in itertools.chain(x.keys(), y.keys()) - ), - child_counts, - ) - - if subtree.planning_complexity < COMPLEXITY_SOFT_LIMIT: - node_count = node_counts.get(subtree, 0) + 1 - node_counts[subtree] = node_count - return node_counts - else: - return dict() diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 9e68520f60..ce7716b742 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4106,7 +4106,7 @@ def test_recursion_limit(scalars_df_index): def test_query_complexity_repeated_subtrees(scalars_df_index, scalars_pandas_df_index): - # Recursively union the data, if fully decorrelated has 10^5 identical root tables. + # Recursively union the data, if fully inlined has 10^5 identical root tables. 
pd_df = scalars_pandas_df_index bf_df = scalars_df_index for _ in range(5): From fd3e2836c70beb64f5f5693e360711813bc0cf2a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 2 Apr 2024 17:39:35 +0000 Subject: [PATCH 4/7] disable recursive window test for py3.12 --- tests/system/small/test_dataframe.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index ce7716b742..b34cc7d315 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4117,6 +4117,11 @@ def test_query_complexity_repeated_subtrees(scalars_df_index, scalars_pandas_df_ assert_pandas_df_equal(bf_result, pd_result) +@pytest.mark.skipif( + sys.version_info >= (3, 12), + # See: https://github.com/python/cpython/issues/112282 + reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", +) def test_query_complexity_repeated_analytic(scalars_df_index, scalars_pandas_df_index): bf_df = scalars_df_index[["int64_col", "int64_too"]] pd_df = scalars_pandas_df_index[["int64_col", "int64_too"]] From 1baf843b90e525ef041db3bfe51911d6dad0125c Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 2 Apr 2024 17:47:53 +0000 Subject: [PATCH 5/7] remove print statements --- bigframes/session/__init__.py | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 82555d0b5b..686836c868 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -120,9 +120,7 @@ MAX_INLINE_DF_SIZE = 5000 # Max complexity that should be executed as a single query -COMPLEXITY_SOFT_LIMIT = 1e7 -# Beyond the hard limite, even decomposing the query is unlikely to succeed -COMPLEXITY_HARD_LIMIT = 1e9 +QUERY_COMPLEXITY_LIMIT = 1e7 # Number of times to factor out subqueries before giving up. 
MAX_SUBTREE_FACTORINGS = 5 @@ -1827,18 +1825,15 @@ def _simplify_with_caching(self, array_value: core.ArrayValue) -> core.ArrayValu """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" if not bigframes.options.compute.enable_multi_query_execution: return array_value - print(f"Query has complexity: {array_value.node.planning_complexity}") node = array_value.node - if node.planning_complexity < COMPLEXITY_SOFT_LIMIT: + if node.planning_complexity < QUERY_COMPLEXITY_LIMIT: return array_value for _ in range(MAX_SUBTREE_FACTORINGS): updated = self._cache_most_complex_subtree(node) if updated is None: - print(f"Could not further factor query: {node.planning_complexity}") return core.ArrayValue(node) else: - print(f"Refactored query has complexity: {node.planning_complexity}") node = updated return core.ArrayValue(node) @@ -1848,8 +1843,8 @@ def _cache_most_complex_subtree( ) -> Optional[nodes.BigFrameNode]: valid_candidates = traversals.count_complex_nodes( node, - min_complexity=(COMPLEXITY_SOFT_LIMIT / 500), - max_complexity=COMPLEXITY_SOFT_LIMIT, + min_complexity=(QUERY_COMPLEXITY_LIMIT / 500), + max_complexity=QUERY_COMPLEXITY_LIMIT, ).items() # Heuristic: subtree_compleixty * (copies of subtree)^2 best_candidate = max( From 9849acf6be2eeabfc9607b999368fd5a4acea2be Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 3 Apr 2024 23:01:13 +0000 Subject: [PATCH 6/7] update complexity heuristic --- bigframes/core/nodes.py | 37 ++++++++++++++++++++++++---- bigframes/session/__init__.py | 1 + tests/system/small/test_dataframe.py | 15 +++++++++++ 3 files changed, 48 insertions(+), 5 deletions(-) diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index aee28fd221..27f9a5fc7e 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -110,7 +110,7 @@ def schema(self) -> schemata.ArraySchema: @abc.abstractmethod def variables_introduced(self) -> int: """ - Defines the number of variables generated by the current node. Used to estimate query planning complexity. + Defines number of values created by the current node. Helps represent the "width" of a query """ ... @@ -121,6 +121,13 @@ def relation_ops_created(self) -> int: """ return 1 + @property + def joins(self) -> bool: + """ + Defines whether the node joins data. + """ + return False + @functools.cached_property def total_variables(self) -> int: return self.variables_introduced + sum( @@ -133,10 +140,18 @@ def total_relational_ops(self) -> int: map(lambda x: x.total_relational_ops, self.child_nodes) ) + @functools.cached_property + def total_joins(self) -> int: + return int(self.joins) + sum(map(lambda x: x.total_joins, self.child_nodes)) + @property def planning_complexity(self) -> int: - """Heuristic measure of planning complexity. Used to determine when to decompose overly complex computations.""" - return self.total_variables * (self.total_relational_ops**2) + """ + Empirical heuristic measure of planning complexity. + + Used to determine when to decompose overly complex computations. May require tuning. + """ + return self.total_variables * self.total_relational_ops * (1 + self.total_joins) @abc.abstractmethod def transform_children( @@ -207,6 +222,10 @@ def variables_introduced(self) -> int: """Defines the number of variables generated by the current node. 
Used to estimate query planning complexity.""" return OVERHEAD_VARIABLES + @property + def joins(self) -> bool: + return True + def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] ) -> BigFrameNode: @@ -245,7 +264,7 @@ def schema(self) -> schemata.ArraySchema: @functools.cached_property def variables_introduced(self) -> int: """Defines the number of variables generated by the current node. Used to estimate query planning complexity.""" - return OVERHEAD_VARIABLES + return len(self.schema.items) + OVERHEAD_VARIABLES def transform_children( self, t: Callable[[BigFrameNode], BigFrameNode] @@ -346,6 +365,10 @@ def schema(self) -> schemata.ArraySchema: schemata.SchemaItem(self.col_id, bigframes.dtypes.INT_DTYPE) ) + @property + def relation_ops_created(self) -> int: + return 2 + @functools.cached_property def variables_introduced(self) -> int: return 1 @@ -537,7 +560,7 @@ def variables_introduced(self) -> int: @property def relation_ops_created(self) -> int: # Assume that if not reprojecting, that there is a sequence of window operations sharing the same window - return 0 if self.skip_reproject_unsafe else 2 + return 0 if self.skip_reproject_unsafe else 4 # TODO: Remove this op @@ -581,6 +604,10 @@ def row_preserving(self) -> bool: def non_local(self) -> bool: return True + @property + def joins(self) -> bool: + return True + @functools.cached_property def schema(self) -> schemata.ArraySchema: def infer_dtype( diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 686836c868..c7500d220e 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1841,6 +1841,7 @@ def _simplify_with_caching(self, array_value: core.ArrayValue) -> core.ArrayValu def _cache_most_complex_subtree( self, node: nodes.BigFrameNode ) -> Optional[nodes.BigFrameNode]: + # TODO: If query fails, retry with lower complexity limit valid_candidates = traversals.count_complex_nodes( node, min_complexity=(QUERY_COMPLEXITY_LIMIT / 500), diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index b34cc7d315..e2eb80ddfc 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4105,6 +4105,21 @@ def test_recursion_limit(scalars_df_index): scalars_df_index.to_pandas() +def test_query_complexity_repeated_joins(scalars_df_index, scalars_pandas_df_index): + pd_df = scalars_pandas_df_index + bf_df = scalars_df_index + for _ in range(6): + # recursively join, resuling in 2^6 - 1 = 63 joins + pd_df = pd_df.merge(pd_df, on="int64_col").head(30) + pd_df = pd_df[pd_df.columns[:50]] + bf_df = bf_df.merge(bf_df, on="int64_col").head(30) + bf_df = bf_df[bf_df.columns[:50]] + + bf_result = bf_df.to_pandas() + pd_result = pd_df + assert_pandas_df_equal(bf_result, pd_result, check_index_type=False) + + def test_query_complexity_repeated_subtrees(scalars_df_index, scalars_pandas_df_index): # Recursively union the data, if fully inlined has 10^5 identical root tables. 
pd_df = scalars_pandas_df_index From 84b1dc61ce5addf7ac0ffe9ea20751ca4484de1f Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Thu, 4 Apr 2024 17:21:57 +0000 Subject: [PATCH 7/7] set multi-query execution to disabled by default --- bigframes/_config/compute_options.py | 2 +- tests/system/conftest.py | 8 ++++++++ tests/system/small/test_dataframe.py | 16 +++++++++++----- 3 files changed, 20 insertions(+), 6 deletions(-) diff --git a/bigframes/_config/compute_options.py b/bigframes/_config/compute_options.py index 3906138ec7..2b849c558a 100644 --- a/bigframes/_config/compute_options.py +++ b/bigframes/_config/compute_options.py @@ -47,4 +47,4 @@ class ComputeOptions: """ maximum_bytes_billed: Optional[int] = None - enable_multi_query_execution: bool = True + enable_multi_query_execution: bool = False diff --git a/tests/system/conftest.py b/tests/system/conftest.py index a108ff4a8e..70ff6eee39 100644 --- a/tests/system/conftest.py +++ b/tests/system/conftest.py @@ -953,6 +953,14 @@ def restore_sampling_settings(): bigframes.options.sampling.max_download_size = max_download_size +@pytest.fixture() +def with_multiquery_execution(): + original_setting = bigframes.options.compute.enable_multi_query_execution + bigframes.options.compute.enable_multi_query_execution = True + yield + bigframes.options.compute.enable_multi_query_execution = original_setting + + @pytest.fixture() def weird_strings_pd(): df = pd.DataFrame( diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index 6b5df10312..5d6a859c11 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4145,22 +4145,26 @@ def test_recursion_limit(scalars_df_index): scalars_df_index.to_pandas() -def test_query_complexity_repeated_joins(scalars_df_index, scalars_pandas_df_index): +def test_query_complexity_repeated_joins( + scalars_df_index, scalars_pandas_df_index, with_multiquery_execution +): pd_df = scalars_pandas_df_index bf_df = scalars_df_index for _ in range(6): # recursively join, resuling in 2^6 - 1 = 63 joins pd_df = pd_df.merge(pd_df, on="int64_col").head(30) - pd_df = pd_df[pd_df.columns[:50]] + pd_df = pd_df[pd_df.columns[:20]] bf_df = bf_df.merge(bf_df, on="int64_col").head(30) - bf_df = bf_df[bf_df.columns[:50]] + bf_df = bf_df[bf_df.columns[:20]] bf_result = bf_df.to_pandas() pd_result = pd_df assert_pandas_df_equal(bf_result, pd_result, check_index_type=False) -def test_query_complexity_repeated_subtrees(scalars_df_index, scalars_pandas_df_index): +def test_query_complexity_repeated_subtrees( + scalars_df_index, scalars_pandas_df_index, with_multiquery_execution +): # Recursively union the data, if fully inlined has 10^5 identical root tables. pd_df = scalars_pandas_df_index bf_df = scalars_df_index @@ -4177,7 +4181,9 @@ def test_query_complexity_repeated_subtrees(scalars_df_index, scalars_pandas_df_ # See: https://github.com/python/cpython/issues/112282 reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", ) -def test_query_complexity_repeated_analytic(scalars_df_index, scalars_pandas_df_index): +def test_query_complexity_repeated_analytic( + scalars_df_index, scalars_pandas_df_index, with_multiquery_execution +): bf_df = scalars_df_index[["int64_col", "int64_too"]] pd_df = scalars_pandas_df_index[["int64_col", "int64_too"]] # Uses LAG analytic operator, each in a new SELECT
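Taken together, the series leaves multi-query execution opt-in (the final patch flips the default to False), so end users enable it through compute options. A hedged usage sketch follows; the table id is a placeholder, not something from this PR:

```python
# Opting into the multi-query execution added by this series. The flag is
# disabled by default as of PATCH 7/7; the table id below is a placeholder.
import bigframes
import bigframes.pandas as bpd

bigframes.options.compute.enable_multi_query_execution = True

df = bpd.read_gbq("my-project.my_dataset.my_table")
for _ in range(5):
    # Repeated self-concatenation builds the kind of highly duplicated tree
    # that _simplify_with_caching factors into cached intermediate tables.
    df = bpd.concat(10 * [df]).head(5)

# With the flag on, to_pandas() may run several smaller queries (caching the
# repeated subtree) instead of one very large query.
result = df.to_pandas()
```

Keeping the flag off by default makes the extra intermediate-table cost and latency strictly opt-in, which matches the trade-off described in the ComputeOptions docstring added in PATCH 2/7.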