From 99b9551cde660154a688af43a9dd5240d9933ce5 Mon Sep 17 00:00:00 2001 From: Xiaochang Wu Date: Tue, 26 Jan 2021 14:33:05 +0800 Subject: [PATCH] Add IP Port kvs_attr and Kmeans, PCA test cases validated --- mllib-dal/build.sh | 32 ++++++++ mllib-dal/pom.xml | 16 ++-- mllib-dal/src/assembly/assembly.xml | 13 +--- .../org/apache/spark/ml/util/LibLoader.java | 5 +- mllib-dal/src/main/native/KMeansDALImpl.cpp | 8 +- mllib-dal/src/main/native/Makefile | 2 +- mllib-dal/src/main/native/OneCCL.cpp | 34 ++++---- mllib-dal/src/main/native/OneCCL.h | 2 +- mllib-dal/src/main/native/PCADALImpl.cpp | 6 +- .../javah/org_apache_spark_ml_util_OneCCL__.h | 4 +- .../spark/ml/clustering/KMeansDALImpl.scala | 4 +- .../apache/spark/ml/feature/PCADALImpl.scala | 4 +- .../org/apache/spark/ml/util/OneCCL.scala | 78 ++++++++++--------- 13 files changed, 126 insertions(+), 82 deletions(-) create mode 100755 mllib-dal/build.sh diff --git a/mllib-dal/build.sh b/mllib-dal/build.sh new file mode 100755 index 000000000..da1d8df75 --- /dev/null +++ b/mllib-dal/build.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# Check envs for building +if [[ -z $JAVA_HOME ]]; then + echo $JAVA_HOME not defined! + exit 1 +fi + +if [[ -z $DAALROOT ]]; then + echo DAALROOT not defined! + exit 1 +fi + +if [[ -z $TBBROOT ]]; then + echo TBBROOT not defined! + exit 1 +fi + +if [[ -z $CCL_ROOT ]]; then + echo CCL_ROOT not defined! + exit 1 +fi + +echo === Building Environments === +echo JAVA_HOME=$JAVA_HOME +echo DAALROOT=$DAALROOT +echo TBBROOT=$TBBROOT +echo CCL_ROOT=$CCL_ROOT +echo GCC Version: $(gcc -dumpversion) +echo ============================= + +mvn -DskipTests clean package diff --git a/mllib-dal/pom.xml b/mllib-dal/pom.xml index 01e002830..4e51f9157 100644 --- a/mllib-dal/pom.xml +++ b/mllib-dal/pom.xml @@ -218,10 +218,12 @@ ${env.CCL_ROOT}/lib - libpmi.so.1 - libresizable_pmi.so.1 + + + libmpi.so.12.0.0 libfabric.so.1 - libccl_atl_ofi.so.1 + libccl.so + @@ -271,9 +273,13 @@ ${project.build.testOutputDirectory}/lib/libtbbmalloc.so.2 - ${project.build.testOutputDirectory}/lib/libccl_atl_ofi.so.1 - ${project.build.testOutputDirectory}/lib/libccl_atl_ofi.so + ${project.build.testOutputDirectory}/lib/libmpi.so.12.0.0 + ${project.build.testOutputDirectory}/lib/libmpi.so.12 + + + + diff --git a/mllib-dal/src/assembly/assembly.xml b/mllib-dal/src/assembly/assembly.xml index 137f19b81..498b90e02 100644 --- a/mllib-dal/src/assembly/assembly.xml +++ b/mllib-dal/src/assembly/assembly.xml @@ -58,26 +58,21 @@ - ${env.CCL_ROOT}/lib/libpmi.so.1 + ${env.CCL_ROOT}/lib/libfabric.so.1 lib - ${env.CCL_ROOT}/lib/libresizable_pmi.so.1 + ${env.CCL_ROOT}/lib/libmpi.so.12.0.0 lib + libmpi.so.12 - ${env.CCL_ROOT}/lib//libfabric.so.1 + ${env.CCL_ROOT}/lib/libccl.so lib ${env.CCL_ROOT}/lib/prov/libsockets-fi.so lib - - - ${env.CCL_ROOT}/lib/libccl_atl_ofi.so.1 - lib - libccl_atl_ofi.so - \ No newline at end of file diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java index 5b51451ae..c11b4e56e 100644 --- a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java +++ b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java @@ -55,11 +55,10 @@ public static synchronized void loadLibraries() throws IOException { * Load oneCCL libs in dependency order */ public static synchronized void loadLibCCL() throws IOException { - loadFromJar(subDir, "libpmi.so.1"); - loadFromJar(subDir, "libresizable_pmi.so.1"); loadFromJar(subDir, "libfabric.so.1"); + 
+        loadFromJar(subDir, "libmpi.so.12");
+        loadFromJar(subDir, "libccl.so");
         loadFromJar(subDir, "libsockets-fi.so");
-        loadFromJar(subDir, "libccl_atl_ofi.so");
     }
 
     /**
diff --git a/mllib-dal/src/main/native/KMeansDALImpl.cpp b/mllib-dal/src/main/native/KMeansDALImpl.cpp
index 688dd84b3..d9c7a2f29 100644
--- a/mllib-dal/src/main/native/KMeansDALImpl.cpp
+++ b/mllib-dal/src/main/native/KMeansDALImpl.cpp
@@ -163,8 +163,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMean
        jint executor_num, jint executor_cores,
        jobject resultObj) {
 
-    ccl::communicator *comm = getComm();
-    size_t rankId = comm->rank();
+    ccl::communicator &comm = getComm();
+    size_t rankId = comm.rank();
 
     NumericTablePtr pData = *((NumericTablePtr *)pNumTabData);
     NumericTablePtr centroids = *((NumericTablePtr *)pNumTabCenters);
@@ -184,14 +184,14 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMean
     for (it = 0; it < iteration_num && !converged; it++) {
         auto t1 = std::chrono::high_resolution_clock::now();
 
-        newCentroids = kmeans_compute(rankId, *comm, pData, centroids, cluster_num, executor_num, totalCost);
+        newCentroids = kmeans_compute(rankId, comm, pData, centroids, cluster_num, executor_num, totalCost);
 
         if (rankId == ccl_root) {
             converged = areAllCentersConverged(centroids, newCentroids, tolerance);
         }
 
         // Sync converged status
-        ccl::broadcast(&converged, 1, ccl::datatype::uint8, ccl_root, *comm).wait();
+        ccl::broadcast(&converged, 1, ccl::datatype::uint8, ccl_root, comm).wait();
 
         centroids = newCentroids;
 
diff --git a/mllib-dal/src/main/native/Makefile b/mllib-dal/src/main/native/Makefile
index bb071d6ec..d6f5ce431 100644
--- a/mllib-dal/src/main/native/Makefile
+++ b/mllib-dal/src/main/native/Makefile
@@ -31,7 +31,7 @@ INCS := -I $(JAVA_HOME)/include \
 
 # Use static link if possible, TBB is only available as dynamic libs
 
-LIBS := -L${CCL_ROOT}/lib -l:libccl.a \
+LIBS := -L${CCL_ROOT}/lib -lccl \
        -L$(DAALROOT)/lib/intel64 -l:libdaal_core.a -l:libdaal_thread.a \
        -L$(TBBROOT)/lib -ltbb -ltbbmalloc
 # TODO: Add signal chaining support, should fix linking, package so and loading
diff --git a/mllib-dal/src/main/native/OneCCL.cpp b/mllib-dal/src/main/native/OneCCL.cpp
index 9d84754cf..a0fb131a8 100644
--- a/mllib-dal/src/main/native/OneCCL.cpp
+++ b/mllib-dal/src/main/native/OneCCL.cpp
@@ -6,27 +6,32 @@
 size_t comm_size;
 size_t rank_id;
 
-ccl::communicator *getComm() {
-    ccl::shared_ptr_class<ccl::kvs> kvs;
-    static ccl::communicator b = ccl::create_communicator(comm_size, rank_id, kvs);
-    return &b;
+std::vector<ccl::communicator> g_comms;
+
+ccl::communicator &getComm() {
+    return g_comms[0];
 }
 
 JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init
-  (JNIEnv *env, jobject obj, jobject param) {
+  (JNIEnv *env, jobject obj, jint size, jint rank, jstring ip_port, jobject param) {
 
   std::cout << "oneCCL (native): init" << std::endl;
 
   ccl::init();
 
+  const char *str = env->GetStringUTFChars(ip_port, 0);
+  ccl::string ccl_ip_port(str);
+
+  auto kvs_attr = ccl::create_kvs_attr();
+  kvs_attr.set<ccl::kvs_attr_id::ip_port>(ccl_ip_port);
+
   ccl::shared_ptr_class<ccl::kvs> kvs;
-  ccl::kvs::address_type main_addr;
-  kvs = ccl::create_kvs(main_addr);
-
-  auto comm = getComm();
+  kvs = ccl::create_main_kvs(kvs_attr);
 
-  rank_id = comm->rank();
-  comm_size = comm->size();
+  g_comms.push_back(ccl::create_communicator(size, rank, kvs));
+
+  rank_id = getComm().rank();
+  comm_size = getComm().size();
 
   jclass cls = env->GetObjectClass(param);
   jfieldID fid_comm_size = env->GetFieldID(cls, "commSize", "J");
@@ -34,6 +39,7 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init
   env->SetLongField(param, fid_comm_size, comm_size);
   env->SetLongField(param, fid_rank_id, rank_id);
 
+  env->ReleaseStringUTFChars(ip_port, str);
 
   return 1;
 }
@@ -46,6 +52,8 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init
 JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup
   (JNIEnv *env, jobject obj) {
 
+  g_comms.pop_back();
+
   std::cout << "oneCCL (native): cleanup" << std::endl;
 }
 
@@ -58,7 +66,7 @@ JNIEXPORT void JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1cleanup
 JNIEXPORT jboolean JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_isRoot
   (JNIEnv *env, jobject obj) {
 
-  return getComm()->rank() == 0;
+  return getComm().rank() == 0;
 }
 
 /*
@@ -68,7 +76,7 @@ JNIEXPORT jboolean JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_isRoot
  */
 JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_rankID
   (JNIEnv *env, jobject obj) {
 
-  return getComm()->rank();
+  return getComm().rank();
 }
 
 /*
diff --git a/mllib-dal/src/main/native/OneCCL.h b/mllib-dal/src/main/native/OneCCL.h
index bead7fdee..b579c4697 100644
--- a/mllib-dal/src/main/native/OneCCL.h
+++ b/mllib-dal/src/main/native/OneCCL.h
@@ -2,4 +2,4 @@
 
 #include <oneapi/ccl.hpp>
 
-ccl::communicator *getComm();
+ccl::communicator &getComm();
diff --git a/mllib-dal/src/main/native/PCADALImpl.cpp b/mllib-dal/src/main/native/PCADALImpl.cpp
index 7aa1dd488..57e7f5dc5 100644
--- a/mllib-dal/src/main/native/PCADALImpl.cpp
+++ b/mllib-dal/src/main/native/PCADALImpl.cpp
@@ -25,8 +25,8 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL
     JNIEnv *env, jobject obj, jlong pNumTabData, jint k, jint executor_num,
     jint executor_cores, jobject resultObj) {
 
-    ccl::communicator *comm = getComm();
-    size_t rankId = comm->rank();
+    ccl::communicator &comm = getComm();
+    size_t rankId = comm.rank();
 
     const size_t nBlocks = executor_num;
     const int comm_size = executor_num;
@@ -71,7 +71,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL
     // MPI_Gather(nodeResults, perNodeArchLength, MPI_CHAR, serializedData.get(),
     // perNodeArchLength, MPI_CHAR, ccl_root, MPI_COMM_WORLD);
     ccl::allgatherv(nodeResults, perNodeArchLength, serializedData.get(), recv_counts,
-                    ccl::datatype::uint8, *comm).wait();
+                    ccl::datatype::uint8, comm).wait();
 
     auto t2 = std::chrono::high_resolution_clock::now();
 
diff --git a/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h b/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h
index 60825ae3f..4066067f6 100644
--- a/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h
+++ b/mllib-dal/src/main/native/javah/org_apache_spark_ml_util_OneCCL__.h
@@ -10,10 +10,10 @@ extern "C" {
 /*
  * Class:     org_apache_spark_ml_util_OneCCL__
  * Method:    c_init
- * Signature: (Lorg/apache/spark/ml/util/CCLParam;)I
+ * Signature: (IILjava/lang/String;Lorg/apache/spark/ml/util/CCLParam;)I
  */
 JNIEXPORT jint JNICALL Java_org_apache_spark_ml_util_OneCCL_00024_c_1init
-  (JNIEnv *, jobject, jobject);
+  (JNIEnv *, jobject, jint, jint, jstring, jobject);
 
 /*
  * Class:     org_apache_spark_ml_util_OneCCL__
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
index 8a69d13f6..d8829b2c9 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/clustering/KMeansDALImpl.scala
@@ -111,9 +111,9 @@ class KMeansDALImpl (
     }.cache()
 
-    val results = coalescedTables.mapPartitions { table =>
+    val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
       val tableArr = table.next()
-      OneCCL.init(executorNum, executorIPAddress, OneCCL.KVS_PORT)
+      OneCCL.init(executorNum, rank, executorIPAddress)
 
       val initCentroids = OneDAL.makeNumericTable(centers)
       val result = new KMeansResult()
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
index 1760aa171..6f9aaa442 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/feature/PCADALImpl.scala
@@ -48,9 +48,9 @@ class PCADALImpl (
 
     val executorIPAddress = Utils.sparkFirstExecutorIP(input.sparkContext)
 
-    val results = coalescedTables.mapPartitions { table =>
+    val results = coalescedTables.mapPartitionsWithIndex { (rank, table) =>
       val tableArr = table.next()
-      OneCCL.init(executorNum, executorIPAddress, OneCCL.KVS_PORT)
+      OneCCL.init(executorNum, rank, executorIPAddress)
 
       val result = new PCAResult()
       cPCATrainDAL(
diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala
index 87022f4c9..af9080856 100644
--- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala
+++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneCCL.scala
@@ -23,52 +23,56 @@ object OneCCL {
 
   var cclParam = new CCLParam()
 
-  var kvsIPPort = sys.env.getOrElse("CCL_KVS_IP_PORT", "")
-  var worldSize = sys.env.getOrElse("CCL_WORLD_SIZE", "1").toInt
-
-  val KVS_PORT = 51234
-
-  private def checkEnv() {
-    val altTransport = sys.env.getOrElse("CCL_ATL_TRANSPORT", "")
-    val pmType = sys.env.getOrElse("CCL_PM_TYPE", "")
-    val ipExchange = sys.env.getOrElse("CCL_KVS_IP_EXCHANGE", "")
-
-    assert(altTransport == "ofi")
-    assert(pmType == "resizable")
-    assert(ipExchange == "env")
-    assert(kvsIPPort != "")
-
-  }
+//  var kvsIPPort = sys.env.getOrElse("CCL_KVS_IP_PORT", "")
+//  var worldSize = sys.env.getOrElse("CCL_WORLD_SIZE", "1").toInt
+
+  var kvsPort = 5000
+
+//  private def checkEnv() {
+//    val altTransport = sys.env.getOrElse("CCL_ATL_TRANSPORT", "")
+//    val pmType = sys.env.getOrElse("CCL_PM_TYPE", "")
+//    val ipExchange = sys.env.getOrElse("CCL_KVS_IP_EXCHANGE", "")
+//
+//    assert(altTransport == "ofi")
+//    assert(pmType == "resizable")
+//    assert(ipExchange == "env")
+//    assert(kvsIPPort != "")
+//
+//  }
 
   // Run on Executor
-  def setExecutorEnv(executor_num: Int, ip: String, port: Int): Unit = {
-    // Work around ccl by passings in a spark.executorEnv.CCL_KVS_IP_PORT.
-    val ccl_kvs_ip_port = sys.env.getOrElse("CCL_KVS_IP_PORT", s"${ip}_${port}")
-
-    println(s"oneCCL: Initializing with CCL_KVS_IP_PORT: $ccl_kvs_ip_port")
-
-    setEnv("CCL_PM_TYPE", "resizable")
-    setEnv("CCL_ATL_TRANSPORT","ofi")
-    setEnv("CCL_ATL_TRANSPORT_PATH", LibLoader.getTempSubDir())
-    setEnv("CCL_KVS_IP_EXCHANGE","env")
-    setEnv("CCL_KVS_IP_PORT", ccl_kvs_ip_port)
-    setEnv("CCL_WORLD_SIZE", s"${executor_num}")
-    // Uncomment this if you whant to debug oneCCL
-    // setEnv("CCL_LOG_LEVEL", "2")
-  }
-
-  def init(executor_num: Int, ip: String, port: Int) = {
-
-    setExecutorEnv(executor_num, ip, port)
+//  def setExecutorEnv(executor_num: Int, ip: String, port: Int): Unit = {
+//    // Work around ccl by passings in a spark.executorEnv.CCL_KVS_IP_PORT.
+// val ccl_kvs_ip_port = sys.env.getOrElse("CCL_KVS_IP_PORT", s"${ip}_${port}") +// +// println(s"oneCCL: Initializing with CCL_KVS_IP_PORT: $ccl_kvs_ip_port") +// +// setEnv("CCL_PM_TYPE", "resizable") +// setEnv("CCL_ATL_TRANSPORT","ofi") +// setEnv("CCL_ATL_TRANSPORT_PATH", LibLoader.getTempSubDir()) +// setEnv("CCL_KVS_IP_EXCHANGE","env") +// setEnv("CCL_KVS_IP_PORT", ccl_kvs_ip_port) +// setEnv("CCL_WORLD_SIZE", s"${executor_num}") +// // Uncomment this if you whant to debug oneCCL +// // setEnv("CCL_LOG_LEVEL", "2") +// } + + def init(executor_num: Int, rank: Int, ip: String) = { + +// setExecutorEnv(executor_num, ip, port) + println(s"oneCCL: Initializing with IP_PORT: ${ip}_${kvsPort}") // cclParam is output from native code - c_init(cclParam) + c_init(executor_num, rank, ip+"_"+kvsPort.toString, cclParam) // executor number should equal to oneCCL world size assert(executor_num == cclParam.commSize, "executor number should equal to oneCCL world size") println(s"oneCCL: Initialized with executorNum: $executor_num, commSize, ${cclParam.commSize}, rankId: ${cclParam.rankId}") + // Use a new port when calling init again + kvsPort = kvsPort + 1 + } // Run on Executor @@ -76,7 +80,7 @@ object OneCCL { c_cleanup() } - @native private def c_init(param: CCLParam) : Int + @native private def c_init(size: Int, rank: Int, ip_port: String, param: CCLParam) : Int @native private def c_cleanup() : Unit @native def isRoot() : Boolean
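
Note on the new bootstrap path (illustrative, not part of the patch): the Scala side now takes the first executor's IP, appends the current kvsPort, and passes the resulting "<ip>_<port>" string plus the partition index (used as the CCL rank) down to c_init; the native side seeds the main KVS from that string instead of exchanging a serialized main address, and kvsPort is bumped after each init so a repeated init in the same job binds a fresh endpoint. Below is a minimal standalone C++ sketch of that KVS bootstrap. It assumes oneCCL 2021 headers (oneapi/ccl.hpp) and the kvs_attr_id::ip_port attribute id; the size, rank, and ip_port values are placeholders standing in for what the JNI layer receives from Spark.

// sketch.cpp -- illustrative only; build against oneCCL, e.g.
//   g++ sketch.cpp -I$CCL_ROOT/include -L$CCL_ROOT/lib -lccl -o sketch
#include <iostream>
#include <string>
#include <vector>
#include "oneapi/ccl.hpp"

int main() {
    // Placeholders: in the real job, size is the executor count, rank the
    // Spark partition index, and ip_port is "<first executor IP>_<kvsPort>".
    const int size = 1;
    const int rank = 0;
    ccl::string ip_port("127.0.0.1_5000");

    ccl::init();

    // Seed the main KVS directly from the ip_port string (attribute id assumed
    // to be ccl::kvs_attr_id::ip_port, per the oneCCL 2021 API).
    auto kvs_attr = ccl::create_kvs_attr();
    kvs_attr.set<ccl::kvs_attr_id::ip_port>(ip_port);
    auto kvs = ccl::create_main_kvs(kvs_attr);

    // Keep the communicator alive for later lookups, as g_comms does in OneCCL.cpp.
    std::vector<ccl::communicator> comms;
    comms.push_back(ccl::create_communicator(size, rank, kvs));

    std::cout << "rank " << comms[0].rank() << " of " << comms[0].size() << std::endl;
    return 0;
}

Creating the communicator is a collective bootstrap across all `size` ranks, which is why the Scala callers switch from mapPartitions to mapPartitionsWithIndex: every partition must call OneCCL.init, and each one needs its own distinct rank.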