Skip to content

Commit

Permalink
Merge pull request #9 from zanmato1984/env-var-temp-stack-size-move-t…
Browse files Browse the repository at this point in the history
…o-internal

Move temp stack vector into internal header and reduce including
  • Loading branch information
zanmato1984 authored May 13, 2024
2 parents 2070627 + d4402de commit a3531bf
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 114 deletions.
3 changes: 2 additions & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,8 @@ set(ARROW_COMPUTE_SRCS
compute/row/compare_internal.cc
compute/row/grouper.cc
compute/row/row_internal.cc
compute/util.cc)
compute/util.cc
compute/util_internal.cc)

append_runtime_avx2_src(ARROW_COMPUTE_SRCS compute/key_hash_internal_avx2.cc)
append_runtime_avx2_bmi2_src(ARROW_COMPUTE_SRCS compute/key_map_internal_avx2.cc)
Expand Down
15 changes: 11 additions & 4 deletions cpp/src/arrow/compute/key_hash_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,12 @@ class ARROW_EXPORT Hashing32 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
uint32_t* out_hash);

// Clarify the max temp stack usage for HashBatch so the caller could reserve enough
// size in advance.
// Clarify the max temp stack usage for HashBatch, which might be necessary for the
// caller to be aware of at compile time to reserve enough stack size in advance. The
// HashBatch implementation uses one uint32 temp vector as a buffer for hash, one uint16
// temp vector as a buffer for null indices and one uint32 temp vector as a buffer for
// null hash, all are of size kMiniBatchLength. Plus extra kMiniBatchLength to cope with
// stack padding and aligning.
static constexpr auto kHashBatchTempStackUsage =
(sizeof(uint32_t) + sizeof(uint16_t) + sizeof(uint32_t) + /*extra=*/1) *
util::MiniBatch::kMiniBatchLength;
Expand Down Expand Up @@ -167,8 +171,11 @@ class ARROW_EXPORT Hashing64 {
static void HashMultiColumn(const std::vector<KeyColumnArray>& cols, LightContext* ctx,
uint64_t* hashes);

// Clarify the max temp stack usage for HashBatch so the caller could reserve enough
// size in advance.
// Clarify the max temp stack usage for HashBatch, which might be necessary for the
// caller to be aware of at compile time to reserve enough stack size in advance. The
// HashBatch implementation uses one uint16 temp vector as a buffer for null indices and
// one uint64 temp vector as a buffer for null hash, all are of size kMiniBatchLength.
// Plus extra kMiniBatchLength to cope with stack padding and aligning.
static constexpr auto kHashBatchTempStackUsage =
(sizeof(uint16_t) + sizeof(uint64_t) + /*extra=*/1) *
util::MiniBatch::kMiniBatchLength;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/key_map_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <functional>

#include "arrow/compute/util.h"
#include "arrow/compute/util_internal.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/light_array_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "arrow/array.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/util.h"
#include "arrow/compute/util_internal.h"
#include "arrow/type.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/logging.h"
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/light_array_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <gtest/gtest.h>
#include <numeric>

#include "arrow/memory_pool.h"
#include "arrow/testing/generator.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/type.h"
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/arrow/compute/row/compare_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ namespace compute {

class ARROW_EXPORT KeyCompare {
public:
// Clarify the max temp stack usage for CompareColumnsToRows so the caller could reserve
// enough size in advance.
// Clarify the max temp stack usage for CompareColumnsToRows, which might be necessary
// for the caller to be aware of (possibly at compile time) to reserve enough stack size
// in advance. The CompareColumnsToRows implementation uses three uint8 temp vectors as
// buffers for match vectors, all are of size num_rows. Plus extra kMiniBatchLength to
// cope with stack padding and aligning.
constexpr static int64_t CompareColumnsToRowsTempStackUsage(int64_t num_rows) {
return (sizeof(uint8_t) + sizeof(uint8_t) + sizeof(uint8_t)) * num_rows +
/*extra=*/util::MiniBatch::kMiniBatchLength;
Expand Down
34 changes: 0 additions & 34 deletions cpp/src/arrow/compute/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,7 @@

#include "arrow/compute/util.h"

#include "arrow/table.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/logging.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/ubsan.h"

namespace arrow {
Expand All @@ -31,36 +27,6 @@ using internal::CpuInfo;

namespace util {

void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) {
int64_t estimated_alloc_size = EstimatedAllocationSize(num_bytes);
int64_t new_top = top_ + estimated_alloc_size;
// Stack overflow check (see GH-39582).
// XXX cannot return a regular Status because most consumers do not either.
ARROW_CHECK_LE(new_top, buffer_size_)
<< "TempVectorStack::alloc overflow: allocating " << estimated_alloc_size
<< " on top of " << top_ << " in stack of size " << buffer_size_;
*data = buffer_->mutable_data() + top_ + sizeof(uint64_t);
// We set 8 bytes before the beginning of the allocated range and
// 8 bytes after the end to check for stack overflow (which would
// result in those known bytes being corrupted).
reinterpret_cast<uint64_t*>(buffer_->mutable_data() + top_)[0] = kGuard1;
reinterpret_cast<uint64_t*>(buffer_->mutable_data() + new_top)[-1] = kGuard2;
*id = num_vectors_++;
top_ = new_top;
}

void TempVectorStack::release(int id, uint32_t num_bytes) {
ARROW_DCHECK(num_vectors_ == id + 1);
int64_t size = EstimatedAllocationSize(num_bytes);
ARROW_DCHECK(reinterpret_cast<const uint64_t*>(buffer_->mutable_data() + top_)[-1] ==
kGuard2);
ARROW_DCHECK(top_ >= size);
top_ -= size;
ARROW_DCHECK(reinterpret_cast<const uint64_t*>(buffer_->mutable_data() + top_)[0] ==
kGuard1);
--num_vectors_;
}

namespace bit_util {

inline uint64_t SafeLoadUpTo8Bytes(const uint8_t* bytes, int num_bytes) {
Expand Down
73 changes: 0 additions & 73 deletions cpp/src/arrow/compute/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,10 @@
#include <unordered_map>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/compute/expression.h"
#include "arrow/compute/type_fwd.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/util/bit_util.h"
#include "arrow/util/cpu_info.h"
#include "arrow/util/mutex.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/type_fwd.h"

#if defined(__clang__) || defined(__GNUC__)
#define BYTESWAP(x) __builtin_bswap64(x)
Expand Down Expand Up @@ -77,72 +70,6 @@ class MiniBatch {
static constexpr int kMiniBatchLength = 1 << kLogMiniBatchLength;
};

/// Storage used to allocate temporary vectors of a batch size.
/// Temporary vectors should resemble allocating temporary variables on the stack
/// but in the context of vectorized processing where we need to store a vector of
/// temporaries instead of a single value.
class ARROW_EXPORT TempVectorStack {
template <typename>
friend class TempVectorHolder;

public:
Status Init(MemoryPool* pool, int64_t size) {
num_vectors_ = 0;
top_ = 0;
buffer_size_ = EstimatedAllocationSize(size);
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
// Ensure later operations don't accidentally read uninitialized memory.
std::memset(buffer->mutable_data(), 0xFF, size);
buffer_ = std::move(buffer);
return Status::OK();
}

private:
static int64_t EstimatedAllocationSize(int64_t size) {
return PaddedAllocationSize(size) + 2 * sizeof(uint64_t);
}

static int64_t PaddedAllocationSize(int64_t num_bytes) {
// Round up allocation size to multiple of 8 bytes
// to avoid returning temp vectors with unaligned address.
//
// Also add padding at the end to facilitate loads and stores
// using SIMD when number of vector elements is not divisible
// by the number of SIMD lanes.
//
return ::arrow::bit_util::RoundUp(num_bytes, sizeof(int64_t)) + kPadding;
}
void alloc(uint32_t num_bytes, uint8_t** data, int* id);
void release(int id, uint32_t num_bytes);
static constexpr uint64_t kGuard1 = 0x3141592653589793ULL;
static constexpr uint64_t kGuard2 = 0x0577215664901532ULL;
static constexpr int64_t kPadding = 64;
int num_vectors_;
int64_t top_;
std::unique_ptr<Buffer> buffer_;
int64_t buffer_size_;
};

template <typename T>
class TempVectorHolder {
friend class TempVectorStack;

public:
~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); }
T* mutable_data() { return reinterpret_cast<T*>(data_); }
TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) {
stack_ = stack;
num_elements_ = num_elements;
stack_->alloc(num_elements * sizeof(T), &data_, &id_);
}

private:
TempVectorStack* stack_;
uint8_t* data_;
int id_;
uint32_t num_elements_;
};

namespace bit_util {

ARROW_EXPORT void bits_to_indexes(int bit_to_search, int64_t hardware_flags,
Expand Down
79 changes: 79 additions & 0 deletions cpp/src/arrow/compute/util_internal.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/util_internal.h"

#include "arrow/compute/util.h"
#include "arrow/memory_pool.h"

namespace arrow {
namespace util {

Status TempVectorStack::Init(MemoryPool* pool, int64_t size) {
num_vectors_ = 0;
top_ = 0;
buffer_size_ = EstimatedAllocationSize(size);
ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(size, pool));
// Ensure later operations don't accidentally read uninitialized memory.
std::memset(buffer->mutable_data(), 0xFF, size);
buffer_ = std::move(buffer);
return Status::OK();
}

int64_t TempVectorStack::PaddedAllocationSize(int64_t num_bytes) {
// Round up allocation size to multiple of 8 bytes
// to avoid returning temp vectors with unaligned address.
//
// Also add padding at the end to facilitate loads and stores
// using SIMD when number of vector elements is not divisible
// by the number of SIMD lanes.
//
return ::arrow::bit_util::RoundUp(num_bytes, sizeof(int64_t)) + kPadding;
}

void TempVectorStack::alloc(uint32_t num_bytes, uint8_t** data, int* id) {
int64_t estimated_alloc_size = EstimatedAllocationSize(num_bytes);
int64_t new_top = top_ + estimated_alloc_size;
// Stack overflow check (see GH-39582).
// XXX cannot return a regular Status because most consumers do not either.
ARROW_CHECK_LE(new_top, buffer_size_)
<< "TempVectorStack::alloc overflow: allocating " << estimated_alloc_size
<< " on top of " << top_ << " in stack of size " << buffer_size_;
*data = buffer_->mutable_data() + top_ + sizeof(uint64_t);
// We set 8 bytes before the beginning of the allocated range and
// 8 bytes after the end to check for stack overflow (which would
// result in those known bytes being corrupted).
reinterpret_cast<uint64_t*>(buffer_->mutable_data() + top_)[0] = kGuard1;
reinterpret_cast<uint64_t*>(buffer_->mutable_data() + new_top)[-1] = kGuard2;
*id = num_vectors_++;
top_ = new_top;
}

void TempVectorStack::release(int id, uint32_t num_bytes) {
ARROW_DCHECK(num_vectors_ == id + 1);
int64_t size = EstimatedAllocationSize(num_bytes);
ARROW_DCHECK(reinterpret_cast<const uint64_t*>(buffer_->mutable_data() + top_)[-1] ==
kGuard2);
ARROW_DCHECK(top_ >= size);
top_ -= size;
ARROW_DCHECK(reinterpret_cast<const uint64_t*>(buffer_->mutable_data() + top_)[0] ==
kGuard1);
--num_vectors_;
}

} // namespace util
} // namespace arrow
51 changes: 51 additions & 0 deletions cpp/src/arrow/compute/util_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#pragma once

#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/logging.h"

namespace arrow {
Expand All @@ -27,5 +29,54 @@ void CheckAlignment(const void* ptr) {
ARROW_DCHECK(reinterpret_cast<uint64_t>(ptr) % sizeof(T) == 0);
}

/// Storage used to allocate temporary vectors of a batch size.
/// Temporary vectors should resemble allocating temporary variables on the stack
/// but in the context of vectorized processing where we need to store a vector of
/// temporaries instead of a single value.
class ARROW_EXPORT TempVectorStack {
template <typename>
friend class TempVectorHolder;

public:
Status Init(MemoryPool* pool, int64_t size);

private:
static int64_t EstimatedAllocationSize(int64_t size) {
return PaddedAllocationSize(size) + 2 * sizeof(uint64_t);
}

static int64_t PaddedAllocationSize(int64_t num_bytes);

void alloc(uint32_t num_bytes, uint8_t** data, int* id);
void release(int id, uint32_t num_bytes);
static constexpr uint64_t kGuard1 = 0x3141592653589793ULL;
static constexpr uint64_t kGuard2 = 0x0577215664901532ULL;
static constexpr int64_t kPadding = 64;
int num_vectors_;
int64_t top_;
std::unique_ptr<Buffer> buffer_;
int64_t buffer_size_;
};

template <typename T>
class TempVectorHolder {
friend class TempVectorStack;

public:
~TempVectorHolder() { stack_->release(id_, num_elements_ * sizeof(T)); }
T* mutable_data() { return reinterpret_cast<T*>(data_); }
TempVectorHolder(TempVectorStack* stack, uint32_t num_elements) {
stack_ = stack;
num_elements_ = num_elements;
stack_->alloc(num_elements * sizeof(T), &data_, &id_);
}

private:
TempVectorStack* stack_;
uint8_t* data_;
int id_;
uint32_t num_elements_;
};

} // namespace util
} // namespace arrow

0 comments on commit a3531bf

Please sign in to comment.