Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-481] JVM heap memory leak on memory leak tracker facilities (#482)…
Browse files Browse the repository at this point in the history
… (#483)

* [NSE-481] JVM heap memory leak on memory leak tracker facilities

* fixup

* fixup

Co-authored-by: Hongze Zhang <hongze.zhang@intel.com>
  • Loading branch information
zhouyuan and zhztheplayer authored Aug 19, 2021
1 parent 6819ee8 commit 3805f33
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
*/
public class SparkManagedReservationListener implements ReservationListener {

private final NativeSQLMemoryConsumer consumer;
private final NativeSQLMemoryMetrics metrics;
private NativeSQLMemoryConsumer consumer;
private NativeSQLMemoryMetrics metrics;
private volatile boolean open = true;

public SparkManagedReservationListener(NativeSQLMemoryConsumer consumer, NativeSQLMemoryMetrics metrics) {
this.consumer = consumer;
Expand All @@ -34,13 +35,30 @@ public SparkManagedReservationListener(NativeSQLMemoryConsumer consumer, NativeS

@Override
public void reserve(long size) {
consumer.acquire(size);
metrics.inc(size);
synchronized (this) {
if (!open) {
return;
}
consumer.acquire(size);
metrics.inc(size);
}
}

@Override
public void unreserve(long size) {
consumer.free(size);
metrics.inc(-size);
synchronized (this) {
if (!open) {
return;
}
consumer.free(size);
metrics.inc(-size);
}
}

public void inactivate() {
synchronized (this) {
consumer = null; // make it gc reachable
open = false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.memory.BufferAllocator

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.util.TaskCompletionListener

object SparkMemoryUtils {
object SparkMemoryUtils extends Logging {
private val DEBUG: Boolean = false

class TaskMemoryResources {
if (!inSparkTask()) {
Expand All @@ -49,25 +51,25 @@ object SparkMemoryUtils {
UUID.randomUUID().toString, al, 0, parent.getLimit)
}

val defaultMemoryPool: NativeMemoryPool = {
val defaultMemoryPool: NativeMemoryPoolWrapper = {
val rl = new SparkManagedReservationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
sharedMetrics)
NativeMemoryPool.createListenable(rl)
NativeMemoryPoolWrapper(NativeMemoryPool.createListenable(rl), rl)
}

private val allocators = new util.ArrayList[BufferAllocator]()
allocators.add(defaultAllocator)

private val memoryPools = new util.ArrayList[NativeMemoryPool]()
private val memoryPools = new util.ArrayList[NativeMemoryPoolWrapper]()
memoryPools.add(defaultMemoryPool)

def createSpillableMemoryPool(spiller: Spiller): NativeMemoryPool = {
val rl = new SparkManagedReservationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller),
sharedMetrics)
val pool = NativeMemoryPool.createListenable(rl)
memoryPools.add(pool)
memoryPools.add(NativeMemoryPoolWrapper(pool, rl))
pool
}

Expand Down Expand Up @@ -99,16 +101,23 @@ object SparkMemoryUtils {
*/
private def softClose(allocator: BufferAllocator): Unit = {
// move to leaked list
leakedAllocators.add(allocator)
logWarning(s"Detected leaked allocator, size: ${allocator.getAllocatedMemory}...")
if (DEBUG) {
leakedAllocators.add(allocator)
}
}

private def close(pool: NativeMemoryPool): Unit = {
pool.close()
private def close(pool: NativeMemoryPoolWrapper): Unit = {
pool.pool.close()
}

private def softClose(pool: NativeMemoryPool): Unit = {
private def softClose(pool: NativeMemoryPoolWrapper): Unit = {
// move to leaked list
leakedMemoryPools.add(pool)
logWarning(s"Detected leaked memory pool, size: ${pool.pool.getBytesAllocated}...")
pool.listener.inactivate()
if (DEBUG) {
leakedMemoryPools.add(pool)
}
}

def release(): Unit = {
Expand All @@ -121,7 +130,7 @@ object SparkMemoryUtils {
}
}
for (pool <- memoryPools.asScala) {
val allocated = pool.getBytesAllocated
val allocated = pool.pool.getBytesAllocated
if (allocated == 0L) {
close(pool)
} else {
Expand All @@ -134,7 +143,7 @@ object SparkMemoryUtils {
private val taskToResourcesMap = new java.util.IdentityHashMap[TaskContext, TaskMemoryResources]()

private val leakedAllocators = new java.util.Vector[BufferAllocator]()
private val leakedMemoryPools = new java.util.Vector[NativeMemoryPool]()
private val leakedMemoryPools = new java.util.Vector[NativeMemoryPoolWrapper]()

private def getLocalTaskContext: TaskContext = TaskContext.get()

Expand Down Expand Up @@ -214,16 +223,16 @@ object SparkMemoryUtils {
if (!inSparkTask()) {
return globalMemoryPool()
}
getTaskMemoryResources().defaultMemoryPool
getTaskMemoryResources().defaultMemoryPool.pool
}

def getLeakedAllocators(): List[BufferAllocator] = {
val list = new util.ArrayList[BufferAllocator](leakedAllocators)
list.asScala.toList
}

def getLeakedMemoryPools(): List[NativeMemoryPool] = {
val list = new util.ArrayList[NativeMemoryPool](leakedMemoryPools)
def getLeakedMemoryPools(): List[NativeMemoryPoolWrapper] = {
val list = new util.ArrayList[NativeMemoryPoolWrapper](leakedMemoryPools)
list.asScala.toList
}

Expand Down Expand Up @@ -263,4 +272,7 @@ object SparkMemoryUtils {
retained = None
}
}

case class NativeMemoryPoolWrapper(pool: NativeMemoryPool,
listener: SparkManagedReservationListener)
}

0 comments on commit 3805f33

Please sign in to comment.