Skip to content

Commit

Permalink
Implement Reallocate function
Browse files Browse the repository at this point in the history
  • Loading branch information
xhochy committed Jan 6, 2017
1 parent a17b313 commit 10c6839
Show file tree
Hide file tree
Showing 10 changed files with 77 additions and 33 deletions.
6 changes: 2 additions & 4 deletions cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,11 @@ Status PoolBuffer::Reserve(int64_t new_capacity) {
uint8_t* new_data;
new_capacity = BitUtil::RoundUpToMultipleOf64(new_capacity);
if (mutable_data_) {
RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
memcpy(new_data, mutable_data_, size_);
pool_->Free(mutable_data_, capacity_);
RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &mutable_data_));
} else {
RETURN_NOT_OK(pool_->Allocate(new_capacity, &new_data));
mutable_data_ = new_data;
}
mutable_data_ = new_data;
data_ = mutable_data_;
capacity_ = new_capacity;
}
Expand Down
27 changes: 16 additions & 11 deletions cpp/src/arrow/builder-benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,37 +25,42 @@ namespace arrow {

constexpr int64_t kFinalSize = 256;

static void BM_BuildPrimitiveArrayNoNulls(benchmark::State& state) { // NOLINT non-const reference
// 1 MiB block
static void BM_BuildPrimitiveArrayNoNulls(
benchmark::State& state) { // NOLINT non-const reference
// 2 MiB block
std::vector<int64_t> data(256 * 1024, 100);
while (state.KeepRunning()) {
Int64Builder builder(default_memory_pool(), arrow::int64());
for (int i = 0; i < kFinalSize; i++) {
// Build up an array of 256 MiB in size
// Build up an array of 512 MiB in size
builder.Append(data.data(), data.size(), nullptr);
}
std::shared_ptr<Array> out;
builder.Finish(&out);
}
state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
state.SetBytesProcessed(
state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
}

BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
;

static void BM_BuildVectorNoNulls(benchmark::State& state) { // NOLINT non-const reference
// 1 MiB block
static void BM_BuildVectorNoNulls(
benchmark::State& state) { // NOLINT non-const reference
// 2 MiB block
std::vector<int64_t> data(256 * 1024, 100);
while (state.KeepRunning()) {
std::vector<int64_t> builder;
for (int i = 0; i < kFinalSize; i++) {
// Build up an array of 256 MiB in size
// Build up an array of 512 MiB in size
builder.insert(builder.end(), data.cbegin(), data.cend());
}
}
state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
state.SetBytesProcessed(
state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
}

BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
BENCHMARK(BM_BuildVectorNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
;

} // namespace arrow

4 changes: 2 additions & 2 deletions cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ Status ReadableFileInterface::ReadAt(
}

Status Writeable::Write(const std::string& data) {
return Write(reinterpret_cast<const uint8_t*>(data.c_str()),
static_cast<int64_t>(data.size()));
return Write(
reinterpret_cast<const uint8_t*>(data.c_str()), static_cast<int64_t>(data.size()));
}

} // namespace io
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/io/io-file-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,9 @@ class MyMemoryPool : public MemoryPool {
}

void Free(uint8_t* buffer, int64_t size) override { std::free(buffer); }
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
*ptr = reinterpret_cast<uint8_t*>(std::realloc(*ptr, new_size));
}

int64_t bytes_allocated() const override { return -1; }

Expand Down
14 changes: 8 additions & 6 deletions cpp/src/arrow/jemalloc/jemalloc-builder-benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,24 @@ namespace arrow {

constexpr int64_t kFinalSize = 256;

static void BM_BuildPrimitiveArrayNoNulls(benchmark::State& state) { // NOLINT non-const reference
// 1 MiB block
static void BM_BuildPrimitiveArrayNoNulls(
benchmark::State& state) { // NOLINT non-const reference
// 2 MiB block
std::vector<int64_t> data(256 * 1024, 100);
while (state.KeepRunning()) {
Int64Builder builder(jemalloc::MemoryPool::default_pool(), arrow::int64());
for (int i = 0; i < kFinalSize; i++) {
// Build up an array of 256 MiB in size
// Build up an array of 512 MiB in size
builder.Append(data.data(), data.size(), nullptr);
}
std::shared_ptr<Array> out;
builder.Finish(&out);
}
state.SetBytesProcessed(state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
state.SetBytesProcessed(
state.iterations() * data.size() * sizeof(int64_t) * kFinalSize);
}

BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);;
BENCHMARK(BM_BuildPrimitiveArrayNoNulls)->Repetitions(3)->Unit(benchmark::kMillisecond);
;

} // namespace arrow

1 change: 0 additions & 1 deletion cpp/src/arrow/jemalloc/jemalloc-memory_pool-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,3 @@ TEST(JemallocMemoryPool, OOM) {
} // namespace test
} // namespace jemalloc
} // namespace arrow

23 changes: 18 additions & 5 deletions cpp/src/arrow/jemalloc/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,32 @@ MemoryPool* MemoryPool::default_pool() {

MemoryPool::MemoryPool() : allocated_size_(0) {}

MemoryPool::~MemoryPool() {};
MemoryPool::~MemoryPool(){};

Status MemoryPool::Allocate(int64_t size, uint8_t** out) {
*out = reinterpret_cast<uint8_t*>(mallocx(size, MALLOCX_ALIGN(kAlignment)));
if (*out == NULL) {
std::stringstream ss;
ss << "malloc of size " << size << " failed";
return Status::OutOfMemory(ss.str());
}
}
allocated_size_ += size;
return Status::OK();
}

Status MemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
*ptr = reinterpret_cast<uint8_t*>(rallocx(*ptr, new_size, MALLOCX_ALIGN(kAlignment)));
if (*ptr == NULL) {
std::stringstream ss;
ss << "realloc of size " << new_size << " failed";
return Status::OutOfMemory(ss.str());
}

allocated_size_ += new_size - old_size;

return Status::OK();
}

void MemoryPool::Free(uint8_t* buffer, int64_t size) {
allocated_size_ -= size;
free(buffer);
Expand All @@ -57,5 +70,5 @@ int64_t MemoryPool::bytes_allocated() const {
return allocated_size_.load();
}

} // jemalloc
} // arrow
} // jemalloc
} // arrow
8 changes: 4 additions & 4 deletions cpp/src/arrow/jemalloc/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#ifndef ARROW_JEMALLOC_MEMORY_POOL_H
#define ARROW_JEMALLOC_MEMORY_POOL_H

#include "arrow/memory_pool.h"
#include "arrow/memory_pool.h"

#include <atomic>

Expand All @@ -32,14 +32,15 @@ namespace jemalloc {

class ARROW_EXPORT MemoryPool : public ::arrow::MemoryPool {
public:
static MemoryPool* default_pool();
static MemoryPool* default_pool();

MemoryPool(MemoryPool const&) = delete;
MemoryPool& operator=(MemoryPool const&) = delete;
MemoryPool& operator=(MemoryPool const&) = delete;

virtual ~MemoryPool();

Status Allocate(int64_t size, uint8_t** out) override;
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
void Free(uint8_t* buffer, int64_t size) override;

int64_t bytes_allocated() const override;
Expand All @@ -50,7 +51,6 @@ class ARROW_EXPORT MemoryPool : public ::arrow::MemoryPool {
std::atomic<int64_t> allocated_size_;
};


} // namespace jemalloc
} // namespace arrow

Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/memory_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class InternalMemoryPool : public MemoryPool {
virtual ~InternalMemoryPool();

Status Allocate(int64_t size, uint8_t** out) override;
Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;

void Free(uint8_t* buffer, int64_t size) override;

Expand All @@ -85,6 +86,28 @@ Status InternalMemoryPool::Allocate(int64_t size, uint8_t** out) {
return Status::OK();
}

Status InternalMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
std::lock_guard<std::mutex> guard(pool_lock_);

// Note: We cannot use realloc() here as it doesn't guarantee alignment.

// Allocate new chunk
uint8_t* out;
RETURN_NOT_OK(AllocateAligned(new_size, &out));
// Copy contents and release old memory chunk
memcpy(out, *ptr, std::min(new_size, old_size));
#ifdef _MSC_VER
_aligned_free(*ptr);
#else
std::free(*ptr);
#endif
*ptr = out;

bytes_allocated_ += new_size - old_size;

return Status::OK();
}

int64_t InternalMemoryPool::bytes_allocated() const {
std::lock_guard<std::mutex> guard(pool_lock_);
return bytes_allocated_;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/memory_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class ARROW_EXPORT MemoryPool {
virtual ~MemoryPool();

virtual Status Allocate(int64_t size, uint8_t** out) = 0;
virtual Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) = 0;
virtual void Free(uint8_t* buffer, int64_t size) = 0;

virtual int64_t bytes_allocated() const = 0;
Expand Down

0 comments on commit 10c6839

Please sign in to comment.