From 49efa5105810b5496725b7875dad5b6c3f758e89 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 20 Sep 2023 14:33:27 -0700 Subject: [PATCH 01/10] pass error code back to host --- cpp/src/io/parquet/page_data.cu | 23 ++++++++++++++++------- cpp/src/io/parquet/page_decode.cuh | 6 +++--- cpp/src/io/parquet/page_delta_decode.cu | 19 +++++++++++++------ cpp/src/io/parquet/page_string_decode.cu | 19 +++++++++++++------ cpp/src/io/parquet/parquet_gpu.hpp | 6 ++++++ cpp/src/io/parquet/reader_impl.cpp | 16 ++++++++++++---- 6 files changed, 63 insertions(+), 26 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index c26802aa3c2..ef2516d2817 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -430,10 +430,15 @@ static __device__ void gpuOutputGeneric( * @param chunks List of column chunks * @param min_row Row index to start reading at * @param num_rows Maximum number of rows to read + * @param error_code Error code to set if an error is encountered */ template -__global__ void __launch_bounds__(decode_block_size) gpuDecodePageData( - PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) +__global__ void __launch_bounds__(decode_block_size) + gpuDecodePageData(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + int* error_code) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) @@ -472,7 +477,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData( // skipped_leaf_values will always be 0 for flat hierarchies. uint32_t skipped_leaf_values = s->page.skipped_leaf_values; - while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + while (s->error != 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { int target_pos; int src_pos = s->src_pos; @@ -596,6 +602,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData( } __syncthreads(); } + + if (!t and s->error != 0) { atomicOr(error_code, s->error); } } struct mask_tform { @@ -621,6 +629,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, + int* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -629,11 +638,11 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector& pages, dim3 dim_grid(pages.size(), 1); // 1 threadblock per page if (level_type_size == 1) { - gpuDecodePageData - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodePageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } else { - gpuDecodePageData - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodePageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } } diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 26e3c951b2e..c95b2deeeee 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -42,7 +42,7 @@ struct page_state_s { int32_t dict_val; uint32_t initial_rle_run[NUM_LEVEL_TYPES]; // [def,rep] int32_t initial_rle_value[NUM_LEVEL_TYPES]; // [def,rep] - int32_t error; + int error; PageInfo page; ColumnChunkDesc col; @@ -495,7 +495,7 @@ __device__ void gpuDecodeStream( level_run = shuffle(level_run); cur_def += sym_len; } - if (s->error) { break; } + if (s->error != 0) { break; } batch_len = min(num_input_values - value_count, 32); if (level_run & 1) { @@ -851,7 +851,7 @@ __device__ void gpuDecodeLevels(page_state_s* s, constexpr int batch_size = 32; int cur_leaf_count = target_leaf_count; - while (!s->error && s->nz_count < target_leaf_count && + while (s->error != 0 && s->nz_count < target_leaf_count && s->input_value_count < s->num_input_values) { if (has_repetition) { gpuDecodeStream(rep, s, cur_leaf_count, t, level_type::REPETITION); diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index 35f33a761be..8ec90ae890d 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -32,8 +32,12 @@ namespace { // with V2 page headers; see https://www.mail-archive.com/dev@parquet.apache.org/msg11826.html). // this kernel only needs 96 threads (3 warps)(for now). template -__global__ void __launch_bounds__(96) gpuDecodeDeltaBinary( - PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) +__global__ void __launch_bounds__(96) + gpuDecodeDeltaBinary(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + int* error_code) { using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; @@ -145,6 +149,8 @@ __global__ void __launch_bounds__(96) gpuDecodeDeltaBinary( } __syncthreads(); } + + if (!t and s->error != 0) { atomicOr(error_code, s->error); } } } // anonymous namespace @@ -157,6 +163,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages size_t num_rows, size_t min_row, int level_type_size, + int* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -165,11 +172,11 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages dim3 dim_grid(pages.size(), 1); // 1 threadblock per page if (level_type_size == 1) { - gpuDecodeDeltaBinary - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodeDeltaBinary<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } else { - gpuDecodeDeltaBinary - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodeDeltaBinary<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } } diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 1ac4c95f713..e636e2fd91a 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -582,8 +582,12 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz * @tparam level_t Type used to store decoded repetition and definition levels */ template -__global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData( - PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) +__global__ void __launch_bounds__(decode_block_size) + gpuDecodeStringPageData(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + int* error_code) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(4) size_type last_offset; @@ -742,6 +746,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodeStringPageData( auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out); block_excl_sum(offptr, value_count, s->page.str_offset); + + if (!t and s->error != 0) { atomicOr(error_code, s->error); } } } // anonymous namespace @@ -775,6 +781,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector& pa size_t num_rows, size_t min_row, int level_type_size, + int* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); @@ -783,11 +790,11 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector& pa dim3 dim_grid(pages.size(), 1); // 1 threadblock per page if (level_type_size == 1) { - gpuDecodeStringPageData - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodeStringPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } else { - gpuDecodeStringPageData - <<>>(pages.device_ptr(), chunks, min_row, num_rows); + gpuDecodeStringPageData<<>>( + pages.device_ptr(), chunks, min_row, num_rows, error_code); } } diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index a3cc37dee4f..a528a297328 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -566,6 +566,7 @@ void ComputePageStringSizes(cudf::detail::hostdevice_vector& pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ void DecodePageData(cudf::detail::hostdevice_vector& pages, @@ -573,6 +574,7 @@ void DecodePageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, + int* error_code, rmm::cuda_stream_view stream); /** @@ -586,6 +588,7 @@ void DecodePageData(cudf::detail::hostdevice_vector& pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use */ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, @@ -593,6 +596,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, + int* error_code, rmm::cuda_stream_view stream); /** @@ -606,6 +610,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, * @param[in] num_rows Total number of rows to read * @param[in] min_row Minimum number of rows to read * @param[in] level_type_size Size in bytes of the type for level decoding + * @param[out] error_code Error code for kernel failures * @param[in] stream CUDA stream to use, default 0 */ void DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages, @@ -613,6 +618,7 @@ void DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, + int* error_code, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 8b0a0bd4eb0..fecc1acca45 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -163,6 +163,8 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) chunk_nested_valids.host_to_device_async(_stream); chunk_nested_data.host_to_device_async(_stream); + rmm::device_scalar error_code(0, _stream); + // get the number of streams we need from the pool and tell them to wait on the H2D copies int const nkernels = std::bitset<32>(kernel_mask).count(); auto streams = cudf::detail::fork_streams(_stream, nkernels); @@ -174,17 +176,20 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) if (has_strings) { auto& stream = streams[s_idx++]; chunk_nested_str_data.host_to_device_async(stream); - gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, stream); + gpu::DecodeStringPageData( + pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), stream); } // launch delta binary decoder if ((kernel_mask & gpu::KERNEL_MASK_DELTA_BINARY) != 0) { - gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]); + gpu::DecodeDeltaBinary( + pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]); } // launch the catch-all page decoder if ((kernel_mask & gpu::KERNEL_MASK_GENERAL) != 0) { - gpu::DecodePageData(pages, chunks, num_rows, skip_rows, level_type_size, streams[s_idx++]); + gpu::DecodePageData( + pages, chunks, num_rows, skip_rows, level_type_size, error_code.data(), streams[s_idx++]); } // synchronize the streams @@ -193,7 +198,10 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) pages.device_to_host_async(_stream); page_nesting.device_to_host_async(_stream); page_nesting_decode.device_to_host_async(_stream); - _stream.synchronize(); + + auto const decode_error = error_code.value(_stream); + CUDF_EXPECTS(decode_error == 0, + "Parquet data decode failed with error code " + std::to_string(decode_error)); // for list columns, add the final offset to every offset buffer. // TODO : make this happen in more efficiently. Maybe use thrust::for_each From 519f37ff52995394930bf3df66cbc7eb575d3d7f Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 21 Sep 2023 11:06:37 -0700 Subject: [PATCH 02/10] fix conditions --- cpp/src/io/parquet/page_data.cu | 2 +- cpp/src/io/parquet/page_decode.cuh | 2 +- cpp/src/io/parquet/page_delta_decode.cu | 3 ++- cpp/src/io/parquet/page_string_decode.cu | 3 ++- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index ef2516d2817..396cd487771 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -477,7 +477,7 @@ __global__ void __launch_bounds__(decode_block_size) // skipped_leaf_values will always be 0 for flat hierarchies. uint32_t skipped_leaf_values = s->page.skipped_leaf_values; - while (s->error != 0 && + while (s->error == 0 && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { int target_pos; int src_pos = s->src_pos; diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index c95b2deeeee..0550ebce93c 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -851,7 +851,7 @@ __device__ void gpuDecodeLevels(page_state_s* s, constexpr int batch_size = 32; int cur_leaf_count = target_leaf_count; - while (s->error != 0 && s->nz_count < target_leaf_count && + while (s->error == 0 && s->nz_count < target_leaf_count && s->input_value_count < s->num_input_values) { if (has_repetition) { gpuDecodeStream(rep, s, cur_leaf_count, t, level_type::REPETITION); diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index 8ec90ae890d..af0904f3893 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -83,7 +83,8 @@ __global__ void __launch_bounds__(96) // that has a value we need. if (skipped_leaf_values > 0) { db->skip_values(skipped_leaf_values); } - while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + while (s->error == 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { uint32_t target_pos; uint32_t const src_pos = s->src_pos; diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index e636e2fd91a..c150938bdba 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -621,7 +621,8 @@ __global__ void __launch_bounds__(decode_block_size) // skipped_leaf_values will always be 0 for flat hierarchies. uint32_t skipped_leaf_values = s->page.skipped_leaf_values; - while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { + while (s->error == 0 && + (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { int target_pos; int src_pos = s->src_pos; From c2e8ff02d5db3c11e3b054ff1e27bd781382e6c5 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 21 Sep 2023 14:35:03 -0700 Subject: [PATCH 03/10] atomic shared error code --- cpp/src/io/parquet/page_decode.cuh | 50 ++++++++++++++++++++---------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 0550ebce93c..fb4f946e6b8 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -21,6 +21,7 @@ #include +#include #include namespace cudf::io::parquet::gpu { @@ -42,7 +43,7 @@ struct page_state_s { int32_t dict_val; uint32_t initial_rle_run[NUM_LEVEL_TYPES]; // [def,rep] int32_t initial_rle_value[NUM_LEVEL_TYPES]; // [def,rep] - int error; + int32_t error; PageInfo page; ColumnChunkDesc col; @@ -68,6 +69,12 @@ struct page_state_s { PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]; // points to either nesting_decode_cache above when possible, or to the global source otherwise PageNestingDecodeInfo* nesting_info; + + inline __device__ void set_error_code(int32_t err) volatile + { + cuda::atomic_ref ref{const_cast(error)}; + ref.store(err, cuda::std::memory_order_relaxed); + } }; // buffers only used in the decode kernel. separated from page_state_s to keep @@ -423,7 +430,12 @@ __device__ size_type gpuInitStringDescriptors(page_state_s volatile* s, while (pos < target_pos) { int len = 0; if ((s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) { - if (k < dict_size) { len = s->dtype_len_in; } + if (k < dict_size) { + len = s->dtype_len_in; + } else { + s->set_error_code(0x20); + break; + } } else { if (k + 4 <= dict_size) { len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24); @@ -470,7 +482,7 @@ __device__ void gpuDecodeStream( int32_t value_count = s->lvl_count[lvl]; int32_t batch_coded_count = 0; - while (value_count < target_count && value_count < num_input_values) { + while (s->error == 0 && value_count < target_count && value_count < num_input_values) { int batch_len; if (level_run <= 1) { // Get a new run symbol from the byte stream @@ -486,7 +498,10 @@ __device__ void gpuDecodeStream( cur++; } } - if (cur > end || level_run <= 1) { s->error = 0x10; } + if (cur > end || level_run <= 1) { + s->set_error_code(0x10); + break; + } sym_len = (int32_t)(cur - cur_def); __threadfence_block(); } @@ -915,7 +930,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, } s->lvl_start[lvl] = cur; - if (cur > end) { s->error = 2; } + if (cur > end) { s->set_error_code(2); } }; // this is a little redundant. if level_bits == 0, then nothing should be encoded @@ -940,8 +955,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, // add back the 4 bytes for the length len += 4; } else { - len = 0; - s->error = 2; + len = 0; + s->set_error_code(2); } } else if (encoding == Encoding::BIT_PACKED) { len = (s->page.num_input_values * level_bits + 7) >> 3; @@ -950,8 +965,8 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, s->lvl_start[lvl] = cur; s->abs_lvl_start[lvl] = cur; } else { - s->error = 3; - len = 0; + len = 0; + s->set_error_code(3); } s->abs_lvl_end[lvl] = start + len; @@ -1093,7 +1108,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } if (!t) { - s->error = 0; + s->set_error_code(0); // IMPORTANT : nested schemas can have 0 rows in a page but still have // values. The case is: @@ -1151,7 +1166,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, break; default: // FIXED_LEN_BYTE_ARRAY: s->dtype_len = dtype_len_out; - s->error |= (s->dtype_len <= 0); + s->set_error_code(s->dtype_len <= 0); break; } // Special check for downconversions @@ -1267,7 +1282,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, s->dict_run = 0; s->dict_val = 0; s->dict_bits = (cur < end) ? *cur++ : 0; - if (s->dict_bits > 32 || !s->dict_base) { s->error = (10 << 8) | s->dict_bits; } + if (s->dict_bits > 32 || !s->dict_base) { s->set_error_code((10 << 8) | s->dict_bits); } break; case Encoding::PLAIN: s->dict_size = static_cast(end - cur); @@ -1278,22 +1293,23 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, // first 4 bytes are length of RLE data int const len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24); cur += 4; - if (cur + len > end) { s->error = 2; } + if (cur + len > end) { s->set_error_code(2); } s->dict_run = 0; } break; case Encoding::DELTA_BINARY_PACKED: // nothing to do, just don't error break; - default: - s->error = 1; // Unsupported encoding + default: { + s->set_error_code(1); // Unsupported encoding break; + } } - if (cur > end) { s->error = 1; } + if (cur > end) { s->set_error_code(1); } s->lvl_end = cur; s->data_start = cur; s->data_end = end; } else { - s->error = 1; + s->set_error_code(1); } s->lvl_count[level_type::REPETITION] = 0; From da1222410f5e4ff6912052dd1b443648fe1f7c89 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 21 Sep 2023 14:41:45 -0700 Subject: [PATCH 04/10] atomic global --- cpp/src/io/parquet/page_data.cu | 10 ++++++---- cpp/src/io/parquet/page_delta_decode.cu | 9 ++++++--- cpp/src/io/parquet/page_string_decode.cu | 9 ++++++--- cpp/src/io/parquet/parquet_gpu.hpp | 6 +++--- cpp/src/io/parquet/reader_impl.cpp | 2 +- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 396cd487771..c58d0746665 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -438,7 +438,7 @@ __global__ void __launch_bounds__(decode_block_size) device_span chunks, size_t min_row, size_t num_rows, - int* error_code) + int32_t* error_code) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(16) @@ -602,8 +602,10 @@ __global__ void __launch_bounds__(decode_block_size) } __syncthreads(); } - - if (!t and s->error != 0) { atomicOr(error_code, s->error); } + if (!t and s->error != 0) { + cuda::atomic_ref ref{*error_code}; + ref.store(s->error, cuda::std::memory_order_relaxed); + } } struct mask_tform { @@ -629,7 +631,7 @@ void __host__ DecodePageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int* error_code, + int32_t* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index af0904f3893..b512f2a775a 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -37,7 +37,7 @@ __global__ void __launch_bounds__(96) device_span chunks, size_t min_row, size_t num_rows, - int* error_code) + int32_t* error_code) { using cudf::detail::warp_size; __shared__ __align__(16) delta_binary_decoder db_state; @@ -151,7 +151,10 @@ __global__ void __launch_bounds__(96) __syncthreads(); } - if (!t and s->error != 0) { atomicOr(error_code, s->error); } + if (!t and s->error != 0) { + cuda::atomic_ref ref{*error_code}; + ref.store(s->error, cuda::std::memory_order_relaxed); + } } } // anonymous namespace @@ -164,7 +167,7 @@ void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages size_t num_rows, size_t min_row, int level_type_size, - int* error_code, + int32_t* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index c150938bdba..192979f082a 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -587,7 +587,7 @@ __global__ void __launch_bounds__(decode_block_size) device_span chunks, size_t min_row, size_t num_rows, - int* error_code) + int32_t* error_code) { __shared__ __align__(16) page_state_s state_g; __shared__ __align__(4) size_type last_offset; @@ -748,7 +748,10 @@ __global__ void __launch_bounds__(decode_block_size) auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out); block_excl_sum(offptr, value_count, s->page.str_offset); - if (!t and s->error != 0) { atomicOr(error_code, s->error); } + if (!t and s->error != 0) { + cuda::atomic_ref ref{*error_code}; + ref.store(s->error, cuda::std::memory_order_relaxed); + } } } // anonymous namespace @@ -782,7 +785,7 @@ void __host__ DecodeStringPageData(cudf::detail::hostdevice_vector& pa size_t num_rows, size_t min_row, int level_type_size, - int* error_code, + int32_t* error_code, rmm::cuda_stream_view stream) { CUDF_EXPECTS(pages.size() > 0, "There is no page to decode"); diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index a528a297328..73a8d08e5c2 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -574,7 +574,7 @@ void DecodePageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int* error_code, + int32_t* error_code, rmm::cuda_stream_view stream); /** @@ -596,7 +596,7 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int* error_code, + int32_t* error_code, rmm::cuda_stream_view stream); /** @@ -618,7 +618,7 @@ void DecodeDeltaBinary(cudf::detail::hostdevice_vector& pages, size_t num_rows, size_t min_row, int level_type_size, - int* error_code, + int32_t* error_code, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index fecc1acca45..0ae57d0d038 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -163,7 +163,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) chunk_nested_valids.host_to_device_async(_stream); chunk_nested_data.host_to_device_async(_stream); - rmm::device_scalar error_code(0, _stream); + rmm::device_scalar error_code(0, _stream); // get the number of streams we need from the pool and tell them to wait on the H2D copies int const nkernels = std::bitset<32>(kernel_mask).count(); From 57e0bc77273462e078439569c5c42ccea8717443 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Thu, 21 Sep 2023 14:42:15 -0700 Subject: [PATCH 05/10] revert test error --- cpp/src/io/parquet/page_decode.cuh | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index fb4f946e6b8..8efdf6e63f6 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -430,12 +430,7 @@ __device__ size_type gpuInitStringDescriptors(page_state_s volatile* s, while (pos < target_pos) { int len = 0; if ((s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) { - if (k < dict_size) { - len = s->dtype_len_in; - } else { - s->set_error_code(0x20); - break; - } + if (k < dict_size) { len = s->dtype_len_in; } } else { if (k + 4 <= dict_size) { len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24); From 3831bc83baf8430a77bc730a0f37dc8e075facb7 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 27 Sep 2023 01:14:07 -0700 Subject: [PATCH 06/10] codes as masks; enum --- cpp/src/io/parquet/page_data.cu | 4 +- cpp/src/io/parquet/page_decode.cuh | 50 +++++++++++++++++------- cpp/src/io/parquet/page_delta_decode.cu | 4 +- cpp/src/io/parquet/page_string_decode.cu | 4 +- cpp/src/io/parquet/reader_impl.cpp | 7 +++- 5 files changed, 47 insertions(+), 22 deletions(-) diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index c58d0746665..230834632dd 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -602,9 +602,9 @@ __global__ void __launch_bounds__(decode_block_size) } __syncthreads(); } - if (!t and s->error != 0) { + if (t == 0 and s->error != 0) { cuda::atomic_ref ref{*error_code}; - ref.store(s->error, cuda::std::memory_order_relaxed); + ref.fetch_or(s->error, cuda::std::memory_order_relaxed); } } diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 8efdf6e63f6..111329249ec 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -26,6 +26,16 @@ namespace cudf::io::parquet::gpu { +enum class decode_error { + STREAM_OVERRUN = 1, + RLE_STREAM_OVERRUN = 2, + UNSUPPORTED_ENCODING = 4, + INVALID_LEVEL_RUN = 8, + INVALID_DATA_TYPE = 16, + EMPTY_PAGE = 32, + INVALID_DICT_WIDTH = 64, +}; + struct page_state_s { uint8_t const* data_start; uint8_t const* data_end; @@ -70,10 +80,16 @@ struct page_state_s { // points to either nesting_decode_cache above when possible, or to the global source otherwise PageNestingDecodeInfo* nesting_info; - inline __device__ void set_error_code(int32_t err) volatile + inline __device__ void set_error_code(decode_error err) volatile { cuda::atomic_ref ref{const_cast(error)}; - ref.store(err, cuda::std::memory_order_relaxed); + ref.fetch_or(static_cast(err), cuda::std::memory_order_relaxed); + } + + inline __device__ void reset_error_code() volatile + { + cuda::atomic_ref ref{const_cast(error)}; + ref.store(0, cuda::std::memory_order_relaxed); } }; @@ -493,8 +509,12 @@ __device__ void gpuDecodeStream( cur++; } } - if (cur > end || level_run <= 1) { - s->set_error_code(0x10); + if (cur > end) { + s->set_error_code(decode_error::STREAM_OVERRUN); + break; + } + if (level_run <= 1) { + s->set_error_code(decode_error::INVALID_LEVEL_RUN); break; } sym_len = (int32_t)(cur - cur_def); @@ -925,7 +945,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, } s->lvl_start[lvl] = cur; - if (cur > end) { s->set_error_code(2); } + if (cur > end) { s->set_error_code(decode_error::RLE_STREAM_OVERRUN); } }; // this is a little redundant. if level_bits == 0, then nothing should be encoded @@ -951,7 +971,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, len += 4; } else { len = 0; - s->set_error_code(2); + s->set_error_code(decode_error::RLE_STREAM_OVERRUN); } } else if (encoding == Encoding::BIT_PACKED) { len = (s->page.num_input_values * level_bits + 7) >> 3; @@ -961,7 +981,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, s->abs_lvl_start[lvl] = cur; } else { len = 0; - s->set_error_code(3); + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); } s->abs_lvl_end[lvl] = start + len; @@ -1103,7 +1123,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, } if (!t) { - s->set_error_code(0); + s->reset_error_code(); // IMPORTANT : nested schemas can have 0 rows in a page but still have // values. The case is: @@ -1161,7 +1181,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, break; default: // FIXED_LEN_BYTE_ARRAY: s->dtype_len = dtype_len_out; - s->set_error_code(s->dtype_len <= 0); + if (s->dtype_len <= 0) { s->set_error_code(decode_error::INVALID_DATA_TYPE); } break; } // Special check for downconversions @@ -1277,7 +1297,9 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, s->dict_run = 0; s->dict_val = 0; s->dict_bits = (cur < end) ? *cur++ : 0; - if (s->dict_bits > 32 || !s->dict_base) { s->set_error_code((10 << 8) | s->dict_bits); } + if (s->dict_bits > 32 || !s->dict_base) { + s->set_error_code(decode_error::INVALID_DICT_WIDTH); + } break; case Encoding::PLAIN: s->dict_size = static_cast(end - cur); @@ -1288,23 +1310,23 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, // first 4 bytes are length of RLE data int const len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24); cur += 4; - if (cur + len > end) { s->set_error_code(2); } + if (cur + len > end) { s->set_error_code(decode_error::RLE_STREAM_OVERRUN); } s->dict_run = 0; } break; case Encoding::DELTA_BINARY_PACKED: // nothing to do, just don't error break; default: { - s->set_error_code(1); // Unsupported encoding + s->set_error_code(decode_error::UNSUPPORTED_ENCODING); break; } } - if (cur > end) { s->set_error_code(1); } + if (cur > end) { s->set_error_code(decode_error::STREAM_OVERRUN); } s->lvl_end = cur; s->data_start = cur; s->data_end = end; } else { - s->set_error_code(1); + s->set_error_code(decode_error::EMPTY_PAGE); } s->lvl_count[level_type::REPETITION] = 0; diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu index b512f2a775a..2b78dead205 100644 --- a/cpp/src/io/parquet/page_delta_decode.cu +++ b/cpp/src/io/parquet/page_delta_decode.cu @@ -151,9 +151,9 @@ __global__ void __launch_bounds__(96) __syncthreads(); } - if (!t and s->error != 0) { + if (t == 0 and s->error != 0) { cuda::atomic_ref ref{*error_code}; - ref.store(s->error, cuda::std::memory_order_relaxed); + ref.fetch_or(s->error, cuda::std::memory_order_relaxed); } } diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu index 192979f082a..d79abe4a6d2 100644 --- a/cpp/src/io/parquet/page_string_decode.cu +++ b/cpp/src/io/parquet/page_string_decode.cu @@ -748,9 +748,9 @@ __global__ void __launch_bounds__(decode_block_size) auto const offptr = reinterpret_cast(nesting_info_base[leaf_level_index].data_out); block_excl_sum(offptr, value_count, s->page.str_offset); - if (!t and s->error != 0) { + if (t == 0 and s->error != 0) { cuda::atomic_ref ref{*error_code}; - ref.store(s->error, cuda::std::memory_order_relaxed); + ref.fetch_or(s->error, cuda::std::memory_order_relaxed); } } diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 0ae57d0d038..159520506c1 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -200,8 +200,11 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) page_nesting_decode.device_to_host_async(_stream); auto const decode_error = error_code.value(_stream); - CUDF_EXPECTS(decode_error == 0, - "Parquet data decode failed with error code " + std::to_string(decode_error)); + if (decode_error != 0) { + std::stringstream stream; + stream << std::hex << decode_error; + CUDF_FAIL("Parquet data decode failed with code(s) " + stream.str()); + } // for list columns, add the final offset to every offset buffer. // TODO : make this happen in more efficiently. Maybe use thrust::for_each From 4c2a235c30f7c03838a6cf05f2c93ece4331a77f Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 27 Sep 2023 12:45:24 -0700 Subject: [PATCH 07/10] memory order Co-authored-by: Yunsong Wang --- cpp/src/io/parquet/page_decode.cuh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 1b852a58149..f32bfe3c636 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -90,7 +90,7 @@ struct page_state_s { inline __device__ void reset_error_code() volatile { cuda::atomic_ref ref{const_cast(error)}; - ref.store(0, cuda::std::memory_order_relaxed); + ref.store(0, cuda::std::memory_order_release); } }; From 0d15e496e569b5eb8f9f9b3e286d79a652a137bd Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 27 Sep 2023 12:48:22 -0700 Subject: [PATCH 08/10] move enum; docs --- cpp/src/io/parquet/page_decode.cuh | 10 ---------- cpp/src/io/parquet/parquet_gpu.hpp | 15 +++++++++++++++ cpp/src/io/parquet/reader_impl.cpp | 2 +- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 111329249ec..879a6c475f2 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -26,16 +26,6 @@ namespace cudf::io::parquet::gpu { -enum class decode_error { - STREAM_OVERRUN = 1, - RLE_STREAM_OVERRUN = 2, - UNSUPPORTED_ENCODING = 4, - INVALID_LEVEL_RUN = 8, - INVALID_DATA_TYPE = 16, - EMPTY_PAGE = 32, - INVALID_DICT_WIDTH = 64, -}; - struct page_state_s { uint8_t const* data_start; uint8_t const* data_end; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 73a8d08e5c2..6057677b21c 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -54,6 +54,21 @@ constexpr int rolling_index(int index) return index % rolling_size; } +/** + * @brief Enum for the different types of errors that can occur during decoding. + * + * These values are used as bitmasks, so they must be powers of 2. + */ +enum class decode_error : int32_t { + STREAM_OVERRUN = 1, + RLE_STREAM_OVERRUN = 2, + UNSUPPORTED_ENCODING = 4, + INVALID_LEVEL_RUN = 8, + INVALID_DATA_TYPE = 16, + EMPTY_PAGE = 32, + INVALID_DICT_WIDTH = 64, +}; + /** * @brief Struct representing an input column in the file. */ diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 159520506c1..6cbe64e227b 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -203,7 +203,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows) if (decode_error != 0) { std::stringstream stream; stream << std::hex << decode_error; - CUDF_FAIL("Parquet data decode failed with code(s) " + stream.str()); + CUDF_FAIL("Parquet data decode failed with code(s) 0x" + stream.str()); } // for list columns, add the final offset to every offset buffer. From 575f40c44606b77c3ef62f5a1d355a4588cedef6 Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 27 Sep 2023 12:50:16 -0700 Subject: [PATCH 09/10] style --- cpp/src/io/parquet/page_decode.cuh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index 209253a7efa..a3f514dea42 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -63,14 +63,14 @@ struct page_state_s { uint8_t const* abs_lvl_end[NUM_LEVEL_TYPES]{}; // [def,rep] int32_t lvl_count[NUM_LEVEL_TYPES]{}; // how many of each of the streams we've decoded int32_t row_index_lower_bound{}; // lower bound of row indices we should process - + // a shared-memory cache of frequently used data when decoding. The source of this data is // normally stored in global memory which can yield poor performance. So, when possible // we copy that info here prior to decoding PageNestingDecodeInfo nesting_decode_cache[max_cacheable_nesting_decode_info]{}; // points to either nesting_decode_cache above when possible, or to the global source otherwise PageNestingDecodeInfo* nesting_info{}; - + inline __device__ void set_error_code(decode_error err) volatile { cuda::atomic_ref ref{const_cast(error)}; From 718c16658f17114926893e29a70955d3579efd1d Mon Sep 17 00:00:00 2001 From: Vukasin Milovanovic Date: Wed, 27 Sep 2023 13:50:44 -0700 Subject: [PATCH 10/10] rename some codes --- cpp/src/io/parquet/page_decode.cuh | 10 +++++----- cpp/src/io/parquet/parquet_gpu.hpp | 14 +++++++------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh index a3f514dea42..cdc29197eb3 100644 --- a/cpp/src/io/parquet/page_decode.cuh +++ b/cpp/src/io/parquet/page_decode.cuh @@ -501,7 +501,7 @@ __device__ void gpuDecodeStream( } } if (cur > end) { - s->set_error_code(decode_error::STREAM_OVERRUN); + s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN); break; } if (level_run <= 1) { @@ -936,7 +936,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, } s->lvl_start[lvl] = cur; - if (cur > end) { s->set_error_code(decode_error::RLE_STREAM_OVERRUN); } + if (cur > end) { s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN); } }; // this is a little redundant. if level_bits == 0, then nothing should be encoded @@ -962,7 +962,7 @@ inline __device__ uint32_t InitLevelSection(page_state_s* s, len += 4; } else { len = 0; - s->set_error_code(decode_error::RLE_STREAM_OVERRUN); + s->set_error_code(decode_error::LEVEL_STREAM_OVERRUN); } } else if (encoding == Encoding::BIT_PACKED) { len = (s->page.num_input_values * level_bits + 7) >> 3; @@ -1301,7 +1301,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, // first 4 bytes are length of RLE data int const len = (cur[0]) + (cur[1] << 8) + (cur[2] << 16) + (cur[3] << 24); cur += 4; - if (cur + len > end) { s->set_error_code(decode_error::RLE_STREAM_OVERRUN); } + if (cur + len > end) { s->set_error_code(decode_error::DATA_STREAM_OVERRUN); } s->dict_run = 0; } break; case Encoding::DELTA_BINARY_PACKED: @@ -1312,7 +1312,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s, break; } } - if (cur > end) { s->set_error_code(decode_error::STREAM_OVERRUN); } + if (cur > end) { s->set_error_code(decode_error::DATA_STREAM_OVERRUN); } s->lvl_end = cur; s->data_start = cur; s->data_end = end; diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 529e939e7a5..3c37c0df021 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -60,13 +60,13 @@ constexpr int rolling_index(int index) * These values are used as bitmasks, so they must be powers of 2. */ enum class decode_error : int32_t { - STREAM_OVERRUN = 1, - RLE_STREAM_OVERRUN = 2, - UNSUPPORTED_ENCODING = 4, - INVALID_LEVEL_RUN = 8, - INVALID_DATA_TYPE = 16, - EMPTY_PAGE = 32, - INVALID_DICT_WIDTH = 64, + DATA_STREAM_OVERRUN = 0x1, + LEVEL_STREAM_OVERRUN = 0x2, + UNSUPPORTED_ENCODING = 0x4, + INVALID_LEVEL_RUN = 0x8, + INVALID_DATA_TYPE = 0x10, + EMPTY_PAGE = 0x20, + INVALID_DICT_WIDTH = 0x40, }; /**