From 4e4296deb78ad11bb5fea630257046207083b891 Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Fri, 22 Jan 2021 19:28:19 +0800 Subject: [PATCH] Columnar SMJ: fix memory leak by closing stream batches properly Signed-off-by: Yuan Zhou --- .../com/intel/oap/expression/ColumnarSortMergeJoin.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala b/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala index 8f4bdb1eb..8009d05a7 100644 --- a/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala +++ b/core/src/main/scala/com/intel/oap/expression/ColumnarSortMergeJoin.scala @@ -109,7 +109,6 @@ class ColumnarSortMergeJoin( } build_cb = realbuildIter.next() val beforeBuild = System.nanoTime() - // handle projection val projectedBuildKeyCols: List[ArrowWritableColumnVector] = if (buildProjector != null) { val builderOrdinalList = buildProjector.getOrdinalList val builderAttributes = buildProjector.output @@ -124,8 +123,9 @@ class ColumnarSortMergeJoin( build_cb.column(i).asInstanceOf[ArrowWritableColumnVector]) ::: projectedBuildKeyCols val build_rb = ConverterUtils.createArrowRecordBatch(build_cb.numRows, buildCols.map(_.getValueVector)) - (0 until build_cb.numCols).toList.foreach(i => - build_cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain()) + + (0 until buildCols.size).toList.foreach(i => + buildCols(i).retain()) inputBatchHolder += build_cb prober.evaluate(build_rb) prepareTime += NANOSECONDS.toMillis(System.nanoTime() - beforeBuild) @@ -167,7 +167,7 @@ class ColumnarSortMergeJoin( val cb = realstreamIter.next() last_cb = cb val beforeJoin = System.nanoTime() - val stream_rb: ArrowRecordBatch = ConverterUtils.createArrowRecordBatch(cb) + val output_rb = if (cb.numRows > 0) { val projectedStreamKeyCols: List[ArrowWritableColumnVector] = if (streamProjector != null) {