Unable to call UDF-based LevelComparer within pyspark UDF #64

Closed
jstammers opened this issue Sep 30, 2024 · 7 comments

Comments

@jstammers
Contributor

I've encountered an issue that is preventing me from using a LevelComparer that has been logged using mlflow and loaded as a spark UDF.

Roughly speaking, here's what I'm attempting to do:

import ibis
import ibis.expr.types as ir
from mlflow.pyfunc import PythonModel

def prepare(table: ir.Table) -> ir.Table:
    # prepare the data
    ...

def compare(table: ir.Table) -> ir.Table:
    # calls a pre-defined function decorated with ibis.udf.scalar.python
    ...

def block(left: ir.Table, right: ir.Table) -> ir.Table:
    # blocks records together
    ...


class DeduperModel(PythonModel):
    def __init__(self, weights):
        self._weights = weights

    def predict(self, context, data):
        con = ibis.get_backend()
        table = con.create_table("table", data, overwrite=True)
        prepared = prepare(table)
        blocked = block(prepared, prepared)
        compared = compare(blocked)
        scored = self._weights.score_compared(compared)
        return scored.select("odds").to_pandas()


# train and log model

with mlflow.start_run():
    weights = fs.train_using_labels(left, right)
    model = DeduperModel(weights=weights)
    mlflow.pyfunc.log_model("model", python_model=model)
    model_uri = mlflow.get_artifact_uri("model")

# load model for prediction

from pyspark.sql.functions import col, struct

df = spark.createDataFrame(...)
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri)
pred = df.withColumn("prediction", loaded_model(struct(*map(col, df.columns))))
pred.show()  # compare throws AttributeError

After some investigation, I think this is related to how the predict method is registered as a UDF when loading the model with mlflow.pyfunc.spark_udf. I'm able to call this correctly when using mlflow.pyfunc.load_model, but I have other dependencies that require the use of the spark UDF API.
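
For clarity, here's a minimal sketch of the two loading paths, assuming the model_uri logged above:

from pyspark.sql.functions import col, struct

# loading as a plain python model works fine on the driver
loaded = mlflow.pyfunc.load_model(model_uri)
loaded.predict(df.toPandas())

# loading as a spark UDF wraps predict() in a pandas UDF that runs on the
# executors, which is where the AttributeError surfaces
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)
df.withColumn("prediction", predict_udf(struct(*map(col, df.columns)))).show()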

Here's an example that replicates the error I see, without requiring mlflow, which should hopefully be easier to test:

import pandas as pd
import ibis
from ibis import _
from mismo.compare import LevelComparer, MatchLevel
from pyspark.sql import functions as F

@ibis.udf.scalar.python
def ratio(x: str, y: str) -> float:
    return (len(x) + len(y)) / 100


class NameLevel(MatchLevel):
    CLOSE = 1
    NOT_CLOSE = 2


comparer = LevelComparer(
    name="Name",
    cases=[
        (
            ratio(_.name_l, _.name_r) > 0.9,
            NameLevel.CLOSE,
        ),
        (True, NameLevel.NOT_CLOSE),
    ],
    levels=NameLevel,
)

# register a pandas-compatible UDF using pyspark
@F.pandas_udf(returnType="int")
def comparer_udf(df):
    con = ibis.get_backend()
    table = con.create_table("df", df, overwrite=True)
    compared = comparer(table)
    return compared.select("Name").execute()["Name"]

df = spark.createDataFrame(
    pd.DataFrame(
        {
            "record_id_l": (1, 1, 1, 1),
            "record_id_r": (1, 2, 3, 4),
            "name_l": ("foo", "foo", "foo", "foo"),
            "name_r": ("foo", "fop", "bar", "boz"),
        }
    )
)

con = ibis.get_backend()
comparer(ibis.memtable(df.toPandas())).execute()

con_db = ibis.pyspark.connect(spark)
table_s = con_db.create_table(
    "df",
    df.toPandas(),
    overwrite=True,
    format="delta",
)
comparer(table_s).execute()  # works as expected


# replicate mlflow's method
df.withColumn("prediction", comparer_udf(F.struct(*map(F.col, df.columns)))).toPandas() # raises AttributeError

This gives the following traceback (I've omitted the Java components for brevity):

org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1062.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1062.0 (TID 3459) (172.30.80.10 executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/root/.ipykernel/69656/command-2127072225484968-1760691286", line 7, in comparer_udf
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/mismo/compare/_match_level.py", line 378, in __call__
    cases = [(pairs.bind(c)[0], level) for c, level in self.cases]
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/mismo/compare/_match_level.py", line 378, in <listcomp>
    cases = [(pairs.bind(c)[0], level) for c, level in self.cases]
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/expr/types/relations.py", line 279, in bind
    values = self._fast_bind(*args, **kwargs)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/expr/types/relations.py", line 246, in _fast_bind
    values.extend(bind(self, arg))
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/expr/types/relations.py", line 110, in bind
    yield value.resolve(table)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/common/deferred.py", line 91, in resolve
    return self._resolver.resolve(context)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/common/deferred.py", line 481, in resolve
    left = self.left.resolve(context)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/common/deferred.py", line 407, in resolve
    return func(*args, **kwargs)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/expr/operations/udf.py", line 165, in construct
    return node(*args, **kwargs).to_expr()
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/common/bases.py", line 72, in __call__
    return cls.__create__(*args, **kwargs)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/common/grounds.py", line 120, in __create__
    return super().__create__(**kwargs)
  File "/local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/ibis/common/grounds.py", line 199, in __init__
    object.__setattr__(self, name, value)
AttributeError: 'ratio_3' object has no attribute 'x'

I don't see this same behaviour when using ibis-native comparison functions, which makes me suspect it's caused by the python UDF.

I may be able to work around this specific error by converting my python UDFs into pyarrow ones. If so, I'll post an update here.
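
For reference, something like this is what I have in mind, using ibis's pyarrow UDF decorator (untested sketch):

import pyarrow as pa
import pyarrow.compute as pc

@ibis.udf.scalar.pyarrow
def ratio(x: str, y: str) -> float:
    # inputs arrive as pyarrow arrays rather than python scalars
    total = pc.add(pc.utf8_length(x), pc.utf8_length(y))
    return pc.divide(pc.cast(total, pa.float64()), 100)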

@jstammers
Contributor Author

Looking into this further, it seems to be related to how the LevelComparer is being saved and loaded.
I have a class that looks as follows:

class Deduper:
    def __init__(self, ...):
        self.comparer = LevelComparer(...)

    def compare(self, table):
        return self.comparer(table)

This fails with the same AttributeError.

I can get this to work if I create a new instance of Deduper, e.g.

    @classmethod
    def from_config(cls, conf):
        return cls(**conf)
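
and then, roughly, rebuilding the Deduper inside the spark UDF (sketch only; conf stands in for whatever config the Deduper is built from):

@F.pandas_udf(returnType="int")
def comparer_udf(df):
    # re-instantiating on the worker re-registers its UDFs with
    # that process's default backend
    deduper = Deduper.from_config(conf)
    con = ibis.get_backend()
    table = con.create_table("df", df, overwrite=True)
    return deduper.compare(table).select("Name").execute()["Name"]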

@NickCrews
Owner

NickCrews commented Oct 1, 2024

I don't have any experience with spark so I'm mostly guessing here:

I bet each worker is in a different python process (correct me if I'm wrong). On script startup, the global ratio UDF is getting registered on the default backend. Then, inside the UDF, when you call ibis.get_backend(), you are getting a different backend instance on every node, and the UDF isn't registered correctly with that node.

Can you make it so that the construction of your comparer and UDF happens INSIDE each UDF, so you are sure that every node is self-sufficient? Something like

# maybe put this inside the spark UDF too?
# But since this doesn't do any registration with
# ibis, I don't think this would be needed.
class NameLevel(MatchLevel):
    CLOSE = 1
    NOT_CLOSE = 2

def make_comparer():
    @ibis.udf.scalar.python
    def ratio(x: str, y: str) -> float:
        return (len(x) + len(y)) / 100

    return LevelComparer(
        name="Name",
        cases=[
            (
                ratio(_.name_l, _.name_r) > 0.9,
                NameLevel.CLOSE,
            ),
            (True, NameLevel.NOT_CLOSE),
        ],
        levels=NameLevel,
    )

# register a pandas-compatible UDF using pyspark
@F.pandas_udf(returnType="int")
def comparer_udf(df):
    con = ibis.get_backend()
    table = con.create_table("df", df, overwrite=True)
    compared = make_comparer()(table)
    return compared.select("Name").execute()["Name"]

@NickCrews NickCrews reopened this Oct 1, 2024
@NickCrews
Owner

Also, just to be sure, your python UDF is easy to implement in pure ibis? But I'm guessing you just wanted a toy example.

# the python UDF from the example above
@ibis.udf.scalar.python
def ratio(x: str, y: str) -> float:
    return (len(x) + len(y)) / 100

# pure-ibis equivalent, no UDF needed
def ratio(x: ir.StringValue, y: ir.StringValue) -> ir.FloatingValue:
    return (x.length() + y.length()) / 100

@jstammers
Contributor Author

Your implementation worked exactly as intended, so it must be related to the fact that it's not being registered correctly.
I'll have a think about the best way to handle this, as re-instantiating the Deduper object doesn't feel like the best approach.

Also, just to be sure, your python UDF is easy to implement in pure ibis? But I'm guessing you just wanted a toy example.

Yes, that's right. My actual UDF uses some functions from rapidfuzz. I could revisit the ibis versions I was implementing in #51 if this demonstrates a use case for them.

@NickCrews
Owner

NickCrews commented Oct 1, 2024

@cpcloud, could we make it so that @jstammers' original workflow works?

Perhaps: when a user does @ibis.udf.<something>, the function isn't immediately registered with the default backend, but is instead added to a pool of ibis-module-wide UDFs, and then when a computation happens, the backend looks into this pool and registers any UDFs it hasn't registered yet?
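
Very roughly, I'm imagining something like this (names and hooks are made up, not actual ibis internals):

# hypothetical sketch of the proposal, not real ibis code
_PENDING_UDFS = []

def register_udf(udf):
    # what @ibis.udf.scalar.* would do instead of registering
    # directly with the current default backend
    _PENDING_UDFS.append(udf)

def ensure_udfs(backend):
    # a backend would call this right before compiling/executing,
    # picking up any UDFs defined since it was created
    registered = getattr(backend, "_registered_udfs", set())
    for udf in _PENDING_UDFS:
        if udf not in registered:
            backend._register_udf(udf)  # hypothetical backend hook
            registered.add(udf)
    backend._registered_udfs = registered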

@NickCrews
Owner

re-instantiating the Deduper object doesn't feel like the best approach

I think you could just re-create the comparer every time the .compare method is called?

class Deduper:
    def compare(self, table):
        comparer = make_comparer()
        return comparer(table)

@NickCrews
Owner

@jstammers I filed the above bug with ibis. I don't think this is anything that mismo can address, so I'm closing this. Let me know if there is still something pending.
