feat: add support for SparkML KMeansModel conversion #556

Merged · 6 commits · Jun 4, 2022
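
This PR adds a converter and shape calculator for `pyspark.ml.clustering.KMeansModel`, covering both the euclidean and cosine distance measures. A minimal usage sketch of the new capability — the data, `k`, and variable names are illustrative, not taken from the PR, and an active SparkSession `spark` is assumed:

```python
# Minimal usage sketch (illustrative; assumes an active SparkSession `spark`).
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import FloatTensorType

data = spark.createDataFrame(
    [(Vectors.dense([1.0, 3.1, -1.0]),), (Vectors.dense([5.0, -3.5, 2.0]),)],
    ["features"],
)
model = KMeans(k=2, distanceMeasure="euclidean").fit(data)

# The ONNX input name must match the model's featuresCol.
model_onnx = convert_sparkml(
    model, "Sparkml KMeansModel", [("features", FloatTensorType([None, 3]))]
)
```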
onnxmltools/convert/sparkml/operator_converters/__init__.py
@@ -32,4 +32,5 @@
from . import linear_classifier
from . import onehot_encoder
from . import vector_assembler
from . import k_means

180 changes: 180 additions & 0 deletions onnxmltools/convert/sparkml/operator_converters/k_means.py
@@ -0,0 +1,180 @@
from ...common._registration import register_converter, register_shape_calculator
from ...common.data_types import Int64TensorType, FloatTensorType
from ...common.utils import check_input_and_output_numbers, check_input_and_output_types
from ...common._topology import Operator, Scope, ModelComponentContainer
from ....proto import onnx_proto
from pyspark.ml.clustering import KMeansModel
import numpy as np

def convert_sparkml_k_means_model(scope: Scope, operator: Operator, container: ModelComponentContainer):
    if container.target_opset < 7:
        raise NotImplementedError("Converting KMeansModel to ONNX is not supported for opset < 7")

    op: KMeansModel = operator.raw_operator
    centers: np.ndarray = np.vstack(op.clusterCenters())

    K = centers.shape[0]  # number of clusters
    C = operator.inputs[0].type.shape[1]  # number of input features

    if centers.shape[1] != C:
        raise ValueError(
            f"Number of input features {C} does not match "
            f"number of features {centers.shape[1]} in the cluster centers"
        )

    # centers: [K x C]
    centers_variable_name = scope.get_unique_variable_name("centers")
    container.add_initializer(
        centers_variable_name,
        onnx_proto.TensorProto.FLOAT,
        centers.shape,
        centers.flatten().astype(np.float32)
    )

    distance_output_variable_name = scope.get_unique_variable_name("distance_output")

    if op.getDistanceMeasure() == "euclidean":
        # centers_row_squared_sum: [1 x K]
        centers_row_squared_sum_variable_name = scope.get_unique_variable_name("centers_row_squared_sum")
        centers_row_squared_sum = np.sum(centers**2, axis=-1).flatten().astype(np.float32)
        container.add_initializer(
            centers_row_squared_sum_variable_name,
            onnx_proto.TensorProto.FLOAT,
            [1, K],
            centers_row_squared_sum
        )

        # input_row_squared_sum: [N x 1]
        input_row_squared_sum_variable_name = scope.get_unique_variable_name("input_row_squared_sum")
        reduce_sum_square_attrs = {
            "name": scope.get_unique_operator_name("input_row_squared_sum"),
            "axes": [1],
            "keepdims": 1,
        }
        container.add_node(
            op_type="ReduceSumSquare",
            inputs=[operator.inputs[0].full_name],
            outputs=[input_row_squared_sum_variable_name],
            **reduce_sum_square_attrs
        )

        # -2 * input * Transpose(centers) + input_row_squared_sum: [N x K]
        gemm_output_variable_name = scope.get_unique_variable_name("gemm_output")
        gemm_attrs = {
            "name": scope.get_unique_operator_name("GeMM"),
            "alpha": -2.0,
            "beta": 1.0,
            "transB": 1,
        }
        container.add_node(
            op_type="Gemm",
            inputs=[operator.inputs[0].full_name, centers_variable_name, input_row_squared_sum_variable_name],
            outputs=[gemm_output_variable_name],
            op_version=7,
            **gemm_attrs
        )

        # squared euclidean distance = input_row_squared_sum - 2 * input * Transpose(centers)
        #                              + centers_row_squared_sum (broadcast over rows): [N x K]
        container.add_node(
            op_type="Add",
            inputs=[gemm_output_variable_name, centers_row_squared_sum_variable_name],
            outputs=[distance_output_variable_name],
            op_version=7,
        )
    elif op.getDistanceMeasure() == "cosine":
        # centers_row_norm2: [1 x K]
        centers_row_norm2_variable_name = scope.get_unique_variable_name("centers_row_norm2")
        centers_row_norm2 = np.linalg.norm(centers, ord=2, axis=1).flatten().astype(np.float32)
        container.add_initializer(
            centers_row_norm2_variable_name,
            onnx_proto.TensorProto.FLOAT,
            [1, K],
            centers_row_norm2
        )

        # input_row_norm2: [N x 1]
        input_row_norm2_variable_name = scope.get_unique_variable_name("input_row_norm2")
        reduce_l2_attrs = {
            "name": scope.get_unique_operator_name("input_row_norm2"),
            "axes": [1],
            "keepdims": 1,
        }
        container.add_node(
            op_type="ReduceL2",
            inputs=[operator.inputs[0].full_name],
            outputs=[input_row_norm2_variable_name],
            **reduce_l2_attrs
        )

        # input * Transpose(centers): [N x K]
        zeros_variable_name = scope.get_unique_variable_name("zeros")
        container.add_initializer(
            zeros_variable_name,
            onnx_proto.TensorProto.FLOAT,
            [1, K],
            np.zeros([1, K]).flatten().astype(np.float32)
        )
        gemm_output_variable_name = scope.get_unique_variable_name("gemm_output")
        gemm_attrs = {
            "name": scope.get_unique_operator_name("GeMM"),
            "alpha": 1.0,
            "transB": 1,
        }
        container.add_node(
            op_type="Gemm",
            inputs=[operator.inputs[0].full_name, centers_variable_name, zeros_variable_name],
            outputs=[gemm_output_variable_name],
            op_version=7,
            **gemm_attrs
        )

        # cosine similarity = gemm_output / input_row_norm2 / centers_row_norm2: [N x K]
        div_output_variable_name = scope.get_unique_variable_name("div_output")
        container.add_node(
            op_type="Div",
            inputs=[gemm_output_variable_name, input_row_norm2_variable_name],
            outputs=[div_output_variable_name],
            op_version=7,
        )
        cosine_similarity_output_variable_name = scope.get_unique_variable_name("cosine_similarity_output")
        container.add_node(
            op_type="Div",
            inputs=[div_output_variable_name, centers_row_norm2_variable_name],
            outputs=[cosine_similarity_output_variable_name],
            op_version=7,
        )

        # cosine distance = 1 - cosine similarity; the constant 1 does not affect
        # the ArgMin below, so emit -cosine similarity instead: [N x K]
        container.add_node(
            op_type="Neg",
            inputs=[cosine_similarity_output_variable_name],
            outputs=[distance_output_variable_name],
        )
    else:
        raise ValueError(f"Distance measure {op.getDistanceMeasure()} is not supported")

    # ArgMin(distance): [N]
    argmin_attrs = {
        "axis": 1,
        "keepdims": 0,
    }
    container.add_node(
        op_type="ArgMin",
        inputs=[distance_output_variable_name],
        outputs=[operator.outputs[0].full_name],
        **argmin_attrs
    )


register_converter('pyspark.ml.clustering.KMeansModel', convert_sparkml_k_means_model)


def calculate_k_means_model_output_shapes(operator: Operator):
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    check_input_and_output_types(operator, good_input_types=[FloatTensorType])
    if len(operator.inputs[0].type.shape) != 2:
        raise RuntimeError('Input must be a [N, C]-tensor')

    N = operator.inputs[0].type.shape[0]
    operator.outputs[0].type = Int64TensorType(shape=[N])


register_shape_calculator('pyspark.ml.clustering.KMeansModel', calculate_k_means_model_output_shapes)
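
For reviewers: the euclidean branch avoids materializing pairwise differences by expanding the squared distance, ||x − c||² = ||x||² − 2·x·cᵀ + ||c||², so the full [N x K] distance matrix comes from a single Gemm plus broadcast adds; the cosine branch drops the constant 1 from 1 − cos(x, c) because it does not change the ArgMin. A plain-numpy sketch of the same computation, for review only and not part of the PR:

```python
import numpy as np

def kmeans_assign(X, centers, measure="euclidean"):
    # Mirrors the ONNX graph built by the converter above, in numpy.
    if measure == "euclidean":
        # ||x||^2 - 2 x . c^T + ||c||^2, broadcast to [N, K].
        dist = ((X ** 2).sum(axis=1, keepdims=True)
                - 2.0 * (X @ centers.T)
                + (centers ** 2).sum(axis=1)[None, :])
    elif measure == "cosine":
        # argmin of (1 - similarity) equals argmin of -similarity.
        sim = ((X @ centers.T)
               / np.linalg.norm(X, axis=1, keepdims=True)
               / np.linalg.norm(centers, axis=1)[None, :])
        dist = -sim
    else:
        raise ValueError(measure)
    return dist.argmin(axis=1)  # [N], matching the converter's ArgMin output
```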
4 changes: 4 additions & 0 deletions onnxmltools/convert/sparkml/ops_input_output.py
@@ -164,6 +164,10 @@ def build_io_name_map():
"pyspark.ml.feature.VectorAssembler": (
lambda model: model.getOrDefault("inputCols"),
lambda model: [model.getOrDefault("outputCol")]
),
"pyspark.ml.clustering.KMeansModel": (
lambda model: [model.getOrDefault("featuresCol")],
lambda model: [model.getOrDefault("predictionCol")]
)
}
return map
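
The new entry supplies the graph input/output names the converter binds to. A hypothetical lookup, assuming a fitted model `kmeans_model` left at Spark's default column names:

```python
# Hypothetical lookup against the map above (Spark's default column names).
get_inputs, get_outputs = build_io_name_map()["pyspark.ml.clustering.KMeansModel"]
get_inputs(kmeans_model)   # -> ["features"]    via getOrDefault("featuresCol")
get_outputs(kmeans_model)  # -> ["prediction"]  via getOrDefault("predictionCol")
```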
5 changes: 4 additions & 1 deletion onnxmltools/convert/sparkml/ops_names.py
@@ -48,7 +48,7 @@
from pyspark.ml.regression import LinearRegressionModel

from pyspark.ml.clustering import BisectingKMeans
-from pyspark.ml.clustering import KMeans
+from pyspark.ml.clustering import KMeansModel
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.clustering import LDA

@@ -70,6 +70,9 @@ def build_sparkml_operator_name_map():
        AFTSurvivalRegressionModel, DecisionTreeRegressionModel, GBTRegressionModel, GBTRegressionModel,
        GeneralizedLinearRegressionModel, IsotonicRegressionModel, LinearRegressionModel, RandomForestRegressionModel
    ]})
    res.update({k: "pyspark.ml.clustering." + k.__name__ for k in [
        KMeansModel
    ]})
    return res
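
Note the map is keyed on the fitted `KMeansModel`, not the `KMeans` estimator, since conversion operates on trained models; a hypothetical check of the generated mapping:

```python
# Hypothetical check of the generated mapping.
from pyspark.ml.clustering import KMeansModel
assert build_sparkml_operator_name_map()[KMeansModel] == "pyspark.ml.clustering.KMeansModel"
```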


79 changes: 79 additions & 0 deletions tests/sparkml/test_k_means.py
@@ -0,0 +1,79 @@
# SPDX-License-Identifier: Apache-2.0

import sys
import unittest
import numpy
import pandas
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from onnx.defs import onnx_opset_version
from onnxconverter_common.onnx_ex import DEFAULT_OPSET_NUMBER
from pyspark.ml import Pipeline
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import FloatTensorType
from tests.sparkml.sparkml_test_utils import save_data_models, run_onnx_model, compare_results
from tests.sparkml import SparkMlTestCase


TARGET_OPSET = min(DEFAULT_OPSET_NUMBER, onnx_opset_version())


class TestSparkmlKMeansModel(SparkMlTestCase):

    @unittest.skipIf(sys.version_info < (3, 8),
                     reason="pickle fails on python 3.7")
    def test_model_k_means_euclidean(self):
        """
        Tests ONNX conversion for Spark KMeansModel with both the "euclidean"
        and "cosine" distance measures.
        """
        kmeans_euclidean = KMeans(
            k=3, distanceMeasure="euclidean",
            featuresCol="features_euclidean", predictionCol="prediction_euclidean")
        kmeans_cosine = KMeans(
            k=3, distanceMeasure="cosine",
            featuresCol="features_cosine", predictionCol="prediction_cosine")

        data = self.spark.createDataFrame([
            (0, Vectors.dense([1.0, 3.1, -1.0]), Vectors.dense([1.0, 1.0, 1.0]),),
            (1, Vectors.dense([1.1, 3.0, -1.1]), Vectors.dense([2.0, 2.0, 2.0]),),
            (2, Vectors.dense([-3.0, 5.1, 9.0]), Vectors.dense([-1.0, 3.0, -5.0]),),
            (3, Vectors.dense([-2.9, 4.9, 8.9]), Vectors.dense([-2.0, 6.0, -10.0]),),
            (4, Vectors.dense([5.0, -3.5, 2.0]), Vectors.dense([1.0, -2.0, 4.0]),),
            (5, Vectors.dense([5.1, -3.3, 2.1]), Vectors.dense([2.0, -4.0, 8.0]),),
        ], ["id", "features_euclidean", "features_cosine"])

        model = Pipeline(stages=[kmeans_euclidean, kmeans_cosine]).fit(data)
        model_onnx = convert_sparkml(
            model,
            'Sparkml KMeansModel',
            [('features_euclidean', FloatTensorType([None, 3])),
             ('features_cosine', FloatTensorType([None, 3]))],
            target_opset=TARGET_OPSET
        )

        self.assertTrue(model_onnx is not None)
        self.assertTrue(model_onnx.graph.node is not None)

        # run the model
        predicted = model.transform(data).toPandas()

        data_pd = data.toPandas()
        data_np = {
            "features_euclidean": data_pd.features_euclidean.apply(
                lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32),
            "features_cosine": data_pd.features_cosine.apply(
                lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32),
        }

        expected = {
            "prediction_euclidean": numpy.asarray(predicted.prediction_euclidean.values),
            "prediction_cosine": numpy.asarray(predicted.prediction_cosine.values),
        }

        paths = save_data_models(data_np, expected, model, model_onnx, basename="SparkmlKMeansModel")
        onnx_model_path = paths[-1]

        output_names = ['prediction_euclidean', 'prediction_cosine']
        output, output_shapes = run_onnx_model(output_names, data_np, onnx_model_path)
        actual_output = dict(zip(output_names, output))

        assert output_shapes[0] == [None]
        assert output_shapes[1] == [None]
        compare_results(expected["prediction_euclidean"], actual_output["prediction_euclidean"], decimal=5)
        compare_results(expected["prediction_cosine"], actual_output["prediction_cosine"], decimal=5)


if __name__ == "__main__":
    unittest.main()
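
Beyond the test harness, the exported model can be scored directly with onnxruntime; a sketch where the model path and input data are illustrative:

```python
# Illustrative scoring of the exported model with onnxruntime.
import numpy as np
import onnxruntime as ort

sess = ort.InferenceSession("SparkmlKMeansModel.onnx")  # hypothetical path
feeds = {
    "features_euclidean": np.random.rand(4, 3).astype(np.float32),
    "features_cosine": np.random.rand(4, 3).astype(np.float32),
}
pred_euclidean, pred_cosine = sess.run(
    ["prediction_euclidean", "prediction_cosine"], feeds
)  # each is an int64 array of shape [4] holding cluster indices
```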