Skip to content

Commit

Permalink
[NSE-746]Fix memory allocation in row to columnar (oap-project#809)
Browse files Browse the repository at this point in the history
* fix row to columnar

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>

* fix overflow

Signed-off-by: Yuan Zhou <yuan.zhou@intel.com>
  • Loading branch information
zhouyuan committed Apr 7, 2022
1 parent 89a2727 commit 156c2ed
Showing 1 changed file with 3 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,13 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
private val converters = new RowToColumnConverter(localSchema)
private var last_cb: ColumnarBatch = null
private var elapse: Long = 0
// Allocate large buffer to store the numRows rows
val bufferSize = 134217728 // 128M; the buffer size can be estimated based on the data type
val allocator = SparkMemoryUtils.contextAllocator()
var arrowBuf: ArrowBuf = null
// Reports whether another row is available by delegating directly to the
// wrapped rowIterator; no state is modified by this check itself.
override def hasNext: Boolean = {
rowIterator.hasNext
}
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
if (arrowBuf != null) {
if (arrowBuf != null && arrowBuf.isOpen()) {
arrowBuf.close()
}
}
Expand All @@ -144,7 +142,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
isUnsafeRow = false
}

if (arrowBuf != null && isUnsafeRow) {
if (isUnsafeRow) {
val rowLength = new ListBuffer[Long]()
var rowCount = 0
var offset = 0
Expand All @@ -154,7 +152,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode {
val unsafeRow = firstRow.asInstanceOf[UnsafeRow]
val sizeInBytes = unsafeRow.getSizeInBytes
// allocate buffer based on 1st row
val estimatedBufSize = sizeInBytes * numRows * 1.2
val estimatedBufSize = sizeInBytes.toDouble * numRows * 1.2
arrowBuf = allocator.buffer(estimatedBufSize.toLong)
Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset,
null, arrowBuf.memoryAddress() + offset, sizeInBytes)
Expand Down

0 comments on commit 156c2ed

Please sign in to comment.