From 5375c96e4d0df149d000cbf15e15393e08793384 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 19 Aug 2021 06:32:55 +0800 Subject: [PATCH] [NSE-481] JVM heap memory leak on memory leak tracker facilities (#482) * [NSE-481] JVM heap memory leak on memory leak tracker facilities * fixup * fixup --- .../SparkManagedReservationListener.java | 30 ++++++++++--- .../v2/arrow/SparkMemoryUtils.scala | 42 ++++++++++++------- 2 files changed, 51 insertions(+), 21 deletions(-) diff --git a/arrow-data-source/common/src/main/java/com/intel/oap/spark/sql/execution/datasources/v2/arrow/SparkManagedReservationListener.java b/arrow-data-source/common/src/main/java/com/intel/oap/spark/sql/execution/datasources/v2/arrow/SparkManagedReservationListener.java index 92b140bc9..27f1b76ad 100644 --- a/arrow-data-source/common/src/main/java/com/intel/oap/spark/sql/execution/datasources/v2/arrow/SparkManagedReservationListener.java +++ b/arrow-data-source/common/src/main/java/com/intel/oap/spark/sql/execution/datasources/v2/arrow/SparkManagedReservationListener.java @@ -24,8 +24,9 @@ */ public class SparkManagedReservationListener implements ReservationListener { - private final NativeSQLMemoryConsumer consumer; - private final NativeSQLMemoryMetrics metrics; + private NativeSQLMemoryConsumer consumer; + private NativeSQLMemoryMetrics metrics; + private volatile boolean open = true; public SparkManagedReservationListener(NativeSQLMemoryConsumer consumer, NativeSQLMemoryMetrics metrics) { this.consumer = consumer; @@ -34,13 +35,30 @@ public SparkManagedReservationListener(NativeSQLMemoryConsumer consumer, NativeS @Override public void reserve(long size) { - consumer.acquire(size); - metrics.inc(size); + synchronized (this) { + if (!open) { + return; + } + consumer.acquire(size); + metrics.inc(size); + } } @Override public void unreserve(long size) { - consumer.free(size); - metrics.inc(-size); + synchronized (this) { + if (!open) { + return; + } + consumer.free(size); + metrics.inc(-size); + } + } + + public void inactivate() { + synchronized (this) { + consumer = null; // make it gc reachable + open = false; + } } } diff --git a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala index d0056c0d5..0ca9921a6 100644 --- a/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala +++ b/arrow-data-source/common/src/main/scala/org/apache/spark/sql/execution/datasources/v2/arrow/SparkMemoryUtils.scala @@ -27,10 +27,12 @@ import org.apache.arrow.dataset.jni.NativeMemoryPool import org.apache.arrow.memory.BufferAllocator import org.apache.spark.TaskContext +import org.apache.spark.internal.Logging import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.util.TaskCompletionListener -object SparkMemoryUtils { +object SparkMemoryUtils extends Logging { + private val DEBUG: Boolean = false class TaskMemoryResources { if (!inSparkTask()) { @@ -49,17 +51,17 @@ object SparkMemoryUtils { UUID.randomUUID().toString, al, 0, parent.getLimit) } - val defaultMemoryPool: NativeMemoryPool = { + val defaultMemoryPool: NativeMemoryPoolWrapper = { val rl = new SparkManagedReservationListener( new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP), sharedMetrics) - NativeMemoryPool.createListenable(rl) + NativeMemoryPoolWrapper(NativeMemoryPool.createListenable(rl), rl) } private val allocators = new util.ArrayList[BufferAllocator]() allocators.add(defaultAllocator) - private val memoryPools = new util.ArrayList[NativeMemoryPool]() + private val memoryPools = new util.ArrayList[NativeMemoryPoolWrapper]() memoryPools.add(defaultMemoryPool) def createSpillableMemoryPool(spiller: Spiller): NativeMemoryPool = { @@ -67,7 +69,7 @@ object SparkMemoryUtils { new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller), sharedMetrics) val pool = NativeMemoryPool.createListenable(rl) - memoryPools.add(pool) + memoryPools.add(NativeMemoryPoolWrapper(pool, rl)) pool } @@ -99,16 +101,23 @@ object SparkMemoryUtils { */ private def softClose(allocator: BufferAllocator): Unit = { // move to leaked list - leakedAllocators.add(allocator) + logWarning(s"Detected leaked allocator, size: ${allocator.getAllocatedMemory}...") + if (DEBUG) { + leakedAllocators.add(allocator) + } } - private def close(pool: NativeMemoryPool): Unit = { - pool.close() + private def close(pool: NativeMemoryPoolWrapper): Unit = { + pool.pool.close() } - private def softClose(pool: NativeMemoryPool): Unit = { + private def softClose(pool: NativeMemoryPoolWrapper): Unit = { // move to leaked list - leakedMemoryPools.add(pool) + logWarning(s"Detected leaked memory pool, size: ${pool.pool.getBytesAllocated}...") + pool.listener.inactivate() + if (DEBUG) { + leakedMemoryPools.add(pool) + } } def release(): Unit = { @@ -121,7 +130,7 @@ object SparkMemoryUtils { } } for (pool <- memoryPools.asScala) { - val allocated = pool.getBytesAllocated + val allocated = pool.pool.getBytesAllocated if (allocated == 0L) { close(pool) } else { @@ -134,7 +143,7 @@ object SparkMemoryUtils { private val taskToResourcesMap = new java.util.IdentityHashMap[TaskContext, TaskMemoryResources]() private val leakedAllocators = new java.util.Vector[BufferAllocator]() - private val leakedMemoryPools = new java.util.Vector[NativeMemoryPool]() + private val leakedMemoryPools = new java.util.Vector[NativeMemoryPoolWrapper]() private def getLocalTaskContext: TaskContext = TaskContext.get() @@ -214,7 +223,7 @@ object SparkMemoryUtils { if (!inSparkTask()) { return globalMemoryPool() } - getTaskMemoryResources().defaultMemoryPool + getTaskMemoryResources().defaultMemoryPool.pool } def getLeakedAllocators(): List[BufferAllocator] = { @@ -222,8 +231,8 @@ object SparkMemoryUtils { list.asScala.toList } - def getLeakedMemoryPools(): List[NativeMemoryPool] = { - val list = new util.ArrayList[NativeMemoryPool](leakedMemoryPools) + def getLeakedMemoryPools(): List[NativeMemoryPoolWrapper] = { + val list = new util.ArrayList[NativeMemoryPoolWrapper](leakedMemoryPools) list.asScala.toList } @@ -263,4 +272,7 @@ object SparkMemoryUtils { retained = None } } + + case class NativeMemoryPoolWrapper(pool: NativeMemoryPool, + listener: SparkManagedReservationListener) }