fix: use anonymous dataset to create remote_function (#205)
* fix: use anonymous dataset to create `remote_function`

* update README about anonymous dataset instead of bigframes_temp_location

* remove dataset creation step from remote function

This is because the dataset is now an anonymous dataset that will already
have been created as part of bigframes session creation.

* restore create_dataset, guarded by get_dataset
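
For illustration, a minimal standalone sketch of the get_dataset-guarded creation pattern restored here, using the google-cloud-bigquery client. The helper name ensure_dataset and the usage values are hypothetical; get_dataset, create_dataset, and NotFound are real APIs used in the diff below:

    from google.api_core import exceptions
    from google.cloud import bigquery

    def ensure_dataset(client: bigquery.Client, dataset_id: str, location: str) -> None:
        # Hypothetical helper illustrating the guard restored in this commit.
        dataset = bigquery.Dataset(
            bigquery.DatasetReference.from_string(
                dataset_id, default_project=client.project
            )
        )
        dataset.location = location
        try:
            # get_dataset needs no bigquery.datasets.create IAM permission, so
            # a user whose dataset already exists can proceed without that grant.
            client.get_dataset(dataset)
        except exceptions.NotFound:
            # Only now is bigquery.datasets.create required.
            client.create_dataset(dataset, exists_ok=True)

    # Hypothetical usage:
    # ensure_dataset(bigquery.Client(), "my-project.my_dataset", "US")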
shobsi authored Nov 28, 2023
1 parent 9d6613d commit 69b016e
Showing 5 changed files with 73 additions and 51 deletions.
7 changes: 3 additions & 4 deletions README.rst
@@ -267,10 +267,9 @@ definition. To view and manage connections, do the following:
3. In the Explorer pane, expand that project and then expand External connections.

BigQuery remote functions are created in the dataset you specify, or
-in a dataset with the name ``bigframes_temp_location``, where location is
-the location used by the BigQuery DataFrames session. For example,
-``bigframes_temp_us_central1``. To view and manage remote functions, do
-the following:
+in a special type of `hidden dataset <https://cloud.google.com/bigquery/docs/datasets#hidden_datasets>`__
+referred to as an anonymous dataset. To view and manage remote functions created
+in a user-provided dataset, do the following:

1. Go to `BigQuery in the Google Cloud Console <https://console.cloud.google.com/bigquery>`__.
2. Select the project in which you created the remote function.
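As a usage illustration of the new README text above, a minimal sketch: when no dataset is passed, the remote function is created in the session's anonymous dataset. This assumes a project with a configured BigQuery connection; inspecting _anonymous_dataset (the internal session attribute introduced in this commit) is for demonstration only:

    import bigframes

    # The anonymous dataset is created as part of session creation.
    session = bigframes.Session()

    @session.remote_function([int], int, reuse=False)
    def add_ten(x):
        return x + 10

    # The fully qualified routine name points into the anonymous (hidden) dataset.
    print(add_ten.bigframes_remote_function)
    print(session._anonymous_dataset.dataset_id)
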
19 changes: 13 additions & 6 deletions bigframes/remote_function.py
@@ -188,6 +188,7 @@ def create_bq_remote_function(
# https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2
bq_function_args = []
bq_function_return_type = BigQueryType.from_ibis(output_type)
+
# We are expecting the input type annotations to be 1:1 with the input args
for idx, name in enumerate(input_args):
bq_function_args.append(
@@ -204,14 +205,22 @@

logger.info(f"Creating BQ remote function: {create_function_ddl}")

-# Make sure the dataset exists
+# Make sure the dataset exists. I.e. if it doesn't exist, go ahead and
+# create it
dataset = bigquery.Dataset(
bigquery.DatasetReference.from_string(
self._bq_dataset, default_project=self._gcp_project_id
)
)
dataset.location = self._bq_location
-self._bq_client.create_dataset(dataset, exists_ok=True)
+try:
+    # This check does not require bigquery.datasets.create IAM
+    # permission. So, if the dataset already exists, the user can work
+    # without having that permission.
+    self._bq_client.get_dataset(dataset)
+except google.api_core.exceptions.NotFound:
+    # This requires bigquery.datasets.create IAM permission
+    self._bq_client.create_dataset(dataset, exists_ok=True)

# TODO: Use session._start_query() so we get progress bar
query_job = self._bq_client.query(create_function_ddl) # Make an API request.
@@ -610,7 +619,7 @@ def get_routine_reference(
raise DatasetMissingError

dataset_ref = bigquery.DatasetReference(
-    bigquery_client.project, session._session_dataset_id
+    bigquery_client.project, session._anonymous_dataset.dataset_id
)
return dataset_ref.routine(routine_ref_str)

@@ -778,9 +787,7 @@ def remote_function(
dataset, default_project=bigquery_client.project
)
else:
-dataset_ref = bigquery.DatasetReference.from_string(
-    session._session_dataset_id, default_project=bigquery_client.project
-)
+dataset_ref = session._anonymous_dataset

bq_location, cloud_function_region = get_remote_function_locations(
bigquery_client.location
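For context on the get_routine_reference change above, a minimal sketch of the fallback it implements: a routine name with no dataset qualifier resolves inside the session's anonymous dataset. The helper name is hypothetical; bigquery.DatasetReference.routine and the internal _anonymous_dataset attribute appear in the diff above:

    from google.cloud import bigquery

    def routine_in_anonymous_dataset(
        bigquery_client: bigquery.Client, session, routine_name: str
    ) -> bigquery.RoutineReference:
        # Mirrors the fallback branch shown above.
        dataset_ref = bigquery.DatasetReference(
            bigquery_client.project, session._anonymous_dataset.dataset_id
        )
        return dataset_ref.routine(routine_name)
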
14 changes: 0 additions & 14 deletions bigframes/session/__init__.py
@@ -198,13 +198,6 @@ def cloudfunctionsclient(self):
def resourcemanagerclient(self):
return self._clients_provider.resourcemanagerclient

-@property
-def _session_dataset_id(self):
-    """A dataset for storing temporary objects local to the session
-    This is a workaround for remote functions that do not
-    yet support session-temporary instances."""
-    return self._session_dataset.dataset_id

@property
def _project(self):
return self.bqclient.project
@@ -229,13 +222,6 @@ def _create_bq_datasets(self):
query_destination.dataset_id,
)

-# Dataset for storing remote functions, which don't yet
-# support proper session temporary storage yet
-self._session_dataset = bigquery.Dataset(
-    f"{self.bqclient.project}.bigframes_temp_{self._location.lower().replace('-', '_')}"
-)
-self._session_dataset.location = self._location

def close(self):
"""No-op. Temporary resources are deleted after 7 days."""

47 changes: 46 additions & 1 deletion tests/system/large/test_remote_function.py
@@ -22,7 +22,7 @@
import textwrap

from google.api_core.exceptions import NotFound, ResourceExhausted
-from google.cloud import functions_v2
+from google.cloud import bigquery, functions_v2
import pandas
import pytest
import test_utils.prefixer
@@ -1210,3 +1210,48 @@ def square(x):
cleanup_remote_function_assets(
session.bqclient, session.cloudfunctionsclient, square
)
+
+
+@pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_anonymous_dataset(session, scalars_dfs):
+    try:
+        # This usage of remote_function is expected to create the remote
+        # function in the bigframes session's anonymous dataset. Use reuse=False
+        # param to make sure parallel instances of the test don't step over each
+        # other due to the common anonymous dataset.
+        @session.remote_function([int], int, reuse=False)
+        def square(x):
+            return x * x
+
+        assert (
+            bigquery.Routine(square.bigframes_remote_function).dataset_id
+            == session._anonymous_dataset.dataset_id
+        )
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        bf_int64_col = scalars_df["int64_col"]
+        bf_int64_col_filter = bf_int64_col.notnull()
+        bf_int64_col_filtered = bf_int64_col[bf_int64_col_filter]
+        bf_result_col = bf_int64_col_filtered.apply(square)
+        bf_result = (
+            bf_int64_col_filtered.to_frame().assign(result=bf_result_col).to_pandas()
+        )
+
+        pd_int64_col = scalars_pandas_df["int64_col"]
+        pd_int64_col_filter = pd_int64_col.notnull()
+        pd_int64_col_filtered = pd_int64_col[pd_int64_col_filter]
+        pd_result_col = pd_int64_col_filtered.apply(lambda x: x * x)
+        # TODO(shobs): Figure out why pandas .apply() changes the dtype, i.e.
+        # pd_int64_col_filtered.dtype is Int64Dtype()
+        # pd_int64_col_filtered.apply(lambda x: x * x).dtype is int64.
+        # For this test let's force the pandas dtype to be same as bigframes' dtype.
+        pd_result_col = pd_result_col.astype(pandas.Int64Dtype())
+        pd_result = pd_int64_col_filtered.to_frame().assign(result=pd_result_col)
+
+        assert_pandas_df_equal(bf_result, pd_result)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, square
+        )
37 changes: 11 additions & 26 deletions tests/system/small/test_remote_function.py
@@ -62,13 +62,12 @@ def bq_cf_connection_location_project_mismatched() -> str:


@pytest.fixture(scope="module")
-def session_with_bq_connection_and_permanent_dataset(
+def session_with_bq_connection(
bq_cf_connection, dataset_id_permanent
) -> bigframes.Session:
session = bigframes.Session(
bigframes.BigQueryOptions(bq_connection=bq_cf_connection)
)
-session._session_dataset = bigquery.Dataset(dataset_id_permanent)
return session


@@ -277,13 +276,11 @@ def square(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_direct_session_param(
-    session_with_bq_connection_and_permanent_dataset, scalars_dfs
-):
+def test_remote_function_direct_session_param(session_with_bq_connection, scalars_dfs):
@rf.remote_function(
[int],
int,
-    session=session_with_bq_connection_and_permanent_dataset,
+    session=session_with_bq_connection,
)
def square(x):
return x * x
@@ -313,17 +310,15 @@ def square(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_remote_function_via_session_default(
-    session_with_bq_connection_and_permanent_dataset, scalars_dfs
-):
+def test_remote_function_via_session_default(session_with_bq_connection, scalars_dfs):
# Session has bigquery connection initialized via context. Without an
# explicit dataset the default dataset from the session would be used.
# Without an explicit bigquery connection, the one present in Session set
# through the explicit BigQueryOptions would be used. Without an explicit `reuse`
# the default behavior of reuse=True will take effect. Please note that the
# udf is same as the one used in other tests in this file so the underlying
# cloud function would be common and quickly reused.
-@session_with_bq_connection_and_permanent_dataset.remote_function([int], int)
+@session_with_bq_connection.remote_function([int], int)
def square(x):
return x * x

@@ -391,15 +386,11 @@ def square(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_dataframe_applymap(
-    session_with_bq_connection_and_permanent_dataset, scalars_dfs
-):
+def test_dataframe_applymap(session_with_bq_connection, scalars_dfs):
def add_one(x):
return x + 1

-remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
-    [int], int
-)(add_one)
+remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)

scalars_df, scalars_pandas_df = scalars_dfs
int64_cols = ["int64_col", "int64_too"]
@@ -422,15 +413,11 @@ def add_one(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_dataframe_applymap_na_ignore(
-    session_with_bq_connection_and_permanent_dataset, scalars_dfs
-):
+def test_dataframe_applymap_na_ignore(session_with_bq_connection, scalars_dfs):
def add_one(x):
return x + 1

-remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
-    [int], int
-)(add_one)
+remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)

scalars_df, scalars_pandas_df = scalars_dfs
int64_cols = ["int64_col", "int64_too"]
@@ -451,13 +438,11 @@ def add_one(x):


@pytest.mark.flaky(retries=2, delay=120)
-def test_series_map(session_with_bq_connection_and_permanent_dataset, scalars_dfs):
+def test_series_map(session_with_bq_connection, scalars_dfs):
def add_one(x):
return x + 1

-remote_add_one = session_with_bq_connection_and_permanent_dataset.remote_function(
-    [int], int
-)(add_one)
+remote_add_one = session_with_bq_connection.remote_function([int], int)(add_one)

scalars_df, scalars_pandas_df = scalars_dfs


