
[GLUTEN-8453] [VL] Allow Heavy Batch to be Processed by ColumnarCachedBatchSerializer #8454

Merged (2 commits) on Jan 8, 2025
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.execution

 import org.apache.gluten.backendsapi.BackendsApiManager
-import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxColumnarBatches}
Contributor:
@zhztheplayer Ok to add VeloxColumnarBatches here?

Member @zhztheplayer (Jan 8, 2025):

Yes, it's normal to call the utility from Velox backend code. However, it seems debatable whether we should rely on isLightBatch / isHeavyBatch to add conditional transitions.

@ArnavBalyan Would you like to help check whether we can instead add explicit transition nodes (LoadArrowData / OffloadArrowData) into the query plan, rather than this PR's change? Or is the last Note in the PR description meant for something similar? Thanks!
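To make the alternative concrete: inserting explicit transition nodes means a planner rule rewrites the tree so the conversion is visible in the plan, rather than a runtime branch on the batch type. A minimal sketch, assuming a made-up `Plan` tree (LoadArrowData / OffloadArrowData are real Gluten transition concepts, but the classes below are invented for illustration):

```scala
// Hypothetical mini plan tree; only the OffloadArrowData name comes from Gluten.
sealed trait Convention
case object ArrowData extends Convention // "heavy": JVM-readable Arrow data
case object VeloxData extends Convention // "light": native handle

sealed trait Plan { def output: Convention }
case class Scan(output: Convention) extends Plan
case class OffloadArrowData(child: Plan) extends Plan {
  val output: Convention = VeloxData
}
case class NativeOp(child: Plan) extends Plan {
  val output: Convention = VeloxData
}

object Transitions {
  // Insert an explicit OffloadArrowData node wherever a native operator's
  // child produces Arrow data, instead of branching on isHeavyBatch at runtime.
  def insertTransitions(plan: Plan): Plan = plan match {
    case NativeOp(child) =>
      val fixed = insertTransitions(child)
      if (fixed.output == ArrowData) NativeOp(OffloadArrowData(fixed))
      else NativeOp(fixed)
    case other => other
  }
}
```

The benefit over a runtime guard is that the conversion cost shows up in the plan (and in `explain` output), and rules can reason about it like any other operator.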

Member:

Could also refer to a previous effort #7313 if needed.

Contributor Author:

Thanks for the review @FelixYBW @zhztheplayer!
Yes, the note was meant for that. Ideally the transition rules would have inserted the correct transition node before this point. However, the serializer is a special case: it is not an operator and does not extend GlutenPlan. I have some ideas to explore here, which may require design changes in the serializer to make it work with transitions.

Would it be possible to merge this for now, since the ColumnarRange operator depends on it? I'll work on making the serializer compatible with transitions. Let me know what you think, thanks!

Member:

> However the serializer is a special case since it's not an operator and does not extend the GlutenPlan

Agreed. The code path is different. Thanks for figuring this out.

Do you think we can add a UT for the change in this PR, if it can be considered an individual fix?

Contributor Author:

Sure, let me add it in the ColumnarRangeExec PR, since it already has the failing UT. Thanks!

Member:

Yes, please feel free to move forward with the Range PR. I am also testing the relevant code and will help add a test case here.

import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.iterator.Iterators
@@ -171,11 +171,24 @@ class ColumnarCachedBatchSerializer extends CachedBatchSerializer with Logging {
       conf: SQLConf): RDD[CachedBatch] = {
     input.mapPartitions {
       it =>
+        val lightBatches = it.map {
+          /* Native code needs a Velox offloaded batch, making sure to offload
+             if heavy batch is encountered */
+          batch =>
+            val heavy = ColumnarBatches.isHeavyBatch(batch)
+            if (heavy) {
+              val offloaded = VeloxColumnarBatches.toVeloxBatch(
+                ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch))
+              offloaded
+            } else {
+              batch
+            }
+        }
         new Iterator[CachedBatch] {
-          override def hasNext: Boolean = it.hasNext
+          override def hasNext: Boolean = lightBatches.hasNext

           override def next(): CachedBatch = {
-            val batch = it.next()
+            val batch = lightBatches.next()
             val results =
               ColumnarBatchSerializerJniWrapper
                 .create(
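Stripped of Gluten's APIs, the change above follows a simple pattern: wrap the upstream iterator in a lazy `map` that converts only when needed, so light batches pass through untouched and batches are converted one at a time as the downstream iterator pulls them. A minimal sketch with hypothetical batch types (the real code uses `ColumnarBatch` and native offloading; everything below is invented for illustration):

```scala
// Hypothetical stand-ins for Gluten's batch representations.
sealed trait Batch
case class HeavyBatch(rows: Seq[Int]) extends Batch // Arrow data, JVM-readable
case class LightBatch(handle: Long, rows: Seq[Int]) extends Batch // native handle

object OffloadDemo {
  // Convert only when a heavy batch is encountered; pass light batches through.
  def ensureLight(b: Batch): LightBatch = b match {
    case HeavyBatch(rows) => LightBatch(handle = rows.hashCode.toLong, rows = rows)
    case l: LightBatch => l
  }

  // Like the serializer change: a lazy map over the iterator, so no batch is
  // converted until the consumer actually asks for it.
  def toLightBatches(it: Iterator[Batch]): Iterator[LightBatch] =
    it.map(ensureLight)
}
```

Because `Iterator.map` is lazy, this adds no extra pass over the data and no buffering; it simply interposes the guard on each `next()` call, which is why the `hasNext`/`next` overrides in the diff only needed to switch from `it` to `lightBatches`.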
@@ -85,17 +85,15 @@ private static BatchType identifyBatchType(ColumnarBatch batch) {
   }

   /** Heavy batch: Data is readable from JVM and formatted as Arrow data. */
-  @VisibleForTesting
-  static boolean isHeavyBatch(ColumnarBatch batch) {
+  public static boolean isHeavyBatch(ColumnarBatch batch) {
     return identifyBatchType(batch) == BatchType.HEAVY;
   }

   /**
    * Light batch: Data is not readable from JVM, a long int handle (which is a pointer usually) is
    * used to bind the batch to a native side implementation.
    */
-  @VisibleForTesting
-  static boolean isLightBatch(ColumnarBatch batch) {
+  public static boolean isLightBatch(ColumnarBatch batch) {
     return identifyBatchType(batch) == BatchType.LIGHT;
   }
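The heavy/light distinction these helpers expose can be illustrated with a simplified column model. This is only a sketch: Gluten's real `identifyBatchType` inspects the concrete `ColumnVector` classes inside a `ColumnarBatch`, while the types below are invented for the example.

```scala
// Invented column model; the real check inspects ColumnVector implementations.
sealed trait Column
case class ArrowColumn(values: Array[Int]) extends Column // readable from the JVM
case class NativeColumn(handle: Long) extends Column // opaque native pointer

object BatchKind extends Enumeration { val Heavy, Light, Mixed = Value }

object BatchTypeDemo {
  // A batch is heavy/light only if every column agrees; anything else is Mixed.
  def identify(columns: Seq[Column]): BatchKind.Value = {
    val kinds = columns.map {
      case _: ArrowColumn => BatchKind.Heavy
      case _: NativeColumn => BatchKind.Light
    }.distinct
    if (kinds.size == 1) kinds.head else BatchKind.Mixed
  }

  def isHeavyBatch(columns: Seq[Column]): Boolean =
    identify(columns) == BatchKind.Heavy
  def isLightBatch(columns: Seq[Column]): Boolean =
    identify(columns) == BatchKind.Light
}
```

Widening these helpers from `@VisibleForTesting` package-private to `public` is what lets the serializer (which lives in a different package) perform the guard shown in the Scala diff above.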

@@ -230,7 +228,8 @@ public static ColumnarBatch offload(BufferAllocator allocator, ColumnarBatch input) {
     if (input.numCols() == 0) {
       throw new IllegalArgumentException("batch with zero columns cannot be offloaded");
     }
-    // Batch-offloading doesn't involve any backend-specific native code. Use the internal
+    // Batch-offloading doesn't involve any backend-specific native code. Use the
+    // internal
     // backend to store native batch references only.
     final Runtime runtime =
         Runtimes.contextInstance(INTERNAL_BACKEND_KIND, "ColumnarBatches#offload");