diff --git a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp index e417fcf535de..00c31f64ba53 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp @@ -195,6 +195,37 @@ class TTupleBlockItemConverter : public IBlockItemConverter { mutable TVector Items; }; +template +class TTzDateBlockItemConverter : public IBlockItemConverter { +public: + using TLayout = NYql::NUdf::TDataType::TLayout; + + NUdf::TUnboxedValuePod MakeValue(TBlockItem item, const THolderFactory& holderFactory) const final { + Y_UNUSED(holderFactory); + if constexpr (Nullable) { + if (!item) { + return {}; + } + } + + NUdf::TUnboxedValuePod value {item.Get()}; + value.SetTimezoneId(item.GetTimezoneId()); + return value; + } + + TBlockItem MakeItem(const NUdf::TUnboxedValuePod& value) const final { + if constexpr (Nullable) { + if (!value) { + return {}; + } + } + + TBlockItem item {value.Get()}; + item.SetTimezoneId(value.GetTimezoneId()); + return item; + } +}; + class TExternalOptionalBlockItemConverter : public IBlockItemConverter { public: TExternalOptionalBlockItemConverter(std::unique_ptr&& inner) @@ -229,6 +260,8 @@ struct TConverterTraits { template using TStrings = TStringBlockItemConverter; using TExtOptional = TExternalOptionalBlockItemConverter; + template + using TTzDateConverter = TTzDateBlockItemConverter; static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { if (desc.PassByValue) { @@ -258,6 +291,15 @@ struct TConverterTraits { return std::make_unique>(); } } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } else { + return std::make_unique>(); + } + } }; } // namespace diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp index cd7a9a60cf01..abfebda40c64 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -516,6 +516,12 @@ struct TSerializerTraits { Y_UNUSED(isOptional); ythrow yexception() << "Serializer not implemented for block resources"; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + Y_UNUSED(isOptional); + ythrow yexception() << "Serializer not implemented for block resources"; + } }; struct TDeserializerTraits { @@ -540,6 +546,12 @@ struct TDeserializerTraits { Y_UNUSED(isOptional); ythrow yexception() << "Deserializer not implemented for block resources"; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + Y_UNUSED(isOptional); + ythrow yexception() << "Deserializer not implemented for block resources"; + } }; } // namespace diff --git a/ydb/library/yql/minikql/datetime/datetime.h b/ydb/library/yql/minikql/datetime/datetime.h index 71363b27cdf3..a86f28060cf0 100644 --- a/ydb/library/yql/minikql/datetime/datetime.h +++ b/ydb/library/yql/minikql/datetime/datetime.h @@ -21,6 +21,7 @@ struct TTMStorage { unsigned int Second : 6; unsigned int Microsecond : 20; unsigned int TimezoneId : 16; + ui8 Reserved[2]; TTMStorage() { Zero(*this); diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index f9e88078dd38..1e067283ca72 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -1451,6 +1451,33 @@ bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr& ty case NUdf::EDataSlot::Json: type = arrow::utf8(); return true; + case NUdf::EDataSlot::TzDate: { + auto&& [dateType, timezoneType] = MakeTzDateArrowFieldTypes(); + std::vector> fields { + std::make_shared("date", std::move(dateType)), + std::make_shared("timezoneId", std::move(timezoneType)), + }; + type = std::make_shared(fields); + return true; + } + case NUdf::EDataSlot::TzDatetime: { + auto&& [dateType, timezoneType] = MakeTzDateArrowFieldTypes(); + std::vector> fields { + std::make_shared("datetime", std::move(dateType)), + std::make_shared("timezoneId", std::move(timezoneType)), + }; + type = std::make_shared(fields); + return true; + } + case NUdf::EDataSlot::TzTimestamp: { + auto&& [dateType, timezoneType] = MakeTzDateArrowFieldTypes(); + std::vector> fields { + std::make_shared("timestamp", std::move(dateType)), + std::make_shared("timezoneId", std::move(timezoneType)), + }; + type = std::make_shared(fields); + return true; + } default: return false; } @@ -2402,6 +2429,8 @@ struct TComparatorTraits { template using TStrings = NUdf::TStringBlockItemComparator; using TExtOptional = NUdf::TExternalOptionalBlockItemComparator; + template + using TTzDateComparator = NUdf::TTzDateBlockItemComparator; static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -2412,6 +2441,15 @@ struct TComparatorTraits { Y_UNUSED(isOptional); ythrow yexception() << "Comparator not implemented for block resources: "; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } else { + return std::make_unique>(); + } + } }; struct THasherTraits { @@ -2423,6 +2461,8 @@ struct THasherTraits { template using TStrings = NUdf::TStringBlockItemHasher; using TExtOptional = NUdf::TExternalOptionalBlockItemHasher; + template + using TTzDateHasher = NYql::NUdf::TTzDateBlockItemHasher; static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -2433,6 +2473,15 @@ struct THasherTraits { Y_UNUSED(isOptional); ythrow yexception() << "Hasher not implemented for block resources"; } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } else { + return std::make_unique>(); + } + } }; NUdf::IBlockItemComparator::TPtr TBlockTypeHelper::MakeComparator(NUdf::TType* type) const { diff --git a/ydb/library/yql/minikql/mkql_type_builder.h b/ydb/library/yql/minikql/mkql_type_builder.h index e59bac8c8782..6dfa61fe8c2a 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.h +++ b/ydb/library/yql/minikql/mkql_type_builder.h @@ -32,6 +32,22 @@ inline size_t CalcBlockLen(size_t maxBlockItemSize) { bool ConvertArrowType(TType* itemType, std::shared_ptr& type); bool ConvertArrowType(NUdf::EDataSlot slot, std::shared_ptr& type); +template +std::pair, std::shared_ptr> MakeTzDateArrowFieldTypes() { + const auto make_arrow_date_type = [] () { + if constexpr (slot == NUdf::EDataSlot::TzDate) { + return arrow::uint16(); + } + if constexpr (slot == NUdf::EDataSlot::TzDatetime) { + return arrow::uint32(); + } + return arrow::uint64(); + }; + + const auto timezoneType = arrow::uint16(); + return { make_arrow_date_type(), timezoneType }; +} + class TArrowType : public NUdf::IArrowType { public: TArrowType(const std::shared_ptr& type) diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp index 0656f26f4cd0..4f4d9f35390d 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp @@ -438,6 +438,11 @@ struct TYsonBlockReaderTraits { Y_UNUSED(isOptional); ythrow yexception() << "Yson reader not implemented for block resources"; } + + static std::unique_ptr MakeTzDate(bool isOptional) { + Y_UNUSED(isOptional); + ythrow yexception() << "Yson reader not implemented for block tz dates"; + } }; template diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index 8e82b46cbcbd..8ada8abf22fa 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -487,15 +487,16 @@ class TFixedSizeArrayBuilderBase : public TArrayBuilderBase { template class TFixedSizeArrayBuilder final: public TFixedSizeArrayBuilderBase> { - using TDerived = TFixedSizeArrayBuilder; + using TSelf = TFixedSizeArrayBuilder; + using TBase = TFixedSizeArrayBuilderBase; public: TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, std::shared_ptr arrowType, arrow::MemoryPool& pool, size_t maxLen) - : TFixedSizeArrayBuilderBase(typeInfoHelper, std::move(arrowType), pool, maxLen) + : TBase(typeInfoHelper, std::move(arrowType), pool, maxLen) {} TFixedSizeArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) - : TFixedSizeArrayBuilderBase(typeInfoHelper, type, pool, maxLen) + : TBase(typeInfoHelper, type, pool, maxLen) {} void DoAddNotNull(TUnboxedValuePod value) { @@ -513,6 +514,14 @@ class TFixedSizeArrayBuilder final: public TFixedSizeArrayBuilderBaseDataPtr + this->GetCurrLen(), this->DataPtr + this->GetCurrLen() + count, value.Get()); } + + using TBase::Add; + + void Add(TLayout&& value) { + Y_DEBUG_ABORT_UNLESS(this->GetCurrLen() < this->MaxLen); + this->PlaceItem(std::move(value)); + this->SetCurrLen(this->GetCurrLen() + 1); + } }; template @@ -889,13 +898,11 @@ class TStringArrayBuilder final : public TArrayBuilderBase { i32 TypeLen = 0; }; -template -class TTupleArrayBuilder final : public TArrayBuilderBase { +template +class TTupleArrayBuilderBase : public TArrayBuilderBase { public: - TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, - TVector&& children) + TTupleArrayBuilderBase(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) : TArrayBuilderBase(typeInfoHelper, type, pool, maxLen) - , Children(std::move(children)) { Reserve(); } @@ -904,70 +911,50 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { if constexpr (Nullable) { if (!value) { NullBuilder->UnsafeAppend(0); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast(this)->AddToChildrenDefault(); return; } NullBuilder->UnsafeAppend(1); } - auto elements = value.GetElements(); - if (elements) { - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(elements[i]); - } - } else { - for (ui32 i = 0; i < Children.size(); ++i) { - auto element = value.GetElement(i); - Children[i]->Add(element); - } - } + static_cast(this)->AddToChildren(value); } void DoAdd(TBlockItem value) final { if constexpr (Nullable) { if (!value) { NullBuilder->UnsafeAppend(0); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast(this)->AddToChildrenDefault(); return; } NullBuilder->UnsafeAppend(1); } - auto elements = value.AsTuple(); - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(elements[i]); - } + static_cast(this)->AddToChildren(value); } void DoAdd(TInputBuffer& input) final { if constexpr (Nullable) { if (!input.PopChar()) { - return DoAdd(TBlockItem{}); + NullBuilder->UnsafeAppend(0); + static_cast(this)->AddToChildrenDefault(); + return; } NullBuilder->UnsafeAppend(1); } - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->Add(input); - } + static_cast(this)->AddToChildren(input); } void DoAddDefault() final { if constexpr (Nullable) { NullBuilder->UnsafeAppend(1); } - for (ui32 i = 0; i < Children.size(); ++i) { - Children[i]->AddDefault(); - } + static_cast(this)->AddToChildrenDefault(); } void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { if (array.buffers.front()) { @@ -979,14 +966,11 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length); - } + static_cast(this)->AddManyToChildren(array, sparseBitmap, popCount); } void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { for (ui64 i = beginIndex; i < beginIndex + count; ++i) { @@ -994,14 +978,11 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], beginIndex, count); - } + static_cast(this)->AddManyToChildren(array, beginIndex, count); } void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { Y_ABORT_UNLESS(!array.buffers.empty()); - Y_ABORT_UNLESS(array.child_data.size() == Children.size()); if constexpr (Nullable) { for (size_t i = 0; i < count; ++i) { @@ -1009,9 +990,7 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { } } - for (size_t i = 0; i < Children.size(); ++i) { - Children[i]->AddMany(*array.child_data[i], indexes, count); - } + static_cast(this)->AddManyToChildren(array, indexes, count); } TBlockArrayTree::Ptr DoBuildTree(bool finish) final { @@ -1027,10 +1006,7 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { Y_ABORT_UNLESS(length); result->Payload.push_back(arrow::ArrayData::Make(ArrowType, length, { nullBitmap })); - result->Children.reserve(Children.size()); - for (ui32 i = 0; i < Children.size(); ++i) { - result->Children.emplace_back(Children[i]->BuildTree(finish)); - } + static_cast(this)->BuildChildrenTree(finish, result->Children); if (!finish) { Reserve(); @@ -1048,10 +1024,152 @@ class TTupleArrayBuilder final : public TArrayBuilderBase { } private: - TVector> Children; std::unique_ptr> NullBuilder; }; +template +class TTupleArrayBuilder final : public TTupleArrayBuilderBase> { +public: + + TTupleArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, + TVector&& children) + : TTupleArrayBuilderBase>(typeInfoHelper, type, pool, maxLen) + , Children_(std::move(children)) {} + + void AddToChildrenDefault() { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->AddDefault(); + } + } + + void AddToChildren(NUdf::TUnboxedValuePod value) { + auto elements = value.GetElements(); + if (elements) { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(elements[i]); + } + } else { + for (ui32 i = 0; i < Children_.size(); ++i) { + auto element = value.GetElement(i); + Children_[i]->Add(element); + } + } + } + + void AddToChildren(TBlockItem value) { + auto elements = value.AsTuple(); + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(elements[i]); + } + } + + void AddToChildren(TInputBuffer& input) { + for (ui32 i = 0; i < Children_.size(); ++i) { + Children_[i]->Add(input); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], popCount, sparseBitmap, array.length); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], beginIndex, count); + } + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == Children_.size()); + for (size_t i = 0; i < Children_.size(); ++i) { + Children_[i]->AddMany(*array.child_data[i], indexes, count); + } + } + + void BuildChildrenTree(bool finish, std::vector& resultChildren) { + resultChildren.reserve(Children_.size()); + for (ui32 i = 0; i < Children_.size(); ++i) { + resultChildren.emplace_back(Children_[i]->BuildTree(finish)); + } + } + +private: +TVector> Children_; +}; + +template +class TTzDateArrayBuilder final : public TTupleArrayBuilderBase> { + using TDateLayout = TDataType::TLayout; + + static std::shared_ptr GetArrowTypeForLayout() { + if constexpr (std::is_same_v) { + return arrow::uint16(); + } else if constexpr (std::is_same_v) { + return arrow::uint32(); + } + return arrow::uint64(); + } + +public: + TTzDateArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen) + : TTupleArrayBuilderBase>(typeInfoHelper, type, pool, maxLen) + , DateBuilder_(typeInfoHelper, GetArrowTypeForLayout(), pool, maxLen) + , TimezoneBuilder_(typeInfoHelper, arrow::uint16(), pool, maxLen) + { + } + + void AddToChildrenDefault() { + DateBuilder_.AddDefault(); + TimezoneBuilder_.AddDefault(); + } + + void AddToChildren(NUdf::TUnboxedValuePod value) { + DateBuilder_.Add(value); + TimezoneBuilder_.Add(value.GetTimezoneId()); + } + + void AddToChildren(TBlockItem value) { + DateBuilder_.Add(value); + TimezoneBuilder_.Add(value.GetTimezoneId()); + } + + void AddToChildren(TInputBuffer& input) { + AddToChildren(input.PopNumber()); + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], popCount, sparseBitmap, array.length); + TimezoneBuilder_.AddMany(*array.child_data[1], popCount, sparseBitmap, array.length); + } + + void AddManyToChildren(const arrow::ArrayData& array, ui64 beginIndex, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], beginIndex, count); + TimezoneBuilder_.AddMany(*array.child_data[1], beginIndex, count); + } + + void AddManyToChildren(const arrow::ArrayData& array, const ui64* indexes, size_t count) { + Y_ABORT_UNLESS(array.child_data.size() == 2); + DateBuilder_.AddMany(*array.child_data[0], indexes, count); + TimezoneBuilder_.AddMany(*array.child_data[1], indexes, count); + } + + void BuildChildrenTree(bool finish, std::vector& resultChildren) { + resultChildren.emplace_back(DateBuilder_.BuildTree(finish)); + resultChildren.emplace_back(TimezoneBuilder_.BuildTree(finish)); + } + +private: + TFixedSizeArrayBuilder DateBuilder_; + TFixedSizeArrayBuilder TimezoneBuilder_; +}; + + class TExternalOptionalArrayBuilder final : public TArrayBuilderBase { public: TExternalOptionalArrayBuilder(const ITypeInfoHelper& typeInfoHelper, const TType* type, arrow::MemoryPool& pool, size_t maxLen, std::unique_ptr&& inner) @@ -1232,6 +1350,12 @@ inline std::unique_ptr MakeArrayBuilderImpl( case NUdf::EDataSlot::Utf8: case NUdf::EDataSlot::Json: return std::make_unique>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::TzDate: + return std::make_unique>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::TzDatetime: + return std::make_unique>(typeInfoHelper, type, pool, maxLen); + case NUdf::EDataSlot::TzTimestamp: + return std::make_unique>(typeInfoHelper, type, pool, maxLen); default: Y_ENSURE(false, "Unsupported data slot"); } diff --git a/ydb/library/yql/public/udf/arrow/block_item.h b/ydb/library/yql/public/udf/arrow/block_item.h index 15edabe27ae1..9bf7254232df 100644 --- a/ydb/library/yql/public/udf/arrow/block_item.h +++ b/ydb/library/yql/public/udf/arrow/block_item.h @@ -155,6 +155,15 @@ class TBlockItem { bool IsBoxed() const { return EMarkers::Boxed == GetMarkers(); } bool IsEmbedded() const { return EMarkers::Embedded == GetMarkers(); } + inline void SetTimezoneId(ui16 id) { + UDF_VERIFY(GetMarkers() == EMarkers::Embedded, "Value is not a datetime"); + Raw.Simple.TimezoneId = id; + } + + inline ui16 GetTimezoneId() const { + UDF_VERIFY(GetMarkers() == EMarkers::Embedded, "Value is not a datetime"); + return Raw.Simple.TimezoneId; + } private: union TRaw { @@ -180,7 +189,8 @@ class TBlockItem { union { ui64 FullMeta; struct { - ui8 Reserved[7]; + ui16 TimezoneId; + ui8 Reserved[5]; ui8 Meta; }; }; diff --git a/ydb/library/yql/public/udf/arrow/block_item_comparator.h b/ydb/library/yql/public/udf/arrow/block_item_comparator.h index 98ca57d7f94d..01df5a3549aa 100644 --- a/ydb/library/yql/public/udf/arrow/block_item_comparator.h +++ b/ydb/library/yql/public/udf/arrow/block_item_comparator.h @@ -148,6 +148,48 @@ class TStringBlockItemComparator : public TBlockItemComparatorBase +class TTzDateBlockItemComparator : public TBlockItemComparatorBase, Nullable> { + using TLayout = typename TDataType::TLayout; + +public: + bool DoCompare(TBlockItem lhs, TBlockItem rhs) const { + const auto x = lhs.Get(); + const auto y = rhs.Get(); + + if (x == y) { + const auto tx = lhs.GetTimezoneId(); + const auto ty = rhs.GetTimezoneId(); + return (tx == ty) ? 0 : (tx < ty ? -1 : 1); + } + + if (x < y) { + return -1; + } + + return 1; + } + + bool DoEquals(TBlockItem lhs, TBlockItem rhs) const { + return lhs.Get() == rhs.Get() && lhs.GetTimezoneId() == rhs.GetTimezoneId(); + } + + + bool DoLess(TBlockItem lhs, TBlockItem rhs) const { + const auto x = lhs.Get(); + const auto y = rhs.Get(); + + if (x == y) { + const auto tx = lhs.GetTimezoneId(); + const auto ty = rhs.GetTimezoneId(); + return tx < ty; + } + + return x < y; + } +}; + + template class TTupleBlockItemComparator : public TBlockItemComparatorBase, Nullable> { public: diff --git a/ydb/library/yql/public/udf/arrow/block_item_hasher.h b/ydb/library/yql/public/udf/arrow/block_item_hasher.h index 4c3d89e998e8..b948a7bf3302 100644 --- a/ydb/library/yql/public/udf/arrow/block_item_hasher.h +++ b/ydb/library/yql/public/udf/arrow/block_item_hasher.h @@ -49,6 +49,17 @@ class TFixedSizeBlockItemHasher : public TBlockItemHasherBase +class TTzDateBlockItemHasher : public TBlockItemHasherBase, Nullable> { +public: + ui64 DoHash(TBlockItem value) const { + using TLayout = typename TDataType::TLayout; + TUnboxedValuePod uv {value.As()}; + uv.SetTimezoneId(value.GetTimezoneId()); + return GetValueHash::Slot>(uv); + } +}; + template class TStringBlockItemHasher : public TBlockItemHasherBase, Nullable> { public: diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h index 3f6c958ddefe..49bf8925cef9 100644 --- a/ydb/library/yql/public/udf/arrow/block_reader.h +++ b/ydb/library/yql/public/udf/arrow/block_reader.h @@ -203,26 +203,16 @@ class TStringBlockReader final : public IBlockReader { } }; -template -class TTupleBlockReader final : public IBlockReader { +template +class TTupleBlockReaderBase : public IBlockReader { public: - TTupleBlockReader(TVector>&& children) - : Children(std::move(children)) - , Items(Children.size()) - {} - TBlockItem GetItem(const arrow::ArrayData& data, size_t index) final { if constexpr (Nullable) { if (IsNull(data, index)) { return {}; } } - - for (ui32 i = 0; i < Children.size(); ++i) { - Items[i] = Children[i]->GetItem(*data.child_data[i], index); - } - - return TBlockItem(Items.data()); + return static_cast(this)->GetChildrenNotNullItems(data, index); } TBlockItem GetScalarItem(const arrow::Scalar& scalar) final { @@ -233,33 +223,87 @@ class TTupleBlockReader final : public IBlockReader { } const auto& structScalar = arrow::internal::checked_cast(scalar); + return static_cast(this)->GetChildrenNotNullScalarItems(structScalar); + } - for (ui32 i = 0; i < Children.size(); ++i) { - Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); + ui64 GetDataWeight(const arrow::ArrayData& data) const final { + ui64 size = 0; + if constexpr (Nullable) { + size += data.length; } - return TBlockItem(Items.data()); + size += static_cast(this)->GetChildrenDataWeight(data); + return size; } - ui64 GetDataWeight(const arrow::ArrayData& data) const final { + ui64 GetDataWeight(TBlockItem item) const final { + return static_cast(this)->GetDataWeightImpl(item); + } + + ui64 GetDefaultValueWeight() const final { ui64 size = 0; if constexpr (Nullable) { - size += data.length; + size = 1; } + size += static_cast(this)->GetChildrenDefaultDataWeight(); + return size; + } + void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { + if constexpr (Nullable) { + if (IsNull(data, index)) { + return out.PushChar(0); + } + out.PushChar(1); + } + + static_cast(this)->SaveChildrenNotNullItems(data, index, out); + } + + void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final { + if constexpr (Nullable) { + if (!scalar.is_valid) { + return out.PushChar(0); + } + out.PushChar(1); + } + + const auto& structScalar = arrow::internal::checked_cast(scalar); + + static_cast(this)->SaveChildrenNotNullScalarItems(structScalar, out); + } +}; + +template +class TTupleBlockReader final : public TTupleBlockReaderBase> { +public: + TTupleBlockReader(TVector>&& children) + : Children(std::move(children)) + , Items(Children.size()) + {} + + TBlockItem GetChildrenNotNullItems(const arrow::ArrayData& data, size_t index) { for (ui32 i = 0; i < Children.size(); ++i) { - size += Children[i]->GetDataWeight(*data.child_data[i]); + Items[i] = Children[i]->GetItem(*data.child_data[i], index); } - return size; + return TBlockItem(Items.data()); } - ui64 GetDataWeight(TBlockItem item) const final { + TBlockItem GetChildrenNotNullScalarItems(const arrow::StructScalar& structScalar) { + for (ui32 i = 0; i < Children.size(); ++i) { + Items[i] = Children[i]->GetScalarItem(*structScalar.value[i]); + } + + return TBlockItem(Items.data()); + } + + size_t GetDataWeightImpl(const TBlockItem& item) const { const TBlockItem* items = nullptr; ui64 size = 0; if constexpr (Nullable) { if (!item) { - return GetDefaultValueWeight(); + return this->GetDefaultValueWeight(); } size = 1; items = item.GetOptionalValue().GetElements(); @@ -274,40 +318,39 @@ class TTupleBlockReader final : public IBlockReader { return size; } - ui64 GetDefaultValueWeight() const final { - ui64 size = 0; - if constexpr (Nullable) { - size = 1; + size_t GetChildrenDataWeight(const arrow::ArrayData& data) const { + size_t size = 0; + for (ui32 i = 0; i < Children.size(); ++i) { + size += Children[i]->GetDataWeight(*data.child_data[i]); } + + return size; + } + + size_t GetChildrenDataWeight(const TBlockItem* items) const { + size_t size = 0; for (ui32 i = 0; i < Children.size(); ++i) { - size += Children[i]->GetDefaultValueWeight(); + size += Children[i]->GetDataWeight(items[i]); } + return size; } - void SaveItem(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const final { - if constexpr (Nullable) { - if (IsNull(data, index)) { - return out.PushChar(0); - } - out.PushChar(1); + size_t GetChildrenDefaultDataWeight() const { + size_t size = 0; + for (ui32 i = 0; i < Children.size(); ++i) { + size += Children[i]->GetDefaultValueWeight(); } + return size; + } + void SaveChildrenNotNullItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const { for (ui32 i = 0; i < Children.size(); ++i) { Children[i]->SaveItem(*data.child_data[i], index, out); } } - - void SaveScalarItem(const arrow::Scalar& scalar, TOutputBuffer& out) const final { - if constexpr (Nullable) { - if (!scalar.is_valid) { - return out.PushChar(0); - } - out.PushChar(1); - } - - const auto& structScalar = arrow::internal::checked_cast(scalar); - + + void SaveChildrenNotNullScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { for (ui32 i = 0; i < Children.size(); ++i) { Children[i]->SaveScalarItem(*structScalar.value[i], out); } @@ -318,6 +361,65 @@ class TTupleBlockReader final : public IBlockReader { TVector Items; }; +template +class TTzDateBlockReader final : public TTupleBlockReaderBase> { +public: + TBlockItem GetChildrenNotNullItems(const arrow::ArrayData& data, size_t index) { + Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2); + + TBlockItem item {DateReader_.GetItem(*data.child_data[0], index)}; + item.SetTimezoneId(TimezoneReader_.GetItem(*data.child_data[1], index).Get()); + return item; + } + + TBlockItem GetChildrenNotNullScalarItems(const arrow::StructScalar& structScalar) { + Y_DEBUG_ABORT_UNLESS(structScalar.value.size() == 2); + + TBlockItem item {DateReader_.GetScalarItem(*structScalar.value[0])}; + item.SetTimezoneId(TimezoneReader_.GetScalarItem(*structScalar.value[1]).Get()); + return item; + } + + size_t GetChildrenDataWeight(const arrow::ArrayData& data) const { + Y_DEBUG_ABORT_UNLESS(data.child_data.size() == 2); + + size_t size = 0; + size += DateReader_.GetDataWeight(*data.child_data[0]); + size += TimezoneReader_.GetDataWeight(*data.child_data[1]); + return size; + } + + size_t GetDataWeightImpl(const TBlockItem& item) const { + Y_UNUSED(item); + return GetChildrenDefaultDataWeight(); + } + + size_t GetChildrenDefaultDataWeight() const { + ui64 size = 0; + if constexpr (Nullable) { + size = 1; + } + + size += DateReader_.GetDefaultValueWeight(); + size += TimezoneReader_.GetDefaultValueWeight(); + return size; + } + + void SaveChildrenNotNullItems(const arrow::ArrayData& data, size_t index, TOutputBuffer& out) const { + DateReader_.SaveItem(*data.child_data[0], index, out); + TimezoneReader_.SaveItem(*data.child_data[1], index, out); + } + + void SaveChildrenNotNullScalarItems(const arrow::StructScalar& structScalar, TOutputBuffer& out) const { + DateReader_.SaveScalarItem(*structScalar.value[0], out); + TimezoneReader_.SaveScalarItem(*structScalar.value[1], out); + } + +private: + TFixedSizeBlockReader::TLayout, /* Nullable */false> DateReader_; + TFixedSizeBlockReader TimezoneReader_; +}; + class TExternalOptionalBlockReader final : public IBlockReader { public: TExternalOptionalBlockReader(std::unique_ptr&& inner) @@ -390,6 +492,8 @@ struct TReaderTraits { using TExtOptional = TExternalOptionalBlockReader; template using TResource = TResourceBlockReader; + template + using TTzDateReader = TTzDateBlockReader; static std::unique_ptr MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) { Y_UNUSED(pgBuilder); @@ -407,6 +511,15 @@ struct TReaderTraits { return std::make_unique>(); } } + + template + static std::unique_ptr MakeTzDate(bool isOptional) { + if (isOptional) { + return std::make_unique>(); + } else { + return std::make_unique>(); + } + } }; template @@ -534,6 +647,12 @@ std::unique_ptr MakeBlockReaderImpl(const ITypeInfoHe return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::Json: return MakeStringBlockReaderImpl(isOptional); + case NUdf::EDataSlot::TzDate: + return TTraits::template MakeTzDate(isOptional); + case NUdf::EDataSlot::TzDatetime: + return TTraits::template MakeTzDate(isOptional); + case NUdf::EDataSlot::TzTimestamp: + return TTraits::template MakeTzDate(isOptional); default: Y_ENSURE(false, "Unsupported data slot"); } diff --git a/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp b/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp index 074af4da90f9..a603bb045dd8 100644 --- a/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp +++ b/ydb/library/yql/public/udf/arrow/ut/array_builder_ut.cpp @@ -175,6 +175,30 @@ Y_UNIT_TEST_SUITE(TArrayBuilderTest) { UNIT_ASSERT_VALUES_EQUAL(resource2->GetResourceTag(), ResourceName); } + Y_UNIT_TEST(TestTzDateBuilder_Layout) { + TArrayBuilderTestData data; + const auto tzDateType = data.PgmBuilder.NewDataType(EDataSlot::TzDate); + const auto arrayBuilder = MakeArrayBuilder(NMiniKQL::TTypeInfoHelper(), tzDateType, + *data.ArrowPool, MAX_BLOCK_SIZE, /* pgBuilder */ nullptr); + + auto makeTzDate = [] (ui16 val, ui16 tz) { + TUnboxedValuePod tzDate {val}; + tzDate.SetTimezoneId(tz); + return tzDate; + }; + + TVector dates{makeTzDate(1234, 1), makeTzDate(1234, 2), makeTzDate(45678, 333)}; + for (auto date: dates) { + arrayBuilder->Add(date); + } + + const auto datum = arrayBuilder->Build(true); + UNIT_ASSERT(datum.is_array()); + UNIT_ASSERT_VALUES_EQUAL(datum.length(), dates.size()); + const auto childData = datum.array()->child_data; + UNIT_ASSERT_VALUES_EQUAL_C(childData.size(), 2, "Expected date and timezone children"); + } + Y_UNIT_TEST(TestResourceStringValueBuilderReader) { TArrayBuilderTestData data; const auto resourceType = data.PgmBuilder.NewResourceType(ResourceName);