Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-481] JVM heap memory leak in memory-leak tracker facilities #482

Merged
merged 3 commits into from
Aug 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
*/
public class SparkManagedReservationListener implements ReservationListener {

private final NativeSQLMemoryConsumer consumer;
private final NativeSQLMemoryMetrics metrics;
private NativeSQLMemoryConsumer consumer;
private NativeSQLMemoryMetrics metrics;
private volatile boolean open = true;

public SparkManagedReservationListener(NativeSQLMemoryConsumer consumer, NativeSQLMemoryMetrics metrics) {
this.consumer = consumer;
Expand All @@ -34,13 +35,30 @@ public SparkManagedReservationListener(NativeSQLMemoryConsumer consumer, NativeS

@Override
public void reserve(long size) {
consumer.acquire(size);
metrics.inc(size);
synchronized (this) {
if (!open) {
return;
}
consumer.acquire(size);
metrics.inc(size);
}
}

@Override
public void unreserve(long size) {
consumer.free(size);
metrics.inc(-size);
synchronized (this) {
if (!open) {
return;
}
consumer.free(size);
metrics.inc(-size);
}
}

public void inactivate() {
synchronized (this) {
consumer = null; // make it gc reachable
open = false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.memory.BufferAllocator

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.util.TaskCompletionListener

object SparkMemoryUtils {
object SparkMemoryUtils extends Logging {
private val DEBUG: Boolean = false

class TaskMemoryResources {
if (!inSparkTask()) {
Expand All @@ -49,25 +51,25 @@ object SparkMemoryUtils {
UUID.randomUUID().toString, al, 0, parent.getLimit)
}

/**
 * Default per-task native memory pool, wrapped together with its reservation
 * listener so the listener can later be inactivated if the pool leaks
 * (see softClose). Stale pre-diff lines from the scraped diff are removed.
 */
val defaultMemoryPool: NativeMemoryPoolWrapper = {
  val rl = new SparkManagedReservationListener(
    new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
    sharedMetrics)
  NativeMemoryPoolWrapper(NativeMemoryPool.createListenable(rl), rl)
}

private val allocators = new util.ArrayList[BufferAllocator]()
allocators.add(defaultAllocator)

private val memoryPools = new util.ArrayList[NativeMemoryPool]()
private val memoryPools = new util.ArrayList[NativeMemoryPoolWrapper]()
memoryPools.add(defaultMemoryPool)

/**
 * Creates a spillable native memory pool backed by a Spark-managed consumer.
 * The pool is registered together with its listener so that release() can
 * either close it or, when bytes are still outstanding (a leak), inactivate
 * the listener to break JVM-side references (NSE-481 fix).
 *
 * @param spiller strategy used to spill when Spark memory pressure demands it
 * @return the raw native pool (callers do not see the wrapper)
 */
def createSpillableMemoryPool(spiller: Spiller): NativeMemoryPool = {
  val rl = new SparkManagedReservationListener(
    new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller),
    sharedMetrics)
  val pool = NativeMemoryPool.createListenable(rl)
  // Track pool AND listener; the stale diff line adding only `pool` is dropped.
  memoryPools.add(NativeMemoryPoolWrapper(pool, rl))
  pool
}

Expand Down Expand Up @@ -99,16 +101,23 @@ object SparkMemoryUtils {
*/
/**
 * Handles an allocator that still holds memory at task end: log the leak and,
 * only when DEBUG is enabled, retain it for inspection. Retaining every leaked
 * allocator unconditionally caused the JVM heap leak this change fixes, hence
 * the DEBUG guard; the stale pre-diff line that added unconditionally is removed.
 */
private def softClose(allocator: BufferAllocator): Unit = {
  logWarning(s"Detected leaked allocator, size: ${allocator.getAllocatedMemory}...")
  if (DEBUG) {
    // move to leaked list (debug-only, to avoid unbounded retention)
    leakedAllocators.add(allocator)
  }
}

/** Closes the native memory pool held by the wrapper (no outstanding bytes). */
private def close(pool: NativeMemoryPoolWrapper): Unit = {
  pool.pool.close()
}

/**
 * Handles a native memory pool that still has bytes allocated at task end:
 * log the leak, inactivate the listener so it drops its consumer reference
 * (preventing the JVM heap leak of NSE-481), and retain the wrapper for
 * inspection only in DEBUG mode. The stale pre-diff add-outside-the-guard
 * line is removed.
 */
private def softClose(pool: NativeMemoryPoolWrapper): Unit = {
  logWarning(s"Detected leaked memory pool, size: ${pool.pool.getBytesAllocated}...")
  pool.listener.inactivate()
  if (DEBUG) {
    leakedMemoryPools.add(pool)
  }
}

def release(): Unit = {
Expand All @@ -121,7 +130,7 @@ object SparkMemoryUtils {
}
}
for (pool <- memoryPools.asScala) {
val allocated = pool.getBytesAllocated
val allocated = pool.pool.getBytesAllocated
if (allocated == 0L) {
close(pool)
} else {
Expand All @@ -134,7 +143,7 @@ object SparkMemoryUtils {
private val taskToResourcesMap = new java.util.IdentityHashMap[TaskContext, TaskMemoryResources]()

// Leak-tracking lists; populated only when DEBUG is enabled (see softClose).
// The pool list holds wrappers so each leaked pool keeps its (inactivated) listener.
private val leakedAllocators = new java.util.Vector[BufferAllocator]()
private val leakedMemoryPools = new java.util.Vector[NativeMemoryPoolWrapper]()

private def getLocalTaskContext: TaskContext = TaskContext.get()

Expand Down Expand Up @@ -214,16 +223,16 @@ object SparkMemoryUtils {
if (!inSparkTask()) {
return globalMemoryPool()
}
getTaskMemoryResources().defaultMemoryPool
getTaskMemoryResources().defaultMemoryPool.pool
}

/** Returns an immutable snapshot of allocators detected as leaked (DEBUG mode only). */
def getLeakedAllocators(): List[BufferAllocator] = {
  // Copy first so the snapshot is stable even if the Vector is mutated concurrently.
  new util.ArrayList[BufferAllocator](leakedAllocators).asScala.toList
}

/** Returns an immutable snapshot of pool wrappers detected as leaked (DEBUG mode only). */
def getLeakedMemoryPools(): List[NativeMemoryPoolWrapper] = {
  new util.ArrayList[NativeMemoryPoolWrapper](leakedMemoryPools).asScala.toList
}

Expand Down Expand Up @@ -263,4 +272,7 @@ object SparkMemoryUtils {
retained = None
}
}

// Pairs a native memory pool with the reservation listener feeding it, so the
// listener can be inactivated when the pool is (soft-)closed at task end.
case class NativeMemoryPoolWrapper(pool: NativeMemoryPool,
listener: SparkManagedReservationListener)
}