diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index cb582df21e0..a35802f2ab0 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -25,6 +25,7 @@ jobs: - docs-build - wheel-build-cudf - wheel-tests-cudf + - test-cudf-polars - wheel-build-dask-cudf - wheel-tests-dask-cudf - devcontainer @@ -132,6 +133,17 @@ jobs: with: build_type: pull-request script: ci/test_wheel_cudf.sh + test-cudf-polars: + needs: wheel-build-cudf + secrets: inherit + uses: rapidsai/shared-workflows/.github/workflows/wheels-test.yaml@branch-24.08 + with: + # This selects "ARCH=amd64 + the latest supported Python + CUDA". + matrix_filter: map(select(.ARCH == "amd64")) | group_by(.CUDA_VER|split(".")|map(tonumber)|.[0]) | map(max_by([(.PY_VER|split(".")|map(tonumber)), (.CUDA_VER|split(".")|map(tonumber))])) + build_type: pull-request + # This always runs, but only fails if this PR touches code in + # pylibcudf or cudf_polars + script: "ci/test_cudf_polars.sh" wheel-build-dask-cudf: needs: wheel-build-cudf secrets: inherit diff --git a/ci/test_cudf_polars.sh b/ci/test_cudf_polars.sh new file mode 100755 index 00000000000..669e049ab26 --- /dev/null +++ b/ci/test_cudf_polars.sh @@ -0,0 +1,68 @@ +#!/bin/bash +# Copyright (c) 2024, NVIDIA CORPORATION. + +set -eou pipefail + +# We will only fail these tests if the PR touches code in pylibcudf +# or cudf_polars itself. +# Note, the three dots mean we are doing diff between the merge-base +# of upstream and HEAD. So this is asking, "does _this branch_ touch +# files in cudf_polars/pylibcudf", rather than "are there changes +# between upstream and this branch which touch cudf_polars/pylibcudf" +# TODO: is the target branch exposed anywhere in an environment variable? +if [ -n "$(git diff --name-only origin/branch-24.08...HEAD -- python/cudf_polars/ python/cudf/cudf/_lib/pylibcudf/)" ]; +then + HAS_CHANGES=1 +else + HAS_CHANGES=0 +fi + +RAPIDS_PY_CUDA_SUFFIX="$(rapids-wheel-ctk-name-gen ${RAPIDS_CUDA_VERSION})" +RAPIDS_PY_WHEEL_NAME="cudf_${RAPIDS_PY_CUDA_SUFFIX}" rapids-download-wheels-from-s3 ./dist + +RESULTS_DIR=${RAPIDS_TESTS_DIR:-"$(mktemp -d)"} +RAPIDS_TESTS_DIR=${RAPIDS_TESTS_DIR:-"${RESULTS_DIR}/test-results"}/ +mkdir -p "${RAPIDS_TESTS_DIR}" + +rapids-logger "Install cudf wheel" +# echo to expand wildcard before adding `[extra]` requires for pip +python -m pip install $(echo ./dist/cudf*.whl)[test] + +rapids-logger "Install polars (allow pre-release versions)" +python -m pip install 'polars>=1.0.0a0' + +rapids-logger "Install cudf_polars" +python -m pip install --no-deps python/cudf_polars + +rapids-logger "Run cudf_polars tests" + +function set_exitcode() +{ + EXITCODE=$? +} +EXITCODE=0 +trap set_exitcode ERR +set +e + +python -m pytest \ + --cache-clear \ + --cov cudf_polars \ + --cov-fail-under=100 \ + --cov-config=python/cudf_polars/pyproject.toml \ + --junitxml="${RAPIDS_TESTS_DIR}/junit-cudf_polars.xml" \ + python/cudf_polars/tests + +trap ERR +set -e + +if [ ${EXITCODE} != 0 ]; then + rapids-logger "Testing FAILED: exitcode ${EXITCODE}" +else + rapids-logger "Testing PASSED" +fi + +if [ ${HAS_CHANGES} == 1 ]; then + exit ${EXITCODE} +else + exit 0 +fi diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 5fd68bfb26c..35cf90411f2 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -662,6 +662,7 @@ add_library( src/unary/math_ops.cu src/unary/nan_ops.cu src/unary/null_ops.cu + src/utilities/cuda_memcpy.cu src/utilities/default_stream.cpp src/utilities/linked_column.cpp src/utilities/logger.cpp diff --git a/cpp/include/cudf/detail/utilities/cuda_memcpy.hpp b/cpp/include/cudf/detail/utilities/cuda_memcpy.hpp new file mode 100644 index 00000000000..b66c461ab12 --- /dev/null +++ b/cpp/include/cudf/detail/utilities/cuda_memcpy.hpp @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace cudf::detail { + +enum class host_memory_kind : uint8_t { PINNED, PAGEABLE }; + +/** + * @brief Asynchronously copies data between the host and device. + * + * Implementation may use different strategies depending on the size and type of host data. + * + * @param dst Destination memory address + * @param src Source memory address + * @param size Number of bytes to copy + * @param kind Type of host memory + * @param stream CUDA stream used for the copy + */ +void cuda_memcpy_async( + void* dst, void const* src, size_t size, host_memory_kind kind, rmm::cuda_stream_view stream); + +/** + * @brief Synchronously copies data between the host and device. + * + * Implementation may use different strategies depending on the size and type of host data. + * + * @param dst Destination memory address + * @param src Source memory address + * @param size Number of bytes to copy + * @param kind Type of host memory + * @param stream CUDA stream used for the copy + */ +void cuda_memcpy( + void* dst, void const* src, size_t size, host_memory_kind kind, rmm::cuda_stream_view stream); + +} // namespace cudf::detail diff --git a/cpp/include/cudf/utilities/pinned_memory.hpp b/cpp/include/cudf/utilities/pinned_memory.hpp index b423eab6d38..3e2fa43cb50 100644 --- a/cpp/include/cudf/utilities/pinned_memory.hpp +++ b/cpp/include/cudf/utilities/pinned_memory.hpp @@ -55,4 +55,20 @@ struct pinned_mr_options { */ bool config_default_pinned_memory_resource(pinned_mr_options const& opts); +/** + * @brief Set the threshold size for using kernels for pinned memory copies. + * + * @param threshold The threshold size in bytes. If the size of the copy is less than this + * threshold, the copy will be done using kernels. If the size is greater than or equal to this + * threshold, the copy will be done using cudaMemcpyAsync. + */ +void set_kernel_pinned_copy_threshold(size_t threshold); + +/** + * @brief Get the threshold size for using kernels for pinned memory copies. + * + * @return The threshold size in bytes. + */ +size_t get_kernel_pinned_copy_threshold(); + } // namespace cudf diff --git a/cpp/include/cudf_test/column_wrapper.hpp b/cpp/include/cudf_test/column_wrapper.hpp index 47d17988775..7363f965af8 100644 --- a/cpp/include/cudf_test/column_wrapper.hpp +++ b/cpp/include/cudf_test/column_wrapper.hpp @@ -314,7 +314,12 @@ auto make_chars_and_offsets(StringsIterator begin, StringsIterator end, Validity for (auto str = begin; str < end; ++str) { std::string tmp = (*v++) ? std::string(*str) : std::string{}; chars.insert(chars.end(), std::cbegin(tmp), std::cend(tmp)); - offsets.push_back(offsets.back() + tmp.length()); + auto const last_offset = static_cast(offsets.back()); + auto const next_offset = last_offset + tmp.length(); + CUDF_EXPECTS( + next_offset < static_cast(std::numeric_limits::max()), + "Cannot use strings_column_wrapper to build a large strings column"); + offsets.push_back(static_cast(next_offset)); } return std::pair(std::move(chars), std::move(offsets)); }; diff --git a/cpp/src/io/parquet/reader_impl_chunking.cu b/cpp/src/io/parquet/reader_impl_chunking.cu index 9ad5a2d6e8d..d371ef5de93 100644 --- a/cpp/src/io/parquet/reader_impl_chunking.cu +++ b/cpp/src/io/parquet/reader_impl_chunking.cu @@ -337,7 +337,8 @@ int64_t find_next_split(int64_t cur_pos, size_t cur_row_index, size_t cur_cumulative_size, cudf::host_span sizes, - size_t size_limit) + size_t size_limit, + size_t min_row_count) { auto const start = thrust::make_transform_iterator( sizes.begin(), @@ -357,7 +358,7 @@ int64_t find_next_split(int64_t cur_pos, // this guarantees that even if we cannot fit the set of rows represented by our where our cur_pos // is, we will still move forward instead of failing. while (split_pos < (static_cast(sizes.size()) - 1) && - (sizes[split_pos].end_row_index == cur_row_index)) { + (sizes[split_pos].end_row_index - cur_row_index < min_row_count)) { split_pos++; } @@ -657,8 +658,10 @@ std::tuple, size_t, size_t> compute_next_subpass( auto const start_index = find_start_index(h_aggregated_info, start_row); auto const cumulative_size = start_row == 0 || start_index == 0 ? 0 : h_aggregated_info[start_index - 1].size_bytes; + // when choosing subpasses, we need to guarantee at least 2 rows in the included pages so that all + // list columns have a clear start and end. auto const end_index = - find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit); + find_next_split(start_index, start_row, cumulative_size, h_aggregated_info, size_limit, 2); auto const end_row = h_aggregated_info[end_index].end_row_index; // for each column, collect the set of pages that spans start_row / end_row @@ -703,8 +706,8 @@ std::vector compute_page_splits_by_row(device_span 0; if (is_list && max_col_row < last_pass_row) { - size_t const min_col_row = static_cast(chunk.start_row + last_page.chunk_row); + auto const& first_page = subpass.pages[page_index]; + size_t const min_col_row = static_cast(chunk.start_row + first_page.chunk_row); CUDF_EXPECTS((max_col_row - min_col_row) > 1, "Unexpected short subpass"); max_col_row--; } diff --git a/cpp/src/io/utilities/hostdevice_vector.hpp b/cpp/src/io/utilities/hostdevice_vector.hpp index 9acd6a1e3a9..aed745c42dd 100644 --- a/cpp/src/io/utilities/hostdevice_vector.hpp +++ b/cpp/src/io/utilities/hostdevice_vector.hpp @@ -18,6 +18,7 @@ #include "hostdevice_span.hpp" +#include #include #include #include @@ -124,26 +125,22 @@ class hostdevice_vector { void host_to_device_async(rmm::cuda_stream_view stream) { - CUDF_CUDA_TRY( - cudaMemcpyAsync(device_ptr(), host_ptr(), size_bytes(), cudaMemcpyDefault, stream.value())); + cuda_memcpy_async(device_ptr(), host_ptr(), size_bytes(), host_memory_kind::PINNED, stream); } void host_to_device_sync(rmm::cuda_stream_view stream) { - host_to_device_async(stream); - stream.synchronize(); + cuda_memcpy(device_ptr(), host_ptr(), size_bytes(), host_memory_kind::PINNED, stream); } void device_to_host_async(rmm::cuda_stream_view stream) { - CUDF_CUDA_TRY( - cudaMemcpyAsync(host_ptr(), device_ptr(), size_bytes(), cudaMemcpyDefault, stream.value())); + cuda_memcpy_async(host_ptr(), device_ptr(), size_bytes(), host_memory_kind::PINNED, stream); } void device_to_host_sync(rmm::cuda_stream_view stream) { - device_to_host_async(stream); - stream.synchronize(); + cuda_memcpy(host_ptr(), device_ptr(), size_bytes(), host_memory_kind::PINNED, stream); } /** diff --git a/cpp/src/join/conditional_join.cu b/cpp/src/join/conditional_join.cu index f02dee5f7f5..97a06d5a923 100644 --- a/cpp/src/join/conditional_join.cu +++ b/cpp/src/join/conditional_join.cu @@ -48,8 +48,7 @@ std::unique_ptr> conditional_join_anti_semi( { if (right.num_rows() == 0) { switch (join_type) { - case join_kind::LEFT_ANTI_JOIN: - return std::make_unique>(left.num_rows(), stream, mr); + case join_kind::LEFT_ANTI_JOIN: return get_trivial_left_join_indices(left, stream, mr).first; case join_kind::LEFT_SEMI_JOIN: return std::make_unique>(0, stream, mr); default: CUDF_FAIL("Invalid join kind."); break; @@ -96,10 +95,6 @@ std::unique_ptr> conditional_join_anti_semi( join_size = size.value(stream); } - if (left.num_rows() == 0) { - return std::make_unique>(0, stream, mr); - } - rmm::device_scalar write_index(0, stream); auto left_indices = std::make_unique>(join_size, stream, mr); @@ -149,8 +144,7 @@ conditional_join(table_view const& left, // with a corresponding NULL from the right. case join_kind::LEFT_JOIN: case join_kind::LEFT_ANTI_JOIN: - case join_kind::FULL_JOIN: - return get_trivial_left_join_indices(left, stream, rmm::mr::get_current_device_resource()); + case join_kind::FULL_JOIN: return get_trivial_left_join_indices(left, stream, mr); // Inner and left semi joins return empty output because no matches can exist. case join_kind::INNER_JOIN: case join_kind::LEFT_SEMI_JOIN: @@ -169,8 +163,7 @@ conditional_join(table_view const& left, std::make_unique>(0, stream, mr)); // Full joins need to return the trivial complement. case join_kind::FULL_JOIN: { - auto ret_flipped = - get_trivial_left_join_indices(right, stream, rmm::mr::get_current_device_resource()); + auto ret_flipped = get_trivial_left_join_indices(right, stream, mr); return std::pair(std::move(ret_flipped.second), std::move(ret_flipped.first)); } default: CUDF_FAIL("Invalid join kind."); break; diff --git a/cpp/src/stream_compaction/distinct.cu b/cpp/src/stream_compaction/distinct.cu index a6f15cc49ec..e5cf29f3ebf 100644 --- a/cpp/src/stream_compaction/distinct.cu +++ b/cpp/src/stream_compaction/distinct.cu @@ -17,28 +17,62 @@ #include "distinct_helpers.hpp" #include +#include #include #include #include #include +#include #include #include #include +#include +#include #include #include -#include -#include -#include -#include -#include - #include #include namespace cudf { namespace detail { +namespace { +/** + * @brief Invokes the given `func` with desired the row equality + * + * @tparam HasNested Flag indicating whether there are nested columns in the input + * @tparam Func Type of the helper function doing `distinct` check + * + * @param compare_nulls Control whether nulls should be compared as equal or not + * @param compare_nans Control whether floating-point NaNs values should be compared as equal or not + * @param has_nulls Flag indicating whether the input has nulls or not + * @param row_equal Self table comparator + * @param func The input functor to invoke + */ +template +rmm::device_uvector dipatch_row_equal( + null_equality compare_nulls, + nan_equality compare_nans, + bool has_nulls, + cudf::experimental::row::equality::self_comparator row_equal, + Func&& func) +{ + if (compare_nans == nan_equality::ALL_EQUAL) { + auto const d_equal = row_equal.equal_to( + nullate::DYNAMIC{has_nulls}, + compare_nulls, + cudf::experimental::row::equality::nan_equal_physical_equality_comparator{}); + return func(d_equal); + } else { + auto const d_equal = row_equal.equal_to( + nullate::DYNAMIC{has_nulls}, + compare_nulls, + cudf::experimental::row::equality::physical_equality_comparator{}); + return func(d_equal); + } +} +} // namespace rmm::device_uvector distinct_indices(table_view const& input, duplicate_keep_option keep, @@ -47,97 +81,39 @@ rmm::device_uvector distinct_indices(table_view const& input, rmm::cuda_stream_view stream, rmm::device_async_resource_ref mr) { - if (input.num_rows() == 0 or input.num_columns() == 0) { + auto const num_rows = input.num_rows(); + + if (num_rows == 0 or input.num_columns() == 0) { return rmm::device_uvector(0, stream, mr); } - auto map = hash_map_type{compute_hash_table_size(input.num_rows()), - cuco::empty_key{-1}, - cuco::empty_value{std::numeric_limits::min()}, - cudf::detail::cuco_allocator{stream}, - stream.value()}; - auto const preprocessed_input = cudf::experimental::row::hash::preprocessed_table::create(input, stream); auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)}; auto const has_nested_columns = cudf::detail::has_nested_columns(input); - auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input); - auto const key_hasher = row_hasher.device_hasher(has_nulls); - - auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input); - - auto const pair_iter = cudf::detail::make_counting_transform_iterator( - size_type{0}, - cuda::proclaim_return_type>( - [] __device__(size_type const i) { return cuco::make_pair(i, i); })); - - auto const insert_keys = [&](auto const value_comp) { - if (has_nested_columns) { - auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); - map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value()); - } else { - auto const key_equal = row_comp.equal_to(has_nulls, nulls_equal, value_comp); - map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value()); - } + auto const row_hash = cudf::experimental::row::hash::row_hasher(preprocessed_input); + auto const row_equal = cudf::experimental::row::equality::self_comparator(preprocessed_input); + + auto const helper_func = [&](auto const& d_equal) { + using RowHasher = std::decay_t; + auto set = hash_set_type{num_rows, + 0.5, // desired load factor + cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL}, + d_equal, + {row_hash.device_hasher(has_nulls)}, + {}, + {}, + cudf::detail::cuco_allocator{stream}, + stream.value()}; + return detail::reduce_by_row(set, num_rows, keep, stream, mr); }; - if (nans_equal == nan_equality::ALL_EQUAL) { - using nan_equal_comparator = - cudf::experimental::row::equality::nan_equal_physical_equality_comparator; - insert_keys(nan_equal_comparator{}); + if (cudf::detail::has_nested_columns(input)) { + return dipatch_row_equal(nulls_equal, nans_equal, has_nulls, row_equal, helper_func); } else { - using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator; - insert_keys(nan_unequal_comparator{}); + return dipatch_row_equal(nulls_equal, nans_equal, has_nulls, row_equal, helper_func); } - - auto output_indices = rmm::device_uvector(map.get_size(), stream, mr); - - // If we don't care about order, just gather indices of distinct keys taken from map. - if (keep == duplicate_keep_option::KEEP_ANY) { - map.retrieve_all(output_indices.begin(), thrust::make_discard_iterator(), stream.value()); - return output_indices; - } - - // For other keep options, reduce by row on rows that compare equal. - auto const reduction_results = reduce_by_row(map, - std::move(preprocessed_input), - input.num_rows(), - has_nulls, - has_nested_columns, - keep, - nulls_equal, - nans_equal, - stream, - rmm::mr::get_current_device_resource()); - - // Extract the desired output indices from reduction results. - auto const map_end = [&] { - if (keep == duplicate_keep_option::KEEP_NONE) { - // Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`. - // Thus, we only output index of the rows in the groups having group size of `1`. - return thrust::copy_if(rmm::exec_policy(stream), - thrust::make_counting_iterator(0), - thrust::make_counting_iterator(input.num_rows()), - output_indices.begin(), - [reduction_results = reduction_results.begin()] __device__( - auto const idx) { return reduction_results[idx] == size_type{1}; }); - } - - // Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in - // each group of equal rows (which are the desired output indices), or the value given by - // `reduction_init_value()`. - return thrust::copy_if(rmm::exec_policy(stream), - reduction_results.begin(), - reduction_results.end(), - output_indices.begin(), - [init_value = reduction_init_value(keep)] __device__(auto const idx) { - return idx != init_value; - }); - }(); - - output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream); - return output_indices; } std::unique_ptr distinct(table_view const& input, diff --git a/cpp/src/stream_compaction/distinct_count.cu b/cpp/src/stream_compaction/distinct_count.cu index 99ca89cc021..9843bb889f4 100644 --- a/cpp/src/stream_compaction/distinct_count.cu +++ b/cpp/src/stream_compaction/distinct_count.cu @@ -15,16 +15,17 @@ */ #include "stream_compaction_common.cuh" -#include "stream_compaction_common.hpp" #include #include #include +#include #include #include #include #include #include +#include #include #include #include diff --git a/cpp/src/stream_compaction/distinct_helpers.cu b/cpp/src/stream_compaction/distinct_helpers.cu index 13e89b15bb7..c3a004b7f28 100644 --- a/cpp/src/stream_compaction/distinct_helpers.cu +++ b/cpp/src/stream_compaction/distinct_helpers.cu @@ -16,96 +16,127 @@ #include "distinct_helpers.hpp" -#include - -#include +#include +#include namespace cudf::detail { -namespace { -/** - * @brief The functor to find the first/last/all duplicate row for rows that compared equal. - */ -template -struct reduce_fn : reduce_by_row_fn_base { - duplicate_keep_option const keep; - - reduce_fn(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - duplicate_keep_option const keep, - size_type* const d_output) - : reduce_by_row_fn_base{d_map, - d_hasher, - d_equal, - d_output}, - keep{keep} - { +template +rmm::device_uvector reduce_by_row(hash_set_type& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto output_indices = rmm::device_uvector(num_rows, stream, mr); + + // If we don't care about order, just gather indices of distinct keys taken from set. + if (keep == duplicate_keep_option::KEEP_ANY) { + auto const iter = thrust::counting_iterator{0}; + set.insert_async(iter, iter + num_rows, stream.value()); + auto const output_end = set.retrieve_all(output_indices.begin(), stream.value()); + output_indices.resize(thrust::distance(output_indices.begin(), output_end), stream); + return output_indices; } - __device__ void operator()(size_type const idx) const - { - auto const out_ptr = this->get_output_ptr(idx); - - if (keep == duplicate_keep_option::KEEP_FIRST) { - // Store the smallest index of all rows that are equal. - atomicMin(out_ptr, idx); - } else if (keep == duplicate_keep_option::KEEP_LAST) { - // Store the greatest index of all rows that are equal. - atomicMax(out_ptr, idx); - } else { - // Count the number of rows in each group of rows that are compared equal. - atomicAdd(out_ptr, size_type{1}); + auto reduction_results = rmm::device_uvector(num_rows, stream, mr); + thrust::uninitialized_fill(rmm::exec_policy_nosync(stream), + reduction_results.begin(), + reduction_results.end(), + reduction_init_value(keep)); + + auto set_ref = set.ref(cuco::op::insert_and_find); + + thrust::for_each(rmm::exec_policy_nosync(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_rows), + [set_ref, keep, reduction_results = reduction_results.begin()] __device__( + size_type const idx) mutable { + auto const [inserted_idx_ptr, _] = set_ref.insert_and_find(idx); + + auto ref = cuda::atomic_ref{ + reduction_results[*inserted_idx_ptr]}; + if (keep == duplicate_keep_option::KEEP_FIRST) { + // Store the smallest index of all rows that are equal. + ref.fetch_min(idx, cuda::memory_order_relaxed); + } else if (keep == duplicate_keep_option::KEEP_LAST) { + // Store the greatest index of all rows that are equal. + ref.fetch_max(idx, cuda::memory_order_relaxed); + } else { + // Count the number of rows in each group of rows that are compared equal. + ref.fetch_add(size_type{1}, cuda::memory_order_relaxed); + } + }); + + auto const map_end = [&] { + if (keep == duplicate_keep_option::KEEP_NONE) { + // Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`. + // Thus, we only output index of the rows in the groups having group size of `1`. + return thrust::copy_if( + rmm::exec_policy(stream), + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(num_rows), + output_indices.begin(), + cuda::proclaim_return_type( + [reduction_results = reduction_results.begin()] __device__(auto const idx) { + return reduction_results[idx] == size_type{1}; + })); } - } -}; -/** - * @brief The builder to construct an instance of `reduce_fn` functor base on the given - * value of the `duplicate_keep_option` member variable. - */ -struct reduce_func_builder { - duplicate_keep_option const keep; - - template - auto build(MapView const& d_map, - KeyHasher const& d_hasher, - KeyEqual const& d_equal, - size_type* const d_output) - { - return reduce_fn{d_map, d_hasher, d_equal, keep, d_output}; - } -}; + // Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in + // each group of equal rows (which are the desired output indices), or the value given by + // `reduction_init_value()`. + return thrust::copy_if( + rmm::exec_policy(stream), + reduction_results.begin(), + reduction_results.end(), + output_indices.begin(), + cuda::proclaim_return_type([init_value = reduction_init_value(keep)] __device__( + auto const idx) { return idx != init_value; })); + }(); -} // namespace + output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream); + return output_indices; +} -// This function is split from `distinct.cu` to improve compile time. -rmm::device_uvector reduce_by_row( - hash_map_type const& map, - std::shared_ptr const preprocessed_input, +template rmm::device_uvector reduce_by_row( + hash_set_type>& set, size_type num_rows, - cudf::nullate::DYNAMIC has_nulls, - bool has_nested_columns, duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - CUDF_EXPECTS(keep != duplicate_keep_option::KEEP_ANY, - "This function should not be called with KEEP_ANY"); - - return hash_reduce_by_row(map, - preprocessed_input, - num_rows, - has_nulls, - has_nested_columns, - nulls_equal, - nans_equal, - reduce_func_builder{keep}, - reduction_init_value(keep), - stream, - mr); -} + rmm::device_async_resource_ref mr); + +template rmm::device_uvector reduce_by_row( + hash_set_type>& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +template rmm::device_uvector reduce_by_row( + hash_set_type>& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); + +template rmm::device_uvector reduce_by_row( + hash_set_type>& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); } // namespace cudf::detail diff --git a/cpp/src/stream_compaction/distinct_helpers.hpp b/cpp/src/stream_compaction/distinct_helpers.hpp index 40f97e00ce5..fca67c98873 100644 --- a/cpp/src/stream_compaction/distinct_helpers.hpp +++ b/cpp/src/stream_compaction/distinct_helpers.hpp @@ -14,8 +14,7 @@ * limitations under the License. */ -#include "stream_compaction_common.hpp" - +#include #include #include #include @@ -24,6 +23,12 @@ #include #include +#include +#include +#include +#include +#include + namespace cudf::detail { /** @@ -42,13 +47,28 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) } } +template +using hash_set_type = + cuco::static_set, + cuda::thread_scope_device, + RowHasher, + cuco::linear_probing<1, + cudf::experimental::row::hash::device_row_hasher< + cudf::hashing::detail::default_hash, + cudf::nullate::DYNAMIC>>, + cudf::detail::cuco_allocator, + cuco::storage<1>>; + /** - * @brief Perform a reduction on groups of rows that are compared equal. + * @brief Perform a reduction on groups of rows that are compared equal and returns output indices + * of the occurrences of the distinct elements based on `keep` parameter. * * This is essentially a reduce-by-key operation with keys are non-contiguous rows and are compared - * equal. A hash table is used to find groups of equal rows. + * equal. A hash set is used to find groups of equal rows. * * Depending on the `keep` parameter, the reduction operation for each row group is: + * - If `keep == KEEP_ANY` : order does not matter. * - If `keep == KEEP_FIRST`: min of row indices in the group. * - If `keep == KEEP_LAST`: max of row indices in the group. * - If `keep == KEEP_NONE`: count of equivalent rows (group size). @@ -59,30 +79,18 @@ auto constexpr reduction_init_value(duplicate_keep_option keep) * the `reduction_init_value()` function. Then, the reduction result for each row group is written * into the output array at the index of an unspecified row in the group. * - * @param map The auxiliary map to perform reduction - * @param preprocessed_input The preprocessed of the input rows for computing row hashing and row - * comparisons + * @param set The auxiliary set to perform reduction + * @param set_size The number of elements in set * @param num_rows The number of all input rows - * @param has_nulls Indicate whether the input rows has any nulls at any nested levels - * @param has_nested_columns Indicates whether the input table has any nested columns * @param keep The parameter to determine what type of reduction to perform - * @param nulls_equal Flag to specify whether null elements should be considered as equal - * @param nans_equal Flag to specify whether NaN values in floating point column should be - * considered equal. * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned vector - * @return A device_uvector containing the reduction results + * @return A device_uvector containing the output indices */ -rmm::device_uvector reduce_by_row( - hash_map_type const& map, - std::shared_ptr const preprocessed_input, - size_type num_rows, - cudf::nullate::DYNAMIC has_nulls, - bool has_nested_columns, - duplicate_keep_option keep, - null_equality nulls_equal, - nan_equality nans_equal, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr); - +template +rmm::device_uvector reduce_by_row(hash_set_type& set, + size_type num_rows, + duplicate_keep_option keep, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr); } // namespace cudf::detail diff --git a/cpp/src/stream_compaction/stream_compaction_common.cuh b/cpp/src/stream_compaction/stream_compaction_common.cuh index 839672d6a56..0f9bc18e258 100644 --- a/cpp/src/stream_compaction/stream_compaction_common.cuh +++ b/cpp/src/stream_compaction/stream_compaction_common.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,9 +15,8 @@ */ #pragma once -#include "stream_compaction_common.hpp" - #include +#include #include #include diff --git a/cpp/src/stream_compaction/stream_compaction_common.hpp b/cpp/src/stream_compaction/stream_compaction_common.hpp deleted file mode 100644 index 13795f49781..00000000000 --- a/cpp/src/stream_compaction/stream_compaction_common.hpp +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2022-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#pragma once - -#include -#include -#include -#include - -#include -#include - -#include - -namespace cudf { -namespace detail { - -using hash_map_type = cuco::legacy:: - static_map; - -} // namespace detail -} // namespace cudf diff --git a/cpp/src/stream_compaction/unique.cu b/cpp/src/stream_compaction/unique.cu index c1f8b17938c..edb47984d13 100644 --- a/cpp/src/stream_compaction/unique.cu +++ b/cpp/src/stream_compaction/unique.cu @@ -15,7 +15,6 @@ */ #include "stream_compaction_common.cuh" -#include "stream_compaction_common.hpp" #include #include diff --git a/cpp/src/utilities/cuda_memcpy.cu b/cpp/src/utilities/cuda_memcpy.cu new file mode 100644 index 00000000000..3d0822d8545 --- /dev/null +++ b/cpp/src/utilities/cuda_memcpy.cu @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include + +#include + +#include + +namespace cudf::detail { + +namespace { + +void copy_pinned(void* dst, void const* src, std::size_t size, rmm::cuda_stream_view stream) +{ + if (size == 0) return; + + if (size < get_kernel_pinned_copy_threshold()) { + thrust::copy_n(rmm::exec_policy_nosync(stream), + static_cast(src), + size, + static_cast(dst)); + } else { + CUDF_CUDA_TRY(cudaMemcpyAsync(dst, src, size, cudaMemcpyDefault, stream)); + } +} + +void copy_pageable(void* dst, void const* src, std::size_t size, rmm::cuda_stream_view stream) +{ + if (size == 0) return; + + CUDF_CUDA_TRY(cudaMemcpyAsync(dst, src, size, cudaMemcpyDefault, stream)); +} + +}; // namespace + +void cuda_memcpy_async( + void* dst, void const* src, size_t size, host_memory_kind kind, rmm::cuda_stream_view stream) +{ + if (kind == host_memory_kind::PINNED) { + copy_pinned(dst, src, size, stream); + } else if (kind == host_memory_kind::PAGEABLE) { + copy_pageable(dst, src, size, stream); + } else { + CUDF_FAIL("Unsupported host memory kind"); + } +} + +void cuda_memcpy( + void* dst, void const* src, size_t size, host_memory_kind kind, rmm::cuda_stream_view stream) +{ + cuda_memcpy_async(dst, src, size, kind, stream); + stream.synchronize(); +} + +} // namespace cudf::detail diff --git a/cpp/src/utilities/pinned_memory.cpp b/cpp/src/utilities/pinned_memory.cpp index e90b7969b4d..3ea4293fc60 100644 --- a/cpp/src/utilities/pinned_memory.cpp +++ b/cpp/src/utilities/pinned_memory.cpp @@ -211,4 +211,18 @@ bool config_default_pinned_memory_resource(pinned_mr_options const& opts) return did_configure; } +CUDF_EXPORT auto& kernel_pinned_copy_threshold() +{ + // use cudaMemcpyAsync for all pinned copies + static std::atomic threshold = 0; + return threshold; +} + +void set_kernel_pinned_copy_threshold(size_t threshold) +{ + kernel_pinned_copy_threshold() = threshold; +} + +size_t get_kernel_pinned_copy_threshold() { return kernel_pinned_copy_threshold(); } + } // namespace cudf diff --git a/cpp/tests/join/conditional_join_tests.cu b/cpp/tests/join/conditional_join_tests.cu index 79968bcd7f4..7ab4a2ea465 100644 --- a/cpp/tests/join/conditional_join_tests.cu +++ b/cpp/tests/join/conditional_join_tests.cu @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -222,21 +223,25 @@ struct ConditionalJoinPairReturnTest : public ConditionalJoinTest { std::vector> expected_outputs) { auto result_size = this->join_size(left, right, predicate); - EXPECT_TRUE(result_size == expected_outputs.size()); - - auto result = this->join(left, right, predicate); - std::vector> result_pairs; - for (size_t i = 0; i < result.first->size(); ++i) { - // Note: Not trying to be terribly efficient here since these tests are - // small, otherwise a batch copy to host before constructing the tuples - // would be important. - result_pairs.push_back({result.first->element(i, cudf::get_default_stream()), - result.second->element(i, cudf::get_default_stream())}); - } + EXPECT_EQ(result_size, expected_outputs.size()); + + auto result = this->join(left, right, predicate); + auto lhs_result = cudf::detail::make_std_vector_sync(*result.first, cudf::get_default_stream()); + auto rhs_result = + cudf::detail::make_std_vector_sync(*result.second, cudf::get_default_stream()); + std::vector> result_pairs(lhs_result.size()); + std::transform(lhs_result.begin(), + lhs_result.end(), + rhs_result.begin(), + result_pairs.begin(), + [](cudf::size_type lhs, cudf::size_type rhs) { + return std::pair{lhs, rhs}; + }); std::sort(result_pairs.begin(), result_pairs.end()); std::sort(expected_outputs.begin(), expected_outputs.end()); - EXPECT_TRUE(std::equal(expected_outputs.begin(), expected_outputs.end(), result_pairs.begin())); + EXPECT_TRUE(std::equal( + expected_outputs.begin(), expected_outputs.end(), result_pairs.begin(), result_pairs.end())); } /* @@ -411,6 +416,11 @@ TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnLeftEmpty) this->test({{}}, {{3, 4, 5}}, left_zero_eq_right_zero, {}); }; +TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnRightEmpty) +{ + this->test({{3, 4, 5}}, {{}}, left_zero_eq_right_zero, {}); +}; + TYPED_TEST(ConditionalInnerJoinTest, TestOneColumnTwoRowAllEqual) { this->test({{0, 1}}, {{0, 0}}, left_zero_eq_right_zero, {{0, 0}, {0, 1}}); @@ -600,6 +610,14 @@ TYPED_TEST(ConditionalLeftJoinTest, TestOneColumnLeftEmpty) this->test({{}}, {{3, 4, 5}}, left_zero_eq_right_zero, {}); }; +TYPED_TEST(ConditionalLeftJoinTest, TestOneColumnRightEmpty) +{ + this->test({{3, 4, 5}}, + {{}}, + left_zero_eq_right_zero, + {{0, JoinNoneValue}, {1, JoinNoneValue}, {2, JoinNoneValue}}); +}; + TYPED_TEST(ConditionalLeftJoinTest, TestCompareRandomToHash) { auto [left, right] = gen_random_repeated_columns(); @@ -666,6 +684,14 @@ TYPED_TEST(ConditionalFullJoinTest, TestOneColumnLeftEmpty) {{JoinNoneValue, 0}, {JoinNoneValue, 1}, {JoinNoneValue, 2}}); }; +TYPED_TEST(ConditionalFullJoinTest, TestOneColumnRightEmpty) +{ + this->test({{3, 4, 5}}, + {{}}, + left_zero_eq_right_zero, + {{0, JoinNoneValue}, {1, JoinNoneValue}, {2, JoinNoneValue}}); +}; + TYPED_TEST(ConditionalFullJoinTest, TestTwoColumnThreeRowSomeEqual) { this->test({{0, 1, 2}, {10, 20, 30}}, @@ -705,20 +731,16 @@ struct ConditionalJoinSingleReturnTest : public ConditionalJoinTest { auto [left_wrappers, right_wrappers, left_columns, right_columns, left, right] = this->parse_input(left_data, right_data); auto result_size = this->join_size(left, right, predicate); - EXPECT_TRUE(result_size == expected_outputs.size()); - - auto result = this->join(left, right, predicate); - std::vector resulting_indices; - for (size_t i = 0; i < result->size(); ++i) { - // Note: Not trying to be terribly efficient here since these tests are - // small, otherwise a batch copy to host before constructing the tuples - // would be important. - resulting_indices.push_back(result->element(i, cudf::get_default_stream())); - } - std::sort(resulting_indices.begin(), resulting_indices.end()); + EXPECT_EQ(result_size, expected_outputs.size()); + + auto result = this->join(left, right, predicate); + auto result_indices = cudf::detail::make_std_vector_sync(*result, cudf::get_default_stream()); + std::sort(result_indices.begin(), result_indices.end()); std::sort(expected_outputs.begin(), expected_outputs.end()); - EXPECT_TRUE( - std::equal(resulting_indices.begin(), resulting_indices.end(), expected_outputs.begin())); + EXPECT_TRUE(std::equal(result_indices.begin(), + result_indices.end(), + expected_outputs.begin(), + expected_outputs.end())); } void _compare_to_hash_join(std::unique_ptr> const& result, @@ -826,6 +848,16 @@ struct ConditionalLeftSemiJoinTest : public ConditionalJoinSingleReturnTest { TYPED_TEST_SUITE(ConditionalLeftSemiJoinTest, cudf::test::IntegralTypesNotBool); +TYPED_TEST(ConditionalLeftSemiJoinTest, TestOneColumnLeftEmpty) +{ + this->test({{}}, {{3, 4, 5}}, left_zero_eq_right_zero, {}); +}; + +TYPED_TEST(ConditionalLeftSemiJoinTest, TestOneColumnRightEmpty) +{ + this->test({{3, 4, 5}}, {{}}, left_zero_eq_right_zero, {}); +}; + TYPED_TEST(ConditionalLeftSemiJoinTest, TestTwoColumnThreeRowSomeEqual) { this->test({{0, 1, 2}, {10, 20, 30}}, {{0, 1, 3}, {30, 40, 50}}, left_zero_eq_right_zero, {0, 1}); @@ -873,6 +905,16 @@ struct ConditionalLeftAntiJoinTest : public ConditionalJoinSingleReturnTest { TYPED_TEST_SUITE(ConditionalLeftAntiJoinTest, cudf::test::IntegralTypesNotBool); +TYPED_TEST(ConditionalLeftAntiJoinTest, TestOneColumnLeftEmpty) +{ + this->test({{}}, {{3, 4, 5}}, left_zero_eq_right_zero, {}); +}; + +TYPED_TEST(ConditionalLeftAntiJoinTest, TestOneColumnRightEmpty) +{ + this->test({{3, 4, 5}}, {{}}, left_zero_eq_right_zero, {0, 1, 2}); +}; + TYPED_TEST(ConditionalLeftAntiJoinTest, TestTwoColumnThreeRowSomeEqual) { this->test({{0, 1, 2}, {10, 20, 30}}, {{0, 1, 3}, {30, 40, 50}}, left_zero_eq_right_zero, {2}); diff --git a/docs/cudf/source/_static/cudf-pandas-line-profile.png b/docs/cudf/source/_static/cudf-pandas-line-profile.png new file mode 100644 index 00000000000..1d5a07c72eb Binary files /dev/null and b/docs/cudf/source/_static/cudf-pandas-line-profile.png differ diff --git a/docs/cudf/source/cudf_pandas/faq.md b/docs/cudf/source/cudf_pandas/faq.md index 55976740105..cdf32216619 100644 --- a/docs/cudf/source/cudf_pandas/faq.md +++ b/docs/cudf/source/cudf_pandas/faq.md @@ -53,6 +53,22 @@ print(pd) ``` +## Which functions will run on the GPU? + +Generally, `cudf.pandas` will accelerate all the features in the +{ref}`cuDF API ` on the GPU. There are some exceptions. For +example, some functions are GPU-accelerated by cuDF but do not support +every combination of keyword arguments. In cases like unsupported +keyword arguments, cuDF is not able to provide GPU acceleration and +`cudf.pandas` will fall back to the CPU. + +The most accurate way to assess which functions run on the GPU is to try +running the code while using the `cudf.pandas` profiling features. The +profiler will indicate which functions ran on GPU / CPU. To improve +performance, try to use only functionality that can run entirely on GPU. +This helps reduce the number of memory transfers needed to fallback to +CPU. + ## Does it work with third-party libraries? `cudf.pandas` is tested with numerous popular third-party libraries. diff --git a/docs/cudf/source/cudf_pandas/usage.md b/docs/cudf/source/cudf_pandas/usage.md index 376784439aa..0398a8d7086 100644 --- a/docs/cudf/source/cudf_pandas/usage.md +++ b/docs/cudf/source/cudf_pandas/usage.md @@ -63,16 +63,22 @@ back to CPU for certain operations. Running your code with the `cudf.pandas.profile` magic generates a report showing which operations used the GPU and which used the CPU. This can help you identify parts of your code that could be rewritten to be more -GPU-friendly: +GPU-friendly. + +### Using the Function Profiler + +First, enable `cudf.pandas`: ```python %load_ext cudf.pandas import pandas as pd ``` +Next, use the IPython/Jupyter magic `cudf.pandas.profile`: + ```python %%cudf.pandas.profile -df = pd.DataFrame({'a': [0, 1, 2], 'b': [3,4,3]}) +df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 3]}) df.min(axis=1) out = df.groupby('a').filter( @@ -80,13 +86,35 @@ out = df.groupby('a').filter( ) ``` +This gives a profiler output after the cell runs, shown below. + ![cudf-pandas-profile](../_static/cudf-pandas-profile.png) When an operation falls back to using the CPU, it's typically because that operation isn't implemented by cuDF. The profiler generates a handy link to report the missing functionality to the cuDF team. -To profile a script being run from the command-line, pass the +### Using the Line Profiler + +There is a line profiler activated by the IPython/Jupyter magic `cudf.pandas.line_profile`: + +```python +%%cudf.pandas.line_profile +df = pd.DataFrame({'a': [0, 1, 2], 'b': [3, 4, 3]}) + +df.min(axis=1) +out = df.groupby('a').filter( + lambda group: len(group) > 1 +) +``` + +The output of the line profiler shows the source code and how much time each line spent executing on the GPU and CPU. + +![cudf-pandas-line-profile](../_static/cudf-pandas-line-profile.png) + +### Profiling from the command line + +To profile a script being run from the command line, pass the `--profile` argument: ```bash diff --git a/docs/cudf/source/user_guide/api_docs/index.rst b/docs/cudf/source/user_guide/api_docs/index.rst index b3442908531..5f26a921012 100644 --- a/docs/cudf/source/user_guide/api_docs/index.rst +++ b/docs/cudf/source/user_guide/api_docs/index.rst @@ -1,3 +1,5 @@ +.. _cudf-api: + ============= API reference ============= diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst index f98298ff052..e9dad705cbf 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst @@ -19,6 +19,7 @@ This page provides API documentation for pylibcudf. gpumemoryview groupby io/index.rst + interop join lists merge diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/interop.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/interop.rst new file mode 100644 index 00000000000..881ab8d7be4 --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/interop.rst @@ -0,0 +1,6 @@ +======= +interop +======= + +.. automodule:: cudf._lib.pylibcudf.interop + :members: diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/strings/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/strings/index.rst index bfaef732555..cecf1ccc9bb 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/strings/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/strings/index.rst @@ -6,3 +6,4 @@ strings contains replace + slice diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/strings/slice.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/strings/slice.rst new file mode 100644 index 00000000000..0ee5af71c03 --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/strings/slice.rst @@ -0,0 +1,6 @@ +===== +slice +===== + +.. automodule:: cudf._lib.pylibcudf.strings.slice + :members: diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 7914ed7e9d9..d1ec5be9e62 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -20,6 +20,7 @@ from cudf.api.types import is_list_like from cudf._lib.utils cimport data_from_unique_ptr +from cudf._lib import pylibcudf from cudf._lib.utils import _index_level_name, generate_pandas_metadata from libc.stdint cimport uint8_t @@ -70,8 +71,11 @@ from cudf._lib.utils cimport table_view_from_table from pyarrow.lib import NativeFile +from cudf._lib.concat import concat_columns from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT +from cudf._lib.utils cimport data_from_pylibcudf_table + cdef class BufferArrayFromVector: cdef Py_ssize_t length @@ -878,14 +882,32 @@ cdef class ParquetReader: return df def read(self): - dfs = [] + dfs = self._read_chunk() + column_names = dfs._column_names + concatenated_columns = list(dfs._columns) + del dfs while self._has_next(): - dfs.append(self._read_chunk()) - df = cudf.concat(dfs) - df = _process_metadata(df, self.result_meta, self.names, self.row_groups, - self.filepaths_or_buffers, self.pa_buffers, - self.allow_range_index, self.cpp_use_pandas_metadata) - return df + new_chunk = list(self._read_chunk()._columns) + for i in range(len(column_names)): + concatenated_columns[i] = concat_columns( + [concatenated_columns[i], new_chunk[i]] + ) + # Must drop any residual GPU columns to save memory + new_chunk[i] = None + + dfs = cudf.DataFrame._from_data( + *data_from_pylibcudf_table( + pylibcudf.Table( + [col.to_pylibcudf(mode="read") for col in concatenated_columns] + ), + column_names=column_names, + index_names=None + ) + ) + + return _process_metadata(dfs, self.result_meta, self.names, self.row_groups, + self.filepaths_or_buffers, self.pa_buffers, + self.allow_range_index, self.cpp_use_pandas_metadata) cpdef merge_filemetadata(object filemetadata_list): """ diff --git a/python/cudf/cudf/_lib/pylibcudf/join.pxd b/python/cudf/cudf/_lib/pylibcudf/join.pxd index f560eeef06d..83b4776c16e 100644 --- a/python/cudf/cudf/_lib/pylibcudf/join.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/join.pxd @@ -35,3 +35,5 @@ cpdef Column left_anti_join( Table right_keys, null_equality nulls_equal ) + +cpdef Table cross_join(Table left, Table right) diff --git a/python/cudf/cudf/_lib/pylibcudf/join.pyx b/python/cudf/cudf/_lib/pylibcudf/join.pyx index cf2a6a8187f..308b1b39291 100644 --- a/python/cudf/cudf/_lib/pylibcudf/join.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/join.pyx @@ -2,13 +2,14 @@ from cython.operator import dereference -from libcpp.memory cimport make_unique +from libcpp.memory cimport make_unique, unique_ptr from libcpp.utility cimport move from rmm._lib.device_buffer cimport device_buffer from cudf._lib.pylibcudf.libcudf cimport join as cpp_join from cudf._lib.pylibcudf.libcudf.column.column cimport column +from cudf._lib.pylibcudf.libcudf.table.table cimport table from cudf._lib.pylibcudf.libcudf.types cimport ( data_type, null_equality, @@ -88,7 +89,6 @@ cpdef tuple left_join( nulls_equal : NullEquality Should nulls compare equal? - Returns ------- Tuple[Column, Column] @@ -122,7 +122,6 @@ cpdef tuple full_join( nulls_equal : NullEquality Should nulls compare equal? - Returns ------- Tuple[Column, Column] @@ -156,7 +155,6 @@ cpdef Column left_semi_join( nulls_equal : NullEquality Should nulls compare equal? - Returns ------- Column @@ -190,7 +188,6 @@ cpdef Column left_anti_join( nulls_equal : NullEquality Should nulls compare equal? - Returns ------- Column @@ -204,3 +201,26 @@ cpdef Column left_anti_join( nulls_equal ) return _column_from_gather_map(move(c_result)) + + +cpdef Table cross_join(Table left, Table right): + """Perform a cross join on two tables. + + For details see :cpp:func:`cross_join`. + + Parameters + ---------- + left : Table + The left table to join. + right: Table + The right table to join. + + Returns + ------- + Table + The result of cross joining the two inputs. + """ + cdef unique_ptr[table] result + with nogil: + result = move(cpp_join.cross_join(left.view(), right.view())) + return Table.from_libcudf(move(result)) diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/join.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/join.pxd index 89a30f0f255..32cd17f7c11 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/join.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/join.pxd @@ -70,3 +70,8 @@ cdef extern from "cudf/join.hpp" namespace "cudf" nogil: const table_view right_keys, null_equality nulls_equal, ) except + + + cdef unique_ptr[table] cross_join( + const table_view left, + const table_view right, + ) except + diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/scalar/scalar_factories.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/scalar/scalar_factories.pxd index 5c4e5bf346f..c8220df8938 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/scalar/scalar_factories.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/scalar/scalar_factories.pxd @@ -8,3 +8,4 @@ from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport scalar cdef extern from "cudf/scalar/scalar_factories.hpp" namespace "cudf" nogil: cdef unique_ptr[scalar] make_string_scalar(const string & _string) except + + cdef unique_ptr[scalar] make_fixed_width_scalar[T](T value) except + diff --git a/python/cudf/cudf/_lib/pylibcudf/strings/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/strings/CMakeLists.txt index cb7f71b1912..b499a127541 100644 --- a/python/cudf/cudf/_lib/pylibcudf/strings/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/strings/CMakeLists.txt @@ -13,7 +13,7 @@ # ============================================================================= set(cython_sources capitalize.pyx case.pyx char_types.pyx contains.pyx find.pyx regex_flags.pyx - regex_program.pyx replace.pyx + regex_program.pyx replace.pyx slice.pyx ) set(linked_libraries cudf::cudf) diff --git a/python/cudf/cudf/_lib/pylibcudf/strings/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/strings/__init__.pxd index 959aa94737d..d1f632d6d8e 100644 --- a/python/cudf/cudf/_lib/pylibcudf/strings/__init__.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/strings/__init__.pxd @@ -9,4 +9,5 @@ from . cimport ( regex_flags, regex_program, replace, + slice, ) diff --git a/python/cudf/cudf/_lib/pylibcudf/strings/__init__.py b/python/cudf/cudf/_lib/pylibcudf/strings/__init__.py index b7384913286..ef102aff2af 100644 --- a/python/cudf/cudf/_lib/pylibcudf/strings/__init__.py +++ b/python/cudf/cudf/_lib/pylibcudf/strings/__init__.py @@ -9,4 +9,5 @@ regex_flags, regex_program, replace, + slice, ) diff --git a/python/cudf/cudf/_lib/pylibcudf/strings/slice.pxd b/python/cudf/cudf/_lib/pylibcudf/strings/slice.pxd new file mode 100644 index 00000000000..7d8d0006ef4 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/strings/slice.pxd @@ -0,0 +1,15 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from cudf._lib.pylibcudf.column cimport Column +from cudf._lib.pylibcudf.scalar cimport Scalar + +ctypedef fused ColumnOrScalar: + Column + Scalar + +cpdef Column slice_strings( + Column input, + ColumnOrScalar start=*, + ColumnOrScalar stop=*, + Scalar step=* +) diff --git a/python/cudf/cudf/_lib/pylibcudf/strings/slice.pyx b/python/cudf/cudf/_lib/pylibcudf/strings/slice.pyx new file mode 100644 index 00000000000..df75134fb71 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/strings/slice.pyx @@ -0,0 +1,102 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp.memory cimport unique_ptr +from libcpp.utility cimport move + +from cudf._lib.pylibcudf.column cimport Column +from cudf._lib.pylibcudf.libcudf.column.column cimport column +from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport numeric_scalar +from cudf._lib.pylibcudf.libcudf.scalar.scalar_factories cimport ( + make_fixed_width_scalar as cpp_make_fixed_width_scalar, +) +from cudf._lib.pylibcudf.libcudf.strings cimport substring as cpp_slice +from cudf._lib.pylibcudf.libcudf.types cimport size_type +from cudf._lib.pylibcudf.scalar cimport Scalar + +from cython.operator import dereference + + +cpdef Column slice_strings( + Column input, + ColumnOrScalar start=None, + ColumnOrScalar stop=None, + Scalar step=None +): + """Perform a slice operation on a strings column. + + ``start`` and ``stop`` may be a + :py:class:`~cudf._lib.pylibcudf.column.Column` or a + :py:class:`~cudf._lib.pylibcudf.scalar.Scalar`. But ``step`` must be a + :py:class:`~cudf._lib.pylibcudf.scalar.Scalar`. + + For details, see :cpp:func:`cudf::strings::slice_strings`. + + Parameters + ---------- + input : Column + Strings column for this operation + start : Union[Column, Scalar] + The start character position or positions. + stop : Union[Column, Scalar] + The end character position or positions + step : Scalar + Distance between input characters retrieved + + Returns + ------- + pylibcudf.Column + The result of the slice operation + """ + cdef unique_ptr[column] c_result + cdef numeric_scalar[size_type]* cpp_start + cdef numeric_scalar[size_type]* cpp_stop + cdef numeric_scalar[size_type]* cpp_step + + if input is None: + raise ValueError("input cannot be None") + + if ColumnOrScalar is Column: + if step is not None: + raise ValueError("Column-wise slice does not support step") + + if start is None or stop is None: + raise ValueError( + "start and stop must be provided for Column-wise slice" + ) + + with nogil: + c_result = cpp_slice.slice_strings( + input.view(), + start.view(), + stop.view() + ) + + elif ColumnOrScalar is Scalar: + if start is None: + start = Scalar.from_libcudf( + cpp_make_fixed_width_scalar(0) + ) + if stop is None: + stop = Scalar.from_libcudf( + cpp_make_fixed_width_scalar(0) + ) + if step is None: + step = Scalar.from_libcudf( + cpp_make_fixed_width_scalar(1) + ) + + cpp_start = start.c_obj.get() + cpp_stop = stop.c_obj.get() + cpp_step = step.c_obj.get() + + with nogil: + c_result = cpp_slice.slice_strings( + input.view(), + dereference(cpp_start), + dereference(cpp_stop), + dereference(cpp_step) + ) + else: + raise ValueError("start, stop, and step must be either Column or Scalar") + + return Column.from_libcudf(move(c_result)) diff --git a/python/cudf/cudf/_lib/pylibcudf/table.pyx b/python/cudf/cudf/_lib/pylibcudf/table.pyx index d93ac78721b..d91fa0474b0 100644 --- a/python/cudf/cudf/_lib/pylibcudf/table.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/table.pyx @@ -83,6 +83,8 @@ cdef class Table: cpdef int num_rows(self): """The number of rows in this table.""" + if self.num_columns() == 0: + return 0 return self._columns[0].size() cpdef list columns(self): diff --git a/python/cudf/cudf/_lib/strings/substring.pyx b/python/cudf/cudf/_lib/strings/substring.pyx index 170c1016b89..706c21c0634 100644 --- a/python/cudf/cudf/_lib/strings/substring.pyx +++ b/python/cudf/cudf/_lib/strings/substring.pyx @@ -2,24 +2,16 @@ import numpy as np -from libcpp.memory cimport unique_ptr -from libcpp.utility cimport move - from cudf.core.buffer import acquire_spill_lock from cudf._lib.column cimport Column -from cudf._lib.pylibcudf.libcudf.column.column cimport column -from cudf._lib.pylibcudf.libcudf.column.column_view cimport column_view -from cudf._lib.pylibcudf.libcudf.strings.substring cimport ( - slice_strings as cpp_slice_strings, -) -from cudf._lib.pylibcudf.libcudf.types cimport size_type from cudf._lib.scalar import as_device_scalar -from cudf._lib.pylibcudf.libcudf.scalar.scalar cimport numeric_scalar from cudf._lib.scalar cimport DeviceScalar +import cudf._lib.pylibcudf as plc + @acquire_spill_lock() def slice_strings(Column source_strings, @@ -32,30 +24,18 @@ def slice_strings(Column source_strings, performed in steps by skipping `step` number of characters in a string. """ - cdef unique_ptr[column] c_result - cdef column_view source_view = source_strings.view() - cdef DeviceScalar start_scalar = as_device_scalar(start, np.int32) cdef DeviceScalar end_scalar = as_device_scalar(end, np.int32) cdef DeviceScalar step_scalar = as_device_scalar(step, np.int32) - cdef numeric_scalar[size_type]* start_numeric_scalar = \ - ( - start_scalar.get_raw_ptr()) - cdef numeric_scalar[size_type]* end_numeric_scalar = \ - (end_scalar.get_raw_ptr()) - cdef numeric_scalar[size_type]* step_numeric_scalar = \ - (step_scalar.get_raw_ptr()) - - with nogil: - c_result = move(cpp_slice_strings( - source_view, - start_numeric_scalar[0], - end_numeric_scalar[0], - step_numeric_scalar[0] - )) - - return Column.from_unique_ptr(move(c_result)) + return Column.from_pylibcudf( + plc.strings.slice.slice_strings( + source_strings.to_pylibcudf(mode="read"), + start_scalar.c_value, + end_scalar.c_value, + step_scalar.c_value + ) + ) @acquire_spill_lock() @@ -67,19 +47,13 @@ def slice_from(Column source_strings, at given starts and stops positions. `starts` and `stops` here are positions per element in the string-column. """ - cdef unique_ptr[column] c_result - cdef column_view source_view = source_strings.view() - cdef column_view starts_view = starts.view() - cdef column_view stops_view = stops.view() - - with nogil: - c_result = move(cpp_slice_strings( - source_view, - starts_view, - stops_view - )) - - return Column.from_unique_ptr(move(c_result)) + return Column.from_pylibcudf( + plc.strings.slice.slice_strings( + source_strings.to_pylibcudf(mode="read"), + starts.to_pylibcudf(mode="read"), + stops.to_pylibcudf(mode="read") + ) + ) @acquire_spill_lock() @@ -90,8 +64,7 @@ def get(Column source_strings, character from each input string. The index of characters required can be controlled by passing `index`. """ - cdef unique_ptr[column] c_result - cdef column_view source_view = source_strings.view() + if index < 0: next_index = index - 1 step = -1 @@ -102,20 +75,11 @@ def get(Column source_strings, cdef DeviceScalar end_scalar = as_device_scalar(next_index, np.int32) cdef DeviceScalar step_scalar = as_device_scalar(step, np.int32) - cdef numeric_scalar[size_type]* start_numeric_scalar = \ - ( - start_scalar.get_raw_ptr()) - cdef numeric_scalar[size_type]* end_numeric_scalar = \ - (end_scalar.get_raw_ptr()) - cdef numeric_scalar[size_type]* step_numeric_scalar = \ - (step_scalar.get_raw_ptr()) - - with nogil: - c_result = move(cpp_slice_strings( - source_view, - start_numeric_scalar[0], - end_numeric_scalar[0], - step_numeric_scalar[0] - )) - - return Column.from_unique_ptr(move(c_result)) + return Column.from_pylibcudf( + plc.strings.slice.slice_strings( + source_strings.to_pylibcudf(mode="read"), + start_scalar.c_value, + end_scalar.c_value, + step_scalar.c_value + ) + ) diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index dfcdfbb9d91..5db6fd904a9 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -927,13 +927,13 @@ def is_unique(self) -> bool: @property def is_monotonic_increasing(self) -> bool: - return not self.has_nulls() and libcudf.sort.is_sorted( + return not self.has_nulls(include_nan=True) and libcudf.sort.is_sorted( [self], [True], None ) @property def is_monotonic_decreasing(self) -> bool: - return not self.has_nulls() and libcudf.sort.is_sorted( + return not self.has_nulls(include_nan=True) and libcudf.sort.is_sorted( [self], [False], None ) diff --git a/python/cudf/cudf/core/column_accessor.py b/python/cudf/cudf/core/column_accessor.py index 1bf9a393566..f30a557efb0 100644 --- a/python/cudf/cudf/core/column_accessor.py +++ b/python/cudf/cudf/core/column_accessor.py @@ -472,6 +472,7 @@ def swaplevel(self, i=-2, j=-1): new_keys[n][i], new_keys[n][j] = row[j], row[i] new_dict.update({row: tuple(new_keys[n])}) + # TODO: Change to deep=False when copy-on-write is default new_data = {new_dict[k]: v.copy(deep=True) for k, v in self.items()} # swap level_names for i and j @@ -669,10 +670,11 @@ def rename_column(x): raise ValueError("Duplicate column names are not allowed") data = dict(zip(new_col_names, self.values())) - return self.__class__( + return type(self)( data=data, level_names=self.level_names, multiindex=self.multiindex, + label_dtype=self.label_dtype, verify=False, ) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index f0d8157011d..f7f5ef792d6 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -1121,8 +1121,6 @@ def _from_data( @staticmethod @_cudf_nvtx_annotate def _align_input_series_indices(data, index): - data = data.copy() - input_series = [ Series(val) for val in data.values() @@ -1142,6 +1140,7 @@ def _align_input_series_indices(data, index): ) index = aligned_input_series[0].index + data = data.copy() for name, val in data.items(): if isinstance(val, (pd.Series, Series, dict)): data[name] = aligned_input_series.pop(0) @@ -2969,6 +2968,7 @@ def set_index( idx = MultiIndex._from_data(dict(enumerate(data_to_add))) idx.names = names + # TODO: Change to deep=False when copy-on-write is default df = self if inplace else self.copy(deep=True) if verify_integrity and not idx.is_unique: @@ -3565,6 +3565,9 @@ def rename( mapper if columns is None and axis in (1, "columns") else columns ) + result = self if inplace else self.copy(deep=copy) + + out_index = None if index: if ( any(isinstance(item, str) for item in index.values()) @@ -3586,36 +3589,36 @@ def rename( ) out_index._data[level] = column.as_column(level_values) out_index._compute_levels_and_codes() - out = DataFrame(index=out_index) else: to_replace = list(index.keys()) vals = list(index.values()) is_all_na = vals.count(None) == len(vals) try: - index_data = { - name: col.find_and_replace(to_replace, vals, is_all_na) - for name, col in self.index._data.items() - } + out_index = _index_from_data( + { + name: col.find_and_replace( + to_replace, vals, is_all_na + ) + for name, col in self.index._data.items() + } + ) except OverflowError: - index_data = self.index._data.copy(deep=True) + pass - out = DataFrame(index=_index_from_data(index_data)) - else: - out = DataFrame(index=self.index) + if out_index is not None: + result.index = out_index if columns: - out._data = self._data.rename_levels(mapper=columns, level=level) - else: - out._data = self._data.copy(deep=copy) + result._data = result._data.rename_levels( + mapper=columns, level=level + ) - if inplace: - self._data = out._data - else: - return out.copy(deep=copy) + return result @_cudf_nvtx_annotate def add_prefix(self, prefix): + # TODO: Change to deep=False when copy-on-write is default out = self.copy(deep=True) out.columns = [ prefix + col_name for col_name in list(self._data.keys()) @@ -3624,6 +3627,7 @@ def add_prefix(self, prefix): @_cudf_nvtx_annotate def add_suffix(self, suffix): + # TODO: Change to deep=False when copy-on-write is default out = self.copy(deep=True) out.columns = [ col_name + suffix for col_name in list(self._data.keys()) @@ -3956,7 +3960,8 @@ def swaplevel(self, i=-2, j=-1, axis=0): weight 1.0 0.8 length 0.3 0.2 """ - result = self.copy() + # TODO: Change to deep=False when copy-on-write is default + result = self.copy(deep=True) # To get axis number axis = self._get_axis_from_axis_arg(axis) @@ -4027,7 +4032,7 @@ def transpose(self): # Set the old column names as the new index result = self.__class__._from_data( - {i: col for i, col in enumerate(result_columns)}, + ColumnAccessor(dict(enumerate(result_columns)), verify=False), index=as_index(index), ) # Set the old index as the new column names @@ -5528,7 +5533,7 @@ def to_arrow(self, preserve_index=None): b: [[4,5,6]] """ - data = self.copy(deep=False) + data = self index_descr = [] write_index = preserve_index is not False keep_range_index = write_index and preserve_index is None @@ -5556,6 +5561,7 @@ def to_arrow(self, preserve_index=None): index_descr = ( index.names if index.name is not None else ("index",) ) + data = data.copy(deep=False) for gen_name, col_name in zip(index_descr, index._data.names): data._insert( data.shape[1], diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index 903c4fe7df5..1120642947b 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -300,51 +300,31 @@ def concat(objs, axis=0, join="outer", ignore_index=False, sort=None): obj = objs[0] if ignore_index: if axis == 1: - result = cudf.DataFrame._from_data( - data=obj._data.copy(deep=True), - index=obj.index.copy(deep=True), - ) - # The DataFrame constructor for dict-like data (such as the - # ColumnAccessor given by obj._data here) will drop any columns - # in the data that are not in `columns`, so we have to rename - # after construction. - result.columns = pd.RangeIndex(len(obj._data.names)) - else: if isinstance(obj, cudf.Series): - result = cudf.Series._from_data( - data=obj._data.copy(deep=True), - index=cudf.RangeIndex(len(obj)), - ) - elif isinstance(obj, pd.Series): - result = cudf.Series( - data=obj, - index=cudf.RangeIndex(len(obj)), - ) + result = obj.to_frame() else: - result = cudf.DataFrame._from_data( - data=obj._data.copy(deep=True), - index=cudf.RangeIndex(len(obj)), - ) + result = obj.copy(deep=True) + result.columns = pd.RangeIndex(len(result._data)) + else: + result = type(obj)._from_data( + data=obj._data.copy(deep=True), + index=cudf.RangeIndex(len(obj)), + ) + elif axis == 0: + result = obj.copy(deep=True) else: - if axis == 0: - result = obj.copy() + if isinstance(obj, cudf.Series): + result = obj.to_frame() else: - data = obj._data.copy(deep=True) - if isinstance(obj, cudf.Series) and obj.name is None: - # If the Series has no name, pandas renames it to 0. - data[0] = data.pop(None) - result = cudf.DataFrame._from_data( - data, index=obj.index.copy(deep=True) + result = obj.copy(deep=True) + if keys is not None and isinstance(result, cudf.DataFrame): + k = keys[0] + result.columns = cudf.MultiIndex.from_tuples( + [ + (k, *c) if isinstance(c, tuple) else (k, c) + for c in result._column_names + ] ) - if keys is not None: - if isinstance(result, cudf.DataFrame): - k = keys[0] - result.columns = cudf.MultiIndex.from_tuples( - [ - (k, *c) if isinstance(c, tuple) else (k, c) - for c in result._column_names - ] - ) if isinstance(result, cudf.Series) and axis == 0: # sort has no effect for series concatted along axis 0 @@ -1179,7 +1159,6 @@ def unstack(df, level, fill_value=None): if pd.api.types.is_list_like(level): if not level: return df - df = df.copy(deep=False) if not isinstance(df.index, cudf.MultiIndex): dtype = df._columns[0].dtype for col in df._columns: @@ -1195,6 +1174,7 @@ def unstack(df, level, fill_value=None): ) return res else: + df = df.copy(deep=False) columns = df.index._poplevels(level) index = df.index result = _pivot(df, index, columns) diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 15ad0813601..ea25d482578 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -584,7 +584,7 @@ def __init__( data = {} if isinstance(data, (pd.Series, pd.Index, BaseIndex, Series)): - if copy: + if copy and not isinstance(data, (pd.Series, pd.Index)): data = data.copy(deep=True) name_from_data = data.name column = as_column(data, nan_as_null=nan_as_null, dtype=dtype) @@ -3434,6 +3434,7 @@ def rename(self, index=None, copy=True): @_cudf_nvtx_annotate def add_prefix(self, prefix): return Series._from_data( + # TODO: Change to deep=False when copy-on-write is default data=self._data.copy(deep=True), index=prefix + self.index.astype(str), ) @@ -3441,6 +3442,7 @@ def add_prefix(self, prefix): @_cudf_nvtx_annotate def add_suffix(self, suffix): return Series._from_data( + # TODO: Change to deep=False when copy-on-write is default data=self._data.copy(deep=True), index=self.index.astype(str) + suffix, ) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 58b104b84e9..2a838ca7417 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -908,12 +908,20 @@ def _read_parquet( "cudf engine doesn't support the " f"following positional arguments: {list(args)}" ) - return libparquet.read_parquet( - filepaths_or_buffers, - columns=columns, - row_groups=row_groups, - use_pandas_metadata=use_pandas_metadata, - ) + if cudf.get_option("mode.pandas_compatible"): + return libparquet.ParquetReader( + filepaths_or_buffers, + columns=columns, + row_groups=row_groups, + use_pandas_metadata=use_pandas_metadata, + ).read() + else: + return libparquet.read_parquet( + filepaths_or_buffers, + columns=columns, + row_groups=row_groups, + use_pandas_metadata=use_pandas_metadata, + ) else: if ( isinstance(filepaths_or_buffers, list) diff --git a/python/cudf/cudf/pylibcudf_tests/test_join.py b/python/cudf/cudf/pylibcudf_tests/test_join.py new file mode 100644 index 00000000000..eb25ed915b1 --- /dev/null +++ b/python/cudf/cudf/pylibcudf_tests/test_join.py @@ -0,0 +1,29 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +import numpy as np +import pyarrow as pa +from utils import assert_table_eq + +from cudf._lib import pylibcudf as plc + + +def test_cross_join(): + left = pa.Table.from_arrays([[0, 1, 2], [3, 4, 5]], names=["a", "b"]) + right = pa.Table.from_arrays( + [[6, 7, 8, 9], [10, 11, 12, 13]], names=["c", "d"] + ) + + pleft = plc.interop.from_arrow(left) + pright = plc.interop.from_arrow(right) + + expect = pa.Table.from_arrays( + [ + *(np.repeat(c.to_numpy(), len(right)) for c in left.columns), + *(np.tile(c.to_numpy(), len(left)) for c in right.columns), + ], + names=["a", "b", "c", "d"], + ) + + got = plc.join.cross_join(pleft, pright) + + assert_table_eq(expect, got) diff --git a/python/cudf/cudf/pylibcudf_tests/test_string_slice.py b/python/cudf/cudf/pylibcudf_tests/test_string_slice.py new file mode 100644 index 00000000000..bd63987b30f --- /dev/null +++ b/python/cudf/cudf/pylibcudf_tests/test_string_slice.py @@ -0,0 +1,116 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +import pyarrow as pa +import pytest +from utils import assert_column_eq + +import cudf._lib.pylibcudf as plc + + +@pytest.fixture(scope="module") +def pa_col(): + return pa.array(["AbC", "123abc", "", " ", None]) + + +@pytest.fixture(scope="module") +def plc_col(pa_col): + return plc.interop.from_arrow(pa_col) + + +@pytest.fixture( + scope="module", + params=[(1, 3, 1), (0, 3, -1), (3, 2, 1), (1, 5, 5), (1, 100, 2)], +) +def pa_start_stop_step(request): + return tuple(pa.scalar(x, type=pa.int32()) for x in request.param) + + +@pytest.fixture(scope="module") +def plc_start_stop_step(pa_start_stop_step): + return tuple(plc.interop.from_arrow(x) for x in pa_start_stop_step) + + +@pytest.fixture(scope="module") +def pa_starts_col(): + return pa.array([0, 1, 3, -1, 100]) + + +@pytest.fixture(scope="module") +def plc_starts_col(pa_starts_col): + return plc.interop.from_arrow(pa_starts_col) + + +@pytest.fixture(scope="module") +def pa_stops_col(): + return pa.array([1, 3, 4, -1, 100]) + + +@pytest.fixture(scope="module") +def plc_stops_col(pa_stops_col): + return plc.interop.from_arrow(pa_stops_col) + + +def test_slice(pa_col, plc_col, pa_start_stop_step, plc_start_stop_step): + pa_start, pa_stop, pa_step = pa_start_stop_step + plc_start, plc_stop, plc_step = plc_start_stop_step + + def slice_string(st, start, stop, step): + return st[start:stop:step] if st is not None else None + + expected = pa.array( + [ + slice_string(x, pa_start.as_py(), pa_stop.as_py(), pa_step.as_py()) + for x in pa_col.to_pylist() + ], + type=pa.string(), + ) + + got = plc.strings.slice.slice_strings( + plc_col, start=plc_start, stop=plc_stop, step=plc_step + ) + + assert_column_eq(expected, got) + + +def test_slice_column( + pa_col, plc_col, pa_starts_col, plc_starts_col, pa_stops_col, plc_stops_col +): + def slice_string(st, start, stop): + if stop < 0: + stop = len(st) + return st[start:stop] if st is not None else None + + expected = pa.array( + [ + slice_string(x, start, stop) + for x, start, stop in zip( + pa_col.to_pylist(), + pa_starts_col.to_pylist(), + pa_stops_col.to_pylist(), + ) + ], + type=pa.string(), + ) + + got = plc.strings.slice.slice_strings( + plc_col, plc_starts_col, plc_stops_col + ) + + assert_column_eq(expected, got) + + +def test_slice_invalid(plc_col, plc_starts_col, plc_stops_col): + with pytest.raises(TypeError): + # no maching signature + plc.strings.slice.slice_strings(None, pa_starts_col, pa_stops_col) + with pytest.raises(ValueError): + # signature found but wrong value passed + plc.strings.slice.slice_strings(plc_col, plc_starts_col, None) + with pytest.raises(TypeError): + # no matching signature (2nd arg) + plc.strings.slice.slice_strings(plc_col, None, plc_stops_col) + with pytest.raises(TypeError): + # can't provide step for columnwise api + plc.strings.slice.slice_strings( + plc_col, plc_starts_col, plc_stops_col, plc_starts_col + ) diff --git a/python/cudf/cudf/pylibcudf_tests/test_table.py b/python/cudf/cudf/pylibcudf_tests/test_table.py new file mode 100644 index 00000000000..cf1d51f6491 --- /dev/null +++ b/python/cudf/cudf/pylibcudf_tests/test_table.py @@ -0,0 +1,22 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +import pyarrow as pa +import pytest + +import cudf._lib.pylibcudf as plc + + +@pytest.mark.parametrize( + "arrow_tbl", + [ + pa.table([]), + pa.table({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}), + pa.table({"a": [1, 2, 3]}), + pa.table({"a": [1], "b": [2], "c": [3]}), + ], +) +def test_table_shape(arrow_tbl): + plc_tbl = plc.interop.from_arrow(arrow_tbl) + + plc_tbl_shape = (plc_tbl.num_rows(), plc_tbl.num_columns()) + assert plc_tbl_shape == arrow_tbl.shape diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index 05ee8346afa..fc7fd87d4c5 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -10024,6 +10024,14 @@ def test_dataframe_rename_duplicate_column(): gdf.rename(columns={"a": "b"}, inplace=True) +def test_dataframe_rename_columns_keep_type(): + gdf = cudf.DataFrame([[1, 2, 3]]) + gdf.columns = cudf.Index([4, 5, 6], dtype=np.int8) + result = gdf.rename({4: 50}, axis="columns").columns + expected = pd.Index([50, 5, 6], dtype=np.int8) + assert_eq(result, expected) + + @pytest_unmark_spilling @pytest.mark.skipif( PANDAS_VERSION < PANDAS_CURRENT_SUPPORTED_VERSION, diff --git a/python/cudf/cudf/tests/test_monotonic.py b/python/cudf/cudf/tests/test_monotonic.py index 0896d91570e..790e84559a9 100644 --- a/python/cudf/cudf/tests/test_monotonic.py +++ b/python/cudf/cudf/tests/test_monotonic.py @@ -33,11 +33,13 @@ def test_range_index(testrange): "testlist", [ [1, 2, 3, 4], + [1, 2, 3, 4, None], [1, 2, 3, 3, 4], [10, 9, 8, 7], [10, 9, 8, 8, 7], ["c", "d", "e", "f"], ["c", "d", "e", "e", "f"], + ["c", "d", "e", "f", None], ["z", "y", "x", "r"], ["z", "y", "x", "x", "r"], ], @@ -51,6 +53,23 @@ def test_generic_index(testlist): assert index.is_monotonic_decreasing == index_pd.is_monotonic_decreasing +@pytest.mark.parametrize( + "testlist", + [ + [1, 2, 3, 4, np.nan], + [10, 9, 8, np.nan, 7], + [10, 9, 8, 8, 7, np.nan], + ], +) +def test_float_index(testlist): + index_pd = pd.Index(testlist) + index = cudf.from_pandas(index_pd, nan_as_null=False) + + assert index.is_unique == index_pd.is_unique + assert index.is_monotonic_increasing == index_pd.is_monotonic_increasing + assert index.is_monotonic_decreasing == index_pd.is_monotonic_decreasing + + @pytest.mark.parametrize( "testlist", [ diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index e1e7952605b..588bc87d268 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -3485,3 +3485,14 @@ def test_parquet_chunked_reader( ) actual = reader.read() assert_eq(expected, actual) + + +def test_parquet_reader_pandas_compatibility(): + df = pd.DataFrame( + {"a": [1, 2, 3, 4] * 10000, "b": ["av", "qw", "hi", "xyz"] * 10000} + ) + buffer = BytesIO() + df.to_parquet(buffer) + with cudf.option_context("mode.pandas_compatible", True): + expected = cudf.read_parquet(buffer) + assert_eq(expected, df) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index b3dd6ae7cc3..3f5f3c74050 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -123,7 +123,7 @@ def broadcast( ] -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class IR: """Abstract plan node, representing an unevaluated dataframe.""" @@ -157,7 +157,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: ) # pragma: no cover -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class PythonScan(IR): """Representation of input from a python function.""" @@ -171,7 +171,7 @@ def __post_init__(self): raise NotImplementedError("PythonScan not implemented") -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Scan(IR): """Input from files.""" @@ -248,7 +248,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return df.filter(mask) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Cache(IR): """ Return a cached plan node. @@ -269,7 +269,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return cache.setdefault(self.key, self.value.evaluate(cache=cache)) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class DataFrameScan(IR): """ Input from an existing polars DataFrame. @@ -315,7 +315,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return df -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Select(IR): """Produce a new dataframe selecting given expressions from an input.""" @@ -336,7 +336,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return DataFrame(columns) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Reduce(IR): """ Produce a new dataframe selecting given expressions from an input. @@ -389,7 +389,7 @@ def placeholder_column(n: int) -> plc.Column: ) -@dataclasses.dataclass(slots=False) +@dataclasses.dataclass class GroupBy(IR): """Perform a groupby.""" @@ -490,7 +490,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return DataFrame([*result_keys, *results]).slice(self.options.slice) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Join(IR): """A join of two dataframes.""" @@ -503,7 +503,7 @@ class Join(IR): right_on: list[expr.NamedExpr] """List of expressions used as keys in the right frame.""" options: tuple[ - Literal["inner", "left", "full", "leftsemi", "leftanti"], + Literal["inner", "left", "full", "leftsemi", "leftanti", "cross"], bool, tuple[int, int] | None, str | None, @@ -520,11 +520,14 @@ class Join(IR): def __post_init__(self) -> None: """Validate preconditions.""" - if self.options[0] == "cross": - raise NotImplementedError("cross join not implemented") + if any( + isinstance(e.value, expr.Literal) + for e in itertools.chain(self.left_on, self.right_on) + ): + raise NotImplementedError("Join with literal as join key.") - @cache @staticmethod + @cache def _joiners( how: Literal["inner", "left", "full", "leftsemi", "leftanti"], ) -> tuple[ @@ -567,35 +570,42 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" left = self.left.evaluate(cache=cache) right = self.right.evaluate(cache=cache) - left_on = DataFrame( - broadcast( - *(e.evaluate(left) for e in self.left_on), target_length=left.num_rows - ) - ) - right_on = DataFrame( - broadcast( - *(e.evaluate(right) for e in self.right_on), - target_length=right.num_rows, - ) - ) how, join_nulls, zlice, suffix, coalesce = self.options + suffix = "_right" if suffix is None else suffix + if how == "cross": + # Separate implementation, since cross_join returns the + # result, not the gather maps + columns = plc.join.cross_join(left.table, right.table).columns() + left_cols = [ + NamedColumn(new, old.name).sorted_like(old) + for new, old in zip(columns[: left.num_columns], left.columns) + ] + right_cols = [ + NamedColumn( + new, + old.name + if old.name not in left.column_names_set + else f"{old.name}{suffix}", + ) + for new, old in zip(columns[left.num_columns :], right.columns) + ] + return DataFrame([*left_cols, *right_cols]) + # TODO: Waiting on clarity based on https://github.com/pola-rs/polars/issues/17184 + left_on = DataFrame(broadcast(*(e.evaluate(left) for e in self.left_on))) + right_on = DataFrame(broadcast(*(e.evaluate(right) for e in self.right_on))) null_equality = ( plc.types.NullEquality.EQUAL if join_nulls else plc.types.NullEquality.UNEQUAL ) - suffix = "_right" if suffix is None else suffix join_fn, left_policy, right_policy = Join._joiners(how) if right_policy is None: # Semi join lg = join_fn(left_on.table, right_on.table, null_equality) - left = left.replace_columns(*left_on.columns) table = plc.copying.gather(left.table, lg, left_policy) result = DataFrame.from_table(table, left.column_names) else: lg, rg = join_fn(left_on.table, right_on.table, null_equality) - left = left.replace_columns(*left_on.columns) - right = right.replace_columns(*right_on.columns) if coalesce and how == "inner": right = right.discard_columns(right_on.column_names_set) left = DataFrame.from_table( @@ -629,7 +639,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return result.slice(zlice) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class HStack(IR): """Add new columns to a dataframe.""" @@ -658,7 +668,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return df.with_columns(columns) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Distinct(IR): """Produce a new dataframe with distinct rows.""" @@ -728,7 +738,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return result.slice(self.zlice) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Sort(IR): """Sort a dataframe.""" @@ -797,7 +807,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return DataFrame(columns).slice(self.zlice) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Slice(IR): """Slice a dataframe.""" @@ -814,7 +824,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return df.slice((self.offset, self.length)) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Filter(IR): """Filter a dataframe with a boolean mask.""" @@ -830,7 +840,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return df.filter(mask) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Projection(IR): """Select a subset of columns from a dataframe.""" @@ -847,7 +857,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: return DataFrame(columns) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class MapFunction(IR): """Apply some function to a dataframe.""" @@ -881,6 +891,13 @@ def __post_init__(self) -> None: # polars requires that all to-explode columns have the # same sub-shapes raise NotImplementedError("Explode with more than one column") + elif self.name == "rename": + old, new, _ = self.options + # TODO: perhaps polars should validate renaming in the IR? + if len(new) != len(set(new)) or ( + set(new) & (set(self.df.schema.keys() - set(old))) + ): + raise NotImplementedError("Duplicate new names in rename.") def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" @@ -906,7 +923,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: raise AssertionError("Should never be reached") # pragma: no cover -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class Union(IR): """Concatenate dataframes vertically.""" @@ -930,7 +947,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: ).slice(self.zlice) -@dataclasses.dataclass(slots=True) +@dataclasses.dataclass class HConcat(IR): """Concatenate dataframes horizontally.""" diff --git a/python/cudf_polars/cudf_polars/typing/__init__.py b/python/cudf_polars/cudf_polars/typing/__init__.py index 6d597a91724..c04eac41bb7 100644 --- a/python/cudf_polars/cudf_polars/typing/__init__.py +++ b/python/cudf_polars/cudf_polars/typing/__init__.py @@ -6,7 +6,7 @@ from __future__ import annotations from collections.abc import Mapping -from typing import TYPE_CHECKING, Literal, Protocol, TypeAlias +from typing import TYPE_CHECKING, Literal, Protocol, Union from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir @@ -15,43 +15,45 @@ if TYPE_CHECKING: from typing import Callable + from typing_extensions import TypeAlias + import polars as pl -IR: TypeAlias = ( - pl_ir.PythonScan - | pl_ir.Scan - | pl_ir.Cache - | pl_ir.DataFrameScan - | pl_ir.Select - | pl_ir.GroupBy - | pl_ir.Join - | pl_ir.HStack - | pl_ir.Distinct - | pl_ir.Sort - | pl_ir.Slice - | pl_ir.Filter - | pl_ir.SimpleProjection - | pl_ir.MapFunction - | pl_ir.Union - | pl_ir.HConcat - | pl_ir.ExtContext -) - -Expr: TypeAlias = ( - pl_expr.Function - | pl_expr.Window - | pl_expr.Literal - | pl_expr.Sort - | pl_expr.SortBy - | pl_expr.Gather - | pl_expr.Filter - | pl_expr.Cast - | pl_expr.Column - | pl_expr.Agg - | pl_expr.BinaryExpr - | pl_expr.Len - | pl_expr.PyExprIR -) +IR: TypeAlias = Union[ + pl_ir.PythonScan, + pl_ir.Scan, + pl_ir.Cache, + pl_ir.DataFrameScan, + pl_ir.Select, + pl_ir.GroupBy, + pl_ir.Join, + pl_ir.HStack, + pl_ir.Distinct, + pl_ir.Sort, + pl_ir.Slice, + pl_ir.Filter, + pl_ir.SimpleProjection, + pl_ir.MapFunction, + pl_ir.Union, + pl_ir.HConcat, + pl_ir.ExtContext, +] + +Expr: TypeAlias = Union[ + pl_expr.Function, + pl_expr.Window, + pl_expr.Literal, + pl_expr.Sort, + pl_expr.SortBy, + pl_expr.Gather, + pl_expr.Filter, + pl_expr.Cast, + pl_expr.Column, + pl_expr.Agg, + pl_expr.BinaryExpr, + pl_expr.Len, + pl_expr.PyExprIR, +] Schema: TypeAlias = Mapping[str, plc.DataType] diff --git a/python/cudf_polars/tests/expressions/test_agg.py b/python/cudf_polars/tests/expressions/test_agg.py index 2ffa1c4af6d..267d0a99692 100644 --- a/python/cudf_polars/tests/expressions/test_agg.py +++ b/python/cudf_polars/tests/expressions/test_agg.py @@ -52,7 +52,7 @@ def test_agg(df, agg): # https://github.com/rapidsai/cudf/issues/15852 check_dtypes = agg not in {"n_unique", "median"} - if not check_dtypes and q.schema["a"] != pl.Float64: + if not check_dtypes and q.collect_schema()["a"] != pl.Float64: with pytest.raises(AssertionError): assert_gpu_result_equal(q) assert_gpu_result_equal(q, check_dtypes=check_dtypes, check_exact=False) @@ -65,7 +65,7 @@ def test_agg(df, agg): ) @pytest.mark.parametrize("op", ["min", "max"]) def test_agg_float_with_nans(propagate_nans, op): - df = pl.LazyFrame({"a": [1, 2, float("nan")]}) + df = pl.LazyFrame({"a": pl.Series([1, 2, float("nan")], dtype=pl.Float64())}) op = getattr(pl.Expr, f"nan_{op}" if propagate_nans else op) q = df.select(op(pl.col("a"))) diff --git a/python/cudf_polars/tests/expressions/test_booleanfunction.py b/python/cudf_polars/tests/expressions/test_booleanfunction.py index 951b749e670..a52fba26528 100644 --- a/python/cudf_polars/tests/expressions/test_booleanfunction.py +++ b/python/cudf_polars/tests/expressions/test_booleanfunction.py @@ -26,7 +26,7 @@ def has_nulls(request): def test_booleanfunction_reduction(ignore_nulls): ldf = pl.LazyFrame( { - "a": [1, 2, 3.0, 2, 5], + "a": pl.Series([1, 2, 3.0, 2, 5], dtype=pl.Float64()), "b": [0, 3, 1, -1, None], "c": [1, 6, 5, 3, 2], } @@ -82,7 +82,9 @@ def test_boolean_function_unary(request, expr, has_nans, has_nulls): ], ) def test_unsupported_boolean_function(expr): - df = pl.LazyFrame({"a": [1, float("nan"), 2, 4], "b": [1, 2, 3, 4]}) + df = pl.LazyFrame( + {"a": pl.Series([1, float("nan"), 2, 4], dtype=pl.Float64()), "b": [1, 2, 3, 4]} + ) q = df.select(expr) @@ -95,7 +97,11 @@ def test_unsupported_boolean_function(expr): ) def test_boolean_isbetween(closed, bounds): df = pl.LazyFrame( - {"a": [1, float("nan"), 2, 4], "lo": [1, 2, 2, 3], "hi": [10, 4, 2, 4]} + { + "a": pl.Series([1, float("nan"), 2, 4], dtype=pl.Float32()), + "lo": [1, 2, 2, 3], + "hi": [10, 4, 2, 4], + } ) q = df.select(pl.col("a").is_between(*bounds, closed=closed)) diff --git a/python/cudf_polars/tests/expressions/test_rolling.py b/python/cudf_polars/tests/expressions/test_rolling.py index d4920d35f14..992efe0ba79 100644 --- a/python/cudf_polars/tests/expressions/test_rolling.py +++ b/python/cudf_polars/tests/expressions/test_rolling.py @@ -3,11 +3,9 @@ from __future__ import annotations -import pytest - import polars as pl -from cudf_polars import translate_ir +from cudf_polars.testing.asserts import assert_ir_translation_raises def test_rolling(): @@ -29,13 +27,13 @@ def test_rolling(): min_a=pl.min("a").rolling(index_column="dt", period="2d"), max_a=pl.max("a").rolling(index_column="dt", period="2d"), ) - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + + assert_ir_translation_raises(q, NotImplementedError) def test_grouped_rolling(): df = pl.LazyFrame({"a": [1, 2, 3, 4, 5, 6], "b": [1, 2, 1, 3, 1, 2]}) q = df.select(pl.col("a").min().over("b")) - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + + assert_ir_translation_raises(q, NotImplementedError) diff --git a/python/cudf_polars/tests/expressions/test_stringfunction.py b/python/cudf_polars/tests/expressions/test_stringfunction.py index 3c498fe7286..9729e765948 100644 --- a/python/cudf_polars/tests/expressions/test_stringfunction.py +++ b/python/cudf_polars/tests/expressions/test_stringfunction.py @@ -8,8 +8,11 @@ import polars as pl -from cudf_polars import execute_with_cudf, translate_ir -from cudf_polars.testing.asserts import assert_gpu_result_equal +from cudf_polars import execute_with_cudf +from cudf_polars.testing.asserts import ( + assert_gpu_result_equal, + assert_ir_translation_raises, +) @pytest.fixture @@ -47,22 +50,19 @@ def test_supported_stringfunction_expression(ldf): def test_unsupported_stringfunction(ldf): q = ldf.select(pl.col("a").str.count_matches("e", literal=True)) - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + assert_ir_translation_raises(q, NotImplementedError) def test_contains_re_non_strict_raises(ldf): q = ldf.select(pl.col("a").str.contains(".", strict=False)) - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + assert_ir_translation_raises(q, NotImplementedError) def test_contains_re_non_literal_raises(ldf): q = ldf.select(pl.col("a").str.contains(pl.col("b"), literal=False)) - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + assert_ir_translation_raises(q, NotImplementedError) @pytest.mark.parametrize( diff --git a/python/cudf_polars/tests/test_groupby.py b/python/cudf_polars/tests/test_groupby.py index e70f923b097..aefad59eb91 100644 --- a/python/cudf_polars/tests/test_groupby.py +++ b/python/cudf_polars/tests/test_groupby.py @@ -6,8 +6,10 @@ import polars as pl -from cudf_polars import translate_ir -from cudf_polars.testing.asserts import assert_gpu_result_equal +from cudf_polars.testing.asserts import ( + assert_gpu_result_equal, + assert_ir_translation_raises, +) @pytest.fixture @@ -72,7 +74,7 @@ def test_groupby(df: pl.LazyFrame, maintain_order, keys, exprs): q = df.group_by(*keys, maintain_order=maintain_order).agg(*exprs) if not maintain_order: - sort_keys = list(q.schema.keys())[: len(keys)] + sort_keys = list(q.collect_schema().keys())[: len(keys)] q = q.sort(*sort_keys) assert_gpu_result_equal(q, check_exact=False) @@ -97,5 +99,4 @@ def test_groupby_len(df, keys): def test_groupby_unsupported(df, expr): q = df.group_by("key1").agg(expr) - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + assert_ir_translation_raises(q, NotImplementedError) diff --git a/python/cudf_polars/tests/test_join.py b/python/cudf_polars/tests/test_join.py index f4a4704f3cc..89f6fd3455b 100644 --- a/python/cudf_polars/tests/test_join.py +++ b/python/cudf_polars/tests/test_join.py @@ -6,7 +6,10 @@ import polars as pl -from cudf_polars.testing.asserts import assert_gpu_result_equal +from cudf_polars.testing.asserts import ( + assert_gpu_result_equal, + assert_ir_translation_raises, +) @pytest.mark.parametrize( @@ -16,10 +19,6 @@ "left", "semi", "anti", - pytest.param( - "cross", - marks=pytest.mark.xfail(reason="cross join not implemented"), - ), "full", ], ) @@ -55,3 +54,34 @@ def test_join(how, coalesce, join_nulls, join_expr): right, on=join_expr, how=how, join_nulls=join_nulls, coalesce=coalesce ) assert_gpu_result_equal(query, check_row_order=False) + + +def test_cross_join(): + left = pl.DataFrame( + { + "a": [1, 2, 3, 1, None], + "b": [1, 2, 3, 4, 5], + "c": [2, 3, 4, 5, 6], + } + ).lazy() + right = pl.DataFrame( + { + "a": [1, 4, 3, 7, None, None], + "c": [2, 3, 4, 5, 6, 7], + } + ).lazy() + + q = left.join(right, how="cross") + + assert_gpu_result_equal(q) + + +@pytest.mark.parametrize( + "left_on,right_on", [(pl.col("a"), pl.lit(2)), (pl.lit(2), pl.col("a"))] +) +def test_join_literal_key_unsupported(left_on, right_on): + left = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5]}) + right = pl.LazyFrame({"a": [1, 2, 3], "b": [5, 6, 7]}) + q = left.join(right, left_on=left_on, right_on=right_on, how="inner") + + assert_ir_translation_raises(q, NotImplementedError) diff --git a/python/cudf_polars/tests/test_mapfunction.py b/python/cudf_polars/tests/test_mapfunction.py index ec6b3f3fc0a..77032108e6f 100644 --- a/python/cudf_polars/tests/test_mapfunction.py +++ b/python/cudf_polars/tests/test_mapfunction.py @@ -6,8 +6,10 @@ import polars as pl -from cudf_polars import translate_ir -from cudf_polars.testing.asserts import assert_gpu_result_equal +from cudf_polars.testing.asserts import ( + assert_gpu_result_equal, + assert_ir_translation_raises, +) def test_merge_sorted_raises(): @@ -17,16 +19,14 @@ def test_merge_sorted_raises(): q = df1.merge_sorted(df2, key="a").merge_sorted(df3, key="a") - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + assert_ir_translation_raises(q, NotImplementedError) def test_explode_multiple_raises(): df = pl.LazyFrame({"a": [[1, 2], [3, 4]], "b": [[5, 6], [7, 8]]}) q = df.explode("a", "b") - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + assert_ir_translation_raises(q, NotImplementedError) @pytest.mark.parametrize("column", ["a", "b"]) @@ -41,3 +41,23 @@ def test_explode_single(column): q = df.explode(column) assert_gpu_result_equal(q) + + +@pytest.mark.parametrize("mapping", [{"b": "a"}, {"a": "c", "b": "c"}]) +def test_rename_duplicate_raises(mapping): + df = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5]}) + + q = df.rename(mapping) + + assert_ir_translation_raises(q, NotImplementedError) + + +@pytest.mark.parametrize( + "mapping", [{}, {"b": "c"}, {"b": "a", "a": "b"}, {"a": "c", "b": "d"}] +) +def test_rename_columns(mapping): + df = pl.LazyFrame({"a": [1, 2, 3], "b": [3, 4, 5]}) + + q = df.rename(mapping) + + assert_gpu_result_equal(q) diff --git a/python/cudf_polars/tests/test_python_scan.py b/python/cudf_polars/tests/test_python_scan.py index c03474e3dc8..fd8453b77c4 100644 --- a/python/cudf_polars/tests/test_python_scan.py +++ b/python/cudf_polars/tests/test_python_scan.py @@ -2,11 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -import pytest - import polars as pl -from cudf_polars import translate_ir +from cudf_polars.testing.asserts import assert_ir_translation_raises def test_python_scan(): @@ -14,7 +12,6 @@ def source(with_columns, predicate, nrows): return pl.DataFrame({"a": pl.Series([1, 2, 3], dtype=pl.Int8())}) q = pl.LazyFrame._scan_python_function({"a": pl.Int8}, source, pyarrow=False) - with pytest.raises(NotImplementedError): - _ = translate_ir(q._ldf.visit()) + assert_ir_translation_raises(q, NotImplementedError) assert q.collect().equals(source(None, None, None)) diff --git a/python/cudf_polars/tests/test_union.py b/python/cudf_polars/tests/test_union.py index 6c9122bc260..b021d832910 100644 --- a/python/cudf_polars/tests/test_union.py +++ b/python/cudf_polars/tests/test_union.py @@ -2,12 +2,12 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -import pytest - import polars as pl -from cudf_polars import translate_ir -from cudf_polars.testing.asserts import assert_gpu_result_equal +from cudf_polars.testing.asserts import ( + assert_gpu_result_equal, + assert_ir_translation_raises, +) def test_union(): @@ -31,8 +31,8 @@ def test_union_schema_mismatch_raises(): ).lazy() ldf2 = ldf.select(pl.col("a").cast(pl.Float32)) query = pl.concat([ldf, ldf2], how="diagonal") - with pytest.raises(NotImplementedError): - _ = translate_ir(query._ldf.visit()) + + assert_ir_translation_raises(query, NotImplementedError) def test_concat_vertical():