feat: add support for SparkML KMeansModel conversion
Signed-off-by: Jason Wang <jasowang@microsoft.com>
memoryz committed May 29, 2022
1 parent 19a0ddf commit 6e29fcc
Showing 5 changed files with 274 additions and 1 deletion.
@@ -32,4 +32,5 @@
from . import linear_classifier
from . import onehot_encoder
from . import vector_assembler
from . import k_means

183 changes: 183 additions & 0 deletions onnxmltools/convert/sparkml/operator_converters/k_means.py
@@ -0,0 +1,183 @@
from ...common._registration import register_converter, register_shape_calculator
from ...common.data_types import Int64TensorType, FloatTensorType
from ...common.utils import check_input_and_output_numbers, check_input_and_output_types
from ...common._topology import Operator, Scope, ModelComponentContainer
from ....proto import onnx_proto
from pyspark.ml.clustering import KMeansModel
import numpy as np


def convert_sparkml_k_means_model(scope: Scope, operator: Operator, container: ModelComponentContainer):
    op: KMeansModel = operator.raw_operator
    centers: np.ndarray = np.vstack(op.clusterCenters())

    K = centers.shape[0]  # number of clusters
    C = operator.inputs[0].type.shape[1]  # number of input features

    if centers.shape[1] != C:
        raise ValueError(
            f"Number of features in cluster centers ({centers.shape[1]}) "
            f"does not match number of input features ({C})"
        )

    # [K x C]
    centers_variable_name = scope.get_unique_variable_name("centers")
    container.add_initializer(
        centers_variable_name,
        onnx_proto.TensorProto.FLOAT,
        centers.shape,
        centers.flatten().astype(np.float32)
    )

    distance_output_variable_name = scope.get_unique_variable_name("distance_output")

    if op.getDistanceMeasure() == "euclidean":
        # [1 x K]
        centers_row_squared_sum_variable_name = scope.get_unique_variable_name("centers_row_squared_sum")
        centers_row_squared_sum = np.sum(centers ** 2, axis=-1).flatten().astype(np.float32)
        container.add_initializer(
            centers_row_squared_sum_variable_name,
            onnx_proto.TensorProto.FLOAT,
            [1, K],
            centers_row_squared_sum
        )

        # input_row_squared_sum: [N x 1]
        input_row_squared_sum_variable_name = scope.get_unique_variable_name("input_row_squared_sum")
        reduce_sum_square_attrs = {
            "name": scope.get_unique_operator_name("input_row_squared_sum"),
            "axes": [1],
            "keepdims": 1,
        }
        container.add_node(
            op_type="ReduceSumSquare",
            inputs=[operator.inputs[0].full_name],
            outputs=[input_row_squared_sum_variable_name],
            op_domain="ai.onnx",
            op_version=13,
            **reduce_sum_square_attrs
        )

        # -2 * input * Transpose(centers) + input_row_squared_sum: [N x K]
        gemm_output_variable_name = scope.get_unique_variable_name("gemm_output")
        gemm_attrs = {
            "name": scope.get_unique_operator_name("GeMM"),
            "alpha": -2.0,
            "beta": 1.0,
            "transB": 1,
        }
        container.add_node(
            op_type="Gemm",
            inputs=[operator.inputs[0].full_name, centers_variable_name, input_row_squared_sum_variable_name],
            outputs=[gemm_output_variable_name],
            op_domain="ai.onnx",
            op_version=13,
            **gemm_attrs
        )

        # Squared Euclidean distance = input_row_squared_sum
        #   - 2 * input * Transpose(centers) + centers_row_squared_sum,
        # where centers_row_squared_sum is [1 x K] and broadcasts over rows: [N x K]
        container.add_node(
            op_type="Add",
            inputs=[gemm_output_variable_name, centers_row_squared_sum_variable_name],
            outputs=[distance_output_variable_name],
            op_domain="ai.onnx",
            op_version=14,
        )
    elif op.getDistanceMeasure() == "cosine":
        # centers_row_norm2: [1 x K]
        centers_row_norm2_variable_name = scope.get_unique_variable_name("centers_row_norm2")
        centers_row_norm2 = np.linalg.norm(centers, ord=2, axis=1).flatten().astype(np.float32)
        container.add_initializer(
            centers_row_norm2_variable_name,
            onnx_proto.TensorProto.FLOAT,
            [1, K],
            centers_row_norm2
        )

        # input_row_norm2: [N x 1]
        input_row_norm2_variable_name = scope.get_unique_variable_name("input_row_norm2")
        reduce_l2_attrs = {
            "name": scope.get_unique_operator_name("input_row_norm2"),
            "axes": [1],
            "keepdims": 1,
        }
        container.add_node(
            op_type="ReduceL2",
            inputs=[operator.inputs[0].full_name],
            outputs=[input_row_norm2_variable_name],
            op_domain="ai.onnx",
            op_version=13,
            **reduce_l2_attrs
        )

        # input * Transpose(centers): [N x K]
        gemm_output_variable_name = scope.get_unique_variable_name("gemm_output")
        gemm_attrs = {
            "name": scope.get_unique_operator_name("GeMM"),
            "alpha": 1.0,
            "transB": 1,
        }
        container.add_node(
            op_type="Gemm",
            inputs=[operator.inputs[0].full_name, centers_variable_name],
            outputs=[gemm_output_variable_name],
            op_domain="ai.onnx",
            op_version=13,
            **gemm_attrs
        )

        # Cosine similarity = gemm_output / input_row_norm2 / centers_row_norm2: [N x K]
        div_output_variable_name = scope.get_unique_variable_name("div_output")
        container.add_node(
            op_type="Div",
            inputs=[gemm_output_variable_name, input_row_norm2_variable_name],
            outputs=[div_output_variable_name],
            op_domain="ai.onnx",
            op_version=14,
        )
        cosine_similarity_output_variable_name = scope.get_unique_variable_name("cosine_similarity_output")
        container.add_node(
            op_type="Div",
            inputs=[div_output_variable_name, centers_row_norm2_variable_name],
            outputs=[cosine_similarity_output_variable_name],
            op_domain="ai.onnx",
            op_version=14,
        )

        # Cosine distance = 1 - cosine similarity; the constant 1 does not
        # affect ArgMin, so negating the similarity suffices: [N x K]
        container.add_node(
            op_type="Neg",
            inputs=[cosine_similarity_output_variable_name],
            outputs=[distance_output_variable_name],
            op_domain="ai.onnx",
            op_version=13,
        )
    else:
        raise ValueError(f"Distance measure {op.getDistanceMeasure()} not supported")

    # ArgMin(distance): [N]
    argmin_attrs = {
        "axis": 1,
        "keepdims": 0,
    }
    container.add_node(
        op_type="ArgMin",
        inputs=[distance_output_variable_name],
        outputs=[operator.outputs[0].full_name],
        op_domain="ai.onnx",
        op_version=13,
        **argmin_attrs
    )


register_converter('pyspark.ml.clustering.KMeansModel', convert_sparkml_k_means_model)


def calculate_k_means_model_output_shapes(operator: Operator):
    check_input_and_output_numbers(operator, input_count_range=1, output_count_range=1)
    check_input_and_output_types(operator, good_input_types=[FloatTensorType])
    if len(operator.inputs[0].type.shape) != 2:
        raise RuntimeError('Input must be a [N, C]-tensor')

    N = operator.inputs[0].type.shape[0]
    operator.outputs[0].type = Int64TensorType(shape=[N])


register_shape_calculator('pyspark.ml.clustering.KMeansModel', calculate_k_means_model_output_shapes)
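
A quick numpy sanity check of the two identities the converter relies on. This is an illustrative sketch with made-up data, not part of the converter itself:

import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(4, 3)).astype(np.float32)        # [N x C] input rows
centers = rng.normal(size=(2, 3)).astype(np.float32)  # [K x C] cluster centers

# Euclidean branch: ||x - c||^2 = ||x||^2 - 2*x.c + ||c||^2, i.e. what
# ReduceSumSquare + Gemm(alpha=-2, beta=1, transB=1) + Add compute.
d2_direct = ((X[:, None, :] - centers[None, :, :]) ** 2).sum(axis=-1)
d2_graph = ((X ** 2).sum(axis=1, keepdims=True)
            - 2.0 * X @ centers.T
            + (centers ** 2).sum(axis=1)[None, :])
assert np.allclose(d2_direct, d2_graph, atol=1e-5)

# Cosine branch: argmin of (1 - similarity) equals argmin of -similarity,
# so the graph can end with Neg instead of materializing the constant 1.
sim = (X @ centers.T
       / np.linalg.norm(X, axis=1, keepdims=True)
       / np.linalg.norm(centers, axis=1)[None, :])
assert (np.argmin(1.0 - sim, axis=1) == np.argmin(-sim, axis=1)).all()
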
4 changes: 4 additions & 0 deletions onnxmltools/convert/sparkml/ops_input_output.py
@@ -164,6 +164,10 @@ def build_io_name_map():
"pyspark.ml.feature.VectorAssembler": (
lambda model: model.getOrDefault("inputCols"),
lambda model: [model.getOrDefault("outputCol")]
),
"pyspark.ml.clustering.KMeansModel": (
lambda model: [model.getOrDefault("featuresCol")],
lambda model: [model.getOrDefault("predictionCol")]
)
}
return map
5 changes: 4 additions & 1 deletion onnxmltools/convert/sparkml/ops_names.py
@@ -48,7 +48,7 @@
from pyspark.ml.regression import LinearRegressionModel

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.clustering import KMeans
from pyspark.ml.clustering import KMeansModel
from pyspark.ml.clustering import GaussianMixture
from pyspark.ml.clustering import LDA

@@ -70,6 +70,9 @@ def build_sparkml_operator_name_map():
        AFTSurvivalRegressionModel, DecisionTreeRegressionModel, GBTRegressionModel,
        GeneralizedLinearRegressionModel, IsotonicRegressionModel, LinearRegressionModel, RandomForestRegressionModel
    ]})
    res.update({k: "pyspark.ml.clustering." + k.__name__ for k in [
        KMeansModel
    ]})
    return res


82 changes: 82 additions & 0 deletions tests/sparkml/test_k_means.py
@@ -0,0 +1,82 @@
# SPDX-License-Identifier: Apache-2.0

import sys
import unittest
import numpy
import pandas
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from onnx.defs import onnx_opset_version
from onnxconverter_common.onnx_ex import DEFAULT_OPSET_NUMBER
from pyspark.ml import Pipeline
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import FloatTensorType
from tests.sparkml.sparkml_test_utils import save_data_models, run_onnx_model, compare_results
from tests.sparkml import SparkMlTestCase


TARGET_OPSET = min(DEFAULT_OPSET_NUMBER, onnx_opset_version())


class TestSparkmlKMeansModel(SparkMlTestCase):

    @unittest.skipIf(sys.version_info < (3, 8),
                     reason="pickle fails on python 3.7")
    def test_model_k_means(self):
        """
        Tests the ONNX conversion of a Spark KMeansModel with both the
        "euclidean" and the "cosine" distance measures.
        """
        kmeans_euclidean = KMeans(
            k=3, distanceMeasure="euclidean",
            featuresCol="features_euclidean", predictionCol="prediction_euclidean")
        kmeans_cosine = KMeans(
            k=3, distanceMeasure="cosine",
            featuresCol="features_cosine", predictionCol="prediction_cosine")

        data = self.spark.createDataFrame([
            (0, Vectors.dense([1.0, 3.1, -1.0]), Vectors.dense([1.0, 1.0, 1.0]),),
            (1, Vectors.dense([1.1, 3.0, -1.1]), Vectors.dense([2.0, 2.0, 2.0]),),
            (2, Vectors.dense([-3.0, 5.1, 9.0]), Vectors.dense([-1.0, 3.0, -5.0]),),
            (3, Vectors.dense([-2.9, 4.9, 8.9]), Vectors.dense([-2.0, 6.0, -10.0]),),
            (4, Vectors.dense([5.0, -3.5, 2.0]), Vectors.dense([1.0, -2.0, 4.0]),),
            (5, Vectors.dense([5.1, -3.3, 2.1]), Vectors.dense([2.0, -4.0, 8.0]),),
        ], ["id", "features_euclidean", "features_cosine"])

        model = Pipeline(stages=[kmeans_euclidean, kmeans_cosine]).fit(data)
        model_onnx = convert_sparkml(
            model,
            'Sparkml KMeansModel',
            [('features_euclidean', FloatTensorType([None, 3])),
             ('features_cosine', FloatTensorType([None, 3]))],
            target_opset=TARGET_OPSET
        )

        self.assertTrue(model_onnx is not None)
        self.assertTrue(model_onnx.graph.node is not None)

        with open('kmeans.onnx', 'wb') as f:
            f.write(model_onnx.SerializeToString())

        # Run the Spark model to obtain the expected predictions.
        predicted = model.transform(data).toPandas()

        data_pd = data.toPandas()
        data_np = {
            "features_euclidean": data_pd.features_euclidean.apply(
                lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32),
            "features_cosine": data_pd.features_cosine.apply(
                lambda x: pandas.Series(x.toArray())).values.astype(numpy.float32),
        }

        expected = {
            "prediction_euclidean": numpy.asarray(predicted.prediction_euclidean.values),
            "prediction_cosine": numpy.asarray(predicted.prediction_cosine.values),
        }

        paths = save_data_models(
            data_np, expected, model, model_onnx, basename="SparkmlKMeansModel")
        onnx_model_path = paths[-1]

        output_names = ['prediction_euclidean', 'prediction_cosine']
        output, output_shapes = run_onnx_model(output_names, data_np, onnx_model_path)
        actual_output = dict(zip(output_names, output))

        assert output_shapes[0] == [None]
        assert output_shapes[1] == [None]
        compare_results(expected["prediction_euclidean"], actual_output["prediction_euclidean"], decimal=5)
        compare_results(expected["prediction_cosine"], actual_output["prediction_cosine"], decimal=5)


if __name__ == "__main__":
    unittest.main()
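
For readers who want to exercise the converter outside the test harness, here is a minimal sketch, assuming a live SparkSession named `spark` and `onnxruntime` installed; the data and model names are illustrative:

import numpy
import onnxruntime as ort
from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors
from onnxmltools import convert_sparkml
from onnxmltools.convert.common.data_types import FloatTensorType

df = spark.createDataFrame(
    [(Vectors.dense([1.0, 3.1, -1.0]),), (Vectors.dense([5.1, -3.3, 2.1]),)],
    ["features"])
model = KMeans(k=2, distanceMeasure="euclidean").fit(df)

# The ONNX input name must match the model's featuresCol ("features" by default).
onnx_model = convert_sparkml(
    model, "SparkmlKMeansModel", [("features", FloatTensorType([None, 3]))])

# The graph outputs one int64 cluster index per input row.
sess = ort.InferenceSession(
    onnx_model.SerializeToString(), providers=["CPUExecutionProvider"])
X = numpy.array([[1.0, 3.1, -1.0], [5.1, -3.3, 2.1]], dtype=numpy.float32)
clusters = sess.run(None, {"features": X})[0]
print(clusters)
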
