diff --git a/README.md b/README.md index 6cacae5c..340cfbc2 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,7 @@ [5. Install dependencies for PMem Shuffle](#install-dependencies-for-pmem-shuffle) [6. Install PMem Shuffle for Spark](#install-pmem-shuffle-for-spark) [7. PMem Shuffle for Spark Testing](#pmem-shuffle-for-spark-testing) +[8. Trouble Shooting](#trouble-shooting) [Reference](#reference) @@ -134,8 +135,8 @@ installation/enabling or FW installation is out of the scope of this guide. 6) Step 5 is required only when running this solution over RDMA is considered. Otherwise PMem can be initialized in fsdax mode. a. Run *ndctl create-namespace -m fsdax -r region0 -s 120g* b. Run *ndctl create-namespace -m fsdax -r region0 -s 120g* - c. Run *ndctl create-namespace -m fsdax -r region0 -s 120g* - d. Run *ndctl create-namespace -m fsdax -r region0 -s 120g* + c. Run *ndctl create-namespace -m fsdax -r region1 -s 120g* + d. Run *ndctl create-namespace -m fsdax -r region1 -s 120g* Four namespaces /dev/pmem0, /dev/pmem0.1, /dev/pmem1, /dev/pmem1.1 are created. Note that the namespace name might vary due to existing namespaces. In general, the name is consistent with the pattern /dev/pmem*. After creating the namespace in fsdax mode, the namespace is ready for a file system. Here we use Ext4 file system in enabling. @@ -574,6 +575,9 @@ spark.driver.rport 61000 ``` +**FSDAX** +Use `spark.shuffle.pmof.pmpool_size` to specify the size of created shuffle file. The size should obey the rule: `max(spark.shuffle.pmof.pmpool_size) < size_of_namespace * 0.9`. It's because we need to reserve some space in fsdax namespace for meta data. + **Misc** The config `spark.sql.shuffle.partitions` is required to set explicitly, it's suggested to use default value `200` unless you're pretty sure what's the meaning of this value. @@ -781,6 +785,15 @@ spark.driver.rhost $IP //change to your spark.driver.rport 61000 ``` + +## 8. Trouble shooting +For any reason that a previous job is failed, please empty PMem spaces before another run. +It's because normal space release operation might fail to be invoked for failed jobs. + +For devdax, use `pmempool rm {devdax-namespace}` to reset the entire namespace. +For fsdax, use `rm -rf {mounted-pmem-folder}/shuffle_block*` to remove corresponding shuffle pool files. + + ### Reference guides (without BKC access) ----------------------------------- If you do not have BKC access, please following below official guide: diff --git a/core/src/main/java/org/apache/spark/jni/pmof/JniUtils.java b/core/src/main/java/org/apache/spark/jni/pmof/JniUtils.java index 4a9267e1..59534b2f 100644 --- a/core/src/main/java/org/apache/spark/jni/pmof/JniUtils.java +++ b/core/src/main/java/org/apache/spark/jni/pmof/JniUtils.java @@ -54,14 +54,20 @@ static void loadLibraryFromJar(String libName) throws IOException, IllegalAccess private static File moveFileFromJarToTemp(final String tmpDir, String libraryToLoad) throws IOException { final File temp = File.createTempFile(tmpDir, libraryToLoad); - try (final InputStream is = - JniUtils.class.getClassLoader().getResourceAsStream(libraryToLoad)) { + final InputStream is = JniUtils.class.getClassLoader().getResourceAsStream(libraryToLoad); + try { if (is == null) { throw new FileNotFoundException(libraryToLoad); } else { Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); } + }catch(Exception e){ + e.printStackTrace(); + }finally{ + if (is != null){ + is.close(); + } } - return temp; + return temp; } } diff --git a/core/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryMetaHandler.java b/core/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryMetaHandler.java index fdc2a839..c47a7c5b 100644 --- a/core/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryMetaHandler.java +++ b/core/src/main/java/org/apache/spark/storage/pmof/PersistentMemoryMetaHandler.java @@ -68,6 +68,10 @@ public void createTable(String root_dir) { if (fl != null) { fl.release(); } + }catch(IOException e){ + e.printStackTrace(); + } + try{ if (fos != null) { fos.close(); } @@ -113,6 +117,10 @@ public void insertRecord(String shuffleId, String device) { if (fl != null) { fl.release(); } + }catch(IOException e){ + e.printStackTrace(); + } + try{ if (fos != null) { fos.close(); } @@ -165,6 +173,10 @@ public String getShuffleDevice(String shuffleId){ if (fl != null) { fl.release(); } + }catch(IOException e){ + e.printStackTrace(); + } + try{ if (fos != null) { fos.close(); } @@ -242,6 +254,10 @@ public String getUnusedDevice(ArrayList full_device_list){ if (fl != null) { fl.release(); } + }catch(IOException e){ + e.printStackTrace(); + } + try{ if (fos != null) { fos.close(); } diff --git a/native/src/PmemBuffer.h b/native/src/PmemBuffer.h index ebb8d68a..1a30cd3c 100644 --- a/native/src/PmemBuffer.h +++ b/native/src/PmemBuffer.h @@ -54,7 +54,9 @@ class PmemBuffer { free(tmp_buf_data); } pos = remaining; - memcpy(buf_data + pos, pmem_data_addr, pmem_data_len); + if(buf_data != nullptr){ + memcpy(buf_data + pos, pmem_data_addr, pmem_data_len); + } } else if (remaining == 0) { if (buf_data_capacity < pmem_data_len) { free(buf_data); diff --git a/native/src/lib_jni_pmdk.cpp b/native/src/lib_jni_pmdk.cpp index 9d02a33b..baf1627f 100644 --- a/native/src/lib_jni_pmdk.cpp +++ b/native/src/lib_jni_pmdk.cpp @@ -30,13 +30,16 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_spark_storage_pmof_PersistentMemory uint64_t size = 0; pmkv->get_meta_size(key_str, &size); struct memory_meta* mm = (struct memory_meta*)std::malloc(sizeof(struct memory_meta)); - mm->meta = (uint64_t*)std::malloc(size*2*sizeof(uint64_t)); - pmkv->get_meta(key_str, mm); - jlongArray data = env->NewLongArray(mm->length); - env->SetLongArrayRegion(data, 0, mm->length, (jlong*)mm->meta); - std::free(mm->meta); - std::free(mm); - return data; + if(mm != nullptr){ + mm->meta = (uint64_t*)std::malloc(size*2*sizeof(uint64_t)); + pmkv->get_meta(key_str, mm); + jlongArray data = env->NewLongArray(mm->length); + env->SetLongArrayRegion(data, 0, mm->length, (jlong*)mm->meta); + std::free(mm->meta); + std::free(mm); + return data; + } + return nullptr; } JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetBlockSize @@ -61,8 +64,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_ JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeCloseDevice (JNIEnv *env, jclass obj, jlong kv) { pmemkv *pmkv = static_cast((void*)kv); - pmkv->free_all(); + long result = pmkv->free_all(); delete pmkv; + return result; } JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetRoot @@ -110,7 +114,8 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeWrite JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetPmemBufferRemaining (JNIEnv *env, jobject obj, jlong pmBuffer) { - ((PmemBuffer*)pmBuffer)->getRemaining(); + int remaining = ((PmemBuffer*)pmBuffer)->getRemaining(); + return remaining; } JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetPmemBufferDataAddr @@ -121,6 +126,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetP JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeCleanPmemBuffer (JNIEnv *env, jobject obj, jlong pmBuffer) { ((PmemBuffer*)pmBuffer)->clean(); + return 0; } JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeDeletePmemBuffer diff --git a/native/src/libjnipmdk.so b/native/src/libjnipmdk.so new file mode 100755 index 00000000..2621a524 Binary files /dev/null and b/native/src/libjnipmdk.so differ diff --git a/native/src/pmemkv.h b/native/src/pmemkv.h index d7a4f606..e659e478 100644 --- a/native/src/pmemkv.h +++ b/native/src/pmemkv.h @@ -135,6 +135,9 @@ class pmemkv { return -1; } struct block_entry* bep = (struct block_entry*)pmemobj_direct(beo); + if (bep == nullptr){ + return -1; + } bep->data = pmemobj_tx_zalloc(count, DATA_TYPE); if (bep->data.off == 0) { (void) pmemobj_tx_end(); @@ -156,7 +159,10 @@ class pmemkv { } else { // add the modified tail entry to the undo data pmemobj_tx_add_range(bp->tail, 0, sizeof(struct block_entry)); - ((struct block_entry*)pmemobj_direct(bp->tail))->hdr.next = beo; + block_entry* tail_bep = (struct block_entry*)pmemobj_direct(bp->tail); + if (tail_bep != nullptr){ + tail_bep->hdr.next = beo; + } } bp->tail = beo; // update tail @@ -261,7 +267,9 @@ class pmemkv { //There are two or more block_entry bp->head = bep->hdr.next; block_entry* new_head_pointer = (struct block_entry*)pmemobj_direct(bp->head); - new_head_pointer->hdr.pre = OID_NULL; + if(new_head_pointer != nullptr){ + new_head_pointer->hdr.pre = OID_NULL; + } bp->bytes_written = bp->bytes_written - bep->hdr.size; pmemobj_free(&bep->data); pmemobj_free(&cur->beo); @@ -277,7 +285,9 @@ class pmemkv { if (pmemobj_direct(bep->hdr.next) == nullptr){ //The one node scenario is already covered in head judgement, there are two or more nodes here struct block_entry* prebep = (struct block_entry*)pmemobj_direct(bep->hdr.pre); - prebep->hdr.next = OID_NULL; + if (prebep != nullptr){ + prebep->hdr.next = OID_NULL; + } bp->tail = bep->hdr.pre; if((struct block_entry*)pmemobj_direct(bp->tail) == nullptr){ std::cout<<"Error. The bp->tail should not be nullptr"<hdr.pre); + if(prebep == nullptr){ + return -1; + } prebep->hdr.next = bep->hdr.next; struct block_entry* nextbep = (struct block_entry*)pmemobj_direct(bep->hdr.next); + if(nextbep == nullptr){ + return -1; + } nextbep->hdr.pre = bep->hdr.pre; bp->bytes_written = bp->bytes_written - bep->hdr.size; pmemobj_free(&bep->data); @@ -459,6 +475,9 @@ class pmemkv { } bo = pmemobj_root(pmem_pool, sizeof(struct base)); bp = (struct base*)pmemobj_direct(bo); + if(bp == nullptr){ + return -1; + } bp->head = OID_NULL; bp->tail = OID_NULL; bp->bytes_written = 0; @@ -478,6 +497,9 @@ class pmemkv { // walk through all the block entry in pmem, don't need lock here bo = pmemobj_root(pmem_pool, sizeof(struct base)); bp = (struct base*)pmemobj_direct(bo); + if(bp == nullptr){ + return -1; + } struct block_entry *next = (struct block_entry*)pmemobj_direct(bp->head); PMEMoid next_beo = bp->head; while (next != nullptr) { @@ -534,6 +556,7 @@ class pmemkv { bm->beo = beo; struct block_meta_list* bml = (struct block_meta_list*)std::malloc(sizeof(block_meta_list)); if (!bml) { + free(bm); perror("malloc error in pmemkv update_meta"); return -1; }