Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
Merge pull request #13 from Eugene-Mark/klockwork
Browse files Browse the repository at this point in the history
[PMEM-SHUFFLE-10] Fix potential issues reported by klockwork for branch 1.1.
  • Loading branch information
Jian-Zhang authored Mar 29, 2021
2 parents 8bf35eb + d0cc866 commit 20662d6
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 18 deletions.
17 changes: 15 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
[5. Install dependencies for PMem Shuffle](#install-dependencies-for-pmem-shuffle)
[6. Install PMem Shuffle for Spark](#install-pmem-shuffle-for-spark)
[7. PMem Shuffle for Spark Testing](#pmem-shuffle-for-spark-testing)
[8. Trouble Shooting](#trouble-shooting)
[Reference](#reference)


Expand Down Expand Up @@ -134,8 +135,8 @@ installation/enabling or FW installation is out of the scope of this guide.
6) Step 5 is required only when running this solution over RDMA is considered. Otherwise PMem can be initialized in fsdax mode.
a. Run *ndctl create-namespace -m fsdax -r region0 -s 120g*
b. Run *ndctl create-namespace -m fsdax -r region0 -s 120g*
c. Run *ndctl create-namespace -m fsdax -r region0 -s 120g*
d. Run *ndctl create-namespace -m fsdax -r region0 -s 120g*
c. Run *ndctl create-namespace -m fsdax -r region1 -s 120g*
d. Run *ndctl create-namespace -m fsdax -r region1 -s 120g*
Four namespaces /dev/pmem0, /dev/pmem0.1, /dev/pmem1, /dev/pmem1.1 are created. Note that the namespace name might vary due to existing namespaces. In general, the name is consistent with the pattern /dev/pmem*.

After creating the namespace in fsdax mode, the namespace is ready for a file system. Here we use Ext4 file system in enabling.
Expand Down Expand Up @@ -574,6 +575,9 @@ spark.driver.rport 61000
```
**FSDAX**
Use `spark.shuffle.pmof.pmpool_size` to specify the size of created shuffle file. The size should obey the rule: `max(spark.shuffle.pmof.pmpool_size) < size_of_namespace * 0.9`. It's because we need to reserve some space in fsdax namespace for meta data.
**Misc**
The config `spark.sql.shuffle.partitions` is required to set explicitly, it's suggested to use default value `200` unless you're pretty sure what's the meaning of this value.
Expand Down Expand Up @@ -781,6 +785,15 @@ spark.driver.rhost $IP //change to your
spark.driver.rport 61000
```
## <a id="trouble-shooting"></a>8. Trouble shooting
For any reason that a previous job is failed, please empty PMem spaces before another run.
It's because normal space release operation might fail to be invoked for failed jobs.
For devdax, use `pmempool rm {devdax-namespace}` to reset the entire namespace.
For fsdax, use `rm -rf {mounted-pmem-folder}/shuffle_block*` to remove corresponding shuffle pool files.
### Reference guides (without BKC access)
-----------------------------------
If you do not have BKC access, please following below official guide:
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/java/org/apache/spark/jni/pmof/JniUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,20 @@ static void loadLibraryFromJar(String libName) throws IOException, IllegalAccess
private static File moveFileFromJarToTemp(final String tmpDir, String libraryToLoad)
throws IOException {
final File temp = File.createTempFile(tmpDir, libraryToLoad);
try (final InputStream is =
JniUtils.class.getClassLoader().getResourceAsStream(libraryToLoad)) {
final InputStream is = JniUtils.class.getClassLoader().getResourceAsStream(libraryToLoad);
try {
if (is == null) {
throw new FileNotFoundException(libraryToLoad);
} else {
Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING);
}
}catch(Exception e){
e.printStackTrace();
}finally{
if (is != null){
is.close();
}
}
return temp;
return temp;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public void createTable(String root_dir) {
if (fl != null) {
fl.release();
}
}catch(IOException e){
e.printStackTrace();
}
try{
if (fos != null) {
fos.close();
}
Expand Down Expand Up @@ -113,6 +117,10 @@ public void insertRecord(String shuffleId, String device) {
if (fl != null) {
fl.release();
}
}catch(IOException e){
e.printStackTrace();
}
try{
if (fos != null) {
fos.close();
}
Expand Down Expand Up @@ -165,6 +173,10 @@ public String getShuffleDevice(String shuffleId){
if (fl != null) {
fl.release();
}
}catch(IOException e){
e.printStackTrace();
}
try{
if (fos != null) {
fos.close();
}
Expand Down Expand Up @@ -242,6 +254,10 @@ public String getUnusedDevice(ArrayList<String> full_device_list){
if (fl != null) {
fl.release();
}
}catch(IOException e){
e.printStackTrace();
}
try{
if (fos != null) {
fos.close();
}
Expand Down
4 changes: 3 additions & 1 deletion native/src/PmemBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class PmemBuffer {
free(tmp_buf_data);
}
pos = remaining;
memcpy(buf_data + pos, pmem_data_addr, pmem_data_len);
if(buf_data != nullptr){
memcpy(buf_data + pos, pmem_data_addr, pmem_data_len);
}
} else if (remaining == 0) {
if (buf_data_capacity < pmem_data_len) {
free(buf_data);
Expand Down
24 changes: 15 additions & 9 deletions native/src/lib_jni_pmdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,16 @@ JNIEXPORT jlongArray JNICALL Java_org_apache_spark_storage_pmof_PersistentMemory
uint64_t size = 0;
pmkv->get_meta_size(key_str, &size);
struct memory_meta* mm = (struct memory_meta*)std::malloc(sizeof(struct memory_meta));
mm->meta = (uint64_t*)std::malloc(size*2*sizeof(uint64_t));
pmkv->get_meta(key_str, mm);
jlongArray data = env->NewLongArray(mm->length);
env->SetLongArrayRegion(data, 0, mm->length, (jlong*)mm->meta);
std::free(mm->meta);
std::free(mm);
return data;
if(mm != nullptr){
mm->meta = (uint64_t*)std::malloc(size*2*sizeof(uint64_t));
pmkv->get_meta(key_str, mm);
jlongArray data = env->NewLongArray(mm->length);
env->SetLongArrayRegion(data, 0, mm->length, (jlong*)mm->meta);
std::free(mm->meta);
std::free(mm);
return data;
}
return nullptr;
}

JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetBlockSize
Expand All @@ -61,8 +64,9 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_
JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeCloseDevice
(JNIEnv *env, jclass obj, jlong kv) {
pmemkv *pmkv = static_cast<pmemkv*>((void*)kv);
pmkv->free_all();
long result = pmkv->free_all();
delete pmkv;
return result;
}

JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PersistentMemoryPool_nativeGetRoot
Expand Down Expand Up @@ -110,7 +114,8 @@ JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeWrite

JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetPmemBufferRemaining
(JNIEnv *env, jobject obj, jlong pmBuffer) {
((PmemBuffer*)pmBuffer)->getRemaining();
int remaining = ((PmemBuffer*)pmBuffer)->getRemaining();
return remaining;
}

JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetPmemBufferDataAddr
Expand All @@ -121,6 +126,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeGetP
JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeCleanPmemBuffer
(JNIEnv *env, jobject obj, jlong pmBuffer) {
((PmemBuffer*)pmBuffer)->clean();
return 0;
}

JNIEXPORT jint JNICALL Java_org_apache_spark_storage_pmof_PmemBuffer_nativeDeletePmemBuffer
Expand Down
Binary file added native/src/libjnipmdk.so
Binary file not shown.
29 changes: 26 additions & 3 deletions native/src/pmemkv.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ class pmemkv {
return -1;
}
struct block_entry* bep = (struct block_entry*)pmemobj_direct(beo);
if (bep == nullptr){
return -1;
}
bep->data = pmemobj_tx_zalloc(count, DATA_TYPE);
if (bep->data.off == 0) {
(void) pmemobj_tx_end();
Expand All @@ -156,7 +159,10 @@ class pmemkv {
} else {
// add the modified tail entry to the undo data
pmemobj_tx_add_range(bp->tail, 0, sizeof(struct block_entry));
((struct block_entry*)pmemobj_direct(bp->tail))->hdr.next = beo;
block_entry* tail_bep = (struct block_entry*)pmemobj_direct(bp->tail);
if (tail_bep != nullptr){
tail_bep->hdr.next = beo;
}
}

bp->tail = beo; // update tail
Expand Down Expand Up @@ -261,7 +267,9 @@ class pmemkv {
//There are two or more block_entry
bp->head = bep->hdr.next;
block_entry* new_head_pointer = (struct block_entry*)pmemobj_direct(bp->head);
new_head_pointer->hdr.pre = OID_NULL;
if(new_head_pointer != nullptr){
new_head_pointer->hdr.pre = OID_NULL;
}
bp->bytes_written = bp->bytes_written - bep->hdr.size;
pmemobj_free(&bep->data);
pmemobj_free(&cur->beo);
Expand All @@ -277,7 +285,9 @@ class pmemkv {
if (pmemobj_direct(bep->hdr.next) == nullptr){
//The one node scenario is already covered in head judgement, there are two or more nodes here
struct block_entry* prebep = (struct block_entry*)pmemobj_direct(bep->hdr.pre);
prebep->hdr.next = OID_NULL;
if (prebep != nullptr){
prebep->hdr.next = OID_NULL;
}
bp->tail = bep->hdr.pre;
if((struct block_entry*)pmemobj_direct(bp->tail) == nullptr){
std::cout<<"Error. The bp->tail should not be nullptr"<<std::endl;
Expand All @@ -296,8 +306,14 @@ class pmemkv {

//Node to be deleted is at the middle, no head or tail, there are at least three nodes
struct block_entry* prebep = (struct block_entry*)pmemobj_direct(bep->hdr.pre);
if(prebep == nullptr){
return -1;
}
prebep->hdr.next = bep->hdr.next;
struct block_entry* nextbep = (struct block_entry*)pmemobj_direct(bep->hdr.next);
if(nextbep == nullptr){
return -1;
}
nextbep->hdr.pre = bep->hdr.pre;
bp->bytes_written = bp->bytes_written - bep->hdr.size;
pmemobj_free(&bep->data);
Expand Down Expand Up @@ -459,6 +475,9 @@ class pmemkv {
}
bo = pmemobj_root(pmem_pool, sizeof(struct base));
bp = (struct base*)pmemobj_direct(bo);
if(bp == nullptr){
return -1;
}
bp->head = OID_NULL;
bp->tail = OID_NULL;
bp->bytes_written = 0;
Expand All @@ -478,6 +497,9 @@ class pmemkv {
// walk through all the block entry in pmem, don't need lock here
bo = pmemobj_root(pmem_pool, sizeof(struct base));
bp = (struct base*)pmemobj_direct(bo);
if(bp == nullptr){
return -1;
}
struct block_entry *next = (struct block_entry*)pmemobj_direct(bp->head);
PMEMoid next_beo = bp->head;
while (next != nullptr) {
Expand Down Expand Up @@ -534,6 +556,7 @@ class pmemkv {
bm->beo = beo;
struct block_meta_list* bml = (struct block_meta_list*)std::malloc(sizeof(block_meta_list));
if (!bml) {
free(bm);
perror("malloc error in pmemkv update_meta");
return -1;
}
Expand Down

0 comments on commit 20662d6

Please sign in to comment.