diff --git a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp index 88073f208b93..1c92c389a6e5 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_reader.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_reader.cpp @@ -184,7 +184,7 @@ struct TConverterTraits { using TTuple = TTupleBlockItemConverter; template using TFixedSize = TFixedSizeBlockItemConverter; - template + template using TStrings = TStringBlockItemConverter; using TExtOptional = TExternalOptionalBlockItemConverter; @@ -193,15 +193,15 @@ struct TConverterTraits { return std::make_unique>(); } else { if (desc.Typelen == -1) { - auto ret = std::make_unique>(); + auto ret = std::make_unique>(); ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen); return ret; } else if (desc.Typelen == -2) { - auto ret = std::make_unique>(); + auto ret = std::make_unique>(); ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen); return ret; } else { - auto ret = std::make_unique>(); + auto ret = std::make_unique>(); ret->SetPgBuilder(pgBuilder, desc.TypeId, desc.Typelen); return ret; } diff --git a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp index e7697f0c12db..6cebab459452 100644 --- a/ydb/library/yql/minikql/computation/mkql_block_transport.cpp +++ b/ydb/library/yql/minikql/computation/mkql_block_transport.cpp @@ -500,7 +500,7 @@ struct TSerializerTraits { using TTuple = TTupleBlockSerializer; template using TFixedSize = TFixedSizeBlockSerializer; - template + template using TStrings = TStringBlockSerializer; using TExtOptional = TExtOptionalBlockSerializer; @@ -519,7 +519,7 @@ struct TDeserializerTraits { using TTuple = TTupleBlockDeserializer; template using TFixedSize = TFixedSizeBlockDeserializer; - template + template using TStrings = TStringBlockDeserializer; using TExtOptional = TExtOptionalBlockDeserializer; diff --git a/ydb/library/yql/minikql/mkql_type_builder.cpp b/ydb/library/yql/minikql/mkql_type_builder.cpp index 1aaf4cdb203f..f74999bc42fe 100644 --- a/ydb/library/yql/minikql/mkql_type_builder.cpp +++ b/ydb/library/yql/minikql/mkql_type_builder.cpp @@ -2425,7 +2425,7 @@ struct TComparatorTraits { using TTuple = NUdf::TTupleBlockItemComparator; template using TFixedSize = NUdf::TFixedSizeBlockItemComparator; - template + template using TStrings = NUdf::TStringBlockItemComparator; using TExtOptional = NUdf::TExternalOptionalBlockItemComparator; @@ -2441,7 +2441,7 @@ struct THasherTraits { using TTuple = NUdf::TTupleBlockItemHasher; template using TFixedSize = NUdf::TFixedSizeBlockItemHasher; - template + template using TStrings = NUdf::TStringBlockItemHasher; using TExtOptional = NUdf::TExternalOptionalBlockItemHasher; diff --git a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp index 6a65150f7f87..98f43a555531 100644 --- a/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp +++ b/ydb/library/yql/providers/common/dq/yql_dq_integration_impl.cpp @@ -22,20 +22,6 @@ TMaybe TDqIntegrationBase::EstimateReadSize(ui64, ui32, const TVectorGetItems()) { - // Check type - auto type = e->GetItemType(); - while (ETypeAnnotationKind::Optional == type->GetKind()) { - type = type->Cast()->GetItemType(); - } - if (ETypeAnnotationKind::Data != type->GetKind()) { - return false; - } - } - return true; -} - TExprNode::TPtr TDqIntegrationBase::WrapRead(const TDqSettings&, const TExprNode::TPtr& read, TExprContext&) { return read; } diff --git a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp index d8ae81155dbd..994de78b22b5 100644 --- a/ydb/library/yql/providers/dq/opt/dqs_opt.cpp +++ b/ydb/library/yql/providers/dq/opt/dqs_opt.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include @@ -92,12 +93,13 @@ namespace NYql::NDqs { } YQL_CLOG(INFO, ProviderDq) << "DqsRewritePhyBlockReadOnDqIntegration"; - return Build(ctx, node->Pos()) - .Input(Build(ctx, node->Pos()) - .Input(readWideWrap.Input()) - .Flags(readWideWrap.Flags()) - .Token(readWideWrap.Token()) + .Input(Build(ctx, node->Pos()) + .Input(Build(ctx, node->Pos()) + .Input(readWideWrap.Input()) + .Flags(readWideWrap.Flags()) + .Token(readWideWrap.Token()) + .Done()) .Done()) .Done().Ptr(); }, ctx, optSettings); diff --git a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp index b7e18d36a3b0..841e20f13335 100644 --- a/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp +++ b/ydb/library/yql/providers/dq/provider/yql_dq_datasource_type_ann.cpp @@ -155,6 +155,8 @@ class TDqsDataSourceTypeAnnotationTransformer : public TVisitorTransformerBase { } types.push_back(ctx.MakeType(ctx.MakeType(EDataSlot::Uint64))); + input->SetTypeAnn(ctx.MakeType(ctx.MakeType(types))); + return TStatus::Ok; } input->SetTypeAnn(ctx.MakeType(ctx.MakeType(types))); diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp new file mode 100644 index 000000000000..98961195376a --- /dev/null +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.cpp @@ -0,0 +1,622 @@ +#include "arrow_converter.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include +#include + +namespace NYql::NDqs { + +template +struct TypeHelper { + using Type = T; +}; + +#define GEN_TYPE(type)\ + NumericConverterImpl + +#define GEN_TYPE_STR(type)\ + StringConverterImpl + +template +arrow::Datum NumericConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr block) { + arrow::DictionaryArray dict(block); + typename ::arrow::TypeTraits::ArrayType val(dict.dictionary()->data()); + auto data = dict.indices()->data()->GetValues(1); + if (dict.null_count()) { + for (i64 i = 0; i < block->length; ++i) { + if (dict.IsNull(i)) { + builder->Add(NUdf::TBlockItem{}); + } else { + if constexpr (std::is_same_v) { + builder->Add(NUdf::TBlockItem((ui8)val.Value(data[i]))); + } else { + builder->Add(NUdf::TBlockItem(val.Value(data[i]))); + } + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + if constexpr (std::is_same_v) { + builder->Add(NUdf::TBlockItem((ui8)val.Value(data[i]))); + } else { + builder->Add(NUdf::TBlockItem(val.Value(data[i]))); + } + } + } + return builder->Build(false); +} + +template +arrow::Datum StringConverterImpl(NUdf::IArrayBuilder* builder, std::shared_ptr block) { + arrow::DictionaryArray dict(block); + typename ::arrow::TypeTraits::ArrayType val(dict.dictionary()->data()); + auto data = dict.indices()->data()->GetValues(1); + if (dict.null_count()) { + for (i64 i = 0; i < block->length; ++i) { + if (dict.IsNull(i)) { + builder->Add(NUdf::TBlockItem{}); + } else { + i32 len; + auto ptr = reinterpret_cast(val.GetValue(data[i], &len)); + builder->Add(NUdf::TBlockItem(std::string_view(ptr, len))); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + i32 len; + auto ptr = reinterpret_cast(val.GetValue(data[i], &len)); + builder->Add(NUdf::TBlockItem(std::string_view(ptr, len))); + } + } + return builder->Build(false); +} + +using namespace NKikimr::NMiniKQL; +using namespace NYson::NDetail; + +class TYsonReaderDetails { +public: + TYsonReaderDetails(const std::string_view& s) : Data_(s.data()), Available_(s.size()) {} + + constexpr char Next() { + YQL_ENSURE(Available_-- > 0); + return *(++Data_); + } + + constexpr char Current() { + return *Data_; + } + + template + constexpr T ReadVarSlow() { + T shift = 0; + T value = Current() & 0x7f; + for (;;) { + shift += 7; + value |= T(Next() & 0x7f) << shift; + if (!(Current() & 0x80)) { + break; + } + } + Next(); + return value; + } + + ui32 ReadVarUI32() { + char prev = Current(); + if (Y_LIKELY(!(prev & 0x80))) { + Next(); + return prev; + } + + return ReadVarSlow(); + } + + ui64 ReadVarUI64() { + char prev = Current(); + if (Y_LIKELY(!(prev & 0x80))) { + Next(); + return prev; + } + + return ReadVarSlow(); + } + + i32 ReadVarI32() { + return NYson::ZigZagDecode32(ReadVarUI32()); + } + + i64 ReadVarI64() { + return NYson::ZigZagDecode64(ReadVarUI64()); + } + + double NextDouble() { + double val = *reinterpret_cast(Data_); + Data_ += sizeof(double); + return val; + } + + void Skip(i32 cnt) { + Data_ += cnt; + } + + const char* Data() { + return Data_; + } +private: + const char* Data_; + size_t Available_; +}; + +namespace { +void SkipYson(TYsonReaderDetails& buf) { + switch (buf.Current()) { + case BeginListSymbol: { + buf.Next(); + for (;;) { + SkipYson(buf); + if (buf.Current() == ListItemSeparatorSymbol) { + buf.Next(); + } + if (buf.Current() == EndListSymbol) { + break; + } + } + buf.Next(); + break; + } + case BeginAttributesSymbol: + case BeginMapSymbol: { + auto originalEnd = buf.Current() == BeginMapSymbol ? EndMapSymbol : EndAttributesSymbol; + buf.Next(); + for (;;) { + SkipYson(buf); + YQL_ENSURE(buf.Current() == KeyValueSeparatorSymbol); + buf.Next(); + SkipYson(buf); + if (buf.Current() == KeyedItemSeparatorSymbol) { + buf.Next(); + } + if (buf.Current() == originalEnd) { + break; + } + } + buf.Next(); + break; + } + case StringMarker: + buf.Next(); + buf.Skip(buf.ReadVarI32()); + break; + case Uint64Marker: + case Int64Marker: + buf.Next(); + Y_UNUSED(buf.ReadVarI64()); + break; + case TrueMarker: + case FalseMarker: + buf.Next(); + break; + case DoubleMarker: + buf.Next(); + Y_UNUSED(buf.NextDouble()); + break; + default: + YQL_ENSURE(false, "Unexpected char: " + std::string{buf.Current()}); + } +} + +NUdf::TBlockItem ReadYson(TYsonReaderDetails& buf) { + const char* beg = buf.Data(); + SkipYson(buf); + return NUdf::TBlockItem(std::string_view(beg, buf.Data() - beg)); +} +}; + +class IYsonBlockReader { +public: + virtual NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) = 0; + virtual ~IYsonBlockReader() = default; +}; + +template +class IYsonBlockReaderWithNativeFlag : public IYsonBlockReader { +public: + virtual NUdf::TBlockItem GetNotNull(TYsonReaderDetails&) = 0; + NUdf::TBlockItem GetNullableItem(TYsonReaderDetails& buf) { + char prev = buf.Current(); + if constexpr (Native) { + if (prev == EntitySymbol) { + buf.Next(); + return NUdf::TBlockItem(); + } + return GetNotNull(buf).MakeOptional(); + } + buf.Next(); + if (prev == EntitySymbol) { + return NUdf::TBlockItem(); + } + YQL_ENSURE(prev == BeginListSymbol); + auto result = GetNotNull(buf); + if (buf.Current() == ListItemSeparatorSymbol) { + buf.Next(); + } + YQL_ENSURE(buf.Current() == EndListSymbol); + buf.Next(); + return result.MakeOptional(); + } +private: +}; + +template +class TYsonTupleBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + TYsonTupleBlockReader(TVector>&& children) + : Children_(std::move(children)) + , Items_(Children_.size()) + {} + + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { + if constexpr (Nullable) { + return this->GetNullableItem(buf); + } + return GetNotNull(buf); + } + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + YQL_ENSURE(buf.Current() == BeginListSymbol); + buf.Next(); + for (ui32 i = 0; i < Children_.size(); ++i) { + Items_[i] = Children_[i]->GetItem(buf); + if (buf.Current() == ListItemSeparatorSymbol) { + buf.Next(); + } + } + YQL_ENSURE(buf.Current() == EndListSymbol); + buf.Next(); + return NUdf::TBlockItem(Items_.data()); + } +private: + const TVector> Children_; + TVector Items_; +}; + +template +class TYsonStringBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { + if constexpr (Nullable) { + return this->GetNullableItem(buf); + } + return GetNotNull(buf); + } + + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + if constexpr (NUdf::EDataSlot::Yson != OriginalT) { + YQL_ENSURE(buf.Current() == StringMarker); + buf.Next(); + const i32 length = buf.ReadVarI32(); + auto res = NUdf::TBlockItem(NUdf::TStringRef(buf.Data(), length)); + buf.Skip(length); + return res; + } else { + return ReadYson(buf); + } + } +private: + const TVector> Children_; + TVector Items_; +}; + +namespace { +struct TYtColumnConverterSettings { + TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative); + NKikimr::NMiniKQL::TType* Type; + const NUdf::IPgBuilder* PgBuilder; + arrow::MemoryPool& Pool; + const bool IsNative; + const bool IsTopOptional; + std::shared_ptr ArrowType; + std::unique_ptr Builder; +}; +} + +template +class TYsonFixedSizeBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) override final { + if constexpr (Nullable) { + return this->GetNullableItem(buf); + } + return GetNotNull(buf); + } + + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + if constexpr (std::is_same_v) { + YQL_ENSURE(buf.Current() == FalseMarker || buf.Current() == TrueMarker); + bool res = buf.Current() == TrueMarker; + buf.Next(); + return NUdf::TBlockItem(res); + } + + if constexpr (std::is_same_v) { + if (buf.Current() == FalseMarker || buf.Current() == TrueMarker) { + bool res = buf.Current() == TrueMarker; + buf.Next(); + return NUdf::TBlockItem(T(res)); + } + } + + if constexpr (std::is_integral_v) { + if constexpr (std::is_signed_v) { + YQL_ENSURE(buf.Current() == Int64Marker); + buf.Next(); + return NUdf::TBlockItem(T(buf.ReadVarI64())); + } else { + YQL_ENSURE(buf.Current() == Uint64Marker); + buf.Next(); + return NUdf::TBlockItem(T(buf.ReadVarUI64())); + } + } + + YQL_ENSURE(buf.Current() == DoubleMarker); + buf.Next(); + return NUdf::TBlockItem(T(buf.NextDouble())); + } +private: + const TVector> Children_; + TVector Items_; +}; + +template +class TYsonExternalOptBlockReader final : public IYsonBlockReaderWithNativeFlag { +public: + TYsonExternalOptBlockReader(std::unique_ptr&& inner) + : Inner_(std::move(inner)) + {} + + NUdf::TBlockItem GetItem(TYsonReaderDetails& buf) final { + char prev = buf.Current(); + buf.Next(); + if (prev == EntitySymbol) { + return NUdf::TBlockItem(); + } + YQL_ENSURE(prev == BeginListSymbol); + if constexpr (!Native) { + if (buf.Current() == EndListSymbol) { + buf.Next(); + return NUdf::TBlockItem(); + } + } + auto result = Inner_->GetItem(buf); + if (buf.Current() == ListItemSeparatorSymbol) { + buf.Next(); + } + YQL_ENSURE(buf.Current() == EndListSymbol); + buf.Next(); + return result.MakeOptional(); + } + + NUdf::TBlockItem GetNotNull(TYsonReaderDetails& buf) override final { + YQL_ENSURE(false, "Can't be called"); + } +private: + std::unique_ptr Inner_; +}; + +template +struct TYsonBlockReaderTraits { + using TResult = IYsonBlockReader; + template + using TTuple = TYsonTupleBlockReader; + template + using TFixedSize = TYsonFixedSizeBlockReader; + template + using TStrings = TYsonStringBlockReader; + using TExtOptional = TYsonExternalOptBlockReader; + + static std::unique_ptr MakePg(const NUdf::TPgTypeDescription& desc, const NUdf::IPgBuilder* pgBuilder) { + Y_UNUSED(pgBuilder); + if (desc.PassByValue) { + return std::make_unique>(); + } else { + return std::make_unique>(); + } + } + +}; + +template +class TPrimitiveColumnConverter { +public: + TPrimitiveColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) { + if constexpr (IsDictionary) { + switch (Settings_.ArrowType->id()) { + case arrow::Type::BOOL: PrimitiveConverterImpl_ = GEN_TYPE(Boolean); break; + case arrow::Type::INT8: PrimitiveConverterImpl_ = GEN_TYPE(Int8); break; + case arrow::Type::UINT8: PrimitiveConverterImpl_ = GEN_TYPE(UInt8); break; + case arrow::Type::INT16: PrimitiveConverterImpl_ = GEN_TYPE(Int16); break; + case arrow::Type::UINT16: PrimitiveConverterImpl_ = GEN_TYPE(UInt16); break; + case arrow::Type::INT32: PrimitiveConverterImpl_ = GEN_TYPE(Int32); break; + case arrow::Type::UINT32: PrimitiveConverterImpl_ = GEN_TYPE(UInt32); break; + case arrow::Type::INT64: PrimitiveConverterImpl_ = GEN_TYPE(Int64); break; + case arrow::Type::UINT64: PrimitiveConverterImpl_ = GEN_TYPE(UInt64); break; + case arrow::Type::DOUBLE: PrimitiveConverterImpl_ = GEN_TYPE(Double); break; + case arrow::Type::FLOAT: PrimitiveConverterImpl_ = GEN_TYPE(Float); break; + case arrow::Type::STRING: PrimitiveConverterImpl_ = GEN_TYPE_STR(String); break; + case arrow::Type::BINARY: PrimitiveConverterImpl_ = GEN_TYPE_STR(Binary); break; + default: + return; // will check in runtime + }; + } + } + arrow::Datum Convert(std::shared_ptr block) { + if constexpr (IsDictionary) { + return PrimitiveConverterImpl_(Settings_.Builder.get(), block); + } + return block; + } +private: + TYtColumnConverterSettings& Settings_; + arrow::Datum (*PrimitiveConverterImpl_)(NUdf::IArrayBuilder*, std::shared_ptr); +}; + +template +class TYtYsonColumnConverter { +public: + TYtYsonColumnConverter(TYtColumnConverterSettings& settings) : Settings_(settings) { + Reader_ = NUdf::MakeBlockReaderImpl>(TTypeInfoHelper(), settings.Type, settings.PgBuilder); + } + + arrow::Datum Convert(std::shared_ptr block) { + if constexpr(!IsDictionary) { + arrow::BinaryArray binary(block); + if (block->GetNullCount()) { + for (i64 i = 0; i < block->length; ++i) { + if (binary.IsNull(i)) { + Settings_.Builder->Add(NUdf::TBlockItem{}); + } else { + i32 len; + auto ptr = reinterpret_cast(binary.GetValue(i, &len)); + TYsonReaderDetails inp(std::string_view(ptr, len)); + auto res = Reader_->GetItem(inp); + if constexpr (!Native && IsTopOptional) { + res = res.MakeOptional(); + } + Settings_.Builder->Add(std::move(res)); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + i32 len; + auto ptr = reinterpret_cast(binary.GetValue(i, &len)); + TYsonReaderDetails inp(std::string_view(ptr, len)); + auto res = Reader_->GetItem(inp); + if constexpr (!Native && IsTopOptional) { + res = res.MakeOptional(); + } + Settings_.Builder->Add(std::move(res)); + } + } + return Settings_.Builder->Build(false); + } + arrow::DictionaryArray dict(block); + arrow::BinaryArray binary(block->dictionary); + auto data = dict.indices()->data()->GetValues(1); + if (dict.null_count()) { + for (i64 i = 0; i < block->length; ++i) { + if (dict.IsNull(i)) { + Settings_.Builder->Add(NUdf::TBlockItem{}); + } else { + i32 len; + auto ptr = reinterpret_cast(binary.GetValue(data[i], &len)); + TYsonReaderDetails inp(std::string_view(ptr, len)); + auto res = Reader_->GetItem(inp); + if constexpr (!Native && IsTopOptional) { + res = res.MakeOptional(); + } + Settings_.Builder->Add(std::move(res)); + } + } + } else { + for (i64 i = 0; i < block->length; ++i) { + i32 len; + auto ptr = reinterpret_cast(binary.GetValue(data[i], &len)); + TYsonReaderDetails inp(std::string_view(ptr, len)); + auto res = Reader_->GetItem(inp); + if constexpr (!Native && IsTopOptional) { + res = res.MakeOptional(); + } + Settings_.Builder->Add(std::move(res)); + } + } + return Settings_.Builder->Build(false); + } + +private: + std::shared_ptr::TResult> Reader_; + TYtColumnConverterSettings& Settings_; +}; + +template +class TYtColumnConverter final : public IYtColumnConverter { +public: + TYtColumnConverter(TYtColumnConverterSettings&& settings) + : Settings_(std::move(settings)) + , DictYsonConverter_(Settings_) + , YsonConverter_(Settings_) + , DictPrimitiveConverter_(Settings_) {} + + arrow::Datum Convert(std::shared_ptr block) override { + if (arrow::Type::DICTIONARY == block->type->id()) { + if (static_cast(*block->type).value_type()->Equals(Settings_.ArrowType)) { + return DictPrimitiveConverter_.Convert(block); + } else { + return DictYsonConverter_.Convert(block); + } + } else { + if (block->type->Equals(Settings_.ArrowType)) { + return block; + } else { + YQL_ENSURE(arrow::Type::BINARY == block->type->id()); + return YsonConverter_.Convert(block); + } + } + } +private: + TYtColumnConverterSettings Settings_; + TYtYsonColumnConverter DictYsonConverter_; + TYtYsonColumnConverter YsonConverter_; + TPrimitiveColumnConverter DictPrimitiveConverter_; +}; + +TYtColumnConverterSettings::TYtColumnConverterSettings(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative) + : Type(type), PgBuilder(pgBuilder), Pool(pool), IsNative(isNative), IsTopOptional(!isNative && type->IsOptional()) +{ + if (!isNative) { + if (Type->IsOptional()) { + Type = static_cast(Type)->GetItemType(); + } + } + YQL_ENSURE(ConvertArrowType(type, ArrowType), "Can't convert type to arrow"); + size_t maxBlockItemSize = CalcMaxBlockItemSize(type); + size_t maxBlockLen = CalcBlockLen(maxBlockItemSize); + Builder = std::move(NUdf::MakeArrayBuilder( + TTypeInfoHelper(), type, + pool, + maxBlockLen, + pgBuilder + )); +} + +template typename T, typename Args, bool... Acc> +struct TBoolDispatcher { + + std::unique_ptr Dispatch(Args&& args) const { + return std::make_unique>(std::forward(args)); + } + + template + auto Dispatch(Args&& args, bool head, Bools... tail) const { + return head ? + TBoolDispatcher().Dispatch(std::forward(args), tail...) : + TBoolDispatcher().Dispatch(std::forward(args), tail...); + } +}; + +std::unique_ptr MakeYtColumnConverter(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative) { + TYtColumnConverterSettings settings(type, pgBuilder, pool, isNative); + bool isTopOptional = settings.IsTopOptional; + return TBoolDispatcher().Dispatch(std::move(settings), isNative, isTopOptional); +} +} diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.h b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.h new file mode 100644 index 000000000000..aed8b5f311df --- /dev/null +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/arrow_converter.h @@ -0,0 +1,22 @@ +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NYql::NDqs { + +class IYtColumnConverter { +public: + virtual arrow::Datum Convert(std::shared_ptr block) = 0; + virtual ~IYtColumnConverter() = default; +}; + +std::unique_ptr MakeYtColumnConverter(NKikimr::NMiniKQL::TType* type, const NUdf::IPgBuilder* pgBuilder, arrow::MemoryPool& pool, bool isNative); +} diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp index 923deacc08c6..777e153e25c5 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_block_reader.cpp @@ -1,6 +1,7 @@ #include "dq_yt_block_reader.h" #include "stream_decoder.h" #include "dq_yt_rpc_helpers.h" +#include "arrow_converter.h" #include @@ -8,14 +9,15 @@ #include #include #include +#include #include +#include #include #include -#include -#include -#include +#include #include +#include #include #include #include @@ -30,6 +32,7 @@ #include #include #include +#include #include #include #include @@ -45,13 +48,16 @@ namespace NYql::NDqs { using namespace NKikimr::NMiniKQL; +TStatKey FallbackCount("YtBlockReader_Fallbacks", true); +TStatKey BlockCount("YtBlockReader_Blocks", true); + namespace { struct TResultBatch { using TPtr = std::shared_ptr; size_t RowsCnt; std::vector Columns; + TResultBatch(int64_t cnt) : RowsCnt(cnt) {} TResultBatch(int64_t cnt, decltype(Columns)&& columns) : RowsCnt(cnt), Columns(std::move(columns)) {} - TResultBatch(std::shared_ptr batch) : RowsCnt(batch->num_rows()), Columns(batch->columns().begin(), batch->columns().end()) {} }; template @@ -59,27 +65,35 @@ class TBlockingQueueWithLimit { struct Poison { TString Error; }; - using TPoisonOr = std::variant; + struct TFallbackNotify {}; + using TPoisonOr = std::variant; public: TBlockingQueueWithLimit(size_t limit) : Limit_(limit) {} - void Push(T&& val) { - PushInternal(std::move(val)); + void Push(T&& val, bool imm) { + PushInternal(std::move(val), imm); } void PushPoison(const TString& err) { - PushInternal(Poison{err}); + PushInternal(Poison{err}, true); + } + + void PushNotify() { + PushInternal(TFallbackNotify{}, true); } - T Get() { + TMaybe Get() { auto res = GetInternal(); if (std::holds_alternative(res)) { throw std::runtime_error(std::get(res).Error); } + if (std::holds_alternative(res)) { + return {}; + } return std::move(std::get(res)); } private: template - void PushInternal(X&& val) { + void PushInternal(X&& val, bool imm) { NYT::TPromise promise; { std::lock_guard _(Mtx_); @@ -88,7 +102,7 @@ class TBlockingQueueWithLimit { Awaiting_.pop(); return; } - if (Ready_.size() >= Limit_) { + if (!imm && Ready_.size() >= Limit_) { promise = NYT::NewPromise(); BlockedPushes_.push(promise); } else { @@ -135,7 +149,9 @@ class TListener { public: using TPromise = NYT::TPromise; using TPtr = std::shared_ptr; - TListener(size_t initLatch, size_t inflight) : Latch_(initLatch), Queue_(inflight) {} + TListener(size_t initLatch, size_t inflight) + : Latch_(initLatch) + , Queue_(inflight) {} void OnEOF() { bool excepted = 0; @@ -149,19 +165,11 @@ class TListener { } // Handles result - void HandleResult(TResultBatch::TPtr&& res) { - Queue_.Push(std::move(res)); - } - - void OnRecordBatchDecoded(TBatchPtr record_batch) { - YQL_ENSURE(record_batch); - // decode dictionary - record_batch = NKikimr::NArrow::DictionaryToArray(record_batch); - // and handle result - HandleResult(std::make_shared(record_batch)); + void HandleResult(TResultBatch::TPtr&& res, bool immediatly = false) { + Queue_.Push(std::move(res), immediatly); } - TResultBatch::TPtr Get() { + TMaybe Get() { return Queue_.Get(); } @@ -170,7 +178,11 @@ class TListener { } void HandleFallback(TResultBatch::TPtr&& block) { - HandleResult(std::move(block)); + HandleResult(std::move(block), true); + } + + void NotifyFallback() { + Queue_.PushNotify(); } void InputDone() { @@ -267,8 +279,16 @@ class TBlockBuilder { class TLocalListener : public arrow::ipc::Listener { public: - TLocalListener(std::shared_ptr consumer) - : Consumer_(consumer) {} + TLocalListener(std::shared_ptr consumer, std::shared_ptr> columnTypes, std::shared_ptr>> arrowTypes, arrow::MemoryPool& pool, const NUdf::IPgBuilder* pgBuilder, bool isNative, NKikimr::NMiniKQL::IStatsRegistry* jobStats) + : Consumer_(consumer) + , ColumnTypes_(columnTypes) + , JobStats_(jobStats) + { + ColumnConverters_.reserve(columnTypes->size()); + for (size_t i = 0; i < columnTypes->size(); ++i) { + ColumnConverters_.emplace_back(MakeYtColumnConverter(columnTypes->at(i), pgBuilder, pool, isNative)); + } + } void Init(std::shared_ptr self) { Self_ = self; @@ -281,7 +301,14 @@ class TLocalListener : public arrow::ipc::Listener { } arrow::Status OnRecordBatchDecoded(std::shared_ptr batch) override { - Consumer_->OnRecordBatchDecoded(batch); + YQL_ENSURE(batch); + MKQL_ADD_STAT(JobStats_, BlockCount, 1); + std::vector result; + result.reserve(ColumnConverters_.size()); + for (size_t i = 0; i < ColumnConverters_.size(); ++i) { + result.emplace_back(ColumnConverters_[i]->Convert(batch->column(i)->data())); + } + Consumer_->HandleResult(std::make_shared(batch->num_rows(), std::move(result))); return arrow::Status::OK(); } @@ -296,17 +323,21 @@ class TLocalListener : public arrow::ipc::Listener { std::shared_ptr Self_; std::shared_ptr Consumer_; std::shared_ptr Decoder_; + std::shared_ptr> ColumnTypes_; + NKikimr::NMiniKQL::IStatsRegistry* JobStats_; + std::vector> ColumnConverters_; }; class TSource : public TNonCopyable { public: using TPtr = std::shared_ptr; TSource(std::unique_ptr&& settings, - size_t inflight, TType* type, const THolderFactory& holderFactory) - : Settings_(std::move(settings)) + size_t inflight, TType* type, std::shared_ptr>> types, const THolderFactory& holderFactory, NKikimr::NMiniKQL::IStatsRegistry* jobStats) + : HolderFactory(holderFactory) + , Settings_(std::move(settings)) , Inputs_(std::move(Settings_->RawInputs)) , Listener_(std::make_shared(Inputs_.size(), inflight)) - , HolderFactory_(holderFactory) + , JobStats_(jobStats) { auto structType = AS_TYPE(TStructType, type); std::vector columnTypes_(structType->GetMembersCount()); @@ -319,11 +350,13 @@ class TSource : public TNonCopyable { LocalListeners_.reserve(Inputs_.size()); for (size_t i = 0; i < Inputs_.size(); ++i) { InputsQueue_.emplace(i); - LocalListeners_.emplace_back(std::make_shared(Listener_)); + auto& decoder = Settings_->Specs->Inputs[Settings_->OriginalIndexes[i]]; + bool native = decoder->NativeYtTypeFlags && !decoder->FieldsVec[i].ExplicitYson; + LocalListeners_.emplace_back(std::make_shared(Listener_, ptr, types, *Settings_->Pool, Settings_->PgBuilder, native, jobStats)); LocalListeners_.back()->Init(LocalListeners_.back()); } BlockBuilder_.Init(ptr, *Settings_->Pool, Settings_->PgBuilder); - FallbackReader_.SetSpecs(*Settings_->Specs, HolderFactory_); + FallbackReader_.SetSpecs(*Settings_->Specs, HolderFactory); } void RunRead() { @@ -336,8 +369,9 @@ class TSource : public TNonCopyable { inputIdx = InputsQueue_.front(); InputsQueue_.pop(); } + Inputs_[inputIdx]->Read().SubscribeUnique(BIND([inputIdx = inputIdx, self = Self_](NYT::TErrorOr&& res) { - self->Pool_->GetInvoker()->Invoke(BIND([inputIdx, self, res = std::move(res)] () mutable { + self->Pool_->GetInvoker()->Invoke(BIND([inputIdx, self, res = std::move(res)]() mutable { try { self->Accept(inputIdx, std::move(res)); self->RunRead(); @@ -365,23 +399,13 @@ class TSource : public TNonCopyable { NYT::NApi::NRpcProxy::NProto::TRowsetStatistics statistics; NYT::TSharedRef currentPayload = NYT::NApi::NRpcProxy::DeserializeRowStreamBlockEnvelope(res.Value(), &descriptor, &statistics); if (descriptor.rowset_format() != NYT::NApi::NRpcProxy::NProto::RF_ARROW) { - auto promise = NYT::NewPromise>(); - MainInvoker_->Invoke(BIND([inputIdx, currentPayload, self = Self_, promise] { - try { - promise.Set(self->FallbackHandler(inputIdx, currentPayload)); - } catch (std::exception& e) { - promise.Set(NYT::TError(e.what())); - } - })); - auto result = NYT::NConcurrency::WaitFor(promise.ToFuture()); - if (!result.IsOK()) { - Listener_->HandleError(result.GetMessage()); - return; - } - for (auto& e: result.Value()) { - Listener_->HandleFallback(std::move(e)); + if (currentPayload.Size()) { + std::lock_guard _(FallbackMtx_); + Fallbacks_.push({inputIdx, currentPayload}); + } else { + InputDone(inputIdx); } - InputDone(inputIdx); + Listener_->NotifyFallback(); return; } @@ -406,18 +430,38 @@ class TSource : public TNonCopyable { } TResultBatch::TPtr Next() { - auto result = Listener_->Get(); - return result; + for(;;) { + size_t inputIdx = 0; + NYT::TSharedRef payload; + { + std::lock_guard _(FallbackMtx_); + if (Fallbacks_.size()) { + inputIdx = Fallbacks_.front().first; + payload = Fallbacks_.front().second; + Fallbacks_.pop(); + } + } + if (payload) { + for (auto &e: FallbackHandler(inputIdx, payload)) { + Listener_->HandleFallback(std::move(e)); + } + InputDone(inputIdx); + RunRead(); + } + auto result = Listener_->Get(); + if (!result) { // Falled back + continue; + } + return *result; + } } std::vector FallbackHandler(size_t idx, NYT::TSharedRef payload) { if (!payload.Size()) { return {}; } - // We're have only one mkql reader, protect it if 2 fallbacks happen at the same time - std::lock_guard _(FallbackMtx_); auto currentReader_ = std::make_shared(std::move(payload)); - + MKQL_ADD_STAT(JobStats_, FallbackCount, 1); // TODO(): save and recover row indexes FallbackReader_.SetReader(*currentReader_, 1, 4_MB, ui32(Settings_->OriginalIndexes[idx]), true); // If we don't save the reader, after exiting FallbackHandler it will be destroyed, @@ -427,7 +471,7 @@ class TSource : public TNonCopyable { while (FallbackReader_.IsValid()) { auto currentRow = std::move(FallbackReader_.GetRow()); if (!Settings_->Specs->InputGroups.empty()) { - currentRow = std::move(HolderFactory_.CreateVariantHolder(currentRow.Release(), Settings_->Specs->InputGroups.at(Settings_->OriginalIndexes[idx]))); + currentRow = std::move(HolderFactory.CreateVariantHolder(currentRow.Release(), Settings_->Specs->InputGroups.at(Settings_->OriginalIndexes[idx]))); } BlockBuilder_.Add(currentRow); FallbackReader_.Next(); @@ -445,20 +489,21 @@ class TSource : public TNonCopyable { } void SetSelfAndRun(TPtr self) { - MainInvoker_ = NYT::GetCurrentInvoker(); Self_ = self; - Pool_ = NYT::NConcurrency::CreateThreadPool(Inflight_, "rpc_reader_inflight"); + Pool_ = NYT::NConcurrency::CreateThreadPool(Inflight_, "block_reader"); // Run Inflight_ reads at the same time for (size_t i = 0; i < Inflight_; ++i) { RunRead(); } } + const THolderFactory& HolderFactory; private: - NYT::IInvoker* MainInvoker_; NYT::NConcurrency::IThreadPoolPtr Pool_; std::mutex Mtx_; std::mutex FallbackMtx_; + std::queue> Fallbacks_; + std::queue FallbackBlocks_; std::unique_ptr Settings_; std::vector> LocalListeners_; std::vector Inputs_; @@ -469,108 +514,106 @@ class TSource : public TNonCopyable { TListener::TPtr Listener_; TPtr Self_; size_t Inflight_; - const THolderFactory& HolderFactory_; + NKikimr::NMiniKQL::IStatsRegistry* JobStats_; }; -class TState: public TComputationValue { - using TBase = TComputationValue; +class TReaderState: public TComputationValue { + using TBase = TComputationValue; public: - TState(TMemoryUsageInfo* memInfo, TSource::TPtr source, size_t width, TType* type) + TReaderState(TMemoryUsageInfo* memInfo, TSource::TPtr source, size_t width, std::shared_ptr>> arrowTypes) : TBase(memInfo) , Source_(std::move(source)) , Width_(width) - , Type_(type) - , Types_(width) + , Types_(arrowTypes) + , Result_(width) { - for (size_t i = 0; i < Width_; ++i) { - Types_[i] = NArrow::GetArrowType(AS_TYPE(TStructType, Type_)->GetMemberType(i)); - } } - - EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + if (GotFinish_) { + return NUdf::EFetchStatus::Finish; + } + YQL_ENSURE(width == Width_ + 1); auto batch = Source_->Next(); if (!batch) { + GotFinish_ = 1; Source_->Finish(); - return EFetchResult::Finish; + return NUdf::EFetchStatus::Finish; } + for (size_t i = 0; i < Width_; ++i) { - if (!output[i]) { - continue; - } - if(!batch->Columns[i].type()->Equals(Types_[i])) { - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(ARROW_RESULT(arrow::compute::Cast(batch->Columns[i], Types_[i]))); - continue; - } - *(output[i]) = ctx.HolderFactory.CreateArrowBlock(std::move(batch->Columns[i])); - } - if (output[Width_]) { - *(output[Width_]) = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(ui64(batch->RowsCnt))); + YQL_ENSURE(batch->Columns[i].type()->Equals(Types_->at(i))); + output[i] = Source_->HolderFactory.CreateArrowBlock(std::move(batch->Columns[i])); } - return EFetchResult::One; + output[Width_] = Source_->HolderFactory.CreateArrowBlock(arrow::Datum(ui64(batch->RowsCnt))); + return NUdf::EFetchStatus::Ok; } private: TSource::TPtr Source_; const size_t Width_; - TType* Type_; - std::vector> Types_; + std::shared_ptr>> Types_; + std::vector Result_; + bool GotFinish_ = 0; }; }; -class TDqYtReadBlockWrapper : public TStatefulWideFlowComputationNode { -using TBaseComputation = TStatefulWideFlowComputationNode; +class TDqYtReadBlockWrapper : public TMutableComputationNode { +using TBaseComputation = TMutableComputationNode; public: - + TDqYtReadBlockWrapper(const TComputationNodeFactoryContext& ctx, const TString& clusterName, const TString& token, const NYT::TNode& inputSpec, const NYT::TNode& samplingSpec, const TVector& inputGroups, TType* itemType, const TVector& tableNames, TVector>&& tables, NKikimr::NMiniKQL::IStatsRegistry* jobStats, size_t inflight, - size_t timeout) : TBaseComputation(ctx.Mutables, this, EValueRepresentation::Boxed) - , Width(AS_TYPE(TStructType, itemType)->GetMembersCount()) - , CodecCtx(ctx.Env, ctx.FunctionRegistry, &ctx.HolderFactory) - , ClusterName(clusterName) - , Token(token) - , SamplingSpec(samplingSpec) - , Tables(std::move(tables)) - , Inflight(inflight) - , Timeout(timeout) - , Type(itemType) + size_t timeout) : TBaseComputation(ctx.Mutables, EValueRepresentation::Boxed) + , Width_(AS_TYPE(TStructType, itemType)->GetMembersCount()) + , CodecCtx_(ctx.Env, ctx.FunctionRegistry, &ctx.HolderFactory) + , ClusterName_(clusterName) + , Token_(token) + , SamplingSpec_(samplingSpec) + , Tables_(std::move(tables)) + , Inflight_(inflight) + , Timeout_(timeout) + , Type_(itemType) + , JobStats_(jobStats) { - // TODO() Enable range indexes - Specs.SetUseSkiff("", TMkqlIOSpecs::ESystemField::RowIndex); - Specs.Init(CodecCtx, inputSpec, inputGroups, tableNames, itemType, {}, {}, jobStats); + // TODO() Enable range indexes + row indexes + Specs_.SetUseSkiff("", 0); + Specs_.Init(CodecCtx_, inputSpec, inputGroups, tableNames, itemType, {}, {}, jobStats); } void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - auto settings = CreateInputStreams(true, Token, ClusterName, Timeout, Inflight > 1, Tables, SamplingSpec); - settings->Specs = &Specs; - settings->Pool = arrow::default_memory_pool(); - settings->PgBuilder = &ctx.Builder->GetPgBuilder(); - auto source = std::make_shared(std::move(settings), Inflight, Type, ctx.HolderFactory); - source->SetSelfAndRun(source); - state = ctx.HolderFactory.Create(source, Width, Type); } - EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { - if (!state.HasValue()) { - MakeState(ctx, state); + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + auto settings = CreateInputStreams(true, Token_, ClusterName_, Timeout_, Inflight_ > 1, Tables_, SamplingSpec_); + settings->Specs = &Specs_; + settings->Pool = arrow::default_memory_pool(); + settings->PgBuilder = &ctx.Builder->GetPgBuilder(); + auto types = std::make_shared>>(Width_); + for (size_t i = 0; i < Width_; ++i) { + YQL_ENSURE(ConvertArrowType(AS_TYPE(TStructType, Type_)->GetMemberType(i), types->at(i)), "Can't convert type to arrow"); } - return static_cast(*state.AsBoxed()).FetchValues(ctx, output); + auto source = std::make_shared(std::move(settings), Inflight_, Type_, types, ctx.HolderFactory, JobStats_); + source->SetSelfAndRun(source); + return ctx.HolderFactory.Create(source, Width_, types); } void RegisterDependencies() const final {} - - const ui32 Width; - NCommon::TCodecContext CodecCtx; - TMkqlIOSpecs Specs; - - TString ClusterName; - TString Token; - NYT::TNode SamplingSpec; - TVector> Tables; - size_t Inflight; - size_t Timeout; - TType* Type; +private: + const ui32 Width_; + NCommon::TCodecContext CodecCtx_; + TMkqlIOSpecs Specs_; + + TString ClusterName_; + TString Token_; + NYT::TNode SamplingSpec_; + TVector> Tables_; + size_t Inflight_; + size_t Timeout_; + TType* Type_; + NKikimr::NMiniKQL::IStatsRegistry* JobStats_; }; IComputationNode* CreateDqYtReadBlockWrapper(const TComputationNodeFactoryContext& ctx, const TString& clusterName, diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp index 560168730804..b801aa8ef5d2 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/dq_yt_rpc_helpers.cpp @@ -87,7 +87,8 @@ std::unique_ptr CreateInputStreams(bool isArrow, const TString& request->set_arrow_fallback_rowset_format(NYT::NApi::NRpcProxy::NProto::ERowsetFormat::RF_FORMAT); } - request->set_enable_row_index(true); + // TODO() Enable row indexes + request->set_enable_row_index(!isArrow); request->set_enable_table_index(true); // TODO() Enable range indexes request->set_enable_range_index(!isArrow); diff --git a/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make b/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make index f47be4ddd1bd..955b3691d4a2 100644 --- a/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make +++ b/ydb/library/yql/providers/yt/comp_nodes/dq/ya.make @@ -1,11 +1,12 @@ LIBRARY() PEERDIR( + ydb/library/yql/minikql + ydb/library/yql/minikql/computation/llvm ydb/library/yql/minikql/computation ydb/library/yql/providers/yt/comp_nodes ydb/library/yql/providers/yt/codec ydb/library/yql/providers/common/codec - ydb/core/formats/arrow yt/cpp/mapreduce/interface yt/cpp/mapreduce/common library/cpp/yson/node @@ -27,6 +28,7 @@ IF(LINUX) ) SRCS( + arrow_converter.cpp stream_decoder.cpp dq_yt_rpc_reader.cpp dq_yt_rpc_helpers.cpp diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp index d1cd8aa55460..db36835db5ae 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_dq_integration.cpp @@ -360,20 +360,28 @@ class TYtDqIntegration: public TDqIntegrationBase { return false; } - bool CanBlockRead(const NNodes::TExprBase& node, TExprContext&, TTypeAnnotationContext&) override { + bool CanBlockRead(const NNodes::TExprBase& node, TExprContext& ctx, TTypeAnnotationContext&) override { auto wrap = node.Cast(); auto maybeRead = wrap.Input().Maybe(); if (!maybeRead) { return false; } - if (!State_->Configuration->UseRPCReaderInDQ.Get(maybeRead.Cast().DataSource().Cluster().StringValue()).GetOrElse(DEFAULT_USE_RPC_READER_IN_DQ)) { return false; } const auto structType = GetSeqItemType(maybeRead.Raw()->GetTypeAnn()->Cast()->GetItems().back())->Cast(); - if (!CanBlockReadTypes(structType)) { + TVector subTypeAnn(Reserve(structType->GetItems().size())); + for (const auto& type: structType->GetItems()) { + subTypeAnn.emplace_back(type->GetItemType()); + } + + if (!State_->Types->ArrowResolver) { + return false; + } + + if (State_->Types->ArrowResolver->AreTypesSupported(ctx.GetPosition(node.Pos()), subTypeAnn, ctx) != IArrowResolver::EStatus::OK) { return false; } diff --git a/ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp b/ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp index e4d97ea43bee..d4cf3f1d5a68 100644 --- a/ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp +++ b/ydb/library/yql/providers/yt/provider/yql_yt_mkql_compiler.cpp @@ -369,7 +369,7 @@ TRuntimeNode BuildDqYtInputCall( } tablesNode.Add(refName); // TODO() Enable range indexes - auto skiffNode = SingleTableSpecToInputSkiff(specNode, structColumns, true, !enableBlockReader, false); + auto skiffNode = SingleTableSpecToInputSkiff(specNode, structColumns, !enableBlockReader, !enableBlockReader, false); const auto tmpFolder = GetTablesTmpFolder(*state->Configuration); auto tableName = pathInfo.Table->Name; if (pathInfo.Table->IsAnonymous && !TYtTableInfo::HasSubstAnonymousLabel(pathInfo.Table->FromNode.Cast())) { diff --git a/ydb/library/yql/public/udf/arrow/block_reader.h b/ydb/library/yql/public/udf/arrow/block_reader.h index 6b8200e50523..3985a44ac271 100644 --- a/ydb/library/yql/public/udf/arrow/block_reader.h +++ b/ydb/library/yql/public/udf/arrow/block_reader.h @@ -97,7 +97,7 @@ class TFixedSizeBlockReader final : public IBlockReader { } }; -template +template class TStringBlockReader final : public IBlockReader { public: using TOffset = typename TStringType::offset_type; @@ -364,8 +364,8 @@ struct TReaderTraits { using TTuple = TTupleBlockReader; template using TFixedSize = TFixedSizeBlockReader; - template - using TStrings = TStringBlockReader; + template + using TStrings = TStringBlockReader; using TExtOptional = TExternalOptionalBlockReader; static std::unique_ptr MakePg(const TPgTypeDescription& desc, const IPgBuilder* pgBuilder) { @@ -373,7 +373,7 @@ struct TReaderTraits { if (desc.PassByValue) { return std::make_unique>(); } else { - return std::make_unique>(); + return std::make_unique>(); } } }; @@ -396,12 +396,12 @@ std::unique_ptr MakeFixedSizeBlockReaderImpl(bool isO } } -template +template std::unique_ptr MakeStringBlockReaderImpl(bool isOptional) { if (isOptional) { - return std::make_unique>(); + return std::make_unique>(); } else { - return std::make_unique>(); + return std::make_unique>(); } } @@ -489,12 +489,15 @@ std::unique_ptr MakeBlockReaderImpl(const ITypeInfoHe case NUdf::EDataSlot::Double: return MakeFixedSizeBlockReaderImpl(isOptional); case NUdf::EDataSlot::String: + return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::Yson: + return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::JsonDocument: - return MakeStringBlockReaderImpl(isOptional); + return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::Utf8: + return MakeStringBlockReaderImpl(isOptional); case NUdf::EDataSlot::Json: - return MakeStringBlockReaderImpl(isOptional); + return MakeStringBlockReaderImpl(isOptional); default: Y_ENSURE(false, "Unsupported data slot"); }