Skip to content

Commit

Permalink
Merge branch 'branch-24.08' of github.com:rapidsai/cudf into pylibcud…
Browse files Browse the repository at this point in the history
…f-io-writers
  • Loading branch information
lithomas1 committed Jun 25, 2024
2 parents 186a2fb + cdfb550 commit 9a6a896
Show file tree
Hide file tree
Showing 166 changed files with 1,737 additions and 672 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ add_library(
src/reductions/product.cu
src/reductions/reductions.cpp
src/reductions/scan/rank_scan.cu
src/reductions/scan/ewm.cu
src/reductions/scan/scan.cpp
src/reductions/scan/scan_exclusive.cu
src/reductions/scan/scan_inclusive.cu
Expand Down
5 changes: 2 additions & 3 deletions cpp/benchmarks/io/text/multibyte_split.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,7 @@ static cudf::string_scalar create_random_input(int32_t num_chars,

// extract the chars from the returned strings column.
auto input_column_contents = input_column->release();
auto chars_column_contents = input_column_contents.children[1]->release();
auto chars_buffer = chars_column_contents.data.release();
auto chars_buffer = input_column_contents.data.release();

// turn the chars in to a string scalar.
return cudf::string_scalar(std::move(*chars_buffer));
Expand Down Expand Up @@ -218,7 +217,7 @@ NVBENCH_BENCH_TYPES(bench_multibyte_split,
NVBENCH_BENCH_TYPES(bench_multibyte_split, NVBENCH_TYPE_AXES(source_type_list))
.set_name("multibyte_split_source")
.set_min_samples(4)
.add_int64_axis("strip_delimiters", {1})
.add_int64_axis("strip_delimiters", {0, 1})
.add_int64_axis("delim_size", {1})
.add_int64_axis("delim_percent", {1})
.add_int64_power_of_two_axis("size_approx", {15, 30})
Expand Down
41 changes: 40 additions & 1 deletion cpp/include/cudf/aggregation.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -103,6 +103,7 @@ class aggregation {
NUNIQUE, ///< count number of unique elements
NTH_ELEMENT, ///< get the nth element
ROW_NUMBER, ///< get row-number of current index (relative to rolling window)
EWMA, ///< get exponential weighted moving average at current index
RANK, ///< get rank of current index
COLLECT_LIST, ///< collect values into a list
COLLECT_SET, ///< collect values into a list without duplicate entries
Expand Down Expand Up @@ -250,6 +251,8 @@ class segmented_reduce_aggregation : public virtual aggregation {
enum class udf_type : bool { CUDA, PTX };
/// Type of correlation method.
enum class correlation_type : int32_t { PEARSON, KENDALL, SPEARMAN };
/// Type of treatment of EWM input values' first value
enum class ewm_history : int32_t { INFINITE, FINITE };

/// Factory to create a SUM aggregation
/// @return A SUM aggregation object
Expand Down Expand Up @@ -411,6 +414,42 @@ std::unique_ptr<Base> make_nth_element_aggregation(
template <typename Base = aggregation>
std::unique_ptr<Base> make_row_number_aggregation();

/**
* @brief Factory to create an EWMA aggregation
*
* `EWMA` returns a non-nullable column with the same type as the input,
* whose values are the exponentially weighted moving average of the input
* sequence. Let these values be known as the y_i.
*
* EWMA aggregations are parameterized by a center of mass (`com`) which
* affects the contribution of the previous values (y_0 ... y_{i-1}) in
* computing the y_i.
*
* EWMA aggregations are also parameterized by a history `cudf::ewm_history`.
* Special considerations have to be given to the mathematical treatment of
* the first value of the input sequence. There are two approaches to this,
* one which considers the first value of the sequence to be the exponential
* weighted moving average of some infinite history of data, and one which
* takes the first value to be the only datapoint known. These assumptions
* lead to two different formulas for the y_i. `ewm_history` selects which.
*
* EWMA aggregations have special null handling. Nulls have two effects. The
* first is to propagate forward the last valid value as far as it has been
* computed. This could be thought of as the nulls not affecting the average
* in any way. The second effect changes the way the y_i are computed. Since
* a moving average is conceptually designed to weight contributing values by
* their recency, nulls ought to count as valid periods even though they do
* not change the average. For example, if the input sequence is {1, NULL, 3}
* then when computing y_2 one should weigh y_0 as if it occurs two periods
* before y_2 rather than just one.
*
* @param center_of_mass the center of mass.
* @param history which assumption to make about the first value
* @return A EWM aggregation object
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_ewma_aggregation(double const center_of_mass, ewm_history history);

/**
* @brief Factory to create a RANK aggregation
*
Expand Down
44 changes: 44 additions & 0 deletions cpp/include/cudf/detail/aggregation/aggregation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ class simple_aggregations_collector { // Declares the interface for the simple
class nth_element_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(data_type col_type,
class row_number_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(data_type col_type,
class ewma_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(data_type col_type,
class rank_aggregation const& agg);
virtual std::vector<std::unique_ptr<aggregation>> visit(
Expand Down Expand Up @@ -141,6 +143,7 @@ class aggregation_finalizer { // Declares the interface for the finalizer
virtual void visit(class correlation_aggregation const& agg);
virtual void visit(class tdigest_aggregation const& agg);
virtual void visit(class merge_tdigest_aggregation const& agg);
virtual void visit(class ewma_aggregation const& agg);
};

/**
Expand Down Expand Up @@ -667,6 +670,40 @@ class row_number_aggregation final : public rolling_aggregation {
void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
* @brief Derived class for specifying an ewma aggregation
*/
class ewma_aggregation final : public scan_aggregation {
public:
double const center_of_mass;
cudf::ewm_history history;

ewma_aggregation(double const center_of_mass, cudf::ewm_history history)
: aggregation{EWMA}, center_of_mass{center_of_mass}, history{history}
{
}

std::unique_ptr<aggregation> clone() const override
{
return std::make_unique<ewma_aggregation>(*this);
}

std::vector<std::unique_ptr<aggregation>> get_simple_aggregations(
data_type col_type, simple_aggregations_collector& collector) const override
{
return collector.visit(col_type, *this);
}

bool is_equal(aggregation const& _other) const override
{
if (!this->aggregation::is_equal(_other)) { return false; }
auto const& other = dynamic_cast<ewma_aggregation const&>(_other);
return this->center_of_mass == other.center_of_mass and this->history == other.history;
}

void finalize(aggregation_finalizer& finalizer) const override { finalizer.visit(*this); }
};

/**
* @brief Derived class for specifying a rank aggregation
*/
Expand Down Expand Up @@ -1336,6 +1373,11 @@ struct target_type_impl<Source, aggregation::ROW_NUMBER> {
using type = size_type;
};

template <typename Source>
struct target_type_impl<Source, aggregation::EWMA> {
using type = double;
};

// Always use size_type accumulator for RANK
template <typename Source>
struct target_type_impl<Source, aggregation::RANK> {
Expand Down Expand Up @@ -1536,6 +1578,8 @@ CUDF_HOST_DEVICE inline decltype(auto) aggregation_dispatcher(aggregation::Kind
return f.template operator()<aggregation::TDIGEST>(std::forward<Ts>(args)...);
case aggregation::MERGE_TDIGEST:
return f.template operator()<aggregation::MERGE_TDIGEST>(std::forward<Ts>(args)...);
case aggregation::EWMA:
return f.template operator()<aggregation::EWMA>(std::forward<Ts>(args)...);
default: {
#ifndef __CUDA_ARCH__
CUDF_FAIL("Unsupported aggregation.");
Expand Down
22 changes: 22 additions & 0 deletions cpp/src/aggregation/aggregation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,12 @@ std::vector<std::unique_ptr<aggregation>> simple_aggregations_collector::visit(
return visit(col_type, static_cast<aggregation const&>(agg));
}

std::vector<std::unique_ptr<aggregation>> simple_aggregations_collector::visit(
data_type col_type, ewma_aggregation const& agg)
{
return visit(col_type, static_cast<aggregation const&>(agg));
}

std::vector<std::unique_ptr<aggregation>> simple_aggregations_collector::visit(
data_type col_type, rank_aggregation const& agg)
{
Expand Down Expand Up @@ -333,6 +339,11 @@ void aggregation_finalizer::visit(row_number_aggregation const& agg)
visit(static_cast<aggregation const&>(agg));
}

void aggregation_finalizer::visit(ewma_aggregation const& agg)
{
visit(static_cast<aggregation const&>(agg));
}

void aggregation_finalizer::visit(rank_aggregation const& agg)
{
visit(static_cast<aggregation const&>(agg));
Expand Down Expand Up @@ -665,6 +676,17 @@ std::unique_ptr<Base> make_row_number_aggregation()
template std::unique_ptr<aggregation> make_row_number_aggregation<aggregation>();
template std::unique_ptr<rolling_aggregation> make_row_number_aggregation<rolling_aggregation>();

/// Factory to create an EWMA aggregation
template <typename Base>
std::unique_ptr<Base> make_ewma_aggregation(double const com, cudf::ewm_history history)
{
return std::make_unique<detail::ewma_aggregation>(com, history);
}
template std::unique_ptr<aggregation> make_ewma_aggregation<aggregation>(double const com,
cudf::ewm_history history);
template std::unique_ptr<scan_aggregation> make_ewma_aggregation<scan_aggregation>(
double const com, cudf::ewm_history history);

/// Factory to create a RANK aggregation
template <typename Base>
std::unique_ptr<Base> make_rank_aggregation(rank_method method,
Expand Down
28 changes: 12 additions & 16 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -296,19 +296,6 @@ size_t column_size(column_view const& column, rmm::cuda_stream_view stream)
CUDF_FAIL("Unexpected compound type");
}

// checks to see if the given column has a fixed size. This doesn't
// check every row, so assumes string and list columns are not fixed, even
// if each row is the same width.
// TODO: update this if FIXED_LEN_BYTE_ARRAY is ever supported for writes.
bool is_col_fixed_width(column_view const& column)
{
if (column.type().id() == type_id::STRUCT) {
return std::all_of(column.child_begin(), column.child_end(), is_col_fixed_width);
}

return is_fixed_width(column.type());
}

/**
* @brief Extends SchemaElement to add members required in constructing parquet_column_view
*
Expand Down Expand Up @@ -946,6 +933,15 @@ struct parquet_column_view {
return schema_node.converted_type.value_or(UNKNOWN);
}

// Checks to see if the given column has a fixed-width data type. This doesn't
// check every value, so it assumes string and list columns are not fixed-width, even
// if each value has the same size.
[[nodiscard]] bool is_fixed_width() const
{
// lists and strings are not fixed width
return max_rep_level() == 0 and physical_type() != Type::BYTE_ARRAY;
}

std::vector<std::string> const& get_path_in_schema() { return path_in_schema; }

// LIST related member functions
Expand Down Expand Up @@ -1764,7 +1760,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
// unbalanced in final page sizes, so using 4 which seems to be a good
// compromise at smoothing things out without getting fragment sizes too small.
auto frag_size_fn = [&](auto const& col, size_t col_size) {
int const target_frags_per_page = is_col_fixed_width(col) ? 1 : 4;
int const target_frags_per_page = col.is_fixed_width() ? 1 : 4;
auto const avg_len =
target_frags_per_page * util::div_rounding_up_safe<size_t>(col_size, input.num_rows());
if (avg_len > 0) {
Expand All @@ -1775,8 +1771,8 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
}
};

std::transform(single_streams_table.begin(),
single_streams_table.end(),
std::transform(parquet_columns.begin(),
parquet_columns.end(),
column_sizes.begin(),
column_frag_size.begin(),
frag_size_fn);
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/io/text/data_chunk_source_factories.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,11 @@ class istream_data_chunk_reader : public data_chunk_reader {
{
}

void skip_bytes(std::size_t size) override { _datastream->ignore(size); };
void skip_bytes(std::size_t size) override
{
// 20% faster than _datastream->ignore(size) for large files
_datastream->seekg(_datastream->tellg() + static_cast<std::ifstream::pos_type>(size));
};

std::unique_ptr<device_data_chunk> get_next_chunk(std::size_t read_size,
rmm::cuda_stream_view stream) override
Expand Down Expand Up @@ -265,7 +269,7 @@ class file_data_chunk_source : public data_chunk_source {
[[nodiscard]] std::unique_ptr<data_chunk_reader> create_reader() const override
{
return std::make_unique<istream_data_chunk_reader>(
std::make_unique<std::ifstream>(_filename, std::ifstream::in));
std::make_unique<std::ifstream>(_filename, std::ifstream::in | std::ifstream::binary));
}

private:
Expand Down
Loading

0 comments on commit 9a6a896

Please sign in to comment.