diff --git a/onnxmltools/convert/sparkml/operator_converters/__init__.py b/onnxmltools/convert/sparkml/operator_converters/__init__.py
index ab8b6729..5202e4e7 100644
--- a/onnxmltools/convert/sparkml/operator_converters/__init__.py
+++ b/onnxmltools/convert/sparkml/operator_converters/__init__.py
@@ -33,4 +33,4 @@
 from . import onehot_encoder
 from . import vector_assembler
 from . import k_means
-
+from . import count_vectorizer
\ No newline at end of file
diff --git a/onnxmltools/convert/sparkml/operator_converters/count_vectorizer.py b/onnxmltools/convert/sparkml/operator_converters/count_vectorizer.py
new file mode 100644
index 00000000..502ee556
--- /dev/null
+++ b/onnxmltools/convert/sparkml/operator_converters/count_vectorizer.py
@@ -0,0 +1,85 @@
+# SPDX-License-Identifier: Apache-2.0
+
+from ...common._registration import register_converter, register_shape_calculator
+from ...common.data_types import StringTensorType, FloatTensorType
+from ...common.utils import check_input_and_output_numbers, check_input_and_output_types
+from ...common._topology import Operator, Scope, ModelComponentContainer
+from pyspark.ml.feature import CountVectorizerModel
+
+
+def convert_count_vectorizer(scope: Scope, operator: Operator, container: ModelComponentContainer):
+    op: CountVectorizerModel = operator.raw_operator
+    vocab, minTF, binary = op.vocabulary, op.getOrDefault("minTF"), op.getOrDefault("binary")
+
+    if minTF < 1.0:
+        raise NotImplementedError("Converting to ONNX for CountVectorizerModel is not supported when minTF < 1.0")
+
+    min_opset = 9
+    if not binary:
+        # If binary is False, then we need the ThresholdedRelu operator, which is only available since opset 10.
+        min_opset = 10
+
+    if container.target_opset < min_opset:
+        raise NotImplementedError(
+            f"Converting to ONNX for CountVectorizerModel is not supported in opset < {min_opset}"
+        )
+
+    # Create a TfIdfVectorizer node with gram length set to 1 and mode set to "TF".
+    vectorizer_output_variable_name = scope.get_unique_variable_name("vectorizer_output")
+    tfIdfVectorizer_attrs = {
+        "name": scope.get_unique_operator_name("tfIdfVectorizer"),
+        "min_gram_length": 1,
+        "max_gram_length": 1,
+        "max_skip_count": 0,
+        "mode": "TF",
+        "ngram_counts": [0],
+        "ngram_indexes": [*range(len(vocab))],
+        "pool_strings": vocab,
+    }
+
+    container.add_node(
+        op_type="TfIdfVectorizer",
+        inputs=[operator.inputs[0].full_name],
+        outputs=[vectorizer_output_variable_name],
+        op_version=9,
+        **tfIdfVectorizer_attrs,
+    )
+
+    # In Spark's CountVectorizerModel, the comparison with minTF is inclusive,
+    # but in the ThresholdedRelu (or Binarizer) node, the comparison with `alpha` (or `threshold`) is exclusive.
+    # So, we need to subtract epsilon from minTF to make the comparison with `alpha` (or `threshold`) effectively inclusive.
+    epsilon = 1e-6
+    if binary:
+        # Create a Binarizer node with threshold set to minTF - epsilon.
+        container.add_node(
+            op_type="Binarizer",
+            inputs=[vectorizer_output_variable_name],
+            outputs=[operator.outputs[0].full_name],
+            op_version=1,
+            op_domain="ai.onnx.ml",
+            threshold=minTF - epsilon,
+        )
+    else:
+        # Create a ThresholdedRelu node with alpha set to minTF - epsilon.
+        container.add_node(
+            op_type="ThresholdedRelu",
+            inputs=[vectorizer_output_variable_name],
+            outputs=[operator.outputs[0].full_name],
+            op_version=10,
+            alpha=minTF - epsilon,
+        )
+
+
+register_converter("pyspark.ml.feature.CountVectorizerModel", convert_count_vectorizer)
+
+
+def calculate_count_vectorizer_output_shapes(operator):
+    check_input_and_output_numbers(operator, output_count_range=1)
+    check_input_and_output_types(operator, good_input_types=[StringTensorType])
+
+    N = operator.inputs[0].type.shape[0]
+    C = len(operator.raw_operator.vocabulary)
+    operator.outputs[0].type = FloatTensorType([N, C])
+
+
+register_shape_calculator("pyspark.ml.feature.CountVectorizerModel", calculate_count_vectorizer_output_shapes)
diff --git a/onnxmltools/convert/sparkml/ops_input_output.py b/onnxmltools/convert/sparkml/ops_input_output.py
index 7837d203..405eba78 100644
--- a/onnxmltools/convert/sparkml/ops_input_output.py
+++ b/onnxmltools/convert/sparkml/ops_input_output.py
@@ -141,6 +141,10 @@ def build_io_name_map():
             lambda model: [model.getOrDefault("inputCol")],
             lambda model: [model.getOrDefault("outputCol")]
         ),
+        "pyspark.ml.feature.CountVectorizerModel": (
+            lambda model: [model.getOrDefault("inputCol")],
+            lambda model: [model.getOrDefault("outputCol")]
+        ),
         "pyspark.ml.classification.LinearSVCModel": (
             lambda model: [model.getOrDefault("featuresCol")],
             lambda model: [model.getOrDefault("predictionCol")]
diff --git a/tests/sparkml/test_count_vectorizer.py b/tests/sparkml/test_count_vectorizer.py
new file mode 100644
index 00000000..b24bc1cf
--- /dev/null
+++ b/tests/sparkml/test_count_vectorizer.py
@@ -0,0 +1,87 @@
+# SPDX-License-Identifier: Apache-2.0
+
+import sys
+import unittest
+import numpy
+import pandas
+from pyspark.ml.feature import CountVectorizer, CountVectorizerModel
+from onnx.defs import onnx_opset_version
+from onnxconverter_common.onnx_ex import DEFAULT_OPSET_NUMBER
+from onnxmltools import convert_sparkml
+from onnxmltools.convert.common.data_types import StringTensorType
+from tests.sparkml.sparkml_test_utils import save_data_models, run_onnx_model, compare_results
+from tests.sparkml import SparkMlTestCase
+
+TARGET_OPSET = min(DEFAULT_OPSET_NUMBER, onnx_opset_version())
+class TestSparkmlCountVectorizer(SparkMlTestCase):
+
+    @unittest.skipIf(sys.version_info < (3, 8),
+                     reason="pickle fails on python 3.7")
+    def test_count_vectorizer_default(self):
+        data = self.spark.createDataFrame([
+            ("A B C".split(" "), ),
+            ("A B B C A".split(" "), ),
+        ], ["text"])
+        count_vec = CountVectorizer(inputCol="text", outputCol="result", minTF=1.0, binary=False)
+        model: CountVectorizerModel = count_vec.fit(data)
+        result = model.transform(data)
+
+        model_onnx = convert_sparkml(model, 'Sparkml CountVectorizer', [('text', StringTensorType([None, None]))], target_opset=TARGET_OPSET)
+        self.assertTrue(model_onnx is not None)
+
+        data_pd = data.toPandas()
+        data_np = {
+            "text": data_pd.text.apply(lambda x: pandas.Series(x)).values.astype(str),
+        }
+
+        expected = {
+            "prediction_result": numpy.asarray(result.toPandas().result.apply(lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32)),
+        }
+
+        paths = save_data_models(data_np, expected, model, model_onnx,
basename="SparkmlCountVectorizerModel_Default") + onnx_model_path = paths[-1] + + output_names = ['result'] + output, output_shapes = run_onnx_model(output_names, data_np, onnx_model_path) + actual_output = dict(zip(output_names, output)) + + assert output_shapes[0] == [None, 3] + compare_results(expected["prediction_result"], actual_output["result"], decimal=5) + + @unittest.skipIf(sys.version_info < (3, 8), + reason="pickle fails on python 3.7") + def test_count_vectorizer_binary(self): + data = self.spark.createDataFrame([ + ("A B C".split(" "), ), + ("A B B C A".split(" "), ), + ("B B B D".split(" "), ), + ], ["text"]) + count_vec = CountVectorizer(inputCol="text", outputCol="result", minTF=2.0, binary=True) + model: CountVectorizerModel = count_vec.fit(data) + result = model.transform(data) + + model_onnx = convert_sparkml(model, 'Sparkml CountVectorizer', [('text', StringTensorType([None, None]))], target_opset=TARGET_OPSET) + self.assertTrue(model_onnx is not None) + + data_pd = data.toPandas() + data_np = { + "text": data_pd.text.apply(lambda x: pandas.Series(x)).values.astype(str), + } + + expected = { + "prediction_result": numpy.asarray(result.toPandas().result.apply(lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32)), + } + + paths = save_data_models(data_np, expected, model, model_onnx, basename="SparkmlCountVectorizerModel_Binary") + onnx_model_path = paths[-1] + + output_names = ['result'] + output, output_shapes = run_onnx_model(output_names, data_np, onnx_model_path) + actual_output = dict(zip(output_names, output)) + + assert output_shapes[0] == [None, 4] + compare_results(expected["prediction_result"], actual_output["result"], decimal=5) + +if __name__ == "__main__": + unittest.main()