Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ydb yq stable converters #5255

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,37 @@ std::shared_ptr<arrow::Array> ArrowTypeAsYqlTimestamp(const std::shared_ptr<arro
return builder.Build(true).make_array();
}

template <bool isOptional, typename TArrowType>
std::shared_ptr<arrow::Array> ArrowTypeAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, ui64 multiplier, const TString& format = {}) {
::NYql::NUdf::TStringArrayBuilder<arrow::BinaryType, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
::NYql::NUdf::TFixedSizeBlockReader<TArrowType, isOptional> reader;
for (i64 i = 0; i < value->length(); ++i) {
const NUdf::TBlockItem item = reader.GetItem(*value->data(), i);
if constexpr (isOptional) {
if (!item) {
builder.Add(item);
continue;
}
} else if (!item) {
throw parquet::ParquetException(TStringBuilder() << "null value for timestamp could not be represented in non-optional type");
}

const TArrowType baseValue = item.As<TArrowType>();
if (baseValue < 0 && baseValue > static_cast<i64>(::NYql::NUdf::MAX_TIMESTAMP)) {
throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "]: " << baseValue);
}

if (static_cast<ui64>(baseValue) > ::NYql::NUdf::MAX_TIMESTAMP / multiplier) {
throw parquet::ParquetException(TStringBuilder() << "timestamp in parquet is out of range [0, " << ::NYql::NUdf::MAX_TIMESTAMP << "] after transformation: " << baseValue);
}

const ui64 v = baseValue * multiplier;
TString result = format ? TInstant::FromValue(v).FormatGmTime(format.c_str()) : TInstant::FromValue(v).ToString();
builder.Add(NUdf::TBlockItem(NUdf::TStringRef(result.c_str(), result.Size())));
}
return builder.Build(true).make_array();
}

template <bool isOptional>
std::shared_ptr<arrow::Array> ArrowStringAsYqlTimestamp(const std::shared_ptr<arrow::DataType>& targetType, const std::shared_ptr<arrow::Array>& value, const NDB::FormatSettings& formatSettings) {
::NYql::NUdf::TFixedSizeArrayBuilder<ui64, isOptional> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), targetType, *arrow::system_memory_pool(), value->length());
Expand Down Expand Up @@ -373,6 +404,30 @@ TColumnConverter ArrowTimestampAsYqlTimestamp(const std::shared_ptr<arrow::DataT
};
}

TColumnConverter ArrowTimestampAsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::TimeUnit::type timeUnit) {
return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(timeUnit)](const std::shared_ptr<arrow::Array>& value) {
return isOptional
? ArrowTypeAsYqlString<true, i64>(targetType, value, multiplier)
: ArrowTypeAsYqlString<false, i64>(targetType, value, multiplier);
};
}

TColumnConverter ArrowDate64AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
return [targetType, isOptional, multiplier=GetMultiplierForDatetime(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
return isOptional
? ArrowTypeAsYqlString<true, i64>(targetType, value, multiplier, "%Y-%m-%d")
: ArrowTypeAsYqlString<false, i64>(targetType, value, multiplier, "%Y-%m-%d");
};
}

TColumnConverter ArrowDate32AsYqlString(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit dateUnit) {
return [targetType, isOptional, multiplier=GetMultiplierForTimestamp(dateUnit)](const std::shared_ptr<arrow::Array>& value) {
return isOptional
? ArrowTypeAsYqlString<true, i32>(targetType, value, multiplier, "%Y-%m-%d")
: ArrowTypeAsYqlString<false, i32>(targetType, value, multiplier, "%Y-%m-%d");
};
}

TColumnConverter ArrowDate32AsYqlDate(const std::shared_ptr<arrow::DataType>& targetType, bool isOptional, arrow::DateUnit unit) {
if (unit == arrow::DateUnit::MILLI) {
throw parquet::ParquetException(TStringBuilder() << "millisecond accuracy does not fit into the date");
Expand Down Expand Up @@ -457,6 +512,9 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
return ArrowDate32AsYqlDatetime(targetType, isOptional, dateType.unit());
case NUdf::EDataSlot::Timestamp:
return ArrowDate32AsYqlTimestamp(targetType, isOptional, dateType.unit());
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Utf8:
return ArrowDate32AsYqlString(targetType, isOptional, dateType.unit());
default:
return {};
}
Expand All @@ -469,6 +527,9 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
return ArrowDate64AsYqlDatetime(targetType, isOptional, dateType.unit());
case NUdf::EDataSlot::Timestamp:
return ArrowDate64AsYqlTimestamp(targetType, isOptional, dateType.unit());
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Utf8:
return ArrowDate64AsYqlString(targetType, isOptional, dateType.unit());
default:
return {};
}
Expand All @@ -480,10 +541,14 @@ TColumnConverter BuildCustomConverter(const std::shared_ptr<arrow::DataType>& or
return ArrowTimestampAsYqlDatetime(targetType, isOptional, timestampType.unit());
case NUdf::EDataSlot::Timestamp:
return ArrowTimestampAsYqlTimestamp(targetType, isOptional, timestampType.unit());
case NUdf::EDataSlot::String:
case NUdf::EDataSlot::Utf8:
return ArrowTimestampAsYqlString(targetType, isOptional, timestampType.unit());
default:
return {};
}
}
case arrow::Type::STRING:
case arrow::Type::BINARY: {
switch (slotItem) {
case NUdf::EDataSlot::Datetime:
Expand Down
Loading
Loading