Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-52]Columnar SMJ: fix memory leak by closing stream batches properly #55

Merged
merged 1 commit into from
Jan 25, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ class ColumnarSortMergeJoin(
}
build_cb = realbuildIter.next()
val beforeBuild = System.nanoTime()
// handle projection
val projectedBuildKeyCols: List[ArrowWritableColumnVector] = if (buildProjector != null) {
val builderOrdinalList = buildProjector.getOrdinalList
val builderAttributes = buildProjector.output
Expand All @@ -124,8 +123,9 @@ class ColumnarSortMergeJoin(
build_cb.column(i).asInstanceOf[ArrowWritableColumnVector]) ::: projectedBuildKeyCols
val build_rb =
ConverterUtils.createArrowRecordBatch(build_cb.numRows, buildCols.map(_.getValueVector))
(0 until build_cb.numCols).toList.foreach(i =>
build_cb.column(i).asInstanceOf[ArrowWritableColumnVector].retain())

(0 until buildCols.size).toList.foreach(i =>
buildCols(i).retain())
inputBatchHolder += build_cb
prober.evaluate(build_rb)
prepareTime += NANOSECONDS.toMillis(System.nanoTime() - beforeBuild)
Expand Down Expand Up @@ -167,7 +167,7 @@ class ColumnarSortMergeJoin(
val cb = realstreamIter.next()
last_cb = cb
val beforeJoin = System.nanoTime()
val stream_rb: ArrowRecordBatch = ConverterUtils.createArrowRecordBatch(cb)

val output_rb = if (cb.numRows > 0) {
val projectedStreamKeyCols: List[ArrowWritableColumnVector] =
if (streamProjector != null) {
Expand Down