
feat: add support for SparkML CountVectorizer conversion (#560)
Signed-off-by: Jason Wang <jasowang@microsoft.com>
memoryz authored Jun 8, 2022
1 parent f0fdf12 commit d0130f2
Showing 4 changed files with 177 additions and 1 deletion.
onnxmltools/convert/sparkml/operator_converters/__init__.py
@@ -33,4 +33,4 @@
from . import onehot_encoder
from . import vector_assembler
from . import k_means
from . import count_vectorizer
onnxmltools/convert/sparkml/operator_converters/count_vectorizer.py
@@ -0,0 +1,85 @@
# SPDX-License-Identifier: Apache-2.0

from ...common._registration import register_converter, register_shape_calculator
from ...common.data_types import StringTensorType, FloatTensorType
from ...common.utils import check_input_and_output_numbers, check_input_and_output_types
from ...common._topology import Operator, Scope, ModelComponentContainer
from pyspark.ml.feature import CountVectorizerModel


def convert_count_vectorizer(scope: Scope, operator: Operator, container: ModelComponentContainer):
    op: CountVectorizerModel = operator.raw_operator
    vocab, minTF, binary = op.vocabulary, op.getOrDefault("minTF"), op.getOrDefault("binary")

    if minTF < 1.0:
        raise NotImplementedError("Converting to ONNX for CountVectorizerModel is not supported when minTF < 1.0")

    min_opset = 9
    if not binary:
        # If binary is False, we need the ThresholdedRelu operator, which is only available since opset 10.
        min_opset = 10

    if container.target_opset < min_opset:
        raise NotImplementedError(
            f"Converting to ONNX for CountVectorizerModel is not supported in opset < {min_opset}"
        )

    # Create a TfIdfVectorizer node with gram length set to 1 and mode set to "TF".
    vectorizer_output_variable_name = scope.get_unique_variable_name("vectorizer_output")
    tfIdfVectorizer_attrs = {
        "name": scope.get_unique_operator_name("tfIdfVectorizer"),
        "min_gram_length": 1,
        "max_gram_length": 1,
        "max_skip_count": 0,
        "mode": "TF",
        "ngram_counts": [0],
        "ngram_indexes": [*range(len(vocab))],
        "pool_strings": vocab,
    }

    container.add_node(
        op_type="TfIdfVectorizer",
        inputs=[operator.inputs[0].full_name],
        outputs=[vectorizer_output_variable_name],
        op_version=9,
        **tfIdfVectorizer_attrs,
    )

    # In Spark's CountVectorizerModel, the comparison with minTF is inclusive,
    # but in the ThresholdedRelu (or Binarizer) node, the comparison with `alpha` (or `threshold`) is exclusive.
    # Subtract epsilon from minTF so the comparison with `alpha` (or `threshold`) is effectively inclusive.
    epsilon = 1e-6
    if binary:
        # Create a Binarizer node with threshold set to minTF - epsilon.
        container.add_node(
            op_type="Binarizer",
            inputs=[vectorizer_output_variable_name],
            outputs=[operator.outputs[0].full_name],
            op_version=1,
            op_domain="ai.onnx.ml",
            threshold=minTF - epsilon,
        )
    else:
        # Create a ThresholdedRelu node with alpha set to minTF - epsilon.
        container.add_node(
            op_type="ThresholdedRelu",
            inputs=[vectorizer_output_variable_name],
            outputs=[operator.outputs[0].full_name],
            op_version=10,
            alpha=minTF - epsilon,
        )


register_converter("pyspark.ml.feature.CountVectorizerModel", convert_count_vectorizer)


def calculate_count_vectorizer_output_shapes(operator):
    check_input_and_output_numbers(operator, output_count_range=1)
    check_input_and_output_types(operator, good_input_types=[StringTensorType])

    N = operator.inputs[0].type.shape[0]
    C = len(operator.raw_operator.vocabulary)
    operator.outputs[0].type = FloatTensorType([N, C])


register_shape_calculator("pyspark.ml.feature.CountVectorizerModel", calculate_count_vectorizer_output_shapes)
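To make the inclusive/exclusive threshold handling above concrete, here is a minimal numpy sketch of what the generated TfIdfVectorizer plus Binarizer/ThresholdedRelu graph computes per row; the vocab, minTF, binary values and the count_vectorize helper are illustrative stand-ins, not part of the converter.

import numpy as np

vocab = ["A", "B", "C"]            # illustrative vocabulary from a fitted model
minTF, binary, epsilon = 2.0, False, 1e-6

def count_vectorize(tokens):
    # TfIdfVectorizer with min/max gram length 1 and mode="TF": raw term counts.
    counts = np.array([tokens.count(w) for w in vocab], dtype=np.float32)
    if binary:
        # Binarizer: 1.0 where count > minTF - epsilon, else 0.0.
        return (counts > minTF - epsilon).astype(np.float32)
    # ThresholdedRelu: keep the count where count > minTF - epsilon, else 0.0.
    return np.where(counts > minTF - epsilon, counts, 0.0)

print(count_vectorize("A B B C A".split()))  # [2. 2. 0.] -- a count equal to minTF survives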
4 changes: 4 additions & 0 deletions onnxmltools/convert/sparkml/ops_input_output.py
@@ -141,6 +141,10 @@ def build_io_name_map():
            lambda model: [model.getOrDefault("inputCol")],
            lambda model: [model.getOrDefault("outputCol")]
        ),
        "pyspark.ml.feature.CountVectorizerModel": (
            lambda model: [model.getOrDefault("inputCol")],
            lambda model: [model.getOrDefault("outputCol")]
        ),
        "pyspark.ml.classification.LinearSVCModel": (
            lambda model: [model.getOrDefault("featuresCol")],
            lambda model: [model.getOrDefault("predictionCol")]
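For reference, a hedged sketch of how this lookup table is consumed, assuming build_io_name_map() returns the dictionary above; the fitted model and the train_df dataframe are illustrative:

from pyspark.ml.feature import CountVectorizer

# Illustrative only: resolve which dataframe columns the converter treats
# as the model's input and output.
model = CountVectorizer(inputCol="text", outputCol="result").fit(train_df)  # train_df assumed to exist
get_inputs, get_outputs = build_io_name_map()["pyspark.ml.feature.CountVectorizerModel"]
print(get_inputs(model))   # ['text']
print(get_outputs(model))  # ['result']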
87 changes: 87 additions & 0 deletions tests/sparkml/test_count_vectorizer.py
@@ -0,0 +1,87 @@
# SPDX-License-Identifier: Apache-2.0

import sys
import unittest
import numpy
import pandas
from pyspark.ml.feature import CountVectorizer, CountVectorizerModel
from onnx.defs import onnx_opset_version
from onnxconverter_common.onnx_ex import DEFAULT_OPSET_NUMBER
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import StringTensorType
from tests.sparkml.sparkml_test_utils import save_data_models, run_onnx_model, compare_results
from tests.sparkml import SparkMlTestCase

TARGET_OPSET = min(DEFAULT_OPSET_NUMBER, onnx_opset_version())

class TestSparkmlCountVectorizer(SparkMlTestCase):

    @unittest.skipIf(sys.version_info < (3, 8),
                     reason="pickle fails on python 3.7")
    def test_count_vectorizer_default(self):
        data = self.spark.createDataFrame([
            ("A B C".split(" "), ),
            ("A B B C A".split(" "), ),
        ], ["text"])
        count_vec = CountVectorizer(inputCol="text", outputCol="result", minTF=1.0, binary=False)
        model: CountVectorizerModel = count_vec.fit(data)
        result = model.transform(data)

        model_onnx = convert_sparkml(
            model, 'Sparkml CountVectorizer',
            [('text', StringTensorType([None, None]))],
            target_opset=TARGET_OPSET)
        self.assertTrue(model_onnx is not None)

        data_pd = data.toPandas()
        data_np = {
            "text": data_pd.text.apply(lambda x: pandas.Series(x)).values.astype(str),
        }

        expected = {
            "prediction_result": numpy.asarray(
                result.toPandas().result.apply(lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32)
            ),
        }

        paths = save_data_models(data_np, expected, model, model_onnx, basename="SparkmlCountVectorizerModel_Default")
        onnx_model_path = paths[-1]

        output_names = ['result']
        output, output_shapes = run_onnx_model(output_names, data_np, onnx_model_path)
        actual_output = dict(zip(output_names, output))

        assert output_shapes[0] == [None, 3]
        compare_results(expected["prediction_result"], actual_output["result"], decimal=5)

    @unittest.skipIf(sys.version_info < (3, 8),
                     reason="pickle fails on python 3.7")
    def test_count_vectorizer_binary(self):
        data = self.spark.createDataFrame([
            ("A B C".split(" "), ),
            ("A B B C A".split(" "), ),
            ("B B B D".split(" "), ),
        ], ["text"])
        count_vec = CountVectorizer(inputCol="text", outputCol="result", minTF=2.0, binary=True)
        model: CountVectorizerModel = count_vec.fit(data)
        result = model.transform(data)

        model_onnx = convert_sparkml(
            model, 'Sparkml CountVectorizer',
            [('text', StringTensorType([None, None]))],
            target_opset=TARGET_OPSET)
        self.assertTrue(model_onnx is not None)

        data_pd = data.toPandas()
        data_np = {
            "text": data_pd.text.apply(lambda x: pandas.Series(x)).values.astype(str),
        }

        expected = {
            "prediction_result": numpy.asarray(
                result.toPandas().result.apply(lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32)
            ),
        }

        paths = save_data_models(data_np, expected, model, model_onnx, basename="SparkmlCountVectorizerModel_Binary")
        onnx_model_path = paths[-1]

        output_names = ['result']
        output, output_shapes = run_onnx_model(output_names, data_np, onnx_model_path)
        actual_output = dict(zip(output_names, output))

        assert output_shapes[0] == [None, 4]
        compare_results(expected["prediction_result"], actual_output["result"], decimal=5)


if __name__ == "__main__":
    unittest.main()
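Besides the run_onnx_model helper, the exported model can also be exercised directly with onnxruntime; a minimal sketch, assuming onnxruntime is installed and model_onnx and data_np come from the default test above:

import onnxruntime as rt

sess = rt.InferenceSession(model_onnx.SerializeToString(), providers=["CPUExecutionProvider"])
outputs = sess.run(["result"], {"text": data_np["text"]})
print(outputs[0].shape)  # (2, 3): two rows, three-word vocabulary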
