Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-943] Optimize data conversion for String/Binary type in Row2Columnar #969

Merged
merged 4 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 68 additions & 45 deletions native-sql-engine/cpp/src/operators/row_to_columnar_converter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include "operators/row_to_columnar_converter.h"

#include <immintrin.h>
#include <x86intrin.h>

#include <algorithm>
#include <iostream>

Expand Down Expand Up @@ -46,7 +49,7 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
int32_t columnar_id, int64_t fieldOffset,
std::vector<int64_t>& offsets, uint8_t* memory_address_,
std::shared_ptr<arrow::Array>* array,
arrow::MemoryPool* pool) {
arrow::MemoryPool* pool, bool support_avx512) {
auto field = schema->field(columnar_id);
auto type = field->type();

Expand Down Expand Up @@ -281,58 +284,76 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
*array = MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
break;
}
case arrow::BinaryType::type_id: {
std::unique_ptr<arrow::TypeTraits<arrow::BinaryType>::BuilderType> builder_;
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::MakeBuilder(pool, arrow::TypeTraits<arrow::BinaryType>::type_singleton(),
&array_builder);
builder_.reset(arrow::internal::checked_cast<
arrow::TypeTraits<arrow::BinaryType>::BuilderType*>(
array_builder.release()));

using offset_type = typename arrow::BinaryType::offset_type;
for (int64_t position = 0; position < num_rows; position++) {
bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
if (is_null) {
RETURN_NOT_OK(builder_->AppendNull());
} else {
int64_t offsetAndSize;
memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
sizeof(int64_t));
offset_type length = int32_t(offsetAndSize);
int32_t wordoffset = int32_t(offsetAndSize >> 32);
RETURN_NOT_OK(
builder_->Append(memory_address_ + offsets[position] + wordoffset, length));
}
}
auto status = builder_->Finish(array);
break;
}
case arrow::BinaryType::type_id:
case arrow::StringType::type_id: {
std::unique_ptr<arrow::TypeTraits<arrow::StringType>::BuilderType> builder_;
std::unique_ptr<arrow::ArrayBuilder> array_builder;
arrow::MakeBuilder(pool, arrow::TypeTraits<arrow::StringType>::type_singleton(),
&array_builder);
builder_.reset(arrow::internal::checked_cast<
arrow::TypeTraits<arrow::StringType>::BuilderType*>(
array_builder.release()));

arrow::ArrayData out_data;
out_data.length = num_rows;
out_data.buffers.resize(3);
out_data.type = field->type();
using offset_type = typename arrow::StringType::offset_type;
ARROW_ASSIGN_OR_RAISE(out_data.buffers[0], AllocateBitmap(num_rows, pool));
ARROW_ASSIGN_OR_RAISE(out_data.buffers[1],
AllocateBuffer(sizeof(offset_type) * (num_rows + 1), pool));
ARROW_ASSIGN_OR_RAISE(out_data.buffers[2],
AllocateResizableBuffer(20 * num_rows, pool));
auto validity_buffer = out_data.buffers[0]->mutable_data();
// initialize all true once allocated
memset(validity_buffer, 0xff, out_data.buffers[0]->capacity());
auto array_offset = out_data.GetMutableValues<offset_type>(1);
auto array_data = out_data.buffers[2]->mutable_data();
int64_t null_count = 0;

array_offset[0] = 0;
for (int64_t position = 0; position < num_rows; position++) {
bool is_null = IsNull(memory_address_ + offsets[position], columnar_id);
if (is_null) {
RETURN_NOT_OK(builder_->AppendNull());
arrow::BitUtil::SetBitTo(validity_buffer, position, false);
array_offset[position + 1] = array_offset[position];
null_count++;
} else {
int64_t offsetAndSize;
memcpy(&offsetAndSize, memory_address_ + offsets[position] + fieldOffset,
sizeof(int64_t));
int64_t offsetAndSize =
*(int64_t*)(memory_address_ + offsets[position] + fieldOffset);
offset_type length = int32_t(offsetAndSize);
int32_t wordoffset = int32_t(offsetAndSize >> 32);
RETURN_NOT_OK(
builder_->Append(memory_address_ + offsets[position] + wordoffset, length));
auto value_offset = array_offset[position + 1] =
array_offset[position] + length;
uint64_t capacity = out_data.buffers[2]->capacity();

if (ARROW_PREDICT_FALSE(value_offset >= capacity)) {
// allocate value buffer again
// enlarge the buffer by 1.5x
capacity = capacity + std::max((capacity >> 1), (uint64_t)length);
auto value_buffer =
std::static_pointer_cast<arrow::ResizableBuffer>(out_data.buffers[2]);
value_buffer->Reserve(capacity);
array_data = value_buffer->mutable_data();
}

auto dst_value_base = array_data + array_offset[position];
auto value_src_ptr = memory_address_ + offsets[position] + wordoffset;
#ifdef __AVX512BW__
if (ARROW_PREDICT_TRUE(support_avx512)) {
// write the variable value
uint32_t k;
for (k = 0; k + 32 < length; k += 32) {
__m256i v = _mm256_loadu_si256((const __m256i*)(value_src_ptr + k));
_mm256_storeu_si256((__m256i*)(dst_value_base + k), v);
}
auto mask = (1L << (length - k)) - 1;
__m256i v = _mm256_maskz_loadu_epi8(mask, value_src_ptr + k);
_mm256_mask_storeu_epi8(dst_value_base + k, mask, v);
} else
#endif
{
memcpy(dst_value_base, value_src_ptr, length);
}
}
}
auto status = builder_->Finish(array);
out_data.null_count = null_count;
if (null_count == 0) {
out_data.buffers[0] == nullptr;
}
*array = MakeArray(std::make_shared<arrow::ArrayData>(std::move(out_data)));
break;
}
case arrow::Decimal128Type::type_id: {
Expand Down Expand Up @@ -967,6 +988,7 @@ arrow::Status CreateArrayData(std::shared_ptr<arrow::Schema> schema, int64_t num
}

arrow::Status RowToColumnarConverter::Init(std::shared_ptr<arrow::RecordBatch>* batch) {
support_avx512_ = __builtin_cpu_supports("avx512bw");
int64_t nullBitsetWidthInBytes = CalculateBitSetWidthInBytes(num_cols_);
for (auto i = 0; i < num_rows_; i++) {
offsets_.push_back(0);
Expand All @@ -982,12 +1004,13 @@ arrow::Status RowToColumnarConverter::Init(std::shared_ptr<arrow::RecordBatch>*
std::shared_ptr<arrow::Array> array_data;
int64_t field_offset = GetFieldOffset(nullBitsetWidthInBytes, i);
RETURN_NOT_OK(CreateArrayData(schema_, num_rows_, i, field_offset, offsets_,
memory_address_, &array_data, m_pool_));
memory_address_, &array_data, m_pool_,
support_avx512_));
arrays.push_back(array_data);
}
*batch = arrow::RecordBatch::Make(schema_, num_rows_, arrays);
return arrow::Status::OK();
}

} // namespace rowtocolumnar
} // namespace sparkcolumnarplugin
} // namespace sparkcolumnarplugin
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class RowToColumnarConverter {
arrow::Status Init(std::shared_ptr<arrow::RecordBatch>* batch);

protected:
// Check whether support AVX512 instructions
bool support_avx512_;
std::shared_ptr<arrow::Schema> schema_;
int64_t num_cols_;
int64_t num_rows_;
Expand Down