Skip to content

Commit

Permalink
[REMOTE-SHUFFLE-24] Enhance executor memory release (#25)
Browse files Browse the repository at this point in the history
* [REMOTE-SHUFFLE-24] Enhance executor memory release

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
  • Loading branch information
jiafuzha authored Jun 15, 2021
1 parent 492eefc commit 85d0698
Showing 1 changed file with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,11 @@ class MapPartitionsWriter[K, V, C](
}
val (head, end) = if (comparator.isDefined) {
val mapHead = new SizeAwareMap[K, C](-1, partBufferThreshold,
totalBufferInitial, taskMemManager, this)
totalBufferInitial, this)
val mapEnd = new SizeAwareMap[K, C](-2, partBufferThreshold,
totalBufferInitial, taskMemManager, this)
totalBufferInitial, this)
(0 until numPartitions).foreach(i => {
val map = new SizeAwareMap[K, C](i, partBufferThreshold, totalBufferInitial, taskMemManager, this)
val map = new SizeAwareMap[K, C](i, partBufferThreshold, totalBufferInitial, this)
partitionMapArray(i) = map
if (i > 0) {
val prevMap = partitionMapArray(i - 1)
Expand All @@ -200,11 +200,11 @@ class MapPartitionsWriter[K, V, C](
(mapHead, mapEnd)
} else {
val bufferHead = new SizeAwareBuffer[K, C](-1, partBufferThreshold,
totalBufferInitial, taskMemManager, this)
totalBufferInitial, this)
val bufferEnd = new SizeAwareBuffer[K, C](-2, partBufferThreshold,
totalBufferInitial, taskMemManager, this)
totalBufferInitial, this)
(0 until numPartitions).foreach(i => {
val buffer = new SizeAwareBuffer[K, C](i, partBufferThreshold, totalBufferInitial, taskMemManager, this)
val buffer = new SizeAwareBuffer[K, C](i, partBufferThreshold, totalBufferInitial, this)
partitionBufferArray(i) = buffer
if (i > 0) {
val prevBuffer = partitionBufferArray(i - 1)
Expand Down Expand Up @@ -291,9 +291,7 @@ class MapPartitionsWriter[K, V, C](
private def writeFromHead: Unit = {
var buffer = head.next
var count = 0
var totalSize = 0L
while (buffer != end && count < totalPartRatio) {
totalSize += buffer.estimatedSize
buffer.writeAndFlush
val emptyBuffer = buffer
buffer = buffer.next
Expand Down Expand Up @@ -326,7 +324,11 @@ class MapPartitionsWriter[K, V, C](
}

def releaseMemory(memory: Long): Unit = {
freeMemory(Math.min(memory, memoryLimit - totalBufferInitial))
memoryLimit -= memory
if (memoryLimit < totalBufferInitial) {
memoryLimit = totalBufferInitial
}
}

def flushAll: Unit = {
Expand All @@ -337,13 +339,16 @@ class MapPartitionsWriter[K, V, C](
def close: Unit = {
val buffer = if (comparator.isDefined) partitionMapArray else partitionBufferArray
buffer.foreach(b => b.close)
val allocated = memoryLimit - totalBufferInitial
if (allocated > 0) {
freeMemory(allocated)
}
}

def spill(size: Long, trigger: MemoryConsumer): Long = ???
}

private[daos] trait SizeAware[K, C] {
this: MemoryConsumer =>

protected var writeCount = 0

Expand Down Expand Up @@ -376,8 +381,8 @@ class MapPartitionsWriter[K, V, C](
}

def releaseMemory(memory: Long): Unit = {
freeMemory(memory)
parent.releaseMemory(memory)
parent.updateTotalSize(-memory)
}

private def writeAndFlush(memory: Long): Unit = {
Expand All @@ -391,8 +396,7 @@ class MapPartitionsWriter[K, V, C](
writer.flush // force write
writeCount += count
lastSize = 0
parent.updateTotalSize(-memory)
releaseMemory(memory - totalBufferInitial)
releaseMemory(memory)
reset
}
}
Expand Down Expand Up @@ -438,9 +442,8 @@ class MapPartitionsWriter[K, V, C](
val partitionId: Int,
val writeThreshold: Int,
val totalBufferInitial: Long,
taskMemoryManager: TaskMemoryManager,
val parent: PartitionsBuffer[K, C]) extends MemoryConsumer(taskMemoryManager)
with Linked[K, C] with SizeAware[K, C] {
val parent: PartitionsBuffer[K, C]) extends Linked[K, C]
with SizeAware[K, C] {

private var map = new SizeSamplerAppendOnlyMap[K, C](parent.sampleStat)
private var _estSize: Long = _
Expand Down Expand Up @@ -483,9 +486,8 @@ class MapPartitionsWriter[K, V, C](
val partitionId: Int,
val writeThreshold: Int,
val totalBufferInitial: Long,
taskMemoryManager: TaskMemoryManager,
val parent: PartitionsBuffer[K, C]) extends MemoryConsumer(taskMemoryManager)
with Linked[K, C] with SizeAware[K, C] {
val parent: PartitionsBuffer[K, C]) extends Linked[K, C]
with SizeAware[K, C] {

private var buffer = new SizeSamplerPairBuffer[K, C](parent.sampleStat)
private var _estSize: Long = _
Expand Down

0 comments on commit 85d0698

Please sign in to comment.