From 156c2eda17bed6fd110ebeaff9e8b5b0c7e45fc6 Mon Sep 17 00:00:00 2001 From: Yuan Date: Thu, 7 Apr 2022 21:11:40 +0800 Subject: [PATCH] [NSE-746]Fix memory allocation in row to columnar (#809) * fix row to columnar Signed-off-by: Yuan Zhou * fix overflow Signed-off-by: Yuan Zhou --- .../com/intel/oap/execution/ArrowRowToColumnarExec.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala index 60f90e684..a32ae1f69 100644 --- a/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala +++ b/native-sql-engine/core/src/main/scala/com/intel/oap/execution/ArrowRowToColumnarExec.scala @@ -120,15 +120,13 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { private val converters = new RowToColumnConverter(localSchema) private var last_cb: ColumnarBatch = null private var elapse: Long = 0 - // Allocate large buffer to store the numRows rows - val bufferSize = 134217728 // 128M can estimator the buffer size based on the data type val allocator = SparkMemoryUtils.contextAllocator() var arrowBuf: ArrowBuf = null override def hasNext: Boolean = { rowIterator.hasNext } TaskContext.get().addTaskCompletionListener[Unit] { _ => - if (arrowBuf != null) { + if (arrowBuf != null && arrowBuf.isOpen()) { arrowBuf.close() } } @@ -144,7 +142,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { isUnsafeRow = false } - if (arrowBuf != null && isUnsafeRow) { + if (isUnsafeRow) { val rowLength = new ListBuffer[Long]() var rowCount = 0 var offset = 0 @@ -154,7 +152,7 @@ case class ArrowRowToColumnarExec(child: SparkPlan) extends UnaryExecNode { val unsafeRow = firstRow.asInstanceOf[UnsafeRow] val sizeInBytes = unsafeRow.getSizeInBytes // allocate buffer based on 1st row - val estimatedBufSize = sizeInBytes * numRows * 1.2 + val estimatedBufSize = sizeInBytes.toDouble * numRows * 1.2 arrowBuf = allocator.buffer(estimatedBufSize.toLong) Platform.copyMemory(unsafeRow.getBaseObject, unsafeRow.getBaseOffset, null, arrowBuf.memoryAddress() + offset, sizeInBytes)