Commit 0fcbd7f

update

ArnavBalyan committed Jan 1, 2025
1 parent 715fc49 commit 0fcbd7f

Showing 11 changed files with 639 additions and 0 deletions.
@@ -547,4 +547,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportColumnarArrowUdf(): Boolean = true

override def needPreComputeRangeFrameBoundary(): Boolean = true

override def supportRangeExec(): Boolean = true

}
@@ -827,4 +827,15 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
VeloxHiveUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
}

override def genRangeExecTransformer(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): RangeExecBaseTransformer =
RangeExecTransformer(start, end, step, numSlices, numElements, outputAttributes, child)

}
@@ -0,0 +1,139 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.columnarbatch.ColumnarBatches
import org.apache.gluten.iterator.Iterators
import org.apache.gluten.memory.arrow.alloc.ArrowBufferAllocators
import org.apache.gluten.vectorized.ArrowWritableColumnVector

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.utils.SparkArrowUtil
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

/**
* RangeExecTransformer is a concrete implementation of RangeExecBaseTransformer that executes the
* Range operation and supports columnar processing. It generates columnar batches for the specified
* range.
*
* @param start
* Starting value of the range.
* @param end
* Ending value of the range.
* @param step
* Step size for the range.
* @param numSlices
* Number of slices for partitioning the range.
* @param numElements
* Total number of elements in the range.
* @param outputAttributes
* Attributes defining the output schema of the operator.
* @param child
* Child SparkPlan nodes for this operator, if any.
*/
case class RangeExecTransformer(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]
) extends RangeExecBaseTransformer(
start,
end,
step,
numSlices,
numElements,
outputAttributes,
child) {

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
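// The range is empty when start == end, or when the sign of step points away
// from end (the XOR holds when exactly one of the two comparisons is true).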
if (start == end || (start < end ^ 0 < step)) {
sparkContext.emptyRDD[ColumnarBatch]
} else {
sparkContext
.parallelize(0 until numSlices, numSlices)
.mapPartitionsWithIndex {
(partitionIndex, _) =>
val allocator = ArrowBufferAllocators.contextInstance()
val sessionLocalTimeZone = SQLConf.get.sessionLocalTimeZone
val arrowSchema = SparkArrowUtil.toArrowSchema(schema, sessionLocalTimeZone)

val batchSize = 1000
val safePartitionStart =
start + step * (partitionIndex * numElements.toLong / numSlices)
val safePartitionEnd =
start + step * ((partitionIndex + 1) * numElements.toLong / numSlices)

/**
* Generates the columnar batches for the specified range. Each batch contains a subset
* of the range values, managed using Arrow column vectors.
*/
val iterator = new Iterator[ColumnarBatch] {
var current = safePartitionStart

override def hasNext: Boolean = {
if (step > 0) {
current < safePartitionEnd
} else {
current > safePartitionEnd
}
}

override def next(): ColumnarBatch = {
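// Rows remaining in this slice: the truncating division can yield 0 on the
// final partial step, so `.max(1)` guarantees progress; batchSize caps each batch.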
val numRows = math.min(
((safePartitionEnd - current) / step).toInt.max(1),
batchSize
)

val vectors = ArrowWritableColumnVector.allocateColumns(numRows, schema)

for (i <- 0 until numRows) {
val value = current + i * step
vectors(0).putLong(i, value)
}
vectors.foreach(_.setValueCount(numRows))
current += numRows * step

val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], numRows)
ColumnarBatches.offload(allocator, batch)
}
}
Iterators
.wrap(iterator)
.recyclePayload(batch => batch.close())
.recycleIterator {
allocator.close()
}
.create()

}
}
}

override protected def doExecute(): RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException("doExecute is not supported for this operator")
}
}
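For intuition about the slice arithmetic above, the following standalone sketch (plain Scala, no Gluten dependencies; the object name and the concrete numbers are illustrative) reproduces how safePartitionStart and safePartitionEnd split a range such as spark.range(0, 10, 3) across three slices:

object RangeSliceMathDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors the per-partition bounds computed in doExecuteColumnar for
    // start = 0, step = 3, numSlices = 3, numElements = 4 (values 0, 3, 6, 9).
    val (start, step, numSlices, numElements) = (0L, 3L, 3, 4L)
    for (partitionIndex <- 0 until numSlices) {
      val safeStart = start + step * (partitionIndex * numElements / numSlices)
      val safeEnd = start + step * ((partitionIndex + 1) * numElements / numSlices)
      // Each slice produces the values in [safeStart, safeEnd), stepping by `step`.
      val values = Iterator.iterate(safeStart)(_ + step).takeWhile(_ < safeEnd).toList
      println(s"partition $partitionIndex -> $values")
      // partition 0 -> List(0), partition 1 -> List(3), partition 2 -> List(6, 9)
    }
  }
}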
@@ -139,4 +139,8 @@ trait BackendSettingsApi {
def supportColumnarArrowUdf(): Boolean = false

def needPreComputeRangeFrameBoundary(): Boolean = false

def supportRangeExec(): Boolean = false
}
@@ -694,4 +694,14 @@ trait SparkPlanExecApi {
limitExpr: ExpressionTransformer,
original: StringSplit): ExpressionTransformer =
GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)

def genRangeExecTransformer(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan]): RangeExecBaseTransformer

}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution

import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.extension.ValidationResult
import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}

/**
 * Base class for the RangeExec transformation. It can be implemented by supported backends;
 * currently, only the Velox backend supports it.
 */
abstract class RangeExecBaseTransformer(
start: Long,
end: Long,
step: Long,
numSlices: Int,
numElements: BigInt,
outputAttributes: Seq[Attribute],
child: Seq[SparkPlan])
extends LeafExecNode
with ValidatablePlan {

override def output: Seq[Attribute] = {
outputAttributes
}

override protected def doValidateInternal(): ValidationResult = {
val isSupported = BackendsApiManager.getSettings.supportRangeExec()

if (!isSupported) {
return ValidationResult.failed(
s"RangeExec is not supported by the current backend."
)
}
ValidationResult.succeeded
}

override def batchType(): Convention.BatchType = {
BackendsApiManager.getSettings.primaryBatchType
}

override def rowType0(): Convention.RowType = Convention.RowType.None

override protected def doExecute()
: org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
throw new UnsupportedOperationException("This operator doesn't support doExecute().")
}
}

/**
* Companion object for RangeExecBaseTransformer, provides factory methods to create instance from
* existing RangeExec plan.
*/
object RangeExecBaseTransformer {
def from(rangeExec: RangeExec): RangeExecBaseTransformer = {
BackendsApiManager.getSparkPlanExecApiInstance
.genRangeExecTransformer(
rangeExec.start,
rangeExec.end,
rangeExec.step,
rangeExec.numSlices,
rangeExec.numElements,
rangeExec.output,
rangeExec.children
)
}
}
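Putting the two new hooks together: a backend other than Velox would opt in by overriding supportRangeExec() in its BackendSettingsApi and genRangeExecTransformer in its SparkPlanExecApi. A minimal sketch with hypothetical names (MyBackendSettings, MyBackendSparkPlanExecApi, MyRangeExecTransformer; all other required trait members omitted), mirroring the Velox wiring earlier in this commit:

// Advertise support so RangeExecBaseTransformer.doValidateInternal succeeds.
object MyBackendSettings extends BackendSettingsApi {
  override def supportRangeExec(): Boolean = true
  // ... other overrides elided in this sketch ...
}

class MyBackendSparkPlanExecApi extends SparkPlanExecApi {
  // Return the backend's concrete transformer, as VeloxSparkPlanExecApi
  // returns RangeExecTransformer.
  override def genRangeExecTransformer(
      start: Long,
      end: Long,
      step: Long,
      numSlices: Int,
      numElements: BigInt,
      outputAttributes: Seq[Attribute],
      child: Seq[SparkPlan]): RangeExecBaseTransformer =
    MyRangeExecTransformer(start, end, step, numSlices, numElements, outputAttributes, child)
  // ... other overrides elided in this sketch ...
}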
@@ -324,6 +324,17 @@ object OffloadOthers {
child,
plan.evalType)
}
case plan: RangeExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
BackendsApiManager.getSparkPlanExecApiInstance.genRangeExecTransformer(
plan.start,
plan.end,
plan.step,
plan.numSlices,
plan.numElements,
plan.output,
plan.children
)
case plan: SampleExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
val child = plan.child
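With the offload rule in place, the swap is observable from an ordinary Spark session once Gluten is enabled: the executed plan for a range query should contain RangeExecTransformer rather than Spark's RangeExec. An illustrative check (exact plan text varies by version):

val df = spark.range(0, 100, 1)
df.collect() // force execution through the columnar path
// The physical plan should now contain a RangeExecTransformer node:
println(df.queryExecution.executedPlan)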
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.gluten.execution.RangeExecTransformer

import org.apache.spark.sql.GlutenSQLTestsTrait
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.sum

class GlutenSQLRangeExecSuite extends GlutenSQLTestsTrait {

testGluten("RangeExecTransformer produces correct results") {
val df = spark.range(0, 10, 1)
val expectedData = (0L until 10L).map(Row(_)).toSeq
checkAnswer(df, expectedData)

assert(
getExecutedPlan(df).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}

testGluten("RangeExecTransformer with step") {
val df = spark.range(5, 15, 2)
val expectedData = Seq(5L, 7L, 9L, 11L, 13L).map(Row(_))
checkAnswer(df, expectedData)

assert(
getExecutedPlan(df).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}

testGluten("RangeExecTransformer with filter") {
val df = spark.range(0, 20, 1).filter("id % 3 == 0")
val expectedData = Seq(0L, 3L, 6L, 9L, 12L, 15L, 18L).map(Row(_))
checkAnswer(df, expectedData)

assert(
getExecutedPlan(df).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}

testGluten("RangeExecTransformer with aggregation") {
val df = spark.range(1, 6, 1)
val sumDf = df.agg(sum("id"))
val expectedData = Seq(Row(15L))
checkAnswer(sumDf, expectedData)

assert(
getExecutedPlan(sumDf).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}

testGluten("RangeExecTransformer with join") {
val df1 = spark.range(0, 5, 1).toDF("id1")
val df2 = spark.range(3, 8, 1).toDF("id2")
val joinDf = df1.join(df2, df1("id1") === df2("id2"))
val expectedData = Seq(Row(3L, 3L), Row(4L, 4L))
checkAnswer(joinDf, expectedData)

assert(
getExecutedPlan(joinDf).exists {
case _: RangeExecTransformer => true
case _ => false
}
)
}
}