Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[PMEM-SHUFFLE-10] Fix potential issues reported by klockwork for branch 1.1. #13

Merged
merged 3 commits into from
Mar 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
[5. Install dependencies for PMem Shuffle](#install-dependencies-for-pmem-shuffle)
[6. Install PMem Shuffle for Spark](#install-pmem-shuffle-for-spark)
[7. PMem Shuffle for Spark Testing](#pmem-shuffle-for-spark-testing)
[8. Troubleshooting](#trouble-shooting)
[Reference](#reference)


Expand Down Expand Up @@ -134,8 +135,8 @@ installation/enabling or FW installation is out of the scope of this guide.
6) Step 5 is required only when this solution is run over RDMA. Otherwise, PMem can be initialized in fsdax mode.
a. Run *ndctl create-namespace -m fsdax -r region0 -s 120g*
b. Run *ndctl create-namespace -m fsdax -r region0 -s 120g*
c. Run *ndctl create-namespace -m fsdax -r region0 -s 120g*
d. Run *ndctl create-namespace -m fsdax -r region0 -s 120g*
c. Run *ndctl create-namespace -m fsdax -r region1 -s 120g*
d. Run *ndctl create-namespace -m fsdax -r region1 -s 120g*
Four namespaces /dev/pmem0, /dev/pmem0.1, /dev/pmem1, /dev/pmem1.1 are created. Note that the namespace name might vary due to existing namespaces. In general, the name is consistent with the pattern /dev/pmem*.

After a namespace is created in fsdax mode, it is ready for a file system. In this guide we use the Ext4 file system.
Expand Down Expand Up @@ -574,6 +575,9 @@ spark.driver.rport 61000

```

**FSDAX**
Use `spark.shuffle.pmof.pmpool_size` to specify the size of the created shuffle file. The size must obey the rule `max(spark.shuffle.pmof.pmpool_size) < size_of_namespace * 0.9`, because some space in the fsdax namespace must be reserved for metadata.

**Misc**
The config `spark.sql.shuffle.partitions` must be set explicitly. It is suggested to use the default value `200` unless you are sure what this value means.

Expand Down Expand Up @@ -781,6 +785,15 @@ spark.driver.rhost $IP //change to your
spark.driver.rport 61000

```

## <a id="trouble-shooting"></a>8. Troubleshooting
If a previous job failed for any reason, please empty the PMem space before another run,
because the normal space-release operation might not be invoked for failed jobs.

For devdax, use `pmempool rm {devdax-namespace}` to reset the entire namespace.
For fsdax, use `rm -rf {mounted-pmem-folder}/shuffle_block*` to remove corresponding shuffle pool files.


### Reference guides (without BKC access)
-----------------------------------
If you do not have BKC access, please follow the official guide below:
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/java/org/apache/spark/jni/pmof/JniUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,20 @@ static void loadLibraryFromJar(String libName) throws IOException, IllegalAccess
private static File moveFileFromJarToTemp(final String tmpDir, String libraryToLoad)
throws IOException {
final File temp = File.createTempFile(tmpDir, libraryToLoad);
try (final InputStream is =
JniUtils.class.getClassLoader().getResourceAsStream(libraryToLoad)) {
final InputStream is = JniUtils.class.getClassLoader().getResourceAsStream(libraryToLoad);
try {
if (is == null) {
throw new FileNotFoundException(libraryToLoad);
} else {
Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if (is != null){
is.close();
}
}
return temp;
return temp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public void createTable(String root_dir) {
if (fl != null) {
fl.release();
}
}catch(IOException e){
e.printStackTrace();
}
try{
if (fos != null) {
fos.close();
}
Expand Down Expand Up @@ -113,6 +117,10 @@ public void insertRecord(String shuffleId, String device) {
if (fl != null) {
fl.release();
}
}catch(IOException e){
e.printStackTrace();
}
try{
if (fos != null) {
fos.close();
}
Expand Down Expand Up @@ -165,6 +173,10 @@ public String getShuffleDevice(String shuffleId){
if (fl != null) {
fl.release();
}
}catch(IOException e){
e.printStackTrace();
}
try{
if (fos != null) {
fos.close();
}
Expand Down Expand Up @@ -242,6 +254,10 @@ public String getUnusedDevice(ArrayList<String> full_device_list){
if (fl != null) {
fl.release();
}
}catch(IOException e){
e.printStackTrace();
}
try{
if (fos != null) {
fos.close();
}
Expand Down
4 changes: 3 additions & 1 deletion native/src/PmemBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class PmemBuffer {
free(tmp_buf_data);
}
pos = remaining;
memcpy(buf_data + pos, pmem_data_addr, pmem_data_len);
if(buf_data != nullptr){
memcpy(buf_data + pos, pmem_data_addr, pmem_data_len);
}
} else if (remaining == 0) {
if (buf_data_capacity < pmem_data_len) {
free(buf_data);
Expand Down
24 changes: 15 additions & 9 deletions native/src/lib_jni_pmdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_spark_storage_pmof_PersistentMemory
uint64_t size = 0;
pmkv->get_meta_size(key_str, &size);
struct memory_meta* mm = (struct memory_meta*)std::malloc(sizeof(struct memory_meta));
mm->meta = (uint64_t*)std::malloc(size*2*sizeof(uint64_t));
pmkv->get_meta(key_str, mm);
jlongArray data = env->NewLongArray(mm->length);
env->SetLongArrayRegion(data, 0, mm->length, (jlong*)mm->meta);
std::free(mm->meta);
std::free(mm);
return data;
if(mm != nullptr){
mm->meta = (uint64_t*)std::malloc(size*2*sizeof(uint64_t));
pmkv->get_meta(key_str, mm);
jlongArray data = env->NewLongArray(mm->length);
env->SetLongArrayRegion(data, 0, mm->length, (jlong*)mm->meta);
std::free(mm->meta);
std::free(mm);
return data;
}
return nullptr;
}

JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetBlockSize
Expand All @@ -61,8 +64,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_
/**
 * JNI entry point: releases everything held by a pmemkv instance and destroys it.
 *
 * @param env JNI environment (unused).
 * @param obj Calling Java class (unused).
 * @param kv  Native pmemkv pointer that Java holds as a jlong handle.
 * @return    The value returned by pmemkv::free_all(), or -1 when the handle is null.
 */
JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeCloseDevice
  (JNIEnv *env, jclass obj, jlong kv) {
  pmemkv *pmkv = static_cast<pmemkv*>(reinterpret_cast<void*>(kv));
  if (pmkv == nullptr) {
    // Nothing to release; report failure instead of dereferencing a null handle.
    return -1;
  }
  // free_all() must run exactly once, and before the object is destroyed.
  const jint result = static_cast<jint>(pmkv->free_all());
  delete pmkv;
  return result;
}

JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetRoot
Expand Down Expand Up @@ -110,7 +114,8 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeWrite

/**
 * JNI entry point: returns the value of PmemBuffer::getRemaining() for a native buffer.
 *
 * @param env      JNI environment (unused).
 * @param obj      Calling Java object (unused).
 * @param pmBuffer Native PmemBuffer pointer that Java holds as a jlong handle.
 * @return         The result of getRemaining() on that buffer.
 */
JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetPmemBufferRemaining
  (JNIEnv *env, jobject obj, jlong pmBuffer) {
  // Query once and hand the value back; the result must not be discarded.
  return static_cast<jint>(reinterpret_cast<PmemBuffer*>(pmBuffer)->getRemaining());
}

JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetPmemBufferDataAddr
Expand All @@ -121,6 +126,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetP
/**
 * JNI entry point: calls clean() on the native PmemBuffer behind the given handle.
 *
 * @param env      JNI environment (unused).
 * @param obj      Calling Java object (unused).
 * @param pmBuffer Native PmemBuffer pointer that Java holds as a jlong handle.
 * @return         Always 0.
 */
JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeCleanPmemBuffer
  (JNIEnv *env, jobject obj, jlong pmBuffer) {
  PmemBuffer *buffer = reinterpret_cast<PmemBuffer*>(pmBuffer);
  buffer->clean();
  return 0;
}

JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeDeletePmemBuffer
Expand Down
Binary file added native/src/libjnipmdk.so
Binary file not shown.
29 changes: 26 additions & 3 deletions native/src/pmemkv.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class pmemkv {
return -1;
}
struct block_entry* bep = (struct block_entry*)pmemobj_direct(beo);
if (bep == nullptr){
return -1;
}
bep->data = pmemobj_tx_zalloc(count, DATA_TYPE);
if (bep->data.off == 0) {
(void) pmemobj_tx_end();
Expand All @@ -156,7 +159,10 @@ class pmemkv {
} else {
// add the modified tail entry to the undo data
pmemobj_tx_add_range(bp->tail, 0, sizeof(struct block_entry));
((struct block_entry*)pmemobj_direct(bp->tail))->hdr.next = beo;
block_entry* tail_bep = (struct block_entry*)pmemobj_direct(bp->tail);
if (tail_bep != nullptr){
tail_bep->hdr.next = beo;
}
}

bp->tail = beo; // update tail
Expand Down Expand Up @@ -261,7 +267,9 @@ class pmemkv {
//There are two or more block_entry
bp->head = bep->hdr.next;
block_entry* new_head_pointer = (struct block_entry*)pmemobj_direct(bp->head);
new_head_pointer->hdr.pre = OID_NULL;
if(new_head_pointer != nullptr){
new_head_pointer->hdr.pre = OID_NULL;
}
bp->bytes_written = bp->bytes_written - bep->hdr.size;
pmemobj_free(&bep->data);
pmemobj_free(&cur->beo);
Expand All @@ -277,7 +285,9 @@ class pmemkv {
if (pmemobj_direct(bep->hdr.next) == nullptr){
//The one node scenario is already covered in head judgement, there are two or more nodes here
struct block_entry* prebep = (struct block_entry*)pmemobj_direct(bep->hdr.pre);
prebep->hdr.next = OID_NULL;
if (prebep != nullptr){
prebep->hdr.next = OID_NULL;
}
bp->tail = bep->hdr.pre;
if((struct block_entry*)pmemobj_direct(bp->tail) == nullptr){
std::cout<<"Error. The bp->tail should not be nullptr"<<std::endl;
Expand All @@ -296,8 +306,14 @@ class pmemkv {

//Node to be deleted is at the middle, no head or tail, there are at least three nodes
struct block_entry* prebep = (struct block_entry*)pmemobj_direct(bep->hdr.pre);
if(prebep == nullptr){
return -1;
}
prebep->hdr.next = bep->hdr.next;
struct block_entry* nextbep = (struct block_entry*)pmemobj_direct(bep->hdr.next);
if(nextbep == nullptr){
return -1;
}
nextbep->hdr.pre = bep->hdr.pre;
bp->bytes_written = bp->bytes_written - bep->hdr.size;
pmemobj_free(&bep->data);
Expand Down Expand Up @@ -459,6 +475,9 @@ class pmemkv {
}
bo = pmemobj_root(pmem_pool, sizeof(struct base));
bp = (struct base*)pmemobj_direct(bo);
if(bp == nullptr){
return -1;
}
bp->head = OID_NULL;
bp->tail = OID_NULL;
bp->bytes_written = 0;
Expand All @@ -478,6 +497,9 @@ class pmemkv {
// walk through all the block entry in pmem, don't need lock here
bo = pmemobj_root(pmem_pool, sizeof(struct base));
bp = (struct base*)pmemobj_direct(bo);
if(bp == nullptr){
return -1;
}
struct block_entry *next = (struct block_entry*)pmemobj_direct(bp->head);
PMEMoid next_beo = bp->head;
while (next != nullptr) {
Expand Down Expand Up @@ -534,6 +556,7 @@ class pmemkv {
bm->beo = beo;
struct block_meta_list* bml = (struct block_meta_list*)std::malloc(sizeof(block_meta_list));
if (!bml) {
free(bm);
perror("malloc error in pmemkv update_meta");
return -1;
}
Expand Down