Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-746]Fix memory allocation in row to columnar #809

Merged
merged 2 commits into from
Apr 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,13 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
// Converter constructed from localSchema.
private val converters = new RowToColumnConverter(localSchema)
// Starts as null; no assignment to it is visible in this excerpt.
private var last_cb: ColumnarBatch = null
// Starts at zero. NOTE(review): presumably accumulated conversion time — confirm at the use site.
private var elapse: Long = 0
// Allocate large buffer to store the numRows rows
val bufferSize = 134217728 // 128 MiB (2^27 bytes); the size could instead be estimated from the data types
// Allocator obtained from SparkMemoryUtils.contextAllocator(); arrowBuf is allocated from it.
val allocator = SparkMemoryUtils.contextAllocator()
// Null until a buffer is allocated; closed by the task-completion listener.
var arrowBuf: ArrowBuf = null
/** Reports whether `rowIterator` still has rows left to consume. */
override def hasNext: Boolean = rowIterator.hasNext
// Register a task-completion callback that closes arrowBuf when the task completes.
// NOTE(review): the two consecutive `if` guards below look like the before/after versions
// of one line from the diff view (the second adds an isOpen() check) — confirm against
// the real file; only one of them should remain.
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
if (arrowBuf != null) {
if (arrowBuf != null && arrowBuf.isOpen()) {
arrowBuf.close()
}
}
Expand All @@ -144,7 +142,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
isUnsafeRow = false
}

if (arrowBuf != null && isUnsafeRow) {
if (isUnsafeRow) {
val rowLength = new ListBuffer[Long]()
var rowCount = 0
var offset = 0
Expand All @@ -154,7 +152,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
val unsafeRow = firstRow.asInstanceOf[UnsafeRow]
val sizeInBytes = unsafeRow.getSizeInBytes
// allocate buffer based on 1st row
val estimatedBufSize = sizeInBytes * numRows * 1.2
val estimatedBufSize = sizeInBytes.toDouble * numRows * 1.2
arrowBuf = allocator.buffer(estimatedBufSize.toLong)
Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
null, arrowBuf.memoryAddress() + offset, sizeInBytes)
Expand Down