Native ColumnarToRow
parthchandra committed Oct 23, 2024
1 parent dd5134f commit 81e411f
Showing 14 changed files with 625 additions and 36 deletions.
6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -196,6 +196,12 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(false)

val COMET_EXEC_NATIVE_COLUMNAR_TO_ROW_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.nativeColumnarToRow.enabled")
.doc("Experimental support for native columnar to row for fixed width types")
.booleanConf
.createWithDefault(true)

val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(
"stddev",
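As a usage sketch (not part of this commit), the new spark.comet.exec.nativeColumnarToRow.enabled flag behaves like any other Comet setting and can be toggled through the Spark session configuration; the session setup below is illustrative only.

```scala
import org.apache.spark.sql.SparkSession

object NativeColumnarToRowToggle {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      // Disable the experimental native columnar-to-row path for this session.
      .config("spark.comet.exec.nativeColumnarToRow.enabled", "false")
      .getOrCreate()

    // The key mirrors CometConf.COMET_EXEC_NATIVE_COLUMNAR_TO_ROW_ENABLED defined above.
    println(spark.conf.get("spark.comet.exec.nativeColumnarToRow.enabled"))
    spark.stop()
  }
}
```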
44 changes: 43 additions & 1 deletion common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -22,7 +22,7 @@ package org.apache.comet.vector
import scala.collection.mutable

import org.apache.arrow.c.{ArrowArray, ArrowImporter, ArrowSchema, CDataDictionaryProvider, Data}
import org.apache.arrow.vector.VectorSchemaRoot
import org.apache.arrow.vector.{NullVector, VectorSchemaRoot}
import org.apache.arrow.vector.dictionary.DictionaryProvider
import org.apache.spark.SparkException
import org.apache.spark.sql.comet.util.Utils
@@ -136,6 +136,33 @@ class NativeUtil {
numRows.headOption.getOrElse(batch.numRows())
}

/**
 * Utility function to check whether a batch contains any dictionary-encoded vectors or null
 * vectors. Used by native operators that do not support these to decide whether to fall back
 * to Spark.
 * @param batch
 *   the columnar batch to inspect
 * @return
 *   true if any column is dictionary-encoded, is backed by a NullVector, or has no value vector
 */
def hasDictionaryOrNullVector(batch: ColumnarBatch): Boolean = {
var index = 0
val numCols = batch.numCols()
var hasDictionary = false
var hasNullVector = false
while (index < numCols && !hasDictionary) {
batch.column(index) match {
case vec: CometVector =>
val valueVector = vec.getValueVector
if (valueVector != null && valueVector.getField.getDictionary != null) {
hasDictionary = true
}
if (valueVector == null || valueVector.isInstanceOf[NullVector]) {
hasNullVector = true
}
index = index + 1
}
}
hasDictionary || hasNullVector
}
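A minimal sketch of the fallback pattern this helper enables; `convertNatively` and `convertWithSpark` are placeholder stubs, not APIs introduced by this commit.

```scala
import org.apache.comet.vector.NativeUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.vectorized.ColumnarBatch

object ColumnarToRowFallbackSketch {
  // Placeholder stubs standing in for the native and Spark conversion paths.
  def convertNatively(batch: ColumnarBatch): Iterator[InternalRow] = ???
  def convertWithSpark(batch: ColumnarBatch): Iterator[InternalRow] = ???

  def toRows(nativeUtil: NativeUtil, batch: ColumnarBatch): Iterator[InternalRow] = {
    if (nativeUtil.hasDictionaryOrNullVector(batch)) {
      // Dictionary-encoded or null vectors are not handled natively, so use Spark's path.
      convertWithSpark(batch)
    } else {
      convertNatively(batch)
    }
  }
}
```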

/**
* Gets the next batch from native execution.
*
@@ -168,6 +195,21 @@
}
}

/**
 * Exports a ColumnarBatch, returning arrays of pointers (addresses) to the underlying Arrow
 * arrays and schemas.
 * @param batch
 *   the columnar batch to export
 * @return
 *   a pair of (array addresses, schema addresses)
 */
def exportColumnarBatch(batch: ColumnarBatch): (Array[Long], Array[Long]) = {
val (arrays, schemas) = allocateArrowStructs(batch.numCols())
val arrayAddrs = arrays.map(_.memoryAddress())
val schemaAddrs = schemas.map(_.memoryAddress())
exportBatch(arrayAddrs, schemaAddrs, batch)
(arrayAddrs, schemaAddrs)
}
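A sketch of how the exported addresses might be handed to a native entry point; `callNative` is a placeholder, not a real API, standing in for a JNI method such as the one added in `jni_api.rs` further down.

```scala
import org.apache.comet.vector.NativeUtil
import org.apache.spark.sql.vectorized.ColumnarBatch

object ExportBatchSketch {
  // Export a batch to Arrow C Data Interface structs and pass the raw struct
  // addresses across JNI. `callNative` is a stand-in for a real native method.
  def exportAndCall(nativeUtil: NativeUtil, batch: ColumnarBatch)(
      callNative: (Array[Long], Array[Long]) => Long): Long = {
    val (arrayAddrs, schemaAddrs) = nativeUtil.exportColumnarBatch(batch)
    // arrayAddrs(i) / schemaAddrs(i) are the memory addresses of the ArrowArray
    // and ArrowSchema structs for column i of the batch.
    callNative(arrayAddrs, schemaAddrs)
  }
}
```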

/**
* Imports a list of Arrow addresses from native execution, and returns a list of Comet vectors.
*
29 changes: 29 additions & 0 deletions common/src/main/scala/org/apache/spark/sql/comet/util/Utils.scala
@@ -35,6 +35,7 @@ import org.apache.arrow.vector.types._
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType, Schema}
import org.apache.spark.{SparkEnv, SparkException}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -261,4 +262,32 @@ object Utils {
throw new SparkException(s"Unsupported Arrow Vector for $reason: ${valueVector.getClass}")
}
}

// Given an array of Comet vectors, calculate the number of bytes required for a batch of
// UnsafeRows to hold the values of the vectors.
def getUnsafeRowBatchSize(vectors: Array[CometVector]): Long = {
  val bitSetWidth = UnsafeRow.calculateBitSetWidthInBytes(vectors.length)
  val numRows = vectors.head.getValueVector.getValueCount
  val dataBytes: Long = vectors
    .map(v => {
      val dt = fromArrowField(v.getValueVector.getField)
      assert(
        UnsafeRow.isMutable(dt) || dt.isInstanceOf[BinaryType] || dt.isInstanceOf[StringType])
      // For variable-length types, assuming that the vector has not been read from, the
      // readable bytes are the number of bytes of data in the vector.
      val fixedBytes = numRows * 8L // offset (4 bytes) and length (4 bytes)
      val varBytes = dt match {
        case datatype if UnsafeRow.isFixedLength(datatype) => 0L
        case DecimalType.Fixed(_, _) => numRows * 16L
        case BinaryType | StringType =>
          numRows * 8L + v.getValueVector.getDataBuffer.readableBytes
        case _ =>
          throw new UnsupportedOperationException(s"Unsupported data type: ${dt.catalogString}")
      }
      fixedBytes + varBytes
    })
    .sum
  numRows * bitSetWidth + dataBytes
}

}
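To make the sizing rule concrete, here is a REPL-style illustration (not code from the commit) following the formula in getUnsafeRowBatchSize above, for a batch of 1024 rows with schema (LongType, StringType) where the string vector holds 10,000 bytes of character data:

```scala
// Illustrative arithmetic only, mirroring getUnsafeRowBatchSize.
val numRows = 1024L
val bitSetWidth = 8L             // 2 fields -> one 64-bit null word per row
val longCol = numRows * 8L       // fixed-width field, no variable-length data
val stringCol = numRows * 8L +   // offset/length word per row
  (numRows * 8L + 10000L)        // variable-length region estimate
val total = numRows * bitSetWidth + longCol + stringCol
// total = 8192 + 8192 + (8192 + 8192 + 10000) = 42768 bytes
```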
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -49,6 +49,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.hashJoin.enabled | Whether to enable hashJoin by default. | true |
| spark.comet.exec.localLimit.enabled | Whether to enable localLimit by default. | true |
| spark.comet.exec.memoryFraction | The fraction of memory from Comet memory overhead that the native memory manager can use for execution. The purpose of this config is to set aside memory for untracked data structures, as well as imprecise size estimation during memory acquisition. Default value is 0.7. | 0.7 |
| spark.comet.exec.nativeColumnarToRow.enabled | Experimental support for native columnar-to-row conversion for fixed-width types | true |
| spark.comet.exec.project.enabled | Whether to enable project by default. | true |
| spark.comet.exec.replaceSortMergeJoin | Experimental feature to force Spark to replace SortMergeJoin with ShuffledHashJoin for improved performance. This feature is not stable yet. For more information, refer to the Comet Tuning Guide (https://datafusion.apache.org/comet/user-guide/tuning.html). | false |
| spark.comet.exec.shuffle.codec | The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. | zstd |
62 changes: 61 additions & 1 deletion native/core/src/execution/jni_api.rs
@@ -18,7 +18,8 @@
//! Define JNI APIs which can be called from Java/Scala.

use arrow::datatypes::DataType as ArrowDataType;
use arrow_array::RecordBatch;
use arrow_array::{make_array, Array, ArrayRef, RecordBatch};
use arrow_data::ArrayData;
use datafusion::{
execution::{
disk_manager::DiskManagerConfig,
@@ -59,6 +60,7 @@ use jni::{
use tokio::runtime::Runtime;

use crate::execution::operators::ScanExec;
use crate::execution::shuffle::row::SparkUnsafeRow;
use log::info;

/// Comet native execution context. Kept alive across JNI calls.
@@ -564,3 +566,61 @@ pub extern "system" fn Java_org_apache_comet_Native_sortRowPartitionsNative(
Ok(())
})
}

/// Used by Comet ColumnarToRow.
/// From a set of vectors and a block of native memory allocated by Spark, converts the vectors
/// into a batch of SparkUnsafeRows. The unsafe rows are written to the memory block passed in.
/// Currently implemented only for boolean and numeric types.
#[no_mangle]
pub extern "system" fn Java_org_apache_comet_Native_getUnsafeRowsNative(
e: JNIEnv,
_class: JClass,
_base_object: JObject,
offset: jlong,
_length: jlong,
array_addrs: jlongArray,
schema_addrs: jlongArray,
) -> jlong {
try_unwrap_or_throw(&e, |mut env| {
// SAFETY: JVM unsafe memory allocation is aligned with long.
let array_address_array = unsafe { JLongArray::from_raw(array_addrs) };
let num_cols = env.get_array_length(&array_address_array)? as usize;

let array_addrs =
unsafe { env.get_array_elements(&array_address_array, ReleaseMode::NoCopyBack)? };
let array_addrs = &*array_addrs;

let schema_address_array = unsafe { JLongArray::from_raw(schema_addrs) };
let schema_addrs =
unsafe { env.get_array_elements(&schema_address_array, ReleaseMode::NoCopyBack)? };
let schema_addrs = &*schema_addrs;

let mut schema: Vec<ArrowDataType> = Vec::with_capacity(num_cols);
let mut arrays: Vec<ArrayRef> = Vec::with_capacity(num_cols);

let mut num_rows = 0;
for i in 0..num_cols {
let array_ptr = array_addrs[i];
let schema_ptr = schema_addrs[i];
let array_data = ArrayData::from_spark((array_ptr, schema_ptr))?;

let array = make_array(array_data);
if num_rows == 0 {
num_rows = array.len();
} else {
assert_eq!(array.len(), num_rows)
}
schema.push(array.data_type().clone());
arrays.push(array);
}

// A MemoryBlock object allocated by UnsafeMemoryAllocator has 'null' as the underlying
// object, and the address of the allocated memory as the offset.
// The base_object passed in is therefore a null pointer and the offset is the raw pointer
// to the memory we wish to write to.
SparkUnsafeRow::get_rows_from_arrays(schema, arrays, num_rows, num_cols, offset as usize);
Ok(array_addrs[0]) // Bogus
})
}
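The JVM-side declaration of this entry point is presumably added to org.apache.comet.Native elsewhere in the commit (not shown in this excerpt). Inferred from the JNI signature above, it would look roughly like the hedged sketch below, with the array/schema addresses coming from NativeUtil.exportColumnarBatch and the block size from Utils.getUnsafeRowBatchSize; the actual declaration may differ.

```scala
// Hedged sketch inferred from the JNI signature above; stub class name is illustrative.
class NativeColumnarToRowStub {
  @native def getUnsafeRowsNative(
      baseObject: AnyRef, // null when the block is off-heap; its address is in `offset`
      offset: Long, // raw address of the memory block the rows are written to
      length: Long, // size of the block, e.g. from Utils.getUnsafeRowBatchSize
      arrayAddrs: Array[Long], // ArrowArray struct addresses from exportColumnarBatch
      schemaAddrs: Array[Long] // ArrowSchema struct addresses from exportColumnarBatch
  ): Long
}
```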