From a639defa5a5638c8fd83a8f09ea5663bc6c905ab Mon Sep 17 00:00:00 2001 From: Xiaochang Wu Date: Wed, 4 Aug 2021 10:51:52 +0800 Subject: [PATCH] [ML-108] Update PCA GPU, LiR CPU and Improve JAR packaging and libs loading (#111) * Update build scripts for CPU_GPU_PROFILE * Refactor and add PCA GPU * Update load gpu libs and use checkClusterPlatformCompatibility to load libs * update LinearRegression for checkClusterPlatformCompatibility * Update pom and README * Update README --- README.md | 13 +- mllib-dal/build-cpu-gpu.sh | 85 +++++++++++++ mllib-dal/build.sh | 3 + mllib-dal/pom.xml | 22 +++- mllib-dal/src/assembly/assembly-cpu-gpu.xml | 103 +++++++++++++++ .../org/apache/spark/ml/util/LibLoader.java | 23 +++- mllib-dal/src/main/native/ALSDALImpl.cpp | 1 - mllib-dal/src/main/native/ALSShuffle.cpp | 1 - mllib-dal/src/main/native/GPU.cpp | 40 +----- mllib-dal/src/main/native/GPU.h | 8 +- mllib-dal/src/main/native/KMeansDALImpl.cpp | 30 ++--- .../main/native/LinearRegressionDALImpl.cpp | 2 - mllib-dal/src/main/native/Makefile | 45 +++++-- .../src/main/native/NaiveBayesDALImpl.cpp | 1 - mllib-dal/src/main/native/OneDAL.cpp | 1 - mllib-dal/src/main/native/PCADALImpl.cpp | 110 ++++++++++------ mllib-dal/src/main/native/build-cpu-gpu.sh | 20 +++ mllib-dal/src/main/native/service.cpp | 4 +- mllib-dal/src/main/native/service.h | 9 +- .../spark/ml/recommendation/ALSDALImpl.scala | 2 - .../org/apache/spark/ml/util/OneDAL.scala | 15 --- .../org/apache/spark/ml/util/Utils.scala | 5 +- .../ml/regression/LinearRegression.scala | 4 +- .../ml/regression/LinearRegression.scala | 4 +- .../ml/regression/LinearRegression.scala | 4 +- .../ml/regression/LinearRegression.scala | 4 +- mllib-dal/test-cpu-gpu.sh | 117 ++++++++++++++++++ mllib-dal/test.sh | 3 + 28 files changed, 535 insertions(+), 144 deletions(-) create mode 100755 mllib-dal/build-cpu-gpu.sh create mode 100644 mllib-dal/src/assembly/assembly-cpu-gpu.xml create mode 100755 mllib-dal/src/main/native/build-cpu-gpu.sh create mode 100755 mllib-dal/test-cpu-gpu.sh diff --git a/README.md b/README.md index 5a64b08fe..4dff7d7d5 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,10 @@ OAP MLlib is an optimized package to accelerate machine learning algorithms in ## Compatibility -OAP MLlib tried to maintain the same API interfaces and produce same results that are identical with Spark MLlib. However due to the nature of float point operations, there may be some small deviation from the original result, we will try our best to make sure the error is within acceptable range. +OAP MLlib maintains the same API interfaces with Spark MLlib. That means the application built with Spark MLlib can be running directly with minimum configuration. + +Most of the algorithms can produce the same results that are identical with Spark MLlib. However due to the nature of distributed float point operations, there may be some small deviation from the original result, we will make sure the error is within acceptable range and the accuracy is on par with Spark MLlib. + For those algorithms that are not accelerated by OAP MLlib, the original Spark MLlib one will be used. ## Online Documentation @@ -216,8 +219,10 @@ als-pyspark | ALS example for PySpark Algorithm | Category | Maturity ------------------|----------|------------- -K-Means | CPU, GPU | Experimental -PCA | CPU | Experimental -ALS | CPU | Experimental +K-Means | CPU | Stable +K-Means | GPU | Experimental +PCA | CPU | Stable +PCA | GPU | Experimental +ALS | CPU | Stable Naive Bayes | CPU | Experimental Linear Regression | CPU | Experimental diff --git a/mllib-dal/build-cpu-gpu.sh b/mllib-dal/build-cpu-gpu.sh new file mode 100755 index 000000000..27b1777d9 --- /dev/null +++ b/mllib-dal/build-cpu-gpu.sh @@ -0,0 +1,85 @@ +#!/usr/bin/env bash + +# Check envs for building +if [[ -z $JAVA_HOME ]]; then + echo JAVA_HOME not defined! + exit 1 +fi + +if [[ -z $(which mvn) ]]; then + echo Maven not found! + exit 1 +fi + +if [[ -z $DAALROOT ]]; then + echo DAALROOT not defined! + exit 1 +fi + +if [[ -z $TBBROOT ]]; then + echo TBBROOT not defined! + exit 1 +fi + +if [[ -z $CCL_ROOT ]]; then + echo CCL_ROOT not defined! + exit 1 +fi + +versionArray=( + spark-3.0.0 \ + spark-3.0.1 \ + spark-3.0.2 \ + spark-3.1.1 +) + +SPARK_VER=spark-3.1.1 +MVN_NO_TRANSFER_PROGRESS= + +print_usage() { + echo + echo Usage: ./build.sh [-p spark-x.x.x] [-q] [-h] + echo + echo Supported Spark versions: + for version in ${versionArray[*]} + do + echo " $version" + done + echo +} + +while getopts "hqp:" opt +do +case $opt in + p) SPARK_VER=$OPTARG ;; + q) MVN_NO_TRANSFER_PROGRESS=--no-transfer-progress ;; + h | *) + print_usage + exit 1 + ;; +esac +done + +if [[ ! ${versionArray[*]} =~ $SPARK_VER ]]; then + echo Error: $SPARK_VER version is not supported! + exit 1 +fi + +export PLATFORM_PROFILE=CPU_GPU_PROFILE + +print_usage + +echo === Building Environments === +echo JAVA_HOME=$JAVA_HOME +echo DAALROOT=$DAALROOT +echo TBBROOT=$TBBROOT +echo CCL_ROOT=$CCL_ROOT +echo Maven Version: $(mvn -v | head -n 1 | cut -f3 -d" ") +echo Clang Version: $(clang -dumpversion) +echo Spark Version: $SPARK_VER +echo Platform Profile: $PLATFORM_PROFILE +echo ============================= +echo +echo Building with $SPARK_VER ... +echo +mvn $MVN_NO_TRANSFER_PROGRESS -P$SPARK_VER -DskipTests clean package diff --git a/mllib-dal/build.sh b/mllib-dal/build.sh index 0a0ac178e..7ae84e01f 100755 --- a/mllib-dal/build.sh +++ b/mllib-dal/build.sh @@ -67,6 +67,8 @@ fi print_usage +export PLATFORM_PROFILE=CPU_ONLY_PROFILE + echo === Building Environments === echo JAVA_HOME=$JAVA_HOME echo DAALROOT=$DAALROOT @@ -75,6 +77,7 @@ echo CCL_ROOT=$CCL_ROOT echo Maven Version: $(mvn -v | head -n 1 | cut -f3 -d" ") echo Clang Version: $(clang -dumpversion) echo Spark Version: $SPARK_VER +echo Platform Profile: $PLATFORM_PROFILE echo ============================= echo echo Building with $SPARK_VER ... diff --git a/mllib-dal/pom.xml b/mllib-dal/pom.xml index d35d7a13d..d1ef806ba 100644 --- a/mllib-dal/pom.xml +++ b/mllib-dal/pom.xml @@ -24,6 +24,9 @@ libccl.so libfabric.so.1 libmpi.so.12.0.0 + libOpenCL.so.1 + libsycl.so.5 + src/assembly/assembly.xml @@ -149,10 +152,20 @@ - spark-3.0.0 + cpu-gpu - true + + env.PLATFORM_PROFILE + CPU_GPU_PROFILE + + + src/assembly/assembly-cpu-gpu.xml + + + + + spark-3.0.0 3.0.0 3.0.8 @@ -177,6 +190,9 @@ spark-3.1.1 + + true + 3.1.1 3.2.3 @@ -435,7 +451,7 @@ false - src/assembly/assembly.xml + ${assembly.description} diff --git a/mllib-dal/src/assembly/assembly-cpu-gpu.xml b/mllib-dal/src/assembly/assembly-cpu-gpu.xml new file mode 100644 index 000000000..d49fd1adf --- /dev/null +++ b/mllib-dal/src/assembly/assembly-cpu-gpu.xml @@ -0,0 +1,103 @@ + + jar-with-dependencies + + jar + + false + + + / + true + true + runtime + + + + / + true + system + + + + + ${project.basedir} + / + + README* + LICENSE* + NOTICE* + + + + ${project.build.directory} + lib + + *.so + + + + + + + ${env.TBBROOT}/lib/intel64/gcc4.8/${tbb.lib} + lib + libtbb.so.2 + + + ${env.TBBROOT}/lib/intel64/gcc4.8/${tbb.malloc.lib} + lib + libtbbmalloc.so.2 + + + + ${env.DAALROOT}/lib/intel64/${dal.java.lib} + lib + libJavaAPI.so + + + + ${env.CCL_ROOT}/lib/${ccl.fabric.lib} + lib + + + ${env.CCL_ROOT}/lib/${ccl.mpi.lib} + lib + libmpi.so.12 + + + ${env.CCL_ROOT}/lib/libccl.so + lib + + + ${env.CCL_ROOT}/lib/prov/libsockets-fi.so + lib + + + + ${env.CMPLR_ROOT}/linux/compiler/lib/intel64_lin/libintlc.so.5 + lib + + + ${env.CMPLR_ROOT}/linux/compiler/lib/intel64_lin/libsvml.so + lib + + + ${env.CMPLR_ROOT}/linux/compiler/lib/intel64_lin/libirng.so + lib + + + ${env.CMPLR_ROOT}/linux/compiler/lib/intel64_lin/libimf.so + lib + + + ${env.CMPLR_ROOT}/linux/lib/${opencl.lib} + lib + + + ${env.CMPLR_ROOT}/linux/lib/${sycl.lib} + lib + + + diff --git a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java index eada9b20c..ac1153ffc 100644 --- a/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java +++ b/mllib-dal/src/main/java/org/apache/spark/ml/util/LibLoader.java @@ -42,9 +42,12 @@ public static String getTempSubDir() { } /** - * Load oneCCL and MLlibDAL libs + * Load all native libs */ public static synchronized void loadLibraries() throws IOException { + if (!loadLibSYCL()) { + log.debug("SYCL libraries are not available, will load CPU libraries only."); + } loadLibCCL(); loadLibMLlibDAL(); } @@ -59,6 +62,24 @@ private static synchronized void loadLibCCL() throws IOException { loadFromJar(subDir, "libsockets-fi.so"); } + /** + * Load SYCL libs in dependency order + */ + private static synchronized Boolean loadLibSYCL() throws IOException { + // Check if SYCL libraries are available + InputStream streamIn = LibLoader.class.getResourceAsStream(LIBRARY_PATH_IN_JAR + "/libsycl.so.5"); + if (streamIn == null) { + return false; + } + loadFromJar(subDir, "libintlc.so.5"); + loadFromJar(subDir, "libimf.so"); + loadFromJar(subDir, "libirng.so"); + loadFromJar(subDir, "libsvml.so"); + loadFromJar(subDir, "libOpenCL.so.1"); + loadFromJar(subDir, "libsycl.so.5"); + return true; + } + /** * Load MLlibDAL lib, it depends TBB libs that are loaded by oneDAL, so this * function should be called after oneDAL loadLibrary diff --git a/mllib-dal/src/main/native/ALSDALImpl.cpp b/mllib-dal/src/main/native/ALSDALImpl.cpp index 941b54c65..5f2de2544 100644 --- a/mllib-dal/src/main/native/ALSDALImpl.cpp +++ b/mllib-dal/src/main/native/ALSDALImpl.cpp @@ -16,7 +16,6 @@ #include #include -#include #include #include "OneCCL.h" diff --git a/mllib-dal/src/main/native/ALSShuffle.cpp b/mllib-dal/src/main/native/ALSShuffle.cpp index 313a0c393..806511fbc 100644 --- a/mllib-dal/src/main/native/ALSShuffle.cpp +++ b/mllib-dal/src/main/native/ALSShuffle.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include diff --git a/mllib-dal/src/main/native/GPU.cpp b/mllib-dal/src/main/native/GPU.cpp index 2e182adc1..122e50887 100644 --- a/mllib-dal/src/main/native/GPU.cpp +++ b/mllib-dal/src/main/native/GPU.cpp @@ -2,15 +2,10 @@ #include #include -#include -#include - -#include - #include "GPU.h" #include "service.h" -std::vector get_gpus() { +static std::vector get_gpus() { auto platforms = sycl::platform::get_platforms(); for (auto p : platforms) { auto devices = p.get_devices(sycl::info::device_type::gpu); @@ -24,7 +19,7 @@ std::vector get_gpus() { return {}; } -int getLocalRank(ccl::communicator &comm, int size, int rank) { +static int getLocalRank(ccl::communicator &comm, int size, int rank) { const int MPI_MAX_PROCESSOR_NAME = 128; /* Obtain local rank among nodes sharing the same host name */ char zero = static_cast(0); @@ -51,34 +46,8 @@ int getLocalRank(ccl::communicator &comm, int size, int rank) { // return 0; } -//void setGPUContext(ccl::communicator &comm, jint *gpu_idx, int n_gpu) { -// int rank = comm.rank(); -// int size = comm.size(); -// -// /* Create GPU device from local rank and set execution context */ -// auto local_rank = getLocalRank(comm, size, rank); -// auto gpus = get_gpus(); -// -// std::cout << rank << " " << size << " " << local_rank << " " << n_gpu -// << std::endl; -// -// for (int i = 0; i < n_gpu; i++) { -// std::cout << gpu_idx[i] << std::endl; -// } -// -// // auto rank_gpu = gpus[gpu_idx[local_rank % n_gpu]]; -// static auto rank_gpu = gpus[0]; -// -// static cl::sycl::queue queue(rank_gpu); -// std::cout << "SyclExecutionContext" << std::endl; -// static daal::services::SyclExecutionContext ctx(queue); -// std::cout << "setDefaultExecutionContext" << std::endl; -// daal::services::Environment::getInstance()->setDefaultExecutionContext(ctx); -//} - sycl::device getAssignedGPU(ccl::communicator &comm, int size, int rankId, - jint *gpu_indices, int n_gpu - ) { + jint *gpu_indices, int n_gpu) { auto local_rank = getLocalRank(comm, size, rankId); auto gpus = get_gpus(); @@ -87,8 +56,7 @@ sycl::device getAssignedGPU(ccl::communicator &comm, int size, int rankId, << std::endl; auto gpu_selected = gpu_indices[local_rank % n_gpu]; - std::cout << "GPU selected for current rank: " << gpu_selected - << std::endl; + std::cout << "GPU selected for current rank: " << gpu_selected << std::endl; auto rank_gpu = gpus[gpu_selected % gpus.size()]; return rank_gpu; diff --git a/mllib-dal/src/main/native/GPU.h b/mllib-dal/src/main/native/GPU.h index cf5805c2a..b82b68291 100644 --- a/mllib-dal/src/main/native/GPU.h +++ b/mllib-dal/src/main/native/GPU.h @@ -1,12 +1,10 @@ #pragma once +#include #include +#include #include #include -int getLocalRank(ccl::communicator &comm, int size, int rank); -std::vector get_gpus(); -// void setGPUContext(ccl::communicator &comm, jint *gpu_idx, int n_gpu); - sycl::device getAssignedGPU(ccl::communicator &comm, int size, int rankId, - jint *gpu_indices, int n_gpu); \ No newline at end of file + jint *gpu_indices, int n_gpu); diff --git a/mllib-dal/src/main/native/KMeansDALImpl.cpp b/mllib-dal/src/main/native/KMeansDALImpl.cpp index 14b09afce..47f1b5dae 100644 --- a/mllib-dal/src/main/native/KMeansDALImpl.cpp +++ b/mllib-dal/src/main/native/KMeansDALImpl.cpp @@ -17,19 +17,14 @@ #include #include -#include -#include - -#include - -#include +#ifdef CPU_GPU_PROFILE +#include "GPU.h" +#endif #include "OneCCL.h" #include "org_apache_spark_ml_clustering_KMeansDALImpl.h" #include "service.h" -#include "GPU.h" - using namespace std; using namespace daal; using namespace daal::algorithms; @@ -247,21 +242,24 @@ Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMeansDALComputeWithInitCente jobject resultObj) { ccl::communicator &comm = getComm(); - int size = comm.size(); int rankId = comm.rank(); NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); NumericTablePtr centroids = *((NumericTablePtr *)pNumTabCenters); jlong ret = 0L; +#ifdef CPU_GPU_PROFILE + if (use_gpu) { - int n_gpu = env->GetArrayLength(gpu_idx_array); + int n_gpu = env->GetArrayLength(gpu_idx_array); + cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)" + << endl; + jint *gpu_indices = env->GetIntArrayElements(gpu_idx_array, 0); - std::cout << "oneDAL (native): use GPU kernels with " << n_gpu << " GPU(s)" - << std::endl; - - auto assigned_gpu = getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu); + int size = comm.size(); + auto assigned_gpu = + getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu); // Set SYCL context cl::sycl::queue queue(assigned_gpu); @@ -274,7 +272,9 @@ Java_org_apache_spark_ml_clustering_KMeansDALImpl_cKMeansDALComputeWithInitCente iteration_num, executor_num, resultObj); env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0); - } else { + } else +#endif + { // Set number of threads for oneDAL to use for each rank services::Environment::getInstance()->setNumberOfThreads( executor_cores); diff --git a/mllib-dal/src/main/native/LinearRegressionDALImpl.cpp b/mllib-dal/src/main/native/LinearRegressionDALImpl.cpp index ac717a67e..5755541d2 100644 --- a/mllib-dal/src/main/native/LinearRegressionDALImpl.cpp +++ b/mllib-dal/src/main/native/LinearRegressionDALImpl.cpp @@ -15,9 +15,7 @@ *******************************************************************************/ #include -#include #include -#include #include #include "OneCCL.h" diff --git a/mllib-dal/src/main/native/Makefile b/mllib-dal/src/main/native/Makefile index 6252b044a..94ab3a9b1 100644 --- a/mllib-dal/src/main/native/Makefile +++ b/mllib-dal/src/main/native/Makefile @@ -16,27 +16,46 @@ CC := clang CXX := clang++ RM := rm -rf -CFLAGS := -g -Wall -Wno-deprecated-declarations -fPIC -std=c++17 -fsycl +PLATFORM_PROFILE ?= CPU_ONLY_PROFILE + +$(info ) +$(info === Profile is $(PLATFORM_PROFILE) ===) +$(info ) + +CFLAGS_COMMON := -g -Wall -Wno-deprecated-declarations -fPIC -std=c++17 + +ifeq ($(PLATFORM_PROFILE),CPU_ONLY_PROFILE) + CFLAGS := $(CFLAGS_COMMON) +else ifeq ($(PLATFORM_PROFILE),CPU_GPU_PROFILE) + CFLAGS := $(CFLAGS_COMMON) -fsycl +else + $(error Unknow building profile, should be CPU_ONLY_PROFILE or CPU_GPU_PROFILE) + exit 1 +endif # The following paths setting works for self-built libs from source code # https://github.com/oneapi-src/oneCCL. If oneCCL package in oneAPI Toolkit is used, -# Should change paths to ${CCL_ROOT}/{include,lib}/cpu_icc instead +# Should change paths to $(CCL_ROOT)/{include,lib}/cpu_icc instead INCS := -I $(JAVA_HOME)/include \ -I $(JAVA_HOME)/include/linux \ - -I ${CCL_ROOT}/include \ + -I $(CCL_ROOT)/include \ -I $(DAALROOT)/include \ -I ./javah \ -I ./ # Use static link if possible, TBB is only available as dynamic libs -LIBS := -L${CCL_ROOT}/lib -lccl \ - -L$(DAALROOT)/lib/intel64 -l:libonedal_core.a -l:libonedal_thread.a -l:libonedal_sycl.a \ - -L$(TBBROOT)/lib/lib/intel64/gcc4.8 -ltbb -ltbbmalloc \ - -lpthread -ldl -lOpenCL +LIBS_COMMON := -L$(CCL_ROOT)/lib -lccl \ + -L$(DAALROOT)/lib/intel64 -l:libonedal_core.a -l:libonedal_thread.a \ + -L$(TBBROOT)/lib/lib/intel64/gcc4.8 -ltbb -ltbbmalloc + +ifeq ($(PLATFORM_PROFILE),CPU_ONLY_PROFILE) + LIBS := $(LIBS_COMMON) +else ifeq ($(PLATFORM_PROFILE),CPU_GPU_PROFILE) + LIBS := "$(LIBS_COMMON) -l:libonedal_sycl.a" +endif CPP_SRCS += \ ./OneCCL.cpp ./OneDAL.cpp ./service.cpp ./error_handling.cpp \ - ./GPU.cpp \ ./KMeansDALImpl.cpp \ ./PCADALImpl.cpp \ ./ALSDALImpl.cpp ./ALSShuffle.cpp \ @@ -45,13 +64,19 @@ CPP_SRCS += \ OBJS += \ ./OneCCL.o ./OneDAL.o ./service.o ./error_handling.o \ - ./GPU.o \ ./KMeansDALImpl.o \ ./PCADALImpl.o \ ./ALSDALImpl.o ./ALSShuffle.o \ ./NaiveBayesDALImpl.o \ ./LinearRegressionDALImpl.o +DEFINES=-D$(PLATFORM_PROFILE) + +ifeq ($(PLATFORM_PROFILE),CPU_GPU_PROFILE) + CPP_SRCS += ./GPU.cpp + OBJS += ./GPU.o +endif + # Output Binary OUTPUT = ../../../target/libMLlibDAL.so @@ -60,7 +85,7 @@ all: $(OUTPUT) # Compile %.o: %.cpp @echo 'Building file: $<' - $(CXX) $(CFLAGS) $(INCS) -c -o "$@" "$<" + $(CXX) $(CFLAGS) $(INCS) $(DEFINES) -c -o "$@" "$<" @echo 'Finished building: $<' @echo ' ' diff --git a/mllib-dal/src/main/native/NaiveBayesDALImpl.cpp b/mllib-dal/src/main/native/NaiveBayesDALImpl.cpp index 402947444..ad1119f2d 100644 --- a/mllib-dal/src/main/native/NaiveBayesDALImpl.cpp +++ b/mllib-dal/src/main/native/NaiveBayesDALImpl.cpp @@ -1,4 +1,3 @@ -#include #include #include "OneCCL.h" diff --git a/mllib-dal/src/main/native/OneDAL.cpp b/mllib-dal/src/main/native/OneDAL.cpp index c684bda1f..8aadda4f0 100644 --- a/mllib-dal/src/main/native/OneDAL.cpp +++ b/mllib-dal/src/main/native/OneDAL.cpp @@ -15,7 +15,6 @@ *******************************************************************************/ #include -#include #include #include "org_apache_spark_ml_util_OneDAL__.h" diff --git a/mllib-dal/src/main/native/PCADALImpl.cpp b/mllib-dal/src/main/native/PCADALImpl.cpp index ca73b7b5b..0f34fa87b 100644 --- a/mllib-dal/src/main/native/PCADALImpl.cpp +++ b/mllib-dal/src/main/native/PCADALImpl.cpp @@ -15,9 +15,12 @@ *******************************************************************************/ #include -#include #include +#ifdef CPU_GPU_PROFILE +#include "GPU.h" +#endif + #include "OneCCL.h" #include "org_apache_spark_ml_feature_PCADALImpl.h" #include "service.h" @@ -28,37 +31,9 @@ using namespace daal::algorithms; typedef double algorithmFPType; /* Algorithm floating-point type */ -/* - * Class: org_apache_spark_ml_feature_PCADALImpl - * Method: cPCATrainDAL - * Signature: (JIIIZ[ILorg/apache/spark/ml/feature/PCAResult;)J - */ - -JNIEXPORT jlong JNICALL -Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL( - JNIEnv *env, jobject obj, jlong pNumTabData, jint k, jint executor_num, - jint executor_cores, jboolean use_gpu, jintArray gpu_idx_array, jobject resultObj) { - - using daal::byte; - - ccl::communicator &comm = getComm(); - size_t rankId = comm.rank(); - - const size_t nBlocks = executor_num; - const int comm_size = executor_num; - - NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); - // Source data already normalized - pData->setNormalizationFlag(NumericTableIface::standardScoreNormalized); - - // Set number of threads for oneDAL to use for each rank - services::Environment::getInstance()->setNumberOfThreads(executor_cores); - - int nThreadsNew = - services::Environment::getInstance()->getNumberOfThreads(); - cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew - << endl; - +static void doPCADALCompute(JNIEnv *env, jobject obj, int rankId, + ccl::communicator &comm, NumericTablePtr &pData, + int nBlocks, jobject resultObj) { auto t1 = std::chrono::high_resolution_clock::now(); pca::Distributed localAlgorithm; @@ -78,15 +53,15 @@ Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL( t1 = std::chrono::high_resolution_clock::now(); /* Serialize partial results required by step 2 */ - services::SharedPtr serializedData; + services::SharedPtr serializedData; InputDataArchive dataArch; localAlgorithm.getPartialResult()->serialize(dataArch); size_t perNodeArchLength = dataArch.getSizeOfArchive(); - serializedData = - services::SharedPtr(new byte[perNodeArchLength * nBlocks]); + serializedData = services::SharedPtr( + new daal::byte[perNodeArchLength * nBlocks]); - byte *nodeResults = new byte[perNodeArchLength]; + daal::byte *nodeResults = new daal::byte[perNodeArchLength]; dataArch.copyArchiveToArray(nodeResults, perNodeArchLength); t2 = std::chrono::high_resolution_clock::now(); @@ -96,8 +71,8 @@ Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL( std::cout << "PCA (native): serializing partial results took " << duration << " secs" << std::endl; - vector recv_counts(comm_size * perNodeArchLength); - for (int i = 0; i < comm_size; i++) + vector recv_counts(nBlocks * perNodeArchLength); + for (int i = 0; i < nBlocks; i++) recv_counts[i] = perNodeArchLength; cout << "PCA (native): ccl_allgatherv receiving " @@ -166,7 +141,6 @@ Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL( printNumericTable(result->get(pca::eigenvectors), "First 10 eigenvectors with first 20 dimensions:", 10, 20); - // Return all eigenvalues & eigenvectors // Get the class of the input object @@ -186,6 +160,64 @@ Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL( env->SetLongField(resultObj, explainedVarianceNumericTableField, (jlong)eigenvalues); } +} + +/* + * Class: org_apache_spark_ml_feature_PCADALImpl + * Method: cPCATrainDAL + * Signature: (JIIIZ[ILorg/apache/spark/ml/feature/PCAResult;)J + */ + +JNIEXPORT jlong JNICALL +Java_org_apache_spark_ml_feature_PCADALImpl_cPCATrainDAL( + JNIEnv *env, jobject obj, jlong pNumTabData, jint k, jint executor_num, + jint executor_cores, jboolean use_gpu, jintArray gpu_idx_array, + jobject resultObj) { + + ccl::communicator &comm = getComm(); + size_t rankId = comm.rank(); + + const size_t nBlocks = executor_num; + + NumericTablePtr pData = *((NumericTablePtr *)pNumTabData); + // Source data already normalized + pData->setNormalizationFlag(NumericTableIface::standardScoreNormalized); + +#ifdef CPU_GPU_PROFILE + if (use_gpu) { + int n_gpu = env->GetArrayLength(gpu_idx_array); + jint *gpu_indices = env->GetIntArrayElements(gpu_idx_array, 0); + + std::cout << "oneDAL (native): use GPU kernels with " << n_gpu + << " GPU(s)" << std::endl; + + int size = comm.size(); + auto assigned_gpu = + getAssignedGPU(comm, size, rankId, gpu_indices, n_gpu); + + // Set SYCL context + cl::sycl::queue queue(assigned_gpu); + daal::services::SyclExecutionContext ctx(queue); + daal::services::Environment::getInstance()->setDefaultExecutionContext( + ctx); + + doPCADALCompute(env, obj, rankId, comm, pData, nBlocks, resultObj); + + env->ReleaseIntArrayElements(gpu_idx_array, gpu_indices, 0); + } else +#endif + { + // Set number of threads for oneDAL to use for each rank + services::Environment::getInstance()->setNumberOfThreads( + executor_cores); + + int nThreadsNew = + services::Environment::getInstance()->getNumberOfThreads(); + cout << "oneDAL (native): Number of CPU threads used: " << nThreadsNew + << endl; + + doPCADALCompute(env, obj, rankId, comm, pData, nBlocks, resultObj); + } return 0; } diff --git a/mllib-dal/src/main/native/build-cpu-gpu.sh b/mllib-dal/src/main/native/build-cpu-gpu.sh new file mode 100755 index 000000000..81d591516 --- /dev/null +++ b/mllib-dal/src/main/native/build-cpu-gpu.sh @@ -0,0 +1,20 @@ +#!/usr/bin/env bash + +# Copyright 2020 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +export PLATFORM_PROFILE=CPU_GPU_PROFILE + +make clean +make -j diff --git a/mllib-dal/src/main/native/service.cpp b/mllib-dal/src/main/native/service.cpp index 0a97e584a..7b9497c6c 100644 --- a/mllib-dal/src/main/native/service.cpp +++ b/mllib-dal/src/main/native/service.cpp @@ -1,5 +1,3 @@ -#include - #include "error_handling.h" #include "service.h" @@ -761,6 +759,7 @@ SerializationIfacePtr deserializeDAALObject(daal::byte *buff, size_t length) { return dataArch.getAsSharedPtr(); } +#ifdef CPU_GPU_PRFILE NumericTablePtr homegenToSyclHomogen(NumericTablePtr ntHomogen) { int nRows = ntHomogen->getNumberOfRows(); int nColumns = ntHomogen->getNumberOfColumns(); @@ -794,3 +793,4 @@ NumericTablePtr homegenToSyclHomogen(NumericTablePtr ntHomogen) { return ntSycl; } +#endif diff --git a/mllib-dal/src/main/native/service.h b/mllib-dal/src/main/native/service.h index 0bd8b411c..399749ff1 100644 --- a/mllib-dal/src/main/native/service.h +++ b/mllib-dal/src/main/native/service.h @@ -22,7 +22,11 @@ #pragma once -#include "daal.h" +#ifdef CPU_GPU_PROFILE +#include +#else +#include +#endif using namespace daal::data_management; @@ -47,4 +51,7 @@ void printNumericTable(const NumericTablePtr &dataTable, size_t serializeDAALObject(SerializationIface *pData, ByteBuffer &buffer); SerializationIfacePtr deserializeDAALObject(daal::byte *buff, size_t length); CSRNumericTable *createFloatSparseTable(const std::string &datasetFileName); + +#ifdef CPU_GPU_PROFILE NumericTablePtr homegenToSyclHomogen(NumericTablePtr ntHomogen); +#endif diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala index 01bf55811..6b4c9f8b1 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/recommendation/ALSDALImpl.scala @@ -94,8 +94,6 @@ class ALSDALImpl[@specialized(Int, Long) ID: ClassTag]( data: RDD[Rating[ID]], } .mapPartitionsWithIndex { (rank, iter) => val context = new DaalContext() - println("ALSDALImpl: Loading libMLlibDAL.so") - LibLoader.loadLibraries() OneCCL.init(executorNum, rank, kvsIPPort) val rankId = OneCCL.rankID() diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala index a3b24df9a..dee1ee757 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/OneDAL.scala @@ -138,9 +138,6 @@ object OneDAL { val context = new DaalContext() val matrix = new DALMatrix(context, classOf[java.lang.Double], 1, data.length, NumericTable.AllocationFlag.DoAllocate) - // oneDAL libs should be loaded by now, loading other native libs - logger.info("Loading native libraries") - LibLoader.loadLibraries() data.zipWithIndex.foreach { case (value: Double, index: Int) => cSetDouble(matrix.getCNumericTable, index, 0, value) @@ -204,10 +201,6 @@ object OneDAL { val matrixLabel = new DALMatrix(context, classOf[lang.Double], 1, points.length, NumericTable.AllocationFlag.DoAllocate) - // oneDAL libs should be loaded by now, loading other native libs - logger.info("Loading native libraries") - LibLoader.loadLibraries() - points.zipWithIndex.foreach { case (point: Double, index: Int) => cSetDouble(matrixLabel.getCNumericTable, index, 0, point) } @@ -256,10 +249,6 @@ object OneDAL { val contextLocal = new DaalContext() - // oneDAL libs should be loaded by now, loading other native libs - logger.info("Loading native libraries") - LibLoader.loadLibraries() - val cTable = OneDAL.cNewCSRNumericTableDouble(values, columnIndices, rowOffsets.toArray, nFeatures, csrRowNum) val table = new CSRNumericTable(contextLocal, cTable) @@ -383,10 +372,6 @@ object OneDAL { val matrix = new DALMatrix(context, classOf[lang.Double], numCols.toLong, numRows.toLong, NumericTable.AllocationFlag.DoAllocate) - // oneDAL libs should be loaded by now, loading other native libs - logger.info("Loading native libraries") - LibLoader.loadLibraries() - var dalRow = 0 it.foreach { curVector => diff --git a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala index ab6f34a49..2908baccb 100644 --- a/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala +++ b/mllib-dal/src/main/scala/org/apache/spark/ml/util/Utils.scala @@ -100,7 +100,6 @@ object Utils { val sc = data.sparkContext val result = data.mapPartitions { p => - LibLoader.loadLibraries() val port = OneCCL.getAvailPort(localIP) if (port != -1) { Iterator(port) @@ -112,6 +111,10 @@ object Utils { result(0) } + // + // This function is first called in driver and executors to load libraries and check compatibility + // All other functions using native libraries will depend on this function to be called first + // def checkClusterPlatformCompatibility(sc: SparkContext): Boolean = { LibLoader.loadLibraries() diff --git a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 1384e4753..6e5a4ee95 100644 --- a/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib-dal/src/spark-3.0.0/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -333,7 +333,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == Normal)) { // oneDAL only support simple linear regression and ridge regression val paramSupported = ($(regParam) == 0) || ($(regParam) != 0 && $(elasticNetParam) == 0) - if (paramSupported && Utils.isOAPEnabled) { + val isPlatformSupported = Utils.checkClusterPlatformCompatibility( + dataset.sparkSession.sparkContext) + if (paramSupported && Utils.isOAPEnabled && isPlatformSupported) { val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() logInfo(s"LinearRegressionDAL fit using $executor_num Executors") diff --git a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 85b8be9ff..62e5e3283 100644 --- a/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib-dal/src/spark-3.0.1/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -327,7 +327,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == Normal)) { // oneDAL only support simple linear regression and ridge regression val paramSupported = ($(regParam) == 0) || ($(regParam) != 0 && $(elasticNetParam) == 0) - if (paramSupported && Utils.isOAPEnabled) { + val isPlatformSupported = Utils.checkClusterPlatformCompatibility( + dataset.sparkSession.sparkContext) + if (paramSupported && Utils.isOAPEnabled && isPlatformSupported) { val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() logInfo(s"LinearRegressionDAL fit using $executor_num Executors") diff --git a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 85b8be9ff..62e5e3283 100644 --- a/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib-dal/src/spark-3.0.2/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -327,7 +327,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String numFeatures <= WeightedLeastSquares.MAX_NUM_FEATURES) || $(solver) == Normal)) { // oneDAL only support simple linear regression and ridge regression val paramSupported = ($(regParam) == 0) || ($(regParam) != 0 && $(elasticNetParam) == 0) - if (paramSupported && Utils.isOAPEnabled) { + val isPlatformSupported = Utils.checkClusterPlatformCompatibility( + dataset.sparkSession.sparkContext) + if (paramSupported && Utils.isOAPEnabled && isPlatformSupported) { val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() logInfo(s"LinearRegressionDAL fit using $executor_num Executors") diff --git a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index 07ce88c23..2e342ea3d 100644 --- a/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib-dal/src/spark-3.1.1/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -441,7 +441,9 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String instr: Instrumentation): LinearRegressionModel = { // oneDAL only support simple linear regression and ridge regression val paramSupported = ($(regParam) == 0) || ($(regParam) != 0 && $(elasticNetParam) == 0) - if (paramSupported && Utils.isOAPEnabled) { + val isPlatformSupported = Utils.checkClusterPlatformCompatibility( + dataset.sparkSession.sparkContext) + if (paramSupported && Utils.isOAPEnabled && isPlatformSupported) { val executor_num = Utils.sparkExecutorNum(dataset.sparkSession.sparkContext) val executor_cores = Utils.sparkExecutorCores() logInfo(s"LinearRegressionDAL fit using $executor_num Executors") diff --git a/mllib-dal/test-cpu-gpu.sh b/mllib-dal/test-cpu-gpu.sh new file mode 100755 index 000000000..eb32e62c8 --- /dev/null +++ b/mllib-dal/test-cpu-gpu.sh @@ -0,0 +1,117 @@ +#!/usr/bin/env bash + +# Check envs for building +if [[ -z $JAVA_HOME ]]; then + echo JAVA_HOME not defined! + exit 1 +fi + +if [[ -z $(which mvn) ]]; then + echo Maven not found! + exit 1 +fi + +if [[ -z $DAALROOT ]]; then + echo DAALROOT not defined! + exit 1 +fi + +if [[ -z $TBBROOT ]]; then + echo TBBROOT not defined! + exit 1 +fi + +if [[ -z $CCL_ROOT ]]; then + echo CCL_ROOT not defined! + exit 1 +fi + +versionArray=( + spark-3.0.0 \ + spark-3.0.1 \ + spark-3.0.2 \ + spark-3.1.1 +) + +suiteArray=( + "clustering.MLlibKMeansSuite" \ + "feature.MLlibPCASuite" \ + "recommendation.MLlibALSSuite" \ + "classification.MLlibNaiveBayesSuite" \ + "regression.MLlibLinearRegressionSuite" +) + +# Set default version +SPARK_VER=spark-3.1.1 +MVN_NO_TRANSFER_PROGRESS= + +print_usage() { + echo + echo Usage: ./test.sh [-p spark-x.x.x] [-q] [-h] [test suite name] + echo + echo Supported Spark versions: + for version in ${versionArray[*]} + do + echo " $version" + done + echo + echo Supported Test suites: + for suite in ${suiteArray[*]} + do + echo " $suite" + done + echo +} + +while getopts "hqp:" opt +do +case $opt in + p) SPARK_VER=$OPTARG ;; + q) MVN_NO_TRANSFER_PROGRESS=--no-transfer-progress ;; + h | *) + print_usage + exit 1 + ;; +esac +done + +shift "$((OPTIND-1))" + +print_usage + +export PLATFORM_PROFILE=CPU_GPU_PROFILE + +echo === Testing Environments === +echo JAVA_HOME=$JAVA_HOME +echo DAALROOT=$DAALROOT +echo TBBROOT=$TBBROOT +echo CCL_ROOT=$CCL_ROOT +echo Maven Version: $(mvn -v | head -n 1 | cut -f3 -d" ") +echo Clang Version: $(clang -dumpversion) +echo Spark Version: $SPARK_VER +echo Platform Profile: $PLATFORM_PROFILE +echo ============================ + +SUITE=$1 + +if [[ ! ${versionArray[*]} =~ $SPARK_VER ]]; then + echo Error: $SPARK_VER version is not supported! + exit 1 +fi + +if [[ ! ${suiteArray[*]} =~ $SUITE ]]; then + echo Error: $SUITE test suite is not supported! + exit 1 +fi + +if [[ -z $SUITE ]]; then + echo + echo Testing ALL suites... + echo + mvn $MVN_NO_TRANSFER_PROGRESS -P$SPARK_VER -Dtest=none clean test +else + echo + echo Testing org.apache.spark.ml.$SUITE ... + echo + mvn $MVN_NO_TRANSFER_PROGRESS -P$SPARK_VER -Dtest=none -DwildcardSuites=org.apache.spark.ml.$SUITE clean test +fi diff --git a/mllib-dal/test.sh b/mllib-dal/test.sh index a39501b67..b9bfd215d 100755 --- a/mllib-dal/test.sh +++ b/mllib-dal/test.sh @@ -79,6 +79,8 @@ shift "$((OPTIND-1))" print_usage +export PLATFORM_PROFILE=CPU_ONLY_PROFILE + echo === Testing Environments === echo JAVA_HOME=$JAVA_HOME echo DAALROOT=$DAALROOT @@ -87,6 +89,7 @@ echo CCL_ROOT=$CCL_ROOT echo Maven Version: $(mvn -v | head -n 1 | cut -f3 -d" ") echo Clang Version: $(clang -dumpversion) echo Spark Version: $SPARK_VER +echo Platform Profile: $PLATFORM_PROFILE echo ============================ SUITE=$1