Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable Comet broadcast by default #213

Merged
merged 13 commits into from
Apr 5, 2024

Conversation

viirya
Copy link
Member

@viirya viirya commented Mar 18, 2024

Which issue does this PR close?

Closes #212.
Closes #241.
Closes #243.

Rationale for this change

What changes are included in this PR?

How are these changes tested?

Comment on lines 372 to 455
case plan
if isCometNative(plan) &&
plan.children.exists(_.isInstanceOf[BroadcastExchangeExec]) =>
val newChildren = plan.children.map {
case b: BroadcastExchangeExec
if isCometNative(b.child) &&
isCometOperatorEnabled(conf, "broadcastExchangeExec") =>
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using the common operator enable config to control broadcast operator as other operators.

Comment on lines 558 to 560
val operatorDisabledFlag = s"$COMET_EXEC_CONFIG_PREFIX.$operator.disabled"
conf.getConfString(operatorFlag, "false").toBoolean || isCometAllOperatorEnabled(conf) &&
!conf.getConfString(operatorDisabledFlag, "false").toBoolean
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is added to be able disable certain operator specially.

@viirya viirya force-pushed the remove_broadcast_config branch 2 times, most recently from a24c3b5 to 86167fd Compare March 18, 2024 04:35
@viirya viirya changed the title feat: Remove COMET_EXEC_BROADCAST_ENABLED feat: Enable Comet broadcast by default Mar 18, 2024
@codecov-commenter
Copy link

codecov-commenter commented Mar 18, 2024

Codecov Report

Attention: Patch coverage is 38.23529% with 63 lines in your changes are missing coverage. Please review.

Project coverage is 33.58%. Comparing base (aa6ddc5) to head (32f3ae1).
Report is 2 commits behind head on main.

Files Patch % Lines
.../scala/org/apache/spark/sql/comet/util/Utils.scala 0.00% 38 Missing ⚠️
...n/scala/org/apache/spark/sql/comet/operators.scala 59.25% 5 Missing and 6 partials ⚠️
...e/spark/sql/comet/CometBroadcastExchangeExec.scala 41.17% 9 Missing and 1 partial ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala 77.77% 1 Missing and 3 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main     #213      +/-   ##
============================================
+ Coverage     33.48%   33.58%   +0.09%     
- Complexity      776      780       +4     
============================================
  Files           108      107       -1     
  Lines         37178    37211      +33     
  Branches       8146     8160      +14     
============================================
+ Hits          12448    12496      +48     
+ Misses        22107    22076      -31     
- Partials       2623     2639      +16     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

Copy link
Contributor

@snmvaughan snmvaughan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@viirya
Copy link
Member Author

viirya commented Mar 19, 2024

cc @sunchao

@viirya
Copy link
Member Author

viirya commented Mar 26, 2024

cc @sunchao Please take a look. Thanks.

Copy link
Member

@sunchao sunchao left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM (pending CI)

@viirya
Copy link
Member Author

viirya commented Mar 29, 2024

A few tests seem needed to be updated. Let me take a look.

Comment on lines 98 to 112
batches.map { batch =>
val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
val out = new DataOutputStream(codec.compressedOutputStream(cbbos))

val count = new NativeUtil().serializeBatches(iter, out)
val (fieldVectors, batchProviderOpt) = nativeUtil.getBatchFieldVectors(batch)
val root = new VectorSchemaRoot(fieldVectors.asJava)
val provider = batchProviderOpt.getOrElse(nativeUtil.getDictionaryProvider)

val writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out))
writer.start()
writer.writeBatch()

root.clear()
writer.end()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously serializeBatches is wrong which serializes all batches with a ArrowStreamWriter. It causes wrong results when serializing dictionary arrays, i.e., #241.

Each batch could have different dictionary provider content. But when ArrowStreamWriter starts to serialize, it writes out dictionaries at the beginning. So later batch will use incorrect dictionary value.

@@ -191,7 +193,7 @@ case class CometBroadcastExchangeExec(originalPlan: SparkPlan, child: SparkPlan)
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
val broadcasted = executeBroadcast[Array[ChunkedByteBuffer]]()

new CometBatchRDD(sparkContext, broadcasted.value.length, broadcasted)
new CometBatchRDD(sparkContext, childRDD.getNumPartitions, broadcasted)
Copy link
Member Author

@viirya viirya Apr 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The broadcast RDD must have same number of partitions as child RDD. Previously we serialize all batches in one partition into a ChunkedByteBuffer, so broadcasted.value.length is the number of partitions. Now it is changed to serialize one batch in one ChunkedByteBuffer, so we need to use the correct number.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update. Child RDD partition number may also not be same as the zipping side. We need to get the number of partition of zipping side when triggering this execution method.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is described in #243.

Comment on lines +187 to +192
def serializeBatches(batches: Iterator[ColumnarBatch]): Iterator[(Long, ChunkedByteBuffer)] = {
batches.map { batch =>
val dictionaryProvider: CDataDictionaryProvider = new CDataDictionaryProvider

val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
val cbbos = new ChunkedByteBufferOutputStream(1024 * 1024, ByteBuffer.allocate)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to move serializeBatches into spark package because ChunkedByteBufferOutputStream is a spark private class. I cannot move serializeBatches to spark module because it uses arrow packages (we shade arrow in common module).

@viirya
Copy link
Member Author

viirya commented Apr 5, 2024

Alright. I fixed all bugs (#241, #243) around broadcast and now all CIs are passed. I will go to merge later today.

@viirya viirya merged commit d76c113 into apache:main Apr 5, 2024
29 checks passed
@viirya
Copy link
Member Author

viirya commented Apr 5, 2024

Merged. Thanks.

szehon-ho pushed a commit to szehon-ho/spark that referenced this pull request Aug 7, 2024
himadripal pushed a commit to himadripal/datafusion-comet that referenced this pull request Sep 7, 2024
* feat: Remove COMET_EXEC_BROADCAST_ENABLED

* Fix

* Fix

* Update plan stability

* Fix

* Remove unused import and class

* Fix

* Remove unused imports

* Fix

* Fix scala style

* fix

* Fix

* Update diff
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants