Skip to content

Commit

Permalink
feat: Use unified allocator for execution iterators (#613)
Browse files Browse the repository at this point in the history
* feat: Use unified allocator for execution iterators

* Disable CometTakeOrderedAndProjectExec

* Add comment

* Increase heap memory

* Enable CometTakeOrderedAndProjectExec

* More

* More

* Reduce heap memory

* Run sort merge join TPCDS with -e for debugging

* Add -X flag

* Disable q72 and q72-v2.7

* Update .github/workflows/benchmark.yml
  • Loading branch information
viirya authored Jul 10, 2024
1 parent 15e7baa commit 3370612
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 20 deletions.
11 changes: 11 additions & 0 deletions common/src/main/scala/org/apache/comet/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,19 @@ package org.apache

import java.util.Properties

import org.apache.arrow.memory.RootAllocator

package object comet {

/**
* The root allocator for Comet execution. Because Arrow Java memory management is based on
* reference counting, exposed arrays increase the reference count of the underlying buffers.
* Until the reference count is zero, the memory will not be released. If the consumer side is
* finished later than the close of the allocator, the allocator will think the memory is
* leaked. To avoid this, we use a single allocator for the whole execution process.
*/
val CometArrowAllocator = new RootAllocator(Long.MaxValue)

/**
* Provides access to build information about the Comet libraries. This will be used by the
* benchmarking software to provide the source revision and repository. In addition, the build
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@ package org.apache.comet.vector
import scala.collection.mutable

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

class NativeUtil {
import Utils._

private val allocator = new RootAllocator(Long.MaxValue)
.newChildAllocator(this.getClass.getSimpleName, 0, Long.MaxValue)
private val allocator = CometArrowAllocator
private val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider
private val importer = new ArrowImporter(allocator)

Expand Down
12 changes: 5 additions & 7 deletions common/src/main/scala/org/apache/comet/vector/StreamReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ package org.apache.comet.vector

import java.nio.channels.ReadableByteChannel

import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.ipc.{ArrowStreamReader, ReadChannel}
import org.apache.arrow.vector.ipc.message.MessageChannelReader
import org.apache.spark.sql.vectorized.ColumnarBatch

import org.apache.comet.CometArrowAllocator

/**
* A reader that consumes Arrow data from an input channel, and produces Comet batches.
*/
case class StreamReader(channel: ReadableByteChannel, source: String) extends AutoCloseable {
private var allocator = new RootAllocator(Long.MaxValue)
.newChildAllocator(s"${this.getClass.getSimpleName}/$source", 0, Long.MaxValue)
private val channelReader = new MessageChannelReader(new ReadChannel(channel), allocator)
private var arrowReader = new ArrowStreamReader(channelReader, allocator)
private val channelReader =
new MessageChannelReader(new ReadChannel(channel), CometArrowAllocator)
private var arrowReader = new ArrowStreamReader(channelReader, CometArrowAllocator)
private var root = arrowReader.getVectorSchemaRoot

def nextBatch(): Option[ColumnarBatch] = {
Expand All @@ -53,11 +53,9 @@ case class StreamReader(channel: ReadableByteChannel, source: String) extends Au
if (root != null) {
arrowReader.close()
root.close()
allocator.close()

arrowReader = null
root = null
allocator = null
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ import org.apache.comet.CometConf

class CometTPCDSQuerySuite
extends {
override val excludedTpcdsQueries: Set[String] = Set()

// This is private in `TPCDSBase` and `excludedTpcdsQueries` is private too.
// So we cannot override `excludedTpcdsQueries` to exclude the queries.
val tpcdsAllQueries: Seq[String] = Seq(
"q1",
"q2",
Expand Down Expand Up @@ -112,7 +108,9 @@ class CometTPCDSQuerySuite
"q69",
"q70",
"q71",
"q72",
// TODO: unknown failure (seems memory usage over Github action runner) in CI with q72 in
// https://github.com/apache/datafusion-comet/pull/613.
// "q72",
"q73",
"q74",
"q75",
Expand Down Expand Up @@ -141,9 +139,45 @@ class CometTPCDSQuerySuite
"q98",
"q99")

// TODO: enable the 3 queries after fixing the issues #1358.
override val tpcdsQueries: Seq[String] =
tpcdsAllQueries.filterNot(excludedTpcdsQueries.contains)
val tpcdsAllQueriesV2_7_0: Seq[String] = Seq(
"q5a",
"q6",
"q10a",
"q11",
"q12",
"q14",
"q14a",
"q18a",
"q20",
"q22",
"q22a",
"q24",
"q27a",
"q34",
"q35",
"q35a",
"q36a",
"q47",
"q49",
"q51a",
"q57",
"q64",
"q67a",
"q70a",
// TODO: unknown failure (seems memory usage over Github action runner) in CI with q72-v2.7
// in https://github.com/apache/datafusion-comet/pull/613.
// "q72",
"q74",
"q75",
"q77a",
"q78",
"q80a",
"q86a",
"q98")

override val tpcdsQueries: Seq[String] = tpcdsAllQueries

override val tpcdsQueriesV2_7_0: Seq[String] = tpcdsAllQueriesV2_7_0
}
with CometTPCDSQueryTestSuite
with ShimCometTPCDSQuerySuite {
Expand All @@ -157,9 +191,11 @@ class CometTPCDSQuerySuite
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "20g")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "15g")
conf.set(CometConf.COMET_SHUFFLE_ENFORCE_MODE_ENABLED.key, "true")
conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
conf.set(MEMORY_OFFHEAP_ENABLED.key, "true")
conf.set(MEMORY_OFFHEAP_SIZE.key, "20g")
conf.set(MEMORY_OFFHEAP_SIZE.key, "15g")
conf
}

Expand Down

0 comments on commit 3370612

Please sign in to comment.