From d077d8e723aede3b2d933b78f7820c89880120ad Mon Sep 17 00:00:00 2001
From: "Wu, Xiaochang"
Date: Thu, 12 Aug 2021 10:35:40 +0800
Subject: [PATCH] use getOneCCLIPPort

---
 .../spark/ml/clustering/KMeansDALImpl.scala    | 10 ++--------
 .../apache/spark/ml/feature/PCADALImpl.scala   | 11 ++---------
 .../spark/ml/recommendation/ALSDALImpl.scala   | 18 +++++-------------
 .../scala/org/apache/spark/ml/util/Utils.scala |  9 +++++----
 4 files changed, 14 insertions(+), 34 deletions(-)

diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
index 096b0326f..2330e5a67 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
@@ -21,6 +21,7 @@ import com.intel.daal.services.DaalContext
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.linalg.Vector
+import org.apache.spark.ml.util.Utils.getOneCCLIPPort
 import org.apache.spark.ml.util._
 import org.apache.spark.mllib.clustering.{KMeansModel => MLlibKMeansModel}
 import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
@@ -39,14 +40,7 @@ class KMeansDALImpl(var nClusters: Int,
 
     val coalescedTables = OneDAL.rddVectorToMergedTables(data, executorNum)
 
-    val executorIPAddress = Utils.sparkFirstExecutorIP(coalescedTables.sparkContext)
-    val kvsIP = coalescedTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip",
-      executorIPAddress)
-    val kvsPortDetected = Utils.checkExecutorAvailPort(coalescedTables, kvsIP)
-    val kvsPort = coalescedTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port",
-      kvsPortDetected)
-
-    val kvsIPPort = kvsIP + "_" + kvsPort
+    val kvsIPPort = getOneCCLIPPort(coalescedTables)
 
     val sparkContext = data.sparkContext
     val useGPU = sparkContext.conf.getBoolean("spark.oap.mllib.useGPU", false)
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
index 2b9b7e26e..9908f173b 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
@@ -21,6 +21,7 @@ import com.intel.daal.data_management.data.{HomogenNumericTable, NumericTable}
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.linalg._
+import org.apache.spark.ml.util.Utils.getOneCCLIPPort
 import org.apache.spark.ml.util.{OneCCL, OneDAL, Utils}
 import org.apache.spark.mllib.feature.{PCAModel => MLlibPCAModel, StandardScaler => MLlibStandardScaler}
 import org.apache.spark.mllib.linalg.{DenseMatrix => OldDenseMatrix, Vectors => OldVectors}
@@ -37,15 +38,7 @@ class PCADALImpl(val k: Int,
 
     val coalescedTables = OneDAL.rddVectorToMergedTables(normalizedData, executorNum)
 
-    val executorIPAddress = Utils.sparkFirstExecutorIP(coalescedTables.sparkContext)
-    val kvsIP = coalescedTables.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip",
-      executorIPAddress)
-
-    val kvsPortDetected = Utils.checkExecutorAvailPort(coalescedTables, kvsIP)
-    val kvsPort = coalescedTables.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port",
-      kvsPortDetected)
-
-    val kvsIPPort = kvsIP + "_" + kvsPort
+    val kvsIPPort = getOneCCLIPPort(coalescedTables)
 
     val sparkContext = data.sparkContext
     val useGPU = sparkContext.conf.getBoolean("spark.oap.mllib.useGPU", false)
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala
index 6b4c9f8b1..88653afc3 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala
@@ -17,16 +17,15 @@ package org.apache.spark.ml.recommendation
 
 import java.nio.{ByteBuffer, ByteOrder, FloatBuffer}
-
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
-
 import com.intel.daal.data_management.data.CSRNumericTable
 import com.intel.daal.services.DaalContext
-
 import org.apache.spark.Partitioner
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.recommendation.ALS.Rating
+import org.apache.spark.ml.util.LibLoader.loadLibraries
+import org.apache.spark.ml.util.Utils.getOneCCLIPPort
 import org.apache.spark.ml.util._
 import org.apache.spark.rdd.RDD
@@ -77,15 +76,7 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
     val numericTables = data.repartition(executorNum)
       .setName("Repartitioned for conversion").cache()
 
-    val executorIPAddress = Utils.sparkFirstExecutorIP(numericTables.sparkContext)
-    val kvsIP = numericTables.sparkContext.conf.get(
-      "spark.oap.mllib.oneccl.kvs.ip", executorIPAddress)
-
-    val kvsPortDetected = Utils.checkExecutorAvailPort(numericTables, kvsIP)
-    val kvsPort = numericTables.sparkContext.conf.getInt(
-      "spark.oap.mllib.oneccl.kvs.port", kvsPortDetected)
-
-    val kvsIPPort = kvsIP + "_" + kvsPort
+    val kvsIPPort = getOneCCLIPPort(numericTables)
 
     val results = numericTables
       // Transpose the dataset
@@ -93,7 +84,8 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]],
         Rating(p.item, p.user, p.rating)
       }
       .mapPartitionsWithIndex { (rank, iter) =>
-        val context = new DaalContext()
+        // TODO: Use one-time init to load libraries
+        loadLibraries()
         OneCCL.init(executorNum, rank, kvsIPPort)
 
         val rankId = OneCCL.rankID()
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala
index 2908baccb..d680830bf 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala
@@ -33,8 +33,9 @@ object Utils {
     val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
     val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip",
       executorIPAddress)
-    val kvsPortDetected = 5000
-// val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
+    // TODO: right now we use a configured port, will optimize auto port detection
+    // val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
+    val kvsPortDetected = 3000
     val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port",
       kvsPortDetected)
 
@@ -126,9 +127,9 @@ object Utils {
     // check workers' platform compatibility
     val executor_num = Utils.sparkExecutorNum(sc)
     val data = sc.parallelize(1 to executor_num, executor_num)
-    val result = data.map { p =>
+    val result = data.mapPartitions { p =>
       LibLoader.loadLibraries()
-      OneDAL.cCheckPlatformCompatibility()
+      Iterator(OneDAL.cCheckPlatformCompatibility())
     }.collect()
 
     result.forall(_ == true)
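
For reference, the shared helper that KMeansDALImpl, PCADALImpl and ALSDALImpl now call is the method touched in the Utils.scala hunk above. A sketch of its post-patch shape follows; the body statements are taken from this patch, while the enclosing signature (a method on object Utils taking an RDD[_] parameter named data and returning the "ip_port" string) is an assumption for illustration.

    // Assumed shape of the shared helper in org.apache.spark.ml.util.Utils.
    // Body statements come from the Utils.scala hunk in this patch; the
    // signature (parameter name, RDD[_] type, String return) is an assumption.
    def getOneCCLIPPort(data: RDD[_]): String = {
      val executorIPAddress = Utils.sparkFirstExecutorIP(data.sparkContext)
      val kvsIP = data.sparkContext.conf.get("spark.oap.mllib.oneccl.kvs.ip",
        executorIPAddress)
      // TODO: right now we use a configured port, will optimize auto port detection
      // val kvsPortDetected = Utils.checkExecutorAvailPort(data, kvsIP)
      val kvsPortDetected = 3000
      val kvsPort = data.sparkContext.conf.getInt("spark.oap.mllib.oneccl.kvs.port",
        kvsPortDetected)
      // Callers pass this "ip_port" string to OneCCL.init, replacing the
      // previously duplicated lookup logic in each DAL implementation.
      kvsIP + "_" + kvsPort
    }

Centralizing the KVS IP/port resolution removes three copies of the same lookup and keeps the configured fallback port (3000) and the spark.oap.mllib.oneccl.kvs.* overrides in one place.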