From 98b573e343d711d30a1da37edbde19e7e78c4cc5 Mon Sep 17 00:00:00 2001 From: Arwa Date: Thu, 5 Sep 2024 14:59:32 -0500 Subject: [PATCH 01/12] docs: add docstring return type section to BigQueryOptions class --- bigframes/_config/bigquery_options.py | 48 +++++++++++++++++++++++++-- 1 file changed, 45 insertions(+), 3 deletions(-) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 502f103bb5..1b05afe352 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -101,6 +101,10 @@ def application_name(self) -> Optional[str]: The application name to amend to the user agent sent to Google APIs. The recommended format is ``"application-name/major.minor.patch_version"`` or ``"(gpn:PartnerName;)"`` for official Google partners. + + Returns: + None or str: + The application name as a string, if set; otherwise None. """ return self._application_name @@ -114,7 +118,12 @@ def application_name(self, value: Optional[str]): @property def credentials(self) -> Optional[google.auth.credentials.Credentials]: - """The OAuth2 credentials to use for this client.""" + """The OAuth2 credentials to use for this client. + + Returns: + None or google.auth.credentials.Credentials: + The OAuth2 credentials, if set; otherwise None. + """ return self._credentials @credentials.setter @@ -128,6 +137,10 @@ def location(self) -> Optional[str]: """Default location for job, datasets, and tables. For more information, see https://cloud.google.com/bigquery/docs/locations BigQuery locations. + + Returns: + None or str: + The default location as a string, if set; otherwise None. """ return self._location @@ -140,7 +153,12 @@ def location(self, value: Optional[str]): @property def project(self) -> Optional[str]: - """Google Cloud project ID to use for billing and as the default project.""" + """Google Cloud project ID to use for billing and as the default project. + + Returns: + None or str: + The Google Cloud project ID as a string, if set; otherwise None. + """ return self._project @project.setter @@ -163,6 +181,10 @@ def bq_connection(self) -> Optional[str]: If this option isn't provided, or project or location aren't provided, session will use its default project/location/connection_id as default connection. + + Returns: + None or str: + The name of the BigQuery connection as a string, if set; otherwise None. """ return self._bq_connection @@ -181,6 +203,12 @@ def skip_bq_connection_check(self) -> bool: connection (default or user-provided) does not exist, or it does not have necessary permissions set up to support BigQuery DataFrames operations, then a runtime error will be reported. + + Returns: + bool: + A boolean value, where True indicates that the BigQuery connection + check is skipped and the connection is used as is, without + verification; otherwise False. """ return self._skip_bq_connection_check @@ -203,6 +231,11 @@ def use_regional_endpoints(self) -> bool: Requires that ``location`` is set. For example, to connect to asia-northeast1-bigquery.googleapis.com, specify ``location='asia-northeast1'`` and ``use_regional_endpoints=True``. + + Returns: + bool: + A boolean value, where True indicates that regional endpoints are + used; otherwise False. """ return self._use_regional_endpoints @@ -235,6 +268,10 @@ def kms_key_name(self) -> Optional[str]: Cloud KMS CryptoKey Encrypter/Decrypter IAM role in the key's project. For more information, see https://cloud.google.com/bigquery/docs/customer-managed-encryption#assign_role Assign the Encrypter/Decrypter.
+ + Returns: + None or str: + Customer managed encryption key as a string; otherwise None. """ return self._kms_key_name @@ -247,7 +284,12 @@ def kms_key_name(self, value: str): @property def ordering_mode(self) -> Literal["strict", "partial"]: - """Controls whether total row order is always maintained for DataFrame/Series.""" + """Controls whether total row order is always maintained for DataFrame/Series. + + Returns: + Literal["strict", "partial"]: + A literal string value, either "strict" or "partial", indicating the ordering mode. + """ return self._ordering_mode.value @ordering_mode.setter From c5f94ef11f728f61ace7dbe123f518088742dbad Mon Sep 17 00:00:00 2001 From: Arwa Date: Thu, 5 Sep 2024 14:59:32 -0500 Subject: [PATCH 02/12] docs: add docstring return type section to BigQueryOptions class --- bigframes/_config/bigquery_options.py | 2 +- bigframes/functions/_remote_function_session.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 1b05afe352..18bc5402dd 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -271,7 +271,7 @@ def kms_key_name(self) -> Optional[str]: Returns: None or str: - Customer managed encryption key as a string; otherwise None. + Name of the customer managed encryption key as a string; otherwise None. """ return self._kms_key_name diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 0ab19ca353..fba3b5ba41 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -387,7 +387,7 @@ def wrapper(func): # https://docs.python.org/3/library/inspect.html#inspect.signature signature_kwargs: Mapping[str, Any] = {"eval_str": True} else: - signature_kwargs = {} + signature_kwargs = {} # type: ignore signature = inspect.signature( func, From a7985d2bf00c9612761443b0b06eb314e2d2e2b8 Mon Sep 17 00:00:00 2001 From: Arwa Date: Mon, 9 Sep 2024 14:41:37 -0500 Subject: [PATCH 03/12] docs: add docstrings to SamplingOptions class --- bigframes/_config/sampling_options.py | 36 +++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/bigframes/_config/sampling_options.py b/bigframes/_config/sampling_options.py index f4fa0928e1..c0411eba84 100644 --- a/bigframes/_config/sampling_options.py +++ b/bigframes/_config/sampling_options.py @@ -33,14 +33,44 @@ class SamplingOptions: random_state: Optional[int] = None def with_max_download_size(self, max_rows: Optional[int]) -> SamplingOptions: + """Configures the maximum download size, in MB, for data sampling. + + Args: + max_rows (None or int): + An int value for the maximum row size. + + Returns: + SamplingOptions: + The configuration for data sampling.
+ """ return SamplingOptions(self.max_download_size, True, method, self.random_state) def with_random_state(self, state: Optional[int]) -> SamplingOptions: + """Configures the seed for the uniform downsampling algorithm + + Args: + state (None or int): + An int value for the data sampling random state + + Returns: + SamplingOptions: + The configuration for data sampling. + """ return SamplingOptions( self.max_download_size, self.enable_downsampling, @@ -49,6 +79,12 @@ def with_random_state(self, state: Optional[int]) -> SamplingOptions: ) def with_disabled(self) -> SamplingOptions: + """Configures whether to disable downsampling + + Returns: + SamplingOptions: + The configuration for data sampling. + """ return SamplingOptions( self.max_download_size, False, self.sampling_method, self.random_state ) From 6701dd74b38ce2f1ddbf03faa0e22fcb79b2781a Mon Sep 17 00:00:00 2001 From: Garrett Wu <6505921+GarrettWu@users.noreply.github.com> Date: Thu, 5 Sep 2024 15:04:33 -0700 Subject: [PATCH 04/12] feat: add Gemini 1.5 stable models support (#945) * feat: add Gemini 1.5 stable models support * add to loader --- bigframes/ml/llm.py | 12 ++++++++++-- bigframes/ml/loader.py | 2 ++ tests/system/small/ml/test_llm.py | 26 ++++++++++++++++++++++---- 3 files changed, 34 insertions(+), 6 deletions(-) diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index 35bcf0a33c..a3cd065a55 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -55,10 +55,14 @@ _GEMINI_PRO_ENDPOINT = "gemini-pro" _GEMINI_1P5_PRO_PREVIEW_ENDPOINT = "gemini-1.5-pro-preview-0514" _GEMINI_1P5_PRO_FLASH_PREVIEW_ENDPOINT = "gemini-1.5-flash-preview-0514" +_GEMINI_1P5_PRO_001_ENDPOINT = "gemini-1.5-pro-001" +_GEMINI_1P5_FLASH_001_ENDPOINT = "gemini-1.5-flash-001" _GEMINI_ENDPOINTS = ( _GEMINI_PRO_ENDPOINT, _GEMINI_1P5_PRO_PREVIEW_ENDPOINT, _GEMINI_1P5_PRO_FLASH_PREVIEW_ENDPOINT, + _GEMINI_1P5_PRO_001_ENDPOINT, + _GEMINI_1P5_FLASH_001_ENDPOINT, ) _CLAUDE_3_SONNET_ENDPOINT = "claude-3-sonnet" @@ -728,7 +732,7 @@ class GeminiTextGenerator(base.BaseEstimator): Args: model_name (str, Default to "gemini-pro"): - The model for natural language tasks. Accepted values are "gemini-pro", "gemini-1.5-pro-preview-0514" and "gemini-1.5-flash-preview-0514". Default to "gemini-pro". + The model for natural language tasks. Accepted values are "gemini-pro", "gemini-1.5-pro-preview-0514", "gemini-1.5-flash-preview-0514", "gemini-1.5-pro-001" and "gemini-1.5-flash-001". Default to "gemini-pro". .. 
note:: "gemini-1.5-pro-preview-0514" and "gemini-1.5-flash-preview-0514" is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the @@ -750,7 +754,11 @@ def __init__( self, *, model_name: Literal[ - "gemini-pro", "gemini-1.5-pro-preview-0514", "gemini-1.5-flash-preview-0514" + "gemini-pro", + "gemini-1.5-pro-preview-0514", + "gemini-1.5-flash-preview-0514", + "gemini-1.5-pro-001", + "gemini-1.5-flash-001", ] = "gemini-pro", session: Optional[bigframes.Session] = None, connection_name: Optional[str] = None, diff --git a/bigframes/ml/loader.py b/bigframes/ml/loader.py index 7d75f4c65a..4e7e808260 100644 --- a/bigframes/ml/loader.py +++ b/bigframes/ml/loader.py @@ -63,6 +63,8 @@ llm._GEMINI_PRO_ENDPOINT: llm.GeminiTextGenerator, llm._GEMINI_1P5_PRO_PREVIEW_ENDPOINT: llm.GeminiTextGenerator, llm._GEMINI_1P5_PRO_FLASH_PREVIEW_ENDPOINT: llm.GeminiTextGenerator, + llm._GEMINI_1P5_PRO_001_ENDPOINT: llm.GeminiTextGenerator, + llm._GEMINI_1P5_FLASH_001_ENDPOINT: llm.GeminiTextGenerator, llm._CLAUDE_3_HAIKU_ENDPOINT: llm.Claude3TextGenerator, llm._CLAUDE_3_SONNET_ENDPOINT: llm.Claude3TextGenerator, llm._CLAUDE_3_5_SONNET_ENDPOINT: llm.Claude3TextGenerator, diff --git a/tests/system/small/ml/test_llm.py b/tests/system/small/ml/test_llm.py index 43e756019d..e3d2b51081 100644 --- a/tests/system/small/ml/test_llm.py +++ b/tests/system/small/ml/test_llm.py @@ -324,7 +324,7 @@ def test_create_load_text_embedding_generator_model( ("text-embedding-004", "text-multilingual-embedding-002"), ) @pytest.mark.flaky(retries=2) -def test_gemini_text_embedding_generator_predict_default_params_success( +def test_text_embedding_generator_predict_default_params_success( llm_text_df, model_name, session, bq_connection ): text_embedding_model = llm.TextEmbeddingGenerator( @@ -340,7 +340,13 @@ def test_gemini_text_embedding_generator_predict_default_params_success( @pytest.mark.parametrize( "model_name", - ("gemini-pro", "gemini-1.5-pro-preview-0514", "gemini-1.5-flash-preview-0514"), + ( + "gemini-pro", + "gemini-1.5-pro-preview-0514", + "gemini-1.5-flash-preview-0514", + "gemini-1.5-pro-001", + "gemini-1.5-flash-001", + ), ) def test_create_load_gemini_text_generator_model( dataset_id, model_name, session, bq_connection @@ -362,7 +368,13 @@ def test_create_load_gemini_text_generator_model( @pytest.mark.parametrize( "model_name", - ("gemini-pro", "gemini-1.5-pro-preview-0514", "gemini-1.5-flash-preview-0514"), + ( + "gemini-pro", + "gemini-1.5-pro-preview-0514", + "gemini-1.5-flash-preview-0514", + "gemini-1.5-pro-001", + "gemini-1.5-flash-001", + ), ) @pytest.mark.flaky(retries=2) def test_gemini_text_generator_predict_default_params_success( @@ -379,7 +391,13 @@ def test_gemini_text_generator_predict_default_params_success( @pytest.mark.parametrize( "model_name", - ("gemini-pro", "gemini-1.5-pro-preview-0514", "gemini-1.5-flash-preview-0514"), + ( + "gemini-pro", + "gemini-1.5-pro-preview-0514", + "gemini-1.5-flash-preview-0514", + "gemini-1.5-pro-001", + "gemini-1.5-flash-001", + ), ) @pytest.mark.flaky(retries=2) def test_gemini_text_generator_predict_with_params_success( From 46d80125e449ac53f879ca317da0de28725fa47b Mon Sep 17 00:00:00 2001 From: TrevorBergeron Date: Thu, 5 Sep 2024 15:24:34 -0700 Subject: [PATCH 05/12] refactor: Simplify projection nodes (#961) --- bigframes/core/__init__.py | 77 +++++++++--------------------- bigframes/core/blocks.py | 25 ++++++---- bigframes/core/compile/compiled.py | 15 +++++- bigframes/core/compile/compiler.py | 5 ++ bigframes/core/expression.py | 29 
++++++++--- bigframes/core/nodes.py | 31 +++++++++++- bigframes/core/ordering.py | 2 +- bigframes/core/rewrite.py | 39 ++++++++++++--- bigframes/session/executor.py | 4 +- bigframes/session/planner.py | 12 ++++- tests/unit/test_planner.py | 16 +++---- 11 files changed, 164 insertions(+), 91 deletions(-) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index f3c75f7143..f65509e5b7 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -192,20 +192,15 @@ def concat(self, other: typing.Sequence[ArrayValue]) -> ArrayValue: ) def project_to_id(self, expression: ex.Expression, output_id: str): - if output_id in self.column_ids: # Mutate case - exprs = [ - ((expression if (col_id == output_id) else ex.free_var(col_id)), col_id) - for col_id in self.column_ids - ] - else: # append case - self_projection = ( - (ex.free_var(col_id), col_id) for col_id in self.column_ids - ) - exprs = [*self_projection, (expression, output_id)] return ArrayValue( nodes.ProjectionNode( child=self.node, - assignments=tuple(exprs), + assignments=( + ( + expression, + output_id, + ), + ), ) ) @@ -213,28 +208,22 @@ def assign(self, source_id: str, destination_id: str) -> ArrayValue: if destination_id in self.column_ids: # Mutate case exprs = [ ( - ( - ex.free_var(source_id) - if (col_id == destination_id) - else ex.free_var(col_id) - ), + (source_id if (col_id == destination_id) else col_id), col_id, ) for col_id in self.column_ids ] else: # append case - self_projection = ( - (ex.free_var(col_id), col_id) for col_id in self.column_ids - ) - exprs = [*self_projection, (ex.free_var(source_id), destination_id)] + self_projection = ((col_id, col_id) for col_id in self.column_ids) + exprs = [*self_projection, (source_id, destination_id)] return ArrayValue( - nodes.ProjectionNode( + nodes.SelectionNode( child=self.node, - assignments=tuple(exprs), + input_output_pairs=tuple(exprs), ) ) - def assign_constant( + def create_constant( self, destination_id: str, value: typing.Any, @@ -244,49 +233,31 @@ def assign_constant( # Need to assign a data type when value is NaN. 
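# Fall back to the package-wide default dtype when the caller does not supply one.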
dtype = dtype or bigframes.dtypes.DEFAULT_DTYPE - if destination_id in self.column_ids: # Mutate case - exprs = [ - ( - ( - ex.const(value, dtype) - if (col_id == destination_id) - else ex.free_var(col_id) - ), - col_id, - ) - for col_id in self.column_ids - ] - else: # append case - self_projection = ( - (ex.free_var(col_id), col_id) for col_id in self.column_ids - ) - exprs = [*self_projection, (ex.const(value, dtype), destination_id)] return ArrayValue( nodes.ProjectionNode( child=self.node, - assignments=tuple(exprs), + assignments=((ex.const(value, dtype), destination_id),), ) ) def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue: - selections = ((ex.free_var(col_id), col_id) for col_id in column_ids) + # This basically just drops and reorders columns - logically a no-op except as a final step + selections = ((col_id, col_id) for col_id in column_ids) return ArrayValue( - nodes.ProjectionNode( + nodes.SelectionNode( child=self.node, - assignments=tuple(selections), + input_output_pairs=tuple(selections), ) ) def drop_columns(self, columns: Iterable[str]) -> ArrayValue: new_projection = ( - (ex.free_var(col_id), col_id) - for col_id in self.column_ids - if col_id not in columns + (col_id, col_id) for col_id in self.column_ids if col_id not in columns ) return ArrayValue( - nodes.ProjectionNode( + nodes.SelectionNode( child=self.node, - assignments=tuple(new_projection), + input_output_pairs=tuple(new_projection), ) ) @@ -422,15 +393,13 @@ def unpivot( col_expr = ops.case_when_op.as_expr(*cases) unpivot_exprs.append((col_expr, col_id)) - label_exprs = ((ex.free_var(id), id) for id in index_col_ids) - # passthrough columns are unchanged, just repeated N times each - passthrough_exprs = ((ex.free_var(id), id) for id in passthrough_columns) + unpivot_col_ids = [id for id, _ in unpivot_columns] return ArrayValue( nodes.ProjectionNode( child=joined_array.node, - assignments=(*label_exprs, *unpivot_exprs, *passthrough_exprs), + assignments=(*unpivot_exprs,), ) - ) + ).select_columns([*index_col_ids, *unpivot_col_ids, *passthrough_columns]) def _cross_join_w_labels( self, labels_array: ArrayValue, join_side: typing.Literal["left", "right"] diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index a309671842..d7df7801bc 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -939,7 +939,7 @@ def multi_apply_unary_op( for col_id in columns: label = self.col_id_to_label[col_id] block, result_id = block.project_expr( - expr.bind_all_variables({input_varname: ex.free_var(col_id)}), + expr.bind_variables({input_varname: ex.free_var(col_id)}), label=label, ) block = block.copy_values(result_id, col_id) @@ -1006,7 +1006,7 @@ def create_constant( dtype: typing.Optional[bigframes.dtypes.Dtype] = None, ) -> typing.Tuple[Block, str]: result_id = guid.generate_guid() - expr = self.expr.assign_constant(result_id, scalar_constant, dtype=dtype) + expr = self.expr.create_constant(result_id, scalar_constant, dtype=dtype) # Create index copy with label inserted # See: https://pandas.pydata.org/docs/reference/api/pandas.Index.insert.html labels = self.column_labels.insert(len(self.column_labels), label) @@ -1067,7 +1067,7 @@ def aggregate_all_and_stack( index_id = guid.generate_guid() result_expr = self.expr.aggregate( aggregations, dropna=dropna - ).assign_constant(index_id, None, None) + ).create_constant(index_id, None, None) # Transpose as last operation so that final block has valid transpose cache return Block( result_expr, @@ -1222,7 +1222,7 @@ def 
aggregate( names: typing.List[Label] = [] if len(by_column_ids) == 0: label_id = guid.generate_guid() - result_expr = result_expr.assign_constant(label_id, 0, pd.Int64Dtype()) + result_expr = result_expr.create_constant(label_id, 0, pd.Int64Dtype()) index_columns = (label_id,) names = [None] else: @@ -1614,17 +1614,22 @@ def add_prefix(self, prefix: str, axis: str | int | None = None) -> Block: axis_number = utils.get_axis_number("rows" if (axis is None) else axis) if axis_number == 0: expr = self._expr + new_index_cols = [] for index_col in self._index_columns: + new_col = guid.generate_guid() expr = expr.project_to_id( expression=ops.add_op.as_expr( ex.const(prefix), ops.AsTypeOp(to_type="string").as_expr(index_col), ), - output_id=index_col, + output_id=new_col, ) + new_index_cols.append(new_col) + expr = expr.select_columns((*new_index_cols, *self.value_columns)) + return Block( expr, - index_columns=self.index_columns, + index_columns=new_index_cols, column_labels=self.column_labels, index_labels=self.index.names, ) @@ -1635,17 +1640,21 @@ def add_suffix(self, suffix: str, axis: str | int | None = None) -> Block: axis_number = utils.get_axis_number("rows" if (axis is None) else axis) if axis_number == 0: expr = self._expr + new_index_cols = [] for index_col in self._index_columns: + new_col = guid.generate_guid() expr = expr.project_to_id( expression=ops.add_op.as_expr( ops.AsTypeOp(to_type="string").as_expr(index_col), ex.const(suffix), ), - output_id=index_col, + output_id=new_col, ) + new_index_cols.append(new_col) + expr = expr.select_columns((*new_index_cols, *self.value_columns)) return Block( expr, - index_columns=self.index_columns, + index_columns=new_index_cols, column_labels=self.column_labels, index_labels=self.index.names, ) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index 512238440c..9a9f598e89 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -134,10 +134,23 @@ def projection( ) -> T: """Apply an expression to the ArrayValue and assign the output to a column.""" bindings = {col: self._get_ibis_column(col) for col in self.column_ids} - values = [ + new_values = [ op_compiler.compile_expression(expression, bindings).name(id) for expression, id in expression_id_pairs ] + result = self._select(tuple([*self._columns, *new_values])) # type: ignore + return result + + def selection( + self: T, + input_output_pairs: typing.Tuple[typing.Tuple[str, str], ...], + ) -> T: + """Apply an expression to the ArrayValue and assign the output to a column.""" + bindings = {col: self._get_ibis_column(col) for col in self.column_ids} + values = [ + op_compiler.compile_expression(ex.free_var(input), bindings).name(id) + for input, id in input_output_pairs + ] result = self._select(tuple(values)) # type: ignore return result diff --git a/bigframes/core/compile/compiler.py b/bigframes/core/compile/compiler.py index 3fedf5c0c8..80d5f5a893 100644 --- a/bigframes/core/compile/compiler.py +++ b/bigframes/core/compile/compiler.py @@ -264,6 +264,11 @@ def compile_reversed(self, node: nodes.ReversedNode, ordered: bool = True): else: return self.compile_unordered_ir(node.child) + @_compile_node.register + def compile_selection(self, node: nodes.SelectionNode, ordered: bool = True): + result = self.compile_node(node.child, ordered) + return result.selection(node.input_output_pairs) + @_compile_node.register def compile_projection(self, node: nodes.ProjectionNode, ordered: bool = True): result = 
self.compile_node(node.child, ordered) diff --git a/bigframes/core/expression.py b/bigframes/core/expression.py index c216c29717..bbd23b689c 100644 --- a/bigframes/core/expression.py +++ b/bigframes/core/expression.py @@ -110,8 +110,13 @@ def output_type( ... @abc.abstractmethod - def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression: - """Replace all variables with expression given in `bindings`.""" + def bind_variables( + self, bindings: Mapping[str, Expression], check_bind_all: bool = True + ) -> Expression: + """Replace variables with expression given in `bindings`. + + If check_bind_all is True, validate that all free variables are bound to a new value. + """ ... @property @@ -141,7 +146,9 @@ def output_type( ) -> dtypes.ExpressionType: return self.dtype - def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression: + def bind_variables( + self, bindings: Mapping[str, Expression], check_bind_all: bool = True + ) -> Expression: return self @property @@ -178,11 +185,14 @@ def output_type( else: raise ValueError(f"Type of variable {self.id} has not been fixed.") - def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression: + def bind_variables( + self, bindings: Mapping[str, Expression], check_bind_all: bool = True + ) -> Expression: if self.id in bindings.keys(): return bindings[self.id] - else: + elif check_bind_all: raise ValueError(f"Variable {self.id} remains unbound") + return self @property def is_bijective(self) -> bool: @@ -225,10 +235,15 @@ def output_type( ) return self.op.output_type(*operand_types) - def bind_all_variables(self, bindings: Mapping[str, Expression]) -> Expression: + def bind_variables( + self, bindings: Mapping[str, Expression], check_bind_all: bool = True + ) -> Expression: return OpExpression( self.op, - tuple(input.bind_all_variables(bindings) for input in self.inputs), + tuple( + input.bind_variables(bindings, check_bind_all=check_bind_all) + for input in self.inputs + ), ) @property diff --git a/bigframes/core/nodes.py b/bigframes/core/nodes.py index 73780719a9..27e76c7910 100644 --- a/bigframes/core/nodes.py +++ b/bigframes/core/nodes.py @@ -622,8 +622,32 @@ def relation_ops_created(self) -> int: return 0 +@dataclass(frozen=True) +class SelectionNode(UnaryNode): + input_output_pairs: typing.Tuple[typing.Tuple[str, str], ...] + + def __hash__(self): + return self._node_hash + + @functools.cached_property + def schema(self) -> schemata.ArraySchema: + input_types = self.child.schema._mapping + items = tuple( + schemata.SchemaItem(output, input_types[input]) + for input, output in self.input_output_pairs + ) + return schemata.ArraySchema(items) + + @property + def variables_introduced(self) -> int: + # This operation only renames variables, doesn't actually create new ones + return 0 + + @dataclass(frozen=True) class ProjectionNode(UnaryNode): + """Assigns new variables (without modifying existing ones)""" + assignments: typing.Tuple[typing.Tuple[ex.Expression, str], ...] def __post_init__(self): @@ -631,6 +655,8 @@ def __post_init__(self): for expression, id in self.assignments: # throws TypeError if invalid _ = expression.output_type(input_types) + # Cannot assign to existing variables - append only! 
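+ # Renames and drops of existing columns are handled by SelectionNode instead.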
+ assert all(name not in self.child.schema.names for _, name in self.assignments) def __hash__(self): return self._node_hash @@ -644,7 +670,10 @@ def schema(self) -> schemata.ArraySchema: ) for ex, id in self.assignments ) - return schemata.ArraySchema(items) + schema = self.child.schema + for item in items: + schema = schema.append(item) + return schema @property def variables_introduced(self) -> int: diff --git a/bigframes/core/ordering.py b/bigframes/core/ordering.py index bff7e2ce44..a57d7a18d6 100644 --- a/bigframes/core/ordering.py +++ b/bigframes/core/ordering.py @@ -63,7 +63,7 @@ def bind_variables( self, mapping: Mapping[str, expression.Expression] ) -> OrderingExpression: return OrderingExpression( - self.scalar_expression.bind_all_variables(mapping), + self.scalar_expression.bind_variables(mapping), self.direction, self.na_last, ) diff --git a/bigframes/core/rewrite.py b/bigframes/core/rewrite.py index 60ed4069a9..0e73166ea5 100644 --- a/bigframes/core/rewrite.py +++ b/bigframes/core/rewrite.py @@ -27,6 +27,7 @@ Selection = Tuple[Tuple[scalar_exprs.Expression, str], ...] REWRITABLE_NODE_TYPES = ( + nodes.SelectionNode, nodes.ProjectionNode, nodes.FilterNode, nodes.ReversedNode, @@ -54,7 +55,12 @@ def from_node_span( for id in get_node_column_ids(node) ) return cls(node, selection, None, ()) - if isinstance(node, nodes.ProjectionNode): + + if isinstance(node, nodes.SelectionNode): + return cls.from_node_span(node.child, target).select( + node.input_output_pairs + ) + elif isinstance(node, nodes.ProjectionNode): return cls.from_node_span(node.child, target).project(node.assignments) elif isinstance(node, nodes.FilterNode): return cls.from_node_span(node.child, target).filter(node.predicate) @@ -69,22 +75,39 @@ def from_node_span( def column_lookup(self) -> Mapping[str, scalar_exprs.Expression]: return {col_id: expr for expr, col_id in self.columns} + def select(self, input_output_pairs: Tuple[Tuple[str, str], ...]) -> SquashedSelect: + new_columns = tuple( + ( + scalar_exprs.free_var(input).bind_variables(self.column_lookup), + output, + ) + for input, output in input_output_pairs + ) + return SquashedSelect( + self.root, new_columns, self.predicate, self.ordering, self.reverse_root + ) + def project( self, projection: Tuple[Tuple[scalar_exprs.Expression, str], ...] 
) -> SquashedSelect: + existing_columns = self.columns new_columns = tuple( - (expr.bind_all_variables(self.column_lookup), id) for expr, id in projection + (expr.bind_variables(self.column_lookup), id) for expr, id in projection ) return SquashedSelect( - self.root, new_columns, self.predicate, self.ordering, self.reverse_root + self.root, + (*existing_columns, *new_columns), + self.predicate, + self.ordering, + self.reverse_root, ) def filter(self, predicate: scalar_exprs.Expression) -> SquashedSelect: if self.predicate is None: - new_predicate = predicate.bind_all_variables(self.column_lookup) + new_predicate = predicate.bind_variables(self.column_lookup) else: new_predicate = ops.and_op.as_expr( - self.predicate, predicate.bind_all_variables(self.column_lookup) + self.predicate, predicate.bind_variables(self.column_lookup) ) return SquashedSelect( self.root, self.columns, new_predicate, self.ordering, self.reverse_root @@ -204,7 +227,11 @@ def expand(self) -> nodes.BigFrameNode: root = nodes.FilterNode(child=root, predicate=self.predicate) if self.ordering: root = nodes.OrderByNode(child=root, by=self.ordering) - return nodes.ProjectionNode(child=root, assignments=self.columns) + selection = tuple((id, id) for _, id in self.columns) + return nodes.SelectionNode( + child=nodes.ProjectionNode(child=root, assignments=self.columns), + input_output_pairs=selection, + ) def join_as_projection( diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 72d5493294..424e6d7dad 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -457,9 +457,7 @@ def generate_head_plan(node: nodes.BigFrameNode, n: int): predicate = ops.lt_op.as_expr(ex.free_var(offsets_id), ex.const(n)) plan_w_head = nodes.FilterNode(plan_w_offsets, predicate) # Finally, drop the offsets column - return nodes.ProjectionNode( - plan_w_head, tuple((ex.free_var(i), i) for i in node.schema.names) - ) + return nodes.SelectionNode(plan_w_head, tuple((i, i) for i in node.schema.names)) def generate_row_count_plan(node: nodes.BigFrameNode): diff --git a/bigframes/session/planner.py b/bigframes/session/planner.py index 2a74521b43..bc640ec9fa 100644 --- a/bigframes/session/planner.py +++ b/bigframes/session/planner.py @@ -33,7 +33,7 @@ def session_aware_cache_plan( """ node_counts = traversals.count_nodes(session_forest) # These node types are cheap to re-compute, so it makes more sense to cache their children. - de_cachable_types = (nodes.FilterNode, nodes.ProjectionNode) + de_cachable_types = (nodes.FilterNode, nodes.ProjectionNode, nodes.SelectionNode) caching_target = cur_node = root caching_target_refs = node_counts.get(caching_target, 0) @@ -49,7 +49,15 @@ def session_aware_cache_plan( # Projection defines the variables that are used in the filter expressions, need to substitute variables with their scalar expressions # that instead reference variables in the child node. 
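# Because ProjectionNode is now append-only, a filter may also reference columns that merely pass through the projection, so check_bind_all=False below leaves any unmatched variables as they are.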
bindings = {name: expr for expr, name in cur_node.assignments} - filters = [i.bind_all_variables(bindings) for i in filters] + filters = [ + i.bind_variables(bindings, check_bind_all=False) for i in filters + ] + elif isinstance(cur_node, nodes.SelectionNode): + bindings = { + output: ex.free_var(input) + for input, output in cur_node.input_output_pairs + } + filters = [i.bind_variables(bindings) for i in filters] else: raise ValueError(f"Unexpected de-cached node: {cur_node}") diff --git a/tests/unit/test_planner.py b/tests/unit/test_planner.py index 2e276d0f1a..84dd05ddaa 100644 --- a/tests/unit/test_planner.py +++ b/tests/unit/test_planner.py @@ -46,8 +46,8 @@ def test_session_aware_caching_project_filter(): """ Test that if a node is filtered by a column, the node is cached pre-filter and clustered by the filter column. """ - session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] - target = LEAF.assign_constant("col_c", 4, pd.Int64Dtype()).filter( + session_objects = [LEAF, LEAF.create_constant("col_c", 4, pd.Int64Dtype())] + target = LEAF.create_constant("col_c", 4, pd.Int64Dtype()).filter( ops.gt_op.as_expr("col_a", ex.const(3)) ) result, cluster_cols = planner.session_aware_cache_plan( @@ -61,14 +61,14 @@ def test_session_aware_caching_project_multi_filter(): """ Test that if a node is filtered by multiple columns, all of them are in the cluster cols """ - session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] + session_objects = [LEAF, LEAF.create_constant("col_c", 4, pd.Int64Dtype())] predicate_1a = ops.gt_op.as_expr("col_a", ex.const(3)) predicate_1b = ops.lt_op.as_expr("col_a", ex.const(55)) predicate_1 = ops.and_op.as_expr(predicate_1a, predicate_1b) predicate_3 = ops.eq_op.as_expr("col_b", ex.const(1)) target = ( LEAF.filter(predicate_1) - .assign_constant("col_c", 4, pd.Int64Dtype()) + .create_constant("col_c", 4, pd.Int64Dtype()) .filter(predicate_3) ) result, cluster_cols = planner.session_aware_cache_plan( @@ -84,8 +84,8 @@ def test_session_aware_caching_unusable_filter(): Most filters with multiple column references cannot be used for scan pruning, as they cannot be converted to fixed value ranges. """ - session_objects = [LEAF, LEAF.assign_constant("col_c", 4, pd.Int64Dtype())] - target = LEAF.assign_constant("col_c", 4, pd.Int64Dtype()).filter( + session_objects = [LEAF, LEAF.create_constant("col_c", 4, pd.Int64Dtype())] + target = LEAF.create_constant("col_c", 4, pd.Int64Dtype()).filter( ops.gt_op.as_expr("col_a", "col_b") ) result, cluster_cols = planner.session_aware_cache_plan( @@ -101,12 +101,12 @@ def test_session_aware_caching_fork_after_window_op(): Windowing is expensive, so caching should always compute the window function, in order to avoid later recomputation. 
""" - other = LEAF.promote_offsets("offsets_col").assign_constant( + other = LEAF.promote_offsets("offsets_col").create_constant( "col_d", 5, pd.Int64Dtype() ) target = ( LEAF.promote_offsets("offsets_col") - .assign_constant("col_c", 4, pd.Int64Dtype()) + .create_constant("col_c", 4, pd.Int64Dtype()) .filter( ops.eq_op.as_expr("col_a", ops.add_op.as_expr(ex.const(4), ex.const(3))) ) From 874d3b704be43ac54290dae2b3590819f300a01b Mon Sep 17 00:00:00 2001 From: Arwa Sharif <146148342+arwas11@users.noreply.github.com> Date: Thu, 5 Sep 2024 18:41:49 -0500 Subject: [PATCH 06/12] docs: add docstring returns section to Options (#937) --- bigframes/_config/__init__.py | 30 +++++++++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/bigframes/_config/__init__.py b/bigframes/_config/__init__.py index c9b2a3f95a..ac58c19fa5 100644 --- a/bigframes/_config/__init__.py +++ b/bigframes/_config/__init__.py @@ -73,7 +73,12 @@ def _init_bigquery_thread_local(self): @property def bigquery(self) -> bigquery_options.BigQueryOptions: - """Options to use with the BigQuery engine.""" + """Options to use with the BigQuery engine. + + Returns: + bigframes._config.bigquery_options.BigQueryOptions: + Options for BigQuery engine. + """ if self._local.bigquery_options is not None: # The only way we can get here is if someone called # _init_bigquery_thread_local. @@ -83,7 +88,12 @@ def bigquery(self) -> bigquery_options.BigQueryOptions: @property def display(self) -> display_options.DisplayOptions: - """Options controlling object representation.""" + """Options controlling object representation. + + Returns: + bigframes._config.display_options.DisplayOptions: + Options for controlling object representation. + """ return self._local.display_options @property @@ -95,12 +105,21 @@ def sampling(self) -> sampling_options.SamplingOptions: (e.g., to_pandas, to_numpy, values) or implicitly (e.g., matplotlib plotting). This option can be overriden by parameters in specific functions. + + Returns: + bigframes._config.sampling_options.SamplingOptions: + Options for controlling downsampling. """ return self._local.sampling_options @property def compute(self) -> compute_options.ComputeOptions: - """Thread-local options controlling object computation.""" + """Thread-local options controlling object computation. + + Returns: + bigframes._config.compute_options.ComputeOptions: + Thread-local options for controlling object computation + """ return self._local.compute_options @property @@ -109,6 +128,11 @@ def is_bigquery_thread_local(self) -> bool: A thread-local session can be started by using `with bigframes.option_context("bigquery.some_option", "some-value"):`. + + Returns: + bool: + A boolean value, where a value is True if a thread-local session + is in use; otherwise False. 
""" return self._local.bigquery_options is not None From 2d99a9ad68863271f50354af8d4b3c0dddafd487 Mon Sep 17 00:00:00 2001 From: Chelsea Lin <124939984+chelsea-lin@users.noreply.github.com> Date: Thu, 5 Sep 2024 18:42:09 -0700 Subject: [PATCH 07/12] chore: drop unused columns at is_monotonic methods (#912) * chore: drop unused columns at is_monotonic methods * fixing mypy --- bigframes/core/blocks.py | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index d7df7801bc..4db171ec70 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -2429,9 +2429,11 @@ def _is_monotonic( block, last_notna_id = self.apply_unary_op(column_ids[0], ops.notnull_op) for column_id in column_ids[1:]: block, notna_id = block.apply_unary_op(column_id, ops.notnull_op) + old_last_notna_id = last_notna_id block, last_notna_id = block.apply_binary_op( - last_notna_id, notna_id, ops.and_op + old_last_notna_id, notna_id, ops.and_op ) + block.drop_columns([notna_id, old_last_notna_id]) # loop over all columns to check monotonicity last_result_id = None @@ -2443,21 +2445,27 @@ def _is_monotonic( column_id, lag_result_id, ops.gt_op if increasing else ops.lt_op ) block, equal_id = block.apply_binary_op(column_id, lag_result_id, ops.eq_op) + block = block.drop_columns([lag_result_id]) if last_result_id is None: block, last_result_id = block.apply_binary_op( equal_id, strict_monotonic_id, ops.or_op ) - continue - block, equal_monotonic_id = block.apply_binary_op( - equal_id, last_result_id, ops.and_op - ) - block, last_result_id = block.apply_binary_op( - equal_monotonic_id, strict_monotonic_id, ops.or_op - ) + block = block.drop_columns([equal_id, strict_monotonic_id]) + else: + block, equal_monotonic_id = block.apply_binary_op( + equal_id, last_result_id, ops.and_op + ) + block = block.drop_columns([equal_id, last_result_id]) + block, last_result_id = block.apply_binary_op( + equal_monotonic_id, strict_monotonic_id, ops.or_op + ) + block = block.drop_columns([equal_monotonic_id, strict_monotonic_id]) block, monotonic_result_id = block.apply_binary_op( last_result_id, last_notna_id, ops.and_op # type: ignore ) + if last_result_id is not None: + block = block.drop_columns([last_result_id, last_notna_id]) result = block.get_stat(monotonic_result_id, agg_ops.all_op) self._stats_cache[column_name].update({op_name: result}) return result From 4b8ad55a50ba68e47d9fc485694f92ae9afaa84b Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 5 Sep 2024 22:18:38 -0700 Subject: [PATCH 08/12] test: retry streaming tests to accommodate flakiness (#956) * test: retry streaming tests to accommodate flakiness * reduce delay, increase retries --- tests/system/large/test_streaming.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/system/large/test_streaming.py b/tests/system/large/test_streaming.py index 391aec8533..e4992f8573 100644 --- a/tests/system/large/test_streaming.py +++ b/tests/system/large/test_streaming.py @@ -14,10 +14,13 @@ import time +import pytest + import bigframes import bigframes.streaming +@pytest.mark.flaky(retries=3, delay=10) def test_streaming_df_to_bigtable(session_load: bigframes.Session): # launch a continuous query job_id_prefix = "test_streaming_" @@ -51,6 +54,7 @@ def test_streaming_df_to_bigtable(session_load: bigframes.Session): query_job.cancel() +@pytest.mark.flaky(retries=3, delay=10) def test_streaming_df_to_pubsub(session_load: bigframes.Session): # launch a continuous query 
job_id_prefix = "test_streaming_pubsub_" From 7ca0331b4de360e33bc7780ba7f22016135833d0 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Thu, 5 Sep 2024 23:22:53 -0700 Subject: [PATCH 09/12] fix: make `read_gbq_function` work for multi-param functions (#947) * fix: make `read_gbq_function` work for multi-param functions * fix hyperlink * specify hyperlink differently * make hyperlink markdown format --- bigframes/functions/remote_function.py | 8 +++++++ bigframes/session/__init__.py | 27 ++++++++++++++++++---- tests/system/small/test_remote_function.py | 9 +++++++- 3 files changed, 38 insertions(+), 6 deletions(-) diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index 7e9df74e76..ddb36a9bef 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -14,6 +14,7 @@ from __future__ import annotations +import inspect import logging from typing import cast, Optional, TYPE_CHECKING import warnings @@ -149,6 +150,13 @@ def func(*ignored_args, **ignored_kwargs): expr = node(*ignored_args, **ignored_kwargs) # type: ignore return ibis_client.execute(expr) + func.__signature__ = inspect.signature(func).replace( # type: ignore + parameters=[ + inspect.Parameter(name, inspect.Parameter.POSITIONAL_OR_KEYWORD) + for name in ibis_signature.parameter_names + ] + ) + # TODO: Move ibis logic to compiler step func.__name__ = routine_ref.routine_id diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 7aa4ed4b5a..e52e2ef17f 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1241,12 +1241,22 @@ def read_gbq_function( **Examples:** - Use the ``cw_lower_case_ascii_only`` function from Community UDFs. - (https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/cw_lower_case_ascii_only.sqlx) - >>> import bigframes.pandas as bpd >>> bpd.options.display.progress_bar = None + Use the [cw_lower_case_ascii_only](https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/README.md#cw_lower_case_ascii_onlystr-string) + function from Community UDFs. + + >>> func = bpd.read_gbq_function("bqutil.fn.cw_lower_case_ascii_only") + + You can run it on scalar input. Usually you would do so to verify that + it works as expected before applying to all values in a Series. + + >>> func('AURÉLIE') + 'aurÉlie' + + You can apply it to a BigQuery DataFrame Series. + >>> df = bpd.DataFrame({'id': [1, 2, 3], 'name': ['AURÉLIE', 'CÉLESTINE', 'DAPHNÉ']}) >>> df id name @@ -1256,7 +1266,6 @@ def read_gbq_function( [3 rows x 2 columns] - >>> func = bpd.read_gbq_function("bqutil.fn.cw_lower_case_ascii_only") >>> df1 = df.assign(new_name=df['name'].apply(func)) >>> df1 id name new_name @@ -1266,9 +1275,17 @@ def read_gbq_function( [3 rows x 3 columns] + You can even use a function with multiple inputs. For example, let's use + [cw_instr4](https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/README.md#cw_instr4source-string-search-string-position-int64-ocurrence-int64) + from Community UDFs. 
+ + >>> func = bpd.read_gbq_function("bqutil.fn.cw_instr4") + >>> func('TestStr123456Str', 'Str', 1, 2) + 14 + Args: function_name (str): - the function's name in BigQuery in the format + The function's name in BigQuery in the format `project_id.dataset_id.function_name`, or `dataset_id.function_name` to load from the default project, or `function_name` to load from the default project and the dataset diff --git a/tests/system/small/test_remote_function.py b/tests/system/small/test_remote_function.py index db573efa40..b000354ed4 100644 --- a/tests/system/small/test_remote_function.py +++ b/tests/system/small/test_remote_function.py @@ -671,12 +671,19 @@ def square1(x): @pytest.mark.flaky(retries=2, delay=120) -def test_read_gbq_function_runs_existing_udf(session, bigquery_client, dataset_id): +def test_read_gbq_function_runs_existing_udf(session): func = session.read_gbq_function("bqutil.fn.cw_lower_case_ascii_only") got = func("AURÉLIE") assert got == "aurÉlie" +@pytest.mark.flaky(retries=2, delay=120) +def test_read_gbq_function_runs_existing_udf_4_params(session): + func = session.read_gbq_function("bqutil.fn.cw_instr4") + got = func("TestStr123456Str", "Str", 1, 2) + assert got == 14 + + @pytest.mark.flaky(retries=2, delay=120) def test_read_gbq_function_reads_udfs(session, bigquery_client, dataset_id): dataset_ref = bigquery.DatasetReference.from_string(dataset_id) From ea9ac47367d9241616f4179d679428caff0459c9 Mon Sep 17 00:00:00 2001 From: Shobhit Singh Date: Fri, 6 Sep 2024 09:59:54 -0700 Subject: [PATCH 10/12] fix: support `read_gbq_function` for axis=1 application (#950) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: support `read_gbq_function` for axis=1 application * remove stray newline * Update bigframes/session/__init__.py * remove first person reference in the doc * use correct product name --------- Co-authored-by: Tim Sweña (Swast) --- .../functions/_remote_function_session.py | 2 +- bigframes/functions/remote_function.py | 2 + bigframes/pandas/__init__.py | 3 +- bigframes/session/__init__.py | 38 ++++++++++++++++--- tests/system/large/test_remote_function.py | 14 +++++++ 5 files changed, 51 insertions(+), 8 deletions(-) diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index fba3b5ba41..893b903aeb 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -176,7 +176,7 @@ def remote_function( getting and setting IAM roles on cloud resources. If this param is not provided then resource manager client from the session would be used. - dataset (str, Optional.): + dataset (str, Optional): Dataset in which to create a BigQuery remote function. It should be in `.` or `` format. If this parameter is not provided then session dataset id is used. diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index ddb36a9bef..39e3bfd8f0 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -108,6 +108,7 @@ def read_gbq_function( function_name: str, *, session: Session, + is_row_processor: bool = False, ): """ Read an existing BigQuery function and prepare it for use in future queries. 
@@ -194,5 +195,6 @@ def func(*ignored_args, **ignored_kwargs): func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore ibis_signature.output_type ) + func.is_row_processor = is_row_processor # type: ignore func.ibis_node = node # type: ignore return func diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 08d808572d..9f33a8a1ea 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -692,10 +692,11 @@ def remote_function( remote_function.__doc__ = inspect.getdoc(bigframes.session.Session.remote_function) -def read_gbq_function(function_name: str): +def read_gbq_function(function_name: str, is_row_processor: bool = False): return global_session.with_default_session( bigframes.session.Session.read_gbq_function, function_name=function_name, + is_row_processor=is_row_processor, ) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index e52e2ef17f..045483bd53 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1225,6 +1225,7 @@ def remote_function( def read_gbq_function( self, function_name: str, + is_row_processor: bool = False, ): """Loads a BigQuery function from BigQuery. @@ -1255,7 +1256,7 @@ def read_gbq_function( >>> func('AURÉLIE') 'aurÉlie' - You can apply it to a BigQuery DataFrame Series. + You can apply it to a BigQuery DataFrames Series. >>> df = bpd.DataFrame({'id': [1, 2, 3], 'name': ['AURÉLIE', 'CÉLESTINE', 'DAPHNÉ']}) >>> df @@ -1275,13 +1276,33 @@ def read_gbq_function( [3 rows x 3 columns] - You can even use a function with multiple inputs. For example, let's use - [cw_instr4](https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/README.md#cw_instr4source-string-search-string-position-int64-ocurrence-int64) + You can even use a function with multiple inputs. For example, + [cw_regexp_replace_5](https://github.com/GoogleCloudPlatform/bigquery-utils/blob/master/udfs/community/README.md#cw_regexp_replace_5haystack-string-regexp-string-replacement-string-offset-int64-occurrence-int64) from Community UDFs. - >>> func = bpd.read_gbq_function("bqutil.fn.cw_instr4") - >>> func('TestStr123456Str', 'Str', 1, 2) - 14 + >>> func = bpd.read_gbq_function("bqutil.fn.cw_regexp_replace_5") + >>> func('TestStr123456', 'Str', 'Cad$', 1, 1) + 'TestCad$123456' + + >>> df = bpd.DataFrame({ + ... "haystack" : ["TestStr123456", "TestStr123456Str", "TestStr123456Str"], + ... "regexp" : ["Str", "Str", "Str"], + ... "replacement" : ["Cad$", "Cad$", "Cad$"], + ... "offset" : [1, 1, 1], + ... "occurrence" : [1, 2, 1] + ... }) + >>> df + haystack regexp replacement offset occurrence + 0 TestStr123456 Str Cad$ 1 1 + 1 TestStr123456Str Str Cad$ 1 2 + 2 TestStr123456Str Str Cad$ 1 1 + + [3 rows x 5 columns] + >>> df.apply(func, axis=1) + 0 TestCad$123456 + 1 TestStr123456Cad$ + 2 TestCad$123456Str + dtype: string Args: function_name (str): @@ -1290,6 +1311,10 @@ def read_gbq_function( `dataset_id.function_name` to load from the default project, or `function_name` to load from the default project and the dataset associated with the current session. + is_row_processor (bool, default False): + Whether the function is a row processor. This is set to True + for a function which receives an entire row of a DataFrame as + a pandas Series. 
Returns: callable: A function object pointing to the BigQuery function read @@ -1303,6 +1328,7 @@ def read_gbq_function( return bigframes_rf.read_gbq_function( function_name=function_name, session=self, + is_row_processor=is_row_processor, ) def _prepare_copy_job_config(self) -> bigquery.CopyJobConfig: diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py index d6eefc1e31..77ea4627ec 100644 --- a/tests/system/large/test_remote_function.py +++ b/tests/system/large/test_remote_function.py @@ -1603,6 +1603,13 @@ def serialize_row(row): # bf_result.dtype is 'string[pyarrow]' while pd_result.dtype is 'object' # , ignore this mismatch by using check_dtype=False. pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + + # Let's make sure the read_gbq_function path works for this function + serialize_row_reuse = session.read_gbq_function( + serialize_row_remote.bigframes_remote_function, is_row_processor=True + ) + bf_result = scalars_df[columns].apply(serialize_row_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) finally: # clean up the gcp assets created for the remote function cleanup_remote_function_assets( @@ -2085,6 +2092,13 @@ def foo(x, y, z): pandas.testing.assert_series_equal( expected_result, bf_result, check_dtype=False, check_index_type=False ) + + # Let's make sure the read_gbq_function path works for this function + foo_reuse = session.read_gbq_function(foo.bigframes_remote_function) + bf_result = bf_df.apply(foo_reuse, axis=1).to_pandas() + pandas.testing.assert_series_equal( + expected_result, bf_result, check_dtype=False, check_index_type=False + ) finally: # clean up the gcp assets created for the remote function cleanup_remote_function_assets( From 0ff0d0e36e971fc81e682d39944eff7907c4fb2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Mon, 9 Sep 2024 10:45:50 -0500 Subject: [PATCH 11/12] docs: update title of pypi notebook example to reflect use of the PyPI public dataset (#952) In response to feedback on internal change 662899733. --- notebooks/dataframes/pypi.ipynb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/notebooks/dataframes/pypi.ipynb b/notebooks/dataframes/pypi.ipynb index 3777e98d42..7b16412ff5 100644 --- a/notebooks/dataframes/pypi.ipynb +++ b/notebooks/dataframes/pypi.ipynb @@ -25,7 +25,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "# Analyzing Python dependencies with BigQuery DataFrames\n", + "# Analyzing package downloads from PyPI with BigQuery DataFrames\n", "\n", "In this notebook, you'll use the [PyPI public dataset](https://console.cloud.google.com/marketplace/product/gcp-public-data-pypi/pypi) and the [deps.dev public dataset](https://deps.dev/) to visualize Python package downloads for a package and its dependencies.\n", "\n", From eff12735064c8a64cf584f351cd0b00c2fadbed9 Mon Sep 17 00:00:00 2001 From: Arwa Date: Mon, 9 Sep 2024 14:41:37 -0500 Subject: [PATCH 12/12] docs: add docstrings to SamplingOptions class --- bigframes/_config/sampling_options.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/bigframes/_config/sampling_options.py b/bigframes/_config/sampling_options.py index c0411eba84..ddb2a49713 100644 --- a/bigframes/_config/sampling_options.py +++ b/bigframes/_config/sampling_options.py @@ -40,7 +40,7 @@ def with_max_download_size(self, max_rows: Optional[int]) -> SamplingOptions: An int value for the maximum row size. 
Returns: - SamplingOptions: + bigframes._config.sampling_options.SamplingOptions: The configuration for data sampling. """ return SamplingOptions( max_rows, self.enable_downsampling, self.sampling_method, self.random_state ) @@ -55,7 +55,7 @@ def with_method(self, method: Literal["head", "uniform"]) -> SamplingOptions: A literal string value of either head or uniform data sampling method. Returns: - SamplingOptions: + bigframes._config.sampling_options.SamplingOptions: The configuration for data sampling. """ return SamplingOptions(self.max_download_size, True, method, self.random_state) @@ -68,7 +68,7 @@ def with_method(self, method: Literal["head", "uniform"]) -> SamplingOptions: state (None or int): An int value for the data sampling random state. Returns: - SamplingOptions: + bigframes._config.sampling_options.SamplingOptions: The configuration for data sampling. """ return SamplingOptions( @@ -82,7 +82,7 @@ def with_random_state(self, state: Optional[int]) -> SamplingOptions: """Configures whether to disable downsampling. Returns: - SamplingOptions: + bigframes._config.sampling_options.SamplingOptions: The configuration for data sampling. """ return SamplingOptions(
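Taken together, patches 03 and 12 document SamplingOptions as a small fluent API: every with_* method returns a new SamplingOptions rather than mutating the current one. Below is a minimal sketch of how the methods chain, assuming only the constructor order visible in the diffs (max_download_size, enable_downsampling, sampling_method, random_state) and that all fields carry defaults:

from bigframes._config.sampling_options import SamplingOptions

# Each call below returns a fresh SamplingOptions value.
opts = (
    SamplingOptions()
    .with_max_download_size(500)  # cap the download size
    .with_method("uniform")       # also flips enable_downsampling to True
    .with_random_state(42)        # seed the uniform sampler for reproducibility
)
print(opts.sampling_method, opts.random_state)  # uniform 42

# with_disabled() hands back a copy with downsampling turned off.
print(opts.with_disabled().enable_downsampling)  # False

Because each method rebuilds the dataclass from the previous object's fields, call order only matters when two calls set the same field.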