diff --git a/common/src/main/scala/org/apache/comet/package.scala b/common/src/main/scala/org/apache/comet/package.scala index c9aca7538..f44139ba6 100644 --- a/common/src/main/scala/org/apache/comet/package.scala +++ b/common/src/main/scala/org/apache/comet/package.scala @@ -21,8 +21,19 @@ package org.apache import java.util.Properties +import org.apache.arrow.memory.RootAllocator + package object comet { + /** + * The root allocator for Comet execution. Because Arrow Java memory management is based on + * reference counting, exposed arrays increase the reference count of the underlying buffers. + * Until the reference count is zero, the memory will not be released. If the consumer side is + * finished later than the close of the allocator, the allocator will think the memory is + * leaked. To avoid this, we use a single allocator for the whole execution process. + */ + val CometArrowAllocator = new RootAllocator(Long.MaxValue) + /** * Provides access to build information about the Comet libraries. This will be used by the * benchmarking software to provide the source revision and repository. In addition, the build diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 595c0a427..89f79c9cd 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -22,18 +22,18 @@ package org.apache.comet.vector import scala.collection.mutable import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data} -import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.dictionary.DictionaryProvider import org.apache.spark.SparkException import org.apache.spark.sql.comet.util.Utils import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.comet.CometArrowAllocator + class NativeUtil { import Utils._ - private val allocator = new RootAllocator(Long.MaxValue) - .newChildAllocator(this.getClass.getSimpleName, 0, Long.MaxValue) + private val allocator = CometArrowAllocator private val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider private val importer = new ArrowImporter(allocator) diff --git a/common/src/main/scala/org/apache/comet/vector/StreamReader.scala b/common/src/main/scala/org/apache/comet/vector/StreamReader.scala index 4a08f0521..b8106a96e 100644 --- a/common/src/main/scala/org/apache/comet/vector/StreamReader.scala +++ b/common/src/main/scala/org/apache/comet/vector/StreamReader.scala @@ -21,20 +21,20 @@ package org.apache.comet.vector import java.nio.channels.ReadableByteChannel -import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.VectorSchemaRoot import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel} import org.apache.arrow.vector.ipc.message.MessageChannelReader import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.comet.CometArrowAllocator + /** * A reader that consumes Arrow data from an input channel, and produces Comet batches. */ case class StreamReader(channel: ReadableByteChannel, source: String) extends AutoCloseable { - private var allocator = new RootAllocator(Long.MaxValue) - .newChildAllocator(s"${this.getClass.getSimpleName}/$source", 0, Long.MaxValue) - private val channelReader = new MessageChannelReader(new ReadChannel(channel), allocator) - private var arrowReader = new ArrowStreamReader(channelReader, allocator) + private val channelReader = + new MessageChannelReader(new ReadChannel(channel), CometArrowAllocator) + private var arrowReader = new ArrowStreamReader(channelReader, CometArrowAllocator) private var root = arrowReader.getVectorSchemaRoot def nextBatch(): Option[ColumnarBatch] = { @@ -53,11 +53,9 @@ case class StreamReader(channel: ReadableByteChannel, source: String) extends Au if (root != null) { arrowReader.close() root.close() - allocator.close() arrowReader = null root = null - allocator = null } } } diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala index 3e0f64522..6eeb7e334 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTPCDSQuerySuite.scala @@ -27,10 +27,6 @@ import org.apache.comet.CometConf class CometTPCDSQuerySuite extends { - override val excludedTpcdsQueries: Set[String] = Set() - - // This is private in `TPCDSBase` and `excludedTpcdsQueries` is private too. - // So we cannot override `excludedTpcdsQueries` to exclude the queries. val tpcdsAllQueries: Seq[String] = Seq( "q1", "q2", @@ -112,7 +108,9 @@ class CometTPCDSQuerySuite "q69", "q70", "q71", - "q72", + // TODO: unknown failure (seems memory usage over Github action runner) in CI with q72 in + // https://github.com/apache/datafusion-comet/pull/613. + // "q72", "q73", "q74", "q75", @@ -141,9 +139,45 @@ class CometTPCDSQuerySuite "q98", "q99") - // TODO: enable the 3 queries after fixing the issues #1358. - override val tpcdsQueries: Seq[String] = - tpcdsAllQueries.filterNot(excludedTpcdsQueries.contains) + val tpcdsAllQueriesV2_7_0: Seq[String] = Seq( + "q5a", + "q6", + "q10a", + "q11", + "q12", + "q14", + "q14a", + "q18a", + "q20", + "q22", + "q22a", + "q24", + "q27a", + "q34", + "q35", + "q35a", + "q36a", + "q47", + "q49", + "q51a", + "q57", + "q64", + "q67a", + "q70a", + // TODO: unknown failure (seems memory usage over Github action runner) in CI with q72-v2.7 + // in https://github.com/apache/datafusion-comet/pull/613. + // "q72", + "q74", + "q75", + "q77a", + "q78", + "q80a", + "q86a", + "q98") + + override val tpcdsQueries: Seq[String] = tpcdsAllQueries + + override val tpcdsQueriesV2_7_0: Seq[String] = tpcdsAllQueriesV2_7_0 } with CometTPCDSQueryTestSuite with ShimCometTPCDSQuerySuite { @@ -157,9 +191,11 @@ class CometTPCDSQuerySuite conf.set(CometConf.COMET_EXEC_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true") conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") - conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "20g") + conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g") + conf.set(CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key, "true") + conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") conf.set(MEMORY_OFFHEAP_ENABLED.key, "true") - conf.set(MEMORY_OFFHEAP_SIZE.key, "20g") + conf.set(MEMORY_OFFHEAP_SIZE.key, "15g") conf }