Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: make the Series.apply outcome assignable to the original dataframe in partial ordering mode #874

Merged
merged 16 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions bigframes/functions/_remote_function_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from typing import Any, cast, Dict, Mapping, Optional, Sequence, TYPE_CHECKING, Union
import warnings

import cloudpickle
import google.api_core.exceptions
from google.cloud import (
bigquery,
Expand Down Expand Up @@ -458,6 +459,11 @@ def wrapper(func):
session=session, # type: ignore
)

# To respect the user code/environment let's use a copy of the
# original udf, especially since we would be setting some properties
# on it
func = cloudpickle.loads(cloudpickle.dumps(func))

# In the unlikely case where the user is trying to re-deploy the same
# function, cleanup the attributes we add below, first. This prevents
# the pickle from having dependencies that might not otherwise be
Expand Down Expand Up @@ -499,6 +505,18 @@ def try_delattr(attr):
cloud_function_memory_mib=cloud_function_memory_mib,
)

# TODO(shobs): Find a better way to support udfs with param named "name".
# This causes an issue in the ibis compilation.
func.__signature__ = inspect.signature(func).replace( # type: ignore
parameters=[
inspect.Parameter(
f"bigframes_{param.name}",
param.kind,
)
for param in inspect.signature(func).parameters.values()
]
)

# TODO: Move ibis logic to compiler step
node = ibis.udf.scalar.builtin(
func,
Expand Down
11 changes: 8 additions & 3 deletions bigframes/functions/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,16 +144,21 @@ def read_gbq_function(

# The name "args" conflicts with the Ibis operator, so we use
# non-standard names for the arguments here.
def func(*ignored_args, **ignored_kwargs):
def func(*bigframes_args, **bigframes_kwargs):
f"""Remote function {str(routine_ref)}."""
nonlocal node # type: ignore

expr = node(*ignored_args, **ignored_kwargs) # type: ignore
expr = node(*bigframes_args, **bigframes_kwargs) # type: ignore
return ibis_client.execute(expr)

func.__signature__ = inspect.signature(func).replace( # type: ignore
parameters=[
inspect.Parameter(name, inspect.Parameter.POSITIONAL_OR_KEYWORD)
# TODO(shobs): Find a better way to support functions with param
# named "name". This causes an issue in the ibis compilation.
inspect.Parameter(
f"bigframes_{name}",
inspect.Parameter.POSITIONAL_OR_KEYWORD,
)
for name in ibis_signature.parameter_names
]
)
Expand Down
8 changes: 2 additions & 6 deletions bigframes/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1481,12 +1481,8 @@ def apply(
ex.message += f"\n{_remote_function_recommendation_message}"
raise

# We are working with remote function at this point.
# Reproject as workaround to applying filter too late. This forces the
# filter to be applied before passing data to remote function,
# protecting from bad inputs causing errors.
reprojected_series = Series(self._block._force_reproject())
result_series = reprojected_series._apply_unary_op(
# We are working with remote function at this point
result_series = self._apply_unary_op(
ops.RemoteFunctionOp(func=func, apply_on_null=True)
)

Expand Down
110 changes: 110 additions & 0 deletions tests/system/small/test_remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect
import re

import google.api_core.exceptions
Expand Down Expand Up @@ -972,3 +973,112 @@ def echo_len(row):
bigframes.exceptions.PreviewWarning, match="axis=1 scenario is in preview."
):
scalars_df[[column]].apply(echo_len_remote, axis=1)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_application_repr(session, dataset_id_permanent):
    """Series ops applying a remote function whose parameter is named "name"
    must produce repr-able (compilable) results.

    Regression test: a udf parameter called "name" previously clashed with
    ibis' internal object naming during compilation.
    """
    # This function deliberately has a param with name "name", this is to test
    # a specific ibis' internal handling of object names
    def should_mask(name: str) -> bool:
        hash = 0
        for char_ in name:
            hash += ord(char_)
        return hash % 2 == 0

    # Sanity check: the udf really does expose a parameter called "name".
    assert "name" in inspect.signature(should_mask).parameters

    should_mask = session.remote_function(
        dataset=dataset_id_permanent, name=get_rf_name(should_mask)
    )(should_mask)

    s = bigframes.series.Series(["Alice", "Bob", "Caroline"])

    # Each repr forces compilation of the expression; the success criterion is
    # simply that none of these raise.
    repr(s.apply(should_mask))
    repr(s.where(s.apply(should_mask)))
    repr(s.where(~s.apply(should_mask)))
    repr(s.mask(should_mask))
    repr(s.mask(should_mask, "REDACTED"))


@pytest.mark.flaky(retries=2, delay=120)
def test_read_gbq_function_application_repr(session, dataset_id, scalars_df_index):
    """A GBQ function whose parameter is named "name", read via
    ``read_gbq_function`` and applied to a Series, must yield repr-able
    (compilable) results.
    """
    gbq_function = f"{dataset_id}.should_mask"

    # The SQL function deliberately takes a parameter called "name" to
    # exercise a specific ibis' internal handling of object names.
    bqclient = session.bqclient
    bqclient.query_and_wait(
        f"CREATE OR REPLACE FUNCTION `{gbq_function}`(name STRING) RETURNS BOOL AS (MOD(LENGTH(name), 2) = 1)"
    )
    routine = bqclient.get_routine(gbq_function)
    assert any(arg.name == "name" for arg in routine.arguments)

    # Load the deployed function and apply it to a string column.
    should_mask = session.read_gbq_function(gbq_function)

    str_col = scalars_df_index["string_col"]

    # Success criterion: none of these reprs raise.
    repr(str_col.apply(should_mask))
    repr(str_col.where(str_col.apply(should_mask)))
    repr(str_col.where(~str_col.apply(should_mask)))
    repr(str_col.mask(should_mask))
    repr(str_col.mask(should_mask, "REDACTED"))


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_apply_after_filter(session, dataset_id_permanent, scalars_dfs):
    """Filtering before ``apply`` must take effect before the remote function
    runs, so rows removed by the filter (here: NULLs) never reach the udf.
    """

    # This function is deliberately written to not work with NA input
    def plus_one(x: int) -> int:
        return x + 1

    scalars_df, scalars_pandas_df = scalars_dfs
    int_col_name_with_nulls = "int64_col"

    # make sure there are NA values in the test column
    # (generator instead of a throwaway list inside any())
    assert any(pd.isna(val) for val in scalars_df[int_col_name_with_nulls])

    # create a remote function
    plus_one_remote = session.remote_function(
        dataset=dataset_id_permanent, name=get_rf_name(plus_one)
    )(plus_one)

    # with nulls in the series the remote function application would fail
    with pytest.raises(
        google.api_core.exceptions.BadRequest, match="unsupported operand"
    ):
        scalars_df[int_col_name_with_nulls].apply(plus_one_remote).to_pandas()

    # after filtering out nulls the remote function application should work
    # similar to pandas
    pd_result = scalars_pandas_df[scalars_pandas_df[int_col_name_with_nulls].notnull()][
        int_col_name_with_nulls
    ].apply(plus_one)
    bf_result = (
        scalars_df[scalars_df[int_col_name_with_nulls].notnull()][
            int_col_name_with_nulls
        ]
        .apply(plus_one_remote)
        .to_pandas()
    )

    # ignore pandas "int64" vs bigframes "Int64" dtype difference
    pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)


@pytest.mark.flaky(retries=2, delay=120)
def test_remote_function_apply_assign_partial_ordering_mode(dataset_id_permanent):
    # NOTE(review): exercises the partial-ordering-mode scenario from this
    # change — the outcome of Series.apply with a remote function should be
    # assignable back onto the source dataframe. Success is no exception.
    session = bigframes.Session(bigframes.BigQueryOptions(ordering_mode="partial"))

    # Public dataset used purely as a convenient integer column source.
    df = session.read_gbq("bigquery-public-data.baseball.schedules")[
        ["duration_minutes"]
    ]

    def plus_one(x: int) -> int:
        return x + 1

    plus_one = session.remote_function(
        dataset=dataset_id_permanent, name=get_rf_name(plus_one)
    )(plus_one)

    # Assign the udf result as a new column, then force compilation via repr.
    df1 = df.assign(duration_cat=df["duration_minutes"].apply(plus_one))
    repr(df1)