perf: Add multi-query execution capability for complex dataframes #427

Merged
merged 13 commits into from Apr 4, 2024

Changes from 10 commits
6 changes: 5 additions & 1 deletion bigframes/_config/compute_options.py
@@ -40,7 +40,11 @@ class ComputeOptions:
bytes billed beyond this limit will fail (without incurring a
charge). If unspecified, this will be set to your project default.
See `maximum_bytes_billed <https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJobConfig#google_cloud_bigquery_job_QueryJobConfig_maximum_bytes_billed>`_.

enable_multi_query_execution (bool, Optional):
If enabled, large queries may be factored into multiple smaller queries
in order to avoid generating queries that are too complex for the query
engine to handle. However, this increases cost and latency.
"""

maximum_bytes_billed: Optional[int] = None
enable_multi_query_execution: bool = True
Collaborator

As discussed, let's make this false by default for now.

Contributor Author

done
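
For reference, a minimal usage sketch, assuming the new field is surfaced through the global options object the same way existing ComputeOptions fields such as maximum_bytes_billed are:

import bigframes.pandas as bpd

# Opt in to multi-query execution: overly complex dataframes may be
# factored into several smaller queries, trading extra cost and
# latency for plans the query engine can handle.
bpd.options.compute.enable_multi_query_execution = True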

4 changes: 4 additions & 0 deletions bigframes/core/blocks.py
@@ -1843,6 +1843,10 @@ def cached(self, *, optimize_offsets=False, force: bool = False) -> Block:
expr = self.session._cache_with_cluster_cols(
self.expr, cluster_cols=self.index_columns
)
return self.swap_array_expr(expr)

def swap_array_expr(self, expr: core.ArrayValue) -> Block:
# TODO: Validate schema unchanged
return Block(
expr,
index_columns=self.index_columns,
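
The TODO above asks for schema validation; a minimal sketch of such a check, assuming core.ArrayValue exposes the same schema property used elsewhere in this PR:

def swap_array_expr(self, expr: core.ArrayValue) -> Block:
    # Hypothetical guard: the swapped-in expression (e.g. the cached
    # equivalent) must preserve the schema, or the block's index and
    # value column ids would no longer line up.
    if expr.schema != self.expr.schema:
        raise ValueError("swapped expression must not change the schema")
    ...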
9 changes: 9 additions & 0 deletions bigframes/core/expression.py
@@ -108,6 +108,11 @@ def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression:
def is_bijective(self) -> bool:
return False

@property
def is_identity(self) -> bool:
"""True for identity operation that does not transform input."""
return False


@dataclasses.dataclass(frozen=True)
class ScalarConstantExpression(Expression):
@@ -173,6 +178,10 @@ def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression:
def is_bijective(self) -> bool:
return True

@property
def is_identity(self) -> bool:
return True


@dataclasses.dataclass(frozen=True)
class OpExpression(Expression):
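
To illustrate the new flag: a bare variable reference forwards a column unchanged, so it is an identity, while any real computation keeps the base-class default of False. A hypothetical sketch (constructor shapes assumed from bigframes.core.expression; only is_identity itself appears in the diff):

import bigframes.core.expression as expression
import bigframes.operations as ops

# A bare variable reference just passes a column through.
passthrough = expression.UnboundVariableExpression("col_a")
assert passthrough.is_identity

# Wrapping it in an op produces a new value, so identity no longer holds.
doubled = expression.OpExpression(ops.add_op, (passthrough, passthrough))
assert not doubled.is_identity

ProjectionNode.variables_introduced in nodes.py below relies on this distinction to avoid counting passthrough columns toward query width.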
199 changes: 197 additions & 2 deletions bigframes/core/nodes.py
@@ -15,11 +15,11 @@
from __future__ import annotations

import abc
from dataclasses import dataclass, field, fields
from dataclasses import dataclass, field, fields, replace
import functools
import itertools
import typing
from typing import Tuple
from typing import Callable, Tuple

import pandas

@@ -39,6 +39,10 @@
import bigframes.session


# A fixed number of variables to assume for overhead on some operations
OVERHEAD_VARIABLES = 5


@dataclass(frozen=True)
class BigFrameNode:
"""
@@ -102,6 +106,60 @@ def roots(self) -> typing.Set[BigFrameNode]:
def schema(self) -> schemata.ArraySchema:
...

@property
@abc.abstractmethod
def variables_introduced(self) -> int:
"""
Defines the number of values created by the current node. Helps represent the "width" of a query.
"""
...

@property
def relation_ops_created(self) -> int:
"""
Defines the number of relational ops generated by the current node. Used to estimate query planning complexity.
"""
return 1

@property
def joins(self) -> bool:
"""
Defines whether the node joins data.
"""
return False

@functools.cached_property
def total_variables(self) -> int:
return self.variables_introduced + sum(
map(lambda x: x.total_variables, self.child_nodes)
)

@functools.cached_property
def total_relational_ops(self) -> int:
return self.relation_ops_created + sum(
map(lambda x: x.total_relational_ops, self.child_nodes)
)

@functools.cached_property
def total_joins(self) -> int:
return int(self.joins) + sum(map(lambda x: x.total_joins, self.child_nodes))

@property
def planning_complexity(self) -> int:
"""
Empirical heuristic measure of planning complexity.

Used to determine when to decompose overly complex computations. May require tuning.
"""
return self.total_variables * self.total_relational_ops * (1 + self.total_joins)

@abc.abstractmethod
def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
"""Apply a function to each child node."""
...


@dataclass(frozen=True)
class UnaryNode(BigFrameNode):
@@ -115,6 +173,11 @@ def child_nodes(self) -> typing.Sequence[BigFrameNode]:
def schema(self) -> schemata.ArraySchema:
return self.child.schema

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return replace(self, child=t(self.child))


@dataclass(frozen=True)
class JoinNode(BigFrameNode):
@@ -154,6 +217,22 @@ def join_mapping_to_schema_item(mapping: JoinColumnMapping):
)
return schemata.ArraySchema(items)

@functools.cached_property
def variables_introduced(self) -> int:
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
return OVERHEAD_VARIABLES

@property
def joins(self) -> bool:
return True

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return replace(
self, left_child=t(self.left_child), right_child=t(self.right_child)
)


@dataclass(frozen=True)
class ConcatNode(BigFrameNode):
@@ -182,6 +261,16 @@ def schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(items)

@functools.cached_property
def variables_introduced(self) -> int:
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
return len(self.schema.items) + OVERHEAD_VARIABLES

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return replace(self, children=tuple(t(child) for child in self.children))


# Input Nodes
@dataclass(frozen=True)
@@ -201,6 +290,16 @@ def roots(self) -> typing.Set[BigFrameNode]:
def schema(self) -> schemata.ArraySchema:
return self.data_schema

@functools.cached_property
def variables_introduced(self) -> int:
"""Defines the number of variables generated by the current node. Used to estimate query planning complexity."""
return len(self.schema.items) + 1

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self


# TODO: Refactor to take raw gbq object reference
@dataclass(frozen=True)
@@ -233,6 +332,20 @@ def schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(items)

@functools.cached_property
def variables_introduced(self) -> int:
return len(self.columns) + len(self.hidden_ordering_columns)

@property
def relation_ops_created(self) -> int:
# Assume the worst case, where read_gbq has an analytic operation baked in to generate the index
return 2

def transform_children(
self, t: Callable[[BigFrameNode], BigFrameNode]
) -> BigFrameNode:
return self


# Unary nodes
@dataclass(frozen=True)
@@ -252,6 +365,14 @@ def schema(self) -> schemata.ArraySchema:
schemata.SchemaItem(self.col_id, bigframes.dtypes.INT_DTYPE)
)

@property
def relation_ops_created(self) -> int:
return 2

@functools.cached_property
def variables_introduced(self) -> int:
return 1


@dataclass(frozen=True)
class FilterNode(UnaryNode):
@@ -264,6 +385,10 @@ def row_preserving(self) -> bool:
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 1


@dataclass(frozen=True)
class OrderByNode(UnaryNode):
@@ -281,6 +406,15 @@ def __post_init__(self):
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 0

@property
def relation_ops_created(self) -> int:
# Doesn't directly create any relational operations
return 0


@dataclass(frozen=True)
class ReversedNode(UnaryNode):
@@ -290,6 +424,15 @@ class ReversedNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 0

@property
def relation_ops_created(self) -> int:
# Doesn't directly create any relational operations
return 0


@dataclass(frozen=True)
class ProjectionNode(UnaryNode):
@@ -315,6 +458,12 @@ def schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(items)

@property
def variables_introduced(self) -> int:
# ignore passthrough expressions
new_vars = sum(1 for i in self.assignments if not i[0].is_identity)
return new_vars


# TODO: Merge RowCount into Aggregate Node?
# Row count can sometimes be computed from table metadata, so it is a bit special.
@@ -334,6 +483,10 @@ def schema(self) -> schemata.ArraySchema:
(schemata.SchemaItem("count", bigframes.dtypes.INT_DTYPE),)
)

@property
def variables_introduced(self) -> int:
return 1


@dataclass(frozen=True)
class AggregateNode(UnaryNode):
@@ -367,6 +520,10 @@ def schema(self) -> schemata.ArraySchema:
)
return schemata.ArraySchema(tuple([*by_items, *agg_items]))

@property
def variables_introduced(self) -> int:
return len(self.aggregations) + len(self.by_column_ids)


@dataclass(frozen=True)
class WindowOpNode(UnaryNode):
@@ -396,12 +553,31 @@ def schema(self) -> schemata.ArraySchema:
schemata.SchemaItem(self.output_name, new_item_dtype)
)

@property
def variables_introduced(self) -> int:
return 1

@property
def relation_ops_created(self) -> int:
# Assume that if not reprojecting, there is a sequence of window operations sharing the same window
return 0 if self.skip_reproject_unsafe else 4


# TODO: Remove this op
@dataclass(frozen=True)
class ReprojectOpNode(UnaryNode):
def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 0

@property
def relation_ops_created(self) -> int:
# This op is not a real transformation, just a hint to the SQL generator
return 0


@dataclass(frozen=True)
class UnpivotNode(UnaryNode):
@@ -428,6 +604,10 @@ def row_preserving(self) -> bool:
def non_local(self) -> bool:
return True

@property
def joins(self) -> bool:
return True

@functools.cached_property
def schema(self) -> schemata.ArraySchema:
def infer_dtype(
@@ -469,6 +649,17 @@ def infer_dtype(
]
return schemata.ArraySchema((*index_items, *value_items, *passthrough_items))

@property
def variables_introduced(self) -> int:
return (
len(self.schema.items) - len(self.passthrough_columns) + OVERHEAD_VARIABLES
)

@property
def relation_ops_created(self) -> int:
# Unpivot is essentially a cross join and a projection.
return 2


@dataclass(frozen=True)
class RandomSampleNode(UnaryNode):
@@ -484,3 +675,7 @@ def row_preserving(self) -> bool:

def __hash__(self):
return self._node_hash

@property
def variables_introduced(self) -> int:
return 1
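
Taken together, these per-node properties compose bottom-up into planning_complexity. A worked sketch for a hypothetical plan Join(Filter(ReadGbq), Filter(ReadGbq)), where each ReadGbq scans 4 columns plus 1 hidden ordering column; the numbers follow the properties defined in this diff:

# Per-node contributions:
read_vars, read_ops = 4 + 1, 2    # ReadGbqNode: columns + hidden cols, worst-case 2 ops
filter_vars, filter_ops = 1, 1    # FilterNode: 1 variable, default 1 op
join_vars, join_ops = 5, 1        # JoinNode: OVERHEAD_VARIABLES, default 1 op

# Totals over the tree (two identical Filter(ReadGbq) subtrees):
total_variables = join_vars + 2 * (read_vars + filter_vars)        # 17
total_relational_ops = join_ops + 2 * (read_ops + filter_ops)      # 7
total_joins = 1                                                    # only the JoinNode joins
planning_complexity = total_variables * total_relational_ops * (1 + total_joins)
assert planning_complexity == 17 * 7 * 2 == 238

When this measure crosses some threshold, the session can presumably cache an intermediate subtree and split the work into multiple queries, which is what enable_multi_query_execution opts into.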