From d4cf9da8fef9c79752b35a8adf91fad1f37a910d Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Fri, 3 Jan 2025 22:58:41 +0000
Subject: [PATCH 1/3] fix: Work around functools cache recursion limits for Python 3.12+

---
 bigframes/core/compile/compiler.py | 6 +-
 bigframes/core/nodes.py | 70 ++--
 tests/system/small/test_dataframe.py | 7 -
 .../bigframes_vendored/cpython/functools.py | 364 ++++++++++++++++++
 4 files changed, 414 insertions(+), 33 deletions(-)
 create mode 100644 third_party/bigframes_vendored/cpython/functools.py

diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index 78dff26228..a5c259b4fb 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -39,6 +39,9 @@
 import bigframes.core.ordering as bf_ordering
 import bigframes.core.rewrites as rewrites
 
+# Modified version of functools cache methods to work around https://github.com/python/cpython/issues/112215
+import third_party.bigframes_vendored.cpython.functools as vendored_functools
+
 if typing.TYPE_CHECKING:
     import bigframes.core
     import bigframes.session
@@ -112,8 +115,7 @@ def compile_ordered_ir(self, node: nodes.BigFrameNode) -> compiled.OrderedIR:
     def compile_unordered_ir(self, node: nodes.BigFrameNode) -> compiled.UnorderedIR:
         return typing.cast(compiled.UnorderedIR, self.compile_node(node, False))
 
-    # TODO: Remove cache when schema no longer requires compilation to derive schema (and therefor only compiles for execution)
-    @functools.lru_cache(maxsize=5000)
+    @vendored_functools.lru_cache(maxsize=5000)
     def compile_node(
         self, node: nodes.BigFrameNode, ordered: bool = True
     ) -> compiled.UnorderedIR | compiled.OrderedIR:
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 244f1e7751..91a1635948 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -17,7 +17,6 @@
 import abc
 import dataclasses
 import datetime
-import functools
 import itertools
 import typing
 from typing import Callable, cast, Iterable, Mapping, Optional, Sequence, Tuple
@@ -35,6 +34,9 @@
 import bigframes.dtypes
 import bigframes.operations.aggregations as agg_ops
 
+# Modified version of functools cache methods to work around https://github.com/python/cpython/issues/112215
+import third_party.bigframes_vendored.cpython.functools as vendored_functools
+
 if typing.TYPE_CHECKING:
     import bigframes.core.ordering as orderings
     import bigframes.session
@@ -101,7 +103,8 @@ def node_defined_ids(self) -> Tuple[bfet_ids.ColumnId, ...]:
         """The variables defined in this node (as opposed to by child nodes)."""
         ...
 
-    @functools.cached_property
+    @property
+    @vendored_functools.cache
     def session(self):
         sessions = []
         for child in self.child_nodes:
@@ -118,7 +121,7 @@ def _validate(self):
         """Validate the local data in the node."""
         return
 
-    @functools.cache
+    @vendored_functools.cache
     def validate_tree(self) -> bool:
         for child in self.child_nodes:
             child.validate_tree()
@@ -147,9 +150,9 @@ def __eq__(self, other) -> bool:
         return self._as_tuple() == other._as_tuple()
 
     # BigFrameNode trees can be very deep so it's important to avoid recalculating the hash from scratch
-    # Each subclass of BigFrameNode should use this property to implement __hash__
     # The default dataclass-generated __hash__ method is not cached
-    @functools.cached_property
+    @property
+    @vendored_functools.cache
     def _cached_hash(self):
         return hash(self._as_tuple())
@@ -209,23 +212,27 @@ def explicitly_ordered(self) -> bool:
         """
         ...
- @functools.cached_property + @property + @vendored_functools.cache def total_variables(self) -> int: return self.variables_introduced + sum( map(lambda x: x.total_variables, self.child_nodes) ) - @functools.cached_property + @property + @vendored_functools.cache def total_relational_ops(self) -> int: return self.relation_ops_created + sum( map(lambda x: x.total_relational_ops, self.child_nodes) ) - @functools.cached_property + @property + @vendored_functools.cache def total_joins(self) -> int: return int(self.joins) + sum(map(lambda x: x.total_joins, self.child_nodes)) - @functools.cached_property + @property + @vendored_functools.cache def schema(self) -> schemata.ArraySchema: # TODO: Make schema just a view on fields return schemata.ArraySchema( @@ -264,7 +271,8 @@ def defines_namespace(self) -> bool: """ return False - @functools.cached_property + @property + @vendored_functools.cache def defined_variables(self) -> set[str]: """Full set of variables defined in the namespace, even if not selected.""" self_defined_variables = set(self.schema.names) @@ -277,7 +285,8 @@ def defined_variables(self) -> set[str]: def get_type(self, id: bfet_ids.ColumnId) -> bigframes.dtypes.Dtype: return self._dtype_lookup[id] - @functools.cached_property + @property + @vendored_functools.cache def _dtype_lookup(self): return {field.id: field.dtype for field in self.fields} @@ -419,7 +428,8 @@ def explicitly_ordered(self) -> bool: def fields(self) -> Iterable[Field]: return itertools.chain(self.left_child.fields, self.right_child.fields) - @functools.cached_property + @property + @vendored_functools.cache def variables_introduced(self) -> int: """Defines the number of variables generated by the current node. Used to estimate query planning complexity.""" return OVERHEAD_VARIABLES @@ -511,7 +521,8 @@ def fields(self) -> Iterable[Field]: for id, field in zip(self.output_ids, self.children[0].fields) ) - @functools.cached_property + @property + @vendored_functools.cache def variables_introduced(self) -> int: """Defines the number of variables generated by the current node. Used to estimate query planning complexity.""" return len(self.schema.items) + OVERHEAD_VARIABLES @@ -579,11 +590,13 @@ def order_ambiguous(self) -> bool: def explicitly_ordered(self) -> bool: return True - @functools.cached_property + @property + @vendored_functools.cache def fields(self) -> Iterable[Field]: return (Field(self.output_id, next(iter(self.start.fields)).dtype),) - @functools.cached_property + @property + @vendored_functools.cache def variables_introduced(self) -> int: """Defines the number of variables generated by the current node. 
Used to estimate query planning complexity.""" return len(self.schema.items) + OVERHEAD_VARIABLES @@ -840,7 +853,8 @@ def order_ambiguous(self) -> bool: def explicitly_ordered(self) -> bool: return self.source.ordering is not None - @functools.cached_property + @property + @vendored_functools.cache def variables_introduced(self) -> int: return len(self.scan_list.items) + 1 @@ -902,7 +916,8 @@ def fields(self) -> Iterable[Field]: def relation_ops_created(self) -> int: return 2 - @functools.cached_property + @property + @vendored_functools.cache def variables_introduced(self) -> int: return 1 @@ -1068,7 +1083,8 @@ def _validate(self): if ref.id not in set(self.child.ids): raise ValueError(f"Reference to column not in child: {ref.id}") - @functools.cached_property + @property + @vendored_functools.cache def fields(self) -> Iterable[Field]: return tuple( Field(output, self.child.get_type(ref.id)) @@ -1139,7 +1155,8 @@ def _validate(self): # Cannot assign to existing variables - append only! assert all(name not in self.child.schema.names for _, name in self.assignments) - @functools.cached_property + @property + @vendored_functools.cache def added_fields(self) -> Tuple[Field, ...]: input_types = self.child._dtype_lookup return tuple( @@ -1252,7 +1269,8 @@ def row_preserving(self) -> bool: def non_local(self) -> bool: return True - @functools.cached_property + @property + @vendored_functools.cache def fields(self) -> Iterable[Field]: by_items = ( Field(ref.id, self.child.get_type(ref.id)) for ref in self.by_column_ids @@ -1358,7 +1376,8 @@ def relation_ops_created(self) -> int: def row_count(self) -> Optional[int]: return self.child.row_count - @functools.cached_property + @property + @vendored_functools.cache def added_field(self) -> Field: input_type = self.child.get_type(self.column_name.id) new_item_dtype = self.op.output_type(input_type) @@ -1459,7 +1478,8 @@ def fields(self) -> Iterable[Field]: def relation_ops_created(self) -> int: return 3 - @functools.cached_property + @property + @vendored_functools.cache def variables_introduced(self) -> int: return len(self.column_ids) + 1 @@ -1489,6 +1509,8 @@ def remap_refs( # Tree operators + + def top_down( root: BigFrameNode, transform: Callable[[BigFrameNode], BigFrameNode], @@ -1507,7 +1529,7 @@ def top_down_internal(root: BigFrameNode) -> BigFrameNode: if memoize: # MUST reassign to the same name or caching won't work recursively - top_down_internal = functools.cache(top_down_internal) + top_down_internal = vendored_functools.cache(top_down_internal) result = top_down_internal(root) if validate: @@ -1533,7 +1555,7 @@ def bottom_up_internal(root: BigFrameNode) -> BigFrameNode: if memoize: # MUST reassign to the same name or caching won't work recursively - bottom_up_internal = functools.cache(bottom_up_internal) + bottom_up_internal = vendored_functools.cache(bottom_up_internal) result = bottom_up_internal(root) if validate: diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index bae71b33be..de5993a36c 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -4885,13 +4885,6 @@ def test_df_dot_operator_series( ) -# TODO(tswast): We may be able to re-enable this test after we break large -# queries up in https://github.com/googleapis/python-bigquery-dataframes/pull/427 -@pytest.mark.skipif( - sys.version_info >= (3, 12), - # See: https://github.com/python/cpython/issues/112282 - reason="setrecursionlimit has no effect on the Python C stack since Python 3.12.", -) def 
test_recursion_limit(scalars_df_index): scalars_df_index = scalars_df_index[["int64_too", "int64_col", "float64_col"]] for i in range(400): diff --git a/third_party/bigframes_vendored/cpython/functools.py b/third_party/bigframes_vendored/cpython/functools.py new file mode 100644 index 0000000000..0e692616b8 --- /dev/null +++ b/third_party/bigframes_vendored/cpython/functools.py @@ -0,0 +1,364 @@ +# mypy: ignore-errors +""" +functools.py - Tools for working with functions and callable objects +""" +__all__ = [ + "update_wrapper", + "WRAPPER_ASSIGNMENTS", + "WRAPPER_UPDATES", + "cache", + "lru_cache", + "cached_property", +] + +from collections import namedtuple +import sys +from threading import RLock + +# import weakref # Deferred to single_dispatch() +from types import GenericAlias + +################################################################################ +### update_wrapper() and wraps() decorator +################################################################################ + +# update_wrapper() and wraps() are tools to help write +# wrapper functions that can handle naive introspection + +WRAPPER_ASSIGNMENTS = ( + "__module__", + "__name__", + "__qualname__", + "__doc__", + "__annotate__", + "__type_params__", +) +WRAPPER_UPDATES = ("__dict__",) + + +def update_wrapper( + wrapper, wrapped, assigned=WRAPPER_ASSIGNMENTS, updated=WRAPPER_UPDATES +): + """Update a wrapper function to look like the wrapped function + + wrapper is the function to be updated + wrapped is the original function + assigned is a tuple naming the attributes assigned directly + from the wrapped function to the wrapper function (defaults to + functools.WRAPPER_ASSIGNMENTS) + updated is a tuple naming the attributes of the wrapper that + are updated with the corresponding attribute from the wrapped + function (defaults to functools.WRAPPER_UPDATES) + """ + for attr in assigned: + try: + value = getattr(wrapped, attr) + except AttributeError: + pass + else: + setattr(wrapper, attr, value) + for attr in updated: + getattr(wrapper, attr).update(getattr(wrapped, attr, {})) + # Issue #17482: set __wrapped__ last so we don't inadvertently copy it + # from the wrapped function when updating __dict__ + wrapper.__wrapped__ = wrapped + # Return the wrapper so this can be used as a decorator via partial() + return wrapper + + +################################################################################ +### LRU Cache function decorator +################################################################################ + +_CacheInfo = namedtuple( + "CacheInfo", ["hits", "misses", "maxsize", "currsize"] +) # typing: ignore + + +class _HashedSeq(list): + """This class guarantees that hash() will be called no more than once + per element. This is important because the lru_cache() will hash + the key multiple times on a cache miss. + + """ + + __slots__ = "hashvalue" + + def __init__(self, tup, hash=hash): + self[:] = tup + self.hashvalue = hash(tup) + + def __hash__(self): + return self.hashvalue + + +def _make_key( + args, + kwds, + typed, + kwd_mark=(object(),), + fasttypes={int, str}, + tuple=tuple, + type=type, + len=len, +): + """Make a cache key from optionally typed positional and keyword arguments + + The key is constructed in a way that is flat as possible rather than + as a nested structure that would take more memory. + + If there is only a single argument and its data type is known to cache + its hash value, then that argument is returned without a wrapper. This + saves space and improves lookup speed. 
+ + """ + # All of code below relies on kwds preserving the order input by the user. + # Formerly, we sorted() the kwds before looping. The new way is *much* + # faster; however, it means that f(x=1, y=2) will now be treated as a + # distinct call from f(y=2, x=1) which will be cached separately. + key = args + if kwds: + key += kwd_mark + for item in kwds.items(): + key += item + if typed: + key += tuple(type(v) for v in args) + if kwds: + key += tuple(type(v) for v in kwds.values()) + elif len(key) == 1 and type(key[0]) in fasttypes: + return key[0] + return _HashedSeq(key) + + +def lru_cache(maxsize=128, typed=False): + """Least-recently-used cache decorator. + + If *maxsize* is set to None, the LRU features are disabled and the cache + can grow without bound. + + If *typed* is True, arguments of different types will be cached separately. + For example, f(decimal.Decimal("3.0")) and f(3.0) will be treated as + distinct calls with distinct results. Some types such as str and int may + be cached separately even when typed is false. + + Arguments to the cached function must be hashable. + + View the cache statistics named tuple (hits, misses, maxsize, currsize) + with f.cache_info(). Clear the cache and statistics with f.cache_clear(). + Access the underlying function with f.__wrapped__. + + See: https://en.wikipedia.org/wiki/Cache_replacement_policies#Least_recently_used_(LRU) + + """ + + # Users should only access the lru_cache through its public API: + # cache_info, cache_clear, and f.__wrapped__ + # The internals of the lru_cache are encapsulated for thread safety and + # to allow the implementation to change (including a possible C version). + + if isinstance(maxsize, int): + # Negative maxsize is treated as 0 + if maxsize < 0: + maxsize = 0 + elif callable(maxsize) and isinstance(typed, bool): + # The user_function was passed in directly via the maxsize argument + user_function, maxsize = maxsize, 128 + wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo) + wrapper.cache_parameters = lambda: {"maxsize": maxsize, "typed": typed} + return update_wrapper(wrapper, user_function) + elif maxsize is not None: + raise TypeError("Expected first argument to be an integer, a callable, or None") + + def decorating_function(user_function): + wrapper = _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo) + wrapper.cache_parameters = lambda: {"maxsize": maxsize, "typed": typed} + return update_wrapper(wrapper, user_function) + + return decorating_function + + +def _lru_cache_wrapper(user_function, maxsize, typed, _CacheInfo): + # Constants shared by all lru cache instances: + sentinel = object() # unique object used to signal cache misses + make_key = _make_key # build a key from the function arguments + PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 # names for the link fields + + cache = {} # type: ignore + hits = misses = 0 + full = False + cache_get = cache.get # bound method to lookup a key or return None + cache_len = cache.__len__ # get cache size without calling len() + lock = RLock() # because linkedlist updates aren't threadsafe + root = [] # root of the circular doubly linked list type: ignore + root[:] = [root, root, None, None] # initialize by pointing to self + + if maxsize == 0: + + def wrapper(*args, **kwds): + # No caching -- just a statistics update + nonlocal misses + misses += 1 + result = user_function(*args, **kwds) + return result + + elif maxsize is None: + + def wrapper(*args, **kwds): + # Simple caching without ordering or size limit + nonlocal hits, 
misses + key = make_key(args, kwds, typed) + result = cache_get(key, sentinel) + if result is not sentinel: + hits += 1 + return result + misses += 1 + result = user_function(*args, **kwds) + cache[key] = result + return result + + else: + + def wrapper(*args, **kwds): + # Size limited caching that tracks accesses by recency + nonlocal root, hits, misses, full + key = make_key(args, kwds, typed) + with lock: + link = cache_get(key) + if link is not None: + # Move the link to the front of the circular queue + link_prev, link_next, _key, result = link + link_prev[NEXT] = link_next + link_next[PREV] = link_prev + last = root[PREV] + last[NEXT] = root[PREV] = link + link[PREV] = last + link[NEXT] = root + hits += 1 + return result + misses += 1 + result = user_function(*args, **kwds) + with lock: + if key in cache: + # Getting here means that this same key was added to the + # cache while the lock was released. Since the link + # update is already done, we need only return the + # computed result and update the count of misses. + pass + elif full: + # Use the old root to store the new key and result. + oldroot = root + oldroot[KEY] = key + oldroot[RESULT] = result + # Empty the oldest link and make it the new root. + # Keep a reference to the old key and old result to + # prevent their ref counts from going to zero during the + # update. That will prevent potentially arbitrary object + # clean-up code (i.e. __del__) from running while we're + # still adjusting the links. + root = oldroot[NEXT] + oldkey = root[KEY] + oldresult = root[RESULT] # noqa + root[KEY] = root[RESULT] = None + # Now update the cache dictionary. + del cache[oldkey] + # Save the potentially reentrant cache[key] assignment + # for last, after the root and links have been put in + # a consistent state. + cache[key] = oldroot + else: + # Put result in a new link at the front of the queue. + last = root[PREV] + link = [last, root, key, result] + last[NEXT] = root[PREV] = cache[key] = link + # Use the cache_len bound method instead of the len() function + # which could potentially be wrapped in an lru_cache itself. + full = cache_len() >= maxsize + return result + + def cache_info(): + """Report cache statistics""" + with lock: + return _CacheInfo(hits, misses, maxsize, cache_len()) + + def cache_clear(): + """Clear the cache and cache statistics""" + nonlocal hits, misses, full + with lock: + cache.clear() + root[:] = [root, root, None, None] + hits = misses = 0 + full = False + + wrapper.cache_info = cache_info # type: ignore + wrapper.cache_clear = cache_clear # type: ignore + return wrapper + + +################################################################################ +### cache -- simplified access to the infinity cache +################################################################################ + + +def cache(user_function, /): + 'Simple lightweight unbounded cache. Sometimes called "memoize".' 
+    return lru_cache(maxsize=None)(user_function)
+
+
+################################################################################
+### cached_property() - property result cached as instance attribute
+################################################################################
+
+_NOT_FOUND = object()
+
+
+class cached_property:
+    def __init__(self, func):
+        self.func = func
+        self.attrname = None
+        self.__doc__ = func.__doc__
+        self.__module__ = func.__module__
+
+    def __set_name__(self, owner, name):
+        if self.attrname is None:
+            self.attrname = name
+        elif name != self.attrname:
+            raise TypeError(
+                "Cannot assign the same cached_property to two different names "
+                f"({self.attrname!r} and {name!r})."
+            )
+
+    def __get__(self, instance, owner=None):
+        if instance is None:
+            return self
+        if self.attrname is None:
+            raise TypeError(
+                "Cannot use cached_property instance without calling __set_name__ on it."
+            )
+        try:
+            cache = instance.__dict__
+        except AttributeError:  # not all objects have __dict__ (e.g. class defines slots)
+            msg = (
+                f"No '__dict__' attribute on {type(instance).__name__!r} "
+                f"instance to cache {self.attrname!r} property."
+            )
+            raise TypeError(msg) from None
+        val = cache.get(self.attrname, _NOT_FOUND)
+        if val is _NOT_FOUND:
+            val = self.func(instance)
+            try:
+                cache[self.attrname] = val
+            except TypeError:
+                msg = (
+                    f"The '__dict__' attribute on {type(instance).__name__!r} instance "
+                    f"does not support item assignment for caching {self.attrname!r} property."
+                )
+                raise TypeError(msg) from None
+        return val
+
+    __class_getitem__ = classmethod(GenericAlias)  # type: ignore
+
+
+# https://github.com/python/cpython/issues/112215 only affects newer Python versions;
+# for older Python versions, use the speedier built-in implementation that uses C.
+if sys.version_info < (3, 12):
+    from functools import cache, cached_property, lru_cache  # noqa type: ignore

From d9126466f40047f451d4eeb06ac9d7a2234a2f79 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Tue, 7 Jan 2025 00:17:04 +0000
Subject: [PATCH 2/3] fix functools import

---
 bigframes/core/compile/compiler.py | 5 ++---
 bigframes/core/nodes.py | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py
index a5c259b4fb..7f787851f0 100644
--- a/bigframes/core/compile/compiler.py
+++ b/bigframes/core/compile/compiler.py
@@ -18,6 +18,8 @@
 import io
 import typing
 
+# Modified version of functools cache methods to work around https://github.com/python/cpython/issues/112215
+import bigframes_vendored.cpython.functools as vendored_functools
 import bigframes_vendored.ibis.backends.bigquery as ibis_bigquery
 import bigframes_vendored.ibis.expr.api as ibis_api
 import bigframes_vendored.ibis.expr.types as ibis_types
@@ -39,9 +41,6 @@
 import bigframes.core.ordering as bf_ordering
 import bigframes.core.rewrite as rewrites
 
-# Modified version of functools cache methods to work around https://github.com/python/cpython/issues/112215
-import third_party.bigframes_vendored.cpython.functools as vendored_functools
-
 if typing.TYPE_CHECKING:
     import bigframes.core
     import bigframes.session
diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 91a1635948..6167e4d221 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -21,6 +21,8 @@
 import typing
 from typing import Callable, cast, Iterable, Mapping, Optional, Sequence, Tuple
 
+# Modified version of functools cache methods to work around https://github.com/python/cpython/issues/112215
+import bigframes_vendored.cpython.functools as vendored_functools
 import google.cloud.bigquery as bq
 
 import bigframes.core.expression as ex
@@ -34,9 +36,6 @@
 import bigframes.dtypes
 import bigframes.operations.aggregations as agg_ops
 
-# Modified version of functools cache methods to work around https://github.com/python/cpython/issues/112215
-import third_party.bigframes_vendored.cpython.functools as vendored_functools
-
 if typing.TYPE_CHECKING:
     import bigframes.core.ordering as orderings
     import bigframes.session

From fcaaf8dbcffda1b8355bc384a1249e643580ef3c Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Tue, 7 Jan 2025 00:28:46 +0000
Subject: [PATCH 3/3] use cached_property to prevent infinite hash recursion

---
 bigframes/core/nodes.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py
index 6167e4d221..9259707d18 100644
--- a/bigframes/core/nodes.py
+++ b/bigframes/core/nodes.py
@@ -150,8 +150,7 @@ def __eq__(self, other) -> bool:
 
     # BigFrameNode trees can be very deep so it's important to avoid recalculating the hash from scratch
     # The default dataclass-generated __hash__ method is not cached
-    @property
-    @vendored_functools.cache
+    @vendored_functools.cached_property
     def _cached_hash(self):
         return hash(self._as_tuple())
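
Notes on the series (illustrative sketches, not part of the patches above):

Why vendoring helps: since Python 3.12, sys.setrecursionlimit no longer
governs the C stack (https://github.com/python/cpython/issues/112282), and
every call through the C-accelerated functools.lru_cache wrapper consumes a
C stack frame, so deep recursion through a cached function such as
Compiler.compile_node can fail no matter how high the recursion limit is
set. The vendored pure-Python wrappers keep recursive calls on the Python
stack, which the recursion limit still controls. A minimal sketch of the
pattern, assuming only that the vendored module mirrors the stdlib API; the
depth and limit values are illustrative:

    import sys

    # Vendored module added by patch 1 (import path as corrected in patch 2).
    import bigframes_vendored.cpython.functools as vendored_functools

    sys.setrecursionlimit(20_000)

    @vendored_functools.cache  # pure-Python wrapper on 3.12+, C fallback before
    def depth(n: int) -> int:
        # Stand-in for a recursive tree walk such as compile_node().
        return 0 if n == 0 else depth(n - 1) + 1

    print(depth(2_000))  # the same depth via functools.cache risks a hard C-stack overflow on 3.12+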
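Because the vendored decorators mirror the stdlib functools API, the usual
cache-management surface (cache_info, cache_parameters, cache_clear, all
defined in the vendored file above) carries over. A usage sketch; the
square function is invented for illustration:

    import bigframes_vendored.cpython.functools as vendored_functools

    @vendored_functools.lru_cache(maxsize=2)
    def square(x: int) -> int:
        return x * x

    square(2)  # miss
    square(2)  # hit
    square(3)  # miss; cache is now full
    square(4)  # miss; evicts the least recently used entry (2)
    print(square.cache_info())        # CacheInfo(hits=1, misses=3, maxsize=2, currsize=2)
    print(square.cache_parameters())  # {'maxsize': 2, 'typed': False}
    square.cache_clear()              # drop all entries and reset the statistics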
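What patch 3 fixes: a function-level cache keys on its arguments, and keying
on self requires hash(self). With _cached_hash routed through @property +
@vendored_functools.cache, building the cache key itself calls hash(self),
which re-enters _cached_hash and recurses without bound. cached_property
sidesteps this by storing the computed value in the instance __dict__ under
the attribute name, so no hashing of self is needed. A simplified
reconstruction; the Node class is invented for illustration:

    import bigframes_vendored.cpython.functools as vendored_functools

    class Node:
        def __init__(self, payload: tuple):
            self.payload = payload

        @vendored_functools.cached_property
        def _cached_hash(self) -> int:
            # Computed once per instance, then served from self.__dict__
            # without ever hashing `self`.
            return hash(self.payload)

        def __hash__(self) -> int:
            return self._cached_hash

    n = Node((1, 2, 3))
    print(hash(n) == hash(n))  # True; the second call reads the cached value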