Support transformed labels #35

Open
sctincman opened this issue Feb 9, 2018 · 8 comments

Comments

@sctincman

sctincman commented Feb 9, 2018

Running Spark 2.1.2, using jpmml-sparkml 1.2.7.

While attempting to run the following PySpark code to convert a simple pipeline with a RandomForestClassifier model, using either toPMMLByteArray or toPMML, I'm receiving a NullPointerException.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import *

def updateFlightsSchema(dataSet):
    return ( dataSet.withColumn("DepDelay_Double",  dataSet["DepDelay"].cast("Double"))
                    .withColumn("DepDelay",         dataSet["DepDelay"].cast("Double"))
                    .withColumn("ArrDelay",         dataSet["ArrDelay"].cast("Double"))
                    .withColumn("Month",            dataSet["Month"].cast("Double"))
                    .withColumn("DayofMonth",       dataSet["DayofMonth"].cast("Double"))
                    .withColumn("CRSDepTime",       dataSet["CRSDepTime"].cast("Double"))
                    .withColumn("Distance",         dataSet["Distance"].cast("Double"))
                    .withColumn("AirTime",          dataSet["AirTime"].cast("Double"))
            )
    
data2007 = updateFlightsSchema(sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("mode", "DROPMALFORMED").load("2007_short.csv"))

removeCancelled = SQLTransformer(statement="select * from __THIS__ where Cancelled = \"0\" AND Diverted = \"0\"")
data2007 = removeCancelled.transform(data2007)

binarizer = Binarizer(threshold=15.0, inputCol="DepDelay_Double", outputCol="DepDelay_Bin")
featuresAssembler = VectorAssembler(inputCols=["Month", "CRSDepTime", "Distance"], outputCol="features")
rfc3 = RandomForestClassifier(labelCol="DepDelay_Bin", featuresCol="features", numTrees=3, maxDepth=5, seed=10305)

pipelineRF3 = Pipeline(stages=[binarizer, featuresAssembler, rfc3])

model3 = pipelineRF3.fit(data2007)

from py4j.java_gateway import JavaClass
from pyspark.ml.common import _py2java

javaDF = _py2java(sc, data2007)
javaSchema = javaDF.schema.__call__()

jvm = sc._gateway.jvm

javaConverter = sc._gateway.jvm.org.jpmml.sparkml.ConverterUtil
if(not isinstance(javaConverter, JavaClass)):
    raise RuntimeError("JPMML-SparkML not found on classpath")

pmml = jvm.org.jpmml.sparkml.ConverterUtil.toPMMLByteArray(javaSchema, model3._to_java())
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.jpmml.sparkml.ConverterUtil.toPMMLByteArray.
: java.lang.NullPointerException
	at org.jpmml.converter.CategoricalLabel.<init>(CategoricalLabel.java:35)
	at org.jpmml.sparkml.ModelConverter.encodeSchema(ModelConverter.java:82)
	at org.jpmml.sparkml.ModelConverter.registerModel(ModelConverter.java:162)
	at org.jpmml.sparkml.ConverterUtil.toPMML(ConverterUtil.java:86)
	at org.jpmml.sparkml.ConverterUtil.toPMMLByteArray(ConverterUtil.java:142)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:280)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:748)

Following #22 I attempted to use the different Indexers on features and label columns to try and hint that these are categorical, but this resulted in the same error. Further, when I print the final tree, I do not see categorical feature declarations.

Dataset used, and tree output attached.
2007_short.zip
rfc.txt

@vruusmann
Member

vruusmann commented Feb 9, 2018

The JPMML-SparkML library assumes that the label column of classification models is a "native" categorical label (in PMML, corresponds to a DataDictionary/DataField element), not a "transformed" categorical label (corresponds to a TransformationDictionary/DerivedField element).

I've been taking it for granted, and forgot to actually implement this "native" vs "transformed" check around ModelConverter.java:82.

It's possible to make your example work by applying the Binarizer transformation to the dataset outside of the pipeline, and then treating its output column "DepDelay_Bin" as a "native" categorical label:

binarizer = Binarizer(threshold=15.0, inputCol="DepDelay_Double", outputCol="DepDelay_Bin")
data2007 = binarizer.transform(data2007) # THIS!

stringIndexer = StringIndexer(inputCol="DepDelay_Bin", outputCol="DepDelay_Bin_Label") # THIS!
featuresAssembler = VectorAssembler(inputCols=["Month", "CRSDepTime", "Distance"], outputCol="features")
rfc3 = RandomForestClassifier(labelCol="DepDelay_Bin_Label", featuresCol="features", numTrees=3, maxDepth=5, seed=10305)

pipelineRF3 = Pipeline(stages=[stringIndexer, featuresAssembler, rfc3]) # THIS: start the pipeline with StringIndexer not Binarizer

model3 = pipelineRF3.fit(data2007)

from jpmml_sparkml import toPMMLBytes
pmmlBytes = toPMMLBytes(sc, data2007, model3)
print(pmmlBytes.decode("UTF-8"))
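
The "native" versus "transformed" distinction described above can be sketched as a pre-flight check on the label column. This is only a heuristic reconstructed from this thread, not the library's actual logic: the helper name is_native_label and the plain (inputCol, outputCol) tuples standing in for StringIndexer stages are assumptions made for illustration.

```python
def is_native_label(label_col, dataset_columns, string_indexers):
    """Return True if label_col traces back to a column of the input dataset.

    dataset_columns: column names of the DataFrame passed to fit().
    string_indexers: (inputCol, outputCol) pairs of the pipeline's
                     StringIndexer stages (hypothetical representation).
    """
    indexed_from = {out: inp for inp, out in string_indexers}
    seen = set()
    col = label_col
    # Walk back through StringIndexer stages only; a label produced by any
    # other stage (e.g. Binarizer) is treated as "transformed".
    while col in indexed_from and col not in seen:
        seen.add(col)
        col = indexed_from[col]
    return col in dataset_columns


columns = ["DepDelay_Double", "Month", "CRSDepTime", "Distance"]

# Original pipeline: the label comes from a Binarizer inside the pipeline.
print(is_native_label("DepDelay_Bin", columns, []))

# Workaround: Binarizer applied beforehand, StringIndexer inside the pipeline.
print(is_native_label("DepDelay_Bin_Label",
                      columns + ["DepDelay_Bin"],
                      [("DepDelay_Bin", "DepDelay_Bin_Label")]))
```

Under these assumptions the first call reports a transformed label (the case that hits the NullPointerException) and the second reports a native one.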

@vruusmann changed the title from "Cannot convert Pipeline: NPE on initializing CategoricalLabel" to "Support transformed feature as a label" on Feb 9, 2018
@vruusmann
Member

Technically, it shouldn't be much work to make JPMML-SparkML work with "transformed" labels, so keeping this issue open to track progress towards this functionality.

@vruusmann changed the title from "Support transformed feature as a label" to "Support transformed labels" on Feb 9, 2018
@alex-krash

Looks like this can be closed for the current version:

            Binarizer binarizer = new Binarizer()
                    .setInputCol("Sepal_Length")
                    .setOutputCol("Sepal_Length_Binar_")
                    .setThreshold(5.0)
            ;

            StringIndexer labelIndexer = new StringIndexer()
                    .setInputCol("Species")
                    .setOutputCol("Species_Bin");

            VectorAssembler vectorAssembler = new VectorAssembler()
                    .setInputCols(new String[]{
                            "Sepal_Length_Binar_",
                            "Sepal_Width",
                            "Petal_Length",
                            "Petal_Width"})
                    .setOutputCol("features");

            RandomForestClassifier classifier = new RandomForestClassifier()
                    .setLabelCol("Species_Bin");

            Pipeline pipeline = new Pipeline().setStages(new PipelineStage[]{binarizer, labelIndexer, vectorAssembler, classifier});
            PipelineModel model = pipeline.fit(dataset);

            PMMLBuilder builder = new PMMLBuilder(schema, model);
            final PMML build = builder.build();
            JAXBUtil.marshalPMML(build, new StreamResult(System.out));

@vruusmann
Member

> Looks like it can be closed for current version

Nope, I'd like to be able to use Sepal_Length_Binar_ as the label column here.

@borisborowsky

borisborowsky commented Jul 14, 2019

Can someone help me with this error: AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java'? I get it when I try to execute PMMLBuilder().

dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="features")
evaluator = MulticlassClassificationEvaluator(labelCol='indexedLabel', predictionCol='prediction', metricName='f1')

paramGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [1, 2, 6])

             .addGrid(dt.maxBins, [570, 570])

             .build())

stages += [dt]
pipeline = Pipeline(stages=stages)


cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=3)

cvModel = cv.fit(dataSet)
train_dataset = cvModel.transform(dataSet)

train_dataset.show()
print(evaluator.evaluate(train_dataset))

pmmlBuilder = PMMLBuilder(spark, dataSet, cvModel) \
    .putOption(dt, "compact", True)

pmmlBuilder.buildFile("DecisionTreeIris.pmml")

I cannot find any fix for this. What am I doing wrong?

@vruusmann
Member

> AttributeError: 'Pipeline' object has no attribute '_transfer_param_map_to_java' error

This is clearly a low-level PySpark error, which has got nothing to do with PySpark2PMML or JPMML-SparkML.

Maybe your PySpark and Apache Spark versions are out of sync.
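
A quick way to rule out such a mismatch is to compare the two version strings. The helper below is a minimal sketch; the name versions_in_sync and the choice to compare only the major.minor parts are assumptions, not something prescribed by PySpark.

```python
def versions_in_sync(pyspark_version, spark_version):
    """Compare the major.minor parts of two version strings."""
    def major_minor(version):
        # "2.4.3" -> ("2", "4")
        return tuple(version.split(".")[:2])

    return major_minor(pyspark_version) == major_minor(spark_version)
```

With a live session this could be called as versions_in_sync(pyspark.__version__, spark.version), where spark is the active SparkSession.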

@borisborowsky

borisborowsky commented Jul 15, 2019

@vruusmann Thank you. My PySpark and Apache Spark versions are up to date. The problem was that the cross-validator's best model must be passed instead of the cross-validator model itself; in my case, passing cvModel.bestModel does the work.
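
The fix described here can be sketched as a small helper that unwraps a tuning result before conversion. The name unwrap_best_model is hypothetical; it relies only on the bestModel attribute mentioned above and falls back to the model itself when that attribute is absent.

```python
def unwrap_best_model(model):
    """Return model.bestModel when present, otherwise the model itself."""
    return getattr(model, "bestModel", model)
```

In the snippet from the question, this would mean calling PMMLBuilder(spark, dataSet, unwrap_best_model(cvModel)) instead of passing cvModel directly.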

@borisborowsky

@vruusmann Sorry for the off-topic question; I will delete it. I have now run into another issue: when I call buildFile on the pmmlBuilder object, it fails with:

    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o57101.buildFile.
: java.lang.IllegalArgumentException: Expected 3 target categories, got 2 target category
    raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.IllegalArgumentException: 'Expected 3 target categories, got 2 target categories'

I cannot understand why. Do you have a clue?
