Skip to content

Commit

Permalink
new converters have been added (#5147) (#5254)
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored Jun 6, 2024
1 parent 654f197 commit 07dce13
Show file tree
Hide file tree
Showing 2 changed files with 515 additions and 172 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,37 @@ std::shared_ptr<arrow::Array> ArrowTypeAsYqlTimestamp(const std::shared_ptr<arro
return builder.Build(true).make_array();
}

// Converts a fixed-width arrow column of time values into a YQL string column.
//
// Every non-null element is multiplied by `multiplier`, the product is turned into a TInstant via
// TInstant::FromValue, and the instant is rendered with FormatGmTime(format) when `format` is non-empty
// or with TInstant::ToString() otherwise.
//
// Template parameters:
//   isOptional - when true, null items are forwarded to the builder unchanged; when false, a null item is an error.
//   TArrowType - integral storage type of the source column (instantiated with i32 and i64 in this file).
//
// Parameters:
//   targetType - arrow type handed to the string array builder.
//   value      - source arrow array to convert.
//   multiplier - factor applied to every source value before it is passed to TInstant::FromValue.
//   format     - optional FormatGmTime pattern; empty means TInstant::ToString() is used.
//
// Returns the built string array.
//
// Throws parquet::ParquetException when
//   - a null item is met while isOptional is false;
//   - a source value is negative or greater than MAX_TIMESTAMP;
//   - a source value multiplied by `multiplier` would exceed MAX_TIMESTAMP.
template <bool isOptional, typename TArrowType>
std::shared_ptr<arrow::Array> ArrowTypeAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui64 multiplier, const TString& format = {}) {
    ::NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
    ::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
    for (i64 i = 0; i < value->length(); ++i) {
        const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
        if constexpr (isOptional) {
            if (!item) {
                // Nulls are passed through as-is for optional targets.
                builder.Add(item);
                continue;
            }
        } else if (!item) {
            throw parquet::ParquetException(TStringBuilder() << "null value for timestamp could not be represented in non-optional type");
        }

        const TArrowType baseValue = item.As<TArrowType>();
        // The value is out of range when it is negative OR above the maximum. The previous `&&` made this
        // check unreachable, so negative inputs were only rejected by accident through the unsigned cast below.
        if (baseValue < 0 || baseValue > static_cast<i64>(::NYql::NUdf::MAX_TIMESTAMP)) {
            throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "]: " << baseValue);
        }

        // Compare against MAX_TIMESTAMP / multiplier so the multiplication below cannot exceed MAX_TIMESTAMP.
        if (static_cast<ui64>(baseValue) > ::NYql::NUdf::MAX_TIMESTAMP / multiplier) {
            throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "] after transformation: " << baseValue);
        }

        // baseValue is known to be non-negative here, so the widening cast is lossless.
        const ui64 v = static_cast<ui64>(baseValue) * multiplier;
        TString result = format ? TInstant::FromValue(v).FormatGmTime(format.c_str()) : TInstant::FromValue(v).ToString();
        builder.Add(NUdf::TBlockItem(NUdf::TStringRef(result.c_str(), result.Size())));
    }
    return builder.Build(true).make_array();
}

template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
::NYql::NUdf::TFixedSizeArrayBuilder<ui64, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
Expand Down Expand Up @@ -373,6 +404,30 @@ TColumnConverter ArrowTimestampAsYqlTimestamp(const std::shared_ptr<arrow::DataT
};
}

// Builds a column converter that renders arrow timestamp values as YQL strings.
// The multiplier for `timeUnit` is obtained once from GetMultiplierForTimestamp and stored in the returned
// callable. No format is supplied, so ArrowTypeAsYqlString falls back to its default (empty) format.
// `isOptional` selects, per call, which instantiation of ArrowTypeAsYqlString handles the array.
TColumnConverter ArrowTimestampAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::TimeUnit::type timeUnit) {
    return [targetType, isOptional, scale = GetMultiplierForTimestamp(timeUnit)](const std::shared_ptr<arrow::Array>& column) -> std::shared_ptr<arrow::Array> {
        if (isOptional) {
            return ArrowTypeAsYqlString<true, i64>(targetType, column, scale);
        }
        return ArrowTypeAsYqlString<false, i64>(targetType, column, scale);
    };
}

// Builds a column converter that renders arrow Date64 values (read as i64) as YQL strings
// formatted with the "%Y-%m-%d" pattern.
// The multiplier for `dateUnit` is obtained once from GetMultiplierForDatetime and stored in the returned callable.
// NOTE(review): the sibling ArrowDate32AsYqlString takes its multiplier from GetMultiplierForTimestamp, while
// this function uses GetMultiplierForDatetime; both feed the same ArrowTypeAsYqlString helper, which validates
// the scaled value against MAX_TIMESTAMP. Confirm that the difference is intended.
TColumnConverter ArrowDate64AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
    return [targetType, isOptional, scale = GetMultiplierForDatetime(dateUnit)](const std::shared_ptr<arrow::Array>& column) -> std::shared_ptr<arrow::Array> {
        if (!isOptional) {
            return ArrowTypeAsYqlString<false, i64>(targetType, column, scale, "%Y-%m-%d");
        }
        return ArrowTypeAsYqlString<true, i64>(targetType, column, scale, "%Y-%m-%d");
    };
}

// Builds a column converter that renders arrow Date32 values (read as i32) as YQL strings
// formatted with the "%Y-%m-%d" pattern.
// The multiplier for `dateUnit` is obtained once from GetMultiplierForTimestamp and stored in the returned
// callable; `isOptional` picks the optional or non-optional instantiation of ArrowTypeAsYqlString on each call.
TColumnConverter ArrowDate32AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
    return [targetType, isOptional, scale = GetMultiplierForTimestamp(dateUnit)](const std::shared_ptr<arrow::Array>& column) -> std::shared_ptr<arrow::Array> {
        std::shared_ptr<arrow::Array> converted;
        if (isOptional) {
            converted = ArrowTypeAsYqlString<true, i32>(targetType, column, scale, "%Y-%m-%d");
        } else {
            converted = ArrowTypeAsYqlString<false, i32>(targetType, column, scale, "%Y-%m-%d");
        }
        return converted;
    };
}

TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit unit) {
if (unit == arrow::DateUnit::MILLI) {
throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the date");
Expand Down Expand Up @@ -457,6 +512,9 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
return ArrowDate32AsYqlDatetime(targetType, isOptional, dateType.unit());
case NUdf::EDataSlot::Timestamp:
return ArrowDate32AsYqlTimestamp(targetType, isOptional, dateType.unit());
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Utf8:
return ArrowDate32AsYqlString(targetType, isOptional, dateType.unit());
default:
return {};
}
Expand All @@ -469,6 +527,9 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
return ArrowDate64AsYqlDatetime(targetType, isOptional, dateType.unit());
case NUdf::EDataSlot::Timestamp:
return ArrowDate64AsYqlTimestamp(targetType, isOptional, dateType.unit());
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Utf8:
return ArrowDate64AsYqlString(targetType, isOptional, dateType.unit());
default:
return {};
}
Expand All @@ -480,10 +541,14 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
return ArrowTimestampAsYqlDatetime(targetType, isOptional, timestampType.unit());
case NUdf::EDataSlot::Timestamp:
return ArrowTimestampAsYqlTimestamp(targetType, isOptional, timestampType.unit());
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Utf8:
return ArrowTimestampAsYqlString(targetType, isOptional, timestampType.unit());
default:
return {};
}
}
case arrow::Type::STRING:
case arrow::Type::BINARY: {
switch (slotItem) {
case NUdf::EDataSlot::Datetime:
Expand Down
Loading

0 comments on commit 07dce13

Please sign in to comment.