Skip to content

Commit

Permalink
Implement cudf::rolling for decimal32 and decimal64 (#7037)
Browse files Browse the repository at this point in the history
This PR resolves a part of #3556.

Aggregation ops supported:
* `MIN`
* `MAX`
* `COUNT` (both `null_policy` options - `EXCLUDE` and `INCLUDE`)
* `LEAD`
* `LAG`

**To Do List:**
* [x] Basic unit tests
* [x] Comprehensive unit tests
* [x] Implementation
* [x] Figure out which rolling ops to support

Authors:
  - Conor Hoekstra <codereport@outlook.com>

Approvers:
  - Vukasin Milovanovic
  - Ram (Ramakrishna Prabhu)

URL: #7037
  • Loading branch information
codereport authored Dec 31, 2020
1 parent 277bd9f commit 28d18d6
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 7 deletions.
7 changes: 5 additions & 2 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,10 +155,13 @@ std::unique_ptr<aggregation> make_udf_aggregation(udf_type type,
namespace detail {
namespace {
// Maps a source type and aggregation kind to the aggregation's result data_type.
// For fixed-point (decimal) results the input column's scale must be carried
// over, because a bare type_id does not encode scale.
struct target_type_functor {
  data_type type;  // the source column's data_type; supplies the scale for decimal results

  template <typename Source, aggregation::Kind k>
  constexpr data_type operator()() const noexcept
  {
    auto const id = type_to_id<target_type_t<Source, k>>();
    // Decimal results are only fully described by type_id + scale; every other
    // type is fully described by its type_id alone.
    return id == type_id::DECIMAL32 || id == type_id::DECIMAL64 ? data_type{id, type.scale()}
                                                                : data_type{id};
  }
};

Expand All @@ -174,7 +177,7 @@ struct is_valid_aggregation_impl {
// Return the target (result) data_type for the given source type and aggregation.
data_type target_type(data_type source, aggregation::Kind k)
{
  // Pass the source data_type into the functor so that decimal results can
  // inherit the source column's scale.
  return dispatch_type_and_aggregation(source, k, target_type_functor{source});
}

// Verifies the aggregation `k` is valid on the type `source`
Expand Down
14 changes: 10 additions & 4 deletions cpp/src/rolling/rolling_detail.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,9 @@ struct rolling_window_launcher {
std::unique_ptr<aggregation> const& agg,
rmm::cuda_stream_view stream)
{
using Type = device_storage_type_t<T>;
using OutType = device_storage_type_t<target_type_t<InputType, op>>;

constexpr cudf::size_type block_size = 256;
cudf::detail::grid_1d grid(input.size(), block_size);

Expand All @@ -516,7 +519,7 @@ struct rolling_window_launcher {
rmm::device_scalar<size_type> device_valid_count{0, stream};

if (input.has_nulls()) {
gpu_rolling<T, target_type_t<InputType, op>, agg_op, op, block_size, true>
gpu_rolling<Type, OutType, agg_op, op, block_size, true>
<<<grid.num_blocks, block_size, 0, stream.value()>>>(*input_device_view,
*default_outputs_device_view,
*output_device_view,
Expand All @@ -525,7 +528,7 @@ struct rolling_window_launcher {
following_window_begin,
min_periods);
} else {
gpu_rolling<T, target_type_t<InputType, op>, agg_op, op, block_size, false>
gpu_rolling<Type, OutType, agg_op, op, block_size, false>
<<<grid.num_blocks, block_size, 0, stream.value()>>>(*input_device_view,
*default_outputs_device_view,
*output_device_view,
Expand Down Expand Up @@ -558,6 +561,9 @@ struct rolling_window_launcher {
agg_op const& device_agg_op,
rmm::cuda_stream_view stream)
{
using Type = device_storage_type_t<T>;
using OutType = device_storage_type_t<target_type_t<InputType, op>>;

constexpr cudf::size_type block_size = 256;
cudf::detail::grid_1d grid(input.size(), block_size);

Expand All @@ -568,7 +574,7 @@ struct rolling_window_launcher {
rmm::device_scalar<size_type> device_valid_count{0, stream};

if (input.has_nulls()) {
gpu_rolling<T, target_type_t<InputType, op>, agg_op, op, block_size, true>
gpu_rolling<Type, OutType, agg_op, op, block_size, true>
<<<grid.num_blocks, block_size, 0, stream.value()>>>(*input_device_view,
*default_outputs_device_view,
*output_device_view,
Expand All @@ -578,7 +584,7 @@ struct rolling_window_launcher {
min_periods,
device_agg_op);
} else {
gpu_rolling<T, target_type_t<InputType, op>, agg_op, op, block_size, false>
gpu_rolling<Type, OutType, agg_op, op, block_size, false>
<<<grid.num_blocks, block_size, 0, stream.value()>>>(*input_device_view,
*default_outputs_device_view,
*output_device_view,
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/rolling/rolling_detail.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ static constexpr bool is_rolling_supported()
return (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::ROW_NUMBER) or (op == aggregation::LEAD) or (op == aggregation::LAG);

} else if (cudf::is_fixed_point<ColumnType>()) {
return (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
(op == aggregation::LEAD) or (op == aggregation::LAG);
} else if (std::is_same<ColumnType, cudf::string_view>()) {
return (op == aggregation::MIN) or (op == aggregation::MAX) or
(op == aggregation::COUNT_VALID) or (op == aggregation::COUNT_ALL) or
Expand Down
78 changes: 78 additions & 0 deletions cpp/tests/rolling/rolling_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -952,4 +952,82 @@ TEST_F(RollingTestUdf, DynamicWindow)
CUDF_TEST_EXPECT_COLUMNS_EQUAL(*output, expected);
}

// Typed fixture for rolling-window tests over fixed-point (decimal) element types.
template <typename T>
struct FixedPointTests : public cudf::test::BaseFixture {};

// Instantiate the fixture once per fixed-point type (decimal32, decimal64).
TYPED_TEST_CASE(FixedPointTests, cudf::test::FixedPointTypes);

TYPED_TEST(FixedPointTests, MinMaxCountLagLead)
{
  using namespace numeric;
  using namespace cudf;
  using decimalXX  = TypeParam;
  using RepType    = cudf::device_storage_type_t<decimalXX>;
  using fp_wrapper = cudf::test::fixed_point_column_wrapper<RepType>;
  using fw_wrapper = cudf::test::fixed_width_column_wrapper<size_type>;

  // Fully-valid decimal column with scale -1. Every rolling call below uses
  // preceding = 2, following = 1, min_periods = 1, so each window covers the
  // previous, current, and next row (clipped at the column boundaries).
  auto const scale = scale_type{-1};
  auto const input = fp_wrapper{{42, 1729, 55, 3, 1, 2}, {1, 1, 1, 1, 1, 1}, scale};

  // MIN over each window.
  auto const result_min   = rolling_window(input, 2, 1, 1, make_min_aggregation());
  auto const expected_min = fp_wrapper{{42, 42, 3, 1, 1, 1}, {1, 1, 1, 1, 1, 1}, scale};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, result_min->view());

  // MAX over each window.
  auto const result_max   = rolling_window(input, 2, 1, 1, make_max_aggregation());
  auto const expected_max = fp_wrapper{{1729, 1729, 1729, 55, 3, 2}, {1, 1, 1, 1, 1, 1}, scale};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, result_max->view());

  // LAG(1): the first row has no predecessor and is therefore null.
  auto const result_lag   = rolling_window(input, 2, 1, 1, make_lag_aggregation(1));
  auto const expected_lag = fp_wrapper{{0, 42, 1729, 55, 3, 1}, {0, 1, 1, 1, 1, 1}, scale};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lag, result_lag->view());

  // LEAD(1): the last row has no successor and is therefore null.
  auto const result_lead   = rolling_window(input, 2, 1, 1, make_lead_aggregation(1));
  auto const expected_lead = fp_wrapper{{1729, 55, 3, 1, 2, 0}, {1, 1, 1, 1, 1, 0}, scale};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lead, result_lead->view());

  // COUNT_VALID and COUNT_ALL agree here since the input contains no nulls.
  auto const result_count_val   = rolling_window(input, 2, 1, 1, make_count_aggregation());
  auto const expected_count_val = fw_wrapper{{2, 3, 3, 3, 3, 2}, {1, 1, 1, 1, 1, 1}};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_val, result_count_val->view());

  auto const result_count_all =
    rolling_window(input, 2, 1, 1, make_count_aggregation(null_policy::INCLUDE));
  auto const expected_count_all = fw_wrapper{{2, 3, 3, 3, 3, 2}, {1, 1, 1, 1, 1, 1}};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_all, result_count_all->view());

  // ROW_NUMBER is not supported for fixed-point rolling windows and must throw.
  EXPECT_THROW(rolling_window(input, 2, 1, 1, make_row_number_aggregation()), cudf::logic_error);
}

TYPED_TEST(FixedPointTests, MinMaxCountLagLeadNulls)
{
  using namespace numeric;
  using namespace cudf;
  using decimalXX  = TypeParam;
  using RepType    = cudf::device_storage_type_t<decimalXX>;
  using fp_wrapper = cudf::test::fixed_point_column_wrapper<RepType>;
  using fw_wrapper = cudf::test::fixed_width_column_wrapper<size_type>;

  // Decimal column (scale -1) with nulls at rows 1 and 3. Every rolling call
  // below uses preceding = 2, following = 1, min_periods = 1, so each window
  // covers the previous, current, and next row (clipped at the boundaries).
  auto const scale = scale_type{-1};
  auto const input = fp_wrapper{{42, 1729, 55, 343, 1, 2}, {1, 0, 1, 0, 1, 1}, scale};

  // MIN skips null elements inside each window.
  auto const result_min   = rolling_window(input, 2, 1, 1, make_min_aggregation());
  auto const expected_min = fp_wrapper{{42, 42, 55, 1, 1, 1}, {1, 1, 1, 1, 1, 1}, scale};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_min, result_min->view());

  // MAX skips null elements inside each window.
  auto const result_max   = rolling_window(input, 2, 1, 1, make_max_aggregation());
  auto const expected_max = fp_wrapper{{42, 55, 55, 55, 2, 2}, {1, 1, 1, 1, 1, 1}, scale};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_max, result_max->view());

  // LAG(1) propagates the predecessor's null mask; row 0 has no predecessor.
  auto const result_lag   = rolling_window(input, 2, 1, 1, make_lag_aggregation(1));
  auto const expected_lag = fp_wrapper{{0, 42, 1729, 55, 343, 1}, {0, 1, 0, 1, 0, 1}, scale};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lag, result_lag->view());

  // LEAD(1) propagates the successor's null mask; the last row has no successor.
  auto const result_lead   = rolling_window(input, 2, 1, 1, make_lead_aggregation(1));
  auto const expected_lead = fp_wrapper{{1729, 55, 343, 1, 2, 0}, {0, 1, 0, 1, 1, 0}, scale};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_lead, result_lead->view());

  // COUNT_VALID excludes nulls; COUNT_ALL counts every row in the window.
  auto const result_count_val   = rolling_window(input, 2, 1, 1, make_count_aggregation());
  auto const expected_count_val = fw_wrapper{{1, 2, 1, 2, 2, 2}, {1, 1, 1, 1, 1, 1}};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_val, result_count_val->view());

  auto const result_count_all =
    rolling_window(input, 2, 1, 1, make_count_aggregation(null_policy::INCLUDE));
  auto const expected_count_all = fw_wrapper{{2, 3, 3, 3, 3, 2}, {1, 1, 1, 1, 1, 1}};
  CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected_count_all, result_count_all->view());

  // ROW_NUMBER is not supported for fixed-point rolling windows and must throw.
  EXPECT_THROW(rolling_window(input, 2, 1, 1, make_row_number_aggregation()), cudf::logic_error);
}

// Expands to this test binary's main() entry point (cudf test harness macro).
CUDF_TEST_PROGRAM_MAIN()

0 comments on commit 28d18d6

Please sign in to comment.