From a4b951a6c140c05178edb61d8e28f51a4b430e15 Mon Sep 17 00:00:00 2001 From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com> Date: Fri, 28 Jun 2024 10:20:42 -0500 Subject: [PATCH] Templatization of fixed-width parquet decoding kernels. (#15911) This PR merges all of the fixed-width parquet decoding kernels into a single templatized kernel that can be selectively instantiated with desired features (dictionary/no-dictionary, nested/non-nested, etc). It also adds support for (non-list) nested columns in this path. So structs do not have to use the much slower general decode kernel any more. A new benchmark was added specific to structs containing only fixed width columns. I added this because the performance improvement is fairly high (+20%) but we don't see it in the normal struct benchmarks because they include (and are dominated by) string decode times. The new benchmark shows: Before this PR: ``` | data_type | io_type | cardinality | run_length | bytes_per_second | peak_memory_usage | encoded_file_size | |-----------|---------------|-------------|------------|------------------|-------------------|-------------------| | STRUCT | DEVICE_BUFFER | 0 | 1 | 21071216823 | 1.047 GiB | 511.675 MiB | | STRUCT | DEVICE_BUFFER | 1000 | 1 | 18974392387 | 821.312 MiB | 128.884 MiB | | STRUCT | DEVICE_BUFFER | 0 | 32 | 20429356824 | 621.787 MiB | 28.141 MiB | | STRUCT | DEVICE_BUFFER | 1000 | 32 | 20572327813 | 598.421 MiB | 16.475 MiB | ``` After this PR: ``` | data_type | io_type | cardinality | run_length | bytes_per_second | peak_memory_usage | encoded_file_size | |-----------|---------------|-------------|------------|------------------|-------------------|-------------------| | STRUCT | DEVICE_BUFFER | 0 | 1 | 25805996399 | 1.047 GiB | 511.675 MiB | | STRUCT | DEVICE_BUFFER | 1000 | 1 | 22422306660 | 821.312 MiB | 128.884 MiB | | STRUCT | DEVICE_BUFFER | 0 | 32 | 24460694014 | 621.787 MiB | 28.141 MiB | | STRUCT | DEVICE_BUFFER | 1000 | 32 | 24674861214 | 598.421 MiB | 16.475 MiB | ``` Split-page decoding for fixed-width types + structs are also going through this new path. New test added. This brings us closer to eliminating the "general" kernel. The only things left that run through it are lists and booleans. This is PR 1 of 2, with the followup moving a lot of code around. At this point, I think it makes sense to start consolidating our files a bit. I also left some breadcrumbs (a few small commented out code blocks) in the core kernel `gpuDecodePageDataGeneric` for the next step of adding list support. They can be removed if people don't like them. Authors: - https://github.com/nvdbaranec Approvers: - Mike Wilson (https://github.com/hyperbolic2346) - Vukasin Milovanovic (https://github.com/vuule) - Muhammad Haseeb (https://github.com/mhaseeb123) URL: https://github.com/rapidsai/cudf/pull/15911 --- .../io/parquet/parquet_reader_input.cpp | 50 +- cpp/src/io/parquet/decode_fixed.cu | 896 ++++++++++-------- cpp/src/io/parquet/page_hdr.cu | 16 +- cpp/src/io/parquet/parquet_gpu.hpp | 46 +- cpp/src/io/parquet/reader_impl.cpp | 57 +- cpp/tests/io/parquet_writer_test.cpp | 97 +- 6 files changed, 703 insertions(+), 459 deletions(-) diff --git a/cpp/benchmarks/io/parquet/parquet_reader_input.cpp b/cpp/benchmarks/io/parquet/parquet_reader_input.cpp index 019e0f30fe9..7563c823454 100644 --- a/cpp/benchmarks/io/parquet/parquet_reader_input.cpp +++ b/cpp/benchmarks/io/parquet/parquet_reader_input.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,20 +59,18 @@ void parquet_read_common(cudf::size_type num_rows_to_read, } template -void BM_parquet_read_data(nvbench::state& state, nvbench::type_list>) +void BM_parquet_read_data_common(nvbench::state& state, + data_profile const& profile, + nvbench::type_list>) { auto const d_type = get_type_or_group(static_cast(DataType)); - auto const cardinality = static_cast(state.get_int64("cardinality")); - auto const run_length = static_cast(state.get_int64("run_length")); auto const source_type = retrieve_io_type_enum(state.get_string("io_type")); auto const compression = cudf::io::compression_type::SNAPPY; cuio_source_sink_pair source_sink(source_type); auto const num_rows_written = [&]() { - auto const tbl = create_random_table( - cycle_dtypes(d_type, num_cols), - table_size_bytes{data_size}, - data_profile_builder().cardinality(cardinality).avg_run_length(run_length)); + auto const tbl = + create_random_table(cycle_dtypes(d_type, num_cols), table_size_bytes{data_size}, profile); auto const view = tbl->view(); cudf::io::parquet_writer_options write_opts = @@ -85,6 +83,32 @@ void BM_parquet_read_data(nvbench::state& state, nvbench::type_list +void BM_parquet_read_data(nvbench::state& state, + nvbench::type_list> type_list) +{ + auto const cardinality = static_cast(state.get_int64("cardinality")); + auto const run_length = static_cast(state.get_int64("run_length")); + BM_parquet_read_data_common( + state, data_profile_builder().cardinality(cardinality).avg_run_length(run_length), type_list); +} + +template +void BM_parquet_read_fixed_width_struct(nvbench::state& state, + nvbench::type_list> type_list) +{ + auto const cardinality = static_cast(state.get_int64("cardinality")); + auto const run_length = static_cast(state.get_int64("run_length")); + std::vector s_types{ + cudf::type_id::INT32, cudf::type_id::FLOAT32, cudf::type_id::INT64}; + BM_parquet_read_data_common(state, + data_profile_builder() + .cardinality(cardinality) + .avg_run_length(run_length) + .struct_types(s_types), + type_list); +} + void BM_parquet_read_io_compression(nvbench::state& state) { auto const d_type = get_type_or_group({static_cast(data_type::INTEGRAL), @@ -247,3 +271,13 @@ NVBENCH_BENCH(BM_parquet_read_io_small_mixed) .add_int64_axis("cardinality", {0, 1000}) .add_int64_axis("run_length", {1, 32}) .add_int64_axis("num_string_cols", {1, 2, 3}); + +// a benchmark for structs that only contain fixed-width types +using d_type_list_struct_only = nvbench::enum_type_list; +NVBENCH_BENCH_TYPES(BM_parquet_read_fixed_width_struct, NVBENCH_TYPE_AXES(d_type_list_struct_only)) + .set_name("parquet_read_fixed_width_struct") + .set_type_axes_names({"data_type"}) + .add_string_axis("io_type", {"DEVICE_BUFFER"}) + .set_min_samples(4) + .add_int64_axis("cardinality", {0, 1000}) + .add_int64_axis("run_length", {1, 32}); diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu index bfd89200786..ea80ae73c2f 100644 --- a/cpp/src/io/parquet/decode_fixed.cu +++ b/cpp/src/io/parquet/decode_fixed.cu @@ -24,136 +24,11 @@ namespace cudf::io::parquet::detail { namespace { -constexpr int decode_block_size = 128; -constexpr int rolling_buf_size = decode_block_size * 2; -// the required number of runs in shared memory we will need to provide the -// rle_stream object -constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); - -template -static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat( - int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t) -{ - constexpr int num_warps = decode_block_size / cudf::detail::warp_size; - constexpr int max_batch_size = num_warps * cudf::detail::warp_size; - - auto& ni = s->nesting_info[0]; - - // how many (input) values we've processed in the page so far - int value_count = s->input_value_count; - int valid_count = ni.valid_count; - - // cap by last row so that we don't process any rows past what we want to output. - int const first_row = s->first_row; - int const last_row = first_row + s->num_rows; - int const capped_target_value_count = min(target_value_count, last_row); - - int const valid_map_offset = ni.valid_map_offset; - int const row_index_lower_bound = s->row_index_lower_bound; - - __syncthreads(); - - while (value_count < capped_target_value_count) { - int const batch_size = min(max_batch_size, capped_target_value_count - value_count); - - // definition level. only need to process for nullable columns - int d = 0; - if constexpr (nullable) { - d = t < batch_size - ? static_cast(def[rolling_index(value_count + t)]) - : -1; - } - - int const thread_value_count = t + 1; - int const block_value_count = batch_size; - - // compute our row index, whether we're in row bounds, and validity - int const row_index = (thread_value_count + value_count) - 1; - int const in_row_bounds = (row_index >= row_index_lower_bound) && (row_index < last_row); - int is_valid; - if constexpr (nullable) { - is_valid = ((d > 0) && in_row_bounds) ? 1 : 0; - } else { - is_valid = in_row_bounds; - } - - // thread and block validity count - int thread_valid_count, block_valid_count; - if constexpr (nullable) { - using block_scan = cub::BlockScan; - __shared__ typename block_scan::TempStorage scan_storage; - block_scan(scan_storage).InclusiveSum(is_valid, thread_valid_count, block_valid_count); - __syncthreads(); - - // validity is processed per-warp - // - // nested schemas always read and write to the same bounds (that is, read and write - // positions are already pre-bounded by first_row/num_rows). flat schemas will start reading - // at the first value, even if that is before first_row, because we cannot trivially jump to - // the correct position to start reading. since we are about to write the validity vector - // here we need to adjust our computed mask to take into account the write row bounds. - int const in_write_row_bounds = ballot(row_index >= first_row && row_index < last_row); - int const write_start = __ffs(in_write_row_bounds) - 1; // first bit in the warp to store - int warp_null_count = 0; - if (write_start >= 0) { - uint32_t const warp_validity_mask = ballot(is_valid); - // lane 0 from each warp writes out validity - if ((t % cudf::detail::warp_size) == 0) { - int const vindex = (value_count + thread_value_count) - 1; // absolute input value index - int const bit_offset = (valid_map_offset + vindex + write_start) - - first_row; // absolute bit offset into the output validity map - int const write_end = - cudf::detail::warp_size - __clz(in_write_row_bounds); // last bit in the warp to store - int const bit_count = write_end - write_start; - warp_null_count = bit_count - __popc(warp_validity_mask >> write_start); - - store_validity(bit_offset, ni.valid_map, warp_validity_mask >> write_start, bit_count); - } - } - - // sum null counts. we have to do it this way instead of just incrementing by (value_count - - // valid_count) because valid_count also includes rows that potentially start before our row - // bounds. if we could come up with a way to clean that up, we could remove this and just - // compute it directly at the end of the kernel. - size_type const block_null_count = - cudf::detail::single_lane_block_sum_reduce(warp_null_count); - if (t == 0) { ni.null_count += block_null_count; } - } - // trivial for non-nullable columns - else { - thread_valid_count = thread_value_count; - block_valid_count = block_value_count; - } - - // output offset - if (is_valid) { - int const dst_pos = (value_count + thread_value_count) - 1; - int const src_pos = (valid_count + thread_valid_count) - 1; - sb->nz_idx[rolling_index(src_pos)] = dst_pos; - } - - // update stuff - value_count += block_value_count; - valid_count += block_valid_count; - } - - if (t == 0) { - // update valid value count for decoding and total # of values we've processed - ni.valid_count = valid_count; - ni.value_count = value_count; - s->nz_count = valid_count; - s->input_value_count = value_count; - s->input_row_count = value_count; - } - - return valid_count; -} - -template -__device__ inline void gpuDecodeValues( +template +__device__ inline void gpuDecodeFixedWidthValues( page_state_s* s, state_buf* const sb, int start, int end, int t) { - constexpr int num_warps = decode_block_size / cudf::detail::warp_size; + constexpr int num_warps = block_size / cudf::detail::warp_size; constexpr int max_batch_size = num_warps * cudf::detail::warp_size; PageNestingDecodeInfo* nesting_info_base = s->nesting_info; @@ -217,18 +92,22 @@ __device__ inline void gpuDecodeValues( } } -template -__device__ inline void gpuDecodeSplitValues(page_state_s* s, - state_buf* const sb, - int start, - int end) +template +struct decode_fixed_width_values_func { + __device__ inline void operator()(page_state_s* s, state_buf* const sb, int start, int end, int t) + { + gpuDecodeFixedWidthValues(s, sb, start, end, t); + } +}; + +template +__device__ inline void gpuDecodeFixedWidthSplitValues( + page_state_s* s, state_buf* const sb, int start, int end, int t) { using cudf::detail::warp_size; - constexpr int num_warps = decode_block_size / warp_size; + constexpr int num_warps = block_size / warp_size; constexpr int max_batch_size = num_warps * warp_size; - auto const t = threadIdx.x; - PageNestingDecodeInfo* nesting_info_base = s->nesting_info; int const dtype = s->col.physical_type; auto const data_len = thrust::distance(s->data_start, s->data_end); @@ -307,266 +186,293 @@ __device__ inline void gpuDecodeSplitValues(page_state_s* s, } } -// is the page marked nullable or not -__device__ inline bool is_nullable(page_state_s* s) -{ - auto const lvl = level_type::DEFINITION; - auto const max_def_level = s->col.max_level[lvl]; - return max_def_level > 0; -} +template +struct decode_fixed_width_split_values_func { + __device__ inline void operator()(page_state_s* s, state_buf* const sb, int start, int end, int t) + { + gpuDecodeFixedWidthSplitValues(s, sb, start, end, t); + } +}; -// for a nullable page, check to see if it could have nulls -__device__ inline bool has_nulls(page_state_s* s) +template +static __device__ int gpuUpdateValidityAndRowIndicesNested( + int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t) { - auto const lvl = level_type::DEFINITION; - auto const init_run = s->initial_rle_run[lvl]; - // literal runs, lets assume they could hold nulls - if (is_literal_run(init_run)) { return true; } - - // repeated run with number of items in the run not equal - // to the rows in the page, assume that means we could have nulls - if (s->page.num_input_values != (init_run >> 1)) { return true; } - - auto const lvl_bits = s->col.level_bits[lvl]; - auto const run_val = lvl_bits == 0 ? 0 : s->initial_rle_value[lvl]; - - // the encoded repeated value isn't valid, we have (all) nulls - return run_val != s->col.max_level[lvl]; -} + constexpr int num_warps = decode_block_size / cudf::detail::warp_size; + constexpr int max_batch_size = num_warps * cudf::detail::warp_size; -/** - * @brief Kernel for computing fixed width non dictionary column data stored in the pages - * - * This function will write the page data and the page data's validity to the - * output specified in the page's column chunk. If necessary, additional - * conversion will be performed to translate from the Parquet datatype to - * desired output datatype. - * - * @param pages List of pages - * @param chunks List of column chunks - * @param min_row Row index to start reading at - * @param num_rows Maximum number of rows to read - * @param error_code Error code to set if an error is encountered - */ -template -CUDF_KERNEL void __launch_bounds__(decode_block_size) - gpuDecodePageDataFixed(PageInfo* pages, - device_span chunks, - size_t min_row, - size_t num_rows, - kernel_error::pointer error_code) -{ - __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s // unused in this kernel - state_buffers; + // how many (input) values we've processed in the page so far + int value_count = s->input_value_count; - page_state_s* const s = &state_g; - auto* const sb = &state_buffers; - int const page_idx = blockIdx.x; - int const t = threadIdx.x; - PageInfo* pp = &pages[page_idx]; + // cap by last row so that we don't process any rows past what we want to output. + int const first_row = s->first_row; + int const last_row = first_row + s->num_rows; + int const capped_target_value_count = min(target_value_count, last_row); - if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT))) { return; } + int const row_index_lower_bound = s->row_index_lower_bound; - // must come after the kernel mask check - [[maybe_unused]] null_count_back_copier _{s, t}; + int const max_depth = s->col.max_nesting_depth - 1; + __syncthreads(); - if (!setupLocalPageInfo(s, - pp, - chunks, - min_row, - num_rows, - mask_filter{decode_kernel_mask::FIXED_WIDTH_NO_DICT}, - page_processing_stage::DECODE)) { - return; - } + while (value_count < capped_target_value_count) { + int const batch_size = min(max_batch_size, capped_target_value_count - value_count); - // the level stream decoders - __shared__ rle_run def_runs[rle_run_buffer_size]; - rle_stream def_decoder{def_runs}; + // definition level. only need to process for nullable columns + int d = 0; + if constexpr (nullable) { + if (def) { + d = t < batch_size + ? static_cast(def[rolling_index(value_count + t)]) + : -1; + } else { + d = t < batch_size ? 1 : -1; + } + } - // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. - if (s->num_rows == 0) { return; } + int const thread_value_count = t + 1; + int const block_value_count = batch_size; - bool const nullable = is_nullable(s); - bool const nullable_with_nulls = nullable && has_nulls(s); + // compute our row index, whether we're in row bounds, and validity + int const row_index = (thread_value_count + value_count) - 1; + int const in_row_bounds = (row_index >= row_index_lower_bound) && (row_index < last_row); + int const in_write_row_bounds = ballot(row_index >= first_row && row_index < last_row); + int const write_start = __ffs(in_write_row_bounds) - 1; // first bit in the warp to store + + // iterate by depth + for (int d_idx = 0; d_idx <= max_depth; d_idx++) { + auto& ni = s->nesting_info[d_idx]; + + int is_valid; + if constexpr (nullable) { + is_valid = ((d >= ni.max_def_level) && in_row_bounds) ? 1 : 0; + } else { + is_valid = in_row_bounds; + } - // initialize the stream decoders (requires values computed in setupLocalPageInfo) - level_t* const def = reinterpret_cast(pp->lvl_decode_buf[level_type::DEFINITION]); - if (nullable_with_nulls) { - def_decoder.init(s->col.level_bits[level_type::DEFINITION], - s->abs_lvl_start[level_type::DEFINITION], - s->abs_lvl_end[level_type::DEFINITION], - def, - s->page.num_input_values); - } - __syncthreads(); + // thread and block validity count + int thread_valid_count, block_valid_count; + if constexpr (nullable) { + using block_scan = cub::BlockScan; + __shared__ typename block_scan::TempStorage scan_storage; + block_scan(scan_storage).InclusiveSum(is_valid, thread_valid_count, block_valid_count); + __syncthreads(); + + // validity is processed per-warp + // + // nested schemas always read and write to the same bounds (that is, read and write + // positions are already pre-bounded by first_row/num_rows). flat schemas will start reading + // at the first value, even if that is before first_row, because we cannot trivially jump to + // the correct position to start reading. since we are about to write the validity vector + // here we need to adjust our computed mask to take into account the write row bounds. + int warp_null_count = 0; + if (write_start >= 0 && ni.valid_map != nullptr) { + int const valid_map_offset = ni.valid_map_offset; + uint32_t const warp_validity_mask = ballot(is_valid); + // lane 0 from each warp writes out validity + if ((t % cudf::detail::warp_size) == 0) { + int const vindex = + (value_count + thread_value_count) - 1; // absolute input value index + int const bit_offset = (valid_map_offset + vindex + write_start) - + first_row; // absolute bit offset into the output validity map + int const write_end = cudf::detail::warp_size - + __clz(in_write_row_bounds); // last bit in the warp to store + int const bit_count = write_end - write_start; + warp_null_count = bit_count - __popc(warp_validity_mask >> write_start); + + store_validity(bit_offset, ni.valid_map, warp_validity_mask >> write_start, bit_count); + } + } - // We use two counters in the loop below: processed_count and valid_count. - // - processed_count: number of rows out of num_input_values that we have decoded so far. - // the definition stream returns the number of total rows it has processed in each call - // to decode_next and we accumulate in process_count. - // - valid_count: number of non-null rows we have decoded so far. In each iteration of the - // loop below, we look at the number of valid items (which could be all for non-nullable), - // and valid_count is that running count. - int processed_count = 0; - int valid_count = 0; - // the core loop. decode batches of level stream data using rle_stream objects - // and pass the results to gpuDecodeValues - while (s->error == 0 && processed_count < s->page.num_input_values) { - int next_valid_count; + // sum null counts. we have to do it this way instead of just incrementing by (value_count - + // valid_count) because valid_count also includes rows that potentially start before our row + // bounds. if we could come up with a way to clean that up, we could remove this and just + // compute it directly at the end of the kernel. + size_type const block_null_count = + cudf::detail::single_lane_block_sum_reduce(warp_null_count); + if (t == 0) { ni.null_count += block_null_count; } + } + // trivial for non-nullable columns + else { + thread_valid_count = thread_value_count; + block_valid_count = block_value_count; + } - // only need to process definition levels if the column has nulls - if (nullable_with_nulls) { - processed_count += def_decoder.decode_next(t); - __syncthreads(); + // if this is valid and we're at the leaf, output dst_pos + __syncthreads(); // handle modification of ni.value_count from below + if (is_valid && d_idx == max_depth) { + // for non-list types, the value count is always the same across + int const dst_pos = (value_count + thread_value_count) - 1; + int const src_pos = (ni.valid_count + thread_valid_count) - 1; + sb->nz_idx[rolling_index(src_pos)] = dst_pos; + } + __syncthreads(); // handle modification of ni.value_count from below - next_valid_count = - gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); + // update stuff + if (t == 0) { ni.valid_count += block_valid_count; } } - // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip - // this function call entirely since all it will ever generate is a mapping of (i -> i) for - // nz_idx. gpuDecodeValues would be the only work that happens. - else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t); - } - __syncthreads(); - // decode the values themselves - gpuDecodeValues(s, sb, valid_count, next_valid_count, t); - __syncthreads(); + value_count += block_value_count; + } - valid_count = next_valid_count; + if (t == 0) { + // update valid value count for decoding and total # of values we've processed + s->nz_count = s->nesting_info[max_depth].valid_count; + s->input_value_count = value_count; + s->input_row_count = value_count; } - if (t == 0 and s->error != 0) { set_error(s->error, error_code); } + + __syncthreads(); + return s->nesting_info[max_depth].valid_count; } -/** - * @brief Kernel for computing fixed width dictionary column data stored in the pages - * - * This function will write the page data and the page data's validity to the - * output specified in the page's column chunk. If necessary, additional - * conversion will be performed to translate from the Parquet datatype to - * desired output datatype. - * - * @param pages List of pages - * @param chunks List of column chunks - * @param min_row Row index to start reading at - * @param num_rows Maximum number of rows to read - * @param error_code Error code to set if an error is encountered - */ -template -CUDF_KERNEL void __launch_bounds__(decode_block_size) - gpuDecodePageDataFixedDict(PageInfo* pages, - device_span chunks, - size_t min_row, - size_t num_rows, - kernel_error::pointer error_code) +template +static __device__ int gpuUpdateValidityAndRowIndicesFlat( + int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t) { - __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s // unused in this kernel - state_buffers; - - page_state_s* const s = &state_g; - auto* const sb = &state_buffers; - int const page_idx = blockIdx.x; - int const t = threadIdx.x; - PageInfo* pp = &pages[page_idx]; + constexpr int num_warps = decode_block_size / cudf::detail::warp_size; + constexpr int max_batch_size = num_warps * cudf::detail::warp_size; - if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT))) { return; } + auto& ni = s->nesting_info[0]; - // must come after the kernel mask check - [[maybe_unused]] null_count_back_copier _{s, t}; + // how many (input) values we've processed in the page so far + int value_count = s->input_value_count; + int valid_count = ni.valid_count; - if (!setupLocalPageInfo(s, - pp, - chunks, - min_row, - num_rows, - mask_filter{decode_kernel_mask::FIXED_WIDTH_DICT}, - page_processing_stage::DECODE)) { - return; - } + // cap by last row so that we don't process any rows past what we want to output. + int const first_row = s->first_row; + int const last_row = first_row + s->num_rows; + int const capped_target_value_count = min(target_value_count, last_row); - __shared__ rle_run def_runs[rle_run_buffer_size]; - rle_stream def_decoder{def_runs}; + int const valid_map_offset = ni.valid_map_offset; + int const row_index_lower_bound = s->row_index_lower_bound; - __shared__ rle_run dict_runs[rle_run_buffer_size]; - rle_stream dict_stream{dict_runs}; + __syncthreads(); - // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. - if (s->num_rows == 0) { return; } + while (value_count < capped_target_value_count) { + int const batch_size = min(max_batch_size, capped_target_value_count - value_count); - bool const nullable = is_nullable(s); - bool const nullable_with_nulls = nullable && has_nulls(s); + // definition level. only need to process for nullable columns + int d = 0; + if constexpr (nullable) { + if (def) { + d = t < batch_size + ? static_cast(def[rolling_index(value_count + t)]) + : -1; + } else { + d = t < batch_size ? 1 : -1; + } + } - // initialize the stream decoders (requires values computed in setupLocalPageInfo) - level_t* const def = reinterpret_cast(pp->lvl_decode_buf[level_type::DEFINITION]); - if (nullable_with_nulls) { - def_decoder.init(s->col.level_bits[level_type::DEFINITION], - s->abs_lvl_start[level_type::DEFINITION], - s->abs_lvl_end[level_type::DEFINITION], - def, - s->page.num_input_values); - } + int const thread_value_count = t + 1; + int const block_value_count = batch_size; - dict_stream.init( - s->dict_bits, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values); - __syncthreads(); + // compute our row index, whether we're in row bounds, and validity + int const row_index = (thread_value_count + value_count) - 1; + int const in_row_bounds = (row_index >= row_index_lower_bound) && (row_index < last_row); + int is_valid; + if constexpr (nullable) { + is_valid = ((d > 0) && in_row_bounds) ? 1 : 0; + } else { + is_valid = in_row_bounds; + } - // We use two counters in the loop below: processed_count and valid_count. - // - processed_count: number of rows out of num_input_values that we have decoded so far. - // the definition stream returns the number of total rows it has processed in each call - // to decode_next and we accumulate in process_count. - // - valid_count: number of non-null rows we have decoded so far. In each iteration of the - // loop below, we look at the number of valid items (which could be all for non-nullable), - // and valid_count is that running count. - int processed_count = 0; - int valid_count = 0; + // thread and block validity count + int thread_valid_count, block_valid_count; + if constexpr (nullable) { + using block_scan = cub::BlockScan; + __shared__ typename block_scan::TempStorage scan_storage; + block_scan(scan_storage).InclusiveSum(is_valid, thread_valid_count, block_valid_count); + __syncthreads(); - // the core loop. decode batches of level stream data using rle_stream objects - // and pass the results to gpuDecodeValues - while (s->error == 0 && processed_count < s->page.num_input_values) { - int next_valid_count; + // validity is processed per-warp + // + // nested schemas always read and write to the same bounds (that is, read and write + // positions are already pre-bounded by first_row/num_rows). flat schemas will start reading + // at the first value, even if that is before first_row, because we cannot trivially jump to + // the correct position to start reading. since we are about to write the validity vector + // here we need to adjust our computed mask to take into account the write row bounds. + int const in_write_row_bounds = ballot(row_index >= first_row && row_index < last_row); + int const write_start = __ffs(in_write_row_bounds) - 1; // first bit in the warp to store + int warp_null_count = 0; + if (write_start >= 0) { + uint32_t const warp_validity_mask = ballot(is_valid); + // lane 0 from each warp writes out validity + if ((t % cudf::detail::warp_size) == 0) { + int const vindex = (value_count + thread_value_count) - 1; // absolute input value index + int const bit_offset = (valid_map_offset + vindex + write_start) - + first_row; // absolute bit offset into the output validity map + int const write_end = + cudf::detail::warp_size - __clz(in_write_row_bounds); // last bit in the warp to store + int const bit_count = write_end - write_start; + warp_null_count = bit_count - __popc(warp_validity_mask >> write_start); - // only need to process definition levels if the column has nulls - if (nullable_with_nulls) { - processed_count += def_decoder.decode_next(t); - __syncthreads(); + store_validity(bit_offset, ni.valid_map, warp_validity_mask >> write_start, bit_count); + } + } - // count of valid items in this batch - next_valid_count = - gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); + // sum null counts. we have to do it this way instead of just incrementing by (value_count - + // valid_count) because valid_count also includes rows that potentially start before our row + // bounds. if we could come up with a way to clean that up, we could remove this and just + // compute it directly at the end of the kernel. + size_type const block_null_count = + cudf::detail::single_lane_block_sum_reduce(warp_null_count); + if (t == 0) { ni.null_count += block_null_count; } } - // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip - // this function call entirely since all it will ever generate is a mapping of (i -> i) for - // nz_idx. gpuDecodeValues would be the only work that happens. + // trivial for non-nullable columns else { - processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t); + thread_valid_count = thread_value_count; + block_valid_count = block_value_count; } - __syncthreads(); - // We want to limit the number of dictionary items we decode, that correspond to - // the rows we have processed in this iteration that are valid. - // We know the number of valid rows to process with: next_valid_count - valid_count. - dict_stream.decode_next(t, next_valid_count - valid_count); - __syncthreads(); + // output offset + if (is_valid) { + int const dst_pos = (value_count + thread_value_count) - 1; + int const src_pos = (valid_count + thread_valid_count) - 1; + sb->nz_idx[rolling_index(src_pos)] = dst_pos; + } - // decode the values themselves - gpuDecodeValues(s, sb, valid_count, next_valid_count, t); - __syncthreads(); + // update stuff + value_count += block_value_count; + valid_count += block_valid_count; + } - valid_count = next_valid_count; + if (t == 0) { + // update valid value count for decoding and total # of values we've processed + ni.valid_count = valid_count; + ni.value_count = value_count; // TODO: remove? this is unused in the non-list path + s->nz_count = valid_count; + s->input_value_count = value_count; + s->input_row_count = value_count; } - if (t == 0 and s->error != 0) { set_error(s->error, error_code); } + + return valid_count; +} + +// is the page marked nullable or not +__device__ inline bool is_nullable(page_state_s* s) +{ + auto const lvl = level_type::DEFINITION; + auto const max_def_level = s->col.max_level[lvl]; + return max_def_level > 0; +} + +// for a nullable page, check to see if it could have nulls +__device__ inline bool maybe_has_nulls(page_state_s* s) +{ + auto const lvl = level_type::DEFINITION; + auto const init_run = s->initial_rle_run[lvl]; + // literal runs, lets assume they could hold nulls + if (is_literal_run(init_run)) { return true; } + + // repeated run with number of items in the run not equal + // to the rows in the page, assume that means we could have nulls + if (s->page.num_input_values != (init_run >> 1)) { return true; } + + auto const lvl_bits = s->col.level_bits[lvl]; + auto const run_val = lvl_bits == 0 ? 0 : s->initial_rle_value[lvl]; + + // the encoded repeated value isn't valid, we have (all) nulls + return run_val != s->col.max_level[lvl]; } /** @@ -583,19 +489,28 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) * @param num_rows Maximum number of rows to read * @param error_code Error code to set if an error is encountered */ -template -CUDF_KERNEL void __launch_bounds__(decode_block_size) - gpuDecodeSplitPageDataFlat(PageInfo* pages, - device_span chunks, - size_t min_row, - size_t num_rows, - kernel_error::pointer error_code) +template + typename DecodeValuesFunc> +CUDF_KERNEL void __launch_bounds__(decode_block_size_t) + gpuDecodePageDataGeneric(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + kernel_error::pointer error_code) { + constexpr int rolling_buf_size = decode_block_size_t * 2; + constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size(); + __shared__ __align__(16) page_state_s state_g; - __shared__ __align__(16) page_state_buffers_s // unused in this kernel - state_buffers; + using state_buf_t = page_state_buffers_s; + __shared__ __align__(16) state_buf_t state_buffers; page_state_s* const s = &state_g; auto* const sb = &state_buffers; @@ -603,9 +518,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) int const t = threadIdx.x; PageInfo* pp = &pages[page_idx]; - if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT))) { - return; - } + if (!(BitAnd(pages[page_idx].kernel_mask, kernel_mask_t))) { return; } // must come after the kernel mask check [[maybe_unused]] null_count_back_copier _{s, t}; @@ -615,30 +528,70 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) chunks, min_row, num_rows, - mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT}, + mask_filter{kernel_mask_t}, page_processing_stage::DECODE)) { return; } - // the level stream decoders - __shared__ rle_run def_runs[rle_run_buffer_size]; - rle_stream def_decoder{def_runs}; - // if we have no work to do (eg, in a skip_rows/num_rows case) in this page. if (s->num_rows == 0) { return; } - bool const nullable = is_nullable(s); - bool const nullable_with_nulls = nullable && has_nulls(s); + DecodeValuesFunc decode_values; + + bool const nullable = is_nullable(s); + bool const should_process_nulls = nullable && maybe_has_nulls(s); + + // shared buffer. all shared memory is suballocated out of here + // constexpr int shared_rep_size = has_lists_t ? cudf::util::round_up_unsafe(rle_run_buffer_size * + // sizeof(rle_run), size_t{16}) : 0; + constexpr int shared_dict_size = + has_dict_t + ? cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}) + : 0; + constexpr int shared_def_size = + cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run), size_t{16}); + constexpr int shared_buf_size = /*shared_rep_size +*/ shared_dict_size + shared_def_size; + __shared__ __align__(16) uint8_t shared_buf[shared_buf_size]; + + // setup all shared memory buffers + int shared_offset = 0; + /* + rle_run *rep_runs = reinterpret_cast*>(shared_buf + shared_offset); + if constexpr (has_lists_t){ + shared_offset += shared_rep_size; + } + */ + rle_run* dict_runs = reinterpret_cast*>(shared_buf + shared_offset); + if constexpr (has_dict_t) { shared_offset += shared_dict_size; } + rle_run* def_runs = reinterpret_cast*>(shared_buf + shared_offset); // initialize the stream decoders (requires values computed in setupLocalPageInfo) + rle_stream def_decoder{def_runs}; level_t* const def = reinterpret_cast(pp->lvl_decode_buf[level_type::DEFINITION]); - if (nullable_with_nulls) { + if (should_process_nulls) { def_decoder.init(s->col.level_bits[level_type::DEFINITION], s->abs_lvl_start[level_type::DEFINITION], s->abs_lvl_end[level_type::DEFINITION], def, s->page.num_input_values); } + /* + rle_stream rep_decoder{rep_runs}; + level_t* const rep = reinterpret_cast(pp->lvl_decode_buf[level_type::REPETITION]); + if constexpr(has_lists_t){ + rep_decoder.init(s->col.level_bits[level_type::REPETITION], + s->abs_lvl_start[level_type::REPETITION], + s->abs_lvl_end[level_type::REPETITION], + rep, + s->page.num_input_values); + } + */ + + rle_stream dict_stream{dict_runs}; + if constexpr (has_dict_t) { + dict_stream.init( + s->dict_bits, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values); + } __syncthreads(); // We use two counters in the loop below: processed_count and valid_count. @@ -655,26 +608,47 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size) while (s->error == 0 && processed_count < s->page.num_input_values) { int next_valid_count; - // only need to process definition levels if the column has nulls - if (nullable_with_nulls) { + // only need to process definition levels if this is a nullable column + if (should_process_nulls) { processed_count += def_decoder.decode_next(t); __syncthreads(); - next_valid_count = - gpuUpdateValidityOffsetsAndRowIndicesFlat(processed_count, s, sb, def, t); + if constexpr (has_nesting_t) { + next_valid_count = gpuUpdateValidityAndRowIndicesNested( + processed_count, s, sb, def, t); + } else { + next_valid_count = gpuUpdateValidityAndRowIndicesFlat( + processed_count, s, sb, def, t); + } } // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip // this function call entirely since all it will ever generate is a mapping of (i -> i) for - // nz_idx. gpuDecodeValues would be the only work that happens. + // nz_idx. gpuDecodeFixedWidthValues would be the only work that happens. else { processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count); - next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat( - processed_count, s, sb, nullptr, t); + + if constexpr (has_nesting_t) { + next_valid_count = + gpuUpdateValidityAndRowIndicesNested( + processed_count, s, sb, nullptr, t); + } else { + next_valid_count = gpuUpdateValidityAndRowIndicesFlat( + processed_count, s, sb, nullptr, t); + } } __syncthreads(); + // if we have dictionary data + if constexpr (has_dict_t) { + // We want to limit the number of dictionary items we decode, that correspond to + // the rows we have processed in this iteration that are valid. + // We know the number of valid rows to process with: next_valid_count - valid_count. + dict_stream.decode_next(t, next_valid_count - valid_count); + __syncthreads(); + } + // decode the values themselves - gpuDecodeSplitValues(s, sb, valid_count, next_valid_count); + decode_values(s, sb, valid_count, next_valid_count, t); __syncthreads(); valid_count = next_valid_count; @@ -689,18 +663,55 @@ void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span pages, size_t num_rows, size_t min_row, int level_type_size, + bool has_nesting, kernel_error::pointer error_code, rmm::cuda_stream_view stream) { + constexpr int decode_block_size = 128; + dim3 dim_block(decode_block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page if (level_type_size == 1) { - gpuDecodePageDataFixed<<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); + if (has_nesting) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } } else { - gpuDecodePageDataFixed<<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); + if (has_nesting) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } } } @@ -709,40 +720,113 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span pa size_t num_rows, size_t min_row, int level_type_size, + bool has_nesting, kernel_error::pointer error_code, rmm::cuda_stream_view stream) { - // dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block - // 1 full warp, and 1 warp of 1 thread + constexpr int decode_block_size = 128; + dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks if (level_type_size == 1) { - gpuDecodePageDataFixedDict<<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); + if (has_nesting) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } } else { - gpuDecodePageDataFixedDict<<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); + if (has_nesting) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } } } -void __host__ DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - size_t num_rows, - size_t min_row, - int level_type_size, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream) +void __host__ +DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + size_t num_rows, + size_t min_row, + int level_type_size, + bool has_nesting, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream) { + constexpr int decode_block_size = 128; + dim3 dim_block(decode_block_size, 1); // decode_block_size = 128 threads per block dim3 dim_grid(pages.size(), 1); // 1 thread block per page => # blocks if (level_type_size == 1) { - gpuDecodeSplitPageDataFlat<<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); + if (has_nesting) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } } else { - gpuDecodeSplitPageDataFlat<<>>( - pages.device_ptr(), chunks, min_row, num_rows, error_code); + if (has_nesting) { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } else { + gpuDecodePageDataGeneric + <<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); + } } } diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu index cf0dd85e490..d604642be54 100644 --- a/cpp/src/io/parquet/page_hdr.cu +++ b/cpp/src/io/parquet/page_hdr.cu @@ -145,6 +145,11 @@ __device__ inline bool is_nested(ColumnChunkDesc const& chunk) return chunk.max_nesting_depth > 1; } +__device__ inline bool is_list(ColumnChunkDesc const& chunk) +{ + return chunk.max_level[level_type::REPETITION] > 0; +} + __device__ inline bool is_byte_array(ColumnChunkDesc const& chunk) { return chunk.physical_type == BYTE_ARRAY; @@ -178,14 +183,17 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page, return decode_kernel_mask::STRING; } - if (!is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { + if (!is_list(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) { if (page.encoding == Encoding::PLAIN) { - return decode_kernel_mask::FIXED_WIDTH_NO_DICT; + return is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED + : decode_kernel_mask::FIXED_WIDTH_NO_DICT; } else if (page.encoding == Encoding::PLAIN_DICTIONARY || page.encoding == Encoding::RLE_DICTIONARY) { - return decode_kernel_mask::FIXED_WIDTH_DICT; + return is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_DICT_NESTED + : decode_kernel_mask::FIXED_WIDTH_DICT; } else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) { - return decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT; + return is_nested(chunk) ? decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED + : decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT; } } diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index d82c6f0de59..efc1f5ebab1 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -207,16 +207,20 @@ enum level_type { * Used to control which decode kernels to run. */ enum class decode_kernel_mask { - NONE = 0, - GENERAL = (1 << 0), // Run catch-all decode kernel - STRING = (1 << 1), // Run decode kernel for string data - DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data - DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data - DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data - FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages - FIXED_WIDTH_DICT = (1 << 6), // Run decode kernel for fixed width dictionary pages - BYTE_STREAM_SPLIT = (1 << 7), // Run decode kernel for BYTE_STREAM_SPLIT encoded data - BYTE_STREAM_SPLIT_FLAT = (1 << 8), // Same as above but with a flat schema + NONE = 0, + GENERAL = (1 << 0), // Run catch-all decode kernel + STRING = (1 << 1), // Run decode kernel for string data + DELTA_BINARY = (1 << 2), // Run decode kernel for DELTA_BINARY_PACKED data + DELTA_BYTE_ARRAY = (1 << 3), // Run decode kernel for DELTA_BYTE_ARRAY encoded data + DELTA_LENGTH_BA = (1 << 4), // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data + FIXED_WIDTH_NO_DICT = (1 << 5), // Run decode kernel for fixed width non-dictionary pages + FIXED_WIDTH_DICT = (1 << 6), // Run decode kernel for fixed width dictionary pages + BYTE_STREAM_SPLIT = (1 << 7), // Run decode kernel for BYTE_STREAM_SPLIT encoded data + BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT = (1 << 8), // Same as above but for flat, fixed-width data + BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED = + (1 << 9), // Same as above but for nested, fixed-width data + FIXED_WIDTH_NO_DICT_NESTED = (1 << 10), // Run decode kernel for fixed width non-dictionary pages + FIXED_WIDTH_DICT_NESTED = (1 << 11), // Run decode kernel for fixed width dictionary pages }; // mask representing all the ways in which a string can be encoded @@ -888,6 +892,7 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[in] has_nesting Whether or not the data contains nested (but not list) data. * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ @@ -896,6 +901,7 @@ void DecodePageDataFixed(cudf::detail::hostdevice_span pages, std::size_t num_rows, size_t min_row, int level_type_size, + bool has_nesting, kernel_error::pointer error_code, rmm::cuda_stream_view stream); @@ -910,6 +916,7 @@ void DecodePageDataFixed(cudf::detail::hostdevice_span pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[in] has_nesting Whether or not the data contains nested (but not list) data. * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ @@ -918,11 +925,12 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, std::size_t num_rows, size_t min_row, int level_type_size, + bool has_nesting, kernel_error::pointer error_code, rmm::cuda_stream_view stream); /** - * @brief Launches kernel for reading dictionary fixed width column data stored in the pages + * @brief Launches kernel for reading fixed width column data stored in the pages * * The page data will be written to the output pointed to in the page's * associated column chunk. @@ -932,16 +940,18 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[in] has_nesting Whether or not the data contains nested (but not list) data. * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ -void DecodeSplitPageDataFlat(cudf::detail::hostdevice_span pages, - cudf::detail::hostdevice_span chunks, - std::size_t num_rows, - size_t min_row, - int level_type_size, - kernel_error::pointer error_code, - rmm::cuda_stream_view stream); +void DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span pages, + cudf::detail::hostdevice_span chunks, + std::size_t num_rows, + size_t min_row, + int level_type_size, + bool has_nesting, + kernel_error::pointer error_code, + rmm::cuda_stream_view stream); /** * @brief Launches kernel for initializing encoder row group fragments diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 1bd2fae281c..f705f6626e7 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -267,14 +267,27 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num } // launch byte stream split decoder - if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT) != 0) { - DecodeSplitPageDataFlat(subpass.pages, - pass.chunks, - num_rows, - skip_rows, - level_type_size, - error_code.data(), - streams[s_idx++]); + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT) != 0) { + DecodeSplitPageFixedWidthData(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + false, + error_code.data(), + streams[s_idx++]); + } + + // launch byte stream split decoder, for nested columns + if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) != 0) { + DecodeSplitPageFixedWidthData(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + true, + error_code.data(), + streams[s_idx++]); } // launch byte stream split decoder @@ -288,22 +301,50 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num streams[s_idx++]); } + // launch fixed width type decoder if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT) != 0) { DecodePageDataFixed(subpass.pages, pass.chunks, num_rows, skip_rows, level_type_size, + false, + error_code.data(), + streams[s_idx++]); + } + + // launch fixed width type decoder, for nested columns + if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED) != 0) { + DecodePageDataFixed(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + true, error_code.data(), streams[s_idx++]); } + // launch fixed width type decoder with dictionaries if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT) != 0) { DecodePageDataFixedDict(subpass.pages, pass.chunks, num_rows, skip_rows, level_type_size, + false, + error_code.data(), + streams[s_idx++]); + } + + // launch fixed width type decoder with dictionaries, for nested columns + if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) != 0) { + DecodePageDataFixedDict(subpass.pages, + pass.chunks, + num_rows, + skip_rows, + level_type_size, + true, error_code.data(), streams[s_idx++]); } diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 84ab83e33d0..a1f4c7b81d8 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1785,7 +1785,8 @@ TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); } -TEST_F(ParquetWriterTest, ByteStreamSplit) +std::pair, cudf::io::table_input_metadata> +make_byte_stream_split_table(bool as_struct) { constexpr auto num_rows = 100; std::mt19937 engine{31337}; @@ -1802,24 +1803,73 @@ TEST_F(ParquetWriterTest, ByteStreamSplit) // throw in a list to make sure both decoders are working auto col4 = make_parquet_list_col(engine, num_rows, 5, true); - auto expected = table_view{{col0, col1, col2, col3, *col4}}; + std::vector> columns; + columns.reserve(5); + columns.push_back(col0.release()); + columns.push_back(col1.release()); + columns.push_back(col2.release()); + columns.push_back(col3.release()); + columns.push_back(std::move(col4)); + + return [&]() -> std::pair, cudf::io::table_input_metadata> { + auto const encoding = cudf::io::column_encoding::BYTE_STREAM_SPLIT; + + // make as a nested struct + if (as_struct) { + auto valids = + cudf::detail::make_counting_transform_iterator(0, [](int i) { return i % 2 == 0; }); + auto [null_mask, null_count] = cudf::test::detail::make_null_mask(valids, valids + num_rows); + + std::vector> table_cols; + table_cols.push_back( + cudf::make_structs_column(num_rows, std::move(columns), null_count, std::move(null_mask))); + + auto tbl = std::make_unique(std::move(table_cols)); + auto expected = table_view{*tbl}; + + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("struct"); + expected_metadata.column_metadata[0].set_encoding(encoding); + + expected_metadata.column_metadata[0].child(0).set_name("int32s"); + expected_metadata.column_metadata[0].child(1).set_name("int64s"); + expected_metadata.column_metadata[0].child(2).set_name("floats"); + expected_metadata.column_metadata[0].child(3).set_name("doubles"); + expected_metadata.column_metadata[0].child(4).set_name("int32list"); + for (int idx = 0; idx <= 3; idx++) { + expected_metadata.column_metadata[0].child(idx).set_encoding(encoding); + } + expected_metadata.column_metadata[0].child(4).child(1).set_encoding(encoding); - cudf::io::table_input_metadata expected_metadata(expected); - expected_metadata.column_metadata[0].set_name("int32s"); - expected_metadata.column_metadata[1].set_name("int64s"); - expected_metadata.column_metadata[2].set_name("floats"); - expected_metadata.column_metadata[3].set_name("doubles"); - expected_metadata.column_metadata[4].set_name("int32list"); - auto const encoding = cudf::io::column_encoding::BYTE_STREAM_SPLIT; - for (int i = 0; i <= 3; i++) { - expected_metadata.column_metadata[i].set_encoding(encoding); - } + return {std::move(tbl), expected_metadata}; + } + + // make flat + auto tbl = std::make_unique(std::move(columns)); + auto expected = table_view{*tbl}; - expected_metadata.column_metadata[4].child(1).set_encoding(encoding); + cudf::io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("int32s"); + expected_metadata.column_metadata[1].set_name("int64s"); + expected_metadata.column_metadata[2].set_name("floats"); + expected_metadata.column_metadata[3].set_name("doubles"); + expected_metadata.column_metadata[4].set_name("int32list"); + for (int idx = 0; idx <= 3; idx++) { + expected_metadata.column_metadata[idx].set_encoding(encoding); + } + + expected_metadata.column_metadata[4].child(1).set_encoding(encoding); + return {std::move(tbl), expected_metadata}; + }(); +} + +TEST_F(ParquetWriterTest, ByteStreamSplit) +{ + auto [expected, expected_metadata] = make_byte_stream_split_table(false); auto const filepath = temp_env->get_temp_filepath("ByteStreamSplit.parquet"); cudf::io::parquet_writer_options out_opts = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *expected) .metadata(expected_metadata); cudf::io::write_parquet(out_opts); @@ -1827,7 +1877,24 @@ TEST_F(ParquetWriterTest, ByteStreamSplit) cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); auto result = cudf::io::read_parquet(in_opts); - CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, result.tbl->view()); +} + +TEST_F(ParquetWriterTest, ByteStreamSplitStruct) +{ + auto [expected, expected_metadata] = make_byte_stream_split_table(true); + + auto const filepath = temp_env->get_temp_filepath("ByteStreamSplitStruct.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, result.tbl->view()); } TEST_F(ParquetWriterTest, DecimalByteStreamSplit)