diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 067c1b1ebe..b6476c5eb8 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -492,9 +492,10 @@ def read_gbq(
     *,
     index_col: Iterable[str] | str = (),
     columns: Iterable[str] = (),
+    configuration: Optional[Dict] = None,
     max_results: Optional[int] = None,
     filters: vendored_pandas_gbq.FiltersType = (),
-    use_cache: bool = True,
+    use_cache: Optional[bool] = None,
     col_order: Iterable[str] = (),
 ) -> bigframes.dataframe.DataFrame:
     _set_default_session_location_if_possible(query_or_table)
@@ -503,6 +504,7 @@ def read_gbq(
         query_or_table,
         index_col=index_col,
         columns=columns,
+        configuration=configuration,
         max_results=max_results,
         filters=filters,
         use_cache=use_cache,
@@ -528,8 +530,9 @@ def read_gbq_query(
     *,
     index_col: Iterable[str] | str = (),
     columns: Iterable[str] = (),
+    configuration: Optional[Dict] = None,
     max_results: Optional[int] = None,
-    use_cache: bool = True,
+    use_cache: Optional[bool] = None,
     col_order: Iterable[str] = (),
 ) -> bigframes.dataframe.DataFrame:
     _set_default_session_location_if_possible(query)
@@ -538,6 +541,7 @@ def read_gbq_query(
         query,
         index_col=index_col,
         columns=columns,
+        configuration=configuration,
         max_results=max_results,
         use_cache=use_cache,
         col_order=col_order,
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index b826d42923..14df7edeb2 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -16,6 +16,7 @@
 
 from __future__ import annotations
 
+import copy
 import datetime
 import itertools
 import logging
@@ -283,9 +284,10 @@ def read_gbq(
         *,
         index_col: Iterable[str] | str = (),
         columns: Iterable[str] = (),
+        configuration: Optional[Dict] = None,
         max_results: Optional[int] = None,
         filters: third_party_pandas_gbq.FiltersType = (),
-        use_cache: bool = True,
+        use_cache: Optional[bool] = None,
         col_order: Iterable[str] = (),
         # Add a verify index argument that fails if the index is not unique.
     ) -> dataframe.DataFrame:
@@ -306,6 +308,7 @@ def read_gbq(
                 query_or_table,
                 index_col=index_col,
                 columns=columns,
+                configuration=configuration,
                 max_results=max_results,
                 api_name="read_gbq",
                 use_cache=use_cache,
@@ -314,13 +317,20 @@ def read_gbq(
             # TODO(swast): Query the snapshot table but mark it as a
             # deterministic query so we can avoid serializing if we have a
             # unique index.
+            if configuration is not None:
+                raise ValueError(
+                    "The 'configuration' argument is not allowed when "
+                    "directly reading from a table. Please remove "
+                    "'configuration' or use a query."
+                )
+
             return self._read_gbq_table(
                 query_or_table,
                 index_col=index_col,
                 columns=columns,
                 max_results=max_results,
                 api_name="read_gbq",
-                use_cache=use_cache,
+                use_cache=use_cache if use_cache is not None else True,
             )
 
     def _to_query(
@@ -405,7 +415,7 @@ def _query_to_destination(
         query: str,
         index_cols: List[str],
         api_name: str,
-        use_cache: bool = True,
+        configuration: dict = {"query": {"useQueryCache": True}},
    ) -> Tuple[Optional[bigquery.TableReference], Optional[bigquery.QueryJob]]:
         # If a dry_run indicates this is not a query type job, then don't
         # bother trying to do a CREATE TEMP TABLE ... AS SELECT ... statement.
@@ -427,23 +437,35 @@ def _query_to_destination(
         ][:_MAX_CLUSTER_COLUMNS]
         temp_table = self._create_empty_temp_table(schema, cluster_cols)
 
-        job_config = bigquery.QueryJobConfig()
+        timeout_ms = configuration.get("jobTimeoutMs") or configuration["query"].get(
+            "timeoutMs"
+        )
+
+        # Convert timeout_ms to seconds, ensuring a minimum of 0.1 seconds to avoid
+        # the program getting stuck on too-short timeouts.
+        timeout = max(int(timeout_ms) * 1e-3, 0.1) if timeout_ms else None
+
+        job_config = typing.cast(
+            bigquery.QueryJobConfig,
+            bigquery.QueryJobConfig.from_api_repr(configuration),
+        )
         job_config.labels["bigframes-api"] = api_name
         job_config.destination = temp_table
-        job_config.use_query_cache = use_cache
 
         try:
             # Write to temp table to workaround BigQuery 10 GB query results
             # limit. See: internal issue 303057336.
             job_config.labels["error_caught"] = "true"
-            _, query_job = self._start_query(query, job_config=job_config)
+            _, query_job = self._start_query(
+                query, job_config=job_config, timeout=timeout
+            )
             return query_job.destination, query_job
         except google.api_core.exceptions.BadRequest:
             # Some SELECT statements still aren't compatible with cluster
             # tables as the destination. For example, if the query has a
             # top-level ORDER BY, this conflicts with our ability to cluster
             # the table by the index column(s).
-            _, query_job = self._start_query(query)
+            _, query_job = self._start_query(query, timeout=timeout)
             return query_job.destination, query_job
 
     def read_gbq_query(
@@ -452,8 +474,9 @@ def read_gbq_query(
         *,
         index_col: Iterable[str] | str = (),
         columns: Iterable[str] = (),
+        configuration: Optional[Dict] = None,
         max_results: Optional[int] = None,
-        use_cache: bool = True,
+        use_cache: Optional[bool] = None,
         col_order: Iterable[str] = (),
     ) -> dataframe.DataFrame:
         """Turn a SQL query into a DataFrame.
@@ -517,6 +540,7 @@ def read_gbq_query(
             query=query,
             index_col=index_col,
             columns=columns,
+            configuration=configuration,
             max_results=max_results,
             api_name="read_gbq_query",
             use_cache=use_cache,
@@ -528,10 +552,34 @@ def _read_gbq_query(
         *,
         index_col: Iterable[str] | str = (),
         columns: Iterable[str] = (),
+        configuration: Optional[Dict] = None,
         max_results: Optional[int] = None,
         api_name: str = "read_gbq_query",
-        use_cache: bool = True,
+        use_cache: Optional[bool] = None,
     ) -> dataframe.DataFrame:
+        configuration = _transform_read_gbq_configuration(configuration)
+
+        if "query" not in configuration:
+            configuration["query"] = {}
+
+        if "query" in configuration["query"]:
+            raise ValueError(
+                "The query statement must not be included in the "
+                "'configuration' because it is already provided as "
+                "a separate parameter."
+            )
+
+        if "useQueryCache" in configuration["query"]:
+            if use_cache is not None:
+                raise ValueError(
+                    "'useQueryCache' in 'configuration' conflicts with"
+                    " 'use_cache' parameter. Please specify only one."
+                )
+        else:
+            configuration["query"]["useQueryCache"] = (
+                True if use_cache is None else use_cache
+            )
+
         if isinstance(index_col, str):
             index_cols = [index_col]
         else:
@@ -541,7 +589,7 @@ def _read_gbq_query(
             query,
             index_cols,
             api_name=api_name,
-            use_cache=use_cache,
+            configuration=configuration,
         )
 
         # If there was no destination table, that means the query must have
@@ -565,7 +613,7 @@ def _read_gbq_query(
             index_col=index_cols,
             columns=columns,
             max_results=max_results,
-            use_cache=use_cache,
+            use_cache=configuration["query"]["useQueryCache"],
         )
 
     def read_gbq_table(
@@ -1656,13 +1704,14 @@ def _start_query(
         sql: str,
         job_config: Optional[bigquery.job.QueryJobConfig] = None,
         max_results: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
         """
         Starts BigQuery query job and waits for results.
         """
         job_config = self._prepare_query_job_config(job_config)
         return bigframes.session._io.bigquery.start_query_with_client(
-            self.bqclient, sql, job_config, max_results
+            self.bqclient, sql, job_config, max_results, timeout
         )
 
     def _start_query_ml_ddl(
@@ -1876,3 +1925,25 @@ def _convert_to_nonnull_string(column: ibis_types.Column) -> ibis_types.StringVa
     # Escape backslashes and use backslash as delineator
     escaped = typing.cast(ibis_types.StringColumn, result.fillna("")).replace("\\", "\\\\")  # type: ignore
     return typing.cast(ibis_types.StringColumn, ibis.literal("\\")).concat(escaped)
+
+
+def _transform_read_gbq_configuration(configuration: Optional[dict]) -> dict:
+    """
+    For backwards-compatibility, convert any previously client-side only
+    parameters such as timeoutMs to the property name expected by the REST API.
+
+    Makes a copy of configuration if changes are needed.
+    """
+
+    if configuration is None:
+        return {}
+
+    timeout_ms = configuration.get("query", {}).get("timeoutMs")
+    if timeout_ms is not None:
+        # Transform timeoutMs to an actual server-side configuration.
+        # https://github.com/googleapis/python-bigquery-pandas/issues/479
+        configuration = copy.deepcopy(configuration)
+        del configuration["query"]["timeoutMs"]
+        configuration["jobTimeoutMs"] = timeout_ms
+
+    return configuration
diff --git a/bigframes/session/_io/bigquery.py b/bigframes/session/_io/bigquery.py
index 67820bbbcb..38ff7429ec 100644
--- a/bigframes/session/_io/bigquery.py
+++ b/bigframes/session/_io/bigquery.py
@@ -220,6 +220,7 @@ def start_query_with_client(
     sql: str,
     job_config: bigquery.job.QueryJobConfig,
     max_results: Optional[int] = None,
+    timeout: Optional[float] = None,
 ) -> Tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
     """
     Starts query job and waits for results.
@@ -230,7 +231,7 @@ def start_query_with_client(
     )
 
     try:
-        query_job = bq_client.query(sql, job_config=job_config)
+        query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
     except google.api_core.exceptions.Forbidden as ex:
         if "Drive credentials" in ex.message:
             ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
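Note on the `_transform_read_gbq_configuration` helper introduced above: it only rewrites the legacy client-side `timeoutMs` key (nested under `"query"`) into the server-side `jobTimeoutMs` property. A minimal sketch of the intended before/after shapes, assuming the patch is applied and the private helper is imported from `bigframes.session` (the values here are made up for illustration):

    # Illustrative sketch only, not part of the patch.
    from bigframes.session import _transform_read_gbq_configuration

    before = {"query": {"useQueryCache": False, "timeoutMs": 10000}}
    after = _transform_read_gbq_configuration(before)

    # The legacy key moves to the top level of the job configuration, which is
    # where the BigQuery REST API expects it.
    assert after == {"query": {"useQueryCache": False}, "jobTimeoutMs": 10000}

    # The input is not mutated: the helper deep-copies before deleting the key.
    assert "timeoutMs" in before["query"]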
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py
index c6702aa032..d0c20f3839 100644
--- a/tests/system/small/test_session.py
+++ b/tests/system/small/test_session.py
@@ -20,6 +20,7 @@
 import typing
 from typing import List
 
+import google
 import google.cloud.bigquery as bigquery
 import numpy as np
 import pandas as pd
@@ -363,6 +364,47 @@ def test_read_gbq_table_wildcard_with_filter(session: bigframes.Session):
     assert df.shape == (348485, 32)
 
 
+@pytest.mark.parametrize(
+    ("config"),
+    [
+        {
+            "query": {
+                "useQueryCache": True,
+                "maximumBytesBilled": "1000000000",
+                "timeoutMs": 10000,
+            }
+        },
+        pytest.param(
+            {"query": {"useQueryCache": True, "timeoutMs": 50}},
+            marks=pytest.mark.xfail(
+                raises=google.api_core.exceptions.BadRequest,
+                reason="Expected failure due to timeout being set too short.",
+            ),
+        ),
+        pytest.param(
+            {"query": {"useQueryCache": False, "maximumBytesBilled": "100"}},
+            marks=pytest.mark.xfail(
+                raises=google.api_core.exceptions.InternalServerError,
+                reason="Expected failure when the query exceeds the maximum bytes billed limit.",
+            ),
+        ),
+    ],
+)
+def test_read_gbq_with_configuration(
+    session: bigframes.Session, scalars_table_id: str, config: dict
+):
+    query = f"""SELECT
+            t.float64_col * 2 AS my_floats,
+            CONCAT(t.string_col, "_2") AS my_strings,
+            t.int64_col > 0 AS my_bools,
+        FROM `{scalars_table_id}` AS t
+        """
+
+    df = session.read_gbq(query, configuration=config)
+
+    assert df.shape == (9, 3)
+
+
 def test_read_gbq_model(session, penguins_linear_model_name):
     model = session.read_gbq_model(penguins_linear_model_name)
     assert isinstance(model, bigframes.ml.linear_model.LinearRegression)
diff --git a/third_party/bigframes_vendored/pandas/io/gbq.py b/third_party/bigframes_vendored/pandas/io/gbq.py
index 74602b5af1..b5feeb13c5 100644
--- a/third_party/bigframes_vendored/pandas/io/gbq.py
+++ b/third_party/bigframes_vendored/pandas/io/gbq.py
@@ -3,7 +3,7 @@
 
 from __future__ import annotations
 
-from typing import Any, Iterable, Literal, Optional, Tuple, Union
+from typing import Any, Dict, Iterable, Literal, Optional, Tuple, Union
 
 from bigframes import constants
 
@@ -19,9 +19,10 @@ def read_gbq(
     *,
     index_col: Iterable[str] | str = (),
     columns: Iterable[str] = (),
+    configuration: Optional[Dict] = None,
     max_results: Optional[int] = None,
     filters: FiltersType = (),
-    use_cache: bool = True,
+    use_cache: Optional[bool] = None,
     col_order: Iterable[str] = (),
 ):
     """Loads a DataFrame from BigQuery.
@@ -107,6 +108,11 @@ def read_gbq(
         columns (Iterable[str]):
             List of BigQuery column names in the desired order for results
             DataFrame.
+        configuration (dict, optional):
+            Query config parameters for job processing.
+            For example: configuration = {'query': {'useQueryCache': False}}.
+            For more information see `BigQuery REST API Reference
+            <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs>`__.
         max_results (Optional[int], default None):
             If set, limit the maximum number of rows to fetch from the
             query results.
@@ -121,8 +127,10 @@ def read_gbq(
             If using wildcard table suffix in query_or_table, can specify
             '_table_suffix' pseudo column to filter the tables to be read
             into the DataFrame.
-        use_cache (bool, default True):
-            Whether to cache the query inputs. Default to True.
+        use_cache (Optional[bool], default None):
+            Caches query results if set to `True`. When `None` (the default),
+            caching is enabled, as if `True`. Setting this together with
+            `useQueryCache` in `configuration` raises a `ValueError`.
         col_order (Iterable[str]):
             Alias for columns, retained for backwards compatibility.
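With the patch applied, a REST-style job configuration can be passed straight through `read_gbq`. A usage sketch (illustrative only; the query and configuration values are placeholders, not part of the patch):

    import bigframes.pandas as bpd

    # Server-side job configuration, mirroring the BigQuery REST API shape.
    config = {
        "query": {"useQueryCache": False, "maximumBytesBilled": "1000000000"},
        "jobTimeoutMs": 60000,  # {"query": {"timeoutMs": ...}} is also accepted
    }
    df = bpd.read_gbq("SELECT 1 AS x", configuration=config)

    # Passing use_cache alongside a useQueryCache key conflicts and raises:
    # bpd.read_gbq("SELECT 1 AS x", configuration=config, use_cache=True)  # ValueError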