[NSE-514] Fix the core dump issue in Q93 with V2 test (#515)
* Fix the core dump issue in Q93 with the V2 test and refine the code

* Apply clang-format

* Update the memory size calculation
JkSelf authored Sep 23, 2021
1 parent 16326a8 commit da1756b
Showing 8 changed files with 97 additions and 126 deletions.
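In short, the fix moves output-buffer sizing and allocation out of the JVM and into the native ColumnarToRowConverter. Previously the Scala side pre-allocated an Arrow buffer from a size estimate (which evidently under-counted for Q93's data, hence the core dump); now the converter receives an arrow::MemoryPool*, computes the exact size in Init(), allocates the buffer itself, and returns its address to Java through ArrowColumnarToRowInfo. Below is a minimal sketch of the new native-side flow, modeled on the benchmark change further down; ConvertBatch and its error handling are illustrative, only the ColumnarToRowConverter calls appear in this commit.

    #include <cstdint>
    #include <memory>

    #include <arrow/memory_pool.h>
    #include <arrow/record_batch.h>
    #include <arrow/status.h>

    #include "operators/columnar_to_row_converter.h"

    using sparkcolumnarplugin::columnartorow::ColumnarToRowConverter;

    arrow::Status ConvertBatch(std::shared_ptr<arrow::RecordBatch> batch,
                               arrow::MemoryPool* pool) {
      // The converter now owns its output buffer, allocated from the given pool.
      auto converter = std::make_shared<ColumnarToRowConverter>(batch, pool);
      ARROW_RETURN_NOT_OK(converter->Init());   // sizes and allocates the row buffer
      ARROW_RETURN_NOT_OK(converter->Write());  // fills it with UnsafeRow data
      // Row i starts at GetBufferAddress() + GetOffsets()[i] and spans
      // GetLengths()[i] bytes; this is what the JNI wrapper hands back to Java.
      uint8_t* rows = converter->GetBufferAddress();
      (void)rows;
      return arrow::Status::OK();
    }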
ArrowColumnarToRowInfo.java
@@ -17,18 +17,17 @@
 
 package com.intel.oap.vectorized;
 
-
-import java.nio.ByteBuffer;
-
 public class ArrowColumnarToRowInfo {
   public long instanceID;
   public long[] offsets;
   public long[] lengths;
+  public long memoryAddress;
 
   public ArrowColumnarToRowInfo(long instanceID,
-      long[] offsets, long[] lengths) {
+      long[] offsets, long[] lengths, long memoryAddress) {
     this.instanceID = instanceID;
     this.offsets = offsets;
     this.lengths = lengths;
+    this.memoryAddress = memoryAddress;
   }
 }
ArrowColumnarToRowJniWrapper.java
@@ -17,10 +17,7 @@
 
 package com.intel.oap.vectorized;
 
-import sun.nio.ch.DirectBuffer;
-
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 public class ArrowColumnarToRowJniWrapper {
 
@@ -30,6 +27,6 @@ public ArrowColumnarToRowJniWrapper() throws IOException {
 
   public native ArrowColumnarToRowInfo nativeConvertColumnarToRow(
       byte[] schema, int numRows, long[] bufAddrs,
-      long[] bufSizes, long memoryAddress, long memorySize, long fixedSizePerRow) throws RuntimeException;
+      long[] bufSizes, long memoryPoolID) throws RuntimeException;
   public native void nativeClose(long instanceID);
 }
ArrowColumnarToRowExec.scala
@@ -19,16 +19,15 @@ package com.intel.oap.execution
 
 import com.intel.oap.expression.ConverterUtils
 import com.intel.oap.vectorized.{ArrowColumnarToRowJniWrapper, ArrowWritableColumnVector}
-import org.apache.arrow.vector.BaseVariableWidthVector
-import org.apache.arrow.vector.types.pojo.{ArrowType, Field, Schema}
+import org.apache.arrow.vector.types.pojo.{Field, Schema}
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.execution.datasources.v2.arrow.SparkMemoryUtils
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.array.ByteArrayMethods
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer

@@ -83,19 +82,6 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
       val schema = new Schema(fields.asJava)
       ConverterUtils.getSchemaBytesBuf(schema)
     }
-    // For decimal type if the precision > 18 will need 16 bytes variable size.
-    def containDecimalCol(field: StructField): Boolean = field.dataType match {
-      case d: DecimalType if d.precision > 18 => true
-      case _ => false
-    }
-
-    def estimateBufferSize(numCols: Int, numRows: Int): Int = {
-      val fields = child.schema.fields
-      val decimalCols = fields.filter(field => containDecimalCol(field)).length
-      val fixedLength = UnsafeRow.calculateBitSetWidthInBytes(numCols) + numCols * 8
-      val decimalColSize = 16 * decimalCols
-      (fixedLength + decimalColSize) * numRows
-    }
 
     batches.flatMap { batch =>
       numInputBatches += 1

@@ -117,33 +103,10 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
        val bufAddrs = new ListBuffer[Long]()
        val bufSizes = new ListBuffer[Long]()
        val fields = new ListBuffer[Field]()
-       var totalVariableSize = 0L
        (0 until batch.numCols).foreach { idx =>
          val column = batch.column(idx).asInstanceOf[ArrowWritableColumnVector]
          fields += column.getValueVector.getField
-         val valueVector = column.getValueVector
-         if (valueVector.isInstanceOf[BaseVariableWidthVector]) {
-           // Calculate the total aligned size of variable cols
-           val arrowType = column.getValueVector.getField.getFieldType.getType
-           for (rowId <- 0 until batch.numRows()) {
-             val variableColSize = arrowType match {
-               case ArrowType.Utf8.INSTANCE =>
-                 val value = column.getUTF8String(rowId)
-                 if (value == null) {
-                   0
-                 } else value.numBytes()
-               case ArrowType.Binary.INSTANCE =>
-                 val value = column.getBinary(rowId)
-                 if (value == null) {
-                   0
-                 } else value.length
-               case _ => 0
-             }
-             val alignedSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(variableColSize)
-             totalVariableSize += alignedSize
-           }
-         }
-         valueVector.getBuffers(false)
+         column.getValueVector.getBuffers(false)
            .foreach { buffer =>
              bufAddrs += buffer.memoryAddress()
              bufSizes += buffer.readableBytes()

@@ -155,14 +118,10 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
        }
 
        val beforeConvert = System.nanoTime()
-       val totalFixedSize = estimateBufferSize(batch.numCols(), batch.numRows())
-       val size = totalFixedSize + totalVariableSize.toInt
 
-       val allocator = SparkMemoryUtils.contextAllocator()
-       val arrowBuf = allocator.buffer(size)
        val info = jniWrapper.nativeConvertColumnarToRow(
          arrowSchema, batch.numRows, bufAddrs.toArray, bufSizes.toArray,
-         arrowBuf.memoryAddress(), size, totalFixedSize / batch.numRows())
+         SparkMemoryUtils.contextMemoryPool().getNativeInstanceId)
 
        convertTime += NANOSECONDS.toMillis(System.nanoTime() - beforeConvert)
 

@@ -173,7 +132,6 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
          override def hasNext: Boolean = {
            val result = rowId < batch.numRows()
            if (!result && !closed) {
-             arrowBuf.release()
              jniWrapper.nativeClose(info.instanceID)
              closed = true
            }

@@ -184,7 +142,7 @@ class ArrowColumnarToRowExec(child: SparkPlan) extends ColumnarToRowExec(child =
            if (rowId >= batch.numRows()) throw new NoSuchElementException
 
            val (offset, length) = (info.offsets(rowId), info.lengths(rowId))
-           row.pointTo(null, arrowBuf.memoryAddress() + offset, length.toInt)
+           row.pointTo(null, info.memoryAddress + offset, length.toInt)
            rowId += 1
            row
          }
@@ -114,12 +114,9 @@ TEST_F(BenchmarkColumnarToRow, test) {
     TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch));
 
     if (record_batch) {
-      std::shared_ptr<arrow::Buffer> buffer;
-      buffer = *arrow::AllocateBuffer(419430400);
-
-      uint8_t* address = buffer->mutable_data();
       std::shared_ptr<ColumnarToRowConverter> unsafe_row_writer_reader =
-          std::make_shared<ColumnarToRowConverter>(record_batch, address);
+          std::make_shared<ColumnarToRowConverter>(record_batch,
+                                                   arrow::default_memory_pool());
 
       TIME_NANO_OR_THROW(elapse_init, unsafe_row_writer_reader->Init());
       TIME_NANO_OR_THROW(elapse_write, unsafe_row_writer_reader->Write());
23 changes: 12 additions & 11 deletions native-sql-engine/cpp/src/jni/jni_wrapper.cc
@@ -245,7 +245,7 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
   arrow_columnar_to_row_info_class = CreateGlobalClassReference(
       env, "Lcom/intel/oap/vectorized/ArrowColumnarToRowInfo;");
   arrow_columnar_to_row_info_constructor =
-      GetMethodID(env, arrow_columnar_to_row_info_class, "<init>", "(J[J[J)V");
+      GetMethodID(env, arrow_columnar_to_row_info_class, "<init>", "(J[J[JJ)V");
 
   return JNI_VERSION;
 }

@@ -1504,8 +1504,7 @@ JNIEXPORT void JNICALL Java_com_intel_oap_vectorized_ShuffleDecompressionJniWrap
 JNIEXPORT jobject JNICALL
 Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnarToRow(
     JNIEnv* env, jobject, jbyteArray schema_arr, jint num_rows, jlongArray buf_addrs,
-    jlongArray buf_sizes, jlong memory_address, jlong memory_size,
-    jlong fixed_size_per_row) {
+    jlongArray buf_sizes, jlong memory_pool_id) {
   if (schema_arr == NULL) {
     env->ThrowNew(
         illegal_argument_exception_class,

@@ -1558,13 +1557,17 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnar
     return NULL;
   }
 
-  uint8_t* address = reinterpret_cast<uint8_t*>(memory_address);
-
   // convert the record batch to spark unsafe row.
   try {
+    auto* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
+    if (pool == nullptr) {
+      env->ThrowNew(illegal_argument_exception_class,
+                    "Memory pool does not exist or has been closed");
+      return NULL;
+    }
+
     std::shared_ptr<ColumnarToRowConverter> columnar_to_row_converter =
-        std::make_shared<ColumnarToRowConverter>(rb, address, memory_size,
-                                                 fixed_size_per_row);
+        std::make_shared<ColumnarToRowConverter>(rb, pool);
     auto status = columnar_to_row_converter->Init();
     if (!status.ok()) {
       env->ThrowNew(illegal_argument_exception_class,

@@ -1574,9 +1577,7 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnar
                         .c_str());
       return NULL;
     }
-
     status = columnar_to_row_converter->Write();
-
     if (!status.ok()) {
       env->ThrowNew(
           illegal_argument_exception_class,

@@ -1598,12 +1599,12 @@ Java_com_intel_oap_vectorized_ArrowColumnarToRowJniWrapper_nativeConvertColumnar
     auto lengths_arr = env->NewLongArray(num_rows);
     auto lengths_src = reinterpret_cast<const jlong*>(lengths.data());
     env->SetLongArrayRegion(lengths_arr, 0, num_rows, lengths_src);
+    long address = reinterpret_cast<long>(columnar_to_row_converter->GetBufferAddress());
 
     jobject arrow_columnar_to_row_info = env->NewObject(
        arrow_columnar_to_row_info_class, arrow_columnar_to_row_info_constructor,
-       instanceID, offsets_arr, lengths_arr);
+       instanceID, offsets_arr, lengths_arr, address);
     return arrow_columnar_to_row_info;
-
   } catch (const std::runtime_error& error) {
     env->ThrowNew(unsupportedoperation_exception_class, error.what());
   } catch (const std::exception& error) {
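The memory_pool_id handshake works because the ID the JVM holds is effectively the native pool's address stored in a jlong, so the JNI side can cast it straight back, as the wrapper above does. A hedged sketch of that round trip in isolation follows; ExportPoolId and ImportPoolId are illustrative names, the real export side is SparkMemoryUtils.contextMemoryPool().getNativeInstanceId and the import side is the reinterpret_cast above. Note that the nullptr check in the wrapper can only catch a zero ID; a stale ID whose pool has already been destroyed cannot be detected at this layer.

    #include <cstdint>

    #include <arrow/memory_pool.h>

    // What the JVM effectively stores as the pool's "native instance ID".
    int64_t ExportPoolId(arrow::MemoryPool* pool) {
      return reinterpret_cast<int64_t>(pool);
    }

    // What nativeConvertColumnarToRow does with the incoming jlong.
    arrow::MemoryPool* ImportPoolId(int64_t memory_pool_id) {
      return reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
    }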
100 changes: 64 additions & 36 deletions native-sql-engine/cpp/src/operators/columnar_to_row_converter.cc
@@ -17,19 +17,82 @@
 
 #include "operators/columnar_to_row_converter.h"
 
+#include <iostream>
+
 namespace sparkcolumnarplugin {
 namespace columnartorow {
 
 int64_t CalculateBitSetWidthInBytes(int32_t numFields) {
   return ((numFields + 63) / 64) * 8;
 }
 
+int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) {
+  int64_t remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
+  if (remainder == 0) {
+    return numBytes;
+  } else {
+    return numBytes + (8 - remainder);
+  }
+}
+
+int64_t CalculateFixedSizePerRow(std::shared_ptr<arrow::Schema> schema,
+                                 int64_t num_cols) {
+  std::vector<std::shared_ptr<arrow::Field>> fields = schema->fields();
+  // Count the decimal columns with precision > 18; each needs 16 extra bytes.
+  int32_t count = 0;
+  for (auto i = 0; i < num_cols; i++) {
+    auto type = fields[i]->type();
+    if (type->id() == arrow::Decimal128Type::type_id) {
+      auto dtype = dynamic_cast<arrow::Decimal128Type*>(type.get());
+      int32_t precision = dtype->precision();
+      if (precision > 18) count++;
+    }
+  }
+
+  int64_t fixed_size = CalculateBitSetWidthInBytes(num_cols) + num_cols * 8;
+  int64_t decimal_cols_size = count * 16;
+  return fixed_size + decimal_cols_size;
+}
+
 arrow::Status ColumnarToRowConverter::Init() {
   num_rows_ = rb_->num_rows();
   num_cols_ = rb_->num_columns();
   // Calculate the initial size
   nullBitsetWidthInBytes_ = CalculateBitSetWidthInBytes(num_cols_);
-  memset(buffer_address_, 0, sizeof(int8_t) * memory_size_);
 
+  int64_t fixed_size_per_row = CalculateFixedSizePerRow(rb_->schema(), num_cols_);
+
+  // Initialize the offsets_, lengths_, buffer_cursor_
+  for (auto i = 0; i < num_rows_; i++) {
+    lengths_.push_back(fixed_size_per_row);
+    offsets_.push_back(0);
+    buffer_cursor_.push_back(nullBitsetWidthInBytes_ + 8 * num_cols_);
+  }
+  // Calculate the lengths_
+  for (auto i = 0; i < num_cols_; i++) {
+    auto array = rb_->column(i);
+    if (arrow::is_binary_like(array->type_id())) {
+      auto binary_array = std::static_pointer_cast<arrow::BinaryArray>(array);
+      using offset_type = typename arrow::BinaryType::offset_type;
+      offset_type length;
+      for (auto j = 0; j < num_rows_; j++) {
+        auto value = binary_array->GetValue(j, &length);
+        lengths_[j] += RoundNumberOfBytesToNearestWord(length);
+      }
+    }
+  }
+  // Calculate the offsets_ and total memory size based on lengths_
+  int64_t total_memory_size = lengths_[0];
+  for (auto i = 1; i < num_rows_; i++) {
+    offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
+    total_memory_size += lengths_[i];
+  }
+
+  ARROW_ASSIGN_OR_RAISE(buffer_, AllocateBuffer(total_memory_size, memory_pool_));
+
+  memset(buffer_->mutable_data(), 0, sizeof(int8_t) * total_memory_size);
+
+  buffer_address_ = buffer_->mutable_data();
   return arrow::Status::OK();
 }

@@ -429,42 +492,7 @@ arrow::Status WriteValue(uint8_t* buffer_address, int64_t field_offset,
   return arrow::Status::OK();
 }
 
-int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) {
-  int64_t remainder = numBytes & 0x07;  // This is equivalent to `numBytes % 8`
-  if (remainder == 0) {
-    return numBytes;
-  } else {
-    return numBytes + (8 - remainder);
-  }
-}
-
 arrow::Status ColumnarToRowConverter::Write() {
-  // Initialize the offsets_ , lengths_, buffer_cursor_
-  for (auto i = 0; i < num_rows_; i++) {
-    lengths_.push_back(fixed_size_per_row_);
-    offsets_.push_back(0);
-    buffer_cursor_.push_back(nullBitsetWidthInBytes_ + 8 * num_cols_);
-  }
-
-  // Calculated the lengths_
-  for (auto i = 0; i < num_cols_; i++) {
-    auto array = rb_->column(i);
-    if (arrow::is_binary_like(array->type_id())) {
-      auto binary_array = std::static_pointer_cast<arrow::BinaryArray>(array);
-      using offset_type = typename arrow::BinaryType::offset_type;
-      offset_type length;
-      for (auto j = 0; j < num_rows_; j++) {
-        auto value = binary_array->GetValue(j, &length);
-        lengths_[j] += RoundNumberOfBytesToNearestWord(length);
-      }
-    }
-  }
-
-  // Calculated the offsets_ based on lengths_
-  for (auto i = 1; i < num_rows_; i++) {
-    offsets_[i] = offsets_[i - 1] + lengths_[i - 1];
-  }
-
   for (auto i = 0; i < num_cols_; i++) {
     auto array = rb_->column(i);
     int64_t field_offset = GetFieldOffset(nullBitsetWidthInBytes_, i);
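Init() now computes the exact buffer size before allocating, which is the heart of the fix: the null bitset and fixed-width fields give a per-row base, long decimals (precision > 18) add 16 bytes each, and binary-like values are rounded up to 8-byte words. A self-contained worked example of that arithmetic follows; the two helpers are adapted from the diff above, while main and the concrete numbers are illustrative.

    #include <cassert>
    #include <cstdint>

    int64_t CalculateBitSetWidthInBytes(int32_t numFields) {
      return ((numFields + 63) / 64) * 8;
    }

    int64_t RoundNumberOfBytesToNearestWord(int64_t numBytes) {
      int64_t remainder = numBytes & 0x07;  // numBytes % 8
      return remainder == 0 ? numBytes : numBytes + (8 - remainder);
    }

    int main() {
      // A row with 3 columns, one being a decimal with precision > 18:
      // 8-byte null bitset + 3 * 8 fixed bytes + 1 * 16 decimal bytes = 48.
      int64_t fixed_size = CalculateBitSetWidthInBytes(3) + 3 * 8 + 1 * 16;
      assert(fixed_size == 48);
      // One 13-byte UTF-8 value is word-aligned up to 16 bytes,
      // so the full row occupies 48 + 16 = 64 bytes.
      int64_t row_length = fixed_size + RoundNumberOfBytesToNearestWord(13);
      assert(row_length == 64);
      return 0;
    }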
15 changes: 6 additions & 9 deletions native-sql-engine/cpp/src/operators/columnar_to_row_converter.h
@@ -32,29 +32,26 @@ namespace columnartorow {
 
 class ColumnarToRowConverter {
  public:
-  ColumnarToRowConverter(std::shared_ptr<arrow::RecordBatch> rb, uint8_t* buffer_address,
-                         int64_t memory_size = 0, int64_t fixed_size_per_row = 0)
-      : rb_(rb),
-        buffer_address_(buffer_address),
-        memory_size_(memory_size),
-        fixed_size_per_row_(fixed_size_per_row) {}
+  ColumnarToRowConverter(std::shared_ptr<arrow::RecordBatch> rb,
+                         arrow::MemoryPool* memory_pool)
+      : rb_(rb), memory_pool_(memory_pool) {}
 
   arrow::Status Init();
   arrow::Status Write();
 
-  uint8_t* GetBufferAddress() { return buffer_address_; }  // for test
+  uint8_t* GetBufferAddress() { return buffer_address_; }
   const std::vector<int64_t>& GetOffsets() { return offsets_; }
   const std::vector<int64_t>& GetLengths() { return lengths_; }
 
  protected:
   std::vector<int64_t> buffer_cursor_;
   std::shared_ptr<arrow::RecordBatch> rb_;
+  std::shared_ptr<arrow::Buffer> buffer_;
+  arrow::MemoryPool* memory_pool_ = arrow::default_memory_pool();
   int64_t nullBitsetWidthInBytes_;
   int64_t num_cols_;
   int64_t num_rows_;
   uint8_t* buffer_address_;
-  int64_t memory_size_;
-  int64_t fixed_size_per_row_;
   std::vector<int64_t> offsets_;
   std::vector<int64_t> lengths_;
 };
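With buffer_ held as a std::shared_ptr<arrow::Buffer>, the row buffer's lifetime is now tied to the converter instance: when the Scala iterator calls nativeClose and the native instance is released, the memory returns to the pool, which is why the explicit arrowBuf.release() disappeared from ArrowColumnarToRowExec. Allocations are also attributed to whichever pool is injected. A small sketch with Arrow's ProxyMemoryPool (assuming that class is available in the Arrow version in use) shows the accounting behavior this enables; the same dereference-the-Result pattern appears in the benchmark above.

    #include <iostream>
    #include <memory>

    #include <arrow/buffer.h>
    #include <arrow/memory_pool.h>

    int main() {
      // Wrap the default pool so allocations through it are counted, similar
      // to how Spark's context memory pool tracks native memory usage.
      arrow::ProxyMemoryPool tracked(arrow::default_memory_pool());
      {
        std::shared_ptr<arrow::Buffer> buffer =
            *arrow::AllocateBuffer(1024, &tracked);
        std::cout << tracked.bytes_allocated() << std::endl;  // 1024
      }  // buffer goes out of scope; its memory returns to the pool
      std::cout << tracked.bytes_allocated() << std::endl;  // 0
      return 0;
    }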
