Unable to call UDF-based LevelComparer within pyspark UDF #64
Looking into this further, it seems to be related to how the …

```python
class Deduper:
    def __init__(self, ...):
        self.comparer = LevelComparer(...)

    def compare(self, table):
        return self.comparer(table)
```

This fails with the same error. I can get this to work if I create a new instance of …

```python
@classmethod
def from_config(cls, conf):
    return cls(**conf)
```
I don't have any experience with Spark, so I'm mostly guessing here: I bet each worker is in a different Python process (correct me if I'm wrong). On script startup, the global …

Can you make it so that the construction of your comparer and UDF happens INSIDE each UDF, so you are sure that every node is self-sufficient? Something like:

```python
import ibis
import pandas as pd
from ibis import _
from mismo.compare import LevelComparer, MatchLevel  # assumed import path
from pyspark.sql import functions as F


# Maybe put this inside the Spark UDF too?
# But since this doesn't do any registration with
# ibis, I don't think this would be needed.
class NameLevel(MatchLevel):
    CLOSE = 1
    NOT_CLOSE = 2


def make_comparer():
    # The ibis UDF is defined here, so every process that calls
    # make_comparer() builds its own copy instead of relying on a global.
    @ibis.udf.scalar.python
    def ratio(x: str, y: str) -> float:
        return (len(x) + len(y)) / 100

    return LevelComparer(
        name="Name",
        cases=[
            (ratio(_.name_l, _.name_r) > 0.9, NameLevel.CLOSE),
            (True, NameLevel.NOT_CLOSE),
        ],
        levels=NameLevel,
    )


# Register a pandas-compatible UDF using PySpark. A struct input column
# arrives as a pandas DataFrame, one batch at a time.
@F.pandas_udf(returnType="int")
def comparer_udf(df: pd.DataFrame) -> pd.Series:
    con = ibis.get_backend()
    table = con.create_table("df", df, overwrite=True)
    compared = make_comparer()(table)
    return compared.select("Name").execute()["Name"]
```
Also, just to be sure: your Python UDF is easy to implement in pure ibis, right? But I'm guessing you just wanted a toy example.
Your implementation worked exactly as intended, so the problem must be related to the UDF not being registered correctly.

Yes, that's right. My actual UDF uses some functions from …
@cpcloud, could we make it so that @jstammers' original workflow works? Perhaps: when a user does …
I think you could just re-create the comparer every time the `.compare` method is called?

```python
class Deduper:
    def compare(self, table):
        # Build a fresh comparer (and its UDF) in whichever process runs this.
        comparer = make_comparer()
        return comparer(table)
```
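Re-creating the comparer on every `.compare` call works but rebuilds it each time. A possible middle ground, sketched here as an assumption rather than anything mismo provides, keeps only plain, picklable configuration on the object, builds the comparer lazily, and drops the cached comparer from the pickled state via `__getstate__`, so a copy shipped to another process rebuilds it there. `make_comparer(threshold)` and `ship_to_worker` are hypothetical toy helpers, not the ibis-based factory from the earlier comment.

```python
import pickle
from typing import Any, Callable, Optional


def make_comparer(threshold: float) -> Callable[[list[float]], list[int]]:
    """Hypothetical toy factory standing in for building a LevelComparer."""

    def compare(scores: list[float]) -> list[int]:
        # Level 1 ("CLOSE") above the threshold, level 2 ("NOT_CLOSE") otherwise.
        return [1 if s > threshold else 2 for s in scores]

    return compare


class Deduper:
    def __init__(self, threshold: float) -> None:
        # Only plain, picklable configuration is stored eagerly.
        self.threshold = threshold
        self._comparer: Optional[Callable[[list[float]], list[int]]] = None

    @classmethod
    def from_config(cls, conf: dict[str, Any]) -> "Deduper":
        return cls(**conf)

    def compare(self, scores: list[float]) -> list[int]:
        # Build the comparer on first use, in whichever process runs this.
        if self._comparer is None:
            self._comparer = make_comparer(self.threshold)
        return self._comparer(scores)

    def __getstate__(self) -> dict[str, Any]:
        # Never pickle the built comparer; the receiving process rebuilds it.
        state = self.__dict__.copy()
        state["_comparer"] = None
        return state


def ship_to_worker(obj: Any) -> Any:
    # Pickle round trip, roughly what happens when an object is sent to a
    # worker (PySpark itself uses cloudpickle).
    return pickle.loads(pickle.dumps(obj))
```

For example, `Deduper.from_config({"threshold": 0.9}).compare([0.95, 0.5])` returns `[1, 2]`, and `ship_to_worker` on that object returns a copy whose `_comparer` is `None` until its first `.compare` call. Without `__getstate__`, the standard `pickle` module would refuse the object, because the built comparer is a local closure.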
@jstammers I filed the above bug with ibis. I don't think this is anything that mismo can address, so I'm closing this. Let me know if there is still something pending.
I've encountered an issue that is preventing me from using a `LevelComparer` that has been logged using `mlflow` and loaded as a Spark UDF. Roughly speaking, here's what I'm attempting to do.
After some investigation, I think this is related to how the `predict` method is registered as a UDF when loading it using `mlflow.pyfunc.spark`. I'm able to call this correctly when using `mlflow.pyfunc.load_model`, but I have other dependencies that require the use of the Spark UDF API.

I have an example here that replicates the error I see. It doesn't require `mlflow` and should hopefully be easier to test. It gives the following traceback (I've omitted the Java components for brevity):
I don't see the same behaviour when using ibis-native comparison functions, which makes me suspect it's caused by the Python UDF.
I may be able to work around this specific error by converting my Python UDFs into PyArrow ones. If so, I'll post an update here.