diff --git a/.devcontainer/cuda11.8-conda/devcontainer.json b/.devcontainer/cuda11.8-conda/devcontainer.json index c62e18512a0..8423fe21c29 100644 --- a/.devcontainer/cuda11.8-conda/devcontainer.json +++ b/.devcontainer/cuda11.8-conda/devcontainer.json @@ -11,7 +11,7 @@ "runArgs": [ "--rm", "--name", - "${localEnv:USER}-rapids-${localWorkspaceFolderBasename}-24.08-cuda11.8-conda" + "${localEnv:USER:anon}-rapids-${localWorkspaceFolderBasename}-24.08-cuda11.8-conda" ], "hostRequirements": {"gpu": "optional"}, "features": { diff --git a/.devcontainer/cuda11.8-pip/devcontainer.json b/.devcontainer/cuda11.8-pip/devcontainer.json index 4ab4bd75643..4945d6cf753 100644 --- a/.devcontainer/cuda11.8-pip/devcontainer.json +++ b/.devcontainer/cuda11.8-pip/devcontainer.json @@ -11,7 +11,7 @@ "runArgs": [ "--rm", "--name", - "${localEnv:USER}-rapids-${localWorkspaceFolderBasename}-24.08-cuda11.8-pip" + "${localEnv:USER:anon}-rapids-${localWorkspaceFolderBasename}-24.08-cuda11.8-pip" ], "hostRequirements": {"gpu": "optional"}, "features": { diff --git a/.devcontainer/cuda12.2-conda/devcontainer.json b/.devcontainer/cuda12.2-conda/devcontainer.json index 2b50454410f..05bf9173d25 100644 --- a/.devcontainer/cuda12.2-conda/devcontainer.json +++ b/.devcontainer/cuda12.2-conda/devcontainer.json @@ -11,7 +11,7 @@ "runArgs": [ "--rm", "--name", - "${localEnv:USER}-rapids-${localWorkspaceFolderBasename}-24.08-cuda12.2-conda" + "${localEnv:USER:anon}-rapids-${localWorkspaceFolderBasename}-24.08-cuda12.2-conda" ], "hostRequirements": {"gpu": "optional"}, "features": { diff --git a/.devcontainer/cuda12.2-pip/devcontainer.json b/.devcontainer/cuda12.2-pip/devcontainer.json index fc5abc56094..74420214726 100644 --- a/.devcontainer/cuda12.2-pip/devcontainer.json +++ b/.devcontainer/cuda12.2-pip/devcontainer.json @@ -11,7 +11,7 @@ "runArgs": [ "--rm", "--name", - "${localEnv:USER}-rapids-${localWorkspaceFolderBasename}-24.08-cuda12.2-pip" + "${localEnv:USER:anon}-rapids-${localWorkspaceFolderBasename}-24.08-cuda12.2-pip" ], "hostRequirements": {"gpu": "optional"}, "features": { diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 9efac3f1904..5e2f46714d9 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -22,7 +22,7 @@ java/ @rapidsai/cudf-java-codeowners /.pre-commit-config.yaml @rapidsai/ci-codeowners #packaging code owners -/.devcontainers/ @rapidsai/packaging-codeowners +/.devcontainer/ @rapidsai/packaging-codeowners /conda/ @rapidsai/packaging-codeowners /dependencies.yaml @rapidsai/packaging-codeowners /build.sh @rapidsai/packaging-codeowners diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 8865fb48e0d..4cdcac88091 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -134,7 +134,7 @@ repos: - id: rapids-dependency-file-generator args: ["--clean"] - repo: https://github.com/astral-sh/ruff-pre-commit - rev: v0.4.3 + rev: v0.4.8 hooks: - id: ruff files: python/.*$ diff --git a/ci/build_docs.sh b/ci/build_docs.sh index db306046667..67a5415f353 100755 --- a/ci/build_docs.sh +++ b/ci/build_docs.sh @@ -46,9 +46,6 @@ pushd docs/cudf make dirhtml mkdir -p "${RAPIDS_DOCS_DIR}/cudf/html" mv build/dirhtml/* "${RAPIDS_DOCS_DIR}/cudf/html" -make text -mkdir -p "${RAPIDS_DOCS_DIR}/cudf/txt" -mv build/text/* "${RAPIDS_DOCS_DIR}/cudf/txt" popd rapids-logger "Build dask-cuDF Sphinx docs" @@ -56,9 +53,6 @@ pushd docs/dask_cudf make dirhtml mkdir -p "${RAPIDS_DOCS_DIR}/dask-cudf/html" mv build/dirhtml/* "${RAPIDS_DOCS_DIR}/dask-cudf/html" -make text -mkdir -p 
"${RAPIDS_DOCS_DIR}/dask-cudf/txt" -mv build/text/* "${RAPIDS_DOCS_DIR}/dask-cudf/txt" popd rapids-upload-docs diff --git a/cpp/include/cudf/detail/offsets_iterator.cuh b/cpp/include/cudf/detail/offsets_iterator.cuh index 15b334245ff..1ab1fd46230 100644 --- a/cpp/include/cudf/detail/offsets_iterator.cuh +++ b/cpp/include/cudf/detail/offsets_iterator.cuh @@ -53,7 +53,7 @@ struct input_offsetalator : base_normalator { */ __device__ inline int64_t operator[](size_type idx) const { - void const* tp = p_ + (idx * this->width_); + void const* tp = p_ + (static_cast(idx) * this->width_); return this->width_ == sizeof(int32_t) ? static_cast(*static_cast(tp)) : *static_cast(tp); } @@ -79,7 +79,7 @@ struct input_offsetalator : base_normalator { cudf_assert((dtype.id() == type_id::INT32 || dtype.id() == type_id::INT64) && "Unexpected offsets type"); #endif - p_ += (this->width_ * offset); + p_ += (this->width_ * static_cast(offset)); } protected: @@ -121,7 +121,7 @@ struct output_offsetalator : base_normalator { __device__ inline output_offsetalator const operator[](size_type idx) const { output_offsetalator tmp{*this}; - tmp.p_ += (idx * this->width_); + tmp.p_ += (static_cast(idx) * this->width_); return tmp; } diff --git a/cpp/src/io/utilities/data_casting.cu b/cpp/src/io/utilities/data_casting.cu index 60cbfbc0dae..288a5690282 100644 --- a/cpp/src/io/utilities/data_casting.cu +++ b/cpp/src/io/utilities/data_casting.cu @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -417,6 +418,7 @@ struct bitfield_block { * @param null_mask Null mask * @param null_count_data pointer to store null count * @param options Settings for controlling string processing behavior + * @param d_sizes Output size of each row * @param d_offsets Offsets to identify where to store the results for each string * @param d_chars Character array to store the characters of strings */ @@ -427,7 +429,8 @@ CUDF_KERNEL void parse_fn_string_parallel(str_tuple_it str_tuples, bitmask_type* null_mask, size_type* null_count_data, cudf::io::parse_options_view const options, - size_type* d_offsets, + size_type* d_sizes, + cudf::detail::input_offsetalator d_offsets, char* d_chars) { constexpr auto BLOCK_SIZE = @@ -455,7 +458,7 @@ CUDF_KERNEL void parse_fn_string_parallel(str_tuple_it str_tuples, istring = get_next_string()) { // skip nulls if (null_mask != nullptr && not bit_is_set(null_mask, istring)) { - if (!d_chars && lane == 0) d_offsets[istring] = 0; + if (!d_chars && lane == 0) { d_sizes[istring] = 0; } continue; // gride-stride return; } @@ -476,7 +479,7 @@ CUDF_KERNEL void parse_fn_string_parallel(str_tuple_it str_tuples, if (lane == 0) { clear_bit(null_mask, istring); atomicAdd(null_count_data, 1); - if (!d_chars) d_offsets[istring] = 0; + if (!d_chars) { d_sizes[istring] = 0; } } continue; // gride-stride return; } @@ -491,7 +494,7 @@ CUDF_KERNEL void parse_fn_string_parallel(str_tuple_it str_tuples, // Copy literal/numeric value if (not is_string_value) { if (!d_chars) { - if (lane == 0) { d_offsets[istring] = in_end - in_begin; } + if (lane == 0) { d_sizes[istring] = in_end - in_begin; } } else { for (thread_index_type char_index = lane; char_index < (in_end - in_begin); char_index += BLOCK_SIZE) { @@ -621,8 +624,8 @@ CUDF_KERNEL void parse_fn_string_parallel(str_tuple_it str_tuples, clear_bit(null_mask, istring); atomicAdd(null_count_data, 1); } - last_offset = 0; - d_offsets[istring] = 0; + last_offset = 0; + d_sizes[istring] = 0; } if constexpr (!is_warp) { __syncthreads(); } break; // 
gride-stride return; @@ -729,7 +732,7 @@ CUDF_KERNEL void parse_fn_string_parallel(str_tuple_it str_tuples, } } } // char for-loop - if (!d_chars && lane == 0) { d_offsets[istring] = last_offset; } + if (!d_chars && lane == 0) { d_sizes[istring] = last_offset; } } // grid-stride for-loop } @@ -739,13 +742,14 @@ struct string_parse { bitmask_type* null_mask; size_type* null_count_data; cudf::io::parse_options_view const options; - size_type* d_offsets{}; + size_type* d_sizes{}; + cudf::detail::input_offsetalator d_offsets; char* d_chars{}; __device__ void operator()(size_type idx) { if (null_mask != nullptr && not bit_is_set(null_mask, idx)) { - if (!d_chars) d_offsets[idx] = 0; + if (!d_chars) { d_sizes[idx] = 0; } return; } auto const in_begin = str_tuples[idx].first; @@ -761,7 +765,7 @@ struct string_parse { if (is_null_literal && null_mask != nullptr) { clear_bit(null_mask, idx); atomicAdd(null_count_data, 1); - if (!d_chars) d_offsets[idx] = 0; + if (!d_chars) { d_sizes[idx] = 0; } return; } } @@ -773,9 +777,9 @@ struct string_parse { clear_bit(null_mask, idx); atomicAdd(null_count_data, 1); } - if (!d_chars) d_offsets[idx] = 0; + if (!d_chars) { d_sizes[idx] = 0; } } else { - if (!d_chars) d_offsets[idx] = str_process_info.bytes; + if (!d_chars) { d_sizes[idx] = str_process_info.bytes; } } } }; @@ -811,13 +815,12 @@ static std::unique_ptr parse_string(string_view_pair_it str_tuples, size_type{0}, thrust::maximum{}); - auto offsets = cudf::make_numeric_column( - data_type{type_to_id()}, col_size + 1, cudf::mask_state::UNALLOCATED, stream, mr); - auto d_offsets = offsets->mutable_view().data(); + auto sizes = rmm::device_uvector(col_size, stream); + auto d_sizes = sizes.data(); auto null_count_data = d_null_count.data(); auto single_thread_fn = string_parse{ - str_tuples, static_cast(null_mask.data()), null_count_data, options, d_offsets}; + str_tuples, static_cast(null_mask.data()), null_count_data, options, d_sizes}; thrust::for_each_n(rmm::exec_policy(stream), thrust::make_counting_iterator(0), col_size, @@ -838,7 +841,8 @@ static std::unique_ptr parse_string(string_view_pair_it str_tuples, static_cast(null_mask.data()), null_count_data, options, - d_offsets, + d_sizes, + cudf::detail::input_offsetalator{}, nullptr); } @@ -853,20 +857,22 @@ static std::unique_ptr parse_string(string_view_pair_it str_tuples, static_cast(null_mask.data()), null_count_data, options, - d_offsets, + d_sizes, + cudf::detail::input_offsetalator{}, nullptr); } - auto const bytes = - cudf::detail::sizes_to_offsets(d_offsets, d_offsets + col_size + 1, d_offsets, stream); - CUDF_EXPECTS(bytes <= std::numeric_limits::max(), - "Size of output exceeds the column size limit", - std::overflow_error); + + auto [offsets, bytes] = + cudf::strings::detail::make_offsets_child_column(sizes.begin(), sizes.end(), stream, mr); + auto d_offsets = cudf::detail::offsetalator_factory::make_input_iterator(offsets->view()); // CHARS column rmm::device_uvector chars(bytes, stream, mr); auto d_chars = chars.data(); - single_thread_fn.d_chars = d_chars; + single_thread_fn.d_chars = d_chars; + single_thread_fn.d_offsets = d_offsets; + thrust::for_each_n(rmm::exec_policy(stream), thrust::make_counting_iterator(0), col_size, @@ -882,6 +888,7 @@ static std::unique_ptr parse_string(string_view_pair_it str_tuples, static_cast(null_mask.data()), null_count_data, options, + d_sizes, d_offsets, d_chars); } @@ -897,6 +904,7 @@ static std::unique_ptr parse_string(string_view_pair_it str_tuples, static_cast(null_mask.data()), null_count_data, 
options, + d_sizes, d_offsets, d_chars); } diff --git a/cpp/src/quantiles/quantiles.cu b/cpp/src/quantiles/quantiles.cu index c0f536536ce..af3bda2e62e 100644 --- a/cpp/src/quantiles/quantiles.cu +++ b/cpp/src/quantiles/quantiles.cu @@ -34,6 +34,7 @@ #include #include +#include #include namespace cudf { @@ -78,7 +79,8 @@ std::unique_ptr quantiles(table_view const& input, CUDF_EXPECTS(interp == interpolation::HIGHER || interp == interpolation::LOWER || interp == interpolation::NEAREST, - "multi-column quantiles require a non-arithmetic interpolation strategy."); + "multi-column quantiles require a non-arithmetic interpolation strategy.", + std::invalid_argument); CUDF_EXPECTS(input.num_rows() > 0, "multi-column quantiles require at least one input row."); diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index a0d9083c4a4..826f879ddc0 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -570,6 +570,7 @@ ConfigureTest( large_strings/concatenate_tests.cpp large_strings/case_tests.cpp large_strings/large_strings_fixture.cpp + large_strings/many_strings_tests.cpp large_strings/merge_tests.cpp large_strings/parquet_tests.cpp large_strings/reshape_tests.cpp diff --git a/cpp/tests/io/json_test.cpp b/cpp/tests/io/json_test.cpp index 5d790e73246..57aa2721756 100644 --- a/cpp/tests/io/json_test.cpp +++ b/cpp/tests/io/json_test.cpp @@ -2374,7 +2374,6 @@ TEST_F(JsonReaderTest, MapTypes) EXPECT_EQ(col.type().id(), types[i]) << "column[" << i << "].type"; i++; } - std::cout << "\n"; }; // json diff --git a/cpp/tests/large_strings/large_strings_fixture.cpp b/cpp/tests/large_strings/large_strings_fixture.cpp index 59e0cd43d05..416b106c5a5 100644 --- a/cpp/tests/large_strings/large_strings_fixture.cpp +++ b/cpp/tests/large_strings/large_strings_fixture.cpp @@ -95,6 +95,17 @@ cudf::column_view StringsLargeTest::long_column() return g_ls_data->get_column(name); } +cudf::column_view StringsLargeTest::very_long_column() +{ + std::string name("long2"); + if (!g_ls_data->has_key(name)) { + auto itr = thrust::constant_iterator("12345"); + auto input = cudf::test::strings_column_wrapper(itr, itr + 30'000'000); + g_ls_data->add_column(name, input.release()); + } + return g_ls_data->get_column(name); +} + std::unique_ptr StringsLargeTest::get_ls_data() { CUDF_EXPECTS(g_ls_data == nullptr, "invalid call to get_ls_data"); diff --git a/cpp/tests/large_strings/large_strings_fixture.hpp b/cpp/tests/large_strings/large_strings_fixture.hpp index 8827b65f1ce..fb7b1cd00b8 100644 --- a/cpp/tests/large_strings/large_strings_fixture.hpp +++ b/cpp/tests/large_strings/large_strings_fixture.hpp @@ -33,14 +33,25 @@ class LargeStringsData; struct StringsLargeTest : public cudf::test::BaseFixture { /** * @brief Returns a column of long strings + * + * This returns 8 rows of 400 bytes */ cudf::column_view wide_column(); /** * @brief Returns a long column of strings + * + * This returns 5 million rows of 50 bytes */ cudf::column_view long_column(); + /** + * @brief Returns a very long column of strings + * + * This returns 30 million rows of 5 bytes + */ + cudf::column_view very_long_column(); + large_strings_enabler g_ls_enabler; static LargeStringsData* g_ls_data; diff --git a/cpp/tests/large_strings/many_strings_tests.cpp b/cpp/tests/large_strings/many_strings_tests.cpp new file mode 100644 index 00000000000..73fbb21d014 --- /dev/null +++ b/cpp/tests/large_strings/many_strings_tests.cpp @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "large_strings_fixture.hpp" + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include + +struct StringsManyTest : public cudf::test::StringsLargeTest {}; + +TEST_F(StringsManyTest, Replace) +{ + auto const expected = this->very_long_column(); + auto const view = cudf::column_view(expected); + // force addressing (rows > max_size_type/sizeof(int64)) in a 64-bit offsets column + int constexpr max_size_type = std::numeric_limits::max(); + // minimum number of duplicates to achieve large strings (64-bit offsets) + int const min_size_multiplier = + (max_size_type / cudf::strings_column_view(view).chars_size(cudf::get_default_stream())) + 1; + // minimum row multiplier to create max_size_type/sizeof(int64) = 268,435,455 rows + int const min_row_multiplier = ((max_size_type / sizeof(int64_t)) / view.size()) + 1; + int const multiplier = std::max(min_size_multiplier, min_row_multiplier); + + std::vector input_cols(multiplier, view); + std::vector splits; + std::generate_n(std::back_inserter(splits), multiplier - 1, [view, n = 1]() mutable { + return view.size() * (n++); + }); + + auto large_input = cudf::concatenate(input_cols); // 480 million rows + auto const sv = cudf::strings_column_view(large_input->view()); + EXPECT_EQ(sv.size(), view.size() * multiplier); + EXPECT_EQ(sv.offsets().type(), cudf::data_type{cudf::type_id::INT64}); + + // Using replace tests reading large strings as well as creating large strings + auto const target = cudf::string_scalar("3"); // fake the actual replace; + auto const repl = cudf::string_scalar("3"); // logic still builds the output + auto result = cudf::strings::replace(sv, target, repl); + + // verify results in sections + auto sliced = cudf::split(result->view(), splits); + for (auto c : sliced) { + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(c, expected); + } +} diff --git a/cpp/tests/quantiles/quantiles_test.cpp b/cpp/tests/quantiles/quantiles_test.cpp index 5b7b6dd2718..b7faa20e8c1 100644 --- a/cpp/tests/quantiles/quantiles_test.cpp +++ b/cpp/tests/quantiles/quantiles_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -25,6 +25,8 @@ #include #include +#include + template struct QuantilesTest : public cudf::test::BaseFixture {}; @@ -104,9 +106,10 @@ TYPED_TEST(QuantilesTest, TestMultiColumnArithmeticInterpolation) cudf::test::fixed_width_column_wrapper input_b({}); auto input = cudf::table_view({input_a}); - EXPECT_THROW(cudf::quantiles(input, {0.0f}, cudf::interpolation::LINEAR), cudf::logic_error); + EXPECT_THROW(cudf::quantiles(input, {0.0f}, cudf::interpolation::LINEAR), std::invalid_argument); - EXPECT_THROW(cudf::quantiles(input, {0.0f}, cudf::interpolation::MIDPOINT), cudf::logic_error); + EXPECT_THROW(cudf::quantiles(input, {0.0f}, cudf::interpolation::MIDPOINT), + std::invalid_argument); } TYPED_TEST(QuantilesTest, TestMultiColumnUnsorted) diff --git a/docs/cudf/source/cudf_pandas/usage.md b/docs/cudf/source/cudf_pandas/usage.md index b174c606d66..376784439aa 100644 --- a/docs/cudf/source/cudf_pandas/usage.md +++ b/docs/cudf/source/cudf_pandas/usage.md @@ -26,6 +26,36 @@ From the command line, run your Python scripts with `-m cudf.pandas`: python -m cudf.pandas script.py ``` +### Usage in tandem with +[`multiprocessing`](https://docs.python.org/3/library/multiprocessing.html) +or +[`concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html) +process pools + +To use a pool of workers (for example +[`multiprocessing.Pool`](https://docs.python.org/3/library/multiprocessing.html#multiprocessing.pool.Pool) +or +[`concurrent.futures.ProcessPoolExecutor`](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor)) +in your script with `cudf.pandas`, the `cudf.pandas` module must be +loaded on the worker processes, as well as by the controlling script. +The most foolproof way to do this is to programmatically install +`cudf.pandas` at the top of your script, before anything else. +For example + +```python +# This is equivalent to python -m cudf.pandas, but will run on the +# workers too. These two lines must run before pandas is imported, +# either directly or transitively. +import cudf.pandas +cudf.pandas.install() + +from multiprocessing import Pool + +with Pool(4) as pool: + # use pool here + ... +``` + ## Understanding performance - the `cudf.pandas` profiler `cudf.pandas` will attempt to use the GPU whenever possible and fall diff --git a/docs/cudf/source/libcudf_docs/api_docs/io_readers.rst b/docs/cudf/source/libcudf_docs/api_docs/io_readers.rst index a835673dee4..f94a5ddb403 100644 --- a/docs/cudf/source/libcudf_docs/api_docs/io_readers.rst +++ b/docs/cudf/source/libcudf_docs/api_docs/io_readers.rst @@ -2,4 +2,4 @@ Io Readers ========== .. doxygengroup:: io_readers - :desc-only: + :members: diff --git a/docs/cudf/source/libcudf_docs/api_docs/strings_convert.rst b/docs/cudf/source/libcudf_docs/api_docs/strings_convert.rst index ae5d78fb1a1..f2f320bd0e4 100644 --- a/docs/cudf/source/libcudf_docs/api_docs/strings_convert.rst +++ b/docs/cudf/source/libcudf_docs/api_docs/strings_convert.rst @@ -2,4 +2,4 @@ Strings Convert =============== .. doxygengroup:: strings_convert - :desc-only: + :members: diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst index 870ed8856d1..1e03fa80bb5 100644 --- a/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst @@ -21,6 +21,7 @@ This page provides API documentation for pylibcudf. 
join lists merge + quantiles reduce reshape rolling diff --git a/docs/cudf/source/user_guide/api_docs/pylibcudf/quantiles.rst b/docs/cudf/source/user_guide/api_docs/pylibcudf/quantiles.rst new file mode 100644 index 00000000000..3417c1ff59d --- /dev/null +++ b/docs/cudf/source/user_guide/api_docs/pylibcudf/quantiles.rst @@ -0,0 +1,6 @@ +========= +quantiles +========= + +.. automodule:: cudf._lib.pylibcudf.quantiles + :members: diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index ac592cedaac..f6f9cfa9a7c 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -26,6 +26,7 @@ from libc.stdint cimport uint8_t from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport make_unique, unique_ptr +from libcpp.pair cimport pair from libcpp.string cimport string from libcpp.unordered_map cimport unordered_map from libcpp.utility cimport move @@ -44,6 +45,7 @@ from cudf._lib.io.utils cimport ( ) from cudf._lib.pylibcudf.libcudf.expressions cimport expression from cudf._lib.pylibcudf.libcudf.io.parquet cimport ( + chunked_parquet_reader as cpp_chunked_parquet_reader, chunked_parquet_writer_options, merge_row_group_metadata as parquet_merge_metadata, parquet_chunked_writer as cpp_parquet_chunked_writer, @@ -60,6 +62,7 @@ from cudf._lib.pylibcudf.libcudf.io.parquet_metadata cimport ( from cudf._lib.pylibcudf.libcudf.io.types cimport ( column_in_metadata, table_input_metadata, + table_metadata, ) from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type @@ -126,50 +129,22 @@ def _parse_metadata(meta): return file_is_range_index, file_index_cols, file_column_dtype -cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, - use_pandas_metadata=True, - Expression filters=None): - """ - Cython function to call into libcudf API, see `read_parquet`. - - filters, if not None, should be an Expression that evaluates to a - boolean predicate as a function of columns being read. 
- - See Also - -------- - cudf.io.parquet.read_parquet - cudf.io.parquet.to_parquet - """ - - # Convert NativeFile buffers to NativeFileDatasource, - # but save original buffers in case we need to use - # pyarrow for metadata processing - # (See: https://github.com/rapidsai/cudf/issues/9599) - pa_buffers = [] - for i, datasource in enumerate(filepaths_or_buffers): - if isinstance(datasource, NativeFile): - pa_buffers.append(datasource) - filepaths_or_buffers[i] = NativeFileDatasource(datasource) +cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options( + cudf_io_types.source_info source, + vector[vector[size_type]] row_groups, + bool use_pandas_metadata, + Expression filters, + object columns): - cdef cudf_io_types.source_info source = make_source_info( - filepaths_or_buffers) - - cdef bool cpp_use_pandas_metadata = use_pandas_metadata - - cdef vector[vector[size_type]] cpp_row_groups + cdef parquet_reader_options args + cdef parquet_reader_options_builder builder cdef data_type cpp_timestamp_type = cudf_types.data_type( cudf_types.type_id.EMPTY ) - if row_groups is not None: - cpp_row_groups = row_groups - - # Setup parquet reader arguments - cdef parquet_reader_options args - cdef parquet_reader_options_builder builder builder = ( parquet_reader_options.builder(source) - .row_groups(cpp_row_groups) - .use_pandas_metadata(cpp_use_pandas_metadata) + .row_groups(row_groups) + .use_pandas_metadata(use_pandas_metadata) .use_arrow_schema(True) .timestamp_type(cpp_timestamp_type) ) @@ -185,28 +160,28 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, for col in columns: cpp_columns.push_back(str(col).encode()) args.set_columns(cpp_columns) - # Filters don't handle the range index correctly allow_range_index &= filters is None - # Read Parquet - cdef cudf_io_types.table_with_metadata c_result - - with nogil: - c_result = move(parquet_reader(args)) - - names = [info.name.decode() for info in c_result.metadata.schema_info] - - # Access the Parquet per_file_user_data to find the index + return pair[parquet_reader_options, bool](args, allow_range_index) + +cdef object _process_metadata(object df, + table_metadata table_meta, + list names, + object row_groups, + object filepaths_or_buffers, + list pa_buffers, + bool allow_range_index, + bool use_pandas_metadata): + update_struct_field_names(df, table_meta.schema_info) index_col = None - cdef vector[unordered_map[string, string]] per_file_user_data = \ - c_result.metadata.per_file_user_data - + is_range_index = True column_index_type = None index_col_names = None - is_range_index = True + meta = None + cdef vector[unordered_map[string, string]] per_file_user_data = \ + table_meta.per_file_user_data for single_file in per_file_user_data: json_str = single_file[b'pandas'].decode('utf-8') - meta = None if json_str != "": meta = json.loads(json_str) file_is_range_index, index_col, column_index_type = _parse_metadata(meta) @@ -220,13 +195,6 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, if c['field_name'] == idx_col: index_col_names[idx_col] = c['name'] - df = cudf.DataFrame._from_data(*data_from_unique_ptr( - move(c_result.tbl), - column_names=names - )) - - update_struct_field_names(df, c_result.metadata.schema_info) - if meta is not None: # Book keep each column metadata as the order # of `meta["columns"]` and `column_names` are not @@ -319,9 +287,65 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, if use_pandas_metadata: df.index.names = index_col - # Set column 
dtype for empty types. if len(df._data.names) == 0 and column_index_type is not None: df._data.label_dtype = cudf.dtype(column_index_type) + + return df + + +cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, + use_pandas_metadata=True, + Expression filters=None): + """ + Cython function to call into libcudf API, see `read_parquet`. + + filters, if not None, should be an Expression that evaluates to a + boolean predicate as a function of columns being read. + + See Also + -------- + cudf.io.parquet.read_parquet + cudf.io.parquet.to_parquet + """ + + # Convert NativeFile buffers to NativeFileDatasource, + # but save original buffers in case we need to use + # pyarrow for metadata processing + # (See: https://github.com/rapidsai/cudf/issues/9599) + pa_buffers = [] + for i, datasource in enumerate(filepaths_or_buffers): + if isinstance(datasource, NativeFile): + pa_buffers.append(datasource) + filepaths_or_buffers[i] = NativeFileDatasource(datasource) + + cdef cudf_io_types.source_info source = make_source_info( + filepaths_or_buffers) + + cdef vector[vector[size_type]] cpp_row_groups + if row_groups is not None: + cpp_row_groups = row_groups + + # Setup parquet reader arguments + cdef parquet_reader_options args + cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( + source, cpp_row_groups, use_pandas_metadata, filters, columns) + args, allow_range_index = c_res.first, c_res.second + + # Read Parquet + cdef cudf_io_types.table_with_metadata c_result + + with nogil: + c_result = move(parquet_reader(args)) + + names = [info.name.decode() for info in c_result.metadata.schema_info] + + df = cudf.DataFrame._from_data(*data_from_unique_ptr( + move(c_result.tbl), + column_names=names + )) + df = _process_metadata(df, c_result.metadata, names, row_groups, + filepaths_or_buffers, pa_buffers, + allow_range_index, use_pandas_metadata) return df cpdef read_parquet_metadata(filepaths_or_buffers): @@ -767,6 +791,102 @@ cdef class ParquetWriter: self.initialized = True +cdef class ParquetReader: + cdef bool initialized + cdef unique_ptr[cpp_chunked_parquet_reader] reader + cdef size_t chunk_read_limit + cdef size_t pass_read_limit + cdef size_t row_group_size_bytes + cdef table_metadata result_meta + cdef vector[unordered_map[string, string]] per_file_user_data + cdef object pandas_meta + cdef list pa_buffers + cdef bool allow_range_index + cdef object row_groups + cdef object filepaths_or_buffers + cdef object names + cdef object column_index_type + cdef object index_col_names + cdef bool is_range_index + cdef object index_col + cdef bool cpp_use_pandas_metadata + + def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None, + use_pandas_metadata=True, + size_t chunk_read_limit=0, + size_t pass_read_limit=1024000000): + + # Convert NativeFile buffers to NativeFileDatasource, + # but save original buffers in case we need to use + # pyarrow for metadata processing + # (See: https://github.com/rapidsai/cudf/issues/9599) + + pa_buffers = [] + for i, datasource in enumerate(filepaths_or_buffers): + if isinstance(datasource, NativeFile): + pa_buffers.append(datasource) + filepaths_or_buffers[i] = NativeFileDatasource(datasource) + self.pa_buffers = pa_buffers + cdef cudf_io_types.source_info source = make_source_info( + filepaths_or_buffers) + + self.cpp_use_pandas_metadata = use_pandas_metadata + + cdef vector[vector[size_type]] cpp_row_groups + if row_groups is not None: + cpp_row_groups = row_groups + cdef parquet_reader_options args + cdef 
pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options( + source, cpp_row_groups, use_pandas_metadata, None, columns) + args, self.allow_range_index = c_res.first, c_res.second + + with nogil: + self.reader.reset( + new cpp_chunked_parquet_reader( + chunk_read_limit, + pass_read_limit, + args + ) + ) + self.initialized = False + self.row_groups = row_groups + self.filepaths_or_buffers = filepaths_or_buffers + + def _has_next(self): + cdef bool res + with nogil: + res = self.reader.get()[0].has_next() + return res + + def _read_chunk(self): + # Read Parquet + cdef cudf_io_types.table_with_metadata c_result + + with nogil: + c_result = move(self.reader.get()[0].read_chunk()) + + if not self.initialized: + self.names = [info.name.decode() for info in c_result.metadata.schema_info] + self.result_meta = c_result.metadata + + df = cudf.DataFrame._from_data(*data_from_unique_ptr( + move(c_result.tbl), + column_names=self.names, + )) + + self.initialized = True + return df + + def read(self): + dfs = [] + while self._has_next(): + dfs.append(self._read_chunk()) + df = cudf.concat(dfs) + df = _process_metadata(df, self.result_meta, self.names, self.row_groups, + self.filepaths_or_buffers, self.pa_buffers, + self.allow_range_index, self.cpp_use_pandas_metadata) + return df + cpdef merge_filemetadata(object filemetadata_list): """ Cython function to call into libcudf API, see `merge_row_group_metadata`. diff --git a/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt b/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt index 6beb7b0f506..ed396208f98 100644 --- a/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt +++ b/python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt @@ -26,6 +26,7 @@ set(cython_sources join.pyx lists.pyx merge.pyx + quantiles.pyx reduce.pyx replace.pyx reshape.pyx diff --git a/python/cudf/cudf/_lib/pylibcudf/__init__.pxd b/python/cudf/cudf/_lib/pylibcudf/__init__.pxd index b289d112a90..a628ecdb038 100644 --- a/python/cudf/cudf/_lib/pylibcudf/__init__.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/__init__.pxd @@ -12,6 +12,7 @@ from . 
cimport ( join, lists, merge, + quantiles, reduce, replace, reshape, @@ -48,6 +49,7 @@ __all__ = [ "join", "lists", "merge", + "quantiles", "reduce", "replace", "rolling", diff --git a/python/cudf/cudf/_lib/pylibcudf/__init__.py b/python/cudf/cudf/_lib/pylibcudf/__init__.py index 2565332f3ed..46d0fe13cd1 100644 --- a/python/cudf/cudf/_lib/pylibcudf/__init__.py +++ b/python/cudf/cudf/_lib/pylibcudf/__init__.py @@ -12,6 +12,7 @@ join, lists, merge, + quantiles, reduce, replace, reshape, @@ -48,6 +49,7 @@ "join", "lists", "merge", + "quantiles", "reduce", "replace", "rolling", diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index 33a594b432f..fb98650308a 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -283,6 +283,18 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: vector[string] column_chunks_file_paths, ) except + + cdef cppclass chunked_parquet_reader: + chunked_parquet_reader() except + + chunked_parquet_reader( + size_t chunk_read_limit, + const parquet_reader_options& options) except + + chunked_parquet_reader( + size_t chunk_read_limit, + size_t pass_read_limit, + const parquet_reader_options& options) except + + bool has_next() except + + cudf_io_types.table_with_metadata read_chunk() except + + cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata( const vector[unique_ptr[vector[uint8_t]]]& metadata_list ) except + diff --git a/python/cudf/cudf/_lib/pylibcudf/quantiles.pxd b/python/cudf/cudf/_lib/pylibcudf/quantiles.pxd new file mode 100644 index 00000000000..70ff135ca77 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/quantiles.pxd @@ -0,0 +1,25 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +from libcpp.vector cimport vector + +from cudf._lib.pylibcudf.libcudf.types cimport interpolation, sorted + +from .column cimport Column +from .table cimport Table + + +cpdef Column quantile( + Column input, + vector[double] q, + interpolation interp = *, + Column ordered_indices = *, + bint exact = * +) + +cpdef Table quantiles( + Table input, + vector[double] q, + interpolation interp = *, + sorted is_input_sorted = *, + list column_order = *, + list null_precedence = *, +) diff --git a/python/cudf/cudf/_lib/pylibcudf/quantiles.pyx b/python/cudf/cudf/_lib/pylibcudf/quantiles.pyx new file mode 100644 index 00000000000..c1f0e30ccd3 --- /dev/null +++ b/python/cudf/cudf/_lib/pylibcudf/quantiles.pyx @@ -0,0 +1,152 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libcpp cimport bool +from libcpp.memory cimport unique_ptr +from libcpp.utility cimport move +from libcpp.vector cimport vector + +from cudf._lib.pylibcudf.libcudf.column.column cimport column +from cudf._lib.pylibcudf.libcudf.column.column_view cimport column_view +from cudf._lib.pylibcudf.libcudf.quantiles cimport ( + quantile as cpp_quantile, + quantiles as cpp_quantiles, +) +from cudf._lib.pylibcudf.libcudf.table.table cimport table +from cudf._lib.pylibcudf.libcudf.types cimport null_order, order, sorted + +from .column cimport Column +from .table cimport Table +from .types cimport interpolation + + +cpdef Column quantile( + Column input, + vector[double] q, + interpolation interp = interpolation.LINEAR, + Column ordered_indices = None, + bool exact=True +): + """Computes quantiles with interpolation. + + Computes the specified quantiles by interpolating values between which they lie, + using the interpolation strategy specified in interp. 
+ + Parameters + ---------- + input: Column + The Column to calculate quantiles on. + q: array-like that implements buffer-protocol + The quantiles to calculate in range [0,1] + interp: Interpolation, default Interpolation.LINEAR + The strategy used to select between values adjacent to a specified quantile. + ordered_indices: Column, default empty column + The column containing the sorted order of input. + + If empty, all input values are used in existing order. + Indices must be in range [0, input.size()), but are not required to be unique. + Values not indexed by this column will be ignored. + exact: bool, default True + Returns doubles if True. Otherwise, returns same type as input + + For details, see :cpp:func:`quantile`. + + Returns + ------- + Column + A Column containing specified quantiles, with nulls for indeterminable values + """ + cdef: + unique_ptr[column] c_result + column_view ordered_indices_view + + if ordered_indices is None: + ordered_indices_view = column_view() + else: + ordered_indices_view = ordered_indices.view() + + with nogil: + c_result = move( + cpp_quantile( + input.view(), + q, + interp, + ordered_indices_view, + exact, + ) + ) + + return Column.from_libcudf(move(c_result)) + + +cpdef Table quantiles( + Table input, + vector[double] q, + interpolation interp = interpolation.NEAREST, + sorted is_input_sorted = sorted.NO, + list column_order = None, + list null_precedence = None, +): + """Computes row quantiles with interpolation. + + Computes the specified quantiles by retrieving the row corresponding to the + specified quantiles. In the event a quantile lies in between rows, the specified + interpolation strategy is used to pick between the rows. + + Parameters + ---------- + input: Table + The Table to calculate row quantiles on. + q: array-like + The quantiles to calculate in range [0,1] + interp: Interpolation, default Interpolation.NEAREST + The strategy used to select between values adjacent to a specified quantile. + + Must be a non-arithmetic interpolation strategy + (i.e. one of + {`Interpolation.HIGHER`, `Interpolation.LOWER`, `Interpolation.NEAREST`}) + is_input_sorted: Sorted, default Sorted.NO + Whether the input table has been pre-sorted or not. + column_order: list, default None + A list of `Order` enums, + indicating the desired sort order for each column. + By default, will sort all columns so that they are in ascending order. + + Ignored if `is_input_sorted` is `Sorted.YES` + null_precedence: list, default None + A list of `NullOrder` enums, + indicating how nulls should be sorted. + By default, will sort all columns so that nulls appear before + all other elements. + + Ignored if `is_input_sorted` is `Sorted.YES` + + For details, see :cpp:func:`quantiles`. 
+ + Returns + ------- + Column + A Column containing specified quantiles, with nulls for indeterminable values + """ + cdef: + unique_ptr[table] c_result + vector[order] column_order_vec + vector[null_order] null_precedence_vec + + if column_order is not None: + column_order_vec = column_order + if null_precedence is not None: + null_precedence_vec = null_precedence + + with nogil: + c_result = move( + cpp_quantiles( + input.view(), + q, + interp, + is_input_sorted, + column_order_vec, + null_precedence_vec, + ) + ) + + return Table.from_libcudf(move(c_result)) diff --git a/python/cudf/cudf/_lib/quantiles.pyx b/python/cudf/cudf/_lib/quantiles.pyx index 3d20454a7ce..7b50c00919a 100644 --- a/python/cudf/cudf/_lib/quantiles.pyx +++ b/python/cudf/cudf/_lib/quantiles.pyx @@ -3,76 +3,43 @@ from cudf.core.buffer import acquire_spill_lock from libcpp cimport bool -from libcpp.memory cimport unique_ptr -from libcpp.utility cimport move from libcpp.vector cimport vector from cudf._lib.column cimport Column from cudf._lib.types cimport ( underlying_type_t_interpolation, - underlying_type_t_null_order, - underlying_type_t_order, underlying_type_t_sorted, ) from cudf._lib.types import Interpolation -from cudf._lib.pylibcudf.libcudf.column.column cimport column -from cudf._lib.pylibcudf.libcudf.column.column_view cimport column_view -from cudf._lib.pylibcudf.libcudf.quantiles cimport ( - quantile as cpp_quantile, - quantiles as cpp_quantile_table, -) -from cudf._lib.pylibcudf.libcudf.table.table cimport table -from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view -from cudf._lib.pylibcudf.libcudf.types cimport ( - interpolation, - null_order, - order, - sorted, -) -from cudf._lib.utils cimport columns_from_unique_ptr, table_view_from_columns +from cudf._lib.pylibcudf.libcudf.types cimport interpolation, sorted +from cudf._lib.utils cimport columns_from_pylibcudf_table + +import cudf._lib.pylibcudf as plc @acquire_spill_lock() def quantile( Column input, - object q, + vector[double] q, str interp, Column ordered_indices, bool exact, - ): - cdef column_view c_input = input.view() - cdef column_view c_ordered_indices = ( - column_view() if ordered_indices is None - else ordered_indices.view() - ) cdef interpolation c_interp = ( Interpolation[interp.upper()] ) - cdef bool c_exact = exact - - cdef vector[double] c_q - c_q.reserve(len(q)) - - for value in q: - c_q.push_back(value) - cdef unique_ptr[column] c_result - - with nogil: - c_result = move( - cpp_quantile( - c_input, - c_q, - c_interp, - c_ordered_indices, - c_exact, - ) + return Column.from_pylibcudf( + plc.quantiles.quantile( + input.to_pylibcudf(mode="read"), + q, + c_interp, + ordered_indices.to_pylibcudf(mode="read"), + exact ) - - return Column.from_unique_ptr(move(c_result)) + ) def quantile_table( @@ -83,42 +50,23 @@ def quantile_table( list column_order, list null_precedence, ): - cdef table_view c_input = table_view_from_columns(source_columns) - cdef vector[double] c_q = q + cdef interpolation c_interp = ( interp ) cdef sorted c_is_input_sorted = ( is_input_sorted ) - cdef vector[order] c_column_order - cdef vector[null_order] c_null_precedence - - c_column_order.reserve(len(column_order)) - c_null_precedence.reserve(len(null_precedence)) - - for value in column_order: - c_column_order.push_back( - ( value) - ) - for value in null_precedence: - c_null_precedence.push_back( - ( value) + return columns_from_pylibcudf_table( + plc.quantiles.quantiles( + plc.Table([ + c.to_pylibcudf(mode="read") for c in source_columns + ]), + 
q, + c_interp, + c_is_input_sorted, + column_order, + null_precedence ) - - cdef unique_ptr[table] c_result - - with nogil: - c_result = move( - cpp_quantile_table( - c_input, - c_q, - c_interp, - c_is_input_sorted, - c_column_order, - c_null_precedence, - ) - ) - - return columns_from_unique_ptr(move(c_result)) + ) diff --git a/python/cudf/cudf/pylibcudf_tests/conftest.py b/python/cudf/cudf/pylibcudf_tests/conftest.py index 6d8284fb3db..f3c6584ef8c 100644 --- a/python/cudf/cudf/pylibcudf_tests/conftest.py +++ b/python/cudf/cudf/pylibcudf_tests/conftest.py @@ -7,6 +7,8 @@ import pyarrow as pa import pytest +import cudf._lib.pylibcudf as plc + sys.path.insert(0, os.path.join(os.path.dirname(__file__), "common")) from utils import DEFAULT_STRUCT_TESTING_TYPE @@ -29,3 +31,30 @@ ) def pa_type(request): return request.param + + +@pytest.fixture( + scope="session", + params=[ + pa.int64(), + pa.float64(), + pa.uint64(), + ], +) +def numeric_pa_type(request): + return request.param + + +@pytest.fixture( + scope="session", params=[opt for opt in plc.types.Interpolation] +) +def interp_opt(request): + return request.param + + +@pytest.fixture( + scope="session", + params=[opt for opt in plc.types.Sorted], +) +def sorted_opt(request): + return request.param diff --git a/python/cudf/cudf/pylibcudf_tests/test_quantiles.py b/python/cudf/cudf/pylibcudf_tests/test_quantiles.py new file mode 100644 index 00000000000..a5d332a7795 --- /dev/null +++ b/python/cudf/cudf/pylibcudf_tests/test_quantiles.py @@ -0,0 +1,234 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +import numpy as np +import pyarrow as pa +import pyarrow.compute as pc +import pytest +from utils import assert_column_eq, assert_table_eq + +import cudf._lib.pylibcudf as plc + +# Map pylibcudf interpolation options to pyarrow options +interp_mapping = { + plc.types.Interpolation.LINEAR: "linear", + plc.types.Interpolation.LOWER: "lower", + plc.types.Interpolation.HIGHER: "higher", + plc.types.Interpolation.MIDPOINT: "midpoint", + plc.types.Interpolation.NEAREST: "nearest", +} + + +@pytest.fixture(scope="module", params=[[1, 2, 3, 4, 5], [5, 4, 3, 2, 1]]) +def pa_col_data(request, numeric_pa_type): + return pa.array(request.param, type=numeric_pa_type) + + +@pytest.fixture(scope="module") +def plc_col_data(pa_col_data): + return plc.interop.from_arrow(pa_col_data) + + +@pytest.fixture( + scope="module", + params=[ + { + "arrays": [[1, 2, 3, 5, 4], [5.0, 6.0, 8.0, 7.0, 9.0]], + "schema": pa.schema( + [ + ("a", pa.int64()), + ("b", pa.int64()), + ] + ), + }, + { + "arrays": [ + [1, 2, 3, 4, 5, 6, 7, 8, 9, 10], + [1, 2.0, 2.2, 2.3, 2.4, None, None, 3.5, 4.5, 5.5], + ], + "schema": pa.schema( + [ + ("a", pa.int64()), + ("b", pa.float64()), + ] + ), + }, + ], +) +def plc_tbl_data(request): + return plc.interop.from_arrow(pa.Table.from_arrays(**request.param)) + + +@pytest.mark.parametrize("q", [[], [0], [0.5], [0.1, 0.5, 0.7, 0.9]]) +@pytest.mark.parametrize("exact", [True, False]) +def test_quantile(pa_col_data, plc_col_data, interp_opt, q, exact): + ordered_indices = plc.interop.from_arrow( + pc.cast(pc.sort_indices(pa_col_data), pa.int32()) + ) + res = plc.quantiles.quantile( + plc_col_data, q, interp_opt, ordered_indices, exact + ) + + pa_interp_opt = interp_mapping[interp_opt] + + if exact: + pa_col_data = pc.cast(pa_col_data, pa.float64()) + + if len(q) > 0: + # pyarrow quantile doesn't support empty q + exp = pc.quantile(pa_col_data, q=q, interpolation=pa_interp_opt) + else: + exp = pa.array([], type=pa.float64()) + + if not exact: + exp = 
pc.cast(exp, pa_col_data.type, safe=False) + + assert_column_eq(exp, res) + + +def _pyarrow_quantiles( + pa_tbl_data, + q, + interp_opt=plc.types.Interpolation.NEAREST, + sorted_opt=plc.types.Sorted.NO, + column_order=None, + null_precedence=None, +): + """ + The pyarrow equivalent of plc.quantiles.quantiles + + Takes the same arguments (except input should be a pyarrow table instead of + of a pylibcudf table) + + NOTE: This function doesn't support having different null precedences because of + a lack of support in pyarrow. + """ + if len(q) > 0: + # pyarrow quantile doesn't support empty q + pa_interp_opt = interp_mapping[interp_opt] + + if sorted_opt == plc.types.Sorted.NO: + order_mapper = { + plc.types.Order.ASCENDING: "ascending", + plc.types.Order.DESCENDING: "descending", + } + if null_precedence is None: + null_precedence = [plc.types.NullOrder.BEFORE] * len( + pa_tbl_data.columns + ) + if column_order is None: + column_order = [plc.types.Order.ASCENDING] * len( + pa_tbl_data.columns + ) + + if not all( + [ + null_prec == null_precedence[0] + for null_prec in null_precedence + ] + ): + raise NotImplementedError( + "Having varying null precendences is not implemented!" + ) + + pa_tbl_data = pa_tbl_data.sort_by( + [ + (name, order_mapper[order]) + for name, order in zip( + pa_tbl_data.column_names, column_order + ) + ], + null_placement="at_start" + if null_precedence[0] == plc.types.NullOrder.BEFORE + else "at_end", + ) + row_idxs = pc.quantile( + np.arange(0, len(pa_tbl_data)), q=q, interpolation=pa_interp_opt + ) + exp = pa_tbl_data.take(row_idxs) + else: + exp = pa.Table.from_arrays( + [[] for _ in range(len(pa_tbl_data.schema))], + schema=pa_tbl_data.schema, + ) + return exp + + +@pytest.mark.parametrize( + "q", [[], [0.1], [0.2], [0.3], [0.4], [0.5], [0.1, 0.5, 0.7, 0.9]] +) +@pytest.mark.parametrize( + "column_order", [[plc.types.Order.ASCENDING, plc.types.Order.ASCENDING]] +) +@pytest.mark.parametrize( + "null_precedence", + [ + [plc.types.NullOrder.BEFORE, plc.types.NullOrder.BEFORE], + [plc.types.NullOrder.AFTER, plc.types.NullOrder.AFTER], + ], +) +def test_quantiles( + plc_tbl_data, interp_opt, q, sorted_opt, column_order, null_precedence +): + if interp_opt in { + plc.types.Interpolation.LINEAR, + plc.types.Interpolation.MIDPOINT, + }: + pytest.skip( + "interp cannot be an arithmetic interpolation strategy for quantiles" + ) + + pa_tbl_data = plc.interop.to_arrow(plc_tbl_data, ["a", "b"]) + + exp = _pyarrow_quantiles( + pa_tbl_data, + q=q, + interp_opt=interp_opt, + sorted_opt=sorted_opt, + column_order=column_order, + null_precedence=null_precedence, + ) + + res = plc.quantiles.quantiles( + plc_tbl_data, q, interp_opt, sorted_opt, column_order, null_precedence + ) + + assert_table_eq(exp, res) + + +@pytest.mark.parametrize( + "invalid_interp", + [plc.types.Interpolation.LINEAR, plc.types.Interpolation.MIDPOINT], +) +def test_quantiles_invalid_interp(plc_tbl_data, invalid_interp): + with pytest.raises(ValueError): + plc.quantiles.quantiles( + plc_tbl_data, q=np.array([0.1]), interp=invalid_interp + ) + + +@pytest.mark.parametrize( + "q", + [[0.1], (0.1,), np.array([0.1])], +) +def test_quantile_q_array_like(pa_col_data, plc_col_data, q): + ordered_indices = plc.interop.from_arrow( + pc.cast(pc.sort_indices(pa_col_data), pa.int32()) + ) + res = plc.quantiles.quantile( + plc_col_data, + q=q, + ordered_indices=ordered_indices, + ) + exp = pc.quantile(pa_col_data, q=q) + assert_column_eq(exp, res) + + +@pytest.mark.parametrize( + "q", + [[0.1], (0.1,), np.array([0.1])], +) 
+def test_quantiles_q_array_like(plc_tbl_data, q): + res = plc.quantiles.quantiles(plc_tbl_data, q=q) + pa_tbl_data = plc.interop.to_arrow(plc_tbl_data, ["a", "b"]) + exp = _pyarrow_quantiles(pa_tbl_data, q=q) + assert_table_eq(exp, res) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index e32fdacd8d6..2596fe8cd37 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -22,6 +22,7 @@ from pyarrow import fs as pa_fs, parquet as pq import cudf +from cudf._lib.parquet import ParquetReader from cudf.io.parquet import ( ParquetDatasetWriter, ParquetWriter, @@ -3407,3 +3408,29 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema(): # Check results assert_eq(expected, got) + + +@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000]) +@pytest.mark.parametrize("use_pandas_metadata", [True, False]) +@pytest.mark.parametrize("row_groups", [[[0]], None, [[0, 1]]]) +def test_parquet_chunked_reader( + chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups +): + df = pd.DataFrame( + {"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000} + ) + buffer = BytesIO() + df.to_parquet(buffer) + reader = ParquetReader( + [buffer], + chunk_read_limit=chunk_read_limit, + pass_read_limit=pass_read_limit, + use_pandas_metadata=use_pandas_metadata, + row_groups=row_groups, + ) + expected = cudf.read_parquet( + buffer, use_pandas_metadata=use_pandas_metadata, row_groups=row_groups + ) + actual = reader.read() + assert_eq(expected, actual) diff --git a/python/cudf_polars/cudf_polars/__init__.py b/python/cudf_polars/cudf_polars/__init__.py index 74547fe2448..b19a282129a 100644 --- a/python/cudf_polars/cudf_polars/__init__.py +++ b/python/cudf_polars/cudf_polars/__init__.py @@ -10,4 +10,7 @@ from __future__ import annotations -__all__: list[str] = [] +from cudf_polars.callback import execute_with_cudf +from cudf_polars.dsl.translate import translate_ir + +__all__: list[str] = ["execute_with_cudf", "translate_ir"] diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index aabb8498ce2..979087d5273 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -16,6 +16,7 @@ import polars as pl from cudf_polars.dsl.ir import IR + from cudf_polars.typing import NodeTraverser __all__: list[str] = ["execute_with_cudf"] @@ -33,7 +34,7 @@ def _callback( return ir.evaluate(cache={}).to_polars() -def execute_with_cudf(nt, *, raise_on_fail: bool = False) -> None: +def execute_with_cudf(nt: NodeTraverser, *, raise_on_fail: bool = False) -> None: """ A post optimization callback that attempts to execute the plan with cudf. 
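Not part of the diff: a hedged sketch of how the newly exported `execute_with_cudf` callback might be attached to a polars query. The `post_opt_callback` keyword on `LazyFrame.collect` is an assumption about the polars hook this callback targets and may differ between polars versions.

```python
from functools import partial

import polars as pl

from cudf_polars import execute_with_cudf  # exported by the __init__.py change above

q = pl.LazyFrame({"a": [1, 2, 3], "b": [1.5, 2.5, 3.5]}).select(pl.col("a") + pl.col("b"))

# Assumption: polars hands the optimized plan's NodeTraverser to this hook;
# raise_on_fail=True surfaces translation errors instead of silently falling
# back to the CPU engine.
result = q.collect(post_opt_callback=partial(execute_with_cudf, raise_on_fail=True))
```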
diff --git a/python/cudf_polars/cudf_polars/containers/__init__.py b/python/cudf_polars/cudf_polars/containers/__init__.py index ef9d9ca61b6..ee69e748eb5 100644 --- a/python/cudf_polars/cudf_polars/containers/__init__.py +++ b/python/cudf_polars/cudf_polars/containers/__init__.py @@ -5,8 +5,8 @@ from __future__ import annotations -__all__: list[str] = ["DataFrame", "Column", "Scalar"] +__all__: list[str] = ["DataFrame", "Column", "NamedColumn", "Scalar"] -from cudf_polars.containers.column import Column +from cudf_polars.containers.column import Column, NamedColumn from cudf_polars.containers.dataframe import DataFrame from cudf_polars.containers.scalar import Scalar diff --git a/python/cudf_polars/cudf_polars/containers/column.py b/python/cudf_polars/cudf_polars/containers/column.py index 49034b5f5c8..575d15d3ece 100644 --- a/python/cudf_polars/cudf_polars/containers/column.py +++ b/python/cudf_polars/cudf_polars/containers/column.py @@ -13,24 +13,29 @@ if TYPE_CHECKING: from typing_extensions import Self -__all__: list[str] = ["Column"] +__all__: list[str] = ["Column", "NamedColumn"] class Column: - """A column, a name, and sortedness.""" + """A column with sortedness metadata.""" obj: plc.Column - name: str is_sorted: plc.types.Sorted order: plc.types.Order null_order: plc.types.NullOrder - def __init__(self, column: plc.Column, name: str): + def __init__( + self, + column: plc.Column, + *, + is_sorted: plc.types.Sorted = plc.types.Sorted.NO, + order: plc.types.Order = plc.types.Order.ASCENDING, + null_order: plc.types.NullOrder = plc.types.NullOrder.BEFORE, + ): self.obj = column - self.name = name - self.is_sorted = plc.types.Sorted.NO - self.order = plc.types.Order.ASCENDING - self.null_order = plc.types.NullOrder.BEFORE + self.is_sorted = is_sorted + self.order = order + self.null_order = null_order def sorted_like(self, like: Column, /) -> Self: """ @@ -81,22 +86,20 @@ def set_sorted( self.null_order = null_order return self - def copy(self, *, new_name: str | None = None) -> Self: + def copy(self) -> Self: """ - Return a shallow copy of the column. - - Parameters - ---------- - new_name - Optional new name for the copied column. + A shallow copy of the column. Returns ------- New column sharing data with self. """ return type(self)( - self.obj, self.name if new_name is None else new_name - ).sorted_like(self) + self.obj, + is_sorted=self.is_sorted, + order=self.order, + null_order=self.null_order, + ) def mask_nans(self) -> Self: """Return a copy of self with nans masked out.""" @@ -117,3 +120,44 @@ def nan_count(self) -> int: plc.DataType(plc.TypeId.INT32), ) ).as_py() + + +class NamedColumn(Column): + """A column with a name.""" + + name: str + + def __init__( + self, + column: plc.Column, + name: str, + *, + is_sorted: plc.types.Sorted = plc.types.Sorted.NO, + order: plc.types.Order = plc.types.Order.ASCENDING, + null_order: plc.types.NullOrder = plc.types.NullOrder.BEFORE, + ) -> None: + super().__init__( + column, is_sorted=is_sorted, order=order, null_order=null_order + ) + self.name = name + + def copy(self, *, new_name: str | None = None) -> Self: + """ + A shallow copy of the column. + + Parameters + ---------- + new_name + Optional new name for the copied column. + + Returns + ------- + New column sharing data with self. 
+ """ + return type(self)( + self.obj, + self.name if new_name is None else new_name, + is_sorted=self.is_sorted, + order=self.order, + null_order=self.null_order, + ) diff --git a/python/cudf_polars/cudf_polars/containers/dataframe.py b/python/cudf_polars/cudf_polars/containers/dataframe.py index de21a280020..ac7e748095e 100644 --- a/python/cudf_polars/cudf_polars/containers/dataframe.py +++ b/python/cudf_polars/cudf_polars/containers/dataframe.py @@ -6,22 +6,23 @@ from __future__ import annotations from functools import cached_property -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, cast import polars as pl import cudf._lib.pylibcudf as plc -from cudf_polars.containers.column import Column +from cudf_polars.containers.column import NamedColumn if TYPE_CHECKING: from collections.abc import Mapping, Sequence, Set + import pyarrow as pa from typing_extensions import Self import cudf - from cudf_polars.containers.scalar import Scalar + from cudf_polars.containers import Column __all__: list[str] = ["DataFrame"] @@ -30,33 +31,27 @@ class DataFrame: """A representation of a dataframe.""" - columns: list[Column] - scalars: list[Scalar] + columns: list[NamedColumn] table: plc.Table | None - def __init__(self, columns: Sequence[Column], scalars: Sequence[Scalar]) -> None: + def __init__(self, columns: Sequence[NamedColumn]) -> None: self.columns = list(columns) self._column_map = {c.name: c for c in self.columns} - self.scalars = list(scalars) - if len(scalars) == 0: - self.table = plc.Table([c.obj for c in columns]) - else: - self.table = None + self.table = plc.Table([c.obj for c in columns]) def copy(self) -> Self: """Return a shallow copy of self.""" - return type(self)(self.columns, self.scalars) + return type(self)(self.columns) def to_polars(self) -> pl.DataFrame: """Convert to a polars DataFrame.""" - assert len(self.scalars) == 0 - return pl.from_arrow( - plc.interop.to_arrow( - self.table, - [plc.interop.ColumnMetadata(name=c.name) for c in self.columns], - ) + table: pa.Table = plc.interop.to_arrow( + self.table, + [plc.interop.ColumnMetadata(name=c.name) for c in self.columns], ) + return cast(pl.DataFrame, pl.from_arrow(table)) + @cached_property def column_names_set(self) -> frozenset[str]: """Return the column names as a set.""" @@ -83,8 +78,10 @@ def num_rows(self) -> int: def from_cudf(cls, df: cudf.DataFrame) -> Self: """Create from a cudf dataframe.""" return cls( - [Column(c.to_pylibcudf(mode="read"), name) for name, c in df._data.items()], - [], + [ + NamedColumn(c.to_pylibcudf(mode="read"), name) + for name, c in df._data.items() + ] ) @classmethod @@ -105,13 +102,16 @@ def from_table(cls, table: plc.Table, names: Sequence[str]) -> Self: Raises ------ - ValueError if the number of provided names does not match the - number of columns in the table. + ValueError + If the number of provided names does not match the + number of columns in the table. """ - # TODO: strict=True when we drop py39 if table.num_columns() != len(names): raise ValueError("Mismatching name and table length.") - return cls([Column(c, name) for c, name in zip(table.columns(), names)], []) + return cls( + # TODO: strict=True when we drop py39 + [NamedColumn(c, name) for c, name in zip(table.columns(), names)] + ) def sorted_like( self, like: DataFrame, /, *, subset: Set[str] | None = None @@ -132,18 +132,20 @@ def sorted_like( Raises ------ - ValueError if there is a name mismatch between self and like. + ValueError + If there is a name mismatch between self and like. 
""" if like.column_names != self.column_names: raise ValueError("Can only copy from identically named frame") subset = self.column_names_set if subset is None else subset self.columns = [ c.sorted_like(other) if c.name in subset else c + # TODO: strict=True when we drop py39 for c, other in zip(self.columns, like.columns) ] return self - def with_columns(self, columns: Sequence[Column]) -> Self: + def with_columns(self, columns: Sequence[NamedColumn]) -> Self: """ Return a new dataframe with extra columns. @@ -160,35 +162,31 @@ def with_columns(self, columns: Sequence[Column]) -> Self: ----- If column names overlap, newer names replace older ones. """ - return type(self)([*self.columns, *columns], self.scalars) + return type(self)([*self.columns, *columns]) def discard_columns(self, names: Set[str]) -> Self: """Drop columns by name.""" - return type(self)( - [c for c in self.columns if c.name not in names], self.scalars - ) + return type(self)([c for c in self.columns if c.name not in names]) def select(self, names: Sequence[str]) -> Self: """Select columns by name returning DataFrame.""" want = set(names) if not want.issubset(self.column_names_set): raise ValueError("Can't select missing names") - return type(self)([self._column_map[name] for name in names], self.scalars) + return type(self)([self._column_map[name] for name in names]) - def replace_columns(self, *columns: Column) -> Self: + def replace_columns(self, *columns: NamedColumn) -> Self: """Return a new dataframe with columns replaced by name.""" new = {c.name: c for c in columns} if not set(new).issubset(self.column_names_set): raise ValueError("Cannot replace with non-existing names") - return type(self)([new.get(c.name, c) for c in self.columns], self.scalars) + return type(self)([new.get(c.name, c) for c in self.columns]) def rename_columns(self, mapping: Mapping[str, str]) -> Self: """Rename some columns.""" - return type(self)( - [c.copy(new_name=mapping.get(c.name)) for c in self.columns], self.scalars - ) + return type(self)([c.copy(new_name=mapping.get(c.name)) for c in self.columns]) - def select_columns(self, names: Set[str]) -> list[Column]: + def select_columns(self, names: Set[str]) -> list[NamedColumn]: """Select columns by name.""" return [c for c in self.columns if c.name in names] diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index 7187a36f21c..6d9435ce373 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -26,11 +26,11 @@ import cudf._lib.pylibcudf as plc -from cudf_polars.containers import Column, Scalar +from cudf_polars.containers import Column, NamedColumn, Scalar from cudf_polars.utils import sorting if TYPE_CHECKING: - from collections.abc import Sequence + from collections.abc import Mapping, Sequence import polars.type_aliases as pl_types @@ -110,7 +110,7 @@ def get_hash(self) -> int: """ return hash((type(self), self._ctor_arguments(self.children))) - def __hash__(self): + def __hash__(self) -> int: """Hash of an expression with caching.""" try: return self._hash_value @@ -139,18 +139,18 @@ def is_equal(self, other: Any) -> bool: other.children ) - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: """Equality of expressions.""" if type(self) != type(other) or hash(self) != hash(other): return False else: return self.is_equal(other) - def __ne__(self, other): + def __ne__(self, other: Any) -> bool: """Inequality of expressions.""" return not self.__eq__(other) - def 
__repr__(self): + def __repr__(self) -> str: """String representation of an expression with caching.""" try: return self._repr_value @@ -164,7 +164,7 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: # TODO: return type is a lie for Literal """ Evaluate this expression given a dataframe for context. @@ -185,15 +185,6 @@ def do_evaluate( Do not call this function directly, but rather :meth:`evaluate` which handles the mapping lookups. - The typed return value of :class:`Column` is not true when - evaluating :class:`Literal` nodes (which instead produce - :class:`Scalar` objects). However, these duck-type to having a - pylibcudf container object inside them, and usually they end - up appearing in binary expressions which pylibcudf handles - appropriately since there are overloads for (column, scalar) - pairs. We don't have to handle (scalar, scalar) in binops - since the polars optimizer has a constant-folding pass. - Returns ------- Column representing the evaluation of the expression (or maybe @@ -201,9 +192,10 @@ def do_evaluate( Raises ------ - NotImplementedError if we couldn't evaluate the expression. - Ideally all these are returned during translation to the IR, - but for now we are not perfect. + NotImplementedError + If we couldn't evaluate the expression. Ideally all these + are returned during translation to the IR, but for now we + are not perfect. """ raise NotImplementedError(f"Evaluation of {type(self).__name__}") @@ -212,7 +204,7 @@ def evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: # TODO: return type is a lie for Literal """ Evaluate this expression given a dataframe for context. @@ -234,16 +226,26 @@ def evaluate( this method provides logic to handle lookups in the substitution mapping. + The typed return value of :class:`Column` is not true when + evaluating :class:`Literal` nodes (which instead produce + :class:`Scalar` objects). However, these duck-type to having a + pylibcudf container object inside them, and usually they end + up appearing in binary expressions which pylibcudf handles + appropriately since there are overloads for (column, scalar) + pairs. We don't have to handle (scalar, scalar) in binops + since the polars optimizer has a constant-folding pass. + Returns ------- Column representing the evaluation of the expression (or maybe - a scalar, annoying!). + a scalar). Raises ------ - NotImplementedError if we couldn't evaluate the expression. - Ideally all these are returned during translation to the IR, - but for now we are not perfect. + NotImplementedError + If we couldn't evaluate the expression. Ideally all these + are returned during translation to the IR, but for now we + are not perfect. """ if mapping is None: return self.do_evaluate(df, context=context, mapping=mapping) @@ -269,69 +271,106 @@ def collect_agg(self, *, depth: int) -> AggInfo: Raises ------ - NotImplementedError if we can't currently perform the - aggregation request (for example nested aggregations like - ``a.max().min()``). + NotImplementedError + If we can't currently perform the aggregation request, for + example nested aggregations like ``a.max().min()``. 
""" raise NotImplementedError( f"Collecting aggregation info for {type(self).__name__}" ) -class NamedExpr(Expr): - __slots__ = ("name", "children") - _non_child = ("dtype", "name") +class NamedExpr: + # NamedExpr does not inherit from Expr since it does not appear + # when evaluating expressions themselves, only when constructing + # named return values in dataframe (IR) nodes. + __slots__ = ("name", "value") + value: Expr + name: str - def __init__(self, dtype: plc.DataType, name: str, value: Expr) -> None: - super().__init__(dtype) + def __init__(self, name: str, value: Expr) -> None: self.name = name - self.children = (value,) + self.value = value + + def __hash__(self) -> int: + """Hash of the expression.""" + return hash((type(self), self.name, self.value)) + + def __repr__(self) -> str: + """Repr of the expression.""" + return f"NamedExpr({self.name}, {self.value}" + + def __eq__(self, other: Any) -> bool: + """Equality of two expressions.""" + return ( + type(self) is type(other) + and self.name == other.name + and self.value == other.value + ) - def do_evaluate( + def __ne__(self, other: Any) -> bool: + """Inequality of expressions.""" + return not self.__eq__(other) + + def evaluate( self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, - ) -> Column: + mapping: Mapping[Expr, Column] | None = None, + ) -> NamedColumn: """Evaluate this expression given a dataframe for context.""" - (child,) = self.children - return Column( - child.evaluate(df, context=context, mapping=mapping).obj, self.name - ) + obj = self.value.evaluate(df, context=context, mapping=mapping) + if isinstance(obj, Scalar): + return NamedColumn( + plc.Column.from_scalar(obj.obj, 1), + self.name, + is_sorted=plc.types.Sorted.YES, + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, + ) + else: + return NamedColumn( + obj.obj, + self.name, + is_sorted=obj.is_sorted, + order=obj.order, + null_order=obj.null_order, + ) def collect_agg(self, *, depth: int) -> AggInfo: """Collect information about aggregations in groupbys.""" - (value,) = self.children - return value.collect_agg(depth=depth) + return self.value.collect_agg(depth=depth) class Literal(Expr): __slots__ = ("value",) _non_child = ("dtype", "value") - value: pa.Scalar + value: pa.Scalar[Any] + children: tuple[()] - def __init__(self, dtype: plc.DataType, value: Any) -> None: + def __init__(self, dtype: plc.DataType, value: pa.Scalar[Any]) -> None: super().__init__(dtype) - self.value = pa.scalar(value) + assert value.type == plc.interop.to_arrow(dtype) + self.value = value def do_evaluate( self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" - # TODO: obey dtype - obj = plc.interop.from_arrow(self.value) - return Scalar(obj) # type: ignore + # datatype of pyarrow scalar is correct by construction. 
+ return Scalar(plc.interop.from_arrow(self.value)) # type: ignore class Col(Expr): __slots__ = ("name",) _non_child = ("dtype", "name") name: str + children: tuple[()] def __init__(self, dtype: plc.DataType, name: str) -> None: self.dtype = dtype @@ -342,7 +381,7 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" return df._column_map[self.name] @@ -353,12 +392,14 @@ def collect_agg(self, *, depth: int) -> AggInfo: class Len(Expr): + children: tuple[()] + def do_evaluate( self, df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" # TODO: type is wrong, and dtype @@ -375,8 +416,15 @@ def collect_agg(self, *, depth: int) -> AggInfo: class BooleanFunction(Expr): __slots__ = ("name", "options", "children") _non_child = ("dtype", "name", "options") + children: tuple[Expr, ...] - def __init__(self, dtype: plc.DataType, name: str, options: tuple, *children: Expr): + def __init__( + self, + dtype: plc.DataType, + name: pl_expr.BooleanFunction, + options: tuple[Any, ...], + *children: Expr, + ) -> None: super().__init__(dtype) self.options = options self.name = name @@ -415,8 +463,7 @@ def _distinct( [source_value], indices, plc.Table([plc.Column.from_scalar(target_value, table.num_rows())]), - ).columns()[0], - column.name, + ).columns()[0] ) _BETWEEN_OPS: ClassVar[ @@ -448,7 +495,7 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" columns = [ @@ -467,18 +514,18 @@ def do_evaluate( ) if self.name == pl_expr.BooleanFunction.IsNull: (column,) = columns - return Column(plc.unary.is_null(column.obj), column.name) + return Column(plc.unary.is_null(column.obj)) elif self.name == pl_expr.BooleanFunction.IsNotNull: (column,) = columns - return Column(plc.unary.is_valid(column.obj), column.name) + return Column(plc.unary.is_valid(column.obj)) elif self.name == pl_expr.BooleanFunction.IsNan: # TODO: copy over null mask since is_nan(null) => null in polars (column,) = columns - return Column(plc.unary.is_nan(column.obj), column.name) + return Column(plc.unary.is_nan(column.obj)) elif self.name == pl_expr.BooleanFunction.IsNotNan: # TODO: copy over null mask since is_not_nan(null) => null in polars (column,) = columns - return Column(plc.unary.is_not_nan(column.obj), column.name) + return Column(plc.unary.is_not_nan(column.obj)) elif self.name == pl_expr.BooleanFunction.IsFirstDistinct: (column,) = columns return self._distinct( @@ -528,7 +575,6 @@ def do_evaluate( ), ) elif self.name == pl_expr.BooleanFunction.AllHorizontal: - name = columns[0].name if any(c.obj.null_count() > 0 for c in columns): raise NotImplementedError("Kleene logic for all_horizontal") return Column( @@ -539,11 +585,9 @@ def do_evaluate( output_type=self.dtype, ), (c.obj for c in columns), - ), - name, + ) ) elif self.name == pl_expr.BooleanFunction.AnyHorizontal: - name = columns[0].name if any(c.obj.null_count() > 0 for c in columns): raise NotImplementedError("Kleene logic for any_horizontal") return Column( @@ -554,8 +598,7 @@ def do_evaluate( 
output_type=self.dtype, ), (c.obj for c in columns), - ), - name, + ) ) elif self.name == pl_expr.BooleanFunction.IsBetween: column, lo, hi = columns @@ -571,8 +614,7 @@ def do_evaluate( ), plc.binaryop.BinaryOperator.LOGICAL_AND, self.dtype, - ), - column.name, + ) ) else: raise NotImplementedError(f"BooleanFunction {self.name}") @@ -581,14 +623,15 @@ def do_evaluate( class StringFunction(Expr): __slots__ = ("name", "options", "children") _non_child = ("dtype", "name", "options") + children: tuple[Expr, ...] def __init__( self, dtype: plc.DataType, name: pl_expr.StringFunction, - options: tuple, + options: tuple[Any, ...], *children: Expr, - ): + ) -> None: super().__init__(dtype) self.options = options self.name = name @@ -606,7 +649,7 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" columns = [ @@ -615,20 +658,16 @@ def do_evaluate( ] if self.name == pl_expr.StringFunction.Lowercase: (column,) = columns - return Column(plc.strings.case.to_lower(column.obj), column.name) + return Column(plc.strings.case.to_lower(column.obj)) elif self.name == pl_expr.StringFunction.Uppercase: (column,) = columns - return Column(plc.strings.case.to_upper(column.obj), column.name) + return Column(plc.strings.case.to_upper(column.obj)) elif self.name == pl_expr.StringFunction.EndsWith: column, suffix = columns - return Column( - plc.strings.find.ends_with(column.obj, suffix.obj), column.name - ) + return Column(plc.strings.find.ends_with(column.obj, suffix.obj)) elif self.name == pl_expr.StringFunction.StartsWith: column, suffix = columns - return Column( - plc.strings.find.starts_with(column.obj, suffix.obj), column.name - ) + return Column(plc.strings.find.starts_with(column.obj, suffix.obj)) else: raise NotImplementedError(f"StringFunction {self.name}") @@ -636,10 +675,11 @@ def do_evaluate( class Sort(Expr): __slots__ = ("options", "children") _non_child = ("dtype", "options") + children: tuple[Expr] def __init__( self, dtype: plc.DataType, options: tuple[bool, bool, bool], column: Expr - ): + ) -> None: super().__init__(dtype) self.options = options self.children = (column,) @@ -649,33 +689,37 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" (child,) = self.children column = child.evaluate(df, context=context, mapping=mapping) (stable, nulls_last, descending) = self.options order, null_order = sorting.sort_order( - [descending], nulls_last=nulls_last, num_keys=1 + [descending], nulls_last=[nulls_last], num_keys=1 ) do_sort = plc.sorting.stable_sort if stable else plc.sorting.sort table = do_sort(plc.Table([column.obj]), order, null_order) - return Column(table.columns()[0], column.name).set_sorted( - is_sorted=plc.types.Sorted.YES, order=order[0], null_order=null_order[0] + return Column( + table.columns()[0], + is_sorted=plc.types.Sorted.YES, + order=order[0], + null_order=null_order[0], ) class SortBy(Expr): __slots__ = ("options", "children") _non_child = ("dtype", "options") + children: tuple[Expr, ...] 
def __init__( self, dtype: plc.DataType, - options: tuple[bool, bool, tuple[bool]], + options: tuple[bool, tuple[bool], tuple[bool]], column: Expr, *by: Expr, - ): + ) -> None: super().__init__(dtype) self.options = options self.children = (column, *by) @@ -685,7 +729,7 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" column, *by = ( @@ -700,14 +744,15 @@ def do_evaluate( table = do_sort( plc.Table([column.obj]), plc.Table([c.obj for c in by]), order, null_order ) - return Column(table.columns()[0], column.name) + return Column(table.columns()[0]) class Gather(Expr): __slots__ = ("children",) _non_child = ("dtype",) + children: tuple[Expr, Expr] - def __init__(self, dtype: plc.DataType, values: Expr, indices: Expr): + def __init__(self, dtype: plc.DataType, values: Expr, indices: Expr) -> None: super().__init__(dtype) self.children = (values, indices) @@ -716,7 +761,7 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" values, indices = ( @@ -741,12 +786,13 @@ def do_evaluate( bounds_policy = plc.copying.OutOfBoundsPolicy.DONT_CHECK obj = indices.obj table = plc.copying.gather(plc.Table([values.obj]), obj, bounds_policy) - return Column(table.columns()[0], values.name) + return Column(table.columns()[0]) class Filter(Expr): __slots__ = ("children",) _non_child = ("dtype",) + children: tuple[Expr, Expr] def __init__(self, dtype: plc.DataType, values: Expr, indices: Expr): super().__init__(dtype) @@ -757,7 +803,7 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" values, mask = ( @@ -767,14 +813,15 @@ def do_evaluate( table = plc.stream_compaction.apply_boolean_mask( plc.Table([values.obj]), mask.obj ) - return Column(table.columns()[0], values.name).sorted_like(values) + return Column(table.columns()[0]).sorted_like(values) class RollingWindow(Expr): __slots__ = ("options", "children") _non_child = ("dtype", "options") + children: tuple[Expr] - def __init__(self, dtype: plc.DataType, options: Any, agg: Expr): + def __init__(self, dtype: plc.DataType, options: Any, agg: Expr) -> None: super().__init__(dtype) self.options = options self.children = (agg,) @@ -783,8 +830,9 @@ def __init__(self, dtype: plc.DataType, options: Any, agg: Expr): class GroupedRollingWindow(Expr): __slots__ = ("options", "children") _non_child = ("dtype", "options") + children: tuple[Expr, ...] 
- def __init__(self, dtype: plc.DataType, options: Any, agg: Expr, *by: Expr): + def __init__(self, dtype: plc.DataType, options: Any, agg: Expr, *by: Expr) -> None: super().__init__(dtype) self.options = options self.children = (agg, *by) @@ -793,8 +841,9 @@ def __init__(self, dtype: plc.DataType, options: Any, agg: Expr, *by: Expr): class Cast(Expr): __slots__ = ("children",) _non_child = ("dtype",) + children: tuple[Expr] - def __init__(self, dtype: plc.DataType, value: Expr): + def __init__(self, dtype: plc.DataType, value: Expr) -> None: super().__init__(dtype) self.children = (value,) @@ -803,14 +852,12 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" (child,) = self.children column = child.evaluate(df, context=context, mapping=mapping) - return Column(plc.unary.cast(column.obj, self.dtype), column.name).sorted_like( - column - ) + return Column(plc.unary.cast(column.obj, self.dtype)).sorted_like(column) def collect_agg(self, *, depth: int) -> AggInfo: """Collect information about aggregations in groupbys.""" @@ -822,6 +869,7 @@ def collect_agg(self, *, depth: int) -> AggInfo: class Agg(Expr): __slots__ = ("name", "options", "op", "request", "children") _non_child = ("dtype", "name", "options") + children: tuple[Expr] def __init__( self, dtype: plc.DataType, name: str, options: Any, value: Expr @@ -907,7 +955,9 @@ def _reduce( plc.reduce.reduce(column.obj, request, self.dtype), 1, ), - column.name, + is_sorted=plc.types.Sorted.YES, + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, ) def _count(self, column: Column) -> Column: @@ -921,7 +971,9 @@ def _count(self, column: Column) -> Column: ), 1, ), - column.name, + is_sorted=plc.types.Sorted.YES, + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, ) def _min(self, column: Column, *, propagate_nans: bool) -> Column: @@ -933,7 +985,9 @@ def _min(self, column: Column, *, propagate_nans: bool) -> Column: ), 1, ), - column.name, + is_sorted=plc.types.Sorted.YES, + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, ) if column.nan_count > 0: column = column.mask_nans() @@ -948,25 +1002,37 @@ def _max(self, column: Column, *, propagate_nans: bool) -> Column: ), 1, ), - column.name, + is_sorted=plc.types.Sorted.YES, + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, ) if column.nan_count > 0: column = column.mask_nans() return self._reduce(column, request=plc.aggregation.max()) def _first(self, column: Column) -> Column: - return Column(plc.copying.slice(column.obj, [0, 1])[0], column.name) + return Column( + plc.copying.slice(column.obj, [0, 1])[0], + is_sorted=plc.types.Sorted.YES, + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, + ) def _last(self, column: Column) -> Column: n = column.obj.size() - return Column(plc.copying.slice(column.obj, [n - 1, n])[0], column.name) + return Column( + plc.copying.slice(column.obj, [n - 1, n])[0], + is_sorted=plc.types.Sorted.YES, + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, + ) def do_evaluate( self, - df, + df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" if 
context is not ExecutionContext.FRAME: @@ -978,6 +1044,7 @@ def do_evaluate( class BinOp(Expr): __slots__ = ("op", "children") _non_child = ("dtype", "op") + children: tuple[Expr, Expr] def __init__( self, @@ -1018,7 +1085,7 @@ def do_evaluate( df: DataFrame, *, context: ExecutionContext = ExecutionContext.FRAME, - mapping: dict[Expr, Column] | None = None, + mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" left, right = ( @@ -1027,7 +1094,6 @@ def do_evaluate( ) return Column( plc.binaryop.binary_operation(left.obj, right.obj, self.op, self.dtype), - "what", ) def collect_agg(self, *, depth: int) -> AggInfo: diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index f8441b793b5..665bbe5be41 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -1,7 +1,5 @@ # SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. # SPDX-License-Identifier: Apache-2.0 -# TODO: remove need for this -# ruff: noqa: D101 """ DSL nodes for the LogicalPlan of polars. @@ -15,11 +13,11 @@ from __future__ import annotations +import dataclasses import itertools import types -from dataclasses import dataclass from functools import cache -from typing import TYPE_CHECKING, Any, Callable, ClassVar +from typing import TYPE_CHECKING, Any, Callable, ClassVar, NoReturn import pyarrow as pa from typing_extensions import assert_never @@ -30,12 +28,15 @@ import cudf._lib.pylibcudf as plc import cudf_polars.dsl.expr as expr -from cudf_polars.containers import Column, DataFrame +from cudf_polars.containers import DataFrame, NamedColumn from cudf_polars.utils import sorting if TYPE_CHECKING: + from collections.abc import MutableMapping from typing import Literal + from cudf_polars.typing import Schema + __all__ = [ "IR", @@ -59,14 +60,46 @@ ] -@dataclass(slots=True) +def broadcast( + *columns: NamedColumn, target_length: int | None = None +) -> list[NamedColumn]: + lengths = {column.obj.size() for column in columns} + if len(lengths - {1}) > 1: + raise RuntimeError("Mismatching column lengths") + if lengths == {1}: + if target_length is None: + return list(columns) + nrows = target_length + elif len(lengths) == 1: + if target_length is not None: + assert target_length in lengths + return list(columns) + else: + (nrows,) = lengths - {1} + if target_length is not None: + assert target_length == nrows + return [ + column + if column.obj.size() != 1 + else NamedColumn( + plc.Column.from_scalar(plc.copying.get_element(column.obj, 0), nrows), + column.name, + is_sorted=plc.types.Sorted.YES, + order=plc.types.Order.ASCENDING, + null_order=plc.types.NullOrder.BEFORE, + ) + for column in columns + ] + + +@dataclasses.dataclass(slots=True) class IR: """Abstract plan node, representing an unevaluated dataframe.""" - schema: dict[str, plc.DataType] + schema: Schema """Mapping from column names to their data types.""" - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """ Evaluate the node and return a dataframe. @@ -83,24 +116,25 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: Raises ------ - NotImplementedError if we couldn't evaluate things. Ideally - this should not occur, since the translation phase should pick - up things that we cannot handle. + NotImplementedError + If we couldn't evaluate things. 
Ideally this should not occur, + since the translation phase should pick up things that we + cannot handle. """ raise NotImplementedError -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class PythonScan(IR): """Representation of input from a python function.""" options: Any """Arbitrary options.""" - predicate: expr.Expr | None + predicate: expr.NamedExpr | None """Filter to apply to the constructed dataframe before returning it.""" -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Scan(IR): """Input from files.""" @@ -117,17 +151,17 @@ class Scan(IR): - ``row_index: tuple[name, offset] | None``: Add an integer index column with given name. """ - predicate: expr.Expr | None + predicate: expr.NamedExpr | None """Mask to apply to the read dataframe.""" - def __post_init__(self): + def __post_init__(self) -> None: """Validate preconditions.""" if self.file_options.n_rows is not None: raise NotImplementedError("row limit in scan") if self.typ not in ("csv", "parquet"): raise NotImplementedError(f"Unhandled scan type: {self.typ}") - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" options = self.file_options with_columns = options.with_columns @@ -139,9 +173,9 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: ) ) elif self.typ == "parquet": - df = DataFrame.from_cudf( - cudf.read_parquet(self.paths, columns=with_columns) - ) + cdf = cudf.read_parquet(self.paths, columns=with_columns) + assert isinstance(cdf, cudf.DataFrame) + df = DataFrame.from_cudf(cdf) else: assert_never(self.typ) if row_index is not None: @@ -153,14 +187,14 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: init = plc.interop.from_arrow( pa.scalar(offset, type=plc.interop.to_arrow(dtype)) ) - index = Column( - plc.filling.sequence(df.num_rows, init, step), name - ).set_sorted( + index = NamedColumn( + plc.filling.sequence(df.num_rows, init, step), + name, is_sorted=plc.types.Sorted.YES, order=plc.types.Order.ASCENDING, null_order=plc.types.NullOrder.AFTER, ) - df = DataFrame([index, *df.columns], []) + df = DataFrame([index, *df.columns]) # TODO: should be true, but not the case until we get # cudf-classic out of the loop for IO since it converts date32 # to datetime. @@ -171,11 +205,11 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: if self.predicate is None: return df else: - mask = self.predicate.evaluate(df) + (mask,) = broadcast(self.predicate.evaluate(df), target_length=df.num_rows) return df.filter(mask) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Cache(IR): """ Return a cached plan node. @@ -188,7 +222,7 @@ class Cache(IR): value: IR """The unevaluated node to cache.""" - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" try: return cache[self.key] @@ -196,7 +230,7 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: return cache.setdefault(self.key, self.value.evaluate(cache=cache)) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class DataFrameScan(IR): """ Input from an existing polars DataFrame. 
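The broadcast helper added at the top of dsl/ir.py above (and first used in Scan.evaluate) enforces a simple length rule before a DataFrame is assembled: every evaluated column must either have length one or share a single common length, and length-one columns are replicated up to that length (or up to an explicit target_length). Below is a minimal pure-Python sketch of that rule only; broadcast_lengths and the plain lists standing in for pylibcudf columns are illustrative, and the target_length consistency asserts are omitted.

from __future__ import annotations

def broadcast_lengths(*cols: list, target_length: int | None = None) -> list[list]:
    # At most one distinct non-unit length is allowed across the inputs.
    lengths = {len(c) for c in cols}
    if len(lengths - {1}) > 1:
        raise RuntimeError("Mismatching column lengths")
    if lengths == {1}:
        if target_length is None:
            return list(cols)
        nrows = target_length
    elif len(lengths) == 1:
        # Already a single common (non-unit) length; nothing to replicate.
        return list(cols)
    else:
        (nrows,) = lengths - {1}
    # Replicate length-one inputs up to nrows; leave the rest untouched.
    return [c if len(c) != 1 else c * nrows for c in cols]

broadcast_lengths([1, 2, 3], [10])             # [[1, 2, 3], [10, 10, 10]]
broadcast_lengths([1], [2], target_length=4)   # [[1, 1, 1, 1], [2, 2, 2, 2]]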
@@ -208,10 +242,10 @@ class DataFrameScan(IR): """Polars LazyFrame object.""" projection: list[str] """List of columns to project out.""" - predicate: expr.Expr | None + predicate: expr.NamedExpr | None """Mask to apply.""" - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" pdf = pl.DataFrame._from_pydf(self.df) if self.projection is not None: @@ -231,35 +265,30 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: c.obj.type() == dtype for c, dtype in zip(df.columns, self.schema.values()) ) if self.predicate is not None: - mask = self.predicate.evaluate(df) + (mask,) = broadcast(self.predicate.evaluate(df), target_length=df.num_rows) return df.filter(mask) else: return df -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Select(IR): """Produce a new dataframe selecting given expressions from an input.""" df: IR """Input dataframe.""" - cse: list[expr.Expr] - """ - List of common subexpressions that will appear in the selected expressions. - - These must be evaluated before the returned expressions. - """ - expr: list[expr.Expr] + expr: list[expr.NamedExpr] """List of expressions to evaluate to form the new dataframe.""" - def evaluate(self, *, cache: dict[int, DataFrame]): + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) - df = df.with_columns([e.evaluate(df) for e in self.cse]) - return DataFrame([e.evaluate(df) for e in self.expr], []) + # Handle any broadcasting + columns = broadcast(*(e.evaluate(df) for e in self.expr)) + return DataFrame(columns) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Reduce(IR): """ Produce a new dataframe selecting given expressions from an input. @@ -269,16 +298,18 @@ class Reduce(IR): df: IR """Input dataframe.""" - expr: list[expr.Expr] + expr: list[expr.NamedExpr] """List of expressions to evaluate to form the new dataframe.""" - def evaluate(self, *, cache: dict[int, DataFrame]): + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) - return DataFrame([e.evaluate(df) for e in self.expr], []) + columns = broadcast(*(e.evaluate(df) for e in self.expr)) + assert all(column.obj.size() == 1 for column in columns) + return DataFrame(columns) -def placeholder_column(n: int): +def placeholder_column(n: int) -> plc.Column: """ Produce a placeholder pylibcudf column with NO BACKING DATA. @@ -308,20 +339,21 @@ def placeholder_column(n: int): ) -@dataclass(slots=False) +@dataclasses.dataclass(slots=False) class GroupBy(IR): """Perform a groupby.""" df: IR """Input dataframe.""" - agg_requests: list[expr.Expr] + agg_requests: list[expr.NamedExpr] """List of expressions to evaluate groupwise.""" - keys: list[expr.Expr] + keys: list[expr.NamedExpr] """List of expressions forming the keys.""" maintain_order: bool """Should the order of the input dataframe be maintained?""" options: Any """Options controlling style of groupby.""" + agg_infos: list[expr.AggInfo] = dataclasses.field(init=False) @staticmethod def check_agg(agg: expr.Expr) -> int: @@ -339,9 +371,10 @@ def check_agg(agg: expr.Expr) -> int: Raises ------ - NotImplementedError for unsupported expression nodes. + NotImplementedError + For unsupported expression nodes. 
""" - if isinstance(agg, (expr.NamedExpr, expr.BinOp, expr.Cast)): + if isinstance(agg, (expr.BinOp, expr.Cast)): return max(GroupBy.check_agg(child) for child in agg.children) elif isinstance(agg, expr.Agg): if agg.name == "implode": @@ -352,20 +385,22 @@ def check_agg(agg: expr.Expr) -> int: else: raise NotImplementedError(f"No handler for {agg=}") - def __post_init__(self): + def __post_init__(self) -> None: """Check whether all the aggregations are implemented.""" if self.options.rolling is None and self.maintain_order: raise NotImplementedError("Maintaining order in groupby") if self.options.rolling: raise NotImplementedError("rolling window/groupby") - if any(GroupBy.check_agg(a) > 1 for a in self.agg_requests): + if any(GroupBy.check_agg(a.value) > 1 for a in self.agg_requests): raise NotImplementedError("Nested aggregations in groupby") self.agg_infos = [req.collect_agg(depth=0) for req in self.agg_requests] - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) - keys = [k.evaluate(df) for k in self.keys] + keys = broadcast( + *(k.evaluate(df) for k in self.keys), target_length=df.num_rows + ) # TODO: use sorted information, need to expose column_order # and null_precedence in pylibcudf groupby constructor # sorted = ( @@ -379,7 +414,7 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: ) # TODO: uniquify requests = [] - replacements = [] + replacements: list[expr.Expr] = [] for info in self.agg_infos: for pre_eval, req, rep in info.requests: if pre_eval is None: @@ -389,20 +424,23 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: requests.append(plc.groupby.GroupByRequest(col, [req])) replacements.append(rep) group_keys, raw_tables = grouper.aggregate(requests) - raw_columns = [] + # TODO: names + raw_columns: list[NamedColumn] = [] for i, table in enumerate(raw_tables): (column,) = table.columns() - raw_columns.append(Column(column, f"column{i}")) + raw_columns.append(NamedColumn(column, f"tmp{i}")) mapping = dict(zip(replacements, raw_columns)) - result_keys = [Column(gk, k.name) for gk, k in zip(group_keys.columns(), keys)] - result_subs = DataFrame(raw_columns, []) + result_keys = [ + NamedColumn(gk, k.name) for gk, k in zip(group_keys.columns(), keys) + ] + result_subs = DataFrame(raw_columns) results = [ req.evaluate(result_subs, mapping=mapping) for req in self.agg_requests ] - return DataFrame([*result_keys, *results], []).slice(self.options.slice) + return DataFrame([*result_keys, *results]).slice(self.options.slice) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Join(IR): """A join of two dataframes.""" @@ -410,9 +448,9 @@ class Join(IR): """Left frame.""" right: IR """Right frame.""" - left_on: list[expr.Expr] + left_on: list[expr.NamedExpr] """List of expressions used as keys in the left frame.""" - right_on: list[expr.Expr] + right_on: list[expr.NamedExpr] """List of expressions used as keys in the right frame.""" options: tuple[ Literal["inner", "left", "full", "leftsemi", "leftanti"], @@ -430,7 +468,7 @@ class Join(IR): - coalesce: should key columns be coalesced (only makes sense for outer joins) """ - def __post_init__(self): + def __post_init__(self) -> None: """Validate preconditions.""" if self.options[0] == "cross": raise NotImplementedError("cross join not implemented") @@ -475,12 +513,21 @@ def _joiners( else: assert_never(how) - def 
evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" left = self.left.evaluate(cache=cache) right = self.right.evaluate(cache=cache) - left_on = DataFrame([e.evaluate(left) for e in self.left_on], []) - right_on = DataFrame([e.evaluate(right) for e in self.right_on], []) + left_on = DataFrame( + broadcast( + *(e.evaluate(left) for e in self.left_on), target_length=left.num_rows + ) + ) + right_on = DataFrame( + broadcast( + *(e.evaluate(right) for e in self.right_on), + target_length=right.num_rows, + ) + ) how, join_nulls, zlice, suffix, coalesce = self.options null_equality = ( plc.types.NullEquality.EQUAL @@ -510,7 +557,7 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: if coalesce and how != "inner": left = left.replace_columns( *( - Column( + NamedColumn( plc.replace.replace_nulls(left_col.obj, right_col.obj), left_col.name, ) @@ -532,29 +579,27 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: return result.slice(zlice) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class HStack(IR): """Add new columns to a dataframe.""" df: IR """Input dataframe.""" - cse: list[expr.Expr] - """ - List of common subexpressions that will appear in the selected expressions. - - These must be evaluated before the returned expressions. - """ - columns: list[expr.Expr] + columns: list[expr.NamedExpr] """List of expressions to produce new columns.""" - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) - ctx = df.copy().with_columns([e.evaluate(df) for e in self.cse]) - return df.with_columns([c.evaluate(ctx) for c in self.columns]) + columns = [c.evaluate(df) for c in self.columns] + # TODO: a bit of a hack, should inherit the should_broadcast + # property of polars' ProjectionOptions on the hstack node. 
+ if not any(e.name.startswith("__POLARS_CSER_0x") for e in self.columns): + columns = broadcast(*columns, target_length=df.num_rows) + return df.with_columns(columns) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Distinct(IR): """Produce a new dataframe with distinct rows.""" @@ -576,7 +621,7 @@ class Distinct(IR): "any": plc.stream_compaction.DuplicateKeepOption.KEEP_ANY, } - def __init__(self, schema: dict, df: IR, options: Any): + def __init__(self, schema: Schema, df: IR, options: Any) -> None: self.schema = schema self.df = df (keep, subset, maintain_order, zlice) = options @@ -585,7 +630,7 @@ def __init__(self, schema: dict, df: IR, options: Any): self.stable = maintain_order self.zlice = zlice - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) if self.subset is None: @@ -614,20 +659,23 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: plc.types.NanEquality.ALL_EQUAL, ) result = DataFrame( - [Column(c, old.name) for c, old in zip(table.columns(), df.columns)], [] + [ + NamedColumn(c, old.name).sorted_like(old) + for c, old in zip(table.columns(), df.columns) + ] ) if keys_sorted or self.stable: result = result.sorted_like(df) return result.slice(self.zlice) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Sort(IR): """Sort a dataframe.""" df: IR """Input.""" - by: list[expr.Expr] + by: list[expr.NamedExpr] """List of expressions to produce sort keys.""" do_sort: Callable[..., plc.Table] """pylibcudf sorting function.""" @@ -640,12 +688,12 @@ class Sort(IR): def __init__( self, - schema: dict, + schema: Schema, df: IR, - by: list[expr.Expr], + by: list[expr.NamedExpr], options: Any, zlice: tuple[int, int] | None, - ): + ) -> None: self.schema = schema self.df = df self.by = by @@ -658,10 +706,12 @@ def __init__( plc.sorting.stable_sort_by_key if stable else plc.sorting.sort_by_key ) - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) - sort_keys = [k.evaluate(df) for k in self.by] + sort_keys = broadcast( + *(k.evaluate(df) for k in self.by), target_length=df.num_rows + ) names = {c.name: i for i, c in enumerate(df.columns)} # TODO: More robust identification here. 
keys_in_result = [ @@ -675,7 +725,9 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: self.order, self.null_order, ) - columns = [Column(c, old.name) for c, old in zip(table.columns(), df.columns)] + columns = [ + NamedColumn(c, old.name) for c, old in zip(table.columns(), df.columns) + ] # If a sort key is in the result table, set the sortedness property for k, i in enumerate(keys_in_result): columns[i] = columns[i].set_sorted( @@ -683,10 +735,10 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: order=self.order[k], null_order=self.null_order[k], ) - return DataFrame(columns, []).slice(self.zlice) + return DataFrame(columns).slice(self.zlice) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Slice(IR): """Slice a dataframe.""" @@ -697,42 +749,46 @@ class Slice(IR): length: int """Length of the slice.""" - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) return df.slice((self.offset, self.length)) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Filter(IR): """Filter a dataframe with a boolean mask.""" df: IR """Input.""" - mask: expr.Expr + mask: expr.NamedExpr """Expression evaluating to a mask.""" - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) - return df.filter(self.mask.evaluate(df)) + (mask,) = broadcast(self.mask.evaluate(df), target_length=df.num_rows) + return df.filter(mask) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Projection(IR): """Select a subset of columns from a dataframe.""" df: IR """Input.""" - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) # This can reorder things. 
- return df.select(list(self.schema.keys())) + columns = broadcast( + *df.select(list(self.schema.keys())).columns, target_length=df.num_rows + ) + return DataFrame(columns) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class MapFunction(IR): """Apply some function to a dataframe.""" @@ -753,7 +809,7 @@ class MapFunction(IR): ] ) - def __post_init__(self): + def __post_init__(self) -> None: """Validate preconditions.""" if self.name not in MapFunction._NAMES: raise NotImplementedError(f"Unhandled map function {self.name}") @@ -770,7 +826,7 @@ def __post_init__(self): if key_column not in self.df.dfs[0].schema: raise ValueError(f"Key column {key_column} not found") - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" if self.name == "merge_sorted": # merge_sorted operates on Union inputs @@ -822,7 +878,7 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: raise AssertionError("Should never be reached") -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class Union(IR): """Concatenate dataframes vertically.""" @@ -831,13 +887,13 @@ class Union(IR): zlice: tuple[int, int] | None """Optional slice to apply after concatenation.""" - def __post_init__(self): + def __post_init__(self) -> None: """Validated preconditions.""" schema = self.dfs[0].schema if not all(s.schema == schema for s in self.dfs[1:]): raise ValueError("Schema mismatch") - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" # TODO: only evaluate what we need if we have a slice dfs = [df.evaluate(cache=cache) for df in self.dfs] @@ -846,24 +902,22 @@ def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: ).slice(self.zlice) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class HConcat(IR): """Concatenate dataframes horizontally.""" dfs: list[IR] """List of inputs.""" - def evaluate(self, *, cache: dict[int, DataFrame]) -> DataFrame: + def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" dfs = [df.evaluate(cache=cache) for df in self.dfs] - columns, scalars = zip(*((df.columns, df.scalars) for df in dfs)) return DataFrame( - list(itertools.chain.from_iterable(columns)), - list(itertools.chain.from_iterable(scalars)), + list(itertools.chain.from_iterable(df.columns for df in dfs)), ) -@dataclass(slots=True) +@dataclasses.dataclass(slots=True) class ExtContext(IR): """ Concatenate dataframes horizontally. @@ -876,7 +930,7 @@ class ExtContext(IR): extra: list[IR] """List of extra inputs.""" - def __post_init__(self): + def __post_init__(self) -> NoReturn: """Validate preconditions.""" raise NotImplementedError( "ExtContext will be deprecated, use horizontal concat instead." 
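Taken together, the container and IR changes above move the column name out of Column and into NamedColumn: plain expressions evaluate to anonymous columns, and a name is attached only where an IR node assembles its output, via NamedExpr. A toy sketch of that shape follows; the Toy* classes are stand-ins invented purely for illustration, not the real pylibcudf-backed containers.

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable

@dataclass
class ToyColumn:
    # Stands in for containers.Column: data plus sortedness metadata, no name.
    data: list

@dataclass
class ToyNamedColumn(ToyColumn):
    # Stands in for containers.NamedColumn: the name lives only here.
    name: str = ""

class ToyNamedExpr:
    # Stands in for expr.NamedExpr: an output name wrapping an expression.
    def __init__(self, name: str, value: Callable[[dict], ToyColumn]) -> None:
        self.name = name
        self.value = value

    def evaluate(self, frame: dict) -> ToyNamedColumn:
        col = self.value(frame)  # the wrapped expression yields an anonymous column
        return ToyNamedColumn(col.data, self.name)

frame = {"a": ToyColumn([1, 2, 3])}
ne = ToyNamedExpr("a_plus_one", lambda df: ToyColumn([v + 1 for v in df["a"].data]))
print(ne.evaluate(frame))  # ToyNamedColumn(data=[2, 3, 4], name='a_plus_one')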
diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index 9a301164beb..38107023365 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -16,95 +16,123 @@ import cudf._lib.pylibcudf as plc from cudf_polars.dsl import expr, ir +from cudf_polars.typing import NodeTraverser from cudf_polars.utils import dtypes -__all__ = ["translate_ir", "translate_expr"] +__all__ = ["translate_ir", "translate_named_expr"] -class set_node(AbstractContextManager): - """Run a block with current node set in the visitor.""" +class set_node(AbstractContextManager[None]): + """ + Run a block with current node set in the visitor. + + Parameters + ---------- + visitor + The internal Rust visitor object + n + The node to set as the current root. + + Notes + ----- + This is useful for translating expressions with a given node + active, restoring the node when the block exits. + """ __slots__ = ("n", "visitor") + visitor: NodeTraverser + n: int - def __init__(self, visitor, n: int): + def __init__(self, visitor: NodeTraverser, n: int) -> None: self.visitor = visitor self.n = n - def __enter__(self): + def __enter__(self) -> None: n = self.visitor.get_node() self.visitor.set_node(self.n) self.n = n - def __exit__(self, *args): + def __exit__(self, *args: Any) -> None: self.visitor.set_node(self.n) -noop_context: nullcontext = nullcontext() +noop_context: nullcontext[None] = nullcontext() @singledispatch -def _translate_ir(node: Any, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _translate_ir( + node: Any, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: raise NotImplementedError(f"Translation for {type(node).__name__}") @_translate_ir.register -def _(node: pl_ir.PythonScan, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.PythonScan, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: return ir.PythonScan( schema, node.options, - translate_expr(visitor, n=node.predicate) + translate_named_expr(visitor, n=node.predicate) if node.predicate is not None else None, ) @_translate_ir.register -def _(node: pl_ir.Scan, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Scan, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: return ir.Scan( schema, node.scan_type, node.paths, node.file_options, - translate_expr(visitor, n=node.predicate) + translate_named_expr(visitor, n=node.predicate) if node.predicate is not None else None, ) @_translate_ir.register -def _(node: pl_ir.Cache, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Cache, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: return ir.Cache(schema, node.id_, translate_ir(visitor, n=node.input)) @_translate_ir.register def _( - node: pl_ir.DataFrameScan, visitor: Any, schema: dict[str, plc.DataType] + node: pl_ir.DataFrameScan, visitor: NodeTraverser, schema: dict[str, plc.DataType] ) -> ir.IR: return ir.DataFrameScan( schema, node.df, node.projection, - translate_expr(visitor, n=node.selection) + translate_named_expr(visitor, n=node.selection) if node.selection is not None else None, ) @_translate_ir.register -def _(node: pl_ir.Select, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Select, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: with set_node(visitor, node.input): inp = translate_ir(visitor, n=None) - cse_exprs = 
[translate_expr(visitor, n=e) for e in node.cse_expr] - exprs = [translate_expr(visitor, n=e) for e in node.expr] - return ir.Select(schema, inp, cse_exprs, exprs) + exprs = [translate_named_expr(visitor, n=e) for e in node.expr] + return ir.Select(schema, inp, exprs) @_translate_ir.register -def _(node: pl_ir.GroupBy, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.GroupBy, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: with set_node(visitor, node.input): inp = translate_ir(visitor, n=None) - aggs = [translate_expr(visitor, n=e) for e in node.aggs] - keys = [translate_expr(visitor, n=e) for e in node.keys] + aggs = [translate_named_expr(visitor, n=e) for e in node.aggs] + keys = [translate_named_expr(visitor, n=e) for e in node.keys] return ir.GroupBy( schema, inp, @@ -116,38 +144,45 @@ def _(node: pl_ir.GroupBy, visitor: Any, schema: dict[str, plc.DataType]) -> ir. @_translate_ir.register -def _(node: pl_ir.Join, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Join, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: # Join key dtypes are dependent on the schema of the left and # right inputs, so these must be translated with the relevant # input active. with set_node(visitor, node.input_left): inp_left = translate_ir(visitor, n=None) - left_on = [translate_expr(visitor, n=e) for e in node.left_on] + left_on = [translate_named_expr(visitor, n=e) for e in node.left_on] with set_node(visitor, node.input_right): inp_right = translate_ir(visitor, n=None) - right_on = [translate_expr(visitor, n=e) for e in node.right_on] + right_on = [translate_named_expr(visitor, n=e) for e in node.right_on] return ir.Join(schema, inp_left, inp_right, left_on, right_on, node.options) @_translate_ir.register -def _(node: pl_ir.HStack, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.HStack, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: with set_node(visitor, node.input): inp = translate_ir(visitor, n=None) - cse_exprs = [translate_expr(visitor, n=e) for e in node.cse_exprs] - exprs = [translate_expr(visitor, n=e) for e in node.exprs] - return ir.HStack(schema, inp, cse_exprs, exprs) + exprs = [translate_named_expr(visitor, n=e) for e in node.exprs] + return ir.HStack(schema, inp, exprs) @_translate_ir.register -def _(node: pl_ir.Reduce, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Reduce, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: with set_node(visitor, node.input): inp = translate_ir(visitor, n=None) - exprs = [translate_expr(visitor, n=e) for e in node.expr] + exprs = [translate_named_expr(visitor, n=e) for e in node.expr] return ir.Reduce(schema, inp, exprs) @_translate_ir.register -def _(node: pl_ir.Distinct, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Distinct, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: return ir.Distinct( schema, translate_ir(visitor, n=node.input), @@ -156,35 +191,45 @@ def _(node: pl_ir.Distinct, visitor: Any, schema: dict[str, plc.DataType]) -> ir @_translate_ir.register -def _(node: pl_ir.Sort, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Sort, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: with set_node(visitor, node.input): inp = translate_ir(visitor, n=None) - by = [translate_expr(visitor, n=e) for e in node.by_column] + by = 
[translate_named_expr(visitor, n=e) for e in node.by_column] return ir.Sort(schema, inp, by, node.sort_options, node.slice) @_translate_ir.register -def _(node: pl_ir.Slice, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Slice, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: return ir.Slice(schema, translate_ir(visitor, n=node.input), node.offset, node.len) @_translate_ir.register -def _(node: pl_ir.Filter, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Filter, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: with set_node(visitor, node.input): inp = translate_ir(visitor, n=None) - mask = translate_expr(visitor, n=node.predicate) + mask = translate_named_expr(visitor, n=node.predicate) return ir.Filter(schema, inp, mask) @_translate_ir.register def _( - node: pl_ir.SimpleProjection, visitor: Any, schema: dict[str, plc.DataType] + node: pl_ir.SimpleProjection, + visitor: NodeTraverser, + schema: dict[str, plc.DataType], ) -> ir.IR: return ir.Projection(schema, translate_ir(visitor, n=node.input)) @_translate_ir.register -def _(node: pl_ir.MapFunction, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.MapFunction, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: name, *options = node.function return ir.MapFunction( schema, @@ -196,19 +241,25 @@ def _(node: pl_ir.MapFunction, visitor: Any, schema: dict[str, plc.DataType]) -> @_translate_ir.register -def _(node: pl_ir.Union, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.Union, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: return ir.Union( schema, [translate_ir(visitor, n=n) for n in node.inputs], node.options ) @_translate_ir.register -def _(node: pl_ir.HConcat, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.HConcat, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: return ir.HConcat(schema, [translate_ir(visitor, n=n) for n in node.inputs]) @_translate_ir.register -def _(node: pl_ir.ExtContext, visitor: Any, schema: dict[str, plc.DataType]) -> ir.IR: +def _( + node: pl_ir.ExtContext, visitor: NodeTraverser, schema: dict[str, plc.DataType] +) -> ir.IR: return ir.ExtContext( schema, translate_ir(visitor, n=node.input), @@ -216,7 +267,7 @@ def _(node: pl_ir.ExtContext, visitor: Any, schema: dict[str, plc.DataType]) -> ) -def translate_ir(visitor: Any, *, n: int | None = None) -> ir.IR: +def translate_ir(visitor: NodeTraverser, *, n: int | None = None) -> ir.IR: """ Translate a polars-internal IR node to our representation. @@ -234,10 +285,10 @@ def translate_ir(visitor: Any, *, n: int | None = None) -> ir.IR: Raises ------ - NotImplementedError if we can't translate the nodes due to - unsupported functionality. + NotImplementedError + If we can't translate the nodes due to unsupported functionality. 
""" - ctx: AbstractContextManager = ( + ctx: AbstractContextManager[None] = ( set_node(visitor, n) if n is not None else noop_context ) with ctx: @@ -246,19 +297,47 @@ def translate_ir(visitor: Any, *, n: int | None = None) -> ir.IR: return _translate_ir(node, visitor, schema) -@singledispatch -def _translate_expr(node: Any, visitor: Any, dtype: plc.DataType) -> expr.Expr: - raise NotImplementedError(f"Translation for {type(node).__name__}") +def translate_named_expr( + visitor: NodeTraverser, *, n: pl_expr.PyExprIR +) -> expr.NamedExpr: + """ + Translate a polars-internal named expression IR object into our representation. + + Parameters + ---------- + visitor + Polars NodeTraverser object + n + Node to translate, a named expression node. + + Returns + ------- + Translated IR object. + Notes + ----- + The datatype of the internal expression will be obtained from the + visitor by calling ``get_dtype``, for this to work properly, the + caller should arrange that the expression is translated with the + node that it references "active" for the visitor (see :class:`set_node`). -@_translate_expr.register -def _(node: pl_expr.PyExprIR, visitor: Any, dtype: plc.DataType) -> expr.Expr: - e = translate_expr(visitor, n=node.node) - return expr.NamedExpr(dtype, node.output_name, e) + Raises + ------ + NotImplementedError + If any translation fails due to unsupported functionality. + """ + return expr.NamedExpr(n.output_name, translate_expr(visitor, n=n.node)) + + +@singledispatch +def _translate_expr( + node: Any, visitor: NodeTraverser, dtype: plc.DataType +) -> expr.Expr: + raise NotImplementedError(f"Translation for {type(node).__name__}") @_translate_expr.register -def _(node: pl_expr.Function, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Function, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: name, *options = node.function_data options = tuple(options) if isinstance(name, pl_expr.StringFunction): @@ -280,7 +359,7 @@ def _(node: pl_expr.Function, visitor: Any, dtype: plc.DataType) -> expr.Expr: @_translate_expr.register -def _(node: pl_expr.Window, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Window, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: # TODO: raise in groupby? 
if node.partition_by is None: return expr.RollingWindow( @@ -296,19 +375,19 @@ def _(node: pl_expr.Window, visitor: Any, dtype: plc.DataType) -> expr.Expr: @_translate_expr.register -def _(node: pl_expr.Literal, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Literal, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: value = pa.scalar(node.value, type=plc.interop.to_arrow(dtype)) return expr.Literal(dtype, value) @_translate_expr.register -def _(node: pl_expr.Sort, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Sort, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: # TODO: raise in groupby return expr.Sort(dtype, node.options, translate_expr(visitor, n=node.expr)) @_translate_expr.register -def _(node: pl_expr.SortBy, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.SortBy, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: return expr.SortBy( dtype, node.sort_options, @@ -318,7 +397,7 @@ def _(node: pl_expr.SortBy, visitor: Any, dtype: plc.DataType) -> expr.Expr: @_translate_expr.register -def _(node: pl_expr.Gather, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Gather, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: return expr.Gather( dtype, translate_expr(visitor, n=node.expr), @@ -327,7 +406,7 @@ def _(node: pl_expr.Gather, visitor: Any, dtype: plc.DataType) -> expr.Expr: @_translate_expr.register -def _(node: pl_expr.Filter, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Filter, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: return expr.Filter( dtype, translate_expr(visitor, n=node.input), @@ -336,7 +415,7 @@ def _(node: pl_expr.Filter, visitor: Any, dtype: plc.DataType) -> expr.Expr: @_translate_expr.register -def _(node: pl_expr.Cast, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Cast, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: inner = translate_expr(visitor, n=node.expr) # Push casts into literals so we can handle Cast(Literal(Null)) if isinstance(inner, expr.Literal): @@ -346,12 +425,12 @@ def _(node: pl_expr.Cast, visitor: Any, dtype: plc.DataType) -> expr.Expr: @_translate_expr.register -def _(node: pl_expr.Column, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Column, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: return expr.Col(dtype, node.name) @_translate_expr.register -def _(node: pl_expr.Agg, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Agg, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: return expr.Agg( dtype, node.name, @@ -361,7 +440,9 @@ def _(node: pl_expr.Agg, visitor: Any, dtype: plc.DataType) -> expr.Expr: @_translate_expr.register -def _(node: pl_expr.BinaryExpr, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _( + node: pl_expr.BinaryExpr, visitor: NodeTraverser, dtype: plc.DataType +) -> expr.Expr: return expr.BinOp( dtype, expr.BinOp._MAPPING[node.op], @@ -371,11 +452,11 @@ def _(node: pl_expr.BinaryExpr, visitor: Any, dtype: plc.DataType) -> expr.Expr: @_translate_expr.register -def _(node: pl_expr.Len, visitor: Any, dtype: plc.DataType) -> expr.Expr: +def _(node: pl_expr.Len, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: return expr.Len(dtype) -def translate_expr(visitor: Any, *, n: int | pl_expr.PyExprIR) -> expr.Expr: +def translate_expr(visitor: NodeTraverser, *, n: int) -> expr.Expr: """ Translate a polars-internal expression IR into our representation. 
@@ -384,8 +465,7 @@ def translate_expr(visitor: Any, *, n: int | pl_expr.PyExprIR) -> expr.Expr: visitor Polars NodeTraverser object n - Node to translate, either an integer referencing a polars - internal node, or a named expression node. + Node to translate, an integer referencing a polars internal node. Returns ------- @@ -393,14 +473,9 @@ def translate_expr(visitor: Any, *, n: int | pl_expr.PyExprIR) -> expr.Expr: Raises ------ - NotImplementedError if any translation fails due to unsupported functionality. + NotImplementedError + If any translation fails due to unsupported functionality. """ - if isinstance(n, pl_expr.PyExprIR): - # TODO: type narrowing doesn't rule out int since PyExprIR is Unknown - assert not isinstance(n, int) - node = n - dtype = dtypes.from_polars(visitor.get_dtype(node.node)) - else: - node = visitor.view_expression(n) - dtype = dtypes.from_polars(visitor.get_dtype(n)) + node = visitor.view_expression(n) + dtype = dtypes.from_polars(visitor.get_dtype(n)) return _translate_expr(node, visitor, dtype) diff --git a/python/cudf_polars/cudf_polars/py.typed b/python/cudf_polars/cudf_polars/py.typed new file mode 100644 index 00000000000..e69de29bb2d diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py index a6e26a6425c..2f19b41cc3a 100644 --- a/python/cudf_polars/cudf_polars/testing/asserts.py +++ b/python/cudf_polars/cudf_polars/testing/asserts.py @@ -23,12 +23,12 @@ def assert_gpu_result_equal( *, check_row_order: bool = True, check_column_order: bool = True, - check_dtype: bool = True, + check_dtypes: bool = True, check_exact: bool = True, rtol: float = 1e-05, atol: float = 1e-08, categorical_as_str: bool = False, -): +) -> None: """ Assert that collection of a lazyframe on GPU produces correct results. @@ -40,7 +40,7 @@ def assert_gpu_result_equal( Expect rows to be in same order check_column_order Expect columns to be in same order - check_dtype + check_dtypes Expect dtypes to match check_exact Require exact equality for floats, if `False` compare using @@ -68,7 +68,7 @@ def assert_gpu_result_equal( got, check_row_order=check_row_order, check_column_order=check_column_order, - check_dtype=check_dtype, + check_dtypes=check_dtypes, check_exact=check_exact, rtol=rtol, atol=atol, diff --git a/python/cudf_polars/cudf_polars/typing/__init__.py b/python/cudf_polars/cudf_polars/typing/__init__.py new file mode 100644 index 00000000000..287c977f4eb --- /dev/null +++ b/python/cudf_polars/cudf_polars/typing/__init__.py @@ -0,0 +1,91 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. 
+# SPDX-License-Identifier: Apache-2.0 + +"""Typing utilities for cudf_polars.""" + +from __future__ import annotations + +from collections.abc import Mapping +from typing import TYPE_CHECKING, Protocol, TypeAlias + +from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir + +import cudf._lib.pylibcudf as plc + +if TYPE_CHECKING: + from typing import Callable + + import polars as pl + +IR: TypeAlias = ( + pl_ir.PythonScan + | pl_ir.Scan + | pl_ir.Cache + | pl_ir.DataFrameScan + | pl_ir.Select + | pl_ir.GroupBy + | pl_ir.Join + | pl_ir.HStack + | pl_ir.Distinct + | pl_ir.Sort + | pl_ir.Slice + | pl_ir.Filter + | pl_ir.SimpleProjection + | pl_ir.MapFunction + | pl_ir.Union + | pl_ir.HConcat + | pl_ir.ExtContext +) + +Expr: TypeAlias = ( + pl_expr.Function + | pl_expr.Window + | pl_expr.Literal + | pl_expr.Sort + | pl_expr.SortBy + | pl_expr.Gather + | pl_expr.Filter + | pl_expr.Cast + | pl_expr.Column + | pl_expr.Agg + | pl_expr.BinaryExpr + | pl_expr.Len + | pl_expr.PyExprIR +) + +Schema: TypeAlias = Mapping[str, plc.DataType] + + +class NodeTraverser(Protocol): + """Abstract protocol for polars NodeTraverser.""" + + def get_node(self) -> int: + """Return current plan node id.""" + ... + + def set_node(self, n: int) -> None: + """Set the current plan node to n.""" + ... + + def view_current_node(self) -> IR: + """Convert current plan node to python rep.""" + ... + + def get_schema(self) -> Mapping[str, pl.DataType]: + """Get the schema of the current plan node.""" + ... + + def get_dtype(self, n: int) -> pl.DataType: + """Get the datatype of the given expression id.""" + ... + + def view_expression(self, n: int) -> Expr: + """Convert the given expression to python rep.""" + ... + + def set_udf( + self, + callback: Callable[[list[str] | None, str | None, int | None], pl.DataFrame], + ) -> None: + """Set the callback replacing the current node in the plan.""" + ... diff --git a/python/cudf_polars/cudf_polars/utils/dtypes.py b/python/cudf_polars/cudf_polars/utils/dtypes.py index bede0de3c9f..7b0049daf11 100644 --- a/python/cudf_polars/cudf_polars/utils/dtypes.py +++ b/python/cudf_polars/cudf_polars/utils/dtypes.py @@ -32,7 +32,8 @@ def from_polars(dtype: pl.DataType) -> plc.DataType: Raises ------ - NotImplementedError for unsupported conversions. + NotImplementedError + For unsupported conversions. """ if isinstance(dtype, pl.Boolean): return plc.DataType(plc.TypeId.BOOL8) diff --git a/python/cudf_polars/cudf_polars/utils/sorting.py b/python/cudf_polars/cudf_polars/utils/sorting.py index b3ecfdd3dd4..d35459db20d 100644 --- a/python/cudf_polars/cudf_polars/utils/sorting.py +++ b/python/cudf_polars/cudf_polars/utils/sorting.py @@ -14,7 +14,7 @@ def sort_order( - descending: Sequence[bool], *, nulls_last: bool, num_keys: int + descending: Sequence[bool], *, nulls_last: Sequence[bool], num_keys: int ) -> tuple[list[plc.types.Order], list[plc.types.NullOrder]]: """ Produce sort order arguments. 
@@ -36,14 +36,18 @@ def sort_order( # Mimicking polars broadcast handling of descending if num_keys > (n := len(descending)) and n == 1: descending = [descending[0]] * num_keys + if num_keys > (n := len(nulls_last)) and n == 1: + nulls_last = [nulls_last[0]] * num_keys column_order = [ plc.types.Order.DESCENDING if d else plc.types.Order.ASCENDING for d in descending ] null_precedence = [] - for asc in column_order: - if (asc == plc.types.Order.ASCENDING) ^ (not nulls_last): + # TODO: use strict=True when we drop py39 + assert len(descending) == len(nulls_last) + for asc, null_last in zip(column_order, nulls_last): + if (asc == plc.types.Order.ASCENDING) ^ (not null_last): null_precedence.append(plc.types.NullOrder.AFTER) - elif (asc == plc.types.Order.ASCENDING) ^ nulls_last: + elif (asc == plc.types.Order.ASCENDING) ^ null_last: null_precedence.append(plc.types.NullOrder.BEFORE) return column_order, null_precedence diff --git a/python/cudf_polars/docs/overview.md b/python/cudf_polars/docs/overview.md index cbf012f5881..b50d01c26db 100644 --- a/python/cudf_polars/docs/overview.md +++ b/python/cudf_polars/docs/overview.md @@ -34,6 +34,8 @@ pip install --upgrade uv uv pip install --upgrade -r py-polars/requirements-dev.txt ``` +> ![NOTE] plain `pip install` works fine, but `uv` is _much_ faster! + Now we have the necessary machinery to build polars ```sh cd py-polars @@ -57,7 +59,7 @@ The executor for the polars logical plan lives in the cudf repo, in ```sh cd cudf/python/cudf_polars -pip install --no-deps -e . +uv pip install --no-build-isolation --no-deps -e . ``` You should now be able to run the tests in the `cudf_polars` package: @@ -96,6 +98,21 @@ This should either transparently run on the GPU and deliver a polars dataframe, or else fail (but be handled) and just run the normal CPU execution. +If you want to fail during translation, set the keyword argument +`raise_on_fail` to `True`: + +```python +from functools import partial +from cudf_polars.callback import execute_with_cudf + +result = q.collect( + post_opt_callback=partial(execute_with_cudf, raise_on_fail=True) +) +``` + +This is mostly useful when writing tests, since in that case we want +any failures to propagate, rather than falling back to the CPU mode. + ## Adding a handler for a new plan node Plan node definitions live in `cudf_polars/dsl/ir.py`, these are @@ -153,22 +170,84 @@ the logical plan in any case, so is reasonably natural. # Containers Containers should be constructed as relatively lightweight objects -around their pylibcudf counterparts. We have three (in +around their pylibcudf counterparts. We have four (in `cudf_polars/containers/`): -1. Scalar (a wrapper around a pylibcudf Scalar) -2. Column (a wrapper around a pylibcudf Column) -3. DataFrame (a wrapper around a pylibcudf Table) +1. `Scalar` (a wrapper around a pylibcudf `Scalar`) +2. `Column` (a wrapper around a pylibcudf `Column`) +3. `NamedColumn` a `Column` with an additional name +4. `DataFrame` (a wrapper around a pylibcudf `Table`) The interfaces offered by these are somewhat in flux, but broadly -speaking, a `DataFrame` is just a list of `Column`s which each hold -data plus a string `name`, along with a collection of `Scalar`s (this -might go away). +speaking, a `DataFrame` is just a list of `NamedColumn`s which each +hold a `Column` plus a string `name`. `NamedColumn`s are only ever +constructed via `NamedExpr`s, which are the top-level expression node +that lives inside an `IR` node. 
This means that the expression
+evaluator never has to concern itself with column names: columns are
+only ever decorated with names when constructing a `DataFrame`.
 
 The columns keep track of metadata (for example, whether or not they
-are sorted).
+are sorted). We could imagine tracking more metadata, like minimum and
+maximum, though perhaps that is better left to libcudf itself.
 
 We offer some utility methods for transferring metadata when
 constructing new dataframes and columns, both `DataFrame` and `Column`
-offer a `with_metadata(*, like: Self)` call which copies metadata from
-the template.
+offer a `sorted_like(like: Self)` call which copies metadata from the
+template.
+
+All methods on containers that modify in place should return `self`,
+to facilitate use in a ["fluent"
+style](https://en.wikipedia.org/wiki/Fluent_interface). It makes it
+much easier to iterate over objects and collect the results if
+everyone always returns a value.
+
+# Writing tests
+
+We use `pytest`; tests live in the `tests/` subdirectory.
+Organisationally, the top-level test files each handle one of the `IR`
+nodes. The goal is that they are parametrized over all the options
+each node will handle, to have reasonable coverage. Tests of
+expression functionality should live in `tests/expressions/`.
+
+To write a test and assert correctness, build a lazyframe as a query,
+and then use the utility assertion function from
+`cudf_polars.testing.asserts`. This runs the query using both the cudf
+executor and polars CPU, and checks that the results match. So:
+
+```python
+from cudf_polars.testing.asserts import assert_gpu_result_equal
+
+
+def test_whatever():
+    query = pl.LazyFrame(...).(...)
+
+    assert_gpu_result_equal(query)
+```
+
+# Debugging
+
+If the callback execution fails during the polars `collect` call, we
+obtain an error, but are not able to drop into the debugger and
+inspect the stack properly: we can't cross the language barrier.
+
+However, we can drive the translation and execution of the DSL by
+hand. Given some `LazyFrame` representing a query, we can first
+translate it to our intermediate representation (IR), and then execute
+and convert back to polars:
+
+```python
+from cudf_polars.dsl.translate import translate_ir
+
+q = ...
+
+# Convert to our IR
+ir = translate_ir(q._ldf.visit())
+
+# DataFrame living on the device
+result = ir.evaluate(cache={})
+
+# Polars dataframe
+host_result = result.to_polars()
+```
+
+If we get any exceptions, we can then debug as normal in Python.
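The translation routines in this patch repeatedly use the pattern `with set_node(visitor, node.input): ...` so that `get_dtype` and `view_expression` resolve against the correct plan node. As a minimal sketch of the save/restore behaviour this relies on (the real `set_node` lives in `cudf_polars/dsl/translate.py` and may differ in detail), one could write it as a small context manager over the `NodeTraverser` protocol's `get_node`/`set_node` methods:

```python
# Sketch only: illustrates the activate/restore contract that the translation
# code assumes; not the actual cudf_polars implementation.
from contextlib import contextmanager
from typing import Iterator

from cudf_polars.typing import NodeTraverser  # protocol added in this diff


@contextmanager
def set_node(visitor: NodeTraverser, n: int) -> Iterator[None]:
    """Temporarily make plan node ``n`` the visitor's active node."""
    previous = visitor.get_node()  # remember the currently active node
    visitor.set_node(n)            # activate the requested node
    try:
        yield
    finally:
        visitor.set_node(previous)  # restore on exit, even if translation raises
```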
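Because `translate_expr` now accepts only integer node ids while named expressions go through `translate_named_expr`, driving expression translation by hand (for example from the debugger workflow above) looks roughly like the sketch below. It assumes `visitor` is the polars `NodeTraverser` obtained from `q._ldf.visit()` and that the active plan node is a `Select`; the printing is illustrative only.

```python
# Sketch: translate the named expressions of the currently active Select node.
# Assumes ``visitor`` was obtained as in the Debugging section (q._ldf.visit()).
from cudf_polars.dsl.translate import translate_named_expr

select_node = visitor.view_current_node()  # python rep of the active plan node
named_exprs = [translate_named_expr(visitor, n=e) for e in select_node.expr]
for ne in named_exprs:
    print(ne)  # each result is an expr.NamedExpr wrapping a name and an Expr
```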
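The `sort_order` change above broadcasts a length-one `nulls_last` the same way polars broadcasts `descending`, then pairs each key's order with a null precedence. A usage sketch, assuming `cudf_polars` and pylibcudf are importable as shown elsewhere in this diff:

```python
# Illustrative only; mirrors the logic in the patched
# python/cudf_polars/cudf_polars/utils/sorting.py.
import cudf._lib.pylibcudf as plc

from cudf_polars.utils.sorting import sort_order

# One nulls_last flag is broadcast across both sort keys.
column_order, null_precedence = sort_order(
    [False, True],       # first key ascending, second key descending
    nulls_last=[True],   # broadcast to [True, True]
    num_keys=2,
)

assert column_order == [plc.types.Order.ASCENDING, plc.types.Order.DESCENDING]
# libcudf's null order is relative to the comparison, not the output position:
# to put nulls last, an ascending key needs AFTER but a descending key needs BEFORE.
assert null_precedence == [plc.types.NullOrder.AFTER, plc.types.NullOrder.BEFORE]
```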
diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml index e50ee76a9b9..2faf8c3193f 100644 --- a/python/cudf_polars/pyproject.toml +++ b/python/cudf_polars/pyproject.toml @@ -62,8 +62,6 @@ target-version = "py39" fix = true [tool.ruff.lint] -# __init__.py must re-export everything it imports -ignore-init-module-imports = false select = [ "E", # pycodestyle "W", # pycodestyle diff --git a/python/cudf_polars/tests/expressions/test_agg.py b/python/cudf_polars/tests/expressions/test_agg.py index 645dbd26140..79018c80bf3 100644 --- a/python/cudf_polars/tests/expressions/test_agg.py +++ b/python/cudf_polars/tests/expressions/test_agg.py @@ -56,8 +56,8 @@ def test_agg(df, agg): q = df.select(expr) # https://github.com/rapidsai/cudf/issues/15852 - check_dtype = agg not in {"n_unique", "median"} - if not check_dtype and q.schema["a"] != pl.Float64: + check_dtypes = agg not in {"n_unique", "median"} + if not check_dtypes and q.schema["a"] != pl.Float64: with pytest.raises(AssertionError): assert_gpu_result_equal(q) - assert_gpu_result_equal(q, check_dtype=check_dtype, check_exact=False) + assert_gpu_result_equal(q, check_dtypes=check_dtypes, check_exact=False) diff --git a/python/cudf_polars/tests/test_select.py b/python/cudf_polars/tests/test_select.py index 503edef152e..037f3ab5428 100644 --- a/python/cudf_polars/tests/test_select.py +++ b/python/cudf_polars/tests/test_select.py @@ -36,3 +36,24 @@ def test_select_reduce(): ) assert_gpu_result_equal(query) + + +def test_select_with_cse_no_agg(): + df = pl.LazyFrame({"a": [1, 2, 3]}) + expr = pl.col("a") + pl.col("a") + + query = df.select(expr, (expr * 2).alias("b"), ((expr * 2) + 10).alias("c")) + + assert_gpu_result_equal(query) + + +def test_select_with_cse_with_agg(): + df = pl.LazyFrame({"a": [1, 2, 3]}) + expr = pl.col("a") + pl.col("a") + asum = pl.col("a").sum() + pl.col("a").sum() + + query = df.select( + expr, (expr * 2).alias("b"), asum.alias("c"), (asum + 10).alias("d") + ) + + assert_gpu_result_equal(query) diff --git a/python/cudf_polars/tests/test_union.py b/python/cudf_polars/tests/test_union.py index 2c85bb15a55..18cf4748692 100644 --- a/python/cudf_polars/tests/test_union.py +++ b/python/cudf_polars/tests/test_union.py @@ -2,14 +2,11 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations -import pytest - import polars as pl from cudf_polars.testing.asserts import assert_gpu_result_equal -@pytest.mark.xfail(reason="Need handling of null scalars that are cast") def test_union(): ldf = pl.DataFrame( { @@ -19,8 +16,6 @@ def test_union(): ).lazy() ldf2 = ldf.select((pl.col("a") + pl.col("b")).alias("c"), pl.col("a")) query = pl.concat([ldf, ldf2], how="diagonal") - # Plan for this produces a `None`.astype(Int64) which we don't - # handle correctly right now assert_gpu_result_equal(query)
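Since `check_dtype` was renamed to `check_dtypes` in `assert_gpu_result_equal`, tests that tolerate a dtype mismatch (as `test_agg` above does for `n_unique` and `median`) now spell the keyword as follows. This is a hypothetical sketch, not a test taken from the suite:

```python
# Hypothetical test showing the renamed keyword; the query is illustrative.
import polars as pl

from cudf_polars.testing.asserts import assert_gpu_result_equal


def test_median_ignores_dtype_mismatch():
    q = pl.LazyFrame({"a": [1, 2, 3]}).select(pl.col("a").median())
    # Values must still match; only the dtype comparison is relaxed
    # (https://github.com/rapidsai/cudf/issues/15852).
    assert_gpu_result_equal(q, check_dtypes=False, check_exact=False)
```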