diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu
index a43c6d4cbb6..17ccb73c0a8 100644
--- a/cpp/src/io/parquet/chunk_dict.cu
+++ b/cpp/src/io/parquet/chunk_dict.cu
@@ -22,6 +22,7 @@
 #include
 
+#include
 #include
 
 namespace cudf::io::parquet::detail {
 
@@ -30,28 +31,14 @@ namespace {
 constexpr int DEFAULT_BLOCK_SIZE = 256;
 }
 
-template <int block_size>
-CUDF_KERNEL void __launch_bounds__(block_size)
-  initialize_chunk_hash_maps_kernel(device_span<EncColumnChunk> chunks)
-{
-  auto const chunk = chunks[blockIdx.x];
-  auto const t     = threadIdx.x;
-  // fut: Now that per-chunk dict is same size as ck.num_values, try to not use one block per chunk
-  for (thread_index_type i = 0; i < chunk.dict_map_size; i += block_size) {
-    if (t + i < chunk.dict_map_size) {
-      new (&chunk.dict_map_slots[t + i].first) map_type::atomic_key_type{KEY_SENTINEL};
-      new (&chunk.dict_map_slots[t + i].second) map_type::atomic_mapped_type{VALUE_SENTINEL};
-    }
-  }
-}
-
 template <typename T>
 struct equality_functor {
   column_device_view const& col;
-  __device__ bool operator()(size_type lhs_idx, size_type rhs_idx)
+  __device__ bool operator()(key_type lhs_idx, key_type rhs_idx) const
   {
-    // We don't call this for nulls so this is fine
-    auto const equal = cudf::experimental::row::equality::nan_equal_physical_equality_comparator{};
+    // We don't call this for nulls so this is fine.
+    auto constexpr equal =
+      cudf::experimental::row::equality::nan_equal_physical_equality_comparator{};
     return equal(col.element<T>(lhs_idx), col.element<T>(rhs_idx));
   }
 };
@@ -59,38 +46,167 @@ struct equality_functor {
 template <typename T>
 struct hash_functor {
   column_device_view const& col;
-  __device__ auto operator()(size_type idx) const
+  uint32_t const seed = 0;
+  __device__ auto operator()(key_type idx) const
   {
-    return cudf::hashing::detail::MurmurHash3_x86_32<T>{}(col.element<T>(idx));
+    return cudf::hashing::detail::MurmurHash3_x86_32<T>{seed}(col.element<T>(idx));
   }
 };
 
+template <int block_size>
 struct map_insert_fn {
-  map_type::device_mutable_view& map;
+  storage_ref_type const& storage_ref;
+  EncColumnChunk* const& chunk;
 
   template <typename T>
-  __device__ bool operator()(column_device_view const& col, size_type i)
+  __device__ void operator()(size_type const s_start_value_idx, size_type const end_value_idx)
   {
     if constexpr (column_device_view::has_element_accessor<T>()) {
-      auto hash_fn     = hash_functor<T>{col};
-      auto equality_fn = equality_functor<T>{col};
-      return map.insert(std::pair(i, i), hash_fn, equality_fn);
+      using block_reduce = cub::BlockReduce<size_type, block_size>;
+      __shared__ typename block_reduce::TempStorage reduce_storage;
+
+      auto const col = chunk->col_desc;
+      column_device_view const& data_col = *col->leaf_column;
+      __shared__ size_type total_num_dict_entries;
+
+      using equality_fn_type = equality_functor<T>;
+      using hash_fn_type     = hash_functor<T>;
+      // Choosing `linear_probing` over `double_hashing` for slightly better performance seen in
+      // benchmarks.
+      using probing_scheme_type = cuco::linear_probing<map_cg_size, hash_fn_type>;
+
+      // Make a view of the hash map.
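+      // Note: the ref is a non-owning view; the slots live in the bulk storage passed in
+      // via `storage_ref`, so constructing the ref performs no allocation of its own.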
+      auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL},
+                                               cuco::empty_value{VALUE_SENTINEL},
+                                               equality_fn_type{data_col},
+                                               probing_scheme_type{hash_fn_type{data_col}},
+                                               cuco::thread_scope_block,
+                                               storage_ref};
+
+      // Create a map ref with `cuco::insert` operator
+      auto map_insert_ref = hash_map_ref.with_operators(cuco::insert);
+      auto const t        = threadIdx.x;
+
+      // Create atomic refs to the current chunk's num_dict_entries and uniq_data_size
+      cuda::atomic_ref<size_type, SCOPE> const chunk_num_dict_entries{chunk->num_dict_entries};
+      cuda::atomic_ref<size_type, SCOPE> const chunk_uniq_data_size{chunk->uniq_data_size};
+
+      // Note: Adjust the following loop to use `cg::tile` if needed in the future.
+      for (thread_index_type val_idx = s_start_value_idx + t; val_idx - t < end_value_idx;
+           val_idx += block_size) {
+        size_type is_unique      = 0;
+        size_type uniq_elem_size = 0;
+
+        // Check if this index is valid.
+        auto const is_valid =
+          val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);
+
+        // Insert val_idx into the hash map and count successful insertions.
+        if (is_valid) {
+          // Insert the keys using a single thread for best performance for now.
+          is_unique      = map_insert_ref.insert(cuco::pair{val_idx, val_idx});
+          uniq_elem_size = [&]() -> size_type {
+            if (not is_unique) { return 0; }
+            switch (col->physical_type) {
+              case Type::INT32: return 4;
+              case Type::INT64: return 8;
+              case Type::INT96: return 12;
+              case Type::FLOAT: return 4;
+              case Type::DOUBLE: return 8;
+              case Type::BYTE_ARRAY: {
+                auto const col_type = data_col.type().id();
+                if (col_type == type_id::STRING) {
+                  // Strings are stored as 4 byte length + string bytes
+                  return 4 + data_col.element<string_view>(val_idx).size_bytes();
+                } else if (col_type == type_id::LIST) {
+                  // Binary is stored as 4 byte length + bytes
+                  return 4 +
+                         get_element<statistics::byte_array_view>(data_col, val_idx).size_bytes();
+                }
+                CUDF_UNREACHABLE(
+                  "Byte array only supports string and list column types for dictionary "
+                  "encoding!");
+              }
+              case Type::FIXED_LEN_BYTE_ARRAY:
+                if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
+                CUDF_UNREACHABLE(
+                  "Fixed length byte array only supports decimal 128 column types for dictionary "
+                  "encoding!");
+              default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
+            }
+          }();
+        }
+        // Reduce num_unique and uniq_data_size from all threads in the block.
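+        // Note: both reductions reuse the same shared `reduce_storage`, so the
+        // `__syncthreads()` between the two `Sum` calls below is required.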
+        auto num_unique = block_reduce(reduce_storage).Sum(is_unique);
+        __syncthreads();
+        auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size);
+        // The first thread in the block atomically updates total num_unique and uniq_data_size
+        if (t == 0) {
+          total_num_dict_entries =
+            chunk_num_dict_entries.fetch_add(num_unique, cuda::std::memory_order_relaxed);
+          total_num_dict_entries += num_unique;
+          chunk_uniq_data_size.fetch_add(uniq_data_size, cuda::std::memory_order_relaxed);
+        }
+        __syncthreads();
+
+        // Check if the number of unique values in the chunk has already exceeded the max dict
+        // size and early exit
+        if (total_num_dict_entries > MAX_DICT_SIZE) { return; }
+      }  // for loop
     } else {
       CUDF_UNREACHABLE("Unsupported type to insert in map");
     }
   }
 };
 
+template <int block_size>
 struct map_find_fn {
-  map_type::device_view& map;
-
+  storage_ref_type const& storage_ref;
+  EncColumnChunk* const& chunk;
   template <typename T>
-  __device__ map_type::device_view::iterator operator()(column_device_view const& col, size_type i)
+  __device__ void operator()(size_type const s_start_value_idx,
+                             size_type const end_value_idx,
+                             size_type const s_ck_start_val_idx)
   {
     if constexpr (column_device_view::has_element_accessor<T>()) {
-      auto hash_fn     = hash_functor<T>{col};
-      auto equality_fn = equality_functor<T>{col};
-      return map.find(i, hash_fn, equality_fn);
+      auto const col = chunk->col_desc;
+      column_device_view const& data_col = *col->leaf_column;
+
+      using equality_fn_type = equality_functor<T>;
+      using hash_fn_type     = hash_functor<T>;
+      // Choosing `linear_probing` over `double_hashing` for slightly better performance seen in
+      // benchmarks.
+      using probing_scheme_type = cuco::linear_probing<map_cg_size, hash_fn_type>;
+
+      // Make a view of the hash map.
+      auto hash_map_ref = cuco::static_map_ref{cuco::empty_key{KEY_SENTINEL},
+                                               cuco::empty_value{VALUE_SENTINEL},
+                                               equality_fn_type{data_col},
+                                               probing_scheme_type{hash_fn_type{data_col}},
+                                               cuco::thread_scope_block,
+                                               storage_ref};
+
+      // Create a map ref with `cuco::find` operator
+      auto const map_find_ref = hash_map_ref.with_operators(cuco::find);
+      auto const t            = threadIdx.x;
+
+      // Note: Adjust the following loop to use `cg::tile` if needed in the future.
+      for (thread_index_type val_idx = s_start_value_idx + t; val_idx < end_value_idx;
+           val_idx += block_size) {
+        // Find the key using a single thread for best performance for now.
+        if (data_col.is_valid(val_idx)) {
+          // No need for atomic as this is not going to be modified by any other thread.
+          chunk->dict_index[val_idx - s_ck_start_val_idx] = [&]() {
+            auto const found_slot = map_find_ref.find(val_idx);
+
+            // Fail if we didn't find the previously inserted key.
+            cudf_assert(found_slot != map_find_ref.end() &&
+                        "Unable to find value in map in dictionary index construction");
+
+            // Return the found value.
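+            // Note: by this point collect_map_entries has rewritten each filled slot's
+            // mapped value to its location in chunk.dict_data, i.e. the dictionary index.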
+            return found_slot->second;
+          }();
+        }
+      }
     } else {
       CUDF_UNREACHABLE("Unsupported type to find in map");
     }
@@ -99,124 +215,61 @@ struct map_find_fn {
 
 template <int block_size>
 CUDF_KERNEL void __launch_bounds__(block_size)
-  populate_chunk_hash_maps_kernel(cudf::detail::device_2dspan<PageFragment const> frags)
+  populate_chunk_hash_maps_kernel(device_span<window_type> const map_storage,
+                                  cudf::detail::device_2dspan<PageFragment const> frags)
 {
-  auto col_idx = blockIdx.y;
-  auto block_x = blockIdx.x;
-  auto t       = threadIdx.x;
-  auto frag    = frags[col_idx][block_x];
-  auto chunk   = frag.chunk;
-  auto col     = chunk->col_desc;
+  auto const col_idx = blockIdx.y;
+  auto const block_x = blockIdx.x;
+  auto const frag    = frags[col_idx][block_x];
+  auto chunk         = frag.chunk;
+  auto col           = chunk->col_desc;
 
   if (not chunk->use_dictionary) { return; }
 
-  using block_reduce = cub::BlockReduce<size_type, block_size>;
-  __shared__ typename block_reduce::TempStorage reduce_storage;
-
   size_type start_row = frag.start_row;
   size_type end_row   = frag.start_row + frag.num_rows;
 
-  // Find the bounds of values in leaf column to be inserted into the map for current chunk
+  // Find the bounds of values in leaf column to be inserted into the map for current chunk.
   size_type const s_start_value_idx = row_to_value_idx(start_row, *col);
   size_type const end_value_idx     = row_to_value_idx(end_row, *col);
 
   column_device_view const& data_col = *col->leaf_column;
-
-  // Make a view of the hash map
-  auto hash_map_mutable = map_type::device_mutable_view(chunk->dict_map_slots,
-                                                        chunk->dict_map_size,
-                                                        cuco::empty_key{KEY_SENTINEL},
-                                                        cuco::empty_value{VALUE_SENTINEL});
-
-  __shared__ size_type total_num_dict_entries;
-  thread_index_type val_idx = s_start_value_idx + t;
-  while (val_idx - block_size < end_value_idx) {
-    auto const is_valid =
-      val_idx < end_value_idx and val_idx < data_col.size() and data_col.is_valid(val_idx);
-
-    // insert element at val_idx to hash map and count successful insertions
-    size_type is_unique      = 0;
-    size_type uniq_elem_size = 0;
-    if (is_valid) {
-      is_unique =
-        type_dispatcher(data_col.type(), map_insert_fn{hash_map_mutable}, data_col, val_idx);
-      uniq_elem_size = [&]() -> size_type {
-        if (not is_unique) { return 0; }
-        switch (col->physical_type) {
-          case Type::INT32: return 4;
-          case Type::INT64: return 8;
-          case Type::INT96: return 12;
-          case Type::FLOAT: return 4;
-          case Type::DOUBLE: return 8;
-          case Type::BYTE_ARRAY: {
-            auto const col_type = data_col.type().id();
-            if (col_type == type_id::STRING) {
-              // Strings are stored as 4 byte length + string bytes
-              return 4 + data_col.element<string_view>(val_idx).size_bytes();
-            } else if (col_type == type_id::LIST) {
-              // Binary is stored as 4 byte length + bytes
-              return 4 + get_element<statistics::byte_array_view>(data_col, val_idx).size_bytes();
-            }
-            CUDF_UNREACHABLE(
-              "Byte array only supports string and list column types for dictionary "
-              "encoding!");
-          }
-          case Type::FIXED_LEN_BYTE_ARRAY:
-            if (data_col.type().id() == type_id::DECIMAL128) { return sizeof(__int128_t); }
-            CUDF_UNREACHABLE(
-              "Fixed length byte array only supports decimal 128 column types for dictionary "
-              "encoding!");
-          default: CUDF_UNREACHABLE("Unsupported type for dictionary encoding");
-        }
-      }();
-    }
-
-    auto num_unique = block_reduce(reduce_storage).Sum(is_unique);
-    __syncthreads();
-    auto uniq_data_size = block_reduce(reduce_storage).Sum(uniq_elem_size);
-    if (t == 0) {
-      total_num_dict_entries = atomicAdd(&chunk->num_dict_entries, num_unique);
-      total_num_dict_entries += num_unique;
-      atomicAdd(&chunk->uniq_data_size, uniq_data_size);
-    }
-    __syncthreads();
-
-    // Check if the num unique values in chunk has already exceeded max dict size and early exit
-    if (total_num_dict_entries > MAX_DICT_SIZE) { return; }
-
-    val_idx += block_size;
-  }  // while
+  storage_ref_type const storage_ref{chunk->dict_map_size,
+                                     map_storage.data() + chunk->dict_map_offset};
+  type_dispatcher(data_col.type(),
+                  map_insert_fn<block_size>{storage_ref, chunk},
+                  s_start_value_idx,
+                  end_value_idx);
 }
 
 template <int block_size>
 CUDF_KERNEL void __launch_bounds__(block_size)
-  collect_map_entries_kernel(device_span<EncColumnChunk> chunks)
+  collect_map_entries_kernel(device_span<window_type> const map_storage,
+                             device_span<EncColumnChunk> chunks)
 {
   auto& chunk = chunks[blockIdx.x];
   if (not chunk.use_dictionary) { return; }
 
-  auto t   = threadIdx.x;
-  auto map = map_type::device_view(chunk.dict_map_slots,
-                                   chunk.dict_map_size,
-                                   cuco::empty_key{KEY_SENTINEL},
-                                   cuco::empty_value{VALUE_SENTINEL});
-
-  __shared__ cuda::atomic<size_type, cuda::thread_scope_block> counter;
+  auto t = threadIdx.x;
+  __shared__ cuda::atomic<size_type, SCOPE> counter;
   using cuda::std::memory_order_relaxed;
-  if (t == 0) { new (&counter) cuda::atomic<size_type, cuda::thread_scope_block>{0}; }
+  if (t == 0) { new (&counter) cuda::atomic<size_type, SCOPE>{0}; }
   __syncthreads();
-  for (size_type i = 0; i < chunk.dict_map_size; i += block_size) {
-    if (t + i < chunk.dict_map_size) {
-      auto* slot = reinterpret_cast<slot_type*>(map.begin_slot() + t + i);
-      auto key   = slot->first;
+
+  // Iterate over all windows in the map.
+  for (; t < chunk.dict_map_size; t += block_size) {
+    auto window = map_storage.data() + chunk.dict_map_offset + t;
+    // Collect all slots from each window.
+    for (auto& slot : *window) {
+      auto const key = slot.first;
       if (key != KEY_SENTINEL) {
-        auto loc = counter.fetch_add(1, memory_order_relaxed);
+        auto const loc = counter.fetch_add(1, memory_order_relaxed);
         cudf_assert(loc < MAX_DICT_SIZE && "Number of filled slots exceeds max dict size");
         chunk.dict_data[loc] = key;
-        // If sorting dict page ever becomes a hard requirement, enable the following statement and
-        // add a dict sorting step before storing into the slot's second field.
-        // chunk.dict_data_idx[loc] = t + i;
-        slot->second = loc;
+        // If sorting dict page ever becomes a hard requirement, enable the following statement
+        // and add a dict sorting step before storing into the slot's second field.
+        // chunk.dict_data_idx[loc] = idx;
+        slot.second = loc;
       }
     }
  }
}
 
@@ -224,75 +277,60 @@ CUDF_KERNEL void __launch_bounds__(block_size)
 
 template <int block_size>
 CUDF_KERNEL void __launch_bounds__(block_size)
-  get_dictionary_indices_kernel(cudf::detail::device_2dspan<PageFragment const> frags)
+  get_dictionary_indices_kernel(device_span<window_type> const map_storage,
+                                cudf::detail::device_2dspan<PageFragment const> frags)
 {
-  auto col_idx = blockIdx.y;
-  auto block_x = blockIdx.x;
-  auto t       = threadIdx.x;
-  auto frag    = frags[col_idx][block_x];
-  auto chunk   = frag.chunk;
-  auto col     = chunk->col_desc;
+  auto const col_idx = blockIdx.y;
+  auto const block_x = blockIdx.x;
+  auto const frag    = frags[col_idx][block_x];
+  auto chunk         = frag.chunk;
 
   if (not chunk->use_dictionary) { return; }
 
   size_type start_row = frag.start_row;
   size_type end_row   = frag.start_row + frag.num_rows;
 
+  auto const col = chunk->col_desc;
   // Find the bounds of values in leaf column to be searched in the map for current chunk
   auto const s_start_value_idx  = row_to_value_idx(start_row, *col);
   auto const s_ck_start_val_idx = row_to_value_idx(chunk->start_row, *col);
   auto const end_value_idx      = row_to_value_idx(end_row, *col);
 
   column_device_view const& data_col = *col->leaf_column;
-
-  auto map = map_type::device_view(chunk->dict_map_slots,
-                                   chunk->dict_map_size,
-                                   cuco::empty_key{KEY_SENTINEL},
-                                   cuco::empty_value{VALUE_SENTINEL});
-
-  thread_index_type val_idx = s_start_value_idx + t;
-  while (val_idx < end_value_idx) {
-    if (data_col.is_valid(val_idx)) {
-      auto found_slot = type_dispatcher(data_col.type(), map_find_fn{map}, data_col, val_idx);
-      cudf_assert(found_slot != map.end() &&
-                  "Unable to find value in map in dictionary index construction");
-      if (found_slot != map.end()) {
-        // No need for atomic as this is not going to be modified by any other thread
-        auto* val_ptr = reinterpret_cast<map_type::mapped_type*>(&found_slot->second);
-        chunk->dict_index[val_idx - s_ck_start_val_idx] = *val_ptr;
-      }
-    }
-
-    val_idx += block_size;
-  }
-}
-
-void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
-{
-  constexpr int block_size = 1024;
-  initialize_chunk_hash_maps_kernel<block_size>
-    <<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
+  storage_ref_type const storage_ref{chunk->dict_map_size,
+                                     map_storage.data() + chunk->dict_map_offset};
+
+  type_dispatcher(data_col.type(),
+                  map_find_fn<block_size>{storage_ref, chunk},
+                  s_start_value_idx,
+                  end_value_idx,
+                  s_ck_start_val_idx);
 }
 
-void populate_chunk_hash_maps(cudf::detail::device_2dspan<PageFragment const> frags,
+void populate_chunk_hash_maps(device_span<window_type> const map_storage,
+                              cudf::detail::device_2dspan<PageFragment const> frags,
                               rmm::cuda_stream_view stream)
 {
   dim3 const dim_grid(frags.size().second, frags.size().first);
   populate_chunk_hash_maps_kernel<DEFAULT_BLOCK_SIZE>
-    <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
+    <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
 }
 
-void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
+void collect_map_entries(device_span<window_type> const map_storage,
+                         device_span<EncColumnChunk> chunks,
+                         rmm::cuda_stream_view stream)
 {
   constexpr int block_size = 1024;
-  collect_map_entries_kernel<block_size><<<chunks.size(), block_size, 0, stream.value()>>>(chunks);
+  collect_map_entries_kernel<block_size>
+    <<<chunks.size(), block_size, 0, stream.value()>>>(map_storage, chunks);
 }
 
-void get_dictionary_indices(cudf::detail::device_2dspan<PageFragment const> frags,
+void get_dictionary_indices(device_span<window_type> const map_storage,
+                            cudf::detail::device_2dspan<PageFragment const> frags,
                             rmm::cuda_stream_view stream)
 {
   dim3 const dim_grid(frags.size().second, frags.size().first);
   get_dictionary_indices_kernel<DEFAULT_BLOCK_SIZE>
-    <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(frags);
+    <<<dim_grid, DEFAULT_BLOCK_SIZE, 0, stream.value()>>>(map_storage, frags);
 }
 
 }  // namespace cudf::io::parquet::detail
diff --git a/cpp/src/io/parquet/parquet_gpu.cuh b/cpp/src/io/parquet/parquet_gpu.cuh
index e3c44c78898..7c09764da2d 100644
--- a/cpp/src/io/parquet/parquet_gpu.cuh
+++ b/cpp/src/io/parquet/parquet_gpu.cuh
@@ -18,25 +18,37 @@
 
 #include "parquet_gpu.hpp"
 
+#include
 #include
 #include
 
-#include
+#include
+#include
 
 namespace cudf::io::parquet::detail {
 
-auto constexpr KEY_SENTINEL   = size_type{-1};
-auto constexpr VALUE_SENTINEL = size_type{-1};
+using key_type    = size_type;
+using mapped_type = size_type;
+using slot_type   = cuco::pair<key_type, mapped_type>;
 
-using map_type = cuco::legacy::static_map<size_type, size_type>;
+auto constexpr map_cg_size =
+  1;  ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset.
+      ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this.
+auto constexpr window_size =
+  1;  ///< Number of concurrent slots (set for best performance) handled by each thread.
+auto constexpr occupancy_factor =
+  1.43f;  ///< cuCollections suggests using a hash map of size N * (1/0.7) = N * 1.43
+          ///< to target a 70% occupancy factor.
 
-/**
- * @brief The alias of `map_type::pair_atomic_type` class.
- *
- * Declare this struct by trivial subclassing instead of type aliasing so we can have forward
- * declaration of this struct somewhere else.
- */
-struct slot_type : public map_type::pair_atomic_type {};
+auto constexpr KEY_SENTINEL   = key_type{-1};
+auto constexpr VALUE_SENTINEL = mapped_type{-1};
+auto constexpr SCOPE          = cuda::thread_scope_block;
+
+using storage_type     = cuco::aow_storage<slot_type,
+                                           window_size,
+                                           cuco::extent<std::size_t>,
+                                           cudf::detail::cuco_allocator>;
+using storage_ref_type = typename storage_type::ref_type;
+using window_type      = typename storage_type::window_type;
 
 /**
  * @brief Return the byte length of parquet dtypes that are physically represented by INT32
@@ -81,4 +93,43 @@ inline size_type __device__ row_to_value_idx(size_type idx,
   return idx;
 }
 
+/**
+ * @brief Insert chunk values into their respective hash maps
+ *
+ * @param map_storage Bulk hashmap storage
+ * @param frags Column fragments
+ * @param stream CUDA stream to use
+ */
+void populate_chunk_hash_maps(device_span<window_type> const map_storage,
+                              cudf::detail::device_2dspan<PageFragment const> frags,
+                              rmm::cuda_stream_view stream);
+
+/**
+ * @brief Compact dictionary hash map entries into chunk.dict_data
+ *
+ * @param map_storage Bulk hashmap storage
+ * @param chunks Flat span of chunks to compact hash maps for
+ * @param stream CUDA stream to use
+ */
+void collect_map_entries(device_span<window_type> const map_storage,
+                         device_span<EncColumnChunk> chunks,
+                         rmm::cuda_stream_view stream);
+
+/**
+ * @brief Get the Dictionary Indices for each row
+ *
+ * For each row of a chunk, gets the index into chunk.dict_data that holds the value stored at
+ * that row of the input column. Stores these indices into chunk.dict_index.
+ *
+ * Since dict_data itself contains indices into the original cudf column, this means that
+ * col[row] == col[dict_data[dict_index[row - chunk.start_row]]]
+ *
+ * @param map_storage Bulk hashmap storage
+ * @param frags Column fragments
+ * @param stream CUDA stream to use
+ */
+void get_dictionary_indices(device_span<window_type> const map_storage,
+                            cudf::detail::device_2dspan<PageFragment const> frags,
+                            rmm::cuda_stream_view stream);
+
 }  // namespace cudf::io::parquet::detail
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 8f52f073833..125d35f6499 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -514,7 +514,6 @@ constexpr unsigned int kDictHashBits = 16;
 constexpr size_t kDictScratchSize    = (1 << kDictHashBits) * sizeof(uint32_t);
 
 struct EncPage;
-struct slot_type;
 
 // convert Encoding to a mask value
 constexpr uint32_t encoding_to_mask(Encoding encoding)
@@ -560,7 +559,8 @@ struct EncColumnChunk {
   uint8_t is_compressed;     //!< Nonzero if the chunk uses compression
   uint32_t dictionary_size;  //!< Size of dictionary page including header
   uint32_t ck_stat_size;     //!< Size of chunk-level statistics (included in 1st page header)
-  slot_type* dict_map_slots;  //!< Hash map storage for calculating dict encoding for this chunk
+  uint32_t dict_map_offset;  //!< Offset of the hash map storage for calculating dict encoding for
+                             //!< this chunk
   size_type dict_map_size;     //!< Size of dict_map_slots
   size_type num_dict_entries;  //!< Total number of entries in dictionary
   size_type
@@ -1001,46 +1001,6 @@ void InitFragmentStatistics(device_span<statistics_group> groups,
                             device_span<PageFragment const> fragments,
                             rmm::cuda_stream_view stream);
 
-/**
- * @brief Initialize per-chunk hash maps used for dictionary with sentinel values
- *
- * @param chunks Flat span of chunks to initialize hash maps for
- * @param stream CUDA stream to use
- */
-void initialize_chunk_hash_maps(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream);
-
-/**
- * @brief Insert chunk values into their respective hash maps
- *
- * @param frags Column fragments
- * @param stream CUDA stream to use
- */
-void populate_chunk_hash_maps(cudf::detail::device_2dspan<PageFragment const> frags,
-                              rmm::cuda_stream_view stream);
-
-/**
- * @brief Compact dictionary hash map entries into chunk.dict_data
- *
- * @param chunks Flat span of chunks to compact hash maps for
- * @param stream CUDA stream to use
- */
-void collect_map_entries(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream);
-
-/**
- * @brief Get the Dictionary Indices for each row
- *
- * For each row of a chunk, gets the indices into chunk.dict_data which contains the value otherwise
- * stored in input column [row]. Stores these indices into chunk.dict_index.
- *
- * Since dict_data itself contains indices into the original cudf column, this means that
- * col[row] == col[dict_data[dict_index[row - chunk.start_row]]]
- *
- * @param frags Column fragments
- * @param stream CUDA stream to use
- */
-void get_dictionary_indices(cudf::detail::device_2dspan<PageFragment const> frags,
-                            rmm::cuda_stream_view stream);
-
 /**
  * @brief Launches kernel for initializing encoder data pages
  *
diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu
index 74992aa733f..46c3151c731 100644
--- a/cpp/src/io/parquet/writer_impl.cu
+++ b/cpp/src/io/parquet/writer_impl.cu
@@ -1285,10 +1285,10 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
     return std::pair(std::move(dict_data), std::move(dict_index));
   }
 
-  // Allocate slots for each chunk
-  std::vector<rmm::device_uvector<slot_type>> hash_maps_storage;
-  hash_maps_storage.reserve(h_chunks.size());
-  for (auto& chunk : h_chunks) {
+  // Variable to keep track of the current total map storage size
+  size_t total_map_storage_size = 0;
+  // Populate dict offsets and sizes for each chunk that needs to build a dictionary.
+  std::for_each(h_chunks.begin(), h_chunks.end(), [&](auto& chunk) {
     auto const& chunk_col_desc = col_desc[chunk.col_desc_id];
     auto const is_requested_non_dict =
       chunk_col_desc.requested_encoding != column_encoding::USE_DEFAULT &&
@@ -1300,19 +1300,31 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
       chunk.use_dictionary = false;
     } else {
       chunk.use_dictionary = true;
-      // cuCollections suggests using a hash map of size N * (1/0.7) = num_values * 1.43
-      // https://github.com/NVIDIA/cuCollections/blob/3a49fc71/include/cuco/static_map.cuh#L190-L193
-      auto& inserted_map   = hash_maps_storage.emplace_back(chunk.num_values * 1.43, stream);
-      chunk.dict_map_slots = inserted_map.data();
-      chunk.dict_map_size  = inserted_map.size();
+      chunk.dict_map_size =
+        static_cast<size_type>(cuco::make_window_extent<map_cg_size, window_size>(
+          static_cast<size_t>(occupancy_factor * chunk.num_values)));
+      chunk.dict_map_offset = total_map_storage_size;
+      total_map_storage_size += chunk.dict_map_size;
     }
-  }
+  });
 
-  chunks.host_to_device_async(stream);
+  // No chunk needs to create a dictionary, exit early
+  if (total_map_storage_size == 0) { return {std::move(dict_data), std::move(dict_index)}; }
 
-  initialize_chunk_hash_maps(chunks.device_view().flat_view(), stream);
-  populate_chunk_hash_maps(frags, stream);
+  // Create a single bulk storage used by all sub-dictionaries
+  auto map_storage = storage_type{
+    total_map_storage_size,
+    cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator<char>{}, stream}};
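+  // Note: this one allocation backs every chunk's map; each kernel later builds a
+  // `storage_ref_type` over the chunk's own window range
+  // [dict_map_offset, dict_map_offset + dict_map_size).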
+  // Create a span of non-const map_storage as `storage_ref_type` requires a non-const pointer.
+  device_span<window_type> const map_storage_data{map_storage.data(), total_map_storage_size};
+
+  // Copy chunks to device
+  chunks.host_to_device_async(stream);
+  // Initialize the storage with the given sentinel pair
+  map_storage.initialize_async({KEY_SENTINEL, VALUE_SENTINEL}, {stream.value()});
+  // Populate the hash map for each chunk
+  populate_chunk_hash_maps(map_storage_data, frags, stream);
+  // Copy chunks back to host and synchronize
   chunks.device_to_host_sync(stream);
 
   // Make decision about which chunks have dictionary
@@ -1372,8 +1384,8 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
     chunk.dict_index = inserted_dict_index.data();
   }
   chunks.host_to_device_async(stream);
-  collect_map_entries(chunks.device_view().flat_view(), stream);
-  get_dictionary_indices(frags, stream);
+  collect_map_entries(map_storage_data, chunks.device_view().flat_view(), stream);
+  get_dictionary_indices(map_storage_data, frags, stream);
 
   return std::pair(std::move(dict_data), std::move(dict_index));
 }