[ARROW] Arrow serialization should not introduce extra shuffle for outermost limit #4662
Conversation
```scala
def executeArrowBatchCollect: SparkPlan => Array[Array[Byte]] = {
  case adaptiveSparkPlan: AdaptiveSparkPlanExec =>
    executeArrowBatchCollect(adaptiveSparkPlan.finalPhysicalPlan)
```
The `AdaptiveSparkPlanExec.finalPhysicalPlan` function was introduced in SPARK-41914, and it may present compatibility issues if the underlying Spark runtime lacks the corresponding patch.
Shall we call the related private method via reflection as a workaround? It's unacceptable if we break compatibility.
I have changed it to call `adaptiveSparkPlan.finalPhysicalPlan` via reflection.
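For context, a minimal sketch of what such a reflective call could look like (illustration only; the helper below is hypothetical and not necessarily how Kyuubi implements it):

```scala
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Hypothetical helper: invoke AdaptiveSparkPlanExec.finalPhysicalPlan via reflection,
// avoiding a compile-time dependency on a method that only exists after SPARK-41914.
// On older Spark runtimes getMethod throws NoSuchMethodException, so callers would
// need a fallback (e.g. using adaptivePlan.executedPlan instead).
def finalPhysicalPlan(adaptivePlan: AdaptiveSparkPlanExec): SparkPlan = {
  val method = adaptivePlan.getClass.getMethod("finalPhysicalPlan")
  method.setAccessible(true)
  method.invoke(adaptivePlan).asInstanceOf[SparkPlan]
}
```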
```scala
      }
      i += 1
    }
    result.toArray
```
Offset support will be added in a separate PR to adapt to Spark 3.4.0.
Can we unify the class names a bit more? I don't see the difference between xxxUtils and xxxHelper... Logically, most of those methods should be private.
```scala
estimatedBatchSize += (row match {
  case ur: UnsafeRow => ur.getSizeInBytes
  // Trying to estimate the size of the current row, assuming 16 bytes per value.
  case ir: InternalRow => ir.numFields * 16
```
In general, we can infer the row size from `schema.defaultSize`.
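For illustration, a tiny sketch of that suggestion (my own example, not project code):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types.StructType

// Sketch: use the exact byte size when the row is an UnsafeRow; otherwise fall back
// to the schema's declared default size, a static type-based per-row estimate.
def estimateRowSize(row: InternalRow, schema: StructType): Long = row match {
  case ur: UnsafeRow => ur.getSizeInBytes
  case _             => schema.defaultSize
}
```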
Sorry for the lack of documentation. This class `ArrowBatchIterator` is derived from `org.apache.spark.sql.execution.arrow.ArrowConverters.ArrowBatchWithSchemaIterator`, with two key differences:
- there is no requirement to write the schema at the batch header
- iteration halts when `rowCount` equals `limit`
Here is the diff compared with the latest Spark master branch (https://github.com/apache/spark/blob/3c189abd73afa998e8573cbfdaf0f72445284314/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala):

```diff
- private[sql] class ArrowBatchWithSchemaIterator(
+ private[sql] class ArrowBatchIterator(
      rowIter: Iterator[InternalRow],
      schema: StructType,
      maxRecordsPerBatch: Long,
      maxEstimatedBatchSize: Long,
+     limit: Long,
      timeZoneId: String,
      context: TaskContext)
-   extends ArrowBatchIterator(
-     rowIter, schema, maxRecordsPerBatch, timeZoneId, context) {
+   extends Iterator[Array[Byte]] {
+
-   private val arrowSchemaSize = SizeEstimator.estimate(arrowSchema)
    var rowCountInLastBatch: Long = 0
+   var rowCount: Long = 0

    override def next(): Array[Byte] = {
      val out = new ByteArrayOutputStream()
      val writeChannel = new WriteChannel(Channels.newChannel(out))

      rowCountInLastBatch = 0
-     var estimatedBatchSize = arrowSchemaSize
+     var estimatedBatchSize = 0
      Utils.tryWithSafeFinally {
-       // Always write the schema.
-       MessageSerializer.serialize(writeChannel, arrowSchema)

        // Always write the first row.
        while (rowIter.hasNext && (
@@ -31,15 +30,17 @@
          estimatedBatchSize < maxEstimatedBatchSize ||
          // If the size of rows are 0 or negative, unlimit it.
          maxRecordsPerBatch <= 0 ||
-         rowCountInLastBatch < maxRecordsPerBatch)) {
+         rowCountInLastBatch < maxRecordsPerBatch ||
+         rowCount < limit)) {
          val row = rowIter.next()
          arrowWriter.write(row)
          estimatedBatchSize += (row match {
            case ur: UnsafeRow => ur.getSizeInBytes
-           // Trying to estimate the size of the current row, assuming 16 bytes per value.
-           case ir: InternalRow => ir.numFields * 16
+           // Trying to estimate the size of the current row
+           case _: InternalRow => schema.defaultSize
          })
          rowCountInLastBatch += 1
+         rowCount += 1
        }
        arrowWriter.finish()
        val batch = unloader.getRecordBatch()
```
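As a side note, a small sketch (an assumption about intended usage, not Kyuubi code) of why skipping the schema header is safe: the consumer can serialize the schema once and then append the raw batch payloads to form a single Arrow IPC stream.

```scala
import java.io.ByteArrayOutputStream
import java.nio.channels.Channels
import org.apache.arrow.vector.ipc.WriteChannel
import org.apache.arrow.vector.ipc.message.MessageSerializer
import org.apache.arrow.vector.types.pojo.Schema

// Sketch: write the Arrow schema exactly once, then concatenate the schema-less
// batch payloads produced by an iterator like the one above.
def toIpcStream(arrowSchema: Schema, batches: Iterator[Array[Byte]]): Array[Byte] = {
  val out = new ByteArrayOutputStream()
  val channel = new WriteChannel(Channels.newChannel(out))
  MessageSerializer.serialize(channel, arrowSchema) // schema header, written once
  batches.foreach(b => out.write(b))                // batch payloads appended as-is
  out.toByteArray
}
```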
Codecov Report

```
@@             Coverage Diff              @@
##             master    #4662      +/-   ##
============================================
+ Coverage     57.60%   57.85%   +0.25%
  Complexity       13       13
============================================
  Files           579      580       +1
  Lines         31951    32212     +261
  Branches       4269     4304      +35
============================================
+ Hits          18404    18635     +231
+ Misses        11785    11780       -5
- Partials       1762     1797      +35
```

... and 23 files with indirect coverage changes
I have completed the refactoring; please take another look when you have time. Thank you. @ulysses-you
Thank you @cfmcgrady, LGTM if tests pass.
```scala
val in = new ByteArrayInputStream(bytes)
val out = new ByteArrayOutputStream(bytes.length)

val rootAllocator = ArrowUtils.rootAllocator.newChildAllocator(
```
The name "rootAllocator" is not suitable then, and why should we create a child allocator?
I think it's for debugging; a named child allocator makes a leak like this easy to attribute:

```
Memory was leaked by query. Memory leaked: (128)
Allocator(slice) 0/128/128/9223372036854775807 (res/actual/peak/limit)
java.lang.IllegalStateException: Memory was leaked by query. Memory leaked: (128)
Allocator(slice) 0/128/128/9223372036854775807 (res/actual/peak/limit)
	at org.apache.arrow.memory.BaseAllocator.close(BaseAllocator.java:437)
	at org.apache.spark.sql.execution.arrow.KyuubiArrowConverters$.slice(KyuubiArrowConverters.scala:91)
	at org.apache.spark.sql.kyuubi.SparkDatasetHelper$.doCollectLimit(SparkDatasetHelper.scala:170)
	at org.apache.spark.sql.kyuubi.SparkDatasetHelper$.$anonfun$executeArrowBatchCollect$1(SparkDatasetHelper.scala:51)
	at org.apache.spark.sql.kyuubi.SparkDatasetHelper$
```
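For reference, a minimal sketch of that pattern (illustrative only, not the Kyuubi code): a dedicated, named child allocator scopes the buffers of a single operation, and closing it is what surfaces a leak like the one above.

```scala
import org.apache.arrow.memory.{BufferAllocator, RootAllocator}

// Sketch: allocate from a named child allocator; if any buffer from it is still
// outstanding when close() is called, Arrow throws IllegalStateException
// ("Memory was leaked by query ..."), naming the child ("slice") rather than
// the shared root allocator.
val root: BufferAllocator = new RootAllocator(Long.MaxValue)
val child: BufferAllocator = root.newChildAllocator("slice", 0, Long.MaxValue)
try {
  val buf = child.buffer(128) // 128 bytes from the child allocator
  // ... fill and read the buffer ...
  buf.close()                 // release before the allocator is closed
} finally {
  child.close()               // the leak check happens here
  root.close()
}
```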
LGTM if CI passes
Great work, thanks
Thanks, merging to master/branch-1.7
[ARROW] Arrow serialization should not introduce extra shuffle for outermost limit

### _Why are the changes needed?_

The fundamental concept is to execute a job similar to the way in which `CollectLimitExec.executeCollect()` operates.

```sql
select * from parquet.`parquet/tpcds/sf1000/catalog_sales` limit 20;
```

Before this PR: (query plan screenshots omitted)

After this PR: (query plan screenshots omitted)

### _How was this patch tested?_

- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4662 from cfmcgrady/arrow-collect-limit-exec-2.

Closes #4662

82c912e [Fu Chen] close vector
130bcb1 [Fu Chen] finally close
facc13f [Fu Chen] exclude rule OptimizeLimitZero
3700839 [Fu Chen] SparkArrowbasedOperationSuite adapt Spark-3.1.x
6064ab9 [Fu Chen] limit = 0 test case
6d596fc [Fu Chen] address comment
8280783 [Fu Chen] add `isStaticConfigKey` to adapt Spark-3.1.x
22cc70f [Fu Chen] add ut
b72bc6f [Fu Chen] add offset support to adapt Spark-3.4.x
9ffb44f [Fu Chen] make toBatchIterator private
c83cf3f [Fu Chen] SparkArrowbasedOperationSuite adapt Spark-3.1.x
573a262 [Fu Chen] fix
4cef204 [Fu Chen] SparkArrowbasedOperationSuite adapt Spark-3.1.x
d70aee3 [Fu Chen] SparkPlan.session -> SparkSession.active to adapt Spark-3.1.x
e3bf84c [Fu Chen] refactor
81886f0 [Fu Chen] address comment
2286afc [Fu Chen] reflective calla AdaptiveSparkPlanExec.finalPhysicalPlan
03d0747 [Fu Chen] address comment
25e4f05 [Fu Chen] add docs
885cf2c [Fu Chen] infer row size by schema.defaultSize
4e7ca54 [Fu Chen] unnecessarily changes
ee5a756 [Fu Chen] revert unnecessarily changes
6c5b1eb [Fu Chen] add ut
4212a89 [Fu Chen] refactor and add ut
ed8c692 [Fu Chen] refactor
0088671 [Fu Chen] refine
8593d85 [Fu Chen] driver slice last batch
a584943 [Fu Chen] arrow take

Authored-by: Fu Chen <cfmcgrady@gmail.com>
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
(cherry picked from commit 1a65125)
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
Why are the changes needed?
The fundamental concept is to execute a job similar to the way in which `CollectLimitExec.executeCollect()` operates.

Before this PR: (query plan screenshots omitted)

After this PR: (query plan screenshots omitted)
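As an aside, here is a self-contained sketch of that idea (illustration only, using plain Scala collections in place of RDD partitions): scan a few partitions at a time and stop once `limit` rows are gathered, rather than shuffling every partition into one.

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch: incrementally "run jobs" over small groups of partitions, in the spirit of
// CollectLimitExec.executeCollect()/SparkPlan.executeTake(), stopping early once the
// limit is reached so untouched partitions are never scanned.
def takeUpToLimit[T](partitions: Seq[Seq[T]], limit: Int, partsPerJob: Int = 1): Seq[T] = {
  val collected = ArrayBuffer.empty[T]
  var next = 0
  while (collected.size < limit && next < partitions.size) {
    val upTo = math.min(next + partsPerJob, partitions.size)
    partitions.slice(next, upTo).foreach { part =>
      collected ++= part.take(limit - collected.size)
    }
    next = upTo
  }
  collected.toSeq
}

// Example: only the first partitions are touched when the limit is small.
// takeUpToLimit(Seq(Seq(1, 2, 3), Seq(4, 5), Seq(6)), limit = 4) == Seq(1, 2, 3, 4)
```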
How was this patch tested?
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request