diff --git a/cpp/src/arrow/buffer.cc b/cpp/src/arrow/buffer.cc
index 6ffa03a0b5663..6d55f88af1e32 100644
--- a/cpp/src/arrow/buffer.cc
+++ b/cpp/src/arrow/buffer.cc
@@ -80,13 +80,11 @@ Status PoolBuffer::Reserve(int64_t new_capacity) {
     uint8_t* new_data;
     new_capacity = BitUtil::RoundUpToMultipleOf64(new_capacity);
     if (mutable_data_) {
-      RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
-      memcpy(new_data, mutable_data_, size_);
-      pool_->Free(mutable_data_, capacity_);
+      RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &mutable_data_));
     } else {
       RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
+      mutable_data_ = new_data;
     }
-    mutable_data_ = new_data;
     data_ = mutable_data_;
     capacity_ = new_capacity;
   }
diff --git a/cpp/src/arrow/builder-benchmark.cc b/cpp/src/arrow/builder-benchmark.cc
index d08981ffd7e4b..2ea3769c70d3f 100644
--- a/cpp/src/arrow/builder-benchmark.cc
+++ b/cpp/src/arrow/builder-benchmark.cc
@@ -25,37 +25,40 @@ namespace arrow {
 
 constexpr int64_t kFinalSize = 256;
 
-static void BM_BuildPrimitiveArrayNoNulls(benchmark::State& state) {  // NOLINT non-const reference
-  // 1 MiB block
+static void BM_BuildPrimitiveArrayNoNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  // 2 MiB block
   std::vector<int64_t> data(256 * 1024, 100);
   while (state.KeepRunning()) {
     Int64Builder builder(default_memory_pool(), arrow::int64());
     for (int i = 0; i < kFinalSize; i++) {
-      // Build up an array of 256 MiB in size
+      // Build up an array of 512 MiB in size
       builder.Append(data.data(), data.size(), nullptr);
     }
     std::shared_ptr<Array> out;
     builder.Finish(&out);
   }
-  state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
+  state.SetBytesProcessed(
+      state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
 }
 
-BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
+BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
 
-static void BM_BuildVectorNoNulls(benchmark::State& state) {  // NOLINT non-const reference
-  // 1 MiB block
+static void BM_BuildVectorNoNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  // 2 MiB block
   std::vector<int64_t> data(256 * 1024, 100);
   while (state.KeepRunning()) {
     std::vector<int64_t> builder;
     for (int i = 0; i < kFinalSize; i++) {
-      // Build up an array of 256 MiB in size
+      // Build up an array of 512 MiB in size
       builder.insert(builder.end(), data.cbegin(), data.cend());
     }
   }
-  state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
+  state.SetBytesProcessed(
+      state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
 }
 
-BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
+BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
 
 }  // namespace arrow
-
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 23bef2853b206..8040f93836cdc 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -45,8 +45,8 @@ Status ReadableFileInterface::ReadAt(
 }
 
 Status Writeable::Write(const std::string& data) {
-  return Write(reinterpret_cast<const uint8_t*>(data.c_str()),
-      static_cast<int64_t>(data.size()));
+  return Write(
+      reinterpret_cast<const uint8_t*>(data.c_str()), static_cast<int64_t>(data.size()));
 }
 
 }  // namespace io
diff --git a/cpp/src/arrow/io/io-file-test.cc b/cpp/src/arrow/io/io-file-test.cc
index 5f5d639fab0d8..0b54c90e3efea 100644
--- a/cpp/src/arrow/io/io-file-test.cc
+++ b/cpp/src/arrow/io/io-file-test.cc
@@ -291,6 +291,14 @@ class MyMemoryPool : public MemoryPool {
   }
 
   void Free(uint8_t* buffer, int64_t size) override { std::free(buffer); }
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
+    // Only overwrite *ptr on success: std::realloc leaves the old block valid
+    // when it fails.
+    uint8_t* resized = reinterpret_cast<uint8_t*>(std::realloc(*ptr, new_size));
+    if (resized == nullptr) { return Status::OutOfMemory("realloc failed"); }
+    *ptr = resized;
+    return Status::OK();
+  }
 
   int64_t bytes_allocated() const override { return -1; }
 
diff --git a/cpp/src/arrow/jemalloc/jemalloc-builder-benchmark.cc b/cpp/src/arrow/jemalloc/jemalloc-builder-benchmark.cc
index 7e9c278a92be1..8f5f1af9589bf 100644
--- a/cpp/src/arrow/jemalloc/jemalloc-builder-benchmark.cc
+++ b/cpp/src/arrow/jemalloc/jemalloc-builder-benchmark.cc
@@ -25,22 +25,23 @@ namespace arrow {
 
 constexpr int64_t kFinalSize = 256;
 
-static void BM_BuildPrimitiveArrayNoNulls(benchmark::State& state) {  // NOLINT non-const reference
-  // 1 MiB block
+static void BM_BuildPrimitiveArrayNoNulls(
+    benchmark::State& state) {  // NOLINT non-const reference
+  // 2 MiB block
   std::vector<int64_t> data(256 * 1024, 100);
   while (state.KeepRunning()) {
     Int64Builder builder(jemalloc::MemoryPool::default_pool(), arrow::int64());
     for (int i = 0; i < kFinalSize; i++) {
-      // Build up an array of 256 MiB in size
+      // Build up an array of 512 MiB in size
       builder.Append(data.data(), data.size(), nullptr);
     }
     std::shared_ptr<Array> out;
     builder.Finish(&out);
   }
-  state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
+  state.SetBytesProcessed(
+      state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
 }
 
-BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
+BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
 
 }  // namespace arrow
-
diff --git a/cpp/src/arrow/jemalloc/jemalloc-memory_pool-test.cc b/cpp/src/arrow/jemalloc/jemalloc-memory_pool-test.cc
index 4b54eee540a9f..f55e0b4298bb6 100644
--- a/cpp/src/arrow/jemalloc/jemalloc-memory_pool-test.cc
+++ b/cpp/src/arrow/jemalloc/jemalloc-memory_pool-test.cc
@@ -51,4 +51,3 @@ TEST(JemallocMemoryPool, OOM) {
 }  // namespace test
 }  // namespace jemalloc
 }  // namespace arrow
-
diff --git a/cpp/src/arrow/jemalloc/memory_pool.cc b/cpp/src/arrow/jemalloc/memory_pool.cc
index ed084d03d4f3e..d7dd13e3afc37 100644
--- a/cpp/src/arrow/jemalloc/memory_pool.cc
+++ b/cpp/src/arrow/jemalloc/memory_pool.cc
@@ -35,19 +35,37 @@ MemoryPool* MemoryPool::default_pool() {
 
 MemoryPool::MemoryPool() : allocated_size_(0) {}
 
-MemoryPool::~MemoryPool() {};
-
+MemoryPool::~MemoryPool() {}
+
 Status MemoryPool::Allocate(int64_t size, uint8_t** out) {
   *out = reinterpret_cast<uint8_t*>(mallocx(size, MALLOCX_ALIGN(kAlignment)));
   if (*out == NULL) {
     std::stringstream ss;
     ss << "malloc of size " << size << " failed";
     return Status::OutOfMemory(ss.str());
-  }
+  }
   allocated_size_ += size;
   return Status::OK();
 }
 
+Status MemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
+  // rallocx() leaves the original allocation untouched when it fails, so only
+  // publish the new address on success; otherwise the caller would lose (and
+  // leak) its still-valid buffer.
+  uint8_t* resized =
+      reinterpret_cast<uint8_t*>(rallocx(*ptr, new_size, MALLOCX_ALIGN(kAlignment)));
+  if (resized == NULL) {
+    std::stringstream ss;
+    ss << "realloc of size " << new_size << " failed";
+    return Status::OutOfMemory(ss.str());
+  }
+  *ptr = resized;
+
+  allocated_size_ += new_size - old_size;
+
+  return Status::OK();
+}
+
 void MemoryPool::Free(uint8_t* buffer, int64_t size) {
   allocated_size_ -= size;
   free(buffer);
@@ -57,5 +75,5 @@ int64_t MemoryPool::bytes_allocated() const {
   return allocated_size_.load();
 }
 
-} // jemalloc
-} // arrow
+}  // jemalloc
+}  // arrow
diff --git a/cpp/src/arrow/jemalloc/memory_pool.h b/cpp/src/arrow/jemalloc/memory_pool.h
index 860ceb1df2e7b..0d32b4658e3e8 100644
--- a/cpp/src/arrow/jemalloc/memory_pool.h
+++ b/cpp/src/arrow/jemalloc/memory_pool.h
@@ -20,7 +20,7 @@
 #ifndef ARROW_JEMALLOC_MEMORY_POOL_H
 #define ARROW_JEMALLOC_MEMORY_POOL_H
 
-#include "arrow/memory_pool.h"
+#include "arrow/memory_pool.h"
 
 #include <atomic>
 
@@ -32,14 +32,15 @@ namespace jemalloc {
 
 class ARROW_EXPORT MemoryPool : public ::arrow::MemoryPool {
  public:
-  static MemoryPool* default_pool();
+  static MemoryPool* default_pool();
 
   MemoryPool(MemoryPool const&) = delete;
-  MemoryPool& operator=(MemoryPool const&) = delete;
+  MemoryPool& operator=(MemoryPool const&) = delete;
 
   virtual ~MemoryPool();
 
   Status Allocate(int64_t size, uint8_t** out) override;
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
   void Free(uint8_t* buffer, int64_t size) override;
 
   int64_t bytes_allocated() const override;
@@ -50,7 +51,6 @@ class ARROW_EXPORT MemoryPool : public ::arrow::MemoryPool {
   std::atomic<int64_t> allocated_size_;
 };
 
-
 }  // namespace jemalloc
 }  // namespace arrow
 
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index f55b1ac668c7c..48bcb606e325a 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -67,6 +67,7 @@ class InternalMemoryPool : public MemoryPool {
   virtual ~InternalMemoryPool();
 
   Status Allocate(int64_t size, uint8_t** out) override;
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
 
   void Free(uint8_t* buffer, int64_t size) override;
 
@@ -85,6 +86,28 @@ Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) {
   return Status::OK();
 }
 
+Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
+  std::lock_guard<std::mutex> guard(pool_lock_);
+
+  // Note: We cannot use realloc() here as it doesn't guarantee alignment.
+
+  // Allocate new chunk
+  uint8_t* out;
+  RETURN_NOT_OK(AllocateAligned(new_size, &out));
+  // Copy contents and release old memory chunk
+  memcpy(out, *ptr, std::min(new_size, old_size));
+#ifdef _MSC_VER
+  _aligned_free(*ptr);
+#else
+  std::free(*ptr);
+#endif
+  *ptr = out;
+
+  bytes_allocated_ += new_size - old_size;
+
+  return Status::OK();
+}
+
 int64_t InternalMemoryPool::bytes_allocated() const {
   std::lock_guard<std::mutex> guard(pool_lock_);
   return bytes_allocated_;
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 4c1d699addd50..13a3f129c1a9e 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -31,6 +31,11 @@ class ARROW_EXPORT MemoryPool {
   virtual ~MemoryPool();
 
   virtual Status Allocate(int64_t size, uint8_t** out) = 0;
+  // Resize the allocation at *ptr from old_size to new_size bytes. On success
+  // *ptr holds the (possibly moved) address and the first
+  // min(old_size, new_size) bytes are preserved. On failure a non-OK Status is
+  // returned and *ptr still refers to the original, unmodified allocation.
+  virtual Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) = 0;
   virtual void Free(uint8_t* buffer, int64_t size) = 0;
 
   virtual int64_t bytes_allocated() const = 0;