This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

[NSE-45] BHJ memory leak (#50)
* [NSE-45] BHJ memory leak

* Unclosed vectors

* More test rounds

* CI: Enable Jemalloc for JVM

* Jemalloc fix

* Jemalloc fix

* CI: Call malloc_trim

* Minor optimization

* Update tpch.yml

* Suppress cleaner errors

* Inner class initializer fix
zhztheplayer authored Feb 1, 2021
1 parent c14a9a6 commit 5c00073
Showing 18 changed files with 208 additions and 182 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/report_ram_log.yml
@@ -59,7 +59,8 @@ jobs:
git clone https://github.com/oap-project/arrow-data-source.git
cd arrow-data-source
mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DskipTests
- run: |
- name: Run Maven tests
run: |
cd core/
mvn test -B -DmembersOnlySuites=com.intel.oap.tpch -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DtagsToInclude=com.intel.oap.tags.CommentOnContextPR -Dexec.skip=true
env:
10 changes: 6 additions & 4 deletions .github/workflows/tpch.yml
@@ -43,7 +43,7 @@ jobs:
git clone https://github.com/intel-bigdata/arrow.git
cd arrow && git checkout oap-master && cd cpp
mkdir build && cd build
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON && make -j2
cmake .. -DARROW_JNI=ON -DARROW_GANDIVA_JAVA=ON -DARROW_GANDIVA=ON -DARROW_PARQUET=ON -DARROW_HDFS=ON -DARROW_FILESYSTEM=ON -DARROW_WITH_SNAPPY=ON -DARROW_JSON=ON -DARROW_DATASET=ON -DARROW_WITH_LZ4=ON -DARROW_JEMALLOC=OFF && make -j2
sudo make install
cd ../../java
mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -P arrow-jni -am -Darrow.cpp.build.dir=/tmp/arrow/cpp/build/release/ -DskipTests -Dcheckstyle.skip
@@ -53,11 +53,13 @@ jobs:
git clone https://github.com/oap-project/arrow-data-source.git
cd arrow-data-source
mvn clean install -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn
- run: |
- name: Run Maven tests
run: |
cd core/
mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.TestAndWriteLogs
mvn test -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn -DmembersOnlySuites=com.intel.oap.tpch -DtagsToInclude=com.intel.oap.tags.TestAndWriteLogs -DargLine="-Xmx1G -XX:MaxDirectMemorySize=500M -Dio.netty.allocator.numDirectArena=1"
env:
MAVEN_OPTS: "-Xmx2048m"
MALLOC_ARENA_MAX: "4"
MAVEN_OPTS: "-Xmx1G"
COMMENT_TEXT_OUTPUT_PATH: "/tmp/comment_text.txt"
COMMENT_IMAGE_OUTPUT_PATH: "/tmp/comment_image.png"
ENABLE_TPCH_TESTS: "true"
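The tpch.yml change tightens the CI memory budget so the leak shows up as a hard failure instead of a slowly growing runner: Arrow's bundled jemalloc is switched off (-DARROW_JEMALLOC=OFF), glibc arenas are capped via MALLOC_ARENA_MAX=4, the Maven JVM heap drops from 2048m to 1G, and the forked test JVM gets -Xmx1G, -XX:MaxDirectMemorySize=500M and a single Netty direct arena. A minimal standalone sketch (hypothetical class, not part of this commit) of how such a direct-memory cap makes unreleased direct buffers fail fast:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo, not part of this repo. Run with flags mirroring the CI argLine:
//   java -Xmx1G -XX:MaxDirectMemorySize=500M DirectMemoryCapDemo
public class DirectMemoryCapDemo {
  public static void main(String[] args) {
    List<ByteBuffer> leaked = new ArrayList<>();
    try {
      while (true) {
        // Simulates direct buffers that are allocated but never released.
        leaked.add(ByteBuffer.allocateDirect(16 * 1024 * 1024)); // 16 MiB each
      }
    } catch (OutOfMemoryError e) {
      // With a 500M cap the run fails here almost immediately,
      // instead of the leak quietly exhausting the CI host.
      System.out.println("Direct memory cap hit after " + leaked.size() + " buffers");
    }
  }
}
```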
@@ -66,15 +66,12 @@ public final class ArrowWritableColumnVector extends WritableColumnVector {
private int ordinal;
private ValueVector vector;
private ValueVector dictionaryVector;
private static BufferAllocator OffRecordAllocator = null;
private static BufferAllocator OffRecordAllocator = SparkMemoryUtils.globalAllocator();

public static BufferAllocator getAllocator() {
return SparkMemoryUtils.contextAllocator();
}
public static BufferAllocator getOffRecordAllocator() {
if (OffRecordAllocator == null) {
OffRecordAllocator = new RootAllocator(Long.MAX_VALUE);
}
return OffRecordAllocator;
}
public static AtomicLong vectorCount = new AtomicLong(0);
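Previously OffRecordAllocator was a private, lazily created RootAllocator(Long.MAX_VALUE) that sat outside the Spark-managed allocator and was never closed; this change points it at SparkMemoryUtils.globalAllocator(), so off-record vector memory is accounted for in the same tree as everything else. An illustrative sketch (stock Arrow Java APIs only, not code from this commit) of why a tracked allocator turns an unclosed vector into a visible error rather than a silent leak:

```java
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

// Illustrative only: a tracked child allocator reports outstanding memory and
// refuses to close while buffers are still alive, which is what makes an
// unclosed vector observable instead of silently leaking.
public class AllocatorAccountingDemo {
  public static void main(String[] args) {
    BufferAllocator root = new RootAllocator(Long.MAX_VALUE);
    BufferAllocator child = root.newChildAllocator("bhj-demo", 0, Long.MAX_VALUE);

    child.buffer(1 << 20); // allocate 1 MiB and "forget" to release it

    System.out.println("outstanding bytes: " + child.getAllocatedMemory()); // 1048576
    try {
      child.close(); // Arrow throws: allocator closed with outstanding buffers
    } catch (IllegalStateException leak) {
      System.out.println("leak detected: " + leak.getMessage());
    }
  }
}
```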
@@ -29,7 +29,7 @@
import org.apache.arrow.vector.ipc.message.ArrowBuffer;
import org.apache.arrow.vector.ipc.message.MessageSerializer;

public class BatchIterator {
public class BatchIterator implements AutoCloseable {
private native boolean nativeHasNext(long nativeHandler);
private native ArrowRecordBatchBuilder nativeNext(long nativeHandler);
private native MetricsObject nativeFetchMetrics(long nativeHandler);
@@ -89,7 +89,7 @@ public SerializableObject nextHashRelationObject()
return null;
}
NativeSerializableObject obj = nativeNextHashRelation(nativeHandler);
SerializableObject objImpl = new SerializableObject(obj);
SerializableObject objImpl = new SerializableObject(obj, new AutoCloseable[]{this});
return objImpl;
}

@@ -196,6 +196,7 @@ public void setDependencies(BatchIterator[] dependencies) {
nativeSetDependencies(nativeHandler, instanceIdList);
}

@Override
public void close() {
if (!closed) {
nativeClose(nativeHandler);
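BatchIterator now formally implements AutoCloseable (its close() gains @Override), and nextHashRelationObject() hands the iterator itself to the SerializableObject it produces, so whoever closes the serialized hash relation also closes the native iterator backing it. The existing closed flag guards the native call, making close() safe to invoke from more than one owner. A hedged sketch of that idempotent-close pattern (a hypothetical stand-in class, not the real BatchIterator):

```java
// Hypothetical stand-in for BatchIterator: the guarded close() can be called by
// try-with-resources, by the SerializableObject that holds it as a resource, or
// by both, and the native handler is released exactly once.
public class GuardedNativeResource implements AutoCloseable {
  private final long nativeHandler;
  private boolean closed = false;

  public GuardedNativeResource(long nativeHandler) {
    this.nativeHandler = nativeHandler;
  }

  @Override
  public void close() {
    if (!closed) {
      releaseNative(nativeHandler); // stand-in for the real nativeClose(nativeHandler)
      closed = true;
    }
  }

  private static void releaseNative(long handler) {
    // JNI call in the real class; a no-op here.
  }
}
```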
36 changes: 19 additions & 17 deletions core/src/main/java/com/intel/oap/vectorized/SerializableObject.java
@@ -28,13 +28,15 @@
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.arrow.util.AutoCloseables;

/** ArrowBufBuilder. */
public class SerializableObject implements Externalizable, KryoSerializable {
public int total_size;
public int[] size;
private ByteBuf[] directAddrs;
private ByteBufAllocator allocator;

private transient AutoCloseable[] resources = null;

public SerializableObject() {}

@@ -44,33 +46,28 @@ public SerializableObject() {}
* @param memoryAddress native ArrowBuf data addr.
* @param size ArrowBuf size.
*/
public SerializableObject(long[] memoryAddress, int[] size) throws IOException {
public SerializableObject(long[] memoryAddress, int[] size, AutoCloseable[] resources) throws IOException {
this.total_size = 0;
this.size = size;
directAddrs = new ByteBuf[size.length];
for (int i = 0; i < size.length; i++) {
this.total_size += size[i];
directAddrs[i] = Unpooled.wrappedBuffer(memoryAddress[i], size[i], false);
}
this.resources = resources;
}

/**
* Create an instance for NativeSerializableObject.
*
* @param memoryAddress native ArrowBuf data addr.
* @param size ArrowBuf size.
*/
public SerializableObject(NativeSerializableObject obj)
public SerializableObject(NativeSerializableObject obj, AutoCloseable[] resources)
throws IOException, ClassNotFoundException {
this(obj.memoryAddress, obj.size);
this(obj.memoryAddress, obj.size, resources);
}

@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
this.total_size = in.readInt();
int size_len = in.readInt();
this.size = (int[]) in.readObject();
allocator = UnpooledByteBufAllocator.DEFAULT;
ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
directAddrs = new ByteBuf[size_len];
for (int i = 0; i < size.length; i++) {
directAddrs[i] = allocator.directBuffer(size[i], size[i]);
@@ -104,7 +101,7 @@ public void read(Kryo kryo, Input in) {
this.total_size = in.readInt();
int size_len = in.readInt();
this.size = in.readInts(size_len);
allocator = UnpooledByteBufAllocator.DEFAULT;
ByteBufAllocator allocator = UnpooledByteBufAllocator.DEFAULT;
directAddrs = new ByteBuf[size_len];
for (int i = 0; i < size.length; i++) {
directAddrs[i] = allocator.directBuffer(size[i], size[i]);
@@ -141,6 +138,7 @@ public void write(Kryo kryo, Output out) {

public void close() {
releaseDirectMemory();
releaseResources();
}

public long[] getDirectMemoryAddrs() throws IOException {
@@ -154,18 +152,22 @@ public long[] getDirectMemoryAddrs() throws IOException {
return addrs;
}

public void releaseDirectMemory() {
private void releaseDirectMemory() {
if (directAddrs != null) {
for (int i = 0; i < directAddrs.length; i++) {
directAddrs[i].release();
}
}
}

public int getRefCnt() {
if (directAddrs != null) {
return directAddrs[0].refCnt();
private void releaseResources() {
if (resources == null) {
return;
}
try {
AutoCloseables.close(resources);
} catch (Exception e) {
throw new RuntimeException(e);
}
return 0;
}
}
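This is the core of the leak fix: a SerializableObject built from native memory now carries a transient AutoCloseable[] of the resources that own that memory (in practice the BatchIterator that produced it), and close() first releases the wrapped ByteBufs and then chains into Arrow's AutoCloseables.close. The old public releaseDirectMemory()/getRefCnt() surface collapses into the single close(). A hedged usage sketch (helper class and variable names are illustrative; the constructor and close() behaviour are as in the diff, and the imports assume the package layout shown elsewhere in this commit):

```java
import com.intel.oap.vectorized.BatchIterator;
import com.intel.oap.vectorized.NativeSerializableObject;
import com.intel.oap.vectorized.SerializableObject;

// Hypothetical helper, not part of this commit: wraps a native hash relation so
// that one close() releases both the direct buffers and the producing iterator.
final class HashRelationPayloads {

  static SerializableObject wrap(NativeSerializableObject nativeObj, BatchIterator producer)
      throws Exception {
    // New constructor from this commit: the producer travels along as a resource.
    return new SerializableObject(nativeObj, new AutoCloseable[] {producer});
  }

  static void useAndRelease(SerializableObject relation) throws Exception {
    try {
      long[] addrs = relation.getDirectMemoryAddrs(); // hand the buffers to native code
      System.out.println("hash relation buffers: " + addrs.length);
    } finally {
      relation.close(); // releaseDirectMemory() on the wrapped ByteBufs, then
                        // AutoCloseables.close(resources) closes the producer iterator
    }
  }
}
```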
@@ -54,7 +54,7 @@ class ColumnarPluginConfig(conf: SparkConf) {
conf.getBoolean("spark.oap.sql.columnar.wholestagecodegen.breakdownTime", defaultValue = false)
val tmpFile: String =
conf.getOption("spark.sql.columnar.tmp_dir").getOrElse(null)
val broadcastCacheTimeout: Int =
@deprecated val broadcastCacheTimeout: Int =
conf.getInt("spark.sql.columnar.sort.broadcast.cache.timeout", defaultValue = -1)
val hashCompare: Boolean =
conf.getBoolean("spark.oap.sql.columnar.hashCompare", defaultValue = false)
@@ -17,15 +17,11 @@

package com.intel.oap.execution

import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.expression.ConverterUtils
import com.intel.oap.vectorized.CloseableColumnBatchIterator
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.ColumnarBatch

private final case class BroadcastColumnarRDDPartition(index: Int) extends Partition
@@ -41,9 +37,7 @@ case class BroadcastColumnarRDD(
(0 until numPartitioning).map { index => new BroadcastColumnarRDDPartition(index) }.toArray
}
override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
val timeout: Int = SQLConf.get.broadcastTimeout.toInt
val relation = inputByteBuf.value.asReadOnlyCopy
relation.countDownClose(timeout)
new CloseableColumnBatchIterator(relation.getColumnarBatchAsIter)
}
}
@@ -17,47 +17,26 @@

package com.intel.oap.execution

import java.io.{ByteArrayInputStream, ObjectInputStream}
import java.nio.ByteBuffer
import java.util.concurrent.TimeUnit._

import com.intel.oap.vectorized._
import com.google.common.collect.Lists
import com.intel.oap.ColumnarPluginConfig
import org.apache.spark.TaskContext
import com.intel.oap.expression._
import com.intel.oap.vectorized.{ExpressionEvaluator, _}
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.{ArrowType, Field}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.{UserAddedJarUtils, Utils, ExecutorManager}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashJoin}
import org.apache.spark.sql.execution.metric.SQLMetrics

import scala.collection.JavaConverters._
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.BoundReference
import org.apache.spark.sql.catalyst.expressions.BindReferences._
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils}

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer
import org.apache.arrow.vector.ipc.message.ArrowFieldNode
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.pojo.Schema
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.gandiva.evaluator._
import io.netty.buffer.ArrowBuf
import io.netty.buffer.ByteBuf
import com.google.common.collect.Lists
import com.intel.oap.expression._
import com.intel.oap.vectorized.ExpressionEvaluator
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide, HashJoin}
import org.apache.spark.sql.types.{StructField, StructType}

/**
* Performs a hash join of two child relations by first shuffling the data using the join keys.
@@ -260,8 +239,6 @@ case class ColumnarBroadcastHashJoinExec(
var eval_elapse: Long = 0
val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]()

val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout

streamedPlan.executeColumnar().mapPartitions { iter =>
ExecutorManager.tryTaskSet(numaBindingInfo)
val hashRelationKernel = new ExpressionEvaluator()
@@ -283,6 +260,7 @@
Field.nullable("result", new ArrowType.Int(32, true)))
hashRelationKernel.build(hash_relation_schema, Lists.newArrayList(hash_relation_expr), true)
val hashRelationResultIterator = hashRelationKernel.finishByIterator()

// we need to set original recordBatch to hashRelationKernel
var numRows = 0
while (depIter.hasNext) {
@@ -328,7 +306,6 @@
hashRelationResultIterator.close
nativeKernel.close
nativeIterator.close
relation.countDownClose(timeout)
}

// now we can return this wholestagecodegen iter
@@ -451,7 +428,6 @@
val listJars = uploadAndListJars(signature)
val buildInputByteBuf = buildPlan.executeBroadcast[ColumnarHashedRelation]()
val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer()
val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout

streamedPlan.executeColumnar().mapPartitions { streamIter =>
ExecutorManager.tryTaskSet(numaBindingInfo)
@@ -518,7 +494,6 @@
hashRelationResultIterator.close
nativeKernel.close
nativeIterator.close
relation.countDownClose(timeout)
}
val resultStructType = ArrowUtils.fromArrowSchema(resCtx.outputSchema)
val res = new Iterator[ColumnarBatch] {
@@ -449,7 +449,6 @@ case class ColumnarShuffledHashJoinExec(

val signature = getCodeGenSignature(hashTableType)
val listJars = uploadAndListJars(signature)
val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout
val hashRelationBatchHolder: ListBuffer[ColumnarBatch] = ListBuffer()

if (hashTableType == 1) {
@@ -17,33 +17,25 @@

package com.intel.oap.execution

import java.io.{ByteArrayInputStream, ObjectInputStream}
import com.google.common.collect.Lists
import com.intel.oap.ColumnarPluginConfig
import com.intel.oap.execution._
import com.intel.oap.expression._
import com.intel.oap.vectorized._
import org.apache.spark._
import com.intel.oap.vectorized.{BatchIterator, ExpressionEvaluator, _}
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.util.{UserAddedJarUtils, Utils, ExecutorManager}
import org.apache.arrow.gandiva.expression._
import org.apache.arrow.vector.types.pojo.ArrowType
import org.apache.arrow.vector.types.pojo.Field
import org.apache.arrow.vector.types.pojo.Schema
import com.intel.oap.vectorized.ExpressionEvaluator
import com.intel.oap.vectorized.BatchIterator
import com.google.common.collect.Lists
import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.util.ArrowUtils
import org.apache.spark.sql.vectorized.{ColumnVector, ColumnarBatch}
import org.apache.spark.util.{ExecutorManager, UserAddedJarUtils}

import scala.collection.mutable.ListBuffer
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

case class ColumnarCodegenContext(inputSchema: Schema, outputSchema: Schema, root: TreeNode) {}

@@ -264,7 +256,6 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I

val numOutputBatches = child.longMetric("numOutputBatches")
val pipelineTime = longMetric("pipelineTime")
val timeout = ColumnarPluginConfig.getConf(sparkConf).broadcastCacheTimeout

var build_elapse: Long = 0
var eval_elapse: Long = 0
@@ -523,7 +514,7 @@ case class ColumnarWholeStageCodegenExec(child: SparkPlan)(val codegenStageId: I
dependentKernelIterators.foreach(_.close)
nativeKernel.close
nativeIterator.close
relationHolder.foreach(r => r.countDownClose(timeout))
relationHolder.clear()
}
SparkMemoryUtils.addLeakSafeTaskCompletionListener[Unit](_ => {
close