Skip to content

Commit

Permalink
feat: Enable columnar shuffle by default (apache#250)
Browse files Browse the repository at this point in the history
* feat: Enable columnar shuffle by default

* Update plan stability

* Fix

* Update diff

* Add Comet memoryOverhead for Spark SQL tests

* Update plan stability

* Update diff

* Update more diff

* Update DataFusion commit

* Update diff

* Update diff

* Update diff

* Update diff

* Update diff

* Fix more tests

* Fix more

* Fix

* Fix more

* Fix more

* Fix more

* Fix more

* Fix more

* Update diff

* Fix memory leak

* Update plan stability

* Restore diff

* Update core/src/execution/datafusion/planner.rs

Co-authored-by: Andy Grove <andygrove73@gmail.com>

* Update core/src/execution/datafusion/planner.rs

Co-authored-by: Andy Grove <andygrove73@gmail.com>

* Fix style

* Use ShuffleExchangeLike instead

---------

Co-authored-by: Andy Grove <andygrove73@gmail.com>
  • Loading branch information
2 people authored and Steve Vaughan committed May 9, 2024
1 parent 86582b9 commit 92e975d
Show file tree
Hide file tree
Showing 10 changed files with 1,207 additions and 177 deletions.
16 changes: 8 additions & 8 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,14 @@ object CometConf {
.booleanConf
.createWithDefault(false)

val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] = conf(
"spark.comet.columnar.shuffle.enabled")
.doc(
"Force Comet to only use columnar shuffle for CometScan and Spark regular operators. " +
"If this is enabled, Comet native shuffle will not be enabled but only Arrow shuffle. " +
"By default, this config is false.")
.booleanConf
.createWithDefault(false)
val COMET_COLUMNAR_SHUFFLE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.columnar.shuffle.enabled")
.doc(
"Whether to enable Arrow-based columnar shuffle for Comet and Spark regular operators. " +
"If this is enabled, Comet prefers columnar shuffle over native shuffle. " +
"By default, this config is true.")
.booleanConf
.createWithDefault(true)

val COMET_SHUFFLE_ENFORCE_MODE_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.shuffle.enforceMode.enabled")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class NativeUtil {
val arrowArray = ArrowArray.allocateNew(allocator)
Data.exportVector(
allocator,
getFieldVector(valueVector),
getFieldVector(valueVector, "export"),
provider,
arrowArray,
arrowSchema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ object Utils {
}
}

getFieldVector(valueVector)
getFieldVector(valueVector, "serialize")

case c =>
throw new SparkException(
Expand All @@ -253,14 +253,15 @@ object Utils {
(fieldVectors, provider)
}

def getFieldVector(valueVector: ValueVector): FieldVector = {
def getFieldVector(valueVector: ValueVector, reason: String): FieldVector = {
valueVector match {
case v @ (_: BitVector | _: TinyIntVector | _: SmallIntVector | _: IntVector |
_: BigIntVector | _: Float4Vector | _: Float8Vector | _: VarCharVector |
_: DecimalVector | _: DateDayVector | _: TimeStampMicroTZVector | _: VarBinaryVector |
_: FixedSizeBinaryVector | _: TimeStampMicroVector) =>
v.asInstanceOf[FieldVector]
case _ => throw new SparkException(s"Unsupported Arrow Vector: ${valueVector.getClass}")
case _ =>
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
}
}
}
Loading

0 comments on commit 92e975d

Please sign in to comment.