diff --git a/native-sql-engine/cpp/CMakeLists.txt b/native-sql-engine/cpp/CMakeLists.txt index fe7e989ee..a1301fd1d 100644 --- a/native-sql-engine/cpp/CMakeLists.txt +++ b/native-sql-engine/cpp/CMakeLists.txt @@ -1,9 +1,6 @@ cmake_minimum_required(VERSION 3.16) project(spark_columnar_plugin) -#add_definitions(-DSKIPWRITE -DSKIPCOMPRESS -DPROCESSROW) -add_definitions(-DPROCESSROW) - #add_compile_options(-g) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) diff --git a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc index d2bffe36a..cd81ef877 100644 --- a/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc +++ b/native-sql-engine/cpp/src/benchmarks/shuffle_split_benchmark.cc @@ -41,10 +41,24 @@ namespace shuffle { const int batch_buffer_size = 32768; const int split_buffer_size = 8192; -class BenchmarkShuffleSplit { +class BenchmarkShuffleSplit : public ::benchmark::Fixture { public: - BenchmarkShuffleSplit(std::string file_name) { GetRecordBatchReader(file_name); } + BenchmarkShuffleSplit() { + file_name = + "/mnt/DP_disk1/lineitem/" + "part-00025-356249a2-c285-42b9-8a18-5b10be61e0c4-c000.snappy.parquet"; + GetRecordBatchReader(file_name); + std::cout << schema->ToString() << std::endl; + const auto& fields = schema->fields(); + for (const auto& field : fields) { + if (field->name() == "l_orderkey") { + auto node = gandiva::TreeExprBuilder::MakeField(field); + expr_vector.push_back(gandiva::TreeExprBuilder::MakeExpression( + std::move(node), arrow::field("res_" + field->name(), field->type()))); + } + } + } void GetRecordBatchReader(const std::string& input_file) { std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; std::shared_ptr record_batch_reader; @@ -75,98 +89,11 @@ class BenchmarkShuffleSplit { for (int i = 0; i < num_columns; ++i) { column_indices.push_back(i); } - const auto& fields = schema->fields(); - for (const auto& field : fields) { - if (field->name() == "l_orderkey") { - auto node = gandiva::TreeExprBuilder::MakeField(field); - expr_vector.push_back(gandiva::TreeExprBuilder::MakeExpression( - std::move(node), arrow::field("res_" + field->name(), field->type()))); - } - } } - void operator()(benchmark::State& state) { - SetCPU(state.thread_index()); - arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1); - - const int num_partitions = state.range(0); - - auto options = SplitOptions::Defaults(); - options.compression_type = compression_type; - options.buffer_size = split_buffer_size; - options.buffered_write = true; - options.offheap_per_task = 128 * 1024 * 1024 * 1024L; - options.prefer_spill = true; - options.write_schema = false; - - std::shared_ptr splitter; - int64_t elapse_read = 0; - int64_t num_batches = 0; - int64_t num_rows = 0; - int64_t split_time = 0; - - Do_Split(splitter, elapse_read, num_batches, num_rows, split_time, num_partitions, - options, state); - - auto fs = std::make_shared(); - fs->DeleteFile(splitter->DataFile()); - - state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes())); - - state.counters["rowgroups"] = - benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["columns"] = - benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["batches"] = benchmark::Counter( - num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_rows"] = benchmark::Counter( - num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["num_partitions"] = - benchmark::Counter(num_partitions, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["batch_buffer_size"] = - benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["split_buffer_size"] = - benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_written"] = - benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_raw"] = - benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - state.counters["bytes_spilled"] = - benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1024); - - state.counters["parquet_parse"] = benchmark::Counter( - elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - state.counters["compute_pid_time"] = benchmark::Counter( - splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["write_time"] = - benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["spill_time"] = - benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - state.counters["compress_time"] = - benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, - benchmark::Counter::OneK::kIs1000); - - split_time = split_time - splitter->TotalSpillTime() - - splitter->TotalComputePidTime() - splitter->TotalCompressTime() - - splitter->TotalWriteTime(); - state.counters["split_time"] = benchmark::Counter( - split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); - } + void SetUp(const ::benchmark::State& state) {} + + void TearDown(const ::benchmark::State& state) {} protected: long SetCPU(uint32_t cpuindex) { @@ -175,9 +102,8 @@ class BenchmarkShuffleSplit { CPU_SET(cpuindex, &cs); return sched_setaffinity(0, sizeof(cs), &cs); } - virtual void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, + virtual void Do_Split(const std::shared_ptr& splitter, int64_t& elapse_read, int64_t& num_batches, int64_t& num_rows, int64_t& split_time, - const int num_partitions, SplitOptions options, benchmark::State& state) {} protected: @@ -190,120 +116,232 @@ class BenchmarkShuffleSplit { parquet::ArrowReaderProperties properties; }; -class BenchmarkShuffleSplit_CacheScan_Benchmark : public BenchmarkShuffleSplit { - public: - BenchmarkShuffleSplit_CacheScan_Benchmark(std::string filename) - : BenchmarkShuffleSplit(filename) {} +BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, CacheScan)(benchmark::State& state) { + SetCPU(state.thread_index()); - protected: - void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, - int64_t& num_batches, int64_t& num_rows, int64_t& split_time, - const int num_partitions, SplitOptions options, benchmark::State& state) { - std::vector local_column_indices; - local_column_indices.push_back(0); - local_column_indices.push_back(1); - local_column_indices.push_back(2); - local_column_indices.push_back(4); - local_column_indices.push_back(5); - local_column_indices.push_back(6); - local_column_indices.push_back(7); - - std::shared_ptr local_schema; - local_schema = std::make_shared(*schema.get()); - - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(15)); - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(14)); - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(13)); - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(12)); - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(11)); - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(10)); - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(9)); - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(8)); - ARROW_ASSIGN_OR_THROW(local_schema, local_schema->RemoveField(3)); - - if (state.thread_index() == 0) std::cout << local_schema->ToString() << std::endl; - - ARROW_ASSIGN_OR_THROW(splitter, - Splitter::Make("rr", local_schema, num_partitions, options)); - - std::shared_ptr record_batch; + arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1); - std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; - std::shared_ptr record_batch_reader; - ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), - properties, &parquet_reader)); + const int num_partitions = state.range(0); - std::vector> batches; - ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( - row_group_indices, local_column_indices, &record_batch_reader)); - do { - TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + auto options = SplitOptions::Defaults(); + options.compression_type = compression_type; + options.buffer_size = split_buffer_size; + options.buffered_write = true; + options.offheap_per_task = 128 * 1024 * 1024 * 1024L; + options.prefer_spill = true; + options.write_schema = false; - if (record_batch) { - batches.push_back(record_batch); - num_batches += 1; - num_rows += record_batch->num_rows(); - } - } while (record_batch); - std::cout << "parquet parse done elapsed time " << elapse_read / 1000000 << " ms " - << std::endl; - std::cout << "batches = " << num_batches << " rows = " << num_rows << std::endl; - - for (auto _ : state) { - for_each( - batches.begin(), batches.end(), - [&splitter, &split_time](std::shared_ptr& record_batch) { - TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); - }); + std::shared_ptr splitter; + + if (!expr_vector.empty()) { + ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions, + expr_vector, std::move(options))); + } else { + ARROW_ASSIGN_OR_THROW( + splitter, Splitter::Make("rr", schema, num_partitions, std::move(options))); + } + + std::shared_ptr record_batch; + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t split_time = 0; + + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties, + &parquet_reader)); + + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices, + &record_batch_reader)); + do { + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + + if (record_batch) { + batches.push_back(record_batch); + num_batches += 1; + num_rows += record_batch->num_rows(); } + } while (record_batch); - TIME_NANO_OR_THROW(split_time, splitter->Stop()); + for (auto _ : state) { + for_each(batches.begin(), batches.end(), + [&splitter, &split_time](std::shared_ptr& record_batch) { + TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); + }); } -}; -class BenchmarkShuffleSplit_IterateScan_Benchmark : public BenchmarkShuffleSplit { - public: - BenchmarkShuffleSplit_IterateScan_Benchmark(std::string filename) - : BenchmarkShuffleSplit(filename) {} + TIME_NANO_OR_THROW(split_time, splitter->Stop()); + + auto fs = std::make_shared(); + fs->DeleteFile(splitter->DataFile()); + + state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes())); + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_partitions"] = benchmark::Counter( + num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["split_buffer_size"] = + benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["bytes_spilled"] = + benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_written"] = + benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_raw"] = + benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_spilled"] = + benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["compute_pid_time"] = + benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = + benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["spill_time"] = + benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["compress_time"] = + benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + + split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() - + splitter->TotalCompressTime() - splitter->TotalWriteTime(); + state.counters["split_time"] = benchmark::Counter( + split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); +} + +BENCHMARK_DEFINE_F(BenchmarkShuffleSplit, IterateScan)(benchmark::State& state) { + SetCPU(state.thread_index()); + + arrow::Compression::type compression_type = (arrow::Compression::type)state.range(1); + + const int num_partitions = state.range(0); + + auto options = SplitOptions::Defaults(); + options.compression_type = compression_type; + options.buffer_size = split_buffer_size; + options.buffered_write = true; + options.offheap_per_task = 128 * 1024 * 1024 * 1024L; + options.prefer_spill = true; + options.write_schema = false; + + std::shared_ptr splitter; + + if (!expr_vector.empty()) { + ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions, + expr_vector, std::move(options))); + } else { + ARROW_ASSIGN_OR_THROW( + splitter, Splitter::Make("rr", schema, num_partitions, std::move(options))); + } - protected: - void Do_Split(std::shared_ptr& splitter, int64_t& elapse_read, - int64_t& num_batches, int64_t& num_rows, int64_t& split_time, - const int num_partitions, SplitOptions options, benchmark::State& state) { - if (state.thread_index() == 0) std::cout << schema->ToString() << std::endl; - - if (!expr_vector.empty()) { - ARROW_ASSIGN_OR_THROW(splitter, Splitter::Make("hash", schema, num_partitions, - expr_vector, std::move(options))); - } else { - ARROW_ASSIGN_OR_THROW( - splitter, Splitter::Make("rr", schema, num_partitions, std::move(options))); - } + int64_t elapse_read = 0; + int64_t num_batches = 0; + int64_t num_rows = 0; + int64_t split_time = 0; - std::shared_ptr record_batch; + std::shared_ptr record_batch; - std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; - std::shared_ptr record_batch_reader; - ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( - arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), - properties, &parquet_reader)); + std::unique_ptr<::parquet::arrow::FileReader> parquet_reader; + std::shared_ptr record_batch_reader; + ASSERT_NOT_OK(::parquet::arrow::FileReader::Make( + arrow::default_memory_pool(), ::parquet::ParquetFileReader::Open(file), properties, + &parquet_reader)); - for (auto _ : state) { - std::vector> batches; - ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader( - row_group_indices, column_indices, &record_batch_reader)); + for (auto _ : state) { + std::vector> batches; + ASSERT_NOT_OK(parquet_reader->GetRecordBatchReader(row_group_indices, column_indices, + &record_batch_reader)); + TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); + while (record_batch) { + num_batches += 1; + num_rows += record_batch->num_rows(); + TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); - while (record_batch) { - num_batches += 1; - num_rows += record_batch->num_rows(); - TIME_NANO_OR_THROW(split_time, splitter->Split(*record_batch)); - TIME_NANO_OR_THROW(elapse_read, record_batch_reader->ReadNext(&record_batch)); - } } - TIME_NANO_OR_THROW(split_time, splitter->Stop()); } -}; + TIME_NANO_OR_THROW(split_time, splitter->Stop()); + + auto fs = std::make_shared(); + fs->DeleteFile(splitter->DataFile()); + + state.SetBytesProcessed(int64_t(splitter->RawPartitionBytes())); + + state.counters["rowgroups"] = + benchmark::Counter(row_group_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["columns"] = + benchmark::Counter(column_indices.size(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["batches"] = benchmark::Counter( + num_batches, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_rows"] = benchmark::Counter( + num_rows, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["num_partitions"] = benchmark::Counter( + num_partitions, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["batch_buffer_size"] = + benchmark::Counter(batch_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["split_buffer_size"] = + benchmark::Counter(split_buffer_size, benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["bytes_spilled"] = + benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_written"] = + benchmark::Counter(splitter->TotalBytesWritten(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_raw"] = + benchmark::Counter(splitter->RawPartitionBytes(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + state.counters["bytes_spilled"] = + benchmark::Counter(splitter->TotalBytesSpilled(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1024); + + state.counters["parquet_parse"] = benchmark::Counter( + elapse_read, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); + state.counters["compute_pid_time"] = + benchmark::Counter(splitter->TotalComputePidTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["write_time"] = + benchmark::Counter(splitter->TotalWriteTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["spill_time"] = + benchmark::Counter(splitter->TotalSpillTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + state.counters["compress_time"] = + benchmark::Counter(splitter->TotalCompressTime(), benchmark::Counter::kAvgThreads, + benchmark::Counter::OneK::kIs1000); + + split_time = split_time - splitter->TotalSpillTime() - splitter->TotalComputePidTime() - + splitter->TotalCompressTime() - splitter->TotalWriteTime(); + state.counters["split_time"] = benchmark::Counter( + split_time, benchmark::Counter::kAvgThreads, benchmark::Counter::OneK::kIs1000); +} /*BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, CacheScan)->Iterations(1) ->Args({96*2, arrow::Compression::FASTPFOR}) @@ -332,74 +370,14 @@ class BenchmarkShuffleSplit_IterateScan_Benchmark : public BenchmarkShuffleSplit ->Threads(16) ->Threads(24) ->Unit(benchmark::kSecond);*/ -/*BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, CacheScan) - ->Iterations(1000000) - ->Args({512, arrow::Compression::FASTPFOR}) - ->Threads(1) +BENCHMARK_REGISTER_F(BenchmarkShuffleSplit, IterateScan) + ->Iterations(1) + ->Args({96 * 16, arrow::Compression::FASTPFOR}) + ->Threads(24) ->ReportAggregatesOnly(false) ->MeasureProcessCPUTime() - ->Unit(benchmark::kSecond);*/ + ->Unit(benchmark::kSecond); } // namespace shuffle } // namespace sparkcolumnarplugin -int main(int argc, char** argv) { - uint32_t iterations = 1; - uint32_t partitions = 512; - uint32_t threads = 1; - std::string datafile; - - for (int i = 0; i < argc; i++) { - if (strcmp(argv[i], "--iterations") == 0) { - iterations = atol(argv[i + 1]); - } else if (strcmp(argv[i], "--partitions") == 0) { - partitions = atol(argv[i + 1]); - } else if (strcmp(argv[i], "--threads") == 0) { - threads = atol(argv[i + 1]); - } else if (strcmp(argv[i], "--file") == 0) { - datafile = argv[i + 1]; - } - } - std::cout << "iterations = " << iterations << std::endl; - std::cout << "partitions = " << partitions << std::endl; - std::cout << "threads = " << threads << std::endl; - std::cout << "datafile = " << datafile << std::endl; - - sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_CacheScan_Benchmark bck(datafile); - - benchmark::RegisterBenchmark("BenchmarkShuffleSplit::CacheScan", bck) - ->Iterations(iterations) - ->Args({partitions, arrow::Compression::FASTPFOR}) - ->Threads(threads) - ->ReportAggregatesOnly(false) - ->MeasureProcessCPUTime() - ->Unit(benchmark::kSecond); - - /* sparkcolumnarplugin::shuffle::BenchmarkShuffleSplit_IterateScan_Benchmark - bck(datafile); - - benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck) - ->Iterations(1) - ->Args({96*2, arrow::Compression::FASTPFOR}) - ->Args({96*4, arrow::Compression::FASTPFOR}) - ->Args({96*8, arrow::Compression::FASTPFOR}) - ->Args({96*16, arrow::Compression::FASTPFOR}) - ->Args({96*32, arrow::Compression::FASTPFOR}) - ->Threads(24) - ->Unit(benchmark::kSecond); - - benchmark::RegisterBenchmark("BenchmarkShuffleSplit::IterateScan", bck) - ->Iterations(1) - ->Args({4096, arrow::Compression::FASTPFOR}) - ->Threads(1) - ->Threads(2) - ->Threads(4) - ->Threads(8) - ->Threads(16) - ->Threads(24) - ->Unit(benchmark::kSecond); - */ - - benchmark::Initialize(&argc, argv); - benchmark::RunSpecifiedBenchmarks(); - benchmark::Shutdown(); -} \ No newline at end of file +BENCHMARK_MAIN(); diff --git a/native-sql-engine/cpp/src/shuffle/splitter.cc b/native-sql-engine/cpp/src/shuffle/splitter.cc index 7839c4ce4..e739bd04f 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.cc +++ b/native-sql-engine/cpp/src/shuffle/splitter.cc @@ -25,47 +25,23 @@ #include #include #include -#include -#include #include -#include -#include #include #include "shuffle/utils.h" #include "utils/macros.h" -/*#if defined(COLUMNAR_PLUGIN_USE_AVX512) +#if defined(COLUMNAR_PLUGIN_USE_AVX512) #include #else #include #endif -*/ namespace sparkcolumnarplugin { namespace shuffle { using arrow::internal::checked_cast; -template -std::string __m128i_toString(const __m128i var) { - std::stringstream sstr; - T values[16 / sizeof(T)]; - std::memcpy(values, &var, sizeof(values)); // See discussion below - if (sizeof(T) == 1) { - for (unsigned int i = 0; i < sizeof(__m128i); i++) { // C++11: Range for also - // possible - sstr << std::hex << (int)values[i] << " " << std::dec; - } - } else { - for (unsigned int i = 0; i < sizeof(__m128i) / sizeof(T); - i++) { // C++11: Range for also possible - sstr << std::hex << values[i] << " " << std::dec; - } - } - return sstr.str(); -} - SplitOptions SplitOptions::Defaults() { return SplitOptions(); } #if defined(COLUMNAR_PLUGIN_USE_AVX512) inline __m256i CountPartitionIdOccurrence(const std::vector& partition_id, @@ -317,7 +293,6 @@ arrow::Status Splitter::Init() { partition_cached_recordbatch_size_.resize(num_partitions_); partition_lengths_.resize(num_partitions_); raw_partition_lengths_.resize(num_partitions_); - reducer_offset_offset_.resize(num_partitions_ + 1); for (int i = 0; i < column_type_id_.size(); ++i) { switch (column_type_id_[i]->id()) { @@ -840,26 +815,6 @@ arrow::Result Splitter::SpillLargestPartition(int64_t* size) { } arrow::Status Splitter::DoSplit(const arrow::RecordBatch& rb) { -#ifdef PROCESSROW - - reducer_offsets_.resize(rb.num_rows()); - - reducer_offset_offset_[0] = 0; - for (auto pid = 1; pid <= num_partitions_; pid++) { - reducer_offset_offset_[pid] = - reducer_offset_offset_[pid - 1] + partition_id_cnt_[pid - 1]; - } - for (auto row = 0; row < rb.num_rows(); row++) { - auto pid = partition_id_[row]; - reducer_offsets_[reducer_offset_offset_[pid]] = row; - _mm_prefetch(reducer_offsets_.data() + reducer_offset_offset_[pid] + 32, _MM_HINT_T0); - reducer_offset_offset_[pid]++; - } - std::transform(reducer_offset_offset_.begin(), std::prev(reducer_offset_offset_.end()), - partition_id_cnt_.begin(), reducer_offset_offset_.begin(), - [](uint16_t x, int16_t y) { return x - y; }); - -#endif // for the first input record batch, scan binary arrays and large binary // arrays to get their empirical sizes @@ -967,27 +922,6 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) auto src_addr = const_cast(rb.column_data(col_idx)->buffers[1]->data()); switch (arrow::bit_width(column_type_id_[col_idx]->id())) { -#ifdef PROCESSROW -// assume batch size = 32k; reducer# = 4K; row/reducer = 8 -#define PROCESS(_CTYPE) \ - std::transform(partition_buffer_idx_offset_.begin(), \ - partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \ - partition_buffer_idx_offset_.begin(), \ - [](uint8_t* x, int16_t y) { return x + y * sizeof(_CTYPE); }); \ - for (auto pid = 0; pid < num_partitions_; pid++) { \ - auto dst_pid_base = \ - reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]); /*32k*/ \ - auto r = reducer_offset_offset_[pid]; /*8k*/ \ - auto size = reducer_offset_offset_[pid + 1]; \ - for (r; r < size; r++) { \ - auto src_offset = reducer_offsets_[r]; /*16k*/ \ - *dst_pid_base = reinterpret_cast<_CTYPE*>(src_addr)[src_offset]; /*64k*/ \ - _mm_prefetch(&(src_addr)[src_offset * sizeof(_CTYPE) + 64], _MM_HINT_T2); \ - dst_pid_base += 1; \ - } \ - } \ - break; -#else #define PROCESS(_CTYPE) \ std::transform(partition_buffer_idx_offset_.begin(), \ partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), \ @@ -998,10 +932,9 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) auto dst_pid_base = reinterpret_cast<_CTYPE*>(partition_buffer_idx_offset_[pid]); \ *dst_pid_base = reinterpret_cast<_CTYPE*>(src_addr)[row]; \ partition_buffer_idx_offset_[pid] += sizeof(_CTYPE); \ - _mm_prefetch(&dst_pid_base[64 / sizeof(_CTYPE)], _MM_HINT_T0); \ + _mm_prefetch(&dst_pid_base[1], _MM_HINT_T0); \ } \ break; -#endif case 8: PROCESS(uint8_t) case 16: @@ -1009,97 +942,9 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) case 32: PROCESS(uint32_t) case 64: -#ifdef PROCESSAVX - std::transform( - partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), - partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), - [](uint8_t* x, int16_t y) { return x + y * sizeof(uint64_t); }); - for (auto pid = 0; pid < num_partitions_; pid++) { - auto dst_pid_base = - reinterpret_cast(partition_buffer_idx_offset_[pid]); /*32k*/ - auto r = reducer_offset_offset_[pid]; /*8k*/ - auto size = reducer_offset_offset_[pid + 1]; -#if 1 - for (r; r < size && (((uint64_t)dst_pid_base & 0x1f) > 0); r++) { - auto src_offset = reducer_offsets_[r]; /*16k*/ - *dst_pid_base = reinterpret_cast(src_addr)[src_offset]; /*64k*/ - _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2); - dst_pid_base += 1; - } -#if 0 - for (r; r+4(src_addr)[src_offset]; /*64k*/ - _mm_prefetch(&(src_addr)[src_offset * sizeof(uint64_t) + 64], _MM_HINT_T2); - dst_pid_base += 1; - } - } - break; -#else PROCESS(uint64_t) -#endif - #undef PROCESS case 128: // arrow::Decimal128Type::type_id -#ifdef PROCESSROW - // assume batch size = 32k; reducer# = 4K; row/reducer = 8 - std::transform( - partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), - partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), - [](uint8_t* x, int16_t y) { return x + y * 16; }); - for (auto pid = 0; pid < num_partitions_; pid++) { - auto dst_pid_base = - reinterpret_cast(partition_buffer_idx_offset_[pid]); /*32k*/ - auto r = reducer_offset_offset_[pid]; /*8k*/ - auto size = reducer_offset_offset_[pid + 1]; - for (r; r < size; r++) { - auto src_offset = reducer_offsets_[r]; /*16k*/ - *dst_pid_base = - reinterpret_cast(src_addr)[src_offset << 1]; /*128k*/ - *(dst_pid_base + 1) = - reinterpret_cast(src_addr)[src_offset << 1 | 1]; /*128k*/ - _mm_prefetch(&(src_addr)[src_offset * 16 + 64], _MM_HINT_T2); - dst_pid_base += 2; - } - } - break; -#else std::transform( partition_buffer_idx_offset_.begin(), partition_buffer_idx_offset_.end(), partition_buffer_idx_base_.begin(), partition_buffer_idx_offset_.begin(), @@ -1115,7 +960,6 @@ arrow::Status Splitter::SplitFixedWidthValueBuffer(const arrow::RecordBatch& rb) _MM_HINT_T0); } break; -#endif case 1: // arrow::BooleanType::type_id: partition_buffer_idx_offset.resize(partition_buffer_idx_base_.size()); std::copy(partition_buffer_idx_base_.begin(), partition_buffer_idx_base_.end(), @@ -1315,8 +1159,6 @@ arrow::Status Splitter::SplitFixedWidthValidityBuffer(const arrow::RecordBatch& if (rb.column_data(col_idx)->GetNullCount() == 0 && column_has_null_[col_idx] == true) { // if the input record batch doesn't have null, set validity to True - // column_has_null_ is used to skip the partition_id_cnt_[pid] and dst_addrs[pid] - // access for (auto pid = 0; pid < num_partitions_; ++pid) { if (partition_id_cnt_[pid] > 0 && dst_addrs[pid] != nullptr) { arrow::BitUtil::SetBitsTo(dst_addrs[pid], partition_buffer_idx_base_[pid], @@ -1564,13 +1406,7 @@ arrow::Status HashSplitter::ComputeAndCountPartitionId(const arrow::RecordBatch& for (auto i = 0; i < num_rows; ++i) { // positive mod auto pid = pid_arr->Value(i) % num_partitions_; - // force to generate ASM - __asm__( - "lea (%[num_partitions],%[pid],1),%[tmp]\n" - "test %[pid],%[pid]\n" - "cmovs %[tmp],%[pid]\n" - : [ pid ] "+r"(pid) - : [ num_partitions ] "r"(num_partitions_), [ tmp ] "r"(0)); + if (pid < 0) pid = (pid + num_partitions_) % num_partitions_; partition_id_[i] = pid; partition_id_cnt_[pid]++; } diff --git a/native-sql-engine/cpp/src/shuffle/splitter.h b/native-sql-engine/cpp/src/shuffle/splitter.h index 2fb4bb3d4..0dfac2f8c 100644 --- a/native-sql-engine/cpp/src/shuffle/splitter.h +++ b/native-sql-engine/cpp/src/shuffle/splitter.h @@ -226,10 +226,6 @@ class Splitter { // updated for each input record batch // col std::vector partition_id_; - // [num_rows] - std::vector reducer_offsets_; - // [num_partitions] - std::vector reducer_offset_offset_; // col std::vector partition_id_cnt_;