diff --git a/cpp/benchmarks/io/parquet/parquet_reader_input.cpp b/cpp/benchmarks/io/parquet/parquet_reader_input.cpp
index 019e0f30fe9..7563c823454 100644
--- a/cpp/benchmarks/io/parquet/parquet_reader_input.cpp
+++ b/cpp/benchmarks/io/parquet/parquet_reader_input.cpp
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2022-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2022-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -59,20 +59,18 @@ void parquet_read_common(cudf::size_type num_rows_to_read,
 }
 
 template <data_type DataType>
-void BM_parquet_read_data(nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>>)
+void BM_parquet_read_data_common(nvbench::state& state,
+                                 data_profile const& profile,
+                                 nvbench::type_list<nvbench::enum_type<DataType>>)
 {
   auto const d_type      = get_type_or_group(static_cast<int32_t>(DataType));
-  auto const cardinality = static_cast<cudf::size_type>(state.get_int64("cardinality"));
-  auto const run_length  = static_cast<cudf::size_type>(state.get_int64("run_length"));
   auto const source_type = retrieve_io_type_enum(state.get_string("io_type"));
   auto const compression = cudf::io::compression_type::SNAPPY;
   cuio_source_sink_pair source_sink(source_type);
 
   auto const num_rows_written = [&]() {
-    auto const tbl = create_random_table(
-      cycle_dtypes(d_type, num_cols),
-      table_size_bytes{data_size},
-      data_profile_builder().cardinality(cardinality).avg_run_length(run_length));
+    auto const tbl =
+      create_random_table(cycle_dtypes(d_type, num_cols), table_size_bytes{data_size}, profile);
    auto const view = tbl->view();
 
     cudf::io::parquet_writer_options write_opts =
@@ -85,6 +83,32 @@ void BM_parquet_read_data(nvbench::state& state, nvbench::type_list<nvbench::en
   parquet_read_common(num_rows_written, num_cols, source_sink, state);
 }
 
+template <data_type DataType>
+void BM_parquet_read_data(nvbench::state& state,
+                          nvbench::type_list<nvbench::enum_type<DataType>> type_list)
+{
+  auto const cardinality = static_cast<cudf::size_type>(state.get_int64("cardinality"));
+  auto const run_length  = static_cast<cudf::size_type>(state.get_int64("run_length"));
+  BM_parquet_read_data_common<DataType>(
+    state, data_profile_builder().cardinality(cardinality).avg_run_length(run_length), type_list);
+}
+
+template <data_type DataType>
+void BM_parquet_read_fixed_width_struct(nvbench::state& state,
+                                        nvbench::type_list<nvbench::enum_type<DataType>> type_list)
+{
+  auto const cardinality = static_cast<cudf::size_type>(state.get_int64("cardinality"));
+  auto const run_length  = static_cast<cudf::size_type>(state.get_int64("run_length"));
+  std::vector<cudf::type_id> s_types{
+    cudf::type_id::INT32, cudf::type_id::FLOAT32, cudf::type_id::INT64};
+  BM_parquet_read_data_common<DataType>(state,
+                                        data_profile_builder()
+                                          .cardinality(cardinality)
+                                          .avg_run_length(run_length)
+                                          .struct_types(s_types),
+                                        type_list);
+}
+
 void BM_parquet_read_io_compression(nvbench::state& state)
 {
   auto const d_type = get_type_or_group({static_cast<int32_t>(data_type::INTEGRAL),
@@ -247,3 +271,13 @@ NVBENCH_BENCH(BM_parquet_read_io_small_mixed)
   .add_int64_axis("cardinality", {0, 1000})
   .add_int64_axis("run_length", {1, 32})
   .add_int64_axis("num_string_cols", {1, 2, 3});
+
+// a benchmark for structs that only contain fixed-width types
+using d_type_list_struct_only = nvbench::enum_type_list<data_type::STRUCT>;
+NVBENCH_BENCH_TYPES(BM_parquet_read_fixed_width_struct, NVBENCH_TYPE_AXES(d_type_list_struct_only))
+  .set_name("parquet_read_fixed_width_struct")
+  .set_type_axes_names({"data_type"})
+  .add_string_axis("io_type", {"DEVICE_BUFFER"})
+  .set_min_samples(4)
+  .add_int64_axis("cardinality", {0, 1000})
+  .add_int64_axis("run_length", {1, 32});
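A note on the registration pattern used above, for readers less familiar with nvbench: NVBENCH_BENCH_TYPES stamps out one benchmark instantiation per entry of the enum_type_list, and enum axis values need a string mapping declared once. A minimal, self-contained sketch of that machinery (all names here are illustrative stand-ins, not part of the patch):

#include <nvbench/nvbench.cuh>
#include <string>

enum class data_type : int32_t { STRUCT };  // stand-in for the benchmark's real enum

// one-time mapping from enum values to axis labels
NVBENCH_DECLARE_ENUM_TYPE_STRINGS(
  data_type,
  [](data_type) { return "STRUCT"; },
  [](data_type) { return std::string{}; })

template <data_type DataType>
void BM_sketch(nvbench::state& state, nvbench::type_list<nvbench::enum_type<DataType>>)
{
  state.exec([](nvbench::launch&) { /* region under measurement */ });
}

// one instantiation per list entry; int64/string axes are swept at run time
NVBENCH_BENCH_TYPES(BM_sketch, NVBENCH_TYPE_AXES(nvbench::enum_type_list<data_type::STRUCT>))
  .set_name("sketch")
  .add_int64_axis("run_length", {1, 32});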
diff --git a/cpp/src/io/parquet/decode_fixed.cu b/cpp/src/io/parquet/decode_fixed.cu
index bfd89200786..ea80ae73c2f 100644
--- a/cpp/src/io/parquet/decode_fixed.cu
+++ b/cpp/src/io/parquet/decode_fixed.cu
@@ -24,136 +24,11 @@ namespace cudf::io::parquet::detail {
 
 namespace {
 
-constexpr int decode_block_size = 128;
-constexpr int rolling_buf_size  = decode_block_size * 2;
-// the required number of runs in shared memory we will need to provide the
-// rle_stream object
-constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size<decode_block_size>();
-
-template <bool nullable, typename level_t, typename state_buf>
-static __device__ int gpuUpdateValidityOffsetsAndRowIndicesFlat(
-  int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t)
-{
-  constexpr int num_warps      = decode_block_size / cudf::detail::warp_size;
-  constexpr int max_batch_size = num_warps * cudf::detail::warp_size;
-
-  auto& ni = s->nesting_info[0];
-
-  // how many (input) values we've processed in the page so far
-  int value_count = s->input_value_count;
-  int valid_count = ni.valid_count;
-
-  // cap by last row so that we don't process any rows past what we want to output.
-  int const first_row                 = s->first_row;
-  int const last_row                  = first_row + s->num_rows;
-  int const capped_target_value_count = min(target_value_count, last_row);
-
-  int const valid_map_offset      = ni.valid_map_offset;
-  int const row_index_lower_bound = s->row_index_lower_bound;
-
-  __syncthreads();
-
-  while (value_count < capped_target_value_count) {
-    int const batch_size = min(max_batch_size, capped_target_value_count - value_count);
-
-    // definition level. only need to process for nullable columns
-    int d = 0;
-    if constexpr (nullable) {
-      d = t < batch_size
-            ? static_cast<int>(def[rolling_index<state_buf::nz_buf_size>(value_count + t)])
-            : -1;
-    }
-
-    int const thread_value_count = t + 1;
-    int const block_value_count  = batch_size;
-
-    // compute our row index, whether we're in row bounds, and validity
-    int const row_index     = (thread_value_count + value_count) - 1;
-    int const in_row_bounds = (row_index >= row_index_lower_bound) && (row_index < last_row);
-    int is_valid;
-    if constexpr (nullable) {
-      is_valid = ((d > 0) && in_row_bounds) ? 1 : 0;
-    } else {
-      is_valid = in_row_bounds;
-    }
-
-    // thread and block validity count
-    int thread_valid_count, block_valid_count;
-    if constexpr (nullable) {
-      using block_scan = cub::BlockScan<int, decode_block_size>;
-      __shared__ typename block_scan::TempStorage scan_storage;
-      block_scan(scan_storage).InclusiveSum(is_valid, thread_valid_count, block_valid_count);
-      __syncthreads();
-
-      // validity is processed per-warp
-      //
-      // nested schemas always read and write to the same bounds (that is, read and write
-      // positions are already pre-bounded by first_row/num_rows). flat schemas will start reading
-      // at the first value, even if that is before first_row, because we cannot trivially jump to
-      // the correct position to start reading. since we are about to write the validity vector
-      // here we need to adjust our computed mask to take into account the write row bounds.
-      int const in_write_row_bounds = ballot(row_index >= first_row && row_index < last_row);
-      int const write_start = __ffs(in_write_row_bounds) - 1;  // first bit in the warp to store
-      int warp_null_count   = 0;
-      if (write_start >= 0) {
-        uint32_t const warp_validity_mask = ballot(is_valid);
-        // lane 0 from each warp writes out validity
-        if ((t % cudf::detail::warp_size) == 0) {
-          int const vindex = (value_count + thread_value_count) - 1;  // absolute input value index
-          int const bit_offset = (valid_map_offset + vindex + write_start) -
-                                 first_row;  // absolute bit offset into the output validity map
-          int const write_end =
-            cudf::detail::warp_size - __clz(in_write_row_bounds);  // last bit in the warp to store
-          int const bit_count = write_end - write_start;
-          warp_null_count     = bit_count - __popc(warp_validity_mask >> write_start);
-
-          store_validity(bit_offset, ni.valid_map, warp_validity_mask >> write_start, bit_count);
-        }
-      }
-
-      // sum null counts. we have to do it this way instead of just incrementing by (value_count -
-      // valid_count) because valid_count also includes rows that potentially start before our row
-      // bounds. if we could come up with a way to clean that up, we could remove this and just
-      // compute it directly at the end of the kernel.
-      size_type const block_null_count =
-        cudf::detail::single_lane_block_sum_reduce<decode_block_size, 0>(warp_null_count);
-      if (t == 0) { ni.null_count += block_null_count; }
-    }
-    // trivial for non-nullable columns
-    else {
-      thread_valid_count = thread_value_count;
-      block_valid_count  = block_value_count;
-    }
-
-    // output offset
-    if (is_valid) {
-      int const dst_pos = (value_count + thread_value_count) - 1;
-      int const src_pos = (valid_count + thread_valid_count) - 1;
-      sb->nz_idx[rolling_index<state_buf::nz_buf_size>(src_pos)] = dst_pos;
-    }
-
-    // update stuff
-    value_count += block_value_count;
-    valid_count += block_valid_count;
-  }
-
-  if (t == 0) {
-    // update valid value count for decoding and total # of values we've processed
-    ni.valid_count       = valid_count;
-    ni.value_count       = value_count;
-    s->nz_count          = valid_count;
-    s->input_value_count = value_count;
-    s->input_row_count   = value_count;
-  }
-
-  return valid_count;
-}
-
-template <typename state_buf>
-__device__ inline void gpuDecodeValues(
+template <int block_size, typename state_buf>
+__device__ inline void gpuDecodeFixedWidthValues(
   page_state_s* s, state_buf* const sb, int start, int end, int t)
 {
-  constexpr int num_warps      = decode_block_size / cudf::detail::warp_size;
+  constexpr int num_warps      = block_size / cudf::detail::warp_size;
   constexpr int max_batch_size = num_warps * cudf::detail::warp_size;
 
   PageNestingDecodeInfo* nesting_info_base = s->nesting_info;
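One idiom from the function removed above (and reused by its replacements further down) is worth isolating: a block-wide inclusive scan that yields both each thread's rank among the valid values and the block total. A standalone sketch with cub, under the same 128-thread assumption:

#include <cub/cub.cuh>

// Each thread contributes is_valid (0 or 1). After the scan, thread_valid_count is
// the count of valid values on threads 0..t, and block_valid_count is the block total.
template <int block_size>
__device__ void valid_count_scan(int is_valid, int& thread_valid_count, int& block_valid_count)
{
  using block_scan = cub::BlockScan<int, block_size>;
  __shared__ typename block_scan::TempStorage scan_storage;
  block_scan(scan_storage).InclusiveSum(is_valid, thread_valid_count, block_valid_count);
  // When is_valid == 1, (valid_count + thread_valid_count) - 1 is a dense, stable
  // output slot -- exactly how the nz_idx entries are assigned in these decoders.
}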
@@ -217,18 +92,22 @@ __device__ inline void gpuDecodeValues(
   }
 }
 
-template <typename state_buf>
-__device__ inline void gpuDecodeSplitValues(page_state_s* s,
-                                            state_buf* const sb,
-                                            int start,
-                                            int end)
+template <int block_size, typename state_buf>
+struct decode_fixed_width_values_func {
+  __device__ inline void operator()(page_state_s* s, state_buf* const sb, int start, int end, int t)
+  {
+    gpuDecodeFixedWidthValues<block_size, state_buf>(s, sb, start, end, t);
+  }
+};
+
+template <int block_size, typename state_buf>
+__device__ inline void gpuDecodeFixedWidthSplitValues(
+  page_state_s* s, state_buf* const sb, int start, int end, int t)
 {
   using cudf::detail::warp_size;
-  constexpr int num_warps      = decode_block_size / warp_size;
+  constexpr int num_warps      = block_size / warp_size;
   constexpr int max_batch_size = num_warps * warp_size;
 
-  auto const t = threadIdx.x;
-
   PageNestingDecodeInfo* nesting_info_base = s->nesting_info;
   int const dtype                          = s->col.physical_type;
   auto const data_len                      = thrust::distance(s->data_start, s->data_end);
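Why the value decoders are wrapped in functor templates rather than passed as functions: the generic kernel introduced below takes the decode step as a template-template parameter, so each instantiation resolves and inlines its decoder at compile time, with no device function-pointer indirection. A reduced sketch of that shape (illustrative names only):

// one decode flavor, selected statically
template <int block_size, typename state_buf>
struct decode_plain {
  __device__ void operator()(state_buf* sb) { /* decode values into sb */ }
};

template <int block_size, typename state_buf, template <int, typename> typename DecodeValuesFunc>
__global__ void generic_kernel(state_buf* sb)
{
  DecodeValuesFunc<block_size, state_buf> decode_values;  // chosen at compile time
  decode_values(sb);
}

// host side: generic_kernel<128, my_state_buf, decode_plain><<<grid, 128>>>(sb);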
@@ -307,266 +186,293 @@ __device__ inline void gpuDecodeSplitValues(page_state_s* s,
   }
 }
 
-// is the page marked nullable or not
-__device__ inline bool is_nullable(page_state_s* s)
-{
-  auto const lvl           = level_type::DEFINITION;
-  auto const max_def_level = s->col.max_level[lvl];
-  return max_def_level > 0;
-}
+template <int block_size, typename state_buf>
+struct decode_fixed_width_split_values_func {
+  __device__ inline void operator()(page_state_s* s, state_buf* const sb, int start, int end, int t)
+  {
+    gpuDecodeFixedWidthSplitValues<block_size, state_buf>(s, sb, start, end, t);
+  }
+};
 
-// for a nullable page, check to see if it could have nulls
-__device__ inline bool has_nulls(page_state_s* s)
+template <int decode_block_size, bool nullable, typename level_t, typename state_buf>
+static __device__ int gpuUpdateValidityAndRowIndicesNested(
+  int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t)
 {
-  auto const lvl      = level_type::DEFINITION;
-  auto const init_run = s->initial_rle_run[lvl];
-  // literal runs, let's assume they could hold nulls
-  if (is_literal_run(init_run)) { return true; }
-
-  // repeated run with number of items in the run not equal
-  // to the rows in the page, assume that means we could have nulls
-  if (s->page.num_input_values != (init_run >> 1)) { return true; }
-
-  auto const lvl_bits = s->col.level_bits[lvl];
-  auto const run_val  = lvl_bits == 0 ? 0 : s->initial_rle_value[lvl];
-
-  // the encoded repeated value isn't valid, we have (all) nulls
-  return run_val != s->col.max_level[lvl];
-}
+  constexpr int num_warps      = decode_block_size / cudf::detail::warp_size;
+  constexpr int max_batch_size = num_warps * cudf::detail::warp_size;
 
-/**
- * @brief Kernel for computing fixed width non-dictionary column data stored in the pages
- *
- * This function will write the page data and the page data's validity to the
- * output specified in the page's column chunk. If necessary, additional
- * conversion will be performed to translate from the Parquet datatype to
- * desired output datatype.
- *
- * @param pages List of pages
- * @param chunks List of column chunks
- * @param min_row Row index to start reading at
- * @param num_rows Maximum number of rows to read
- * @param error_code Error code to set if an error is encountered
- */
-template <typename level_t>
-CUDF_KERNEL void __launch_bounds__(decode_block_size)
-  gpuDecodePageDataFixed(PageInfo* pages,
-                         device_span<ColumnChunkDesc const> chunks,
-                         size_t min_row,
-                         size_t num_rows,
-                         kernel_error::pointer error_code)
-{
-  __shared__ __align__(16) page_state_s state_g;
-  __shared__ __align__(16) page_state_buffers_s<rolling_buf_size, 0, 0>  // unused in this kernel
-    state_buffers;
+  // how many (input) values we've processed in the page so far
+  int value_count = s->input_value_count;
 
-  page_state_s* const s = &state_g;
-  auto* const sb        = &state_buffers;
-  int const page_idx    = blockIdx.x;
-  int const t           = threadIdx.x;
-  PageInfo* pp          = &pages[page_idx];
+  // cap by last row so that we don't process any rows past what we want to output.
+  int const first_row                 = s->first_row;
+  int const last_row                  = first_row + s->num_rows;
+  int const capped_target_value_count = min(target_value_count, last_row);
 
-  if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT))) { return; }
+  int const row_index_lower_bound = s->row_index_lower_bound;
 
-  // must come after the kernel mask check
-  [[maybe_unused]] null_count_back_copier _{s, t};
+  int const max_depth = s->col.max_nesting_depth - 1;
+  __syncthreads();
 
-  if (!setupLocalPageInfo(s,
-                          pp,
-                          chunks,
-                          min_row,
-                          num_rows,
-                          mask_filter{decode_kernel_mask::FIXED_WIDTH_NO_DICT},
-                          page_processing_stage::DECODE)) {
-    return;
-  }
+  while (value_count < capped_target_value_count) {
+    int const batch_size = min(max_batch_size, capped_target_value_count - value_count);
 
-  // the level stream decoders
-  __shared__ rle_run<level_t> def_runs[rle_run_buffer_size];
-  rle_stream<level_t, decode_block_size, rolling_buf_size> def_decoder{def_runs};
+    // definition level. only need to process for nullable columns
+    int d = 0;
+    if constexpr (nullable) {
+      if (def) {
+        d = t < batch_size
+              ? static_cast<int>(def[rolling_index<state_buf::nz_buf_size>(value_count + t)])
+              : -1;
+      } else {
+        d = t < batch_size ? 1 : -1;
+      }
+    }
 
-  // if we have no work to do (eg, in a skip_rows/num_rows case) in this page.
-  if (s->num_rows == 0) { return; }
+    int const thread_value_count = t + 1;
+    int const block_value_count  = batch_size;
 
-  bool const nullable            = is_nullable(s);
-  bool const nullable_with_nulls = nullable && has_nulls(s);
+    // compute our row index, whether we're in row bounds, and validity
+    int const row_index           = (thread_value_count + value_count) - 1;
+    int const in_row_bounds       = (row_index >= row_index_lower_bound) && (row_index < last_row);
+    int const in_write_row_bounds = ballot(row_index >= first_row && row_index < last_row);
+    int const write_start = __ffs(in_write_row_bounds) - 1;  // first bit in the warp to store
+
+    // iterate by depth
+    for (int d_idx = 0; d_idx <= max_depth; d_idx++) {
+      auto& ni = s->nesting_info[d_idx];
+
+      int is_valid;
+      if constexpr (nullable) {
+        is_valid = ((d >= ni.max_def_level) && in_row_bounds) ? 1 : 0;
+      } else {
+        is_valid = in_row_bounds;
+      }
 
-  // initialize the stream decoders (requires values computed in setupLocalPageInfo)
-  level_t* const def = reinterpret_cast<level_t*>(pp->lvl_decode_buf[level_type::DEFINITION]);
-  if (nullable_with_nulls) {
-    def_decoder.init(s->col.level_bits[level_type::DEFINITION],
-                     s->abs_lvl_start[level_type::DEFINITION],
-                     s->abs_lvl_end[level_type::DEFINITION],
-                     def,
-                     s->page.num_input_values);
-  }
-  __syncthreads();
+      // thread and block validity count
+      int thread_valid_count, block_valid_count;
+      if constexpr (nullable) {
+        using block_scan = cub::BlockScan<int, decode_block_size>;
+        __shared__ typename block_scan::TempStorage scan_storage;
+        block_scan(scan_storage).InclusiveSum(is_valid, thread_valid_count, block_valid_count);
+        __syncthreads();
+
+        // validity is processed per-warp
+        //
+        // nested schemas always read and write to the same bounds (that is, read and write
+        // positions are already pre-bounded by first_row/num_rows). flat schemas will start
+        // reading at the first value, even if that is before first_row, because we cannot
+        // trivially jump to the correct position to start reading. since we are about to write
+        // the validity vector here we need to adjust our computed mask to take into account the
+        // write row bounds.
+        int warp_null_count = 0;
+        if (write_start >= 0 && ni.valid_map != nullptr) {
+          int const valid_map_offset        = ni.valid_map_offset;
+          uint32_t const warp_validity_mask = ballot(is_valid);
+          // lane 0 from each warp writes out validity
+          if ((t % cudf::detail::warp_size) == 0) {
+            int const vindex =
+              (value_count + thread_value_count) - 1;  // absolute input value index
+            int const bit_offset = (valid_map_offset + vindex + write_start) -
+                                   first_row;  // absolute bit offset into the output validity map
+            int const write_end = cudf::detail::warp_size -
+                                  __clz(in_write_row_bounds);  // last bit in the warp to store
+            int const bit_count = write_end - write_start;
+            warp_null_count     = bit_count - __popc(warp_validity_mask >> write_start);
+
+            store_validity(bit_offset, ni.valid_map, warp_validity_mask >> write_start, bit_count);
+          }
+        }
 
-  // We use two counters in the loop below: processed_count and valid_count.
-  // - processed_count: number of rows out of num_input_values that we have decoded so far.
-  //   the definition stream returns the number of total rows it has processed in each call
-  //   to decode_next and we accumulate in process_count.
-  // - valid_count: number of non-null rows we have decoded so far. In each iteration of the
-  //   loop below, we look at the number of valid items (which could be all for non-nullable),
-  //   and valid_count is that running count.
-  int processed_count = 0;
-  int valid_count     = 0;
-  // the core loop. decode batches of level stream data using rle_stream objects
-  // and pass the results to gpuDecodeValues
-  while (s->error == 0 && processed_count < s->page.num_input_values) {
-    int next_valid_count;
+        // sum null counts. we have to do it this way instead of just incrementing by
+        // (value_count - valid_count) because valid_count also includes rows that potentially
+        // start before our row bounds. if we could come up with a way to clean that up, we could
+        // remove this and just compute it directly at the end of the kernel.
+        size_type const block_null_count =
+          cudf::detail::single_lane_block_sum_reduce<decode_block_size, 0>(warp_null_count);
+        if (t == 0) { ni.null_count += block_null_count; }
+      }
+      // trivial for non-nullable columns
+      else {
+        thread_valid_count = thread_value_count;
+        block_valid_count  = block_value_count;
+      }
 
-    // only need to process definition levels if the column has nulls
-    if (nullable_with_nulls) {
-      processed_count += def_decoder.decode_next(t);
-      __syncthreads();
+      // if this is valid and we're at the leaf, output dst_pos
+      __syncthreads();  // handle modification of ni.value_count from below
+      if (is_valid && d_idx == max_depth) {
+        // for non-list types, the value count is always the same across all nesting levels
+        int const dst_pos = (value_count + thread_value_count) - 1;
+        int const src_pos = (ni.valid_count + thread_valid_count) - 1;
+        sb->nz_idx[rolling_index<state_buf::nz_buf_size>(src_pos)] = dst_pos;
+      }
+      __syncthreads();  // handle modification of ni.value_count from below
 
-      next_valid_count =
-        gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
-    }
+      // update stuff
+      if (t == 0) { ni.valid_count += block_valid_count; }
+    }
 
-    // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
-    // this function call entirely since all it will ever generate is a mapping of (i -> i) for
-    // nz_idx. gpuDecodeValues would be the only work that happens.
-    else {
-      processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
-      next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
-        processed_count, s, sb, nullptr, t);
-    }
-    __syncthreads();
-
-    // decode the values themselves
-    gpuDecodeValues(s, sb, valid_count, next_valid_count, t);
-    __syncthreads();
+    value_count += block_value_count;
+  }
 
-    valid_count = next_valid_count;
+  if (t == 0) {
+    // update valid value count for decoding and total # of values we've processed
+    s->nz_count          = s->nesting_info[max_depth].valid_count;
+    s->input_value_count = value_count;
+    s->input_row_count   = value_count;
   }
-  if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
+
+  __syncthreads();
+  return s->nesting_info[max_depth].valid_count;
 }
 
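The per-warp validity write used by both gpuUpdateValidityAndRowIndices* functions packs one predicate bit per lane and lets lane 0 store the span that falls inside the write bounds. The same arithmetic in isolation, as a sketch with raw intrinsics (cudf's ballot/store_validity helpers wrap these); it assumes a full 32-lane warp and that is_valid is already forced to 0 for out-of-row-bounds lanes, as the callers here guarantee:

// Returns this warp's null count; lane 0 would pass `bits`/`bit_count` to store_validity.
__device__ int warp_validity_span(bool in_write_bounds, bool is_valid)
{
  uint32_t const bounds_mask = __ballot_sync(0xffffffffu, in_write_bounds);  // 1 bit per lane
  uint32_t const valid_mask  = __ballot_sync(0xffffffffu, is_valid);
  int const write_start = __ffs(bounds_mask) - 1;   // first lane to store; -1 if none
  if (write_start < 0) { return 0; }
  int const write_end = 32 - __clz(bounds_mask);    // one past the last lane to store
  int const bit_count = write_end - write_start;    // rows are ordered, so this is one span
  uint32_t const bits = valid_mask >> write_start;  // bits to store, LSB first
  // null count = stored width minus set bits; __popc(bits) needs no extra masking
  // because lanes past write_end ballot 0 for is_valid.
  return bit_count - __popc(bits);
}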
-/**
- * @brief Kernel for computing fixed width dictionary column data stored in the pages
- *
- * This function will write the page data and the page data's validity to the
- * output specified in the page's column chunk. If necessary, additional
- * conversion will be performed to translate from the Parquet datatype to
- * desired output datatype.
- *
- * @param pages List of pages
- * @param chunks List of column chunks
- * @param min_row Row index to start reading at
- * @param num_rows Maximum number of rows to read
- * @param error_code Error code to set if an error is encountered
- */
-template <typename level_t>
-CUDF_KERNEL void __launch_bounds__(decode_block_size)
-  gpuDecodePageDataFixedDict(PageInfo* pages,
-                             device_span<ColumnChunkDesc const> chunks,
-                             size_t min_row,
-                             size_t num_rows,
-                             kernel_error::pointer error_code)
+template <int decode_block_size, bool nullable, typename level_t, typename state_buf>
+static __device__ int gpuUpdateValidityAndRowIndicesFlat(
+  int32_t target_value_count, page_state_s* s, state_buf* sb, level_t const* const def, int t)
 {
-  __shared__ __align__(16) page_state_s state_g;
-  __shared__ __align__(16) page_state_buffers_s<rolling_buf_size, rolling_buf_size, 0>
-    state_buffers;  // unused in this kernel
-
-  page_state_s* const s = &state_g;
-  auto* const sb        = &state_buffers;
-  int const page_idx    = blockIdx.x;
-  int const t           = threadIdx.x;
-  PageInfo* pp          = &pages[page_idx];
+  constexpr int num_warps      = decode_block_size / cudf::detail::warp_size;
+  constexpr int max_batch_size = num_warps * cudf::detail::warp_size;
 
-  if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT))) { return; }
+  auto& ni = s->nesting_info[0];
 
-  // must come after the kernel mask check
-  [[maybe_unused]] null_count_back_copier _{s, t};
+  // how many (input) values we've processed in the page so far
+  int value_count = s->input_value_count;
+  int valid_count = ni.valid_count;
 
-  if (!setupLocalPageInfo(s,
-                          pp,
-                          chunks,
-                          min_row,
-                          num_rows,
-                          mask_filter{decode_kernel_mask::FIXED_WIDTH_DICT},
-                          page_processing_stage::DECODE)) {
-    return;
-  }
+  // cap by last row so that we don't process any rows past what we want to output.
+  int const first_row                 = s->first_row;
+  int const last_row                  = first_row + s->num_rows;
+  int const capped_target_value_count = min(target_value_count, last_row);
 
-  __shared__ rle_run<level_t> def_runs[rle_run_buffer_size];
-  rle_stream<level_t, decode_block_size, rolling_buf_size> def_decoder{def_runs};
+  int const valid_map_offset      = ni.valid_map_offset;
+  int const row_index_lower_bound = s->row_index_lower_bound;
 
-  __shared__ rle_run<uint32_t> dict_runs[rle_run_buffer_size];
-  rle_stream<uint32_t, decode_block_size, rolling_buf_size> dict_stream{dict_runs};
+  __syncthreads();
 
-  // if we have no work to do (eg, in a skip_rows/num_rows case) in this page.
-  if (s->num_rows == 0) { return; }
+  while (value_count < capped_target_value_count) {
+    int const batch_size = min(max_batch_size, capped_target_value_count - value_count);
 
-  bool const nullable            = is_nullable(s);
-  bool const nullable_with_nulls = nullable && has_nulls(s);
+    // definition level. only need to process for nullable columns
+    int d = 0;
+    if constexpr (nullable) {
+      if (def) {
+        d = t < batch_size
+              ? static_cast<int>(def[rolling_index<state_buf::nz_buf_size>(value_count + t)])
+              : -1;
+      } else {
+        d = t < batch_size ? 1 : -1;
+      }
+    }
 
-  // initialize the stream decoders (requires values computed in setupLocalPageInfo)
-  level_t* const def = reinterpret_cast<level_t*>(pp->lvl_decode_buf[level_type::DEFINITION]);
-  if (nullable_with_nulls) {
-    def_decoder.init(s->col.level_bits[level_type::DEFINITION],
-                     s->abs_lvl_start[level_type::DEFINITION],
-                     s->abs_lvl_end[level_type::DEFINITION],
-                     def,
-                     s->page.num_input_values);
-  }
+    int const thread_value_count = t + 1;
+    int const block_value_count  = batch_size;
 
-  dict_stream.init(
-    s->dict_bits, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values);
-  __syncthreads();
+    // compute our row index, whether we're in row bounds, and validity
+    int const row_index     = (thread_value_count + value_count) - 1;
+    int const in_row_bounds = (row_index >= row_index_lower_bound) && (row_index < last_row);
+    int is_valid;
+    if constexpr (nullable) {
+      is_valid = ((d > 0) && in_row_bounds) ? 1 : 0;
+    } else {
+      is_valid = in_row_bounds;
+    }
 
-  // We use two counters in the loop below: processed_count and valid_count.
-  // - processed_count: number of rows out of num_input_values that we have decoded so far.
-  //   the definition stream returns the number of total rows it has processed in each call
-  //   to decode_next and we accumulate in process_count.
-  // - valid_count: number of non-null rows we have decoded so far. In each iteration of the
-  //   loop below, we look at the number of valid items (which could be all for non-nullable),
-  //   and valid_count is that running count.
-  int processed_count = 0;
-  int valid_count     = 0;
+    // thread and block validity count
+    int thread_valid_count, block_valid_count;
+    if constexpr (nullable) {
+      using block_scan = cub::BlockScan<int, decode_block_size>;
+      __shared__ typename block_scan::TempStorage scan_storage;
+      block_scan(scan_storage).InclusiveSum(is_valid, thread_valid_count, block_valid_count);
+      __syncthreads();
 
-  // the core loop. decode batches of level stream data using rle_stream objects
-  // and pass the results to gpuDecodeValues
-  while (s->error == 0 && processed_count < s->page.num_input_values) {
-    int next_valid_count;
+      // validity is processed per-warp
+      //
+      // nested schemas always read and write to the same bounds (that is, read and write
+      // positions are already pre-bounded by first_row/num_rows). flat schemas will start reading
+      // at the first value, even if that is before first_row, because we cannot trivially jump to
+      // the correct position to start reading. since we are about to write the validity vector
+      // here we need to adjust our computed mask to take into account the write row bounds.
+      int const in_write_row_bounds = ballot(row_index >= first_row && row_index < last_row);
+      int const write_start = __ffs(in_write_row_bounds) - 1;  // first bit in the warp to store
+      int warp_null_count   = 0;
+      if (write_start >= 0) {
+        uint32_t const warp_validity_mask = ballot(is_valid);
+        // lane 0 from each warp writes out validity
+        if ((t % cudf::detail::warp_size) == 0) {
+          int const vindex = (value_count + thread_value_count) - 1;  // absolute input value index
+          int const bit_offset = (valid_map_offset + vindex + write_start) -
+                                 first_row;  // absolute bit offset into the output validity map
+          int const write_end =
+            cudf::detail::warp_size - __clz(in_write_row_bounds);  // last bit in the warp to store
+          int const bit_count = write_end - write_start;
+          warp_null_count     = bit_count - __popc(warp_validity_mask >> write_start);
 
-    // only need to process definition levels if the column has nulls
-    if (nullable_with_nulls) {
-      processed_count += def_decoder.decode_next(t);
-      __syncthreads();
+          store_validity(bit_offset, ni.valid_map, warp_validity_mask >> write_start, bit_count);
+        }
+      }
 
-      // count of valid items in this batch
-      next_valid_count =
-        gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
+      // sum null counts. we have to do it this way instead of just incrementing by (value_count -
+      // valid_count) because valid_count also includes rows that potentially start before our row
+      // bounds. if we could come up with a way to clean that up, we could remove this and just
+      // compute it directly at the end of the kernel.
+      size_type const block_null_count =
+        cudf::detail::single_lane_block_sum_reduce<decode_block_size, 0>(warp_null_count);
+      if (t == 0) { ni.null_count += block_null_count; }
     }
-    // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
-    // this function call entirely since all it will ever generate is a mapping of (i -> i) for
-    // nz_idx. gpuDecodeValues would be the only work that happens.
+    // trivial for non-nullable columns
     else {
-      processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
-      next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
-        processed_count, s, sb, nullptr, t);
+      thread_valid_count = thread_value_count;
+      block_valid_count  = block_value_count;
     }
-    __syncthreads();
 
-    // We want to limit the number of dictionary items we decode, that correspond to
-    // the rows we have processed in this iteration that are valid.
-    // We know the number of valid rows to process with: next_valid_count - valid_count.
-    dict_stream.decode_next(t, next_valid_count - valid_count);
-    __syncthreads();
+    // output offset
+    if (is_valid) {
+      int const dst_pos = (value_count + thread_value_count) - 1;
+      int const src_pos = (valid_count + thread_valid_count) - 1;
+      sb->nz_idx[rolling_index<state_buf::nz_buf_size>(src_pos)] = dst_pos;
+    }
 
-    // decode the values themselves
-    gpuDecodeValues(s, sb, valid_count, next_valid_count, t);
-    __syncthreads();
+    // update stuff
+    value_count += block_value_count;
+    valid_count += block_valid_count;
+  }
 
-    valid_count = next_valid_count;
+  if (t == 0) {
+    // update valid value count for decoding and total # of values we've processed
+    ni.valid_count       = valid_count;
+    ni.value_count       = value_count;  // TODO: remove? this is unused in the non-list path
+    s->nz_count          = valid_count;
+    s->input_value_count = value_count;
+    s->input_row_count   = value_count;
   }
-  if (t == 0 and s->error != 0) { set_error(s->error, error_code); }
+
+  return valid_count;
+}
+
+// is the page marked nullable or not
+__device__ inline bool is_nullable(page_state_s* s)
+{
+  auto const lvl           = level_type::DEFINITION;
+  auto const max_def_level = s->col.max_level[lvl];
+  return max_def_level > 0;
+}
+
+// for a nullable page, check to see if it could have nulls
+__device__ inline bool maybe_has_nulls(page_state_s* s)
+{
+  auto const lvl      = level_type::DEFINITION;
+  auto const init_run = s->initial_rle_run[lvl];
+  // literal runs, let's assume they could hold nulls
+  if (is_literal_run(init_run)) { return true; }
+
+  // repeated run with number of items in the run not equal
+  // to the rows in the page, assume that means we could have nulls
+  if (s->page.num_input_values != (init_run >> 1)) { return true; }
+
+  auto const lvl_bits = s->col.level_bits[lvl];
+  auto const run_val  = lvl_bits == 0 ? 0 : s->initial_rle_value[lvl];
+
+  // the encoded repeated value isn't valid, we have (all) nulls
+  return run_val != s->col.max_level[lvl];
+}
 
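maybe_has_nulls above is a cheap page-level screen: it inspects only the first RLE run of the definition-level stream. Parquet's RLE/bit-packed hybrid encodes a run header whose low bit selects literal (bit-packed) vs repeated runs, with the count in the remaining bits, so a page whose definition levels are a single repeated run of max_def_level covering every value provably has no nulls. A hedged illustration of that header logic (a sketch, not cudf's actual parser):

#include <cstdint>

// Parquet RLE/bit-packed hybrid run header (after varint decode):
// low bit 1 => literal/bit-packed run of (header >> 1) groups of 8 values,
// low bit 0 => repeated run of (header >> 1) copies of one value.
struct rle_run_info {
  bool literal;
  uint32_t count;
};

constexpr rle_run_info classify_run(uint32_t header) { return {(header & 1u) != 0, header >> 1}; }

// Mirrors the screen above: only a page-spanning repeated run whose value equals
// max_def_level is provably null-free; everything else is "maybe".
constexpr bool maybe_nulls(uint32_t header, uint32_t num_values, uint32_t run_value, uint32_t max_def_level)
{
  auto const run = classify_run(header);
  if (run.literal) { return true; }
  if (run.count != num_values) { return true; }
  return run_value != max_def_level;
}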
 /**
@@ -583,19 +489,28 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
  * @param num_rows Maximum number of rows to read
  * @param error_code Error code to set if an error is encountered
  */
-template <typename level_t>
-CUDF_KERNEL void __launch_bounds__(decode_block_size)
-  gpuDecodeSplitPageDataFlat(PageInfo* pages,
-                             device_span<ColumnChunkDesc const> chunks,
-                             size_t min_row,
-                             size_t num_rows,
-                             kernel_error::pointer error_code)
+template <typename level_t,
+          int decode_block_size_t,
+          decode_kernel_mask kernel_mask_t,
+          bool has_dict_t,
+          bool has_nesting_t,
+          template <int block_size, typename state_buf>
+          typename DecodeValuesFunc>
+CUDF_KERNEL void __launch_bounds__(decode_block_size_t)
+  gpuDecodePageDataGeneric(PageInfo* pages,
+                           device_span<ColumnChunkDesc const> chunks,
+                           size_t min_row,
+                           size_t num_rows,
+                           kernel_error::pointer error_code)
 {
+  constexpr int rolling_buf_size    = decode_block_size_t * 2;
+  constexpr int rle_run_buffer_size = rle_stream_required_run_buffer_size<decode_block_size_t>();
+
   __shared__ __align__(16) page_state_s state_g;
-  __shared__ __align__(16) page_state_buffers_s<rolling_buf_size, 0, 0>  // unused in this kernel
-    state_buffers;
+  using state_buf_t = page_state_buffers_s<rolling_buf_size, rolling_buf_size, 0>;
+  __shared__ __align__(16) state_buf_t state_buffers;
 
   page_state_s* const s = &state_g;
   auto* const sb        = &state_buffers;
@@ -603,9 +518,7 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
   int const t           = threadIdx.x;
   PageInfo* pp          = &pages[page_idx];
 
-  if (!(BitAnd(pages[page_idx].kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT))) {
-    return;
-  }
+  if (!(BitAnd(pages[page_idx].kernel_mask, kernel_mask_t))) { return; }
 
   // must come after the kernel mask check
   [[maybe_unused]] null_count_back_copier _{s, t};
@@ -615,30 +528,70 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
                           chunks,
                           min_row,
                           num_rows,
-                          mask_filter{decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT},
+                          mask_filter{kernel_mask_t},
                           page_processing_stage::DECODE)) {
     return;
   }
 
-  // the level stream decoders
-  __shared__ rle_run<level_t> def_runs[rle_run_buffer_size];
-  rle_stream<level_t, decode_block_size, rolling_buf_size> def_decoder{def_runs};
-
   // if we have no work to do (eg, in a skip_rows/num_rows case) in this page.
   if (s->num_rows == 0) { return; }
 
-  bool const nullable            = is_nullable(s);
-  bool const nullable_with_nulls = nullable && has_nulls(s);
+  DecodeValuesFunc<decode_block_size_t, state_buf_t> decode_values;
+
+  bool const nullable             = is_nullable(s);
+  bool const should_process_nulls = nullable && maybe_has_nulls(s);
+
+  // shared buffer. all shared memory is suballocated out of here
+  // constexpr int shared_rep_size = has_lists_t ? cudf::util::round_up_unsafe(
+  //   rle_run_buffer_size * sizeof(rle_run<level_t>), size_t{16}) : 0;
+  constexpr int shared_dict_size =
+    has_dict_t
+      ? cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run<uint32_t>), size_t{16})
+      : 0;
+  constexpr int shared_def_size =
+    cudf::util::round_up_unsafe(rle_run_buffer_size * sizeof(rle_run<level_t>), size_t{16});
+  constexpr int shared_buf_size = /*shared_rep_size +*/ shared_dict_size + shared_def_size;
+  __shared__ __align__(16) uint8_t shared_buf[shared_buf_size];
+
+  // setup all shared memory buffers
+  int shared_offset = 0;
+  /*
+  rle_run<level_t>* rep_runs = reinterpret_cast<rle_run<level_t>*>(shared_buf + shared_offset);
+  if constexpr (has_lists_t){
+    shared_offset += shared_rep_size;
+  }
+  */
+  rle_run<uint32_t>* dict_runs = reinterpret_cast<rle_run<uint32_t>*>(shared_buf + shared_offset);
+  if constexpr (has_dict_t) { shared_offset += shared_dict_size; }
+  rle_run<level_t>* def_runs = reinterpret_cast<rle_run<level_t>*>(shared_buf + shared_offset);
 
   // initialize the stream decoders (requires values computed in setupLocalPageInfo)
+  rle_stream<level_t, decode_block_size_t, rolling_buf_size> def_decoder{def_runs};
   level_t* const def = reinterpret_cast<level_t*>(pp->lvl_decode_buf[level_type::DEFINITION]);
-  if (nullable_with_nulls) {
+  if (should_process_nulls) {
     def_decoder.init(s->col.level_bits[level_type::DEFINITION],
                     s->abs_lvl_start[level_type::DEFINITION],
                     s->abs_lvl_end[level_type::DEFINITION],
                     def,
                     s->page.num_input_values);
   }
+  /*
+  rle_stream<level_t, decode_block_size_t, rolling_buf_size> rep_decoder{rep_runs};
+  level_t* const rep = reinterpret_cast<level_t*>(pp->lvl_decode_buf[level_type::REPETITION]);
+  if constexpr(has_lists_t){
+    rep_decoder.init(s->col.level_bits[level_type::REPETITION],
+                     s->abs_lvl_start[level_type::REPETITION],
+                     s->abs_lvl_end[level_type::REPETITION],
+                     rep,
+                     s->page.num_input_values);
+  }
+  */
+
+  rle_stream<uint32_t, decode_block_size_t, rolling_buf_size> dict_stream{dict_runs};
+  if constexpr (has_dict_t) {
+    dict_stream.init(
+      s->dict_bits, s->data_start, s->data_end, sb->dict_idx, s->page.num_input_values);
+  }
   __syncthreads();
 
   // We use two counters in the loop below: processed_count and valid_count.
@@ -655,26 +608,47 @@ CUDF_KERNEL void __launch_bounds__(decode_block_size)
   while (s->error == 0 && processed_count < s->page.num_input_values) {
     int next_valid_count;
 
-    // only need to process definition levels if the column has nulls
-    if (nullable_with_nulls) {
+    // only need to process definition levels if this is a nullable column
+    if (should_process_nulls) {
      processed_count += def_decoder.decode_next(t);
       __syncthreads();
 
-      next_valid_count =
-        gpuUpdateValidityOffsetsAndRowIndicesFlat<true, level_t>(processed_count, s, sb, def, t);
+      if constexpr (has_nesting_t) {
+        next_valid_count = gpuUpdateValidityAndRowIndicesNested<decode_block_size_t, true, level_t>(
+          processed_count, s, sb, def, t);
+      } else {
+        next_valid_count = gpuUpdateValidityAndRowIndicesFlat<decode_block_size_t, true, level_t>(
+          processed_count, s, sb, def, t);
+      }
     }
     // if we wanted to split off the skip_rows/num_rows case into a separate kernel, we could skip
     // this function call entirely since all it will ever generate is a mapping of (i -> i) for
-    // nz_idx. gpuDecodeValues would be the only work that happens.
+    // nz_idx. gpuDecodeFixedWidthValues would be the only work that happens.
     else {
       processed_count += min(rolling_buf_size, s->page.num_input_values - processed_count);
-      next_valid_count = gpuUpdateValidityOffsetsAndRowIndicesFlat<false, level_t>(
-        processed_count, s, sb, nullptr, t);
+
+      if constexpr (has_nesting_t) {
+        next_valid_count =
+          gpuUpdateValidityAndRowIndicesNested<decode_block_size_t, false, level_t>(
+            processed_count, s, sb, nullptr, t);
+      } else {
+        next_valid_count = gpuUpdateValidityAndRowIndicesFlat<decode_block_size_t, false, level_t>(
+          processed_count, s, sb, nullptr, t);
+      }
     }
     __syncthreads();
 
+    // if we have dictionary data
+    if constexpr (has_dict_t) {
+      // We want to limit the number of dictionary items we decode, that correspond to
+      // the rows we have processed in this iteration that are valid.
+      // We know the number of valid rows to process with: next_valid_count - valid_count.
+      dict_stream.decode_next(t, next_valid_count - valid_count);
+      __syncthreads();
+    }
+
     // decode the values themselves
-    gpuDecodeSplitValues(s, sb, valid_count, next_valid_count);
+    decode_values(s, sb, valid_count, next_valid_count, t);
     __syncthreads();
 
     valid_count = next_valid_count;
@@ -689,18 +663,55 @@ void __host__ DecodePageDataFixed(cudf::detail::hostdevice_span<PageInfo> pages
                                   size_t num_rows,
                                   size_t min_row,
                                   int level_type_size,
+                                  bool has_nesting,
                                   kernel_error::pointer error_code,
                                   rmm::cuda_stream_view stream)
 {
+  constexpr int decode_block_size = 128;
+
+  dim3 dim_block(decode_block_size, 1);
   dim3 dim_grid(pages.size(), 1);  // 1 threadblock per page
 
   if (level_type_size == 1) {
-    gpuDecodePageDataFixed<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-      pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    if (has_nesting) {
+      gpuDecodePageDataGeneric<uint8_t,
+                               decode_block_size,
+                               decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED,
+                               false,
+                               true,
+                               decode_fixed_width_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    } else {
+      gpuDecodePageDataGeneric<uint8_t,
+                               decode_block_size,
+                               decode_kernel_mask::FIXED_WIDTH_NO_DICT,
+                               false,
+                               false,
+                               decode_fixed_width_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    }
   } else {
-    gpuDecodePageDataFixed<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-      pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    if (has_nesting) {
+      gpuDecodePageDataGeneric<uint16_t,
+                               decode_block_size,
+                               decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED,
+                               false,
+                               true,
+                               decode_fixed_width_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    } else {
+      gpuDecodePageDataGeneric<uint16_t,
+                               decode_block_size,
+                               decode_kernel_mask::FIXED_WIDTH_NO_DICT,
+                               false,
+                               false,
+                               decode_fixed_width_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
    }
   }
 }
 
@@ -709,40 +720,113 @@ void __host__ DecodePageDataFixedDict(cudf::detail::hostdevice_span<PageInfo> p
                                       size_t num_rows,
                                       size_t min_row,
                                       int level_type_size,
+                                      bool has_nesting,
                                       kernel_error::pointer error_code,
                                       rmm::cuda_stream_view stream)
 {
-  dim3 dim_block(decode_block_size, 1);  // decode_block_size = 128 threads per block
-                                         // 1 full warp, and 1 warp of 1 thread
+  constexpr int decode_block_size = 128;
+
+  dim3 dim_block(decode_block_size, 1);  // decode_block_size = 128 threads per block
   dim3 dim_grid(pages.size(), 1);        // 1 thread block per page => # blocks
 
   if (level_type_size == 1) {
-    gpuDecodePageDataFixedDict<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-      pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    if (has_nesting) {
+      gpuDecodePageDataGeneric<uint8_t,
+                               decode_block_size,
+                               decode_kernel_mask::FIXED_WIDTH_DICT_NESTED,
+                               true,
+                               true,
+                               decode_fixed_width_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    } else {
+      gpuDecodePageDataGeneric<uint8_t,
+                               decode_block_size,
+                               decode_kernel_mask::FIXED_WIDTH_DICT,
+                               true,
+                               false,
+                               decode_fixed_width_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    }
   } else {
-    gpuDecodePageDataFixedDict<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-      pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    if (has_nesting) {
+      gpuDecodePageDataGeneric<uint16_t,
+                               decode_block_size,
+                               decode_kernel_mask::FIXED_WIDTH_DICT_NESTED,
+                               true,
+                               true,
+                               decode_fixed_width_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    } else {
+      gpuDecodePageDataGeneric<uint16_t,
+                               decode_block_size,
+                               decode_kernel_mask::FIXED_WIDTH_DICT,
+                               true,
+                               false,
+                               decode_fixed_width_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
    }
   }
 }
 
-void __host__ DecodeSplitPageDataFlat(cudf::detail::hostdevice_span<PageInfo> pages,
-                                      cudf::detail::hostdevice_span<ColumnChunkDesc const> chunks,
-                                      size_t num_rows,
-                                      size_t min_row,
-                                      int level_type_size,
-                                      kernel_error::pointer error_code,
-                                      rmm::cuda_stream_view stream)
+void __host__
+DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span<PageInfo> pages,
+                              cudf::detail::hostdevice_span<ColumnChunkDesc const> chunks,
+                              size_t num_rows,
+                              size_t min_row,
+                              int level_type_size,
+                              bool has_nesting,
+                              kernel_error::pointer error_code,
+                              rmm::cuda_stream_view stream)
 {
+  constexpr int decode_block_size = 128;
+
+  dim3 dim_block(decode_block_size, 1);  // decode_block_size = 128 threads per block
   dim3 dim_grid(pages.size(), 1);        // 1 thread block per page => # blocks
 
   if (level_type_size == 1) {
-    gpuDecodeSplitPageDataFlat<uint8_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-      pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    if (has_nesting) {
+      gpuDecodePageDataGeneric<uint8_t,
+                               decode_block_size,
+                               decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED,
+                               false,
+                               true,
+                               decode_fixed_width_split_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    } else {
+      gpuDecodePageDataGeneric<uint8_t,
+                               decode_block_size,
+                               decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT,
+                               false,
+                               false,
+                               decode_fixed_width_split_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    }
   } else {
-    gpuDecodeSplitPageDataFlat<uint16_t><<<dim_grid, dim_block, 0, stream.value()>>>(
-      pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    if (has_nesting) {
+      gpuDecodePageDataGeneric<uint16_t,
+                               decode_block_size,
+                               decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED,
+                               false,
+                               true,
+                               decode_fixed_width_split_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
+    } else {
+      gpuDecodePageDataGeneric<uint16_t,
+                               decode_block_size,
+                               decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT,
+                               false,
+                               false,
+                               decode_fixed_width_split_values_func>
+        <<<dim_grid, dim_block, 0, stream.value()>>>(
+          pages.device_ptr(), chunks, min_row, num_rows, error_code);
    }
   }
 }
diff --git a/cpp/src/io/parquet/page_hdr.cu b/cpp/src/io/parquet/page_hdr.cu
index cf0dd85e490..d604642be54 100644
--- a/cpp/src/io/parquet/page_hdr.cu
+++ b/cpp/src/io/parquet/page_hdr.cu
@@ -145,6 +145,11 @@ __device__ inline bool is_nested(ColumnChunkDesc const& chunk)
   return chunk.max_nesting_depth > 1;
 }
 
+__device__ inline bool is_list(ColumnChunkDesc const& chunk)
+{
+  return chunk.max_level[level_type::REPETITION] > 0;
+}
+
 __device__ inline bool is_byte_array(ColumnChunkDesc const& chunk)
 {
   return chunk.physical_type == BYTE_ARRAY;
@@ -178,14 +183,17 @@ __device__ decode_kernel_mask kernel_mask_for_page(PageInfo const& page,
     return decode_kernel_mask::STRING;
   }
 
-  if (!is_nested(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) {
+  if (!is_list(chunk) && !is_byte_array(chunk) && !is_boolean(chunk)) {
     if (page.encoding == Encoding::PLAIN) {
-      return decode_kernel_mask::FIXED_WIDTH_NO_DICT;
+      return is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED
+                              : decode_kernel_mask::FIXED_WIDTH_NO_DICT;
    } else if (page.encoding == Encoding::PLAIN_DICTIONARY ||
               page.encoding == Encoding::RLE_DICTIONARY) {
-      return decode_kernel_mask::FIXED_WIDTH_DICT;
+      return is_nested(chunk) ? decode_kernel_mask::FIXED_WIDTH_DICT_NESTED
+                              : decode_kernel_mask::FIXED_WIDTH_DICT;
    } else if (page.encoding == Encoding::BYTE_STREAM_SPLIT) {
-      return decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT;
+      return is_nested(chunk) ? decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED
+                              : decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT;
    }
   }
 
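The page_hdr.cu change above routes on two orthogonal predicates that are easy to misread: is_nested is depth-based and is true for both structs and lists, while the new is_list keys off repetition levels. A small sketch of the resulting routing, with illustrative stand-in types (the real predicates read ColumnChunkDesc):

#include <cassert>

struct chunk_desc {
  int max_nesting_depth;    // > 1 for struct<...> and list<...>
  int max_repetition_level; // > 0 only when a list is present
};

constexpr bool is_nested(chunk_desc c) { return c.max_nesting_depth > 1; }
constexpr bool is_list(chunk_desc c) { return c.max_repetition_level > 0; }

int main()
{
  constexpr chunk_desc flat_int{1, 0};   // INT32 column
  constexpr chunk_desc struct_int{2, 0}; // struct<int32>: nested, not a list
  constexpr chunk_desc list_int{2, 1};   // list<int32>: nested AND a list
  static_assert(!is_nested(flat_int) && !is_list(flat_int));
  static_assert(is_nested(struct_int) && !is_list(struct_int));
  static_assert(is_nested(list_int) && is_list(list_int));
  // so "!is_list && is_nested" selects exactly the struct-of-fixed-width case the
  // new *_NESTED kernels handle; list pages still fall through to other decoders.
}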
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index d82c6f0de59..efc1f5ebab1 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -207,16 +207,20 @@ enum level_type {
  * Used to control which decode kernels to run.
  */
 enum class decode_kernel_mask {
-  NONE             = 0,
-  GENERAL          = (1 << 0),  // Run catch-all decode kernel
-  STRING           = (1 << 1),  // Run decode kernel for string data
-  DELTA_BINARY     = (1 << 2),  // Run decode kernel for DELTA_BINARY_PACKED data
-  DELTA_BYTE_ARRAY = (1 << 3),  // Run decode kernel for DELTA_BYTE_ARRAY encoded data
-  DELTA_LENGTH_BA  = (1 << 4),  // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data
-  FIXED_WIDTH_NO_DICT = (1 << 5),  // Run decode kernel for fixed width non-dictionary pages
-  FIXED_WIDTH_DICT    = (1 << 6),  // Run decode kernel for fixed width dictionary pages
-  BYTE_STREAM_SPLIT      = (1 << 7),  // Run decode kernel for BYTE_STREAM_SPLIT encoded data
-  BYTE_STREAM_SPLIT_FLAT = (1 << 8),  // Same as above but with a flat schema
+  NONE                = 0,
+  GENERAL             = (1 << 0),  // Run catch-all decode kernel
+  STRING              = (1 << 1),  // Run decode kernel for string data
+  DELTA_BINARY        = (1 << 2),  // Run decode kernel for DELTA_BINARY_PACKED data
+  DELTA_BYTE_ARRAY    = (1 << 3),  // Run decode kernel for DELTA_BYTE_ARRAY encoded data
+  DELTA_LENGTH_BA     = (1 << 4),  // Run decode kernel for DELTA_LENGTH_BYTE_ARRAY encoded data
+  FIXED_WIDTH_NO_DICT = (1 << 5),  // Run decode kernel for fixed width non-dictionary pages
+  FIXED_WIDTH_DICT    = (1 << 6),  // Run decode kernel for fixed width dictionary pages
+  BYTE_STREAM_SPLIT   = (1 << 7),  // Run decode kernel for BYTE_STREAM_SPLIT encoded data
+  BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT = (1 << 8),  // Same as above but for flat, fixed-width data
+  BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED =
+    (1 << 9),  // Same as above but for nested, fixed-width data
+  FIXED_WIDTH_NO_DICT_NESTED = (1 << 10),  // As FIXED_WIDTH_NO_DICT, but for nested columns
+  FIXED_WIDTH_DICT_NESTED    = (1 << 11),  // As FIXED_WIDTH_DICT, but for nested columns
 };
 
 // mask representing all the ways in which a string can be encoded
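Since these values are disjoint bits, each page advertises exactly one decoder, a subpass ORs the bits of all its pages into one aggregate mask, and the reader launches one kernel per set bit, each on its own stream (see reader_impl.cpp below). A reduced sketch of that bit-test shape; this BitAnd is an illustrative stand-in for cudf's helper of the same name:

#include <cstdint>

enum class decode_kernel_mask : uint32_t {
  FIXED_WIDTH_NO_DICT        = (1 << 5),
  FIXED_WIDTH_NO_DICT_NESTED = (1 << 10),
};

constexpr uint32_t BitAnd(uint32_t lhs, decode_kernel_mask rhs)
{
  return lhs & static_cast<uint32_t>(rhs);
}

// aggregate per subpass:  kernel_mask |= static_cast<uint32_t>(page_mask);
// dispatch per decoder:   if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT) != 0)
//                           { /* launch the flat fixed-width kernel */ }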
@@ -888,6 +892,7 @@ void DecodeDeltaLengthByteArray(cudf::detail::hostdevice_span<PageInfo> pages,
  * @param[in] num_rows Total number of rows to read
  * @param[in] min_row Minimum number of rows to read
  * @param[in] level_type_size Size in bytes of the type for level decoding
+ * @param[in] has_nesting Whether or not the data contains nested (but not list) data.
  * @param[out] error_code Error code for kernel failures
  * @param[in] stream CUDA stream to use
  */
@@ -896,6 +901,7 @@ void DecodePageDataFixed(cudf::detail::hostdevice_span<PageInfo> pages,
                          std::size_t num_rows,
                          size_t min_row,
                          int level_type_size,
+                         bool has_nesting,
                          kernel_error::pointer error_code,
                          rmm::cuda_stream_view stream);
 
@@ -910,6 +916,7 @@ void DecodePageDataFixed(cudf::detail::hostdevice_span<PageInfo> pages,
  * @param[in] num_rows Total number of rows to read
  * @param[in] min_row Minimum number of rows to read
  * @param[in] level_type_size Size in bytes of the type for level decoding
+ * @param[in] has_nesting Whether or not the data contains nested (but not list) data.
  * @param[out] error_code Error code for kernel failures
  * @param[in] stream CUDA stream to use
  */
@@ -918,11 +925,12 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span<PageInfo> pages,
                              std::size_t num_rows,
                              size_t min_row,
                              int level_type_size,
+                             bool has_nesting,
                              kernel_error::pointer error_code,
                              rmm::cuda_stream_view stream);
 
 /**
- * @brief Launches kernel for reading dictionary fixed width column data stored in the pages
+ * @brief Launches kernel for reading fixed width column data stored in the pages
  *
  * The page data will be written to the output pointed to in the page's
  * associated column chunk.
@@ -932,16 +940,18 @@ void DecodePageDataFixedDict(cudf::detail::hostdevice_span<PageInfo> pages,
  * @param[in] num_rows Total number of rows to read
  * @param[in] min_row Minimum number of rows to read
  * @param[in] level_type_size Size in bytes of the type for level decoding
+ * @param[in] has_nesting Whether or not the data contains nested (but not list) data.
  * @param[out] error_code Error code for kernel failures
  * @param[in] stream CUDA stream to use
  */
-void DecodeSplitPageDataFlat(cudf::detail::hostdevice_span<PageInfo> pages,
-                             cudf::detail::hostdevice_span<ColumnChunkDesc const> chunks,
-                             std::size_t num_rows,
-                             size_t min_row,
-                             int level_type_size,
-                             kernel_error::pointer error_code,
-                             rmm::cuda_stream_view stream);
+void DecodeSplitPageFixedWidthData(cudf::detail::hostdevice_span<PageInfo> pages,
+                                   cudf::detail::hostdevice_span<ColumnChunkDesc const> chunks,
+                                   std::size_t num_rows,
+                                   size_t min_row,
+                                   int level_type_size,
+                                   bool has_nesting,
+                                   kernel_error::pointer error_code,
+                                   rmm::cuda_stream_view stream);
 
 /**
  * @brief Launches kernel for initializing encoder row group fragments
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index 1bd2fae281c..f705f6626e7 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -267,14 +267,27 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
   }
 
   // launch byte stream split decoder
-  if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FLAT) != 0) {
-    DecodeSplitPageDataFlat(subpass.pages,
-                            pass.chunks,
-                            num_rows,
-                            skip_rows,
-                            level_type_size,
-                            error_code.data(),
-                            streams[s_idx++]);
+  if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_FLAT) != 0) {
+    DecodeSplitPageFixedWidthData(subpass.pages,
+                                  pass.chunks,
+                                  num_rows,
+                                  skip_rows,
+                                  level_type_size,
+                                  false,
+                                  error_code.data(),
+                                  streams[s_idx++]);
+  }
+
+  // launch byte stream split decoder, for nested columns
+  if (BitAnd(kernel_mask, decode_kernel_mask::BYTE_STREAM_SPLIT_FIXED_WIDTH_NESTED) != 0) {
+    DecodeSplitPageFixedWidthData(subpass.pages,
+                                  pass.chunks,
+                                  num_rows,
+                                  skip_rows,
+                                  level_type_size,
+                                  true,
+                                  error_code.data(),
+                                  streams[s_idx++]);
   }
 
   // launch byte stream split decoder
@@ -288,22 +301,50 @@ void reader::impl::decode_page_data(read_mode mode, size_t skip_rows, size_t num
                         streams[s_idx++]);
   }
 
+  // launch fixed width type decoder
   if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT) != 0) {
     DecodePageDataFixed(subpass.pages,
                         pass.chunks,
                         num_rows,
                         skip_rows,
                         level_type_size,
+                        false,
+                        error_code.data(),
+                        streams[s_idx++]);
+  }
+
+  // launch fixed width type decoder, for nested columns
+  if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_NO_DICT_NESTED) != 0) {
+    DecodePageDataFixed(subpass.pages,
+                        pass.chunks,
+                        num_rows,
+                        skip_rows,
+                        level_type_size,
+                        true,
                        error_code.data(),
                        streams[s_idx++]);
   }
 
+  // launch fixed width type decoder with dictionaries
   if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT) != 0) {
     DecodePageDataFixedDict(subpass.pages,
                             pass.chunks,
                             num_rows,
                             skip_rows,
                             level_type_size,
+                            false,
+                            error_code.data(),
+                            streams[s_idx++]);
+  }
+
+  // launch fixed width type decoder with dictionaries, for nested columns
+  if (BitAnd(kernel_mask, decode_kernel_mask::FIXED_WIDTH_DICT_NESTED) != 0) {
+    DecodePageDataFixedDict(subpass.pages,
+                            pass.chunks,
+                            num_rows,
+                            skip_rows,
+                            level_type_size,
+                            true,
                            error_code.data(),
                            streams[s_idx++]);
   }
diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp
index 84ab83e33d0..a1f4c7b81d8 100644
--- a/cpp/tests/io/parquet_writer_test.cpp
+++ b/cpp/tests/io/parquet_writer_test.cpp
@@ -1785,7 +1785,8 @@ TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls)
   CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
 }
 
-TEST_F(ParquetWriterTest, ByteStreamSplit)
+std::pair<std::unique_ptr<cudf::table>, cudf::io::table_input_metadata>
+make_byte_stream_split_table(bool as_struct)
 {
   constexpr auto num_rows = 100;
   std::mt19937 engine{31337};
@@ -1802,24 +1803,73 @@ TEST_F(ParquetWriterTest, ByteStreamSplit)
   // throw in a list to make sure both decoders are working
   auto col4 = make_parquet_list_col<int32_t>(engine, num_rows, 5, true);
 
-  auto expected = table_view{{col0, col1, col2, col3, *col4}};
+  std::vector<std::unique_ptr<cudf::column>> columns;
+  columns.reserve(5);
+  columns.push_back(col0.release());
+  columns.push_back(col1.release());
+  columns.push_back(col2.release());
+  columns.push_back(col3.release());
+  columns.push_back(std::move(col4));
+
+  return [&]() -> std::pair<std::unique_ptr<cudf::table>, cudf::io::table_input_metadata> {
+    auto const encoding = cudf::io::column_encoding::BYTE_STREAM_SPLIT;
+
+    // make as a nested struct
+    if (as_struct) {
+      auto valids =
+        cudf::detail::make_counting_transform_iterator(0, [](int i) { return i % 2 == 0; });
+      auto [null_mask, null_count] = cudf::test::detail::make_null_mask(valids, valids + num_rows);
+
+      std::vector<std::unique_ptr<cudf::column>> table_cols;
+      table_cols.push_back(
+        cudf::make_structs_column(num_rows, std::move(columns), null_count, std::move(null_mask)));
+
+      auto tbl      = std::make_unique<cudf::table>(std::move(table_cols));
+      auto expected = table_view{*tbl};
+
+      cudf::io::table_input_metadata expected_metadata(expected);
+      expected_metadata.column_metadata[0].set_name("struct");
+      expected_metadata.column_metadata[0].set_encoding(encoding);
+
+      expected_metadata.column_metadata[0].child(0).set_name("int32s");
+      expected_metadata.column_metadata[0].child(1).set_name("int64s");
+      expected_metadata.column_metadata[0].child(2).set_name("floats");
+      expected_metadata.column_metadata[0].child(3).set_name("doubles");
+      expected_metadata.column_metadata[0].child(4).set_name("int32list");
+      for (int idx = 0; idx <= 3; idx++) {
+        expected_metadata.column_metadata[0].child(idx).set_encoding(encoding);
+      }
+      expected_metadata.column_metadata[0].child(4).child(1).set_encoding(encoding);
 
-  cudf::io::table_input_metadata expected_metadata(expected);
-  expected_metadata.column_metadata[0].set_name("int32s");
-  expected_metadata.column_metadata[1].set_name("int64s");
-  expected_metadata.column_metadata[2].set_name("floats");
-  expected_metadata.column_metadata[3].set_name("doubles");
-  expected_metadata.column_metadata[4].set_name("int32list");
-  auto const encoding = cudf::io::column_encoding::BYTE_STREAM_SPLIT;
-  for (int i = 0; i <= 3; i++) {
-    expected_metadata.column_metadata[i].set_encoding(encoding);
-  }
+      return {std::move(tbl), expected_metadata};
+    }
+
+    // make flat
+    auto tbl      = std::make_unique<cudf::table>(std::move(columns));
+    auto expected = table_view{*tbl};
 
-  expected_metadata.column_metadata[4].child(1).set_encoding(encoding);
+    cudf::io::table_input_metadata expected_metadata(expected);
+    expected_metadata.column_metadata[0].set_name("int32s");
+    expected_metadata.column_metadata[1].set_name("int64s");
+    expected_metadata.column_metadata[2].set_name("floats");
+    expected_metadata.column_metadata[3].set_name("doubles");
+    expected_metadata.column_metadata[4].set_name("int32list");
+    for (int idx = 0; idx <= 3; idx++) {
+      expected_metadata.column_metadata[idx].set_encoding(encoding);
+    }
+
+    expected_metadata.column_metadata[4].child(1).set_encoding(encoding);
+    return {std::move(tbl), expected_metadata};
+  }();
+}
+
+TEST_F(ParquetWriterTest, ByteStreamSplit)
+{
+  auto [expected, expected_metadata] = make_byte_stream_split_table(false);
 
   auto const filepath = temp_env->get_temp_filepath("ByteStreamSplit.parquet");
   cudf::io::parquet_writer_options out_opts =
-    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected)
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *expected)
       .metadata(expected_metadata);
   cudf::io::write_parquet(out_opts);
 
@@ -1827,7 +1877,24 @@ TEST_F(ParquetWriterTest, ByteStreamSplit)
     cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
   auto result = cudf::io::read_parquet(in_opts);
 
-  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
+  CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, result.tbl->view());
+}
+
+TEST_F(ParquetWriterTest, ByteStreamSplitStruct)
+{
+  auto [expected, expected_metadata] = make_byte_stream_split_table(true);
+
+  auto const filepath = temp_env->get_temp_filepath("ByteStreamSplitStruct.parquet");
+  cudf::io::parquet_writer_options out_opts =
+    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, *expected)
+      .metadata(expected_metadata);
+  cudf::io::write_parquet(out_opts);
+
+  cudf::io::parquet_reader_options in_opts =
+    cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
+  auto result = cudf::io::read_parquet(in_opts);
+
+  CUDF_TEST_EXPECT_TABLES_EQUAL(*expected, result.tbl->view());
 }
 
 TEST_F(ParquetWriterTest, DecimalByteStreamSplit)
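For reference when reading these tests: BYTE_STREAM_SPLIT stores byte k of every fixed-width value contiguously (sizeof(T) streams of n bytes each), which tends to compress well for floating-point data. A self-contained sketch of the encode-side transform that the gpuDecodeFixedWidthSplitValues path inverts:

#include <cstdint>
#include <cstring>
#include <vector>

// Scatter byte k of each of n values into stream k; the decoder performs the
// inverse gather, reassembling value i from out[k * n + i] for k = 0..sizeof(T)-1.
template <typename T>
std::vector<uint8_t> byte_stream_split(std::vector<T> const& values)
{
  auto const n = values.size();
  std::vector<uint8_t> out(n * sizeof(T));
  for (size_t i = 0; i < n; ++i) {
    uint8_t bytes[sizeof(T)];
    std::memcpy(bytes, &values[i], sizeof(T));
    for (size_t k = 0; k < sizeof(T); ++k) {
      out[k * n + i] = bytes[k];  // stream k, element i
    }
  }
  return out;
}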