This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-855] allocate large block of memory for all reducer #881 #894

Merged · 20 commits · May 9, 2022
212 changes: 167 additions & 45 deletions native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc
@@ -24,12 +24,25 @@
#include <arrow/util/io_util.h>
//#include <gtest/gtest.h>
#include <benchmark/benchmark.h>
#include <execinfo.h>
#include <parquet/arrow/reader.h>
#include <parquet/file_reader.h>
#include <sched.h>
#include <shuffle/splitter.h>
#include <sys/mman.h>

#include <chrono>
#include <cstdio>   // printf/puts used by print_trace()
#include <cstdlib>  // free() used by print_trace()
// Debug helper: print the current call stack to stdout.
// Build with -rdynamic so backtrace_symbols() can resolve function names.
void print_trace(void) {
enum Constexpr { MAX_SIZE = 1024 };
void* array[MAX_SIZE];
size_t size = backtrace(array, MAX_SIZE);
char** strings = backtrace_symbols(array, size);
for (size_t i = 0; i < size; i++) printf(" %s\n", strings[i]);
puts("");
free(strings);  // backtrace_symbols() allocates a single block for all entries
}

#include "codegen/code_generator.h"
#include "codegen/code_generator_factory.h"
@@ -38,9 +51,112 @@
namespace sparkcolumnarplugin {
namespace shuffle {

#define ALIGNMENT (2048 * 1024)  // 2 MB: the x86-64 huge-page size

const int batch_buffer_size = 32768;
const int split_buffer_size = 8192;

class MyMemoryPool : public arrow::MemoryPool {
public:
MyMemoryPool() = default;

Status Allocate(int64_t size, uint8_t** out) override {
RETURN_NOT_OK(pool_->Allocate(size, out));
stats_.UpdateAllocatedBytes(size);
// std::cout << "Allocate: size = " << size << " addr = " << std::hex <<
// (uint64_t)*out << std::dec << std::endl; print_trace();
return arrow::Status::OK();
}

Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
auto old_ptr = *ptr;
RETURN_NOT_OK(pool_->Reallocate(old_size, new_size, ptr));
stats_.UpdateAllocatedBytes(new_size - old_size);
// std::cout << "Reallocate: old_size = " << old_size << " old_ptr = " << std::hex <<
// (uint64_t)old_ptr << std::dec << " new_size = " << new_size << " addr = " <<
// std::hex << (uint64_t)*ptr << std::dec << std::endl; print_trace();
return arrow::Status::OK();
}

void Free(uint8_t* buffer, int64_t size) override {
pool_->Free(buffer, size);
stats_.UpdateAllocatedBytes(-size);
// std::cout << "Free: size = " << size << " addr = " << std::hex << (uint64_t)buffer
// << std::dec << std::endl; print_trace();
}

int64_t bytes_allocated() const override { return stats_.bytes_allocated(); }

int64_t max_memory() const override { return pool_->max_memory(); }

std::string backend_name() const override { return pool_->backend_name(); }

private:
MemoryPool* pool_ = arrow::default_memory_pool();
arrow::internal::MemoryPoolStats stats_;
};
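
MyMemoryPool above is a thin tracking wrapper: every call is forwarded to Arrow's default pool while a private arrow::internal::MemoryPoolStats records the net allocated bytes; the commented-out prints and print_trace() calls can be re-enabled to trace individual allocations. A minimal usage sketch, relying only on the class above and the file's existing Arrow includes (sizes are illustrative):

// Sketch: observe net bytes through the tracking pool.
arrow::Status TrackAllocations() {
  MyMemoryPool pool;
  uint8_t* buf = nullptr;
  RETURN_NOT_OK(pool.Allocate(1024, &buf));          // bytes_allocated() == 1024
  RETURN_NOT_OK(pool.Reallocate(1024, 4096, &buf));  // bytes_allocated() == 4096
  pool.Free(buf, 4096);                              // bytes_allocated() == 0
  return arrow::Status::OK();
}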

#define ENABLELARGEPAGE  // comment out to fall back to plain Arrow allocation

class LargePageMemoryPool : public MemoryPool {
public:
LargePageMemoryPool() = default;

~LargePageMemoryPool() override = default;

Status Allocate(int64_t size, uint8_t** out) override {
#ifdef ENABLELARGEPAGE
if (size < 2 * 1024 * 1024) {
return pool_->Allocate(size, out);
} else {
RETURN_NOT_OK(pool_->AlignAllocate(size, out, ALIGNMENT));
// Hint the kernel to back this range with transparent huge pages.
// madvise() is advisory; failure is non-fatal, so the result is ignored.
madvise(*out, size, MADV_HUGEPAGE);
// std::cout << "Allocate: size = " << size << " addr = " << std::hex << (uint64_t)*out
//           << " end = " << std::hex << (uint64_t)(*out + size) << std::dec << std::endl;
return arrow::Status::OK();
}
#else
return pool_->Allocate(size, out);
#endif
}

Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
#ifdef ENABLELARGEPAGE
if (new_size < 2 * 1024 * 1024) {
return pool_->Reallocate(old_size, new_size, ptr);
} else {
Status st = pool_->AlignReallocate(old_size, new_size, ptr, ALIGNMENT);
// madvise(*ptr, new_size, MADV_HUGEPAGE);
return st;
}
#else
return pool_->Reallocate(old_size, new_size, ptr);
#endif
}

void Free(uint8_t* buffer, int64_t size) override {
#ifdef ENABLELARGEPAGE
if (size < 2 * 1024 * 1024) {
pool_->Free(buffer, size);
} else {
pool_->Free(buffer, size, ALIGNMENT);
}
#else
pool_->Free(buffer, size);
#endif
}

int64_t bytes_allocated() const override { return pool_->bytes_allocated(); }

int64_t max_memory() const override { return pool_->max_memory(); }

std::string backend_name() const override { return "LargePageMemoryPool"; }

private:
MemoryPool* pool_ = arrow::default_memory_pool();
};
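
AlignAllocate, AlignReallocate, and the three-argument Free used above appear to come from this project's patched Arrow rather than stock Arrow APIs. The underlying technique needs only POSIX, though: place large blocks on a 2 MB boundary and advise the kernel to back them with transparent huge pages, the usual motivation being fewer TLB misses on large buffers. A standalone sketch of that idea, assuming Linux with glibc (the function name is illustrative):

#include <stdlib.h>    // posix_memalign
#include <sys/mman.h>  // madvise, MADV_HUGEPAGE

// Sketch: 2 MB-aligned allocation with a transparent-huge-page hint.
static void* AllocateHugeFriendly(size_t size) {
  void* ptr = nullptr;
  // Huge pages apply only to ranges starting on a 2 MB boundary.
  if (posix_memalign(&ptr, 2 * 1024 * 1024, size) != 0) return nullptr;
  // Advisory only: the kernel may decline, so the result is ignored.
  madvise(ptr, size, MADV_HUGEPAGE);
  return ptr;
}

The 2 MB cutoff in the pool mirrors this: requests smaller than one huge page go to the regular allocator, since rounding them up to ALIGNMENT would waste most of each page.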

class BenchmarkShuffleSplit {
public:
BenchmarkShuffleSplit(std::string file_name) { GetRecordBatchReader(file_name); }
@@ -89,6 +205,8 @@ class BenchmarkShuffleSplit {
SetCPU(state.thread_index());
arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1);

std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LargePageMemoryPool>();

const int num_partitions = state.range(0);

auto options = SplitOptions::Defaults();
@@ -98,6 +216,7 @@
options.offheap_per_task = 128 * 1024 * 1024 * 1024L;
options.prefer_spill = true;
options.write_schema = false;
options.memory_pool = pool.get();

std::shared_ptr<Splitter> splitter;
int64_t elapse_read = 0;
@@ -166,6 +285,7 @@ class BenchmarkShuffleSplit {
splitter->TotalWriteTime();
state.counters["split_time"] = benchmark::Counter(
split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000);
splitter.reset();
}

protected:
@@ -201,26 +321,27 @@ class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit {
const int num_partitions, SplitOptions options, benchmark::State& state) {
std::vector<int> local_column_indices;
/* local_column_indices.push_back(0);
local_column_indices.push_back(1);
local_column_indices.push_back(2);
local_column_indices.push_back(4);
local_column_indices.push_back(5);
local_column_indices.push_back(6);
local_column_indices.push_back(7);*/

std::shared_ptr<arrow::Schema> local_schema;
local_schema = std::make_shared<arrow::Schema>(*schema.get());

/* ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(15));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(14));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(13));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(12));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(11));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(10));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(9));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8));
ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3));
*/
if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl;

ARROW_ASSIGN_OR_THROW(splitter,
@@ -251,11 +372,13 @@ class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit {
std::cout << "batches = " << num_batches << " rows = " << num_rows << std::endl;

for (auto _ : state) {
for_each(batches.begin(), batches.end(),
[&splitter, &split_time,
&options](std::shared_ptr<arrow::RecordBatch>& record_batch) {
TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch));
});
// std::cout << " split done memory allocated = " <<
// options.memory_pool->bytes_allocated() << std::endl;
}

TIME_NANO_OR_THROW(split_time, splitter->Stop());
@@ -374,31 +497,30 @@ int main(int argc, char** argv) {
->MeasureProcessCPUTime()
->Unit(benchmark::kSecond);

/* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark
bck(datafile);

benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({96*2, arrow::Compression::FASTPFOR})
->Args({96*4, arrow::Compression::FASTPFOR})
->Args({96*8, arrow::Compression::FASTPFOR})
->Args({96*16, arrow::Compression::FASTPFOR})
->Args({96*32, arrow::Compression::FASTPFOR})
->Threads(24)
->Unit(benchmark::kSecond);

benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck)
->Iterations(1)
->Args({4096, arrow::Compression::FASTPFOR})
->Threads(1)
->Threads(2)
->Threads(4)
->Threads(8)
->Threads(16)
->Threads(24)
->Unit(benchmark::kSecond);
*/

benchmark::Initialize(&argc, argv);
benchmark::RunSpecifiedBenchmarks();
benchmark::Shutdown();
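
Assuming this file builds into a standalone Google Benchmark binary that takes the input Parquet file as its first argument (as the datafile variable suggests), an invocation might look like the following; the binary name, path, and filter value are illustrative:

# Hypothetical run: the positional argument is the input Parquet file.
./shuffle_split_benchmark /path/to/input.parquet \
    --benchmark_filter=CacheScan --benchmark_repetitions=1

--benchmark_filter and --benchmark_repetitions are standard Google Benchmark flags.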