
feat: add support for BigQuery DataFrames. Set context.engine to 'bigframes' to support query results larger than 10 GB #58

Merged · 18 commits · Sep 20, 2024 · Changes from 5 commits
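For orientation, here is a minimal notebook sketch of the intended usage, pieced together from the PR title and the diff below; the sample query and public dataset are illustrative only, not part of this change.

# --- Cell 1: load the magic and opt in to the bigframes engine ---
# Assumes bigquery-magics and the optional bigframes dependency are installed.
%load_ext bigquery_magics

import bigquery_magics

# Route query results through BigQuery DataFrames so that results larger than
# 10 GB are not materialized client-side as a pandas DataFrame.
bigquery_magics.context.engine = "bigframes"

# --- Cell 2: run a query (the cell magic must be the first line of its own cell) ---
%%bigquery df
SELECT name, SUM(number) AS total
FROM `bigquery-public-data.usa_names.usa_1910_2013`
GROUP BY name

With the engine set to 'bigframes', df should be a bigframes DataFrame rather than a pandas one.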
55 changes: 44 additions & 11 deletions bigquery_magics/bigquery.py
@@ -95,6 +95,8 @@
from IPython import display # type: ignore
from IPython.core import magic_arguments # type: ignore
from IPython.core.getipython import get_ipython
import bigframes.pandas as bpd
from bigframes.pandas import options as bf_options
from google.api_core import client_info
from google.api_core.exceptions import NotFound
from google.cloud import bigquery
@@ -255,6 +257,7 @@ def _create_dataset_if_necessary(client, dataset_id):
help=(
"Sets query to be a dry run to estimate costs. "
"Defaults to executing the query instead of dry run if this argument is not used."
"Does not work with engine 'bigframes'. "
),
)
@magic_arguments.argument(
@@ -319,6 +322,7 @@ def _create_dataset_if_necessary(client, dataset_id):
"amount of time for the query to finish. By default, this "
"information will be displayed as the query runs, but will be "
"cleared after the query is finished."
"This flag is ignored when the engine is 'bigframes'."
),
)
@magic_arguments.argument(
@@ -350,6 +354,7 @@ def _create_dataset_if_necessary(client, dataset_id):
help=(
"Set the location to execute query."
"Defaults to location set in query setting in console."
"This flag is ignored when the engine is 'bigframes'."
),
)
def _cell_magic(line, query):
@@ -376,18 +381,10 @@ def _cell_magic(line, query):
return
query = _validate_and_resolve_query(query, args)

bq_client, bqstorage_client = _create_clients(args)
if context.engine == "bigframes":
return _query_with_bigframes(query, params, args)

try:
return _make_bq_query(
query,
args=args,
params=params,
bq_client=bq_client,
bqstorage_client=bqstorage_client,
)
finally:
_close_transports(bq_client, bqstorage_client)
return _query_with_pandas(query, params, args)


def _parse_magic_args(line: str) -> Tuple[List[Any], Any]:
@@ -444,6 +441,42 @@ def _split_args_line(line: str) -> Tuple[str, str]:
return params_option_value, rest_of_args


def _query_with_bigframes(query: str, params: List[Any], args: Any):
if args.dry_run:
raise ValueError("Dry run is not supported by bigframes engine.")

bf_options.bigquery.project = context.project
bf_options.bigquery.credentials = context.credentials

max_results = int(args.max_results) if args.max_results else None

result = bpd.read_gbq_query(
query,
max_results=max_results,
configuration=_create_job_config(args, params).to_api_repr(),
)

if args.destination_var:
get_ipython().push({args.destination_var: result})
else:
return result


def _query_with_pandas(query: str, params: List[Any], args: Any):
bq_client, bqstorage_client = _create_clients(args)

try:
return _make_bq_query(
query,
args=args,
params=params,
bq_client=bq_client,
bqstorage_client=bqstorage_client,
)
finally:
_close_transports(bq_client, bqstorage_client)


def _create_clients(args: Any) -> Tuple[bigquery.Client, Any]:
bigquery_client_options = copy.deepcopy(context.bigquery_client_options)
if args.bigquery_api_endpoint:
29 changes: 27 additions & 2 deletions bigquery_magics/config.py
@@ -33,8 +33,6 @@ class Context(object):
and can be found at ``bigquery_magics.context``.
"""

_credentials = None
_project = None
_connection = None

default_query_job_config = bigquery.QueryJobConfig()
@@ -103,6 +101,8 @@ class Context(object):
>>> bigquery_magics.context.progress_bar_type = "tqdm_notebook"
"""

_credentials = None

@property
def credentials(self):
"""google.auth.credentials.Credentials: Credentials to use for queries
@@ -138,6 +138,8 @@ def credentials(self):
def credentials(self, value):
self._credentials = value

_project = None

@property
def project(self):
"""str: Default project to use for queries performed through IPython
@@ -163,5 +165,28 @@ def project(self):
def project(self, value):
self._project = value

_engine = "pandas"

@property
def engine(self) -> str:
"""Engine to run the query. Could either be "pandas" or "bigframes".

If using "pandas", the query result will be stored in a Pandas dataframe.
If using "bigframes", the query result will be stored in a bigframes dataframe instead.

Example:
Manully setting the content engine:

>>> from google.cloud.bigquery import magics
>>> bigquery_magics.context.engine = 'bigframes'
"""
return self._engine

@engine.setter
def engine(self, value):
if value != "pandas" and value != "bigframes":
raise ValueError("engine must be either 'pandas' or 'bigframes'")
self._engine = value


context = Context()
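To make the new setting concrete, here is a small sketch of how context.engine behaves according to the property and setter above (illustrative usage, not part of the diff):

import bigquery_magics

# Switch query execution to BigQuery DataFrames.
bigquery_magics.context.engine = "bigframes"
assert bigquery_magics.context.engine == "bigframes"

# Any value other than "pandas" or "bigframes" is rejected by the setter.
try:
    bigquery_magics.context.engine = "polars"
except ValueError as exc:
    print(exc)  # engine must be either 'pandas' or 'bigframes'

# Restore the default engine.
bigquery_magics.context.engine = "pandas"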
1 change: 1 addition & 0 deletions setup.py
@@ -28,6 +28,7 @@
# 'Development Status :: 5 - Production/Stable'``
release_status = "Development Status :: 4 - Beta"
dependencies = [
"bigframes >= 1.17.0",
Collaborator:

Please don't.

In the design I shared that this should be an optional dependency. That means it should be an "extra".

Contributor Author:

Ack. Moved it to extra.
"db-dtypes>=0.3.0,<2.0.0dev",
"google-cloud-bigquery >= 3.13.0, <4.0.0dev",
"ipywidgets>=7.7.1",
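As requested in the review thread above, here is a sketch of how the dependency could instead be declared as an optional extra in setup.py; the extra name "bigframes" and the surrounding layout are assumptions, not the final code of this PR.

# setup.py (sketch): bigframes as an optional extra rather than a core dependency.
import setuptools

dependencies = [
    "db-dtypes>=0.3.0,<2.0.0dev",
    "google-cloud-bigquery >= 3.13.0, <4.0.0dev",
    "ipywidgets>=7.7.1",
]

extras = {
    # Users opt in with: pip install bigquery-magics[bigframes]
    "bigframes": ["bigframes >= 1.17.0"],
}

setuptools.setup(
    name="bigquery-magics",
    install_requires=dependencies,
    extras_require=extras,
)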
104 changes: 104 additions & 0 deletions tests/unit/test_bigquery.py
@@ -23,6 +23,7 @@
import IPython.terminal.interactiveshell as interactiveshell
import IPython.testing.tools as tools
import IPython.utils.io as io
from bigframes.pandas import options as bf_options
from google.api_core import exceptions
import google.auth.credentials
from google.cloud import bigquery
@@ -121,6 +122,11 @@ def mock_credentials(monkeypatch):
monkeypatch.setattr(bigquery_magics.context, "_credentials", credentials)


@pytest.fixture
def bigframes_engine(monkeypatch):
monkeypatch.setattr(bigquery_magics.context, "engine", "bigframes")


PROJECT_ID = "its-a-project-eh"
JOB_ID = "some-random-id"
JOB_REFERENCE_RESOURCE = {"projectId": PROJECT_ID, "jobId": JOB_ID}
@@ -1884,3 +1890,101 @@ def test_bigquery_magic_with_location():

client_options_used = run_query_mock.call_args_list[0][0][0]
assert client_options_used.location == "us-east1"


@pytest.mark.usefixtures("ipython_interactive", "mock_credentials", "bigframes_engine")
def test_big_query_magic_bigframes():
ip = IPython.get_ipython()
ip.extension_manager.load_extension("bigquery_magics")
sql = "SELECT 0 AS something"
expected_configuration = {
"query": {"queryParameters": [], "useLegacySql": False},
"dryRun": False,
}
bf_patch = mock.patch("bigframes.pandas.read_gbq_query", autospec=True)

with bf_patch as bf_mock:
ip.run_cell_magic("bigquery", "", sql)

bf_mock.assert_called_once_with(
sql, max_results=None, configuration=expected_configuration
)
assert bf_options.bigquery.credentials is bigquery_magics.context.credentials
assert bf_options.bigquery.project == bigquery_magics.context.project


@pytest.mark.usefixtures("ipython_interactive", "mock_credentials", "bigframes_engine")
def test_big_query_magic_bigframes_with_params():
ip = IPython.get_ipython()
ip.extension_manager.load_extension("bigquery_magics")
sql = "SELECT 0 AS @p"
expected_configuration = {
"query": {
"queryParameters": [
{
"name": "p",
"parameterType": {"type": "STRING"},
"parameterValue": {"value": "num"},
},
],
"useLegacySql": False,
"parameterMode": "NAMED",
},
"dryRun": False,
}
bf_patch = mock.patch("bigframes.pandas.read_gbq_query", autospec=True)

with bf_patch as bf_mock:
ip.run_cell_magic("bigquery", '--params {"p":"num"}', sql)

bf_mock.assert_called_once_with(
sql, max_results=None, configuration=expected_configuration
)


@pytest.mark.usefixtures("ipython_interactive", "mock_credentials", "bigframes_engine")
def test_big_query_magic_bigframes_with_max_results():
ip = IPython.get_ipython()
ip.extension_manager.load_extension("bigquery_magics")
sql = "SELECT 0 AS something"
expected_configuration = {
"query": {"queryParameters": [], "useLegacySql": False},
"dryRun": False,
}
bf_patch = mock.patch("bigframes.pandas.read_gbq_query", autospec=True)

with bf_patch as bf_mock:
ip.run_cell_magic("bigquery", "--max_results 10", sql)

bf_mock.assert_called_once_with(
sql, max_results=10, configuration=expected_configuration
)


@pytest.mark.usefixtures("ipython_interactive", "mock_credentials", "bigframes_engine")
def test_big_query_magic_bigframes_with_destination_var(ipython_ns_cleanup):
ip = IPython.get_ipython()
ip.extension_manager.load_extension("bigquery_magics")
sql = "SELECT 0 AS something"

bf_patch = mock.patch("bigframes.pandas.read_gbq_query", autospec=True)
ipython_ns_cleanup.append((ip, "df"))

with bf_patch as bf_mock:
ip.run_cell_magic("bigquery", "df", sql)

assert "df" in ip.user_ns
df = ip.user_ns["df"]
assert df is bf_mock.return_value


@pytest.mark.usefixtures("ipython_interactive", "mock_credentials", "bigframes_engine")
def test_big_query_magic_bigframes_with_dry_run__should_fail():
ip = IPython.get_ipython()
ip.extension_manager.load_extension("bigquery_magics")
sql = "SELECT 0 AS @p"

bf_patch = mock.patch("bigframes.pandas.read_gbq_query", autospec=True)

with bf_patch, pytest.raises(ValueError):
ip.run_cell_magic("bigquery", "--dry_run", sql)
13 changes: 13 additions & 0 deletions tests/unit/test_context.py
@@ -16,6 +16,7 @@

import google.auth.credentials
import pydata_google_auth
import pytest

import bigquery_magics

@@ -64,3 +65,15 @@ def test_context_credentials_and_project_can_be_set_explicitly():
assert bigquery_magics.context.credentials is credentials_mock
# default should not be called if credentials & project are explicitly set
assert default_mock.call_count == 0


@pytest.mark.parametrize("engine", ["pandas", "bigframes"])
def test_context_set_engine(engine):
bigquery_magics.context.engine = engine

assert bigquery_magics.context.engine == engine


def test_context_set_invalid_engine():
with pytest.raises(ValueError):
bigquery_magics.context.engine = "whatever"