Skip to content

Spark plug-in package for accelerating Spark runtime spill functions using PMem such as RDD cache PMem extension.

License

Notifications You must be signed in to change notification settings

yeyuqiang/pmem-spill

 
 

Repository files navigation

* LEGAL NOTICE: Your use of this software and any required dependent software (the "Software Package") is subject to the terms and conditions of the software license agreements for the Software Package, which may also include notices, disclaimers, or license terms for third party or open source software included in or with the Software Package, and your use indicates your acceptance of all such terms. Please refer to the "TPP.txt" or other similarly-named text file included with the Software Package for additional details.
* Optimized Analytics Package for Spark* Platform is under Apache 2.0 (https://www.apache.org/licenses/LICENSE-2.0).

PMem Spill

Online Documentation

You can find the all the PMem Spill documents on the project web page.

Contents

Introduction

Spark supports to cache RDDs in memory and disk. We know that memory is small in size and costly. Disks are much slower, although with large capacity. PMem Spill supports RDD Cache with Intel Optane PMem, adds PMem storage level to the existing RDD cache solutions besides memory and disk.

RDD Persistence

One of the most important capabilities in Spark is persisting (or caching) a dataset across operations, each persisted RDD can be stored using a different storage level, allowing you, for example, to persist the dataset on disk, persist it in memory but as serialized Java objects (to save space), replicate it across nodes. These levels are set by passing a StorageLevel object, to persist(). The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY (store deserialized objects in memory). The full set of storage levels is:

Storage Level Meaning
MEMORY_ONLY Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed on the fly each time they're needed. This is the default level.
MEMORY_AND_DISK Store RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, store the partitions that don't fit on disk, and read them from there when they're needed.
MEMORY_ONLY_SER (Java and Scala) Store RDD as serialized Java objects (one byte array per partition). This is generally more space-efficient than deserialized objects, especially when using a fast serializer, but more CPU-intensive to read.
MEMORY_AND_DISK_SER (Java and Scala) Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to disk instead of recomputing them on the fly each time they're needed.
DISK_ONLY Store the RDD partitions only on disk.
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as the levels above, but replicate each partition on two cluster nodes.
OFF_HEAP (experimental) Similar to MEMORY_ONLY_SER, but store the data in off-heap memory. This requires off-heap memory to be enabled.
PMEM_ONLY Similar to MEMORY_ONLY_SER, but spill partitions that don't fit in memory to Intel Optane PMem instead of recomputing them on the fly each time they're needed.
PMEM_AND_DISK Similar to MEMORY_AND_DISK_SER, but spill partitions that don't fit in memory to Intel Optane PMem and disk instead of recomputing them on the fly each time they're needed.

Which Storage Level to Choose?

Spark's storage levels are meant to provide different trade-offs between memory usage and CPU efficiency.

PMem storage level is added to support a new tier for storage level besides memory and disk. Using PMem library to access Optane PMem can help to avoid the overhead from disk. Large capacity and high I/O performance of PMem shows better performance than tied DRAM and disk solution under the same cost.

User Guide

Installation

We have provided a Conda package which will automatically install dependencies needed by OAP, you can refer to OAP-Installation-Guide for more information. If you have finished OAP-Installation-Guide, you needn't compile and install Memkind, and you can find compiled OAP jars in $HOME/miniconda2/envs/oapenv/oap_jars.

Prerequisites

The following are required to configure OAP to use PMem cache in AppDirect mode.

  • PMem hardware is successfully deployed on each node in cluster.

  • Directories exposing PMem hardware on each socket. For example, on a two socket system the mounted PMem directories should appear as /mnt/pmem0 and /mnt/pmem1. Correctly installed PMem must be formatted and mounted on every cluster worker node.

    // use ipmctl command to show topology and dimm info of PMem
    ipmctl show -topology
    ipmctl show -dimm
    // provision PMem in app direct mode
    ipmctl create -goal PersistentMemoryType=AppDirect
    // reboot system to make configuration take affect
    reboot
    // check capacity provisioned for app direct mode(AppDirectCapacity)
    ipmctl show -memoryresources
    // show the PMem region information
    ipmctl show -region
    // create namespace based on the region, multi namespaces can be created on a single region
    ndctl create-namespace -m fsdax -r region0
    ndctl create-namespace -m fsdax -r region1
    // show the created namespaces
    fdisk -l
    // create and mount file system
    echo y | mkfs.ext4 /dev/pmem0
    echo y | mkfs.ext4 /dev/pmem1
    mkdir -p /mnt/pmem0
    mkdir -p /mnt/pmem1 
    mount -o dax /dev/pmem0 /mnt/pmem0
    mount -o dax /dev/pmem1 /mnt/pmem1
    

    In this case file systems are generated for 2 numa nodes, which can be checked by "numactl --hardware". For a different number of numa nodes, a corresponding number of namespaces should be created to assure correct file system paths mapping to numa nodes.

  • Make sure Memkind library installed on every cluster worker node.
    The Memkind library depends on libnuma at the runtime, so it must already exist in the worker node system. Build the latest memkind lib from source:

    git clone -b v1.10.1 https://github.com/memkind/memkind
    cd memkind
    ./autogen.sh
    ./configure
    make
    make install
    
  • For KMem Dax mode, we need to configure PMem as system ram. Kernel 5.1 or above is required to this mode.

    daxctl migrate-device-model
    ndctl create-namespace --mode=devdax --map=mem
    ndctl list
    daxctl reconfigure-device dax0.0 --mode=system-ram
    daxctl reconfigure-device dax1.0 --mode=system-ram
    daxctl reconfigure-device daxX.Y --mode=system-ram
    

Refer Memkind KMem for details.

Compiling

Before building PMem Spill, install PMem-Common locally:

git clone -b <tag-version> https://github.com/oap-project/pmem-common.git
cd pmem-common
mvn clean install -DskipTests

To build pmem spill, you can run below commands:

cd ${PMEM-SPILL}
mvn clean package -DskipTests

You will find jar files under oap-common/target and oap-spark/target.

Configuration

To enable rdd cache on Intel Optane PMem, you need add the following configurations to spark-defaults.conf

spark.memory.pmem.extension.enabled true
spark.memory.pmem.initial.path [Your Optane PMem paths seperate with comma]
spark.memory.pmem.initial.size [Your Optane PMem size in GB]
spark.memory.pmem.usable.ratio [from 0 to 1, 0.85 is recommended]
spark.yarn.numa.enabled true
spark.yarn.numa.num [Your numa node number]
spark.memory.pmem.mode [AppDirect | KMemDax]

spark.files                       file://${PATH_TO_PMEM_SPILL_JAR}/pmem-rdd-cache-<version>-with-spark-<version>.jar,file://${PATH_TO_PMEM_COMMON_JAR}/pmem-common-<version>-with-spark-<version>.jar
spark.executor.extraClassPath     ./pmem-rdd-cache-<version>-with-spark-<version>.jar:./pmem-common-<version>-with-spark-<version>.jar
spark.driver.extraClassPath       file://${PATH_TO_PMEM_SPILL_JAR}/pmem-rdd-cache-<version>-with-spark-<version>.jar:file://${PATH_TO_PMEM_COMMON_JAR}/pmem-common-<version>-with-spark-<version>.jar

Use Optane PMem to cache data

There's a new StorageLevel: PMEM_AND_DISK being added to cache data to Optane PMem, at the places you previously cache/persist data to memory, use PMEM_AND_DISK to substitute the previous StorageLevel, data will be cached to Optane PMem.

persist(StorageLevel.PMEM_AND_DISK)

Run K-means benchmark

You can use Hibench to run K-means workload:

After you Build Hibench, then follow Run SparkBench documentation. Here are some tips besides this documentation you need to notice. Follow the documentation to configure these 4 files:

HiBench/conf/hadoop.conf
HiBench/conf/hibench.conf
HiBench/conf/spark.conf
HiBench/conf/workloads/ml/kmeans.conf

Note that you need add hibench.kmeans.storage.level PMEM_AND_DISK to kmeans.conf, which can enable both PMem and Disk to cache data. If you completed OAP-Installation-Guide, you also need add the following configs to spark.conf

spark.executorEnv.LD_LIBRARY_PATH   $HOME/miniconda2/envs/oapenv/lib
spark.executor.extraLibraryPath     $HOME/miniconda2/envs/oapenv/lib
spark.driver.extraLibraryPath       $HOME/miniconda2/envs/oapenv/lib

Then you can run the following 2 commands to run K-means workloads:

bin/workloads/ml/kmeans/prepare/prepare.sh
bin/workloads/ml/kmeans/spark/run.sh

Then you can find the log as below:

patching args=
Parsing conf: /home/wh/HiBench/conf/hadoop.conf
Parsing conf: /home/wh/HiBench/conf/hibench.conf
Parsing conf: /home/wh/HiBench/conf/spark.conf
Parsing conf: /home/wh/HiBench/conf/workloads/ml/kmeans.conf
probe sleep jar: /opt/Beaver/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-2.7.3-tests.jar
start ScalaSparkKmeans bench
hdfs rm -r: /opt/Beaver/hadoop/bin/hadoop --config /opt/Beaver/hadoop/etc/hadoop fs -rm -r -skipTrash hdfs://vsr219:9000/HiBench/Kmeans/Output
rm: `hdfs://vsr219:9000/HiBench/Kmeans/Output': No such file or directory
hdfs du -s: /opt/Beaver/hadoop/bin/hadoop --config /opt/Beaver/hadoop/etc/hadoop fs -du -s hdfs://vsr219:9000/HiBench/Kmeans/Input
Export env: SPARKBENCH_PROPERTIES_FILES=/home/wh/HiBench/report/kmeans/spark/conf/sparkbench/sparkbench.conf
Export env: HADOOP_CONF_DIR=/opt/Beaver/hadoop/etc/hadoop
Submit Spark job: /opt/Beaver/spark/bin/spark-submit  --properties-file /home/wh/HiBench/report/kmeans/spark/conf/sparkbench/spark.conf --class com.intel.hibench.sparkbench.ml.DenseKMeans --master yarn-client --num-executors 2 --executor-cores 45 --executor-memory 100g /home/wh/HiBench/sparkbench/assembly/target/sparkbench-assembly-8.0-SNAPSHOT-dist.jar -k 10 --numIterations 5 --storageLevel PMEM_AND_DISK hdfs://vsr219:9000/HiBench/Kmeans/Input/samples
20/07/03 09:07:49 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-43459116-f4e3-4fe1-bb29-9bca0afa5286
finish ScalaSparkKmeans bench

Open the Spark History Web UI and go to the Storage tab page to verify the cache metrics.

Limitations

For the scenario that data will exceed the block cache capacity. Memkind 1.9.0 and kernel 4.18 is recommended to avoid the unexpected issue.

How to contribute

Currently, PMem Spill packages includes all Spark changed files.

  • MemoryMode.java
  • MemoryManager.scala
  • StorageMemoryPool.scala
  • UnifiedMemoryManager.scala
  • MemoryStore.scala
  • BlockManager.scala
  • StorageLevel.scala
  • TestMemoryManager

Please make sure your code change in above source code will not break current function.

The files from this package should avoid depending on other OAP module except PMem-Common.

About

Spark plug-in package for accelerating Spark runtime spill functions using PMem such as RDD cache PMem extension.

Resources

License

Security policy

Stars

Watchers

Forks

Packages

No packages published

Languages

  • Scala 96.9%
  • JavaScript 2.5%
  • Java 0.6%