diff --git a/cpp/benchmarks/CMakeLists.txt b/cpp/benchmarks/CMakeLists.txt
index 49504e53424..8a48126e195 100644
--- a/cpp/benchmarks/CMakeLists.txt
+++ b/cpp/benchmarks/CMakeLists.txt
@@ -267,6 +267,11 @@ ConfigureNVBench(PARQUET_MULTITHREAD_READER_NVBENCH io/parquet/parquet_reader_mu
 # * orc reader benchmark --------------------------------------------------------------------------
 ConfigureNVBench(ORC_READER_NVBENCH io/orc/orc_reader_input.cpp io/orc/orc_reader_options.cpp)
 
+# ##################################################################################################
+# * orc multithreaded benchmark
+# --------------------------------------------------------------------------
+ConfigureNVBench(ORC_MULTITHREADED_NVBENCH io/orc/orc_reader_multithreaded.cpp)
+
 # ##################################################################################################
 # * csv reader benchmark --------------------------------------------------------------------------
 ConfigureNVBench(CSV_READER_NVBENCH io/csv/csv_reader_input.cpp io/csv/csv_reader_options.cpp)
diff --git a/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp b/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp
new file mode 100644
index 00000000000..ffbbc6f8464
--- /dev/null
+++ b/cpp/benchmarks/io/orc/orc_reader_multithreaded.cpp
@@ -0,0 +1,335 @@
+/*
+ * Copyright (c) 2024, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <benchmarks/common/generate_input.hpp>
+#include <benchmarks/fixture/benchmark_fixture.hpp>
+#include <benchmarks/io/cuio_common.hpp>
+#include <benchmarks/io/nvbench_helpers.hpp>
+
+#include <cudf/detail/nvtx/ranges.hpp>
+#include <cudf/detail/utilities/stream_pool.hpp>
+#include <cudf/io/orc.hpp>
+#include <cudf/table/table.hpp>
+#include <cudf/utilities/default_stream.hpp>
+#include <cudf/utilities/thread_pool.hpp>
+
+#include <nvbench/nvbench.cuh>
+
+#include <vector>
+
+size_t get_num_read_threads(nvbench::state const& state) { return state.get_int64("num_threads"); }
+
+size_t get_read_size(nvbench::state const& state)
+{
+  auto const num_reads = get_num_read_threads(state);
+  return state.get_int64("total_data_size") / num_reads;
+}
+
+std::string get_label(std::string const& test_name, nvbench::state const& state)
+{
+  auto const num_cols       = state.get_int64("num_cols");
+  size_t const read_size_mb = get_read_size(state) / (1024 * 1024);
+  return {test_name + ", " + std::to_string(num_cols) + " columns, " +
+          std::to_string(get_num_read_threads(state)) + " threads (" +
+          std::to_string(read_size_mb) + " MB each)"};
+}
+
+std::tuple<std::vector<cuio_source_sink_pair>, size_t, size_t> write_file_data(
+  nvbench::state& state, std::vector<cudf::type_id> const& d_types)
+{
+  auto const cardinality          = state.get_int64("cardinality");
+  auto const run_length           = state.get_int64("run_length");
+  auto const num_cols             = state.get_int64("num_cols");
+  size_t const num_files          = get_num_read_threads(state);
+  size_t const per_file_data_size = get_read_size(state);
+
+  std::vector<cuio_source_sink_pair> source_sink_vector;
+
+  size_t total_file_size = 0;
+
+  for (size_t i = 0; i < num_files; ++i) {
+    cuio_source_sink_pair source_sink{io_type::HOST_BUFFER};
+
+    auto const tbl = create_random_table(
+      cycle_dtypes(d_types, num_cols),
+      table_size_bytes{per_file_data_size},
+      data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
+    auto const view = tbl->view();
+
+    cudf::io::orc_writer_options const write_opts =
+      cudf::io::orc_writer_options::builder(source_sink.make_sink_info(), view)
+        .compression(cudf::io::compression_type::SNAPPY);
+
+    cudf::io::write_orc(write_opts);
+    total_file_size += source_sink.size();
+
+    source_sink_vector.push_back(std::move(source_sink));
+  }
+
+  return {std::move(source_sink_vector), total_file_size, num_files};
+}
+
+void BM_orc_multithreaded_read_common(nvbench::state& state,
+                                      std::vector<cudf::type_id> const& d_types,
+                                      std::string const& label)
+{
+  auto const data_size   = state.get_int64("total_data_size");
+  auto const num_threads = state.get_int64("num_threads");
+
+  auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
+  cudf::detail::thread_pool threads(num_threads);
+
+  auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
+  std::vector<cudf::io::source_info> source_info_vector;
+  std::transform(source_sink_vector.begin(),
+                 source_sink_vector.end(),
+                 std::back_inserter(source_info_vector),
+                 [](auto& source_sink) { return source_sink.make_source_info(); });
+
+  auto mem_stats_logger = cudf::memory_stats_logger();
+
+  {
+    cudf::scoped_range range{("(read) " + label).c_str()};
+    state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
+               [&](nvbench::launch& launch, auto& timer) {
+                 auto read_func = [&](int index) {
+                   auto const stream = streams[index % num_threads];
+                   cudf::io::orc_reader_options read_opts =
+                     cudf::io::orc_reader_options::builder(source_info_vector[index]);
+                   cudf::io::read_orc(read_opts, stream, rmm::mr::get_current_device_resource());
+                 };
+
+                 threads.paused = true;
+                 for (size_t i = 0; i < num_files; ++i) {
+                   threads.submit(read_func, i);
+                 }
+                 timer.start();
+                 threads.paused = false;
+                 threads.wait_for_tasks();
+                 cudf::detail::join_streams(streams, cudf::get_default_stream());
+                 timer.stop();
+               });
+  }
+
+  auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
+  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_buffer_size(
+    mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
+  state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
+}
+
+void BM_orc_multithreaded_read_mixed(nvbench::state& state)
+{
+  auto label = get_label("mixed", state);
+  cudf::scoped_range range{label.c_str()};
+  BM_orc_multithreaded_read_common(
+    state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
+}
+
+void BM_orc_multithreaded_read_fixed_width(nvbench::state& state)
+{
+  auto label = get_label("fixed width", state);
+  cudf::scoped_range range{label.c_str()};
+  BM_orc_multithreaded_read_common(state, {cudf::type_id::INT32}, label);
+}
+
+void BM_orc_multithreaded_read_string(nvbench::state& state)
+{
+  auto label = get_label("string", state);
+  cudf::scoped_range range{label.c_str()};
+  BM_orc_multithreaded_read_common(state, {cudf::type_id::STRING}, label);
+}
+
+void BM_orc_multithreaded_read_list(nvbench::state& state)
+{
+  auto label = get_label("list", state);
+  cudf::scoped_range range{label.c_str()};
+  BM_orc_multithreaded_read_common(state, {cudf::type_id::LIST}, label);
+}
+
+void BM_orc_multithreaded_read_chunked_common(nvbench::state& state,
+                                              std::vector<cudf::type_id> const& d_types,
+                                              std::string const& label)
+{
+  size_t const data_size    = state.get_int64("total_data_size");
+  auto const num_threads    = state.get_int64("num_threads");
+  size_t const input_limit  = state.get_int64("input_limit");
+  size_t const output_limit = state.get_int64("output_limit");
+
+  auto streams = cudf::detail::fork_streams(cudf::get_default_stream(), num_threads);
+  cudf::detail::thread_pool threads(num_threads);
+  auto [source_sink_vector, total_file_size, num_files] = write_file_data(state, d_types);
+  std::vector<cudf::io::source_info> source_info_vector;
+  std::transform(source_sink_vector.begin(),
+                 source_sink_vector.end(),
+                 std::back_inserter(source_info_vector),
+                 [](auto& source_sink) { return source_sink.make_source_info(); });
+
+  auto mem_stats_logger = cudf::memory_stats_logger();
+
+  {
+    cudf::scoped_range range{("(read) " + label).c_str()};
+    std::vector<cudf::io::table_with_metadata> chunks;
+    state.exec(nvbench::exec_tag::sync | nvbench::exec_tag::timer,
+               [&](nvbench::launch& launch, auto& timer) {
+                 auto read_func = [&](int index) {
+                   auto const stream = streams[index % num_threads];
+                   cudf::io::orc_reader_options read_opts =
+                     cudf::io::orc_reader_options::builder(source_info_vector[index]);
+                   // divide chunk limits by number of threads so the number of chunks produced is
+                   // the same for all cases. this seems better than the alternative, which is to
+                   // keep the limits the same. if we do that, as the number of threads goes up,
+                   // the number of chunks goes down - so are we actually benchmarking the same
+                   // thing in that case?
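+                   // e.g. with the 640 MB input_limit axis below and 4 threads, each thread
+                   // reads its file in ~160 MB input passes, keeping the total chunk count steady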
+                   auto reader = cudf::io::chunked_orc_reader(
+                     output_limit / num_threads, input_limit / num_threads, read_opts, stream);
+
+                   // read all the chunks
+                   do {
+                     auto table = reader.read_chunk();
+                   } while (reader.has_next());
+                 };
+
+                 threads.paused = true;
+                 for (size_t i = 0; i < num_files; ++i) {
+                   threads.submit(read_func, i);
+                 }
+                 timer.start();
+                 threads.paused = false;
+                 threads.wait_for_tasks();
+                 cudf::detail::join_streams(streams, cudf::get_default_stream());
+                 timer.stop();
+               });
+  }
+
+  auto const time = state.get_summary("nv/cold/time/gpu/mean").get_float64("value");
+  state.add_element_count(static_cast<double>(data_size) / time, "bytes_per_second");
+  state.add_buffer_size(
+    mem_stats_logger.peak_memory_usage(), "peak_memory_usage", "peak_memory_usage");
+  state.add_buffer_size(total_file_size, "encoded_file_size", "encoded_file_size");
+}
+
+void BM_orc_multithreaded_read_chunked_mixed(nvbench::state& state)
+{
+  auto label = get_label("mixed", state);
+  cudf::scoped_range range{label.c_str()};
+  BM_orc_multithreaded_read_chunked_common(
+    state, {cudf::type_id::INT32, cudf::type_id::DECIMAL64, cudf::type_id::STRING}, label);
+}
+
+void BM_orc_multithreaded_read_chunked_fixed_width(nvbench::state& state)
+{
+  auto label = get_label("fixed width", state);
+  cudf::scoped_range range{label.c_str()};
+  BM_orc_multithreaded_read_chunked_common(state, {cudf::type_id::INT32}, label);
+}
+
+void BM_orc_multithreaded_read_chunked_string(nvbench::state& state)
+{
+  auto label = get_label("string", state);
+  cudf::scoped_range range{label.c_str()};
+  BM_orc_multithreaded_read_chunked_common(state, {cudf::type_id::STRING}, label);
+}
+
+void BM_orc_multithreaded_read_chunked_list(nvbench::state& state)
+{
+  auto label = get_label("list", state);
+  cudf::scoped_range range{label.c_str()};
+  BM_orc_multithreaded_read_chunked_common(state, {cudf::type_id::LIST}, label);
+}
+
+auto const thread_range    = std::vector<nvbench::int64_t>{1, 2, 4, 8};
+auto const total_data_size = std::vector<nvbench::int64_t>{512 * 1024 * 1024, 1024 * 1024 * 1024};
+
+// mixed data types: fixed width and strings
+NVBENCH_BENCH(BM_orc_multithreaded_read_mixed)
+  .set_name("orc_multithreaded_read_decode_mixed")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", total_data_size)
+  .add_int64_axis("num_threads", thread_range)
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
+
+NVBENCH_BENCH(BM_orc_multithreaded_read_fixed_width)
+  .set_name("orc_multithreaded_read_decode_fixed_width")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", total_data_size)
+  .add_int64_axis("num_threads", thread_range)
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
+
+NVBENCH_BENCH(BM_orc_multithreaded_read_string)
+  .set_name("orc_multithreaded_read_decode_string")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", total_data_size)
+  .add_int64_axis("num_threads", thread_range)
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
+
+NVBENCH_BENCH(BM_orc_multithreaded_read_list)
+  .set_name("orc_multithreaded_read_decode_list")
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {1000})
+  .add_int64_axis("total_data_size", total_data_size)
+  .add_int64_axis("num_threads", thread_range)
+  .add_int64_axis("num_cols", {4})
+  .add_int64_axis("run_length", {8});
+
+// mixed data types: fixed width, strings
+NVBENCH_BENCH(BM_orc_multithreaded_read_chunked_mixed)
.set_name("orc_multithreaded_read_decode_chunked_mixed") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", total_data_size) + .add_int64_axis("num_threads", thread_range) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}) + .add_int64_axis("input_limit", {640 * 1024 * 1024}) + .add_int64_axis("output_limit", {640 * 1024 * 1024}); + +NVBENCH_BENCH(BM_orc_multithreaded_read_chunked_fixed_width) + .set_name("orc_multithreaded_read_decode_chunked_fixed_width") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", total_data_size) + .add_int64_axis("num_threads", thread_range) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}) + .add_int64_axis("input_limit", {640 * 1024 * 1024}) + .add_int64_axis("output_limit", {640 * 1024 * 1024}); + +NVBENCH_BENCH(BM_orc_multithreaded_read_chunked_string) + .set_name("orc_multithreaded_read_decode_chunked_string") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", total_data_size) + .add_int64_axis("num_threads", thread_range) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}) + .add_int64_axis("input_limit", {640 * 1024 * 1024}) + .add_int64_axis("output_limit", {640 * 1024 * 1024}); + +NVBENCH_BENCH(BM_orc_multithreaded_read_chunked_list) + .set_name("orc_multithreaded_read_decode_chunked_list") + .set_min_samples(4) + .add_int64_axis("cardinality", {1000}) + .add_int64_axis("total_data_size", total_data_size) + .add_int64_axis("num_threads", thread_range) + .add_int64_axis("num_cols", {4}) + .add_int64_axis("run_length", {8}) + .add_int64_axis("input_limit", {640 * 1024 * 1024}) + .add_int64_axis("output_limit", {640 * 1024 * 1024});