Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-7313][VL] Explicit Arrow transitions, part 1: add LoadArrowDataExec / OffloadArrowDataExec #7343

Merged
merged 6 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.columnarbatch

import org.apache.gluten.execution.{RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.execution.{LoadArrowDataExec, OffloadArrowDataExec, RowToVeloxColumnarExec, VeloxColumnarToRowExec}
import org.apache.gluten.extension.columnar.transition.{Convention, TransitionDef}

import org.apache.spark.sql.execution.SparkPlan
Expand All @@ -34,7 +34,29 @@ object VeloxBatch extends Convention.BatchType {
VeloxColumnarToRowExec(plan)
})

// Velox batch is considered one-way compatible with Arrow batch.
// This is practically achieved by utilizing C++ API VeloxColumnarBatch::from at runtime.
fromBatch(ArrowBatch, TransitionDef.empty)
// TODO: Add explicit transitions between Arrow native batch and Velox batch.
// See https://github.com/apache/incubator-gluten/issues/7313.

fromBatch(
ArrowBatches.ArrowJavaBatch,
() =>
(plan: SparkPlan) => {
OffloadArrowDataExec(plan)
})

toBatch(
ArrowBatches.ArrowJavaBatch,
() =>
(plan: SparkPlan) => {
LoadArrowDataExec(plan)
})

fromBatch(
ArrowBatches.ArrowNativeBatch,
() =>
(plan: SparkPlan) => {
LoadArrowDataExec(plan)
})

toBatch(ArrowBatches.ArrowNativeBatch, TransitionDef.empty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ object ArrowCSVFileFormat {
batchSize
)
veloxBatch
.map(v => ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), v))
.map(v => ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), v))
}

private def toAttribute(field: StructField): AttributeReference =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import scala.collection.JavaConverters._
/**
* An operator to resize input batches by appending later batches to the one that comes earlier,
* or by splitting one batch into smaller ones.
*
* FIXME: Code duplication with ColumnarToColumnarExec.
*/
case class VeloxResizeBatchesExec(
override val child: SparkPlan,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.gluten.execution.datasource.v2

import org.apache.gluten.columnarbatch.ArrowBatch
import org.apache.gluten.columnarbatch.ArrowBatches
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention

Expand All @@ -34,7 +34,7 @@ case class ArrowBatchScanExec(original: BatchScanExec)
@transient lazy val batch: Batch = original.batch

override protected def batchType0(): Convention.BatchType = {
ArrowBatch
ArrowBatches.ArrowJavaBatch
}

override lazy val readerFactory: PartitionReaderFactory = original.readerFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package org.apache.spark.api.python

import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.columnarbatch.{ColumnarBatches, VeloxBatch}
import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
import org.apache.gluten.exception.GlutenException
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.{Convention, ConventionReq}
import org.apache.gluten.extension.columnar.transition.ConventionReq.KnownChildrenConventions
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.utils.PullOutProjectHelper
Expand Down Expand Up @@ -209,9 +212,20 @@ case class ColumnarArrowEvalPythonExec(
child: SparkPlan,
evalType: Int)
extends EvalPythonExec
with GlutenPlan {
with GlutenPlan
with KnownChildrenConventions {
override def supportsColumnar: Boolean = true

override protected def batchType0(): Convention.BatchType = ArrowJavaBatch

// FIXME: Make this accept ArrowJavaBatch as input. Before doing that, a weight-based
// shortest-path algorithm should be added to the transition factory, so that the factory
// can find row->velox->arrow-native->arrow-java as a viable transition.
// Otherwise, with the current solution, any input (even one already in Arrow Java format) will
// be converted into Velox format and then into Arrow Java format before entering the Python runner.
override def requiredChildrenConventions(): Seq[ConventionReq] = List(
ConventionReq.of(ConventionReq.RowType.Any, ConventionReq.BatchType.Is(VeloxBatch)))

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numOutputBatches" -> SQLMetrics.createMetric(sparkContext, "output_batches"),
Expand Down Expand Up @@ -334,17 +348,17 @@ case class ColumnarArrowEvalPythonExec(
val inputBatchIter = contextAwareIterator.map {
inputCb =>
start_time = System.nanoTime()
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, inputCb)
ColumnarBatches.retain(inputCb)
val loaded = ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), inputCb)
ColumnarBatches.retain(loaded)
// 0. cache input for later merge
inputCbCache += inputCb
numInputRows += inputCb.numRows
inputCbCache += loaded
numInputRows += loaded.numRows
// We only need to pass the referred cols data to python worker for evaluation.
var colsForEval = new ArrayBuffer[ColumnVector]()
for (i <- originalOffsets) {
colsForEval += inputCb.column(i)
colsForEval += loaded.column(i)
}
new ColumnarBatch(colsForEval.toArray, inputCb.numRows())
new ColumnarBatch(colsForEval.toArray, loaded.numRows())
}

val outputColumnarBatchIterator =
Expand All @@ -366,11 +380,9 @@ case class ColumnarArrowEvalPythonExec(
numOutputBatches += 1
numOutputRows += numRows
val batch = new ColumnarBatch(joinedVectors, numRows)
val offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
ColumnarBatches.release(outputCb)
ColumnarBatches.checkLoaded(batch)
procTime += (System.nanoTime() - start_time) / 1000000
offloaded
batch
}
Iterators
.wrap(res)
Expand All @@ -390,13 +402,13 @@ case class ColumnarArrowEvalPythonExec(
if (from > to) {
do {
vector.close()
} while (vector.refCnt() == to)
} while (vector.refCnt() != to)
return
}
// from < to
do {
vector.retain()
} while (vector.refCnt() == to)
} while (vector.refCnt() != to)
}

override protected def withNewChildInternal(newChild: SparkPlan): ColumnarArrowEvalPythonExec =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.execution

import org.apache.gluten.columnarbatch.ArrowBatch
import org.apache.gluten.columnarbatch.ArrowBatches
import org.apache.gluten.extension.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention

Expand All @@ -42,7 +42,7 @@ case class ArrowFileSourceScanExec(original: FileSourceScanExec)
override def doCanonicalize(): FileSourceScanExec = original.doCanonicalize()

override protected def batchType0(): Convention.BatchType = {
ArrowBatch
ArrowBatches.ArrowJavaBatch
}

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class VeloxColumnarWriteFilesRDD(
// Currently, the cb contains three columns: row, fragments, and context.
// The first row in the row column contains the number of rows written.
// The fragments column contains detailed information about the file writes.
val loadedCb = ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance, cb)
val loadedCb = ColumnarBatches.load(ArrowBufferAllocators.contextInstance, cb)
assert(loadedCb.numCols() == 3)
val numWrittenRows = loadedCb.column(0).getLong(0)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ trait VeloxFormatWriterInjects extends GlutenFormatWriterInjectsBase {
// the operation will find a zero column batch from a task-local pool
ColumnarBatchJniWrapper.create(runtime).getForEmptySchema(batch.numRows)
} else {
val offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance, batch)
ColumnarBatches.getNativeHandle(offloaded)
ColumnarBatches.checkOffloaded(batch)
ColumnarBatches.getNativeHandle(batch)
}
}
datasourceJniWrapper.writeBatch(dsHandle, batchHandle)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ object ExecUtil {
val pid = rangePartitioner.get.getPartition(partitionKeyExtractor(row))
pidVec.putInt(i, pid)
}
val pidBatch = ColumnarBatches.ensureOffloaded(
val pidBatch = ColumnarBatches.offload(
ArrowBufferAllocators.contextInstance(),
new ColumnarBatch(Array[ColumnVector](pidVec), cb.numRows))
val newHandle = ColumnarBatches.compose(pidBatch, cb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ public void testOffloadAndLoad() {
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
final ColumnarBatch loaded =
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), offloaded);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
long cnt =
StreamSupport.stream(
Expand All @@ -69,7 +69,7 @@ public void testCreateByHandle() {
final ColumnarBatch batch = newArrowBatch("a boolean, b int", numRows);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertEquals(1, ColumnarBatches.getRefCnt(offloaded));
final long handle = ColumnarBatches.getNativeHandle(offloaded);
final ColumnarBatch created = ColumnarBatches.create(handle);
Expand Down Expand Up @@ -110,10 +110,10 @@ public void testOffloadAndLoadReadRow() {
col1.putNull(numRows - 1);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(batch));
final ColumnarBatch offloaded =
ColumnarBatches.ensureOffloaded(ArrowBufferAllocators.contextInstance(), batch);
ColumnarBatches.offload(ArrowBufferAllocators.contextInstance(), batch);
Assert.assertTrue(ColumnarBatches.isLightBatch(offloaded));
final ColumnarBatch loaded =
ColumnarBatches.ensureLoaded(ArrowBufferAllocators.contextInstance(), offloaded);
ColumnarBatches.load(ArrowBufferAllocators.contextInstance(), offloaded);
Assert.assertTrue(ColumnarBatches.isHeavyBatch(loaded));
long cnt =
StreamSupport.stream(
Expand Down
Loading
Loading